Урок 2. Распределенные очереди

Основная идея заключается в следующем. Допустим нужно обработать видео файл, чтобы получить на выходе три сконвертированных файла в различные форматы, информацию о метаданных и создать иконки для этого видеофайла. Т.е. получаем 5 задач, три из которых довольно тяжеловесные(конвертация), одна легкая(получение метаданных) и одна средняя(создание иконок). При этом все эти задачи являются независимыми друг от друга. Таким образом, можно выполнять их одновременно, т.е. распараллелить обработку очереди на уровне сообщений(пункт 2). Для этого при объявлении обменника необходимо установить ему тип AMQP_EX_TYPE_FANOUT. Тогда все сообщения, посылаемые в указанный обменник, независимо от имени очереди и ключа роутера, будут прослушиваться всеми запущенными копиями консьюмера. Т.е. каждое следующее сообщение будет отсылаться на следующий свободный консьюмер. В нашем случае их должно быть пять. Такой способ обработки получил название round-robin dispathing. Обратите внимание, что при отправке продюсером и при получении консьюмером используется одна и та же очередь.



Оповещение(acknowledgment)
Некоторые задачи могут выполняться довольно долго. И неизвестно, что может произойти с сервером в этот момент: сервер может перезагрузиться, либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки, случившейся во время обработки, не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает, что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений, то необходимо вызвать метод nack() с флагом AMQP_REQUEUE, который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.

while (true) {
    if ($evnelope = $queue->get()) {
        $message = $enevelope->getBody();
        if (doWork($message)) {
            $queue->ack($envelope->getDeliveryTag());
        } else {
            // этот фрагмент кода может быть опущен, если мы собираемся обработать
            // передоставленный сообщения в рамках нового коннекта к сереверу сообщений
            $queue->nack($envelope->getDeliveryTag(), AMQP_REQUEUE);
        }
    }
}

Распростаненная ошибка - при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте, все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений, что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.

Жизнеспособность сообщений(durability)
В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать, очередь должна быть создана с флагом AMQP_DURABLE.

$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_DURABLE);
$queue->declare();

Если очередь 'hello' уже была объявлена, то данный код вызовет ошибку, поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода, либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе.
Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения, когда оно уже в памяти, но еще не сохранено на диске. В этот момент, в случае не предвиденной ситуации, оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно, но если необходимо добиться высоких гарантий получения сообщения, то следует использовать транзакции.

Все вместе
Для примера распределения сообщений между очередями нам понадобится функция, имитирующая загруженность системы. Для этого мы используем обычный таймер

function doWork($message) {
    $sleep_interval = rand(1, 5);
    sleep($sleep_interval);
    print($message.str_repeat('.', $sleep_interval).PHP_EOL);
    return true;
}

Полный код продюсера (send.php) будет выглядеть так

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();
$queue->bind($exchange->getName(), 'foo_key');

$result = $exchange->publish(json_encode("Hello world!"), "foo_key");

if ($result)
    echo 'sent'.PHP_EOL;
else
    echo 'error'.PHP_EOL;

$connection->disconnect();

Консьюмер (receive.php)

$params = array(
    'host' => 'localhost',
    'port' => 5672,
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
);

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName('ex_hello');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->setFlags(AMQP_IFUNUSED | AMQP_AUTODELETE | AMQP_DURABLE);
$queue->declare();
$queue->bind($exchange->getName(), 'foo_key');

while (true) {
    if ($envelope = $queue->get()) {
        $message = json_decode($envelope->getBody());
        echo "delivery tag: ".$envelope->getDeliveryTag().PHP_EOL;
        if (doWork($message)) {
            $queue->ack($envelope->getDeliveryTag());
        } else {
            $queue->nack($envelope->getDelivaryTag(), AMQP_REQUEUE);
        }
    }
}

$connection->disconnect();

Обратите внимание, что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано, повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить, что если вы уверены, что консьюмер будет запускаться первым, то создание очереди в продюсере не обязательно.

Теперь, если запустить несколько копий консьюмера, то можно будет видеть как между ними распределяются сообщения. Предположим, что мы отправили 8 сообщений в очередь, т.е. запустили скрипт send.php 8 раз. После этого запускаем в двух разных терминалах по консьюмеру

Вывод в первом терминале

$ php receive.php
delivery tag: 1
Hello world!...
delivery tag: 2
Hello world!.....
delivery tag: 3
Hello world!....

Вывод во втором терминале

$ php receive.php
delivery tag: 1
Hello world!.
delivery tag: 2
Hello world!..
delivery tag: 3
Hello world!...
delivery tag: 4
Hello world!..
delivery tag: 5
Hello world!.

Как видно сообщения распределились по мере нагрузки консьюмера.

Добавить комментарий

Ваше имя

Сообщение

Подтверждение