Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Вступление в Акторы на основе Java/GPars, Часть I

Anna | 2.06.2014 | нет комментариев
Кратко рассматривается API библиотеки GPars и решение многопоточной задачи средней трудности, итоги которой могут быть пригодны в «народном хозяйстве».

Данная статья написана в ходе изыскания разных библиотек акторов, доступных Java-программисту, в процессе подготовки к чтению курса «Multicore programming in Java».

Это первая статья из цикла статей цель которых сравнить API, быстродействие и реализацию акторов Akka с реализациями в других библиотеках на некоторой модельной задаче. Данная статья предлагает такую задачу и решение на GPars.

GPars — библиотека написанная для Clojure с широкой помощью разных подходов к параллельным вычислениям.
Плюсы GPars

  • Начальный код написан на Java (в различии от Akka, написанной на Scala). Неизменно увлекательно посмотреть «что под капотом» на «родном» языке программирования
  • GPars представляет собой целый «зоопарк» подходов (Actor, Agent, STM, CSP, Dataflow)
  • GPars использует классы из runtime-библиотеки Clojure, написанной на Java. Увлекательно покопаться

«Установка» GPars

Подключаете в Maven GPars и Groovy

<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>2.2.2</version>
</dependency>

Без Maven легко качайте из репозитория GPars-1.1.0 (sources) и Groovy-2.2.2 (sources) и подключайте к плану.

Stateless Actor

Начнем с примитивных примеров.
Посылаем сообщение актору.

import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("receive: "   msg);
            }
        }.start();
        actor.send("Hello!");

        System.in.read();
    }
}
>> receive: Hello!

Посылаем сообщение и ожидаем результата

import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: "   msg);
                getSender().send(msg.toUpperCase());
            }            
        }.start();
        System.out.println("pong: "   actor.sendAndWait("Hello!"));
    }
}
>> ping: Hello!
>> pong: HELLO!

Посылаем сообщение и вещаем на результат асинхронный callback

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String msg) {
                System.out.println("ping: "   msg);
                getSender().send(msg.toUpperCase());
            }            
        }.start();
        actor.sendAndContinue("Hello!", new MessagingRunnable<String>() {
            protected void doRun(String msg) {
                System.out.println("pong: "   msg);
            }
        });

        System.in.read();
    }
}
>> ping: Hello!
>> pong: HELLO!

Делаем pattern matching по типам принятого сообщения

import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000   arg);
            }
        }.start();

        System.out.println("42.0 -> "   actor.sendAndWait(42.0));
    }
}
>> Hello! -> HELLO!
>> 42 -> 1042

Pattern matching не обнаружил подходящего обработчика

import groovyx.gpars.actor.*;

public class Demo {
    public static void main(String[] args) throws Exception {
        Actor actor = new DynamicDispatchActor() {
            public void onMessage(String arg) {
                getSender().send(arg.toUpperCase());
            }

            public void onMessage(Long arg) {
                getSender().send(1000   arg);
            }
        }.start();

        System.out.println("42.0 -> "   actor.sendAndWait(42.0));
    }
}
>> An exception occurred in the Actor thread Actor Thread 1
>> groovy.lang.MissingMethodException: No signature of method:
>> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0]
>> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String)
>> 	at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ...
>>   ...

Что видно
— «pattern matching» делает подбором подходящего перегруженного (overloaded) варианта способа onMessage(<one-arg>), если такового нет, то «получаем» исключение
— акторы работают на основе пула потоков-«демонов», так что нам нужно как-то подвесить работу способа main() (я применял System.in.read()) с целью недопустить несвоевременное заключение работы JVM
— на примере способа reply() мы видим, что при наследовании от DynamicDispatchActor в «пространство имен» актора попадает уйма способов (reply, replyIfExists, getSender, terminate, …)

Правда авторы GPars и называют преемников класса DynamicDispatchActor — акторами-без-состояния (stateless actor), это — обыкновенные экземпляры java-классов, которые могут иметь мутирующие поля и беречь в них свое состояние. Продемонстрируем это

import groovyx.gpars.actor.*;

import java.util.ArrayList;
import java.util.List;

public class StatelessActorTest {
    public static void main(String[] args) throws InterruptedException {
        Actor actor = new DynamicDispatchActor() {
            private final List<Double> state = new ArrayList<>();
            public void onMessage(final Double msg) {
                state.add(msg);
                reply(state);
            }
        }.start();

        System.out.println("answer: "   actor.sendAndWait(1.0));
        System.out.println("answer: "   actor.sendAndWait(2.0));
        System.out.println("answer: "   actor.sendAndWait(3.0));
        System.out.println("answer: "   actor.sendAndWait(4.0));
        System.out.println("answer: "   actor.sendAndWait(5.0));
    }
}
>> answer: [1.0]
>> answer: [1.0, 2.0]
>> answer: [1.0, 2.0, 3.0]
>> answer: [1.0, 2.0, 3.0, 4.0]
>> answer: [1.0, 2.0, 3.0, 4.0, 5.0]

Statefull Actor

Вводя деление stateless/statefull, авторы имею в виду, что Statefull Actor разрешают органично создавать реализации образца State. Разглядим примитивный пример (преемники DefaultActor — Statefull Actor-ы)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(Arrays.asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(Arrays.asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new MessagingRunnable<Object>(this) {
                        protected void doRun(final Object msg) {
                            System.out.println("react: "   msg);
                        }
                    });
                }
            });
        }
    }
}
>> react: A
>> react: 1.0
>> react: [1, 2, 3]
>> react: B
>> react: 2.0
>> react: [4, 5, 6]

Впрочем, обещанной реализацией образца State вовсе «не пахнет». Давайте зайдем с такой стороны (Java не наилучший язык для таких трюков, на Clojure/Scala данный код выглядит гораздо суперкомпактнее)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
      protected void act() {
        loop(new Runnable() {
          public void run() {
            react(new MessagingRunnable<String>(this) {
              protected void doRun(final String msg) {
                System.out.println("Stage #0: "   msg);
                react(new MessagingRunnable<Double>() {
                  protected void doRun(final Double msg) {
                    System.out.println("  Stage #1: "   msg);
                    react(new MessagingRunnable<List<Integer>>() {
                      protected void doRun(final List<Integer> msg) {
                        System.out.println("    Stage #2: "   msg   "n");
                      }
                  });
                }
              });
            }
          });
        }
      });
    }
  }
}
>> Stage #0: A
>>   Stage #1: 1.0
>>     Stage #2: [1, 2, 3]
>> 
>> Stage #0: B
>>   Stage #1: 2.0
>>     Stage #2: [4, 5, 6]

Ну либо давайте избавимся от этой жудкой вложенности неизвестных классов и «материализуем состояния»

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new Stage0(MyStatefulActor.this));
                }
            });
        }
    }

    private static class Stage0 extends MessagingRunnable<String> {
        private final DefaultActor owner;
        private Stage0(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final String msg) {
            System.out.println("Stage #0: "   msg);
            owner.react(new Stage1(owner));
        }
    }

    private static class Stage1 extends MessagingRunnable<Double> {
        private final DefaultActor owner;
        private Stage1(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final Double msg) {
            System.out.println("  Stage #1: "   msg);
            owner.react(new Stage2());
        }
    }

    private static class Stage2 extends MessagingRunnable<List<Integer>> {
        protected void doRun(final List<Integer> msg) {
            System.out.println("    Stage #2: "   msg   "n");
        }
    }
}

Да, да, я с Вами всецело согласен, Java — весьма многословный язык.

Вот как выглядит диаграмма переходов (развилок по доводу мы не делали)

// START
// -----  
//   |
//   |
//   |
//   |   -------- 
//    ->| Stage0 | ---String---- 
//       --------               |
//         ^                    v
//         |                 -------- 
//         |                | Stage1 |
//   List<Integer>           -------- 
//         |                    |
//         |   --------       Double
//          --| Stage2 |<------- 
//             -------- 

Таймер

Для решения моей задачи мне будет нужен таймер — что-то, что дозволено запрограммировать оповестить меня об окончании некоторого интервала времени. В «обыкновенной» Java мы используем java.util.concurrent.ScheduledThreadPoolExecutor либо java.util.Timer на худой конец. Но мы же в мире акторов!
Это Statefull Actor, тот, что висит в ожидании сообщения в способе react() с таймаутом. Если никакое сообщение не приходит в течении этого интервала времени, то инфраструктура GPars присылает нам сообщение Actor.TIMEOUT (это легко строка «TIMEOUT») и мы «возвращаем» нашему создателю сообщение из конструктора timeoutMsg. Если же вы хотите «отключить» таймер — пришлите ему всякое другое сообщение (я буду присылать ему строку «KILL»)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Timer<T> extends DefaultActor {
    private final long timeout;
    private final T timeoutMsg;
    private final MessageStream replyTo;

    public Timer(long timeout, T timeoutMsg, MessageStream replyTo) {
        this.timeout = timeout;
        this.timeoutMsg = timeoutMsg;
        this.replyTo = replyTo;
    }

    protected void act() {
        loop(new Runnable() {
            public void run() {
                react(timeout, MILLISECONDS, new MessagingRunnable() {
                    protected void doRun(Object argument) {
                        if (Actor.TIMEOUT.equals(argument)) {
                            replyTo.send(timeoutMsg);
                        }
                        terminate();
                    }
                });
            }
        });
    }
}

Пример применения таймера.
Я создаю два таймера timerX и timerY, которые с задержкой 1000мс вышлют мне сообщения «X» и «Y» соответственно. Но через 500мс я передумал и «прибил» timerX.

import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.impl.MessageStream;

public class TimerDemo {
    public static void main(String[] args) throws Exception {
        Actor timerX = new Timer<>(1000, "X", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerX send timeout message: '"   msg   "'");
                return this;
            }
        }).start();
        Actor timerY = new Timer<>(1000, "Y", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerY send timeout message: '"   msg   "'");
                return this;
            }
        }).start();

        Thread.sleep(500);
        timerX.send("KILL");

        System.in.read();
    }
}
>> timerY send timeout message: 'Y'

Постановка задачи и схема решения

Разглядим следующую крайне всеобщую задачу.
1. У нас есть много потоков, которые довольно Зачастую вызывают некоторую функцию.
2. У этой функции есть два варианта: обработка одного довода и обработка списка доводов.
3. Эта функция такова, что обработка списка доводов потребляет поменьше источников системы, чем сумма обработок всякого в отдельности.
4. Задача состоит в том, что бы между потоками и функцией разместить определенный Batcher, тот, что собирает доводы от потоков в «пачку», передает функции, она обрабатывает список, Batcher «раздает» итоги потокам отправителям.
5. Batcher передает список доводов в 2-х случаях: собрали «пачку» довольного размера либо по истечению времени ожидания, в течении которого не удалось собрать полную «пачку», но потокам теснее пора возвращать итоги.

Давайте разглядим схему решения.
Таймаут 100мс, наивысший размер «пачки» — 3 довода

В момент времени 0 поток T-0 посылает довод «A». Batcher находится в «чистом» состоянии, поколение 0

//time:0
//
//  T-0 --"A"----->      -------  generationId=0
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]

через миг Batcher знает, что нужно обсчитать «A» и воротить потоку T-0. Заведен таймер для поколения 0

//                                     -----  timeoutMsg=0
//                                    |Timer| timeout=100
//time:0.001                           ----- 
//
//  T-0                  -------  generationId=0
//  T-1                 |Batcher| argList=["A"]
//  T-2                  -------  replyToList=[T-0]

В момент времени 25 миллисекунд поток T-1 посылает на обработку «B»

//                                     -----  timeoutMsg=0
//                                    |Timer| timeout=100
//time:25                              ----- 
//
//  T-0                  -------  generationId=0
//  T-1 ---"B"---->     |Batcher| argList=["A"]
//  T-2                  -------  replyToList=[T-0]

через миг Batcher знает, что нужно обсчитать «A» и «B» и воротить потокам T-0 и T-1

//                                     -----  timeoutMsg=0
//                                    |Timer| timeout=100
//time:25.001                          ----- 
//
//  T-0                  -------  generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2                  -------  replyToList=[T-0,T-1]

В момент времени 50 миллисекунд поток T-2 посылает на обработку «С»

//                                     -----  timeoutMsg=0
//                                    |Timer| timeout=100
//time:50                              ----- 
//
//  T-0                  -------  generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2 ----"C"--->      -------  replyToList=[T-0,T-1]

через миг Batcher знает, что нужно обсчитать «A», «B» и «C» и воротить потокам T-0, T-1 и T-2. Выясняет, что «пачка» наполнена и «убивает» таймер

//                                     -----  timeoutMsg=0
//                           -"KILL"->|Timer| timeout=100
//time:50.001               |          ----- 
//                          |
//  T-0                  -------  generationId=0
//  T-1                 |Batcher| argList=["A","B","C"]
//  T-2                  -------  replyToList=[T-0,T-1,T-2]

через миг Batcher отдает данные на обсчет в отдельному актору (anonimous), очищает состояние и меняет поколение с 0 на 1

//time:50.002
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]
//
//                                  ---------  argList=["A","B","C"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                  --------- 

через миг (для «раскадровки» буду считать, что вычисления мгновенны) неизвестный актор исполняет действие над списком доводов [«A»,«B»,«C»] -> [«res#A»,«res#B»,«res#C»]

//time:50.003
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]
//
//                                  ---------  resultList=["res#A","res#B","res#B"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                  --------- 

через миг неизвестный артист раздает итоги вычисления потокам

//time:50.004
//
//  T-0 <-----------     -------  generationId=1
//  T-1 <---------  |   |Batcher| argList=[]
//  T-2 <-------  | |    -------  replyToList=[]
//              | | |
//              | |  ---"res#A"---  --------- 
//              |  ---"res#B"----- |anonymous|
//               --"res#C"--------  --------- 

через миг система возвращает в начальное «чистое» состояние

//time:50.005
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]

Позднее, в момент времени, 75 поток T-2 передает на вычисление «D»

//time:75
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2 ----"D"--->      -------  replyToList=[]

через миг Batcher знает, что нужно обсчитать «D» и воротить потоку T-2, помимо того запущен таймер для поколения 1

//                                     -----  timeoutMsg=1
//                                    |Timer| timeout=100
//time:75.001                          ----- 
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                  -------  replyToList=[T-2]

через 100мс (в момент времени 175мс) инфраструктура GPars информирует таймер о истечении периода ожидания

//                                         --"TIMEOUT"--
//                                        |
//                                        v
//                                     -----  timeoutMsg=1
//                                    |Timer| timeout=100
//time:175                             ----- 
//
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                  -------  replyToList=[T-2]

через миг таймер информирует Batcher о том, что время ожидания поколения 1 истекло и кончает жизнь самоубийством вызывая terminate()

//                                      -----  timeoutMsg=1
//                           ----1-----|Timer| timeout=100
//time:175.001              |           ----- 
//                          v
//  T-0                  -------  generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                  -------  replyToList=[T-2]

Создается неизвестный актор, тот, что исполняет вычисления над списком доводов (в котором каждого 1 довод). Поколение с 1 меняется на 2

//time:175.002
//
//  T-0                  -------  generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]
//
//                                  ---------  argList=["D"]
//                                 |anonymous| replyToList=[T-2]
//                                  --------- 

Актор исполнил работу

//time:175.003
//
//  T-0                  -------  generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]
//
//                                  ---------  resultList=["res#D"]
//                                 |anonymous| replyToList=[T-2]
//                                  --------- 

Актор отдает итог

//time:175.004
//
//  T-0                  ------- generationId=2
//  T-1                 |Batcher|argList=[]
//  T-2 <-------         ------- replyToList=[]
//              |
//              |                --------- 
//               --"res#C"----- |anonymous|
//                               --------- 

Система в начальном «чистом» состоянии

//time:175.005
//
//  T-0                  -------  generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                  -------  replyToList=[]

Решение задачи

BatchProcessor — интерфейс «функции». допускающей «пакетный режим» обработки

import java.util.List;

public interface BatchProcessor<ARG, RES> {
    List<RES> onBatch(List<ARG> argList) throws Exception;
}

Batcher — класс, «пакующий» доводы. Ядро решения

import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import java.util.*;

public class Batcher<ARG, RES> extends DynamicDispatchActor {
    // fixed parameters
    private final BatchProcessor<ARG, RES> processor;
    private final int maxBatchSize;
    private final long batchWaitTimeout;
    // current state
    private final List<ARG> argList = new ArrayList<>();
    private final List<MessageStream> replyToList = new ArrayList<>();
    private long generationId = 0;
    private Actor lastTimer;

    public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) {
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.batchWaitTimeout = batchWaitTimeout;
    }

    public void onMessage(final ARG elem) {
        argList.add(elem);
        replyToList.add(getSender());
        if (argList.size() == 1) {
            lastTimer = new Timer<>(batchWaitTimeout,   generationId, this).start();
        } else if (argList.size() == maxBatchSize) {
            lastTimer.send("KILL");
            lastTimer = null;
            nextGeneration();
        }
    }

    public void onMessage(final long timeOutId) {
        if (generationId == timeOutId) {nextGeneration();}
    }

    private void nextGeneration() {
        new DynamicDispatchActor() {
            public void onMessage(final Work<ARG, RES> work) throws Exception {
                List<RES> resultList = work.batcher.onBatch(work.argList);
                for (int k = 0; k < resultList.size(); k  ) {
                    work.replyToList.get(k).send(resultList.get(k));
                }
                terminate();
            }
        }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList)));
        argList.clear();
        replyToList.clear();
        generationId = generationId   1;
    }

    private static class Work<ARG, RES> {
        public final BatchProcessor<ARG, RES> batcher;
        public final List<ARG> argList;
        public final List<MessageStream> replyToList;

        public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) {
            this.batcher = batcher;
            this.argList = argList;
            this.replyToList = replyToList;
        }
    }
}

BatcherDemo — демонстрация работы класса Batcher. Совпадает со схематичным планом

import groovyx.gpars.actor.Actor;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class BatcherDemo {
    public static final int BATCH_SIZE = 3;
    public static final long BATCH_TIMEOUT = 100;

    public static void main(String[] args) throws InterruptedException, IOException {
        final Actor actor = new Batcher<>(new BatchProcessor<String, String>() {
            public List<String> onBatch(List<String> argList) {
                System.out.println("onBatch("   argList   ")");
                ArrayList<String> result = new ArrayList<>(argList.size());
                for (String arg : argList) {
                    result.add("res#"   arg);
                }
                return result;
            }
        }, BATCH_SIZE, BATCH_TIMEOUT).start();

        ExecutorService exec = newCachedThreadPool();
        exec.submit(new Callable<Void>() { // T-0
            public Void call() throws Exception {
                System.out.println(actor.sendAndWait(("A")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-1
            public Void call() throws Exception {
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("B")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-2
            public Void call() throws Exception {
                Thread.sleep(50);
                System.out.println(actor.sendAndWait(("C")));
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("D")));
                return null;
            }
        });        

        exec.shutdown();
    }
}
>> onBatch([A, B, C])
>> res#A
>> res#B
>> res#C
>> onBatch([D])
>> res#D

Завершение

В моем представлении, акторы отменны для программирования многопоточных примитивов, представляющих собой финальные автоматы со трудной диаграммой переходов, которая помимо каждого прочего может зависеть от поступающих доводов.

Некоторые примеры этой статьи являются вариациями кода обнаруженного в сети в разных места, включаяgpars.org/guide.

Во 2-й части мы

  • Измерим скорость работы предложенного решения
  • Ускорим работу с JDBC объединяя запросы разных потоков из отдельных транзакций в одну огромную транзакцию RDBMS. То есть сделаем batch не в рамках одного Connection, а между разными Connection-ами.

 

 Источник: programmingmaster.ru
Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB