Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

RabbitMQ tutorial 3 — Публикация/Подписка

Anna | 31.05.2014 | нет комментариев

Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их дозволено реализовать на большинстве знаменитых ЯП.

Публикация/Подписка

В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что всякое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Данный паттерн знаменит как “publish/subscribe” (публикация/подписка).
Дабы осознать данный образец, сделаем примитивную систему логирования. Она будет состоять из 2-х программ – первая будет создавать логи, вторая считывать и печатать их.
В нашей систему логирования всякая программа подписчик будет получать всякое сообщение. Вследствие этому, мы сумеем запустить одного подписчика на сохранение логов на диск, а потом в всякое время сумеем сделать иного подписчика для отображения логов на экран.
По существу, всякое сообщение будет транслироваться всякому подписчику.

Точки обмена(exchanges)

В предыдущих статьях для отправки и принятия сообщений мы трудились с очередью. Сейчас разглядим расширенную модель отправки сообщений Rabbit.
Напомним термины предыдущей статьи:

  • Producer (подрядчик) r/> Сейчас мы можем отправить сообщение в нашу именованную точку доступа.
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    $channel->basic_publish($msg, 'logs');
    
    

    Временные очереди:

    Всё это время мы применяли название очередей (“hello“ либо “task_queue”). Вероятность давать названия помогает указать обработчикам (workers) определенную очередь, а также разделять очередь между продюсерами и подписчиками.
    Но наша система логирования требует, Дабы в очередь поступали все сообщения, а не только часть. Также мы хотим, Дабы сообщения были востребованными, а не ветхими. Для этого нам потребоваться 2 вещи:
    Всякий раз когда мы соединяемся с Rabbit, мы создаем новую очередь, либо даем сделать серверу случайное название.
    Всякий раз когда подписчик отключается от Rabbit, мы удаляем очередь.
    В php-amqplib заказчике, когда мы обращаемся к очереди без названии, мы создаем временную очередь и механически сгенерированным названием:

    list($queue_name, ,) = $channel->queue_declare("");
    

    Способ вернет механически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
    Когда заявленное соединение оборвется, очередь механически удалиться.

    Переплеты(Bindings)

    image

    Выходит, у нас есть точка доступа с типом fanout и очередь. Теперь нам необходимо сказать точке доступа, Дабы она отправила сообщение в очередь. Отношение между точкой доступа и очередью именуется bindings.

    $channel->queue_bind($queue_name, 'logs');
    

    С этого момента, сообщения для нашей очереди проходят через точку доступа
    Посмотреть список binding-ов дозволено применяя команду rabbitmqctl list_bindings

    Отправка во все очереди:

    image
    Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Исключительное главное различие – сейчас мы направляем сообщения в нашу именованную точку доступа ‘logs’, взамен точки доступа по умолчанию. Нам необходимо было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости. Разглядим код скрипта emit_log.php:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    $data = implode(' ', array_slice($argv, 1));
    if(empty($data)) $data = "info: Hello World!";
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'logs');
    
    echo " [x] Sent ", $data, "n";
    
    $channel->close();
    $connection->close();
    
    ?>
    
    

    (emit_log.php source)

    Как вы видите, позже установки соединения мы создаем точку доступа. Данный шаг нужен, так как применение несуществующей точки доступа – запрещено.
    Сообщение в точке доступа будут утрачены, так как ни одна очередь не связана с точкой доступа. Но это отлично для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут неопасно удалятся.
    Код подписчика receive_logs.php:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPConnection;
    
    $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $channel->queue_bind($queue_name, 'logs');
    
    echo ' [*] Waiting for logs. To exit press CTRL C', "n";
    
    $callback = function($msg){
      echo ' [x] ', $msg->body, "n";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    ?>
    
    

    (receive_logs.php source)

    Если вы хотите сберечь логи в файл, вам понадобится открыть консоль и набрать:

    $ php receive_logs.php > logs_from_rabbit.log

    Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:
    $ php receive_logs.php

    Ну и безусловно запуск продюсера сообщений:
    $ php emit_log.php

    С поддержкой команды rabbitmqctl list_bindings мы можем убедиться, что код верно сотворил очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

    $ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done. 

    Тут отображено, что данные точки доступа logs отправляются в две очереди, имена которых сделаны механически. Это именно то, чего мы добивались.
    В дальнейшей статье будет описано, как прослушать только часть сообщений.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB