Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Асинхронные задачи в С 11

Anna | 24.06.2014 | нет комментариев
Доброго времени суток, хотел бы поделиться с сообществом своей маленький библиотектой.
Я программирую на С/C , и, к сожалению, в рабочих планах не могу применять эталон C 11. Но вот пришли майские праздники, возникло свободное время и я решил поэкспериментировать и по-постигать данный запретный плод. Самое лучшее для постижения чего либо — это практика. Чтение статей о языке программирования обучит максимум отличнее читать, следственно я решил написать маленькую библиотеку для асинхронного выполнения функций.
Сразу оговорюсь, что я знаю, что существует std::future, std::async и тп. Мне было увлекательно реализовать самому что-то сходственное и окунуться в мир лямбда-функций, потоков и мьютексов с головой. Праздники — чудесное время для велопрогулок.

Выходит начнем

Я решил что моя библиотека будет работать дальнейшим образом.
Существует определенный пул, с фиксированным числом потоков.
В него добавляются задачи, применяя синтаксис лямбда функций.
Из самой задачи дозволено извлечь итог ее выполнения, либо легко дождаться окончания ее работы.
Забегая вперед, выглядит это приблизительно так:

...
act::control control(N_THREADS);
auto some_task = act::make_task([](std::vector<double>::const_iterator begin, std::vector<double>::const_iterator end)
{
  double sum = 0;
  for (auto i = begin; i != end;   i)
  {
    sum =(*i);
  }
  return sum;
}
, data.begin(), data.end());
control << some_task;  
cout << some_task->get() << endl;
...
Класс задачи

Для начала нужно сделать класс, описывающий задачу:

template <typename T>
  class task
      : public task<decltype(&T::operator())>
  {
  };

  template <typename ClassType, typename ReturnType, typename ... Args>
  class task<ReturnType(ClassType::*)(Args...) const>
  {
  protected:
    const ClassType &m_func;
    std::tuple<Args...> m_vars;
    ReturnType m_return;
  public:
    task(const ClassType &v, Args... args): m_func(v), m_vars(args ...) {}
    virtual ~task() {}   
  private:   
  };

Как вестимо, лямба функция раскрывается в класс-функтор с оператором operator().
Наш класс задачи шаблонный, его тип извлекается из типа оператора функтора &T::operator().
Класс хранит в себе указатель на функтор, доводы функции в виде std::tuple и возвращаемое значение.

Выходит сейчас мы можем беречь в объекте лямбда-функцию с параметрами, сейчас нужно обучиться ее вызывать.
Для этого нужно вызвать opertator() у m_func с параметрами, хранящимися в m_vars.
С начала я не знал как это сделать, но усиленное применения гугла и переход по 2-й ссылке принесло итог:

  template<int ...>
  struct seq { };

  template<int N, int ...S>
  struct gens : gens<N-1, N-1, S...> { };

  template<int ...S>
  struct gens<0, S...> {
    typedef seq<S...> type;
  };

С поддержкой этой конструкции дозволено добавить в класс следующие функции:

  ...
  public:
  void invoke()
  {
    ReturnType r = caller(typename gens<sizeof...(Args)>::type());     
  }    
  private:
  template<int ...S>
  ReturnType caller(seq<S...>) const
  {
    return m_func(std::get<S>(m_vars) ...);
  }    
  ...
Базовый класс задачи

Сейчас реализуем базовый класс задачи::

  class abstract_task
  {
  protected:
    mutable std::mutex m_mutex;
    mutable std::condition_variable m_cond_var;
    mutable bool m_complete;
  public:
    abstract_task(): m_complete(false) {}
    virtual ~abstract_task() {}
    virtual void invoke() = 0;
    virtual void wait() const
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      while (!m_complete)
      {
        m_cond_var.wait(lock);
      }
    }
  };

Класс содержит в себе мьютекс и переменную состояния, сигнализирующую о заключении задачи. Соотвественно наш класс задачи притерпит некоторые метаморфозы, которые я опущую, так как начальный код доступен на гитхабе.

Создание задач

Сделаем функцию-обертку для создания задач:

  template <typename T, typename ... Args>
  std::shared_ptr<task<decltype(&T::operator())>> make_task(T func, Args ... args )
  {
    return std::shared_ptr<task<decltype(&T::operator())>>(new task<decltype(&T::operator())>(func, args ...));
  }

Так как у нас класс воображаемый, разумно применять указатель и мы это сделаем, и не легко указатель, а разумный указатель.

Класс управления

Сейчас реализуем сущность для исполнения задач в фоновых потоках.
Приведу лишь часть кода:

...
  class control
  {
    std::deque<std::shared_ptr<abstract_task>> m_tasks;
    std::vector<std::thread> m_pool;
    std::mutex m_mutex;
    std::condition_variable m_cond_var;
    std::condition_variable m_empty_cond;
    std::atomic<bool> m_run;
    std::vector<bool> m_active;
  public:
    control(std::size_t pool_size = 2)
    {
      m_run.store(true, std::memory_order_relaxed);
      auto func = [this](int n)
      {
        while (m_run.load(std::memory_order_relaxed))
        {
          std::unique_lock<std::mutex> lock(m_mutex);
          m_active[n] = true;
          if (m_tasks.empty())
          {
            m_empty_cond.notify_all();
            m_active[n] = false;
            m_cond_var.wait(lock);
          }
          else
          {
            std::shared_ptr<abstract_task> t = m_tasks.front();
            m_tasks.pop_front();
            lock.unlock();
            t->invoke();
            lock.lock();
            m_active[n] = false;
          }
        }        
      };
      pool_size = pool_size > 0 ? pool_size : 1;
      m_active.resize(pool_size, false);
      for(std::size_t i = 0; i < pool_size;   i)
      {
        m_pool.emplace_back(func, i);
      }
    }
    ...

Для интереса, я применял все фичерсы нового эталона, использование которых я хоть как то мог обосновать.
Данный класс создает массив потоков и массив переменных состояния активности для мониторинга выполнения заданий дочерними потоками.
Основной цикл дочернего потока контролируется атомарной переменной (по идее довольно было объявить ее volatile, так как здесь нет состояния гонки, основной поток в нее только пишет, а дочерние только читают)

Продуктивность

Я бы не стал писать эту статью скорее каждого, если бы не проведенный мною тест продуктивности данного решения по сопоставлению с std::async.
Конфигурация:

Intel® Core(TM) i7-2600 CPU @ 3.40GHz
$gcc –version
gcc (Debian 4.8.2-21) 4.8.2

Тест заключается в параллельном сложении массивов, а после этого асинхронном сложении итогов всех сложений. Итогом операции будет:

res = sum(array)*N_P

Числа указаны в миллисекундах.

Тест 1

Оптимизация отключена, число элементов в массиве 100000000, число порождаемых задач 73, число потоков в пуле 6
Итоги:

test_act 16775 OK
test_async 16028 OK

Продуктивность сравнима.

Тест 2

Оптимизация включена, число элементов в массиве 100000000, число порождаемых задач 73, число потоков в пуле 6
Итоги:

test_act 1597.6 OK
test_async 2530.5 OK

Моя реализация стремительней в полтора раза.

Тест 3

Оптимизация включена, число элементов в массиве 100000000, число порождаемых задач 73, число потоков в пуле 7
Итоги:

test_act 1313.1 OK
test_async 2503.7 OK

Тест 4

Оптимизация включена, число элементов в массиве 100000000, число порождаемых задач 73, число потоков в пуле 8
Итоги:

test_act 1402 OK
test_async 2492.2 OK

Тест 5

Оптимизация включена, число элементов в массиве 100000000, число порождаемых задач 173, число потоков в пуле 8
Итоги:

test_act 4435.7 OK
test_async 5789.4 OK

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB