Урок 1. Hello world!

В RabbitMQ используются следующий обозначения. Продюсер - программа, которая посылает сообщения. Будем обозначать его так



Брокер(очередь) - собственно просто буфер в памяти без каких-либо ограничений на количество хранимых сообщений. В одну и ту же очередь могут отсылать сообщения несколько продюсеров, так же как несколько консьюмеров могут пытаться получить сообщения из одной и той же очереди. Очередь будет обозначена так(сверху указано имя очереди)



Консьюмер(получатель) - программа, которая принимает сообщения из очереди. Будем обозначать его так

Здесь важно отметить, что продюсер, консьюмер и брокер могут быть расположены на различных машинах, более того, в большинстве случаев это именно так.



Первый скрипт работы с очередью, своего рода "Hello world", будет отсылать текстовое сообщение с клиента, принимать его на сервере и выводить на экран.



Т.е. схема работы следующая: Первое, что надо сделать, это установить соединение с сервером RabbitMQ. Соединение устанавливается командами

$connection = new AMQPConnection($connection_params);
$connection->connect();

где

$connection_params = array(
  'host' => 'localhost',
  'port' => 5672,
  'vhost' => '/',
  'login' => 'guest',
  'password' => 'guest'
);

это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров, то его можно опустить. И, напротив, если, к примеру, нужно подключиться к другой машине, в параметре host необходимо указать ее имя или ip адрес.
Используя коннект можно получить объект для канала

$channel = new AMQPChannel($connection);

На основе полученного канала создаем обменник

$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();

и, собственно, саму очередь

$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();

Когда обменник и очередь готовы, их можно связать по ключу
$queue->bind($exchange->getName(), 'foo_key');

Объявлять очередь и связывать ее с обменником можно как на продюсере, так и на консьюмере. Все зависит от того, что первым будет запускаться. Если неизвестно, то, возможно следует объявить и там и там. При этом имена очередей должны совпадать. Если имена очередей совпадают, то количество объявлений не имеет значения. Очередь с определенным именем может быть только одна.
Стоит отметить, что сообщение не может быть опубликовано напрямую в очередь, оно должно проходить через обменник. Собственно посредством обменника оно и публикуется
$result = $exchange->publish(json_encode("Hello world!"), "foo_key");

После того как сообщение отослано, коннект можно разорвать.
$connection->disconnect();


Получатель также должен выполнить ту же последовательность
- приконнектиться к серверу сообщений;
- создать канал;
- объявить обменник;
- объявить очередь;
- связать очередь с обменником по ключу
Последние два действия, как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь

while (true) {
    if ($envelope = $queue->get(AMQP_AUTOACK)) {
        $message = json_decode($envelope->getBody());
        print($message);
    }
}


Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK, которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения, вернуть повторно его в очередь нельзя.

Таким образом, получаем два скрипта

send.php

$connection = new AMQPConnection($connection_params);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare(); $queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();
$result = $exchange->publish(json_encode("Hello world!"), "foo_key");
$connection->disconnect();


receiver.php

$connection = new AMQPConnection($connection_params);
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare(); $queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$queue->declare();
$queue->bind($exchange->getName(), 'foo_key');
while (true) {
    if ($envelope = $queue->get(AMQP_AUTOACK)) {
        $message = json_decode($envelope->getBody());
        print($message);
    }
}
$connection->disconnect();


Добавить комментарий

Ваше имя

Сообщение

Подтверждение