RabbitMQ - Урок 6. Реализация RPC шаблона

Во втором уроке была реализована очередь, которая распределяла нагрузку между всеми имеющимися консьюмерами. Но, что если нам нужно получить результат от обработчика очереди. Такой подход известен как вызов удаленных процедур или RPC(remote procedure call). В этом уроке будет реализована модель RPC с использованием очереди сообщений RabbitMQ. Конечно, такой подход предполагает, что обработка не должна занимать много времени. Для реализации примера наша функция обработчик будет изменять сообщение "message before" на "message after".

В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение, а сервере отвечает. Для обработки ответа сервера, необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа, мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса

$replyQueue = new AMQPQueue($channel);
$replyQueue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);
$replyQueue->declare();

$replyQueue->bind($exchange->getName(), $replyQueue->getName());

$correlationId = sha1($replyQueue->getName());
$attributes = array(
'reply_to' => $replyQueue->getName(),
'correlation_id' => $correlationId
);
$result = $exchange->publish(json_encode($message), '', AMQP_MANDATORY, $attributes);

// ... then code to read a response message from the callback_queue ...

Обратите внимание, что callback очередь создается с флагом AMQP_EXCLUSIVE, что означает, что только один консьюмер может слушать эту очередь.
Correlation ID
В методе, представленном выше, мы предполагаем создавать callback очередь для каждого RPC запроса. Поскольку нельзя однозначно по имени очереди определить какому запросу принадлежит ответ, в запрос также добавляется параметр correlationId, который имеет уникальное значение для каждого запроса. Позже, когда мы получим ответ, мы сможем сравнить его correlationId со значением, переданным вместе с запросом. И в случае их несовпадения просто отбросить полученный ответ.
Итоговый план действий
- клиент создает анонимную эксклюзивную callback очередь
- клиент отсылает запрос с двумя параметрами:
replyTo - имя callback очереди
corralationId - уникальное значение для каждого запроса
- запрос отправляется в именованную очередь, к примеру, с именем rpc_queue
- RPC воркер (RPC сервер) ждет запрос от этой очереди и когда запрос появляется, обрабатывает его и шлет ответ обратно клиенту, используя имя callback очереди в качестве роутер-ключа
- клиент слушает callback очередь и когда сообщение появляется, сверяет correlationId. Если значение этого свойства из полученного сообщения соответствует ранее сформированном значению, ответ обрабатывается приложением.

Все вместе
Функция обработки сообщения на стороне сервера выглядит следующим образом

function doWork($message)
{
    foreach ($message as &$m) {
        echo $m;
        $sleep_time = rand(0, 10);
        $tmp = explode(' ', $m);
        $m = $tmp[0].' after';
        echo str_repeat('.', floor($sleep_time));
        echo PHP_EOL;
    }
    sleep($sleep_time);
    return $message;
}

Функция обработки сообщения на стороне клиента

function getWork($message)
{
    print_r($message);
}

Продюсер(send.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$message = 'message before';
$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();


$replyQueue = new AMQPQueue($channel);
$replyQueue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_EXCLUSIVE);
$replyQueue->declare();

$replyQueue->bind($exchange->getName(), $replyQueue->getName());

$correlationId = sha1($replyQueue->getName());
$attributes = array(
'reply_to' => $replyQueue->getName(),
'correlation_id' => $correlationId
);
$result = $exchange->publish(json_encode($message), $routing_key, AMQP_MANDATORY, $attributes);

if ($result)
echo 'sent'.PHP_EOL;
else
echo 'error'.PHP_EOL;

while (true) {
    if ($envelope = $replyQueue->get()) {
        if ($envelope->getCorrelationId() == $corrlationId) {
            echo ($envelope->isRedelivery()) ? 'r: ' : 'n: ';
            $message = json_decode($envelope->getBody());
            getWork($message);
            echo PHP_EOL;
            $replyQueue->ack($envelope->getDeliveryTag()); break;
         }
    }
    // for avoid a hunging erlang sleep(1);
}

$connection->disconnect();

Консьюмер (receive.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$routingKey = isset($argv[1]) ? $argv[1] : 'default_key';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('rpc_queue');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();

$queue->bind($exchange->getName(), $routingKey);

while (true) {
    if ($envelope = $queue->get()) {
        $message = json_decode($envelope->getBody());
        echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
        echo "routing key: ".$envelope->getRoutingKey().PHP_EOL;
        if (doWork($message)) {
            $queue->ack($envelope->getDeliveryTag());
        } else {
            $queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
        }
    }
}

$connection->disconnect();

Добавить комментарий

Ваше имя

Сообщение

Подтверждение