Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

RabbitMQ tutorial 4 — Роутинг

Anna | 31.05.2014 | нет комментариев
Продолжаю серию перевода уроков с официального сайта. Примеры будут на php, но их дозволено реализовать на большинстве знаменитых ЯП.
В предыдущей статье мы разработали систему логирования. Нам удалось отправлять сообщения нескольким получателям. В этой статье модернизируем нашу программу — будем отправлять получателю только часть сообщений. Скажем, мы сумеем сберегать на диске только сообщения с скептическими ошибками (экономия места на диске), а в консоли будем отображать все сообщения.

Bindings

В предыдущей статье мы создавали связи(bindings). Напомним код:

$channel->queue_bind($queue_name, 'logs');

Binding — это связь между точкой доступа и очередью. Это дозволено интерпретировать как: очередь хочет получить сообщения из точки доступа.
Binding может принимать параметр routing_key. Дабы не путаться с параметром $channel::basic_publish (тоже содержит параметр routing_key), назовем его binding_key. Разглядим создание связи с ключом binding_key:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

Значение этого ключа зависит от типа точки доступа. Точка доступа с типом fanout легко проигнорирует его.

Точка доступа Direct

Наша система логирования в предыдущей статье отправляла каждому подписчиками все сообщения. Мы хотим расширить нашу систему, Дабы фильтровать сообщения по степени значимости. Для примера мы сделаем так, Дабы скрипт, записывающий логи на диск не тратил своё место на собщения с типом warning либо info.
Ранее мы применяли точку доступа с типом fanout, которая не дает нам полной эластичности — она подходит только для примитивный трансляции.
Взамен это мы будем применять тип direct. Его алгорифм дюже примитивен — сообщения идут в ту очередь, binding_key которой совпадает с routing key сообщения.
Разглядим схему на картинке:

На схеме изображена точка доступа X и две связанные с ней очереди. Первая очередь связана с binding key = orange, а вторая очередь имеет две связи. Одна с ключом binding key = black, а вторая с ключом — green.
Сообщения с routing key = orange будут направляться в очередь Q1, а сообщения с ключом black либо green направятся в очередь Q2. Все остальные сообщения будут удалены.

Комбинированные связи (Multiple bindings)


Абсолютно возможно объединять несколько очередей с идентичным ключом binding key. В этом примере мы объединяем точку доступа X и очередь Q1 с тем же ключом black, что и у очереди Q2. В этом примере direct ведет себя также как и fanout: отсылает сообщения во все связанные очереди. Сообщения с ключом black попадет в обе очереди Q1 и Q2.

Отправка логов

Возведем алгорифм отправки сообщений. Взамен fanout для точки доступа будем применять тип direct. Routing key будет совпадать с наименованием типа лога. Возможен что скрипт отправки лога будет знать тип лога.
Для начала сделаем точку доступа:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

Сейчас отправим сообщение:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

Лог будет иметь 3 типа: ‘info’, ‘warning’, ‘error’.

Подписка

Отправка сообщений будет таким же как и в примере предыдущей статьи, с одним условием — необходимо сделать для всякого типа лога свою связь binding.

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

Итого получаем


Код скрипта продюсера emit_log_direct.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = $argv[1];
if(empty($severity)) $severity = "info";

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," n";

$channel->close();
$connection->close();

?>

Код скрипта подписчика receive_logs_direct.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if(empty($severities )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]n");
    exit(1);
}

foreach($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL C', "n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

Если вы хотите сберечь в файл только логи с типом error и warning, наберите в консоли:

$ php receive_logs_direct.php warning error > logs_from_rabbit.log 
Если вы хотите отобразить все логи на экране, наберите в консоли

$ php receive_logs_direct.php info warning error
 [*] Waiting for logs. To exit press CTRL C

Либо Дабы вытянуть только логи error:

$ php emit_log_direct.php error "Run. Run. Or it will explode." 
 [x] Sent 'error':'Run. Run. Or it will explode.'

(исходники (emit_log_direct.php source) и (receive_logs_direct.php source))

В дальнейшей статье будет рассмотрена прослушка сообщений соответствующих какому-либо образцу

Источник: programmingmaster.ru
Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB