Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

MassTransit. Сервисная шина для обмена сообщениями на основе сервера очередей RabbitMQ (MSMQ) для .Net

Anna | 17.06.2014 | нет комментариев
Впервой я услышал о библиотеке MassTransit (MT) около года назад от экс-сотрудники, зашедшего в наш офис для обмена навыком. Компания, в которую он устроился, использовала MT для уменьшения связности между модулями разрабатываемого ими обслуживания и, от того что высокая связность начала превращаться в задачу и для нас, Сторонний навык оказался нам дюже кстати. Помимо уменьшения связности путем перехода на событийную модель взаимодействия между модулями MT сгодился нам и для разделения ресурсоемких задач между несколькими процессами.

Что такое MassTransit.

MassTransit — это реализация отлично знаменитого паттерна DataBus. Основная задача этого паттерна — организовать взаимодействие нескольких объектов, не подозревающих о существовании друг друга, через обмен сообщениями между ними. Библиотека была написана Dru Sellers и Chris Patterson как безвозмездный аналог плана NServiceBus, способный применять в качестве транспорта сервера сообщений RabbitMQ либо MSMQ на выбор. В своем плане мы выбрали применять RabbitMQ, следственно тут будет описан навык работы и подводные камни, поджидающие при конфигурации шины на этом сервере очередей. Невзирая на то, что MassTransit является слоем абстракции над протоколом AMQP и разработчики усердствовали спрятать детали реализации так, что познания об устройстве сервера очередей и протокола AMQP для применения библиотеки фактически не требуются, для понимания статьи и удачного обхождения граблей при конфигурации шины всеобщее представление об устройстве сервера RabbitMQ желанно иметь. Это плохая новость, но есть и отличная — познаний нужен самый минимум, абсолютно будет довольно прочесть первые четыре урока отсель. Уроки небольшие и внятные, постижение основ работы с RabbitMQ не займет много времени, но способно принести много пользы. К слову, на прогре даже была переведена первая пара уроков. Урок один и урок два.

К делу.

Перейдем от теории к практике и попытаемся с поддержкой библиотеки MassTransit исполнить задачу, схожую по своему функционалу на 1-й пример из туториала к RabbitMQ. Мы напишем примитивное консольное приложение, в котором будут взаимодействовать два объекта Publisher и Subscriber. Publisher, при нажатии на всякую клавишу, будет посылать в шину сообщение “KeyWasPressed” и код нажатой клавиши. Subscriber будет захватывать это сообщение из шины и выводить его на экран.

Для начала нам придется

1) Установить Erlang
2) Установить RabbitMQ
3) Установить MassTransit.RabbitMQ в тестовое приложение, исполнив команду PM> Install-Package MassTransit.RabbitMQ.

Перейдем к непринужденно к коду. Сообщения, отправляемые в шину, это обыкновенные DTO объекты. В первую очередь нам понадобится сделать класс самого сообщения, пересылаемого от издателя к подписчику. Назовем его KeyWasPressed.

public class KeyWasPressed
{
   //в сообщении будем передавать нажатую пользователем клавишу
   public ConsoleKey PressedKey { get; set; }
}

Сейчас перейдем к написанию простеньких издателя (publisher) и подписчика (subscriber). Ключевым элементом библиотеки является класс ServiceBus. ServiceBus в MassTransit — это среда для обмена сообщениями, в которой за транспорт сообщений отвечает сервер очередей RabbitMQ (либо MSMQ).

Подписчик.
 IServiceBus subscriber = ServiceBusFactory.New(sbc =>
{
     //указываем что в качестве транспорта мы будем применять rabbitMq
     sbc.UseRabbitMq();
     //указываем очередь из которой мы будем получать сообщения на которые мы подписались
     sbc.ReceiveFrom("rabbitmq://localhost/subscriber");
      //подписываемся на сообщение KeyWasPressed. При пds_rqvmk!
Запустим приложение и понажимаем на клавиши.

Сейчас при нажатии на всякую клавишу на экране возникает две строки — с цифровым и символьным кодом всякой нажатой клавиши. Если зайти в панель управления сервером очередей, дозволено увидеть, что сейчас к точке обмена MT:KeyWasPressed привязано теснее две очереди subscriber и anothersubscriber. И всякое полученное сообщение типа MT.KeyWasPressed сервер очередей RabbitMQ отправляет в обе очереди. 

Разделение ресурсоемких задач.


Сейчас давайте посмотрим, как с поддержкой связки MassTransit   RabbitMQ дозволено распределять ресурсоемкие задач между несколькими процессами. 
Предположим, что перед нами стоит задача сделать сервис для конвертации видео файлов. Под эту задачу у нас есть два сервера. Опытным путем мы установили, что оптимальная нагрузка для сервера под номером один — это три паралельно конвертируемых видеофайла, для сервера номер два — число единовременно конвертируемых видеофайлов не должно превышать пяти. Процесс конвертации мы, само собой, будем эмулировать. Предположим, что у нас есть очередь под наименованием filesToConvert, в которую поступают файлы для конвертации. Всякий файл будет представлять у нас объект типа VideoFile. 

public class VideoFile
{
  public int Num { get; set; }
  //Время, требующееся для конвертации файла в мс
  public int TimeToConvert { get; set; }
}

 Подписчик, получив такое сообщение, по правилам игры должен будет уснуть на число миллисекунд, заданных в поле TimeToConvert пришедшего сообщения. 
 Код, по легенде выполняющийся на первом сервере.
int firstServerFilesCount = 0;
IServiceBus firstServer = ServiceBusFactory.New(sbc =>
{
     sbc.UseRabbitMq();
     //указываем число паралельных потоков, получающих сообщения с сервера очередей
     sbc.SetConcurrentConsumerLimit(3);
     sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
       {
       firstServerFilesCount  ; 
       Thread.Sleep(msg.TimeToConvert);
       Console.WriteLine("Сервер 1. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 3. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, firstServerFilesCount, Thread.CurrentThread.ManagedThreadId); 
        firstServerFilesCount--;
       }));
       //prefetch=3. Уведомляем серверу очередей, что мы готовы разбирать до 3 сообщений единовременно 
       sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=3");
 });

 По легенде мы на первом сервере решили ограничить число единовременно разбираемых сообщений тремя. Следственно, мы вызываем способ SetConcurrentConsumerLimit с доводом 3. Это обозначает, что при подключении объекта firstServer к серверу сообщений, MassTransit будет удерживать наготове пул из 3 потоков, предуготовленных для обработки сообщений с сервера. Но нужно помнить, что разделением сообщений занимается RabbitMQ, и он никак не может знать того факта, что объект firstServer готов разбирать до 3 сообщений единовременно. Передать ему эту информацию мы можем указав параметр prefetch в Uri, по которому firstServer подключается к серверу сообщений.

 Код, по легенде выполняющийся на втором сервере.
int secondServerFilesCount = 0;
IServiceBus secondServer = ServiceBusFactory.New(sbc =>
{
   sbc.UseRabbitMq();
   //указываем число паралельных потоков, получающих сообщения с сервера очередей
   sbc.SetConcurrentConsumerLimit(5);
   sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
   {
     secondServerFilesCount  ;
     Thread.Sleep(msg.TimeToConvert);
     Console.WriteLine("Сервер 2. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 5. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, secondServerFilesCount, Thread.CurrentThread.ManagedThreadId);
     secondServerFilesCount--;
    }));
    //prefetch=3. Информируем серверу очередей, что мы готовы разбирать до пяти сообщений единовременно 
    sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=5");
});

 Различия, как дозволено было додуматься, есть только в числе потоков в пуле, призванных разбирать сообщения, и значении prefetch в Uri. Куда значимей подметить тот факт, что мы подключили secondServer к той же очереди, куда был подключен firstServer, тем самым создавая конкуренцию между подписчиками за сообщения, появляющиеся в этой очереди. Если объекты firstServer и secondServer будут подключены к различным очередям, то мы столкнемся с тем, что всякий файл будет сконвертирован двукратно, по разу на всяком сервере. 

 Сейчас напишем код, наполняющим очередь filesToConvert сотней “видеофайлов”, с заданным рандомом временем конвертации.
Random rnd = new Random();
for (int i = 1; i <= 100; i  )
{
  publisher.Publish(new VideoFile() {Num = i, TimeToConvert = rnd.Next(100, 5000)});
}

 Запускаем и убеждаемся, что наши сервера-подписчики работают паралельно, применяя назначенное нами число потоков.


Какие еще вероятности может предложить нам MassTransit.


 В рамках одной статьи разглядеть все вероятности, предлагаемые библиотекой MassTransit, немыслимо. Но дозволено перечислить (чем я и займусь).

  • Sagas. Механизм для координации распределенных процессов.


  • Scheduling Интеграция с библиотекой Quartz.net разрешает отправлять сообщения в очереди по расписанию.


. 



  • Encryption. Шифрование отправляемых сообщений. Для шифрования применяется блочный шифр Rijndael.


  • Unit Testability. Для целей тестирования, MassTransit может применять встроенный транспорт (Loopback), не требующий внешних MQ серверов.


 Код примеров из статьи дозволено взять тут.

 

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB