Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Не опасайтесь велосипедов. Либо еще один GCD на C 11

Anna | 25.06.2014 | нет комментариев
С моей точки зрения самое пригодное, что может сделать программист для возрастания своего профессионального яруса — это написание велосипедов. Велосипедостроение — дюже интересный процесс. Изредка он увлекает огромнее, чем задача, ради которой сам велосипед и затевался. При написании велосипеда (под велосипедом я понимаю реализацию теснее присутствующего) происходит больше большое осознавание теснее существующих решений и техник.

Мотивация

Вот теснее огромнее 3 лет моим основным рабочим языком является objective-c, и когда я только начал писать на нем меня славно поразил продуманный высокоуровневый API для работы с многопоточностью NSOperationQueue, а позднее — GCD, тот, что по моему суждению является сутью лаконичности и понятности для Thread concurrency. И вот недавние статьи на Прогре: Техника написания аналога await/async из C# для Cи Thread concurrency C 11. Они и принудили посмотреть на те новые плюшки, которые предоставляет C для работы с многопоточностью. И множество из них (тот-же std::future) для меня выглядят приблизительно так:

Домыслы и хотелки

Вот нормальный сценарий, в котором я использую многопоточность в своих приложениях:

  • асинхронно получить какие-то данные (файлы/сеть);
  • распарсить/подотовить полученные данные;
  • воротить данные в вызываемый поток (скажем, в основной поток и обновить UI).

Комфортно, Дабы для всякой из этих операций была своя очередь.
А еще больше комфортно, когда все это собрано в одном месте, а не разбросано по пяти начальным файлам. Что-то на подобии:

file_io_queue.async([=]{
    file_data = get_data_from_file( file_name );
    parser_queue.async([=]{
        parsed_data = parse( file_data );
        main_queue.async([=]{
            update_ui_with_new_data( parsed_data ) ;
        });
    });
});

Данный код читается как безусловно линейный, синхронный код. Он описывает логику того, как будут протекать метаморфозы в данных. Для меня, по огромному счету, без разницы, в каком потоке будет выполняться вычитка файла, в каком — его парсинг. Основное — последовательность этих операций. Я могу вызвать предшествующий код 100500 раз для 100500 файлов.

?вственное решение — реализация образца пулл потоков. Но примерно все реализации, виденные мною на просторах интернетиков, предлагают применять один std::thread для одной очереди. С моей точки зрения это не есть отлично. Скажем, необходимо беречь инстанс самой очереди все время, пока выполняются асинхронные операции. Создание исттанса std::thread на порядки больше затратная операция, чем захват/освобождение мютекса. Когда нам стоит уничтожать очередь? Да и простаивающее огромное число потоков в то время, когда очередь не применяется — не айс.
Мы сделаем по иному. У нас будет N-ное число потоков (std::thread) и список легковесных очередей с приоритетами. Когда мы добавляем задачу в очередь, то информируем поток о том, что возникла новая задача. Поток берет самую высокоприоритетную задачу и исполняет ее. Если задача с таким приоритетом теснее выполняется, то берет больше низкоприоритетную задачу. Если таких нет — ожидает.

Код

Приступим:

Очередь

namespace dispatch{
    typedef std::function<void ()> function;
    struct queue {
        typedef long priority; // Наш приоритет. Пускай это будет целое число
        const queue::priority queue_priority; 

        static std::shared_ptr<queue> main_queue() ; // Об этом ниже

        virtual void async(function) const; // Собственно способ для добавления задачи в очередь

        queue(queue::priority priority) : queue_priority(priority) {};
    };
}

Реализация способа async

легко перенаправляет вызов в thread pool:

    void queue::async(dispatch::function task) const {
        thread_pool::shared_pool()->push_task_with_priority(task, this->queue_priority);
    };

Каждая работа будет протекать в нашем

Пуле потоков:

    struct queue_impl{
        const queue::priority priority;
        std::queue<function> tasks;
        bool is_running;
        queue_impl(queue::priority priority): priority(priority){};
    };

    struct thread_pool{
        thread_pool();
        static std::shared_ptr<thread_pool>& shared_pool(); // thread_pool
        virtual ~thread_pool();

        bool stop;

        typedef std::shared_ptr<queue_impl> queue_ptr; 

        void push_task_with_priority(const function&, queue::priority);// Добавляем задачу в очередь с приоритетом
        bool get_free_queue(queue_ptr*) const;          // Ищем очередь, задачи из которой не исполняются прямо теперь
        void start_task_in_queue(const queue_ptr&);  // Подмечаем очередь как исполняющуюся
        void stop_task_in_queue(const queue_ptr&);  // Снимаем отметку

        std::mutex mutex; // Мютекс для синхронизации пула
        std::map<queue::priority, queue_ptr> queues; // Тут хранятся очереди по приоритетам

        std::mutex main_thread_mutex;
        std::queue<dispatch::function> main_queue;

        std::condition_variable condition;
        std::vector<std::thread> threads; // Массив реальных потоков, которые и будут исполнять задачи

        dispatch::function main_loop_need_update;
        void add_worker(); // Добавляем еще один поток
    };

Разглядим способы по порядку. Нам нужно обнаружить свободную очередь с максимальным приоритетом:

обнаружить свободную очередь с максимальным приоритетом:

   bool thread_pool::get_free_queue(queue_ptr* out_queue) const {
        //  проходим по каждому очередям с приоритетом от максимального до малейшего
       auto finded = std::find_if(queues.rbegin(), queues.rend(), [](const std::pair<queue::priority, queue_ptr>& iter){
                                         return ! iter.second->is_running; // и находим первую свободную очередь
                                     });

        bool is_free_queue_exist = (finded != queues.rend());
        if (is_free_queue_exist)
            *out_queue = finded->second;

        return  is_free_queue_exist;
    }

Добавляем задачу в очередь

    void thread_pool::push_task_with_priority(const function& task, queue::priority priority){
        {
            std::unique_lock<std::mutex> lock(mutex); // Захватваем мютекс

            // Добавляем задачу в очередь. Если очереди нет - создаем ее
            auto queue = queues[priority];
            if (!queue){
                queue = std::make_shared<dispatch::queue_impl>(priority);
                queues[priority] = queue;
            }
            queue->tasks.push(task);

            // Если нужно, то добавляем потоки
            unsigned max_number_of_threads = std::max<unsigned>(std::thread::hardware_concurrency(), 2);
            unsigned number_of_threads_required = round(log(queues.size())   1);
            while (threads.size() < std::min<unsigned>(max_number_of_threads, number_of_threads_required)){
                add_worker();
            }
        }
        condition.notify_one(); // Информируем поток, что мы добавили задачу в очередь
    }

Подмечаем задачу как исполненную

    void thread_pool::stop_task_in_queue(const queue_ptr& queue){
        {
            std::unique_lock<std::mutex> lock(mutex);
           // Подмечаем задачу как исполненную. Если очередь пуста - убираем ее из списка очередей
            queue->is_running = false;
            if ( queue->tasks.size() ==0 ){
                queues.erase(queues.find(queue->queue_priority));
            }
        }
        condition.notify_one(); // Информируем поток, что одна из задач выполненна
    }

И, собственно, сам поток:

    void thread_pool::add_worker(){
        threads.push_back(std::thread([=]{
                              dispatch::function task;
                              thread_pool::queue_ptr queue;
                              while(true){
                                  {
                                      std::unique_lock<std::mutex> lock(mutex); // Пытаемся захватить мютекс

                                      while(!stop && !get_free_queue(&queue)) // Если нет свободных очередей
                                          condition.wait(lock);                   // То ожидаем оповещения

                                      if(stop) // Если пулл потоков был остановлен, то завершаемся
                                          return;

task = queue->tasks.front(); // Забираем задачу из очереди
                                      queue->tasks.pop(); 

                                      start_task_in_queue(queue); // Подмечаем очередь как занятую
                                  }
                                  task(); // Исполняем задачу
                                  stop_task_in_queue(queue); // Подмечаем очередь как свободную
                              }
                          }));
    }

Main Thread и Run Loop

В С нет такого представления как основной поток. Но на этой доктрины возведены фактически все UI приложения. UI мы можем изменять только из основного потока. Значит, нам необходимо либо самим организовать Run Loop, либо вклиниться в теснее присутствующий.

Для начала сделаем отдельную очередь для «основного потока»:

Main Queue

    struct main_queue : queue{
        virtual void async(dispatch::function task) const override;
        main_queue(): queue(0) {};
    };

    std::shared_ptr<queue> queue::main_queue(){
        return std::static_pointer_cast<dispatch::queue>(std::make_shared<dispatch::main_queue>());
    }

А в способе async будем добавлять задачи в

отдельную очередь

    void main_queue::async(dispatch::function task) const {
        auto pool = thread_pool::shared_pool();
        std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
        pool->main_queue.push(task);
        if (pool->main_loop_need_update != nullptr)
            pool->main_loop_need_update();
    }

Ну и нам нужна функция, которая будет вызываться из основного потока:

Код

    void process_main_loop() {
        auto pool = thread_pool::shared_pool();
        std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
        while (!pool->main_queue.empty()) {
            auto task = pool->main_queue.front();
            pool->main_queue.pop();
            task();
        }
    }

Сейчас только два вопроса: «Как?» и «Для чего?»

Вначале «Для чего?»: C достаточно Зачастую применяется для написания кросс-платформенного ПО. В угоду переносимости от многих комфортных пророческой нужно отказаться. GCD дюже комфортная библиотека, предоставляющая легкой, наглядный и комфортный метод управления асинхронными очередями.
На вопрос «Как?» нет однозначного результата. Вклиниться ранлуп дозволено по различному. Многие системы предастовляют API для этого. Скажем, в iOS есть «performSelectorOnMainThread:». Нам необходимо лишь задать коллбэк через dispatch::set_main_loop_process_callback:

-(void)dispatchMainThread{
    dispatch::process_main_loop();
}
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions{
    dispatch::set_main_loop_process_callback([=]{
        [self performSelectorOnMainThread:@selector(dispatchMainThread) withObject:nil waitUntilDone:NO];
    });
    return YES;
}

Если-же мы сами организуем наш личный ранлуп, то дозволено сделать что-то такое:

    void main_loop(dispatch::function main_loop_function);

    void main_loop(dispatch::function main_loop_function){
        auto main_queue = queue::main_queue();
        while (!thread_pool::shared_pool()->stop) {
            main_queue->async(main_loop_function);
            process_main_loop();
        }
    }

А сейчас собственно то, ради чего это и затевалось:

Сотворим 6 очередей и запихнем в всякую по 6 заданий:

   auto main_thread_id = std::this_thread::get_id();
    for (unsigned task = 0; task < 6;   task)
    for (unsigned priority = 0; priority < 6;   priority){
        dispatch::queue(priority).async([=]{
            assert(std::this_thread::get_id() != main_thread_id);
            std::string task_string = std::to_string(task);
            std::string palceholder(1 priority*5, ' ');
            dispatch::queue::main_queue()->async([=]{
                assert(std::this_thread::get_id() == main_thread_id);
                std::cout << palceholder << task_string << std::endl;
            });
        });
    }

Получим приблизительно такую картинку


                          0
                          1
                     0
                0
                          2
                     1
                1
                          3
                     2
                2
                          4
                     3
                3
                          5
                     4
                4
           0
                     5
                5
           1
      0
 0
           2
      1
 1
           3
      2
 2
           4
      3
 3
           5
      4
 4
      5
 5

«Столбик» представляет собой очередь. Чем правее, тем больше высокий приоритет у очереди. Линия — это коллбэки на «основной поток».

Ну и код для iOS:

    for (int i = 0; i < 20;   i){
        dispatch::queue(dispatch::QUEUE_PRIORITY::DEFAULT).async([=]{
            NSAssert(![NSThread isMainThread], nil);
            std::string first_string = std::to_string(i);
            dispatch::queue::main_queue()->async([=]{
                NSAssert([NSThread isMainThread], nil);
                std::string second_string = std::to_string(i 1);
                std::cout << first_string << " -> " << second_string << std::endl;
                [self.tableView reloadData]; // Делаем что-то с UI. То, что дозволено делать только из основного потока
            });
        });
    }

Завершение

Завершения не будет. Данный велосипед писался экстраординарно с целью пощупать многопоточность в C 11. Код представляет собой чуть больше 200 строк не особенно отличного С кода, представлен на гитхабе. Проверялся на clang 5.0, g -4.8 и компилятором 2012 Visual Studio. То есть основные компиляторы теснее в довольной мере поддерживают C 11.

Пы.Сы. Призывая писать свои велосипеды я отнюдь не призываю их применять на боевых планах. Правда, с иной стороны, как еще велосипед может превратиться во что-то солидное?

Ну и еще пара велосипедов, которые я не придумал куда впихнуть в статье



 

Источник: programmingmaster.ru

 

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB