Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Постигаем Storm Framework. Часть III

Anna | 5.06.2014 | нет комментариев
Во 2-й части статьи рассказывалось о механизмах выявления ошибок в процессе обработки.

Обработка завершилась с оплошностью, что делать дальше? Абсолютно допустимо, что утрачена связь с одним из узлов кластера либо временно недостижима база данных. В этом случае, невозможно с уверенностью сказать, какие операции выполнились удачно, а какие — нет. Если все операции в цепочке вторично применимы (идемпотентны), скажем установка флага, то дозволено легко перезапустить обработку. Если нет, то на поддержка приходят механизмы транзакций Storm.

Когда говорят о колляциях транзакций, здесь же всплывает термин ACID:

  • Atomicity (атомарность). Все метаморфозы произведенные в системе на протяжении транзакции, либо используются всецело, либо не используются вовсе.
  • Consistency (cогласованность). Транзакция переводит систему из одного непртиворечивого состояния в другое.
  • Isolation (изолированность). Параллельно исполняемые транзакции не оказывают воздействие на итог работы друг друга.
  • Durability (безопасность). Зафиксированные транзакцией метаморфозы гарантированно остаются в системе.

Consistency и Durability в большей степерни относятся к базам данных. Нас будут волновать Atomicity и Isolation.

В версии 0.8.0 в Storm возникла подсистема Trident — аналог Apache Pig. В нее же перекочевал функционалTransactional topology.

Транзакции в Storm

Atomicity

В Topology создается объект реализующий интерфейс State, инкапсулирующий работу с БД. Входные данные, поступающие в Spout, разбиваются на Tuple и собираются в пакеты (batch). Batch ассоцируется с уникальным transaction id. Tuple образующие batch могут обрабатываться параллельно.
В конце цепочки обработки, комплект Tuple, относящихся к одной транзакции, передается в способupdateState класса, реализующего интерфейс StateUpdater, тот, что и призводит модификацию State. В случае удачного заключения, Spout получает уведомление об триумфе обработки batch’a. В случае ошибки, Spout должен передать на обработку каждый batch вторично.
Таким образом Storm гарантирует, что Batch будет зафиксирован в БД всецело и только один раз.

Isolation

Storm гарантирует, что Batch’и передаются в StateUpdater сурово ступенчато, в порядке возрастания transaction id. То есть Batch #2 будет зафиксирован только позже удачной фиксации Batch’а #1.

Реализация

Spout с помощью транзакций должен реализовывать интерфейс ICommitterTridentSpout<TransactionMetadata>. TransactionMetadata — всякий класс, содержит данные для генерации Batch’ей и генерации дальнейшей транзакции: TxMeta.

Спрятанный текст

public class TxMeta {
    private int start;
    private int count;

    public TxMeta(int start, int count) {
        this.start = start;
        this.count = count;
    }
// Skipped getters 
}

Класс реализующий интерфейс ITridentSpout.BatchCoordinator<TransactionMetadata> инициализирует TransactionMetadata при создании транзакции и отвечает на запрос готовы ли данные для следу!br/>

Класс реализующий интерфейс StateFactory, создает экземпляры State: TxDatabaseFactory.

Собираем все совместно TridentTransactionApp:

public class TridentTransactionApp
{
    public static void main( String[] args ) throws Throwable
    {
        Logger.getRootLogger().setLevel(Level.ERROR);

// Создаем топологию
        TridentTopology tridentTopology = new TridentTopology();
// Добавляем наш Spout
        tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()).
// Обработка Tuple пойдет параллельно - OpPrintout легко печатает записи
                shuffle().each(new Fields("msg"), new OpPrintout()).
                parallelismHint(2).
// Сливаем итоги параллельной обработки в один поток
                global().
// Записываем метаморфозы в State (БД)
                partitionPersist(new TxDatabaseFactory(),
                        new Fields("msg"), new TxDatabaseUpdater());
// Skipped
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("T2", config, tridentTopology.build());
        Thread.sleep(1000*100);
        cluster.shutdown();
    }
}

Транзакционные вероятности Storm дюже комфортно применять для передачи данных из одной системы в иную, когда требуется нетривиальная обработка. Скажем одна система генерирует файлы, Storm их разделяет на записи, обрабатывает в параллельном режиме и складывает в БД. В случае ошибки обработки есть ручательство, что файл не будет удален и не будет обработан двукратно.

PS. Раскрыть все вероятности Storm в рамках статей немыслимо, материала хватит на целую книгу. Верю мне удалось показать ключевые вероятности фреймворка и вероятности его использования в реальных планах.
По поводу развертывания кластера — незадолго наткнулся на хорошую статью. Не вижу смысла повторяться. Развернуть Storm в production подлинно нетрудно.

PPS. В Hadoop существует аналог on-line обработки Storm — Hadoop Streaming, но в различии от Storm, транзакции он не поддерживает.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB