В предыдущем уроке была рассмотрена возможность отсылки сообщений нескольким получателям.В данном уроке мы рассмотрим как отсылать сообщения в четко определенные очереди. Такая возможность может понадобиться, к примеру, если мы не хотим сохранять все сообщения на диске, а только критический. В то время как на экран будут выводиться все сообщения.
Связывание (bindings)
Связываение уже упоминалось в предыдущем уроке
$queue->bind($exchange->getName(), '');
Повторимся, оно нужно, чтобы сказать обменнику, что он должен публиковать сообщения имеено в эту очередь.
В методе bind() имеется второй параметр - ключ(routingKey), по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить, что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется.
К примеру, если нужно связать обменник и очередь по ключу 'failure_messages'
$queue->bind($exchange->getName(), 'failure_messages');
Прямое связывание(точка-точка)
В предыдущем уроке система логирования выполняла широковещательную рассылку всем консьюмерам. Теперь мы хотим расширить это поведение путем добавления фильтра сообщений по их важности. К примеру, критические ошибки писать на диск, а предупреждения только выводить на экран с целью экономии дискового пространтсва. Ранее мы использовали обменник с типом fanout, который не позволяет это сделать. Сейчас мы используем другой тип обменника - direct, который отправляет сообщения только тем очередям, routingKey которых совпадает с routingKey сообщения. Это поведение проиллюстрировано на изображении
На изображении можно видеть обменник X с типом direct, который связан с очередью Q1 по ключу failure, и с очередью Q2 по ключам notice и warning. В данном случае все сообщения с ключем failure будут отсылаться только в очередь Q1, а все сообщения с ключами notice и warning будут отсылаться в очередь Q2. Сообщения, ключи которые не совпадают с выше указанными, будут игнорироваться всеми очередями.
Множественная связь
Вполне возможно несколько очередей связать с обменником по одному и тому же ключу. Т.е. для нашего примера мы вполне можем установить связь по ключу notice между обменником и очередью Q1 и между обменником и очередью Q2. В таком случае сообщения с ключем notice будут отсылаться на обе очереди, т.е. получаем поведение аналогичное обменнику с типом fanout.
Отправка сообщений
Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();
После чего возможна публикация сообщений по ключу
$exchange->publish($message, 'notice');
Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу
$queue = new AMQPQueue($channel);
$queue->declare();
foreach ($routingKeys as $routingKey) {
$queue->bind($exchange->getName(), $routingKey');
}
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Продюсер(send.php)
$params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);
$message = 'default_message';
$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$result = $exchange->publish(json_encode($message), $routingKey);
if ($result)
echo 'sent'.PHP_EOL;
else
echo 'error'.PHP_EOL;
$connection->disconnect();
Консьюмер (receive.php)
$params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);
$routingKeys = array('notice', 'warning', 'failure');
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();
foreach ($routingKeys as $routingKey) {
$queue->bind($exchange->getName(), $routingKey');
}
while (true) {
if ($envelope = $queue->get()) {
$message = json_decode($envelope->getBody());
echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
echo "routing key: ".$envelope->getRoutingKey().PHP_EOL;
if (doWork($message)) {
$queue->ack($envelope->getDeliveryTag());
} else {
$queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
}
}
}
$connection->disconnect();