RabbitMQ - Урок 5. Рассылка по шаблону

В предыдущем уроке мы улучшили нашу систему логирования путем использования обменников с типом direct, создав возможность получать сообщения выборочно. Следующим этапом будет создание системы, позволяющей логировать по множеству критериев. Допустим мы хотим разделить обработаку логирования основываясь не только на важности сообщений, но и по устройствам, вызвавшим эту ошибку. Это предоставило бы нам большую гибкость - к примеру, можно было бы выделить обработку логов для критических ошибок, инициированных кроном, и отдельно выделить обработку логов всех сообщений от ядра системы. Для имплементации такой возможности нам неоходимо нечто большее, чем прямая рассылка сообщений (рассылка по методу точка-точка).
Связывание по шаблону
Для выполнения связи по шалбону обменник должен иметь тип topic, который определяется константой AMQP_EX_TYPE_TOPIC. Ключи routingKey составляются из слова, следующих через точку, например, "logs.devices.kernel.notice", "logs.devices.cron". Максимальная длина такого ключа может составлять 255 символов. Логика доставки сообщений по ключу схожа с логикой для обменников с типом direct - сообщения с определенным ключем будут доставлены в очереди с соответствующим ключем. Но есть одна большая разница. Ключи, используемые для связи по шаблону, могут содержать два специальных символа:
- * , соответствует строго одному слову;
- # , соответствует любому количеству слов, в том числе и отсутствию слов;
Например, имеем следующие связи
*.orange.*
*.*.rabbit
lazy.*.*



Первое слово описывает скорость, второе - цвет и третье - вид животного, т.е. [speed][color][species]. Мы создали три связи: очередь Q1 связали по ключу "*.orange.*" и очередь Q2 - по ключам "*.*.rabbit" и "lazy.#". Таким образом, можно сказать, что очередь Q1 рассматривает всех оранжевых животных, а очередь Q2 - всех зайцев и всех медленных животных.

Рассмотрим несколько примеров :
- "quick.orange.rabbit" - в обе очереди
- "lazy.orange.elephant" - в обе очереди
- "quick.orange.fox" - только в 1-ую
- "lazy.brown.fox" - только во 2-ую
- "quick.brown.fox" - будет отброшена
- "quick.orange.male.fox" - будет отброшена
- "lazy.orange.male.fox" - только во 2-ую

Обменник с типом topic может повторять поведение обменника с типом fanout, если с ним связать очередь по ключу "#". Если в ключе не испльзовать специальных символов, то такой обменник будет соответствовать обменнику с типом direct.
Отправка сообщений
Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();

После чего возможна публикация сообщений по ключу

$exchange->publish($message, 'kern.notice');

Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока

$queue = new AMQPQueue($channel);
$queue->declare();

foreach ($routingKeys as $routingKey) {
    $queue->bind($exchange->getName(), $routingKey');
}

Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.

Продюсер(send.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$message = 'default_message';
$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();

$result = $exchange->publish(json_encode($message), $routingKey);

if ($result)
echo 'sent'.PHP_EOL;
else
echo 'error'.PHP_EOL;

$connection->disconnect();

Консьюмер (receive.php)

$params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);

$routingKeys = array('cron.notice', 'kern.*', '*.failure');

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();

foreach ($routingKeys as $routingKey) {
    $queue->bind($exchange->getName(), $routingKey');
}
while (true) {
    if ($envelope = $queue->get()) {
        $message = json_decode($envelope->getBody());
        echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
        echo "routing key: ".$envelope->getRoutingKey().PHP_EOL;
        if (doWork($message)) {
            $queue->ack($envelope->getDeliveryTag());
        } else {
            $queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
    }
}
}

$connection->disconnect();

Добавить комментарий

Ваше имя

Сообщение

Подтверждение