Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

JDK concurrent package

Anna | 5.06.2014 | нет комментариев

Модель памяти, присутствующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, Дабы обезопасить ваш код от гонок, придуманы разные методы синхронизации и обмена данными между ними.

Пакет java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:

  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators _jdk 1.8_

Atomic

В дочернем пакете java.util.concurrent.atomic находится комплект классов для атомарной работы с простыми типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее ветхое значение (подход оптимистичной блокировки). Если с момента вызова способа значение переменной отличается от ожидаемого — итогом выполнения будет false.

Для примера возьмем два массива long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Всякий из потоков будет ступенчато итерироваться по массиву и суммировать элементы в цельную переменную. Код (groovy) с пессимистичной блокировкой выглядит так:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}

class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum  = it
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Итог выполнения будет ожидаемым:

pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0

Впрочем такой подход имеет значительные недочеты по продуктивности. В данном случае на непотребную для нас работу, уходит огромнее источников, чем на пригодную:

  • попытка блокирования монитора
  • блокировка потока
  • разблокировка монитора
  • разблокировка потока

Разглядим применение AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum   it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break;
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Как видно из итогов «ложных» попыток было не так уж и много:

[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0

При решении применять оптимистичную блокировку значимо, Дабы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем Почаще будут случаться ложные compare-and-set, и тем Почаще придется исполнять это действие вторично.

На основе compare-and-set может также реализовываться неблокирующая read бs_rqvmk!

DelayQueue

PriorityBlockingQueue разрешающая получить элемент только позже определенной задержки (задержка объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.

LinkedBlockingDeque

Двунаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedBlockingQueue

Однонаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedTransferQueue

Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь разрешает ждать когда элемент «заберет» обработчик.

PriorityBlockingQueue

Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сопоставление элементов). Воспрещает null значения.

SynchronousQueue

Однонаправленная `BlockingQueue`, реализующая transfer() логику для put() способов.

Synchronization points

 

CountDownLatch

Барьер (await()), ждущий определенного (либо огромнее) кол-ва вызовов countDown(). Состояние барьера не может быть сброшено.

CyclicBarrier

Барьер (await()), ждущий определенного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в исходное при освобождении ждущих потоков и может быть использован вторично.

Exchanger

Барьер (`exchange()`) для синхронизации 2-х потоков. В момент синхронизации допустима volatile передача объектов между потоками.

Phaser

Растяжение `CyclicBarrier`, дозволяющая регистрировать и удалять участников на всякий цикл барьера.

Semaphore

Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` вероятность находиться в блоке нескольким потокам.

Executors

ExecutorService пришел на замену new Thread(runnable) Дабы упростить работу с потоками.ExecutorService помогает вторично применять освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на итог выполнения задачи. Взамен интерфейса Runnable пул использует интерфейс Callable (может возвращать итог и кидать ошибки).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}

pool.shutdown()

Способ invokeAll отдает управление вызвавшему потоку только по заключению всех задач. СпособinvokeAny возвращает итог первой удачно исполненной задачи, отменяя все дальнейшие.

ThreadPoolExecutor

Пул потоков с вероятностью указывать рабочее и наивысшее кол-во потоков в пуле, очередь для задач.

ScheduledThreadPoolExecutor

Расширяет функционал ThreadPoolExecutor вероятностью исполнять задачи отложенно либо регулярно.

ThreadPoolExecutor

Больше легкий пул потоков для «самовоспроизводящих» задач. Пул ждет вызовов `fork()` и `join()` способов у дочерних задач в родительской.

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))

print("${invoke?.object}")
Accumulators _jdk 1.8_

Аккумуляторы разрешают исполнять простые операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без применения CAS.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB