RabbitMQ - Урок 4. Селективная рассылка

В предыдущем уроке была рассмотрена возможность отсылки сообщений нескольким получателям.В данном уроке мы рассмотрим как отсылать сообщения в четко определенные очереди. Такая возможность может понадобиться, к примеру, если мы не хотим сохранять все сообщения на диске, а только критический. В то время как на экран будут выводиться все сообщения.

Связывание (bindings)
Связываение уже упоминалось в предыдущем уроке

$queue->bind($exchange->getName(), '');

Повторимся, оно нужно, чтобы сказать обменнику, что он должен публиковать сообщения имеено в эту очередь.
В методе bind() имеется второй параметр - ключ(routingKey), по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить, что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется.
К примеру, если нужно связать обменник и очередь по ключу 'failure_messages'

$queue->bind($exchange->getName(), 'failure_messages');

Прямое связывание(точка-точка)
В предыдущем уроке система логирования выполняла широковещательную рассылку всем консьюмерам. Теперь мы хотим расширить это поведение путем добавления фильтра сообщений по их важности. К примеру, критические ошибки писать на диск, а предупреждения только выводить на экран с целью экономии дискового пространтсва. Ранее мы использовали обменник с типом fanout, который не позволяет это сделать. Сейчас мы используем другой тип обменника - direct, который отправляет сообщения только тем очередям, routingKey которых совпадает с routingKey сообщения. Это поведение проиллюстрировано на изображении



На изображении можно видеть обменник X с типом direct, который связан с очередью Q1 по ключу failure, и с очередью Q2 по ключам notice и warning. В данном случае все сообщения с ключем failure будут отсылаться только в очередь Q1, а все сообщения с ключами notice и warning будут отсылаться в очередь Q2. Сообщения, ключи которые не совпадают с выше указанными, будут игнорироваться всеми очередями.
Множественная связь
Вполне возможно несколько очередей связать с обменником по одному и тому же ключу. Т.е. для нашего примера мы вполне можем установить связь по ключу notice между обменником и очередью Q1 и между обменником и очередью Q2. В таком случае сообщения с ключем notice будут отсылаться на обе очереди, т.е. получаем поведение аналогичное обменнику с типом fanout.



Отправка сообщений
Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();

После чего возможна публикация сообщений по ключу

$exchange->publish($message, 'notice');

Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу

$queue = new AMQPQueue($channel);
$queue->declare();

foreach ($routingKeys as $routingKey) {
    $queue->bind($exchange->getName(), $routingKey');
}

Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.

Продюсер(send.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$message = 'default_message';
$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();


$result = $exchange->publish(json_encode($message), $routingKey);

if ($result)
    echo 'sent'.PHP_EOL;
else
    echo 'error'.PHP_EOL;

$connection->disconnect();

Консьюмер (receive.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$routingKeys = array('notice', 'warning', 'failure');

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();

foreach ($routingKeys as $routingKey) {
    $queue->bind($exchange->getName(), $routingKey');
}

while (true) {
    if ($envelope = $queue->get()) {
        $message = json_decode($envelope->getBody());
        echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
        echo "routing key: ".$envelope->getRoutingKey().PHP_EOL;
        if (doWork($message)) {
            $queue->ack($envelope->getDeliveryTag());
        } else {
            $queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
        }
    }
}

$connection->disconnect();

Добавить комментарий

Ваше имя

Сообщение

Подтверждение