Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Пригодные приёмы работы с Apache Camel

Anna | 1.06.2014 | нет комментариев
Если вам доводилось создавать интеграционные решения на Java, наверно, вам знаком восхитительный Java framework под наименованием Apache Camel. Он с лёгкостью осуществит связку между несколькими сервисами, импортирует данные из файлов, баз данных и прочих источников, оповестит вас о разных событиях в Jabber-заказчик либо по E-mail, станет основой для композитного приложения на базе большого числа других приложений.

Вступление

В основе модели Apache Camel лежит представление маршрутов (routes), которые дозволено конфигурировать как статически (скажем, в файле Spring-контекста), так и во время работы приложения. По маршрутам ходят караваны сообщений, заодно попадая в разные обработчики, конверторы, аггрегаторы и прочие трансформеры, что в финальном результате разрешает обработать данные из множества разных источников в цельном приложении и передать эти данные иным сервисам либо сберечь в какое-либо хранилище.
В всеобщем и целом Camel — абсолютно самодостаточный фреймворк. Применяя его, нередко, даже не доводится писать личный код — довольно лишь набрать верный маршрут, тот, что дозволит решить поставленную задачу. Впрочем, всё же для построения собственной модели обработки данных, может понадобиться написание кода.

Так было и у нас. Мы используем Camel для реализации конвейеров по обработке множества сообщений из разных источников. Сходственный подход разрешает, скажем, следить за состоянием сервисов, своевременно информируя о загвоздках, получать аггрегированные аналитические срезы, готовить данные для отправки в другие системы и другое. Поток обрабатываемых и «перевариваемых» сообщений в систему может быть достаточно огромным (тысячи сообщений в минуту), следственно мы усердствуем применять горизонтально масштабируемые решения там, где это допустимо. Скажем, у нас есть система отслеживания состояний исполняемых тестов и мониторингов сервисов. Сходственных тестов выполняется по миллиону повседневно, а сообщений для контроля процесса их выполнения мы получаем в разы огромнее.
Дабы «усвоить» сходственный объём сообщений, нужно чётко определить тактику аггрегации — от большего параллелизма к меньшему. Помимо этого нужно иметь правда бы базовую горизонтальную масштабируемость и отказоустойчивость обслуживания.
В качестве очереди сообщений мы используем ActiveMQ, в качестве оперативного хранилища — Hazelcast.

Масштабирование

Для организации параллельной обработки организуется кластер из нескольких равноправных серверов. На всяком из них живёт брокер ActiveMQ, в очереди которого складываются сообщения, поступающие по HTTP-протоколу. HTTP-ручки находятся за балансировщиком, распределяющим сообщения по живым серверам.
Входную очередь сообщений на всяком сервере разбирает Camel-приложение, использующее кластерHazelcast для хранения состояний, а также, при необходимости, синхронизации обработки. ActiveMQ также объеденены в кластер с применением NetworkConnectors, и могут «делиться» сообщениями друг с ином.
В целом схема выглядит дальнейшим образом:
image
Как видно из схемы, выход из строя одного из компонентов системы не нарушает её работоспособность, с учётом равноправия элементов. К примеру, если выходит из строя обработчик сообщений на одном из серверов, ActiveMQ начинает отдавать сообщения из своих очередей иным. Если падает один из брокеров ActiveMQ, то обработчик «зацепляется» за соседний. Ну и наконец, если выходит из строя каждый сервер, остальные сервера продолжают работать в поте лица, как ни в чём не бывало. Для возрастания сохранности данных ноды Hazelcast хранят резервные копии данных своих соседей (копии осуществляются асинхронно, их число на всякой ноде настраивается добавочно).
Данная схема также разрешает без специальных расходов масштабировать сервис, добавляя добавочные сервера, и тем самым увеличивая вычислительный источник.

Распределённые аггрегаторы

При применении аггрегации Apache Camel включает представления “aggregation repository” и “correlation key“. Первое — это репозиторий, где хранятся аггрегированные состояния (скажем, число упавших тестов за день). Второе — это ключ, применяемый для разделения потока сообщений по состояниям. Другими словами correlation key — это ключ в репозитории аггрегации (скажем, нынешняя дата).
Для аггрегаторов в сходственной схеме нам потребовалась реализация своего собственного репозитория аггрегации, умеющего беречь состояния в Hazelcast и синхронизировать обработку идентичных ключей внутри кластера. К сожалению, в стандартной поставке Camel мы сходственной вероятности не нашли. Благо сделать его оказалось вовсе нетрудно — довольно реализовать интерфейс AggregationRepository:

Спрятанный текст

public class HazelcastAggregatorRepository implements AggregationRepository {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    // maximum time of waiting for the lock from hz
    public static final long WAIT_FOR_LOCK_SEC = 20;

    private final HazelcastInstance hazelcastInstance;

    private final String repositoryName;

    private IMap<String, DefaultExchangeHolder> map;

    public HazelcastAggregatorRepository(HazelcastInstance hazelcastInstance, String repositoryName){
        this.hazelcastInstance = hazelcastInstance;
        this.repositoryName = repositoryName;
    }

    @Override
    protected void doStart() throws Exception {
        map = hazelcastInstance.getMap(repositoryName);
    }

    @Override
    protected void doStop() throws Exception {
        /* Nothing to do */
    }

    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
        try {
            DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange);
            map.tryPut(key, holder, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
            return toExchange(camelContext, holder);
        } catch (Exception e) {
            logger.error("Failed to add new exchange", e);
        } finally {
            map.unlock(key);
        }
        return null;
    }

    @Override
    public Exchange get(CamelContext camelContext, String key) {
        try {
            map.tryLock(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
            return toExchange(camelContext, map.get(key));
        } catch (Exception e) {
            logger.error("Failed to get the exchange", e);
        }
        return null;
    }

    @Override
    public void remove(CamelContext camelContext, String key, Exchange exchange) {
        try {
            logger.debug("Removing '"   key   "' tryRemove...");
            map.tryRemove(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.error("Failed to remove the exchange", e);
        } finally {
            map.unlock(key);
        }
    }

    @Override
    public void confirm(CamelContext camelContext, String exchangeId) {
        /* Nothing to do */
    }

    @Override
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(map.keySet());
    }

    private Exchange toExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
        Exchange exchange = null;
        if (holder != null) {
            exchange = new DefaultExchange(camelContext);
            DefaultExchangeHolder.unmarshal(exchange, holder);
        }
        return exchange;
    }
}

Дабы воспользоваться сходственным репозиторием, сейчас необходимо лишь подключить к плану Hazelcast и объявить его в контексте, а после этого добавить и комплект репозиториев с указанием на инстанс Hazelcast. Значимо помнить, что всякий аггрегатор должен иметь собственное пространство ключей, а следственно ему нужно передавать также имя репозитория. В настройках Hazelcast необходимо прописать все сервера, которые входят в кластер.
Таким образом, мы получаем вероятность применять аггрегаторы в распределённой среде, не задумываясь о том на каком именно сервере произойдёт аггрегация.

Распределённые таймеры

Число состояний, хранящихся в кластере довольно огромно. Но не все из них необходимы непрерывно. К тому же, некоторые состояния (скажем, состояние тестов, которые давным-давно не применяются, а следственно для них давным-давно не было сообщений) вообще беречь не необходимо. От сходственных состояний хочется избавляться и добавочно информировать об этом прочие системы. Для этого нужно с заданной периодичностью проверять состояния аггрегаторов на предмет устаревания и удалять их.
Примитивный метод это сделать — добавить периодическую задачу, скажем, с поддержкой Quartz. К тому же, Camel это сделать разрешает. Впрочем, нужно помнить, что выполнение происходит в кластере со большинством равноправных серверов. И не дюже хочется, Дабы периодические задачи Quartz срабатывали на всех единовременно. Во избежание этого, довольно сделать синхронизацию вновь же с поддержкой локов Hazelcast. Но как принудить Quartz инициализироваться только на одном сервере, точнее в какой момент изготавливать синхронизацию?
Для инициализации Camel-контекста и всех остальных компонентов системы мы используем Spring, и Дабы принудить Quartz стартовать планировщик только на одном сервере из кластера, во-первых, нужно отключить его механический запуск, очевидно объявив в контексте:

    <bean id="quartz">
        <property name="autoStartScheduler" value="false"/>
    </bean>

Во-вторых, необходимо где-то произвести синхронизацию и запустить планировщик только если удалось захватить лок, а после этого ждать дальнейшего момента его захвата (в случае, если предшествующий сервер, захвативший лок, вышел из строя либо по какой-то причине его отпустил). Это в Spring дозволено реализовать несколькими вариантами, скажем, через ApplicationListener, тот, что разрешает обработать события запуска контекста:

<bean>
    <property name="hazelcastInstance" ref="hazelcastInstance"/>
    <property name="quartzComponent" ref="quartz"/>
</bean> 

Получим следующую реализацию класса инициализации планировщика:

Спрятанный текст

public class HazelcastQuartzSchedulerStartupListener implements ShutdownPrepared, ApplicationListener {

    public static final String DEFAULT_QUARTZ_LOCK = "defaultQuartzLock";
    protected volatile boolean initialized = false;
    Logger log = LoggerFactory.getLogger(getClass());
    Lock lock;

    protected volatile boolean initialized = false;
    protected String lockName;
    protected HazelcastInstance hazelcastInstance;
    protected QuartzComponent quartzComponent;

    public HazelcastQuartzSchedulerStartupListener() {
        super();
        log.info("HazelcastQuartzSchedulerStartupListener created");
    }

    public void setLockName(final String lockName) {
        this.lockName = lockName;
    }

    public synchronized Lock getLock() {
        if (lock == null) {
            lock = hazelcastInstance.getLock(lockName != null ? lockName : DEFAULT_QUARTZ_LOCK);
        }
        return lock;
    }

    @Override
    public void prepareShutdown(boolean forced) {
        unlock();
    }

    @Required
    public void setQuartzComponent(QuartzComponent quartzComponent) {
        this.quartzComponent = quartzComponent;
    }

    @Required
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override
    public synchronized void onApplicationEvent(ApplicationEvent event) {
        if (initialized) {
            return;
        }
        try {
            while (true) {
                try {
                    getLock().lock();
                    log.warn("This node is now the master Quartz!");
                    try {
                        quartzComponent.startScheduler();
                    } catch (Exception e) {
                        unlock();
                        throw new RuntimeException(e);
                    }
                    return;
                } catch (OperationTimeoutException e) {
                    log.warn("This node is not the master Quartz and failed to wait for the lock!");
                }
            }
        } catch (Exception e) {
            log.error("Error while trying to wait for the lock from Hazelcast!", e);
        }
    }

    private synchronized void unlock() {
        try {
            getLock().unlock();
        } catch (IllegalStateException e) {
            log.warn("Exception while trying to unlock quartz lock: Hazelcast instance is already inactive!");
        } catch (Exception e) {
            log.warn("Exception during the unlock of the master Quartz!", e);
        }
    }
}

Таким образом, мы получим вероятность применять периодические задачи рекомендуемым в Camel методом и с учётом распределённой среды выполнения. Скажем, так:

<route id="quartz-route">
     <from uri="quartz://quartz-test/test?cron=* * * * * ?"/>
     <log message="Quartz each second message caught ${in.body.class}!"/>
     <to uri="direct:queue:done-quartz"/>
</route>

Finite state machine

Помимо примитивных методов аггрегации (скажем, подсчёта сумм), нам также Зачастую было нужно переключать состояния аггрегаторов в зависимости от поступающих сообщений, скажем, Дабы неизменно помнить нынешнее состояние исполненного теста. Для реализации этой вероятности отлично подходятфинальные автоматы. Предположим, что у нас есть некоторое состояние теста. Скажем, TestPassedState. При приобретении сообщения TestFailed для данного теста мы обязаны переключить состояние аггрегатора в TestFailedState, а при приобретении TestPassed вновь в TestPassedState. И так до бесконечности. На основе этих переходов дозволено делатьнекоторые итоги, скажем, если переход происходит TestPassed -> TestFailed, нужно оповестить всех заинтересованных лиц о том, что тест сломался. А если происходит обратный переход, то, напротив — рассказать им, что всё стало отлично.
image

Подбирая варианты реализации сходственной стратегии аггрегации, мы пришли к итогу, что нужна некая адаптированная к реалиям обработки сообщений модель финальных автоматов. Во-первых, сообщения поступающие на вход аггрегаторов — это некоторый комплект объектов. Всякое событие имеет личный тип, а следственно легко ложится на классы в Java. Для изложения типов событий мы используем xsd-схему, по которой с поддержкой xjc генерируем комплект классов. Эти классы легко сериализуются и десериализуются в xml и json, с применением jaxb. Состояния, хранящиеся в Hazelcast также представляются комплектом классов, сгенерированных по xsd. Таким образом, нам нужно было обнаружить реализацию финальных автоматов, разрешающую легко оперировать переходами между состояниями на основе типа сообщения и типа нынешнего состояния. И ещё хотелось, Дабы задавались эти переходы декларативно, а не императивно, как во множестве сходственных библиотек. Легковесной реализации сходственной функциональности мы не обнаружили, а следственно решили написать свою собственную, рассматривающую наши надобности и отлично ложащуюся в основу обработки сообщений, приходящих по маршруту в Camel.

Маленькая библиотечка, реализующая наши надобности получила наименование Yatomata (от слов Yet Another auTomata) и доступна на github.
Было решено несколько упростить модель FSM — скажем, контекст задаётся объектом нынешнего состояния, сообщение также хранит некоторые данные. Впрочем, переходы при этом определяются только типами состояний и сообщений. Стейт-машина определяется для класса, тот, что применяется в качестве аггрегатора. Для этого класс помечается аннотацией @FSM. Для неё определено начальное состояние (start) и комплект переходов, некоторые из которых останавливают аггрегацию (stop=true), механически отправляя собранное состояние дальше по маршруту.
Комплект переходов декларируется аннотацией @Transitions и массивом аннотаций @Transit, в всяком из которых дозволено задать комплект начальных состояний (from), финальное состояние (to), комплект событий, по которым данный переход активируется (on), а также указать является ли это состояние окончанием работы машины (stop). Для обработки переходов предусмотрены аннотации @OnTransit@BeforeTransit, а также@AfterTransit, которыми дозволено пометить публичные способы внутри класса. Эти способы будут вызваны в том случае, если обнаружен соответствующий переход, удовлетворяющий его сигнатуре.

@FSM(start = Undefined.class)
@Transitions({
      @Transit(on = TestPassed.class, to = TestPassedState.class),
      @Transit(on = TestFailed.class, to = TestFailedState.class),
      @Transit(stop = true, on = TestExpired.class),
})
public class TestStateFSM {

      @OnTransit
      public void onTestFailed(State oldState, TestFailedState newState, TestFailed event){}

      @OnTransit
      public void onTestPassed(State oldState, TestPassedState newState, TestPassed event){}
    }

Работа со стейт-машиной осуществляется дальнейшим образом:

Yatomata<TestStateFSM> fsm = new FSMBuilder(TestStateFSM.class).build();

fsm.getCurrentState();       // returns instance of Undefined
fsm.isStopped();             // returns false
fsm.getFSM();                // returns instance of TestStateFSM
fsm.fire(new TestPassed());  // returns instance of TestPassedState
fsm.fire(new TestFailed());  // returns instance of TestFailedState
fsm.fire(new TestExpired()); // returns instance of TestFailedState
fsm.isStopped();             // returns true

Путём реализации интерфейса AggregationStrategy, мы сделали FSMAggregationStrategy, объявление которого в контексте Spring происходит так:

    <bean id="runnableAggregator">
        <constructor-arg value="com.my.TestStateFSM"/>
    </bean>

Простейшая реализация стратегии аггрегации в случае применения этой стейт-машины может выглядеть дальнейшим образом:

Спрятанный текст

public class FSMAggregationStrategy<T> implements AggregationStrategy {
    private final Yatomata<T> fsmEngine;

    public FSMAggregationStrategy(Class fsmClass) {
        this.fsmEngine = new FSMBuilder(fsmClass).build();
    }

    @Override
    public Exchange aggregate(Exchange state, Exchange message) {
        Object result = state == null ? null : state.getIn().getBody();
        try {
            Object event = message.getIn().getBody();
            Object fsm = fsmEngine.getFSM();
            result = fsmEngine.fire(event);            
        } catch (Exception e) {
            logger.error(fsm   " error", e);
        }

        if (result != null) {
            message.getIn().setBody(result);
        }
        return message;
    }

    public boolean isCompleted() {
        return fsmEngine.isCompleted();
    }
}

Итоги

Перечисленные приёмы дозволили нам реализовать несколько горизонтально масштабируемых сервисов разного назначения. Apache Camel показал себя с лучшей стороны и оправдал возложенные на него веры. Декларативность гармонирует в нём с высокой гибкостью, что в сумме обеспечивает хорошее масштабирование интеграционных приложений с приложением минимума усилий на поддержку и добавление новой функциональности.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB