Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Постигаем Storm Framework. Часть I

Anna | 5.06.2014 | нет комментариев

В 2011 году Twitter открыл, под лицензией Eclipse Public License, план распределенных вычислений Storm. Storm был сделан в компании BackType и перешел к Twitter позже покупки последней.

Storm это система ориентированная на распределенную обработку крупных потоков данных, аналогичнаяApache Hadoop, но в настоящем времени.

Ключевые особенности Storm:

  • Масштабируемость. Задачи обработки распределяются по узлам кластера и потокам на всяком узле.
  • Гарантированная охрана от потери данных.
  • Простота развертывания и спровождения.
  • Поправление позже сбоев. Если какой либо из обработчиков отказывает, задачи переадресуются на другие обработчики.
  • Вероятность написания компонентов не только на Java. Примитивный Multilang protocol с применением JSON объектов. Есть готовые адаптеры для языков Python, Ruby и Fancy.

В первой части рассматриваются базовые представления и основы создания приложения c применением Storm версии 0.8.2.

Элементы Storm

Tuple
Элемент представления данных. По умолчанию может содержать Long, Integer, Short, Byte, String, Double, Float, Boolean и byte[] поля. Пользовательские типы используемые в Tuple обязаны быть сериализуемыми.

Stream
Последовательность из Tuple. Содержит схему именования полей в Tuple.

Spout
Подрядчик данных для Stream. Получает данные из внешних источников, формирует из них Tuple и отправляет в Stream. Может отправлять Tuple в несколько различных Stream. Есть готовые для знаменитых систем обмена сообщениями: RabbitMQ / AMQPKestrelJMSKafka.

Bolt
Обработчик данных. На вход поступают Tuple. На выход отправляет 0 либо больше Tuple.

Topology
Общность элементов с изложением их связи. Аналог MapReduce job в Hadoop. В различии от MapReduce job — не останавливается позже исчерпания входного потока данных. Осуществляет транспорт Tuple между элементами Spout и Bolt. Может запускаться локально либо загружаться в Storm кластер.

Пример применения

 

Задача

Есть поток данных о телефонных вызовах Cdr. На основании source номера определяется id заказчика. На основании destination номера и id заказчика определяется тариф и считается стоимость громка. Всякий из этапов должен трудиться в несколько потоков.
Пример будет запускаться на локальной машине.

Реализация

Для начала легко распечатаем входные данные BasicApp.

Создаем новую Topology:

TopologyBuilder builder = new TopologyBuilder();

Добавляем Spout CdrSpout генерирующий входные данные:

builder.setSpout("CdrReader", new CdrSpout());

Добавляем Bolt с двумя потоками и указываем что на вход подается выходной поток CdrReader. shuffleGrouping обозначает что данные из CdrReader подаются на нечаянно выбранный Bolt.

builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");

Конфигурируем и запускам локальный Storm кластер:

Config config = new Config(); // Конфигурация кластера по умолчанию
config.setDebug(false);

LocalCluster cluster = new LocalCluster(); // Создаем локальный Storm кластер
cluster.submitTopology("T1", config, builder.createTopology()); // Стартуем Topology
Thread.sleep(1000*10); 
cluster.shutdown(); // Останавливаем кластер

На выходе получаем приблизительно следующее:

Спрятанный текст

OUT>> [80]Cdr{callSource='78119990005', callDestination='8313610698077174239', 
callTime=7631, clientId=0, price=0}
OUT>> [78]Cdr{callSource='78119990006', callDestination='2238707710336895468', 
callTime=20738, clientId=0, price=0}
OUT>> [78]Cdr{callSource='78119990007', callDestination='579372726495390920', 
callTime=31544, clientId=0, price=0}
OUT>> [80]Cdr{callSource='78119990006', callDestination='2010724447342634423', 
callTime=10268, clientId=0, price=0}

Число в квадратных скобках — Thread Id, видно что обработка ведется параллельно.

Для дальнейших экспериментов необходимо разобраться с разделением входных данных между несколькими обработчиками.
В примере выше был использован беспричинный подход. Но в настоящем использовании Bolt’ы наверно будут применять внешние справочные системы и базы данных. В этом случае желанно Дабы всякий Bolt обрабатывал свое подмножество входных данных. Тогда дозволено будет организовать результативное кэширование данных из внешних систем.

Для этого в Storm предусмотрен интерфейс CustomStreamGrouping.
Добавим в план CdrGrouper. Его задача — отправлять Tuple с идентичными source номерами на один и тот же Bolt. Для этого в CustomStreamGrouping предусмотрено два вызова:
prepare — вызывается перед первым применением:

@Override
    public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) {
        tasks = new ArrayList<>(integers); // Запоминаем номера Bolts
    }

и chooseTasks — где на вход подается список из Tuple, а возвращается список состоящий из номеров Bolt’ов для всякой позиции в списке Tuple:

@Override
    public List<Integer> chooseTasks(int i, List<Object> objects) {
        List<Integer> rvalue = new ArrayList<>(objects.size());
        for(Object o: objects) {
            Cdr cdr = (Cdr) o;

            rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % 
                            tasks.size()));
        }
        return rvalue;
    }

Заменим shuffleGrouping на CdrGrouper BasicGroupApp:

builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).
                customGrouping("CdrReader", new CdrGrouper());

Запустим и удостоверимся что работает как задумано:

Спрятанный текст

OUT>> [80]Cdr{callSource='78119990007', callDestination='3314931472251135073', 
callTime=17632, clientId=0, price=0}
OUT>> [80]Cdr{callSource='78119990007', callDestination='4182885669941386786', 
callTime=31533, clientId=0, price=0}

Дальше в план добавляем:
ClientIdBolt — определяет id заказчика по source номеру.
ClientIdGrouper — Группирует по id заказчика.
RaterBolt — занимается тарификацией.
CalcApp — окончательный вариант программы.

Если тема будет увлекательна, то в дальнейшей части верю рассказать о механизмах охраны от потери данных и запуске на настоящем кластере. Код доступен на GitHub.

PS. Из песни безусловно слова не выбросишь, но наименование обработчика данных «Bolt» несколько смущает :)

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB