Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Постигаем Storm Framework. Часть II

Anna | 5.06.2014 | нет комментариев

В первой части рассматривались базовые представления Storm.

Различные классы задач предъявляют разные требования к безопасности. Одно дело пропустить пару записей при подсчете статистики посещений, где счет идет на сотни тысяч и специальная точность не необходима. И вовсе другое — утратить, скажем, информацию о платеже заказчика.

Дальше разглядим о механизмы охраны от потери данных, которые реализованы в Storm.

Базовый пример

 

Spout

Если нам не значимо были ли ошибки при обработке Tuple, то Spout отправляет Tuple в SpoutOutputCollector посредством вызова способа emit(new Values(…)).

Eсли мы хотим узнать удачно ли обработался Tuple, то вызов будет выглядеть как emit(new Values(…), msgId), где msgId это объект произвольного класса. В этом случае интерфейс ISpout предоставляет способы:

где msgId — это msgId с которым был вызван SpoutOutputCollector.emit.
Пример FailAwareSpout:

public class FailAwareSpout extends BaseRichSpout {
private Message[] messages;
// Skipped ...
    private static class Message implements Serializable {
        private String message;
        private int failCount;

        private Message(String message) {
            this.message = message;
        }
    }
// Skipped ...
    @Override
    public void nextTuple() {
// Skipped ...
// Отправляем Tuple c msgId
        outputCollector.emit(new Values(messages[messageId].message), messageId);
    }

// Tuple обработан типично
    @Override
    public void ack(Object msgId) {
        Message m = messages[(Integer) msgId];

        System.out.println("IN>> ["   Thread.currentThread().getId()   "] message "  
                m.message   " processed successfully");
    }

// Tuple не обработан
    @Override
    public void fail(Object msgId) {
        Message m = messages[(Integer) msgId];
        if(  m.failCount > MAX_RETRY_COUNT) {
            throw new IllegalStateException("Too many message processing errors");
        }
        System.out.println("IN>> ["   Thread.currentThread().getId()   "] message "  
                m.message   " processing failed "   "["   m.failCount   "]");
// Вставляем в очередь на повторную обработку
        sendQueue.addLast((Integer) msgId);
    }
}

Способы nextTuple, ack и fail, вызываются в одном потоке и не требуют дополнительной синхронизации при обращении к полям Spout.

Bolt

Для того что бы Bolt мог оповещать Storm о итогах обработки, он должен реализовывать интерфейс IRichBolt. Проще каждого это сделать унаследовав класс BaseRichBolt.
Bolt сообщает Storm o итогах своей работы посредством вызова следующих способов класса OutputCollector в способе execute(Tuple):

  • ack(Tuple) — обработка прошла удачно
  • fail(Tuple) — обработка завершилась с оплошностью

Пример FailingBolt:

public class FailingBolt extends BaseRichBolt {
    OutputCollector outputCollector;
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Skipped ...
            outputCollector.ack(tuple); // Данные удачно обработаны
        }
        else {
// Skipped ...
            outputCollector.fail(tuple); // Обработка завершилась с оплошностью
        }
    }
// Skipped ...
}

Пример применения: BasicFailApp, Spout FailAwareSpout и Bolt FailingBolt случайным образом генерирующий ошибки обработки.

В Bolt’ах унаследованных от класса BaseBasicBoltack(Tuple) вызывается позже выхода из способа executeмеханически.

Anchoring

При обработке входного Tuple, Bolt может генерировать больше одного выходного Tuple. Если Bolt вызвалemit(sourceTuple, resultTuple) то образуется DAG с вершиной в виде начального Tuple и потомками в виде зарожденных Tuple. Storm отслеживает ошибки процессинга всех узлов графа. В случае происхождения ошибки на любом ярусе иерархии, Spout, породивший начальный Tuple, будет уведомлен вызовом fail. ПримерMultiplierBolt:

public class MultiplierBolt extends BaseRichBolt {
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Генерируем несколько  исходящих Tuple из одного входящего
        for(int i = 0; i < MULTI_COUNT;   i) {
// Anchoring, привязываем исходящие Tuple к входящему 
            outputCollector.emit(tuple, new Values(tuple.getString(0)   " - "   i));
        }
        outputCollector.ack(tuple);
    }
// Skipped ...
}

Пример применения Anchoring: TreeFailApp

В Bolt’ах унаследованных от класса BaseBasicBolt способ execute(Tuple, BasicOutputCollector) вызывается с коллектором BasicOutputCollector. Специфика BasicOutputCollector в том, что он механически делает Anchor на входной Tuple при emit.

От того что Storm является распределенной системой, Tuple могут передаваться с одного узла кластера на иной. В связи с этим Storm обеспечивает отслеживание таймаутов обработки. По умолчанию, каждый граф должен быть обработан за 30 секунд, либо Storm вызовет способ fail у породившего граф Spout’а. Таймаут дозволено изменить.

Код доступен на GitHub.

Дальнейшая часть будет посвящена Transactional Topologies, применяющихся в связке с транзакционными источниками данных.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB