В RabbitMQ используются следующий обозначения. Продюсер - программа, которая посылает сообщения. Будем обозначать его так
Брокер(очередь) - собственно просто буфер в памяти без каких-либо ограничений на количество хранимых сообщений. В одну и ту же очередь могут отсылать сообщения несколько продюсеров, так же как несколько консьюмеров могут пытаться получить сообщения из одной и той же очереди. Очередь будет обозначена так(сверху указано имя очереди)
Консьюмер(получатель) - программа, которая принимает сообщения из очереди. Будем обозначать его так
Здесь важно отметить, что продюсер, консьюмер и брокер могут быть расположены на различных машинах, более того, в большинстве случаев это именно так.

Первый скрипт работы с очередью, своего рода "Hello world", будет отсылать текстовое сообщение с клиента, принимать его на сервере и выводить на экран.

Т.е. схема работы следующая: Первое, что надо сделать, это установить соединение с сервером RabbitMQ. Соединение устанавливается командами
$connection = new AMQPConnection($connection_params);
$connection->connect();
где
$connection_params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);
это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров, то его можно опустить. И, напротив, если, к примеру, нужно подключиться к другой машине, в параметре host необходимо указать ее имя или ip адрес.
Используя коннект можно получить объект для канала
$channel = new AMQPChannel($connection);
На основе полученного канала создаем обменник
$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();
и, собственно, саму очередь
$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();
Когда обменник и очередь готовы, их можно связать по ключу
$queue->bind($exchange->getName(), 'foo_key');
Объявлять очередь и связывать ее с обменником можно как на продюсере, так и на консьюмере. Все зависит от того, что первым будет запускаться. Если неизвестно, то, возможно следует объявить и там и там. При этом имена очередей должны совпадать. Если имена очередей совпадают, то количество объявлений не имеет значения. Очередь с определенным именем может быть только одна.
Стоит отметить, что сообщение не может быть опубликовано напрямую в очередь, оно должно проходить через обменник. Собственно посредством обменника оно и публикуется
$result = $exchange->publish(json_encode("Hello world!"), "foo_key");
После того как сообщение отослано, коннект можно разорвать.
$connection->disconnect();
Получатель также должен выполнить ту же последовательность
- приконнектиться к серверу сообщений;
- создать канал;
- объявить обменник;
- объявить очередь;
- связать очередь с обменником по ключу
Последние два действия, как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь
while (true) {
if ($envelope = $queue->get(AMQP_AUTOACK)) {
$message = json_decode($envelope->getBody());
print($message);
}
}
Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK, которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения, вернуть повторно его в очередь нельзя.
Таким образом, получаем два скрипта
send.php
$connection = new AMQPConnection($connection_params);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare(); $queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();
$result = $exchange->publish(json_encode("Hello world!"), "foo_key");
$connection->disconnect();
receiver.php
$connection = new AMQPConnection($connection_params);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare(); $queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();
$queue->bind($exchange->getName(), 'foo_key');
while (true) {
if ($envelope = $queue->get(AMQP_AUTOACK)) {
$message = json_decode($envelope->getBody());
print($message);
}
}
$connection->disconnect();