В предыдущем уроке мы улучшили нашу систему логирования путем использования обменников с типом direct, создав возможность получать сообщения выборочно. Следующим этапом будет создание системы, позволяющей логировать по множеству критериев. Допустим мы хотим разделить обработаку логирования основываясь не только на важности сообщений, но и по устройствам, вызвавшим эту ошибку. Это предоставило бы нам большую гибкость - к примеру, можно было бы выделить обработку логов для критических ошибок, инициированных кроном, и отдельно выделить обработку логов всех сообщений от ядра системы. Для имплементации такой возможности нам неоходимо нечто большее, чем прямая рассылка сообщений (рассылка по методу точка-точка).
Связывание по шаблону
Для выполнения связи по шалбону обменник должен иметь тип topic, который определяется константой AMQP_EX_TYPE_TOPIC. Ключи routingKey составляются из слова, следующих через точку, например, "logs.devices.kernel.notice", "logs.devices.cron". Максимальная длина такого ключа может составлять 255 символов. Логика доставки сообщений по ключу схожа с логикой для обменников с типом direct - сообщения с определенным ключем будут доставлены в очереди с соответствующим ключем. Но есть одна большая разница. Ключи, используемые для связи по шаблону, могут содержать два специальных символа:
- * , соответствует строго одному слову;
- # , соответствует любому количеству слов, в том числе и отсутствию слов;
Например, имеем следующие связи
*.orange.*
*.*.rabbit
lazy.*.*
Первое слово описывает скорость, второе - цвет и третье - вид животного, т.е. [speed][color][species]. Мы создали три связи: очередь Q1 связали по ключу "*.orange.*" и очередь Q2 - по ключам "*.*.rabbit" и "lazy.#". Таким образом, можно сказать, что очередь Q1 рассматривает всех оранжевых животных, а очередь Q2 - всех зайцев и всех медленных животных.
Рассмотрим несколько примеров :
- "quick.orange.rabbit" - в обе очереди
- "lazy.orange.elephant" - в обе очереди
- "quick.orange.fox" - только в 1-ую
- "lazy.brown.fox" - только во 2-ую
- "quick.brown.fox" - будет отброшена
- "quick.orange.male.fox" - будет отброшена
- "lazy.orange.male.fox" - только во 2-ую
Обменник с типом topic может повторять поведение обменника с типом fanout, если с ним связать очередь по ключу "#". Если в ключе не испльзовать специальных символов, то такой обменник будет соответствовать обменнику с типом direct.
Отправка сообщений
Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();
После чего возможна публикация сообщений по ключу
$exchange->publish($message, 'kern.notice');
Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока
$queue = new AMQPQueue($channel);
$queue->declare();
foreach ($routingKeys as $routingKey) {
$queue->bind($exchange->getName(), $routingKey');
}
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Продюсер(send.php)
$params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);
$message = 'default_message';
$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();
$result = $exchange->publish(json_encode($message), $routingKey);
if ($result)
echo 'sent'.PHP_EOL;
else
echo 'error'.PHP_EOL;
$connection->disconnect();
Консьюмер (receive.php)
$params = array(
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
);
$routingKeys = array('cron.notice', 'kern.*', '*.failure');
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();
foreach ($routingKeys as $routingKey) {
$queue->bind($exchange->getName(), $routingKey');
}
while (true) {
if ($envelope = $queue->get()) {
$message = json_decode($envelope->getBody());
echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
echo "routing key: ".$envelope->getRoutingKey().PHP_EOL;
if (doWork($message)) {
$queue->ack($envelope->getDeliveryTag());
} else {
$queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
}
}
}
$connection->disconnect();