Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Taskurotta либо управление процессами в распределенной системе

Anna | 3.06.2014 | нет комментариев
Добрый день, програюзер!

Есть у нас задача объединять разные сервисы и существующие системы в управляемые процессы. Скорость необходима не космическая (т.е. не по биржевым котировкам отклик создавать), но но процессов много и компонент (систем) которые необходимо применять тоже добропорядочно вырисовывается. Не хочется делать p2p связывание. Хочется чего-то прекрасного и управляемого.

Просмотрев рынок, было принято решение сделать реплику по мотивам Amazon Simple Workflow, так как применять его напрямую мы не можем. Свойства фреймворка которые нам подходят:

  • Низкий порог старта (отличный программист ныне дорог). Низкий порог здесь огромнее в части начала программирования так как все делается на высоком ярусе — примерно на ярусе взаимодействия с интерфейсами. Но Дабы хорошо асинхронно руководить процессом нужно безусловно нарастить навык
  • При сохранении параметров задач и итогов выполнения получаем обзор и базу для регрессионного тестирования исполнителей задач процесса
  • Насыщенность логики управления процессом в определенных местах (в Координаторе). Это может на 1-й взор не видимо, но это величайшее благо по сопоставлению с допустимой альтернативой, когда всякий артист имеет свою логику — какие другие компоненты дальше вызвать (передать управление). Зачастую приводит к усложнению системы и неосуществимости переиспользовать компоненты

Это минимум тот, что хотелось бы, но как показывает практика, плюсов огромнее. План назвали Taskurotta в честь «Task» — задачи, и суслика по фински, которому все равно, которого не видно, но он есть. Открытый начальный код доступен на GitHub. План реализован с поддержкой Hazelcast для образования всеобщего пространства памяти и среды выполнения между серверами, Dropwizard для стремительной и комфортной реализации REST сервисов и друзей из Amazon которые были первыми и сделали чудесный продукт вдохновив нас на собственную разработку. С документацией пока трудно, но скоро исправим.

Давайте перейдем от теории к тому, что теперь есть на настоящем примере.

Представим что нам нужно разработать приложение, отсылающее строковое сообщение пользователю. На вход мы получаем Id пользователя и комплект символов. Из его профиля (по Id) достаем данные о предпочтении — получать сообщения по email либо номеру телефона. Номер телефона и email также доступны в профиле. Дальше отправляем сообщение необходимым транспортом. В случае, если
отправка не удалась (по причине не правильного адреса либо номера), нужно подметить это в профиле для предотвращения повторных попыток в грядущем.

Добавим еще к этому не функциональное требование, о том, что сервисы отправляющие сообщения теснее существуют, находятся в других подсетях и их необходимо переиспользовать.

PS: Каждый начальный код описываемого примера доступен также на GitHub taskurotta\taskurotta-getstarted

Taskurotta разрешает реализовывать компоненты системы (Артистов), взаимодействующих между собой привычным для разработчика методом — путем вызовов способов друг друга, но в асинхронной повадке. Артисты делятся на два вида — Исполнителей и Координаторов. Исполнители обязаны Отчетливо исполнять поставленные перед ними задачи. Они представляют собой максимально само­стоятельные модули, и соответственно — максимально переиспользуемые. Исполнители могут взаимодействовать с внешним миром (всякие потоки ввода итога) исполняя задачу таким образом и так длинно, как этого требуется. С иной стороны, Координаторы не исполняют задач, связанных с внешним миром. Они обязаны отрабатывать как дозволено стремительней и не спотыкаться на прямом взаимодействии с БД, сетью и другими допустимо не стабильными компонентами. Их обязанность ставить задачи исполнителям, координировать их действия и тем самым обеспечить реализацию (изложение) процесса. Координаторы могут ставить задачи иным координаторам, реализуя парадигму переиспользуемых подпроцессов.

Задача Координатора как дозволено стремительней раздать знаменитые в данный момент задачи. Т.е. он не должен блокироваться на ожидании итога. Он должен возвести зависимости между вестимыми ему задачами и при необходимости сформировать асинхронные точки определения дальнейших действий.

Для нашего процесса Координатор должен сделать следующее:

  1. Запросить профиль пользователя
  2. Дождаться приобретения профиля
  3. Отправить сообщение пользователю
  4. Дождаться итога отправления

Кодировать данную последовательность действий с поддержкой автомата состояний, способами изменений одного сообщения несколькими исполнителями и другими привычными, но костлявыми методами мы не будем. Сделаем это легко и прекрасно с поддержкой сущности Promise и нашей системы, следящей за действиями Координатора.

Promise<Profile> profilePromise = userProfileService.get(userId);
Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);

В примере видно, что в итоге вызова сервисов мы получаем не настоящий объект, а некоторый Promise — ссылку на итог выполнения задачи. Данный Promise мы можем передавать в качестве довода иным сервисам (т.е. задачам). Вызовы других сервисов будут перехвачены системой (т.е. реального синхронного вызова не произойдет) и выстроена связанность между ними. Задачи не поступят на выполнение к сервисам до тех пор, пока все их доводы типа Promise не будут готовы, т.е. пока не будут исполнены все нужные заблаговременные задачи.

Таким образом, управление процессом выполняется коллективно координатором и нашей системой. Координатор выстраивает зависимости между задачами, а система берет на себя, помимо каждого прочего, функцию ожидания выполнения заблаговременных задач и дальнейшего запуска зависимых от них задач.

Давайте сейчас раскроем, что такое асинхронные точки определения дальнейших действий.

Нам нужно удостовериться в том, что отправка уведомления была удачна и если нет то необходимо заблокировать отправку уведомлений пользователю.

В данном случае позже отправки уведомления и перед дальнейшими действиями нужно проанализировать итог отправки. Т.е. дождаться выполнения задачи, произвести обзор и в зависимости от итога, блокировать либо нет. Для решения такой задачи у координатора есть вероятность сделать задачу на самого себя — т.е. точку определения дальнейших действий, в которую передать нужные Promise. Ниже представлено как это выглядит.

    public void start(String userId, String message) {
           Promise<Profile> profilePromise = userProfileService.get(userId);
           Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
           decider.blockOnFail(userId, sendResultPromise);
       }

     @Asynchronous
     public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
            logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);
            if (!sendResultPromise.get()) {
                userProfileService.blockNotification(userId);
            }
     }

Способ start() — это старт процесса. Дальше идет постановка 3 задач. Первая на приобретение профиля, вторую и третью Координатор ставит сам себе для дальнейшего обзора итога (вызов способов sendToTransport и blockOnFail). Таким образом Координатор как бы ожидает решения первой задачи, но без блокировки. Как только задача решена, система Taskurotta вызывает способ координатора sendToTransport, передавая в него готовый Promise объект, из которого дозволено получить настоящие данные способом get(). Позже выполнения задачи sendToTransport запускается задача blockOnFail где мы ставим задачу сервису userProfileService на блокировку сообщений для пользователя userId если случилась оплошность при отправке уведомления.

С поддержкой точек определения дальнейших действий дозволено реализовать разные поведения процесса:

  • Распараллеливание на разные ветки
  • Последующее слияние самостоятельных потоков процесса в одной точке с поддержкой проброса Promise и @NoWait аннотации
  • Асинхронную рекурсию
  • Распараллеливание выполнения однотипных задач, скажем проверки ЭЦП всех файлов и ожидания итогов выполнения в одной точке принятия решений
  • и т.д.

P.S.: Вызов задачи blockOnFail происходит через объект decider. Это неестественный объект, перехватывающий вызов, но реально не дерзкий способ blockOnFail. Нам необходимо поставить задачу, а не вызвать ее синхронно.

Так как по сценарию у нас теснее есть Исполнители для отправки email и смс, то нам остается только сделать Исполнителя для работы с профилем. У данного Исполнителя две задачи:

  1. Воротить профиль по идентификатору пользователя
  2. Сделать в профиле отметку о неосуществимости отправки сообщений для определенного пользователя

Начинаем с объявления его интерфейса. С этим интерфейсом будет трудиться Координатор. Тут и дальше, для компактности опущены комментарии и другие не значительные части кода.

    @Worker
    public interface UserProfileService {

        public Profile get(String userId);

        public void blockNotification(String userId);

    }

Аннотация @Worker определяет данный интерфейс как Исполнителя. У аннотации есть необязательные признаки определяющие его имя и версию (контракта). По умолчанию, именем является полное имя интерфейса, а версия — «1.0». Исполнители разных версий могут единовременно трудиться для различных процессов без каких либо раздоров.

Перейдем к реализации интерфейса.

    public class UserProfileServiceImpl implements UserProfileService {

        private static final Logger logger = LoggerFactory.getLogger(UserProfileServiceImpl.class);

        @Override
        public Profile get(String userId) {
            return ProfileUtil.createRandomProfile(userId);
        }

        @Override
        public void blockNotification(String userId) {
            logger.info(".blockNotification(userId = [{}]", userId);
        }
    }

Здесь мы опустили инициализацию администратора профилей (ProfileUtil). Он может трудиться с БД, LDAP либо иным реестром. Данный пример нам показывает, что Исполнитель получает задачи (вызовы) и делегирует их реальному модулю.

На этом создание Исполнителя завершается.

Для решения поставленной перед нами задачи, Координатор должен передать ссылку на еще не полученный профиль пользователя (объект Promise) в точку определения дальнейших действий. Там он предпочтет транспорт либо не будет ничего отсылать, если отправка сообщений для данного пользователя теснее заблокирована.

Впрочем интерфейс исполнителя, как и сам исполнитель, получают и отдают итог синхронно, а потому не имеют в декларации итогов выполнения в виде объекта Promise, а возвращают чистый объект данных. Это и верно. Исполнитель не должен знать как его применяют. Скажем, наш Исполнитель по приобретению профиля дозволено применять если теснее знаменит идентификатор пользователя, либо если он не знаменит и необходимо передать ссылку на иную задачу, которая данный идентификатор откуда-то получит. Таким образом мы приходим к интерфейсу взаимодействия с Исполнителем. Данный интерфейс определяет сам Координатор для своих нужд. Т.е. он определяется в пакете (плане) Координатора. Добавим интерфейс взаимодействия с Исполнителем для работы с профилем:

    @WorkerClient(worker = UserProfileService.class)
    public interface UserProfileServiceClient {

        public Promise<Profile> get(String userId);

        public void blockNotification(String userId);
    }

Мы видим интерфейс помеченный аннотацией @WorkerClient. Параметр аннотации ссылается на класс реального интерфейса Исполнителя. Таким образом устанавливается связь между присутствующим интерфейсом и нужным интерфейсом для определенного Координатора. Назовем данный интерфейс «клиентским интерфейсом Исполнителя». Данный клиентский интерфейс должен содержать все нужные координатору способы (дозволено не объявлять не используемые) и с одинаковой сигнатурой доводов. Всякий довод может быть типом Promise, если требуется передавать в качестве довода итог еще не законченной задачи.

Сейчас переходим к самому увлекательному — созданию координатора. Для начала ниже представлен интерфейс координатора, применяя тот, что заказчики Taskurotta будут запускать надобные им процессы.

    @Decider
    public interface NotificationDecider {

        @Execute
        public void start(String userId, String message);
    }

Данный интерфейс определен как @Decider — т.е. как Координатор. У этой аннотации есть те же свойства, что и у аннотации @Worker — имя и версия. По умолчанию за имя берется полное имя интерфейса, а за версию — «1.0».

Способ start помечен как @Execute. Это обозначает что через данный способ дозволено запускать процесс.

Сейчас переходим к реализации координатора

    public class NotificationDeciderImpl implements NotificationDecider {

        private static final Logger logger = LoggerFactory.getLogger(NotificationDeciderImpl.class);

        private UserProfileServiceClient userProfileService;
        private MailServiceClient mailService;
        private SMSServiceClient smsService;
        private NotificationDeciderImpl decider;

        @Override
        public void start(String userId, String message) {
            logger.info(".start(userId = [{}], message = [{}])", userId, message);

            Promise<Profile> profilePromise = userProfileService.get(userId);
            Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
            decider.blockOnFail(userId, sendResultPromise);
        }

        @Asynchronous
        public Promise<Boolean> sendToTransport(Promise<Profile> profilePromise, String message) {
            logger.info(".sendToTransport(profilePromise = [{}], message = [{}])", profilePromise, message);

            Profile profile = profilePromise.get();

            switch (profile.getDeliveryType()) {
                case СМС: {
                    return smsService.send(profile.getPhone(), message);
                }
                case EMAIL: {
                    return mailService.send(profile.getEmail(), message);
                }

            }

            return Promise.asPromise(Boolean.TRUE);
        }

        @Asynchronous
        public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) {
            logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise);

            if (!sendResultPromise.get()) {
                userProfileService.blockNotification(userId);
            }
        }
    }

В данном коде мы также опустили инициализацию приватных объектов. Полный и работающий код примера дозволено посмотреть в плане taskurotta-getstarted. Здесь только подметим, что значения приватных полей получаются через особую фабрику прокси объектов для Координатора.

В примере реализации есть две точки ожидания итогов выполнения незавершенных задач Координатором. Это способ sendToTransport и blockOnFail. Данные способы будут вызваны только тогда, когда все их доводы типа Promise будут готовы,
т.е. исполнены соответствующий задачи.

Объекты полей типа MailServiceClient и SMSServiceClient также являются клиентскими интерфейсами к соответствующим Исполнителям. Их инициализацию дозволено также посмотреть в плане taskurotta-getstarted.

На данный момент у нас есть все реализованные Исполнители и Координатор. Перейдем непринужденно к запуску Артистов (т.е. Исполнителей и Координаторов).

Выполнение задач может протекать как внутри серверов приложений, так и в виде отдельного java приложения (данный пример использует вариант отдельного приложения из модуля taskurotta\bootstrap). Что делает отдельное приложение:

  • Регистрируется на сервере Taskurotta
  • Запускает пул из N потоков для выполнения задач
  • Получает задачи от серверов Taskurotta
  • Запускает их выполнение
  • Пересылает итог серверу Taskurotta

Для запуска отдельного java приложения применяется пакет bootstrap, а определеннее — класс ru.taskurotta.bootstrap.Main. Ему в качестве довода необходимо передать местонахождение файла конфигурации в формате YAML.

Как это испробовать запустить? Дюже легко. Ниже пошаговая сборка сервера, артистов и их запуск из начального кода. будьте внимательны, нужны незначительные метаморфозы если у вас не linux.

Возможен у вас теснее есть:

  • jdk 1.7
  • maven 3
  • git

соберем сервер Taskurotta

git clone https://github.com/taskurotta/taskurotta.git
cd taskurotta/

Переключимся на последнюю протестированную версию (на днях выйдет версия 0.4 и этого не необходимо будет делать)

git checkout 5918c8db1a

Запустим сборку. Для убыстрения отключим тесты.

mvn clean install -DskipTests

Сейчас запустим кластер из 2-х узлов (мы используем одну машину для демонстрационных целей и следственно разные порты в параметрах запуска). В реальной среде дозволено запустить нужное число машин с идентичной конфигурацией.

Запускаем 1-й узел кластера:

java -Xmx64m -Ddw.http.port=8081 -Ddw.http.adminPort=9081 -Ddw.logging.file.currentLogFilename="assemble/target/server1.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB