Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Асинхронность: назад в грядущее

Anna | 24.06.2014 | нет комментариев

Асинхронность… Услышав это слово, у программистов начинают сверкать глаза, дыхание становится поверхностным, руки начинают трястись, голос — заикаться, мозг начинает рисовать бесчисленные ярусы абстракции… У администраторов округляются глаза, звуки становятся нечленораздельными, руки сжимаются в кулаки, а голос переходит на обертона… Исключительное, что их объединяет — это учащенный пульс. Только поводы этого разны: программисты рвутся в бой, а администраторы пытаются заглянуть в хрустальный шар и понять риски, начинают судорожно придумывать поводы увеличения сроков в разы… И теснее потом, когда огромная часть кода написана, программисты начинают осмысливать и познавать всю печаль асинхронности, проводя безмерные ночи в дебаггере, отчаянно пытаясь осознать, что же все-таки происходит…

Именно такую картину рисует мое воспаленное воображение при слове “асинхронность”. Безусловно, все это слишком чувствительно и не неизменно правда. чай так?.. Допустимы варианты. Некоторые скажут, что “при верном подходе все будет трудиться хорошо”. Впрочем это дозволено сказать неизменно и всюду при каждом комфортном и не комфортном случае. Но отменнее от этого не становится, баги не исправляются, а бессонница не проходит.

Так что же такое асинхронность? Отчего она так симпатична? А основное: что с ней не так?

Вступление

Асинхронность на нынешний момент является довольно знаменитой темой. Довольно просмотреть последние статьи на прогре, Дабы в этом удостовериться. Здесь тебе и обзор разных библиотек, и применения языка Go, и любые асинхронные фреймворки на JS, и много чего иного.

Традиционно асинхронность применяется для сетевого программирования: любые сокеты-шмокеты, читатели-писатели и прочие акцепторы. Но бывают еще комичные и увлекательные события, исключительно в UI. Тут я буду рассматривать экстраординарно сетевое применение. Впрочем, как будет показано в дальнейшей статье, подход дозволено расширять и углублять в неизвестные дали.

Дабы быть вовсе уж определенным, будем писать примитивный HTTP сервер, тот, что на некоторый всякий запрос посылает некоторый типовой результат. Это чтоб не писать парсер, т.к. к теме асинхронности он имеет ровно такое же отношение, как расположение звезд к нраву человека (см. астрологию).

Синхронный однопоточный сервер

Хм. Синхронный? А при чем здесь синхронный, спросит внимательный читатель, открыв статью про асинхронность. Ну, во-первых, нужно же с чего-то начать. С чего-то простого. А во-вторых… Короче, я автор, следственно будет так. А потом и сами узнаете, для чего.

Для того, Дабы не писать низкоуровневый платформозависимый код, для всех наших целей я буду применять сильную асинхронную библиотеку под наименованием boost.asio. Благо, статей теснее написано про нее довольно, Дабы быть правда бы немного в теме.

Вновь же, для большей наглядности и “продакшенности” кода я сделаю обертки для вызова соответствующих функций из библиотеки boost.asio. Безусловно, кому-то могут нравиться портянки подобноboost::asio::ip::tcp::socket либо boost::asio::ip::udp::resolver::iterator, но ясность и читабельность кода при этом гораздо уменьшается.

Выходит, изложение сокета и акцептора:

typedef std::string Buffer;

// forward declaration
struct Acceptor;
struct Socket
{
    friend struct Acceptor;

    Socket();
    Socket(Socket&& s);

    // чтение данных фиксированного размера
    void read(Buffer&);

    // чтение данных не огромнее, чем указанный размер буфера
    void readSome(Buffer&);

    // чтение данных вплотную до строки until
    int readUntil(Buffer&, const Buffer& until);

    // запись данных фиксированного размера
    void write(const Buffer&);

    // закрытие сокета
    void close();

private:
    boost::asio::ip::tcp::socket socket;
};

struct Acceptor
{
    // слушать порт для принятия соединений
    explicit Acceptor(int port);

    // создание сокета на новое соединение
    void accept(Socket& socket);

private:
    boost::asio::ip::tcp::acceptor acceptor;
};

Ничего лишнего, легко сервер. Socket разрешает писать и читать, в том числе до определенных символов (readUntil). Acceptor слушает указанный порт и принимает соединения.

Реализация каждого этого хозяйства приведена ниже:

boost::asio::io_service& service()
{
    return single<boost::asio::io_service>();
}

Socket::Socket() :
    socket(service())
{
}

Socket::Socket(Socket&& s) :
    socket(std::move(s.socket))
{
}

void Socket::read(Buffer& buffer)
{
    boost::asio::read(socket, boost::asio::buffer(&buffer[0], buffer.size()));
}

void Socket::readSome(Buffer& buffer)
{
    buffer.resize(socket.read_some(boost::asio::buffer(&buffer[0], buffer.size())));
}

bool hasEnd(size_t posEnd, const Buffer& b, const Buffer& end)
{
    return posEnd >= end.size() &&
        b.rfind(end, posEnd - end.size()) != std::string::npos;
}

int Socket::readUntil(Buffer& buffer, const Buffer& until)
{
    size_t offset = 0;
    while (true)
    {
        size_t bytes = socket.read_some(boost::asio::buffer(&buffer[offset], buffer.size() - offset));
        offset  = bytes;
        if (hasEnd(offset, buffer, until))
        {
            buffer.resize(offset);
            return offset;
        }
        if (offset == buffer.size())
        {
            LOG("not enough size: " << buffer.size());
            buffer.resize(buffer.size() * 2);
        }
    }
}

void Socket::write(const Buffer& buffer)
{
    boost::asio::write(socket, boost::asio::buffer(&buffer[0], buffer.size()));
}

void Socket::close()
{
    socket.close();
}

Acceptor::Acceptor(int port) :
    acceptor(service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
}

void Acceptor::accept(Socket& socket)
{
    acceptor.accept(socket.socket);
}

Тут я применял синглтон для io_service, Дабы не передавать его всякий раз в сокет очевидно во входных параметрах. И откуда пользователю знать, что там должен быть какой-то io_service? Следственно я его спрятал подальше, Дабы глаза не мозолил. Остальное, я предполагаю, абсолютно ясно, за исключением, быть может, функции readUntil. Но суть ее примитивна: читать байтики до тех пор, пока не встретится сокровенное окончание. Это необходимо как раз для HTTP, т.к. предварительно размер мы, увы, не можем задать. Доводится ресайзиться.

Давайте сейчас напишем долгожданный сервер. Вот он:

#define HTTP_DELIM          "rn"
#define HTTP_DELIM_BODY     HTTP_DELIM HTTP_DELIM

// наш результат
Buffer httpContent(const Buffer& body)
{
    std::ostringstream o;
    o << "HTTP/1.1 200 Ok" HTTP_DELIM
        "Content-Type: text/html" HTTP_DELIM
        "Content-Length: " << body.size() << HTTP_DELIM_BODY
        << body;
    return o.str();
}

// слушаем 8800 порт (внезапно 80 занят?)
Acceptor acceptor(8800);
LOG("accepting");
while (true)
{
    Socket socket;
    acceptor.accept(socket);
    try
    {
        LOG("accepted");
        Buffer buffer(4000, 0);
        socket.readUntil(buffer, HTTP_DELIM_BODY);
        socket.write(httpContent("<h1>Hello sync singlethread!</h1>"));
        socket.close();
    }
    catch (std::exception& e)
    {
        LOG("error: " << e.what());
    }
}

Сервер готов!

Синхронный многопоточный сервер

Недочеты предыдущего сервера очевидны:

  1. Немыслимо обрабатывать несколько соединений единовременно.
  2. Заказчик может переиспользовать соединение для больше результативного взаимодействия, а мы его неизменно закрываем.

Следственно возникает идея, Дабы обрабатывать соединения в ином потоке, продолжая принимать следующие соединения. Для этого нам потребуется функция создания нового потока, которую я неожиданно назову go:

typedef std::function<void ()> Handler;

void go(Handler handler)
{
    LOG("sync::go");
    std::thread([handler] {
        try
        {
            LOG("new thread had been created");
            handler();
            LOG("thread was ended successfully");
        }
        catch (std::exception& e)
        {
            LOG("thread was ended with error: " << e.what());
        }
    }).detach();
}

Стоит подметить одну комичную вещь: если убрать detach(), то угадайте, что сделает программа?

Результат:

Тупо завершится без каких-либо сообщений. Спасибо разработчикам эталона, так удерживать!

Сейчас дозволено и сервер написать:

Acceptor acceptor(8800);
LOG("accepting");
while (true)
{
    Socket* toAccept = new Socket;
    acceptor.accept(*toAccept);
    LOG("accepted");
    go([toAccept] {
        try
        {
            Socket socket = std::move(*toAccept);
            delete toAccept;
            Buffer buffer;
            while (true)
            {
                buffer.resize(4000);
                socket.readUntil(buffer, HTTP_DELIM_BODY);
                socket.write(httpContent("<h1>Hello sync multithread!</h1>"));
            }
        }
        catch (std::exception& e)
        {
            LOG("error: " << e.what());
        }
    });
}

Казалось бы, все отлично, но не здесь то было: на реальных задачах под нагрузкой это дело ложится стремительно и потом не отжимается. Следственно мудрые дядьки подумали, подумали, и решили применять асинхронность.

Асинхронный сервер

В чем задача предыдущего подхода? А в том, что потоки взамен реальной работы огромную часть времени ждут на событиях из сети, отжирая источники. Хочется как-то больше результативно применять потоки для выполнения пригодной работы.

Следственно сейчас буду реализовывать схожие функции, но теснее асинхронно, применяя модель проактора. Что это обозначает? Это обозначает, что мы для всех операций вызываем функцию и передаем callback, тот, что автомагически позовется по окончании операции. Т.е. нас позовут сами, как только операция завершится. Это отличается от модели реактора, когда мы обязаны сами вызывать надобные обработчики, отслеживая за состоянием операций. Классический пример реактора: epoll, kqueue и разные select’ы. Пример проактора: IOCP на Windows. Я буду применять кроссплатформенный проактор boost.asio.

Асинхронные интерфейсы:

typedef boost::system::error_code Error;
typedef std::function<void(const Error&)> IoHandler;

struct Acceptor;
struct Socket
{
    friend struct Acceptor;

    Socket();
    Socket(Socket&&);

    void read(Buffer&, IoHandler);
    void readSome(Buffer&, IoHandler);
    void readUntil(Buffer&, Buffer until, IoHandler);
    void write(const Buffer&, IoHandler);
    void close();

private:
    boost::asio::ip::tcp::socket socket;
};

struct Acceptor
{
    explicit Acceptor(int port);

    void accept(Socket&, IoHandler);

private:
    boost::asio::ip::tcp::acceptor acceptor;
};

Стоит остановиться на некоторых вещах:

  1. Обработка ошибок сейчас значительно отличается. В случае синхронного подхода у нас 2 варианта: возврат кода ошибки либо генерация исключения (именно данный метод и применялся в начале статьи). В случае асинхронного вызова метод существует ровно один: передача ошибки через обработчик. Т.е. даже не через итог, а как входной параметр обработчика. И хочешь, не хочешь — будь вежлив обрабатывай ошибки как в ветхие добродушные времена, когда исключений еще не было: на всякий чих по проверке. Но самое увлекательное, безусловно, не это; увлекательное — это когда появилась оплошность в обработчике и ее нужно обработать. Вспоминание контекста — излюбленная задача асинхронного программирования!
  2. Для единообразного подхода я применял IoHandler, что делает код больше простым и универсальным.

Если наблюдательно присмотреться, то различие от синхронных функций только в том, что асинхронные содержат добавочный обработчик в качестве входного параметра.

Ну что ж, как бы пока ничего ужасного нет.

Реализация:

Socket::Socket() :
    socket(service())
{
}

Socket::Socket(Socket&& s) :
    socket(std::move(s.socket))
{
}

void Socket::read(Buffer& buffer, IoHandler handler)
{
    boost::asio::async_read(socket, boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t) {
            handler(error);
    }); 
}

void Socket::readSome(Buffer& buffer, IoHandler handler)
{
    socket.async_read_some(boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t bytes) {
            buffer.resize(bytes);
            handler(error);
    });
}

bool hasEnd(size_t posEnd, const Buffer& b, const Buffer& end)
{
    return posEnd >= end.size() &&
b.rfind(end, posEnd - end.size()) != std::string::npos;
}

void Socket::readUntil(Buffer& buffer, Buffer until, IoHandler handler)
{
    VERIFY(buffer.size() >= until.size(), "Buffer size is smaller than expected");
    struct UntilHandler
    {
        UntilHandler(Socket& socket_, Buffer& buffer_, Buffer until_, IoHandler handler_) :
            offset(0),
            socket(socket_),
            buffer(buffer_),
            until(std::move(until_)),
            handler(std::move(handler_))
        {
        }

        void read()
        {
            LOG("read at offset: " << offset);
            socket.socket.async_read_some(boost::asio::buffer(&buffer[offset], buffer.size() - offset), *this);
        }

        void complete(const Error& error)
        {
            handler(error);
        }

        void operator()(const Error& error, std::size_t bytes)
        {
            if (!!error)
            {
                return complete(error);
            }
            offset  = bytes;
            VERIFY(offset <= buffer.size(), "Offset outside buffer size");
            LOG("buffer: '" << buffer.substr(0, offset) << "'");
            if (hasEnd(offset, buffer, until))
            {
                // found end
                buffer.resize(offset);
                return complete(error);
            }
            if (offset == buffer.size())
            {
                LOG("not enough size: " << buffer.size());
                buffer.resize(buffer.size() * 2);
            }
            read();
        }

    private:
        size_t offset;
        Socket& socket;
        Buffer& buffer;
        Buffer until;
        IoHandler handler;
    };
    UntilHandler(*this, buffer, std::move(until), std::move(handler)).read();
}

void Socket::write(const Buffer& buffer, IoHandler handler)
{
    boost::asio::async_write(socket, boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t) {
            handler(error);
    }); 
}

void Socket::close()
{
    socket.close();
}

Acceptor::Acceptor(int port) :
    acceptor(service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
}

void Acceptor::accept(Socket& socket, IoHandler handler)
{
    acceptor.async_accept(socket.socket, handler);
}

Здесь должно быть все ясно, за исключением способа readUntil. Для того, Дабы несколько раз вызывать асинхронное чтение на сокете, нужно сберегать состояние. Для этого предуготовлен особый классUntilHandler, тот, что сберегает нынешнее состояние асинхронной операции. Схожую реализацию дозволено встретить в boost.asio для разных функций (скажем boost::asio::read), которые требуют нескольких вызовов больше примитивных (но не менее асинхронных) операций.

Помимо этого нужно написать аналог go и диспетчеризацию:

void go(Handler);
void dispatch(int threadCount = 0);

Тут указывается обработчик, тот, что будет запускаться асинхронно в пуле потоков и, собственно, создание пула поток с дальнейшей диспетчеризацией.

Вот как выглядит реализация:

void go(Handler handler)
{
    LOG("async::go");
    service().post(std::move(handler));
}

void run()
{
    service().run();
}

void dispatch(int threadCount)
{
    int threads = threadCount > 0 ? threadCount : int(std::thread::hardware_concurrency());
    RLOG("Threads: " << threads);
    for (int i = 1; i < threads;    i)
        sync::go(run);
    run();
}

Тут мы используем sync::go для создания потоков из синхронного подхода.

Реализация сервера:

Acceptor acceptor(8800);
LOG("accepting");
Handler accepting = [&acceptor, &accepting] {
    struct Connection
    {
        Buffer buffer;
        Socket socket;

        void handling()
        {
            buffer.resize(4000);
            socket.readUntil(buffer, HTTP_DELIM_BODY, [this](const Error& error) {
                if (!!error)
                {
                    LOG("error on reading: " << error.message());
                    delete this;
                    return;
                }
                LOG("read");
                buffer = httpContent("<h1>Hello async!</h1>");
                socket.write(buffer, [this](const Error& error) {
                    if (!!error)
                    {
                        LOG("error on writing: " << error.message());
                        delete this;
                        return;
                    }
                    LOG("written");
                    handling();
                });
            });
        }
    };

    Connection* conn = new Connection;
    acceptor.accept(conn->socket, [conn, &accepting](const Error& error) {
        if (!!error)
        {
            LOG("error on accepting: " << error.message());
            delete conn;
            return;
        }
        LOG("accepted");
        conn->handling();
        accepting();
    });
};

accepting();
dispatch();

Вот такая простыня. С всяким новым вызовом растет вложенность лямбд. Традиционно, безусловно, такое через лямбды не пишут, т.к. есть трудности с зацикливанием: в лямбду нужно пробрасывать саму себя, Дабы внутри самой себя позвать саму себя. Но тем не менее, читабельность кода будет приблизительно идентичная, т.е. идентично плохая при сопоставлении с синхронным кодом.

Выходит, давайте обсудим плюсы и минусы асинхронного подхода:

  1. Безоговорочный плюс (и это, собственно, то, ради чего все эти страдания) — это продуктивность. Причем она не легко в разы выше, она выше на порядки!
  2. Ну а сейчас минусы. Минус ровно один — трудный и замороченный код, тот, что к тому же еще и трудно отлаживать.

Отлично, безусловно, если все написал верно и оно сразу заработало и без багов. А вот если это не так… В всеобщем, радостного дебаггинга, как говорится в таких случаях. И это я еще разглядел довольно легкой пример, где дозволено отследить последовательность вызовов. При маленьком усложнении схемы обработки (скажем, одновременные чтение и запись в сокеты) трудность кода растет как на дрожжах, а число багов начинает расти чуть ли не экспоненциально.

Так стоит ли игра свеч? Стоит ли заниматься асинхронностью? На самом деле выход есть — coroutines либосопрограммы.

Сопрограммы

Выходит, чего же нам каждому хочется? Счастья, здоровья, денег мешок. А хочется простого: применять плюсы асинхронного и синхронного подходов единовременно, т.е. Дабы и эффективность была как у асинхронного, и простота как у синхронного.

На бумаге звучит восхитительно. Допустимо ли это? Для результата на вопрос нам потребуется малое вступление в сопрограммы.

Вот что такое обыкновенные процедуры? Находимся мы, значит, в каком-то месте исполнения и здесь раз, и позвали процедуру. Для вызова вначале запоминается нынешнее место для возврата, после этого зовется процедура, она исполняется, завершается и возвращает управление в то место, откуда была позвана. А сопрограмма — это то же самое, только другое: она тоже возвращает управление в то место, откуда была позвана, но при этом она не завершается, а останавливается в некотором месте, с которого дальше продолжает трудиться при повторном запуске. Т.е. получается этакий пинг-понг: дерзкий бросает мячик, сопрограмма ловит его, перебегает в другое место, бросает обратно, дерзкий тоже что-то делает (перебегает) и вновь бросает в предыдущее место теснее сопрограммы. И так происходит до тех пор, пока сопрограмма не завершится. В целом дозволено сказать, что процедура — это частный случай сопрограммы.

Как сейчас это дозволено применять для наших асинхронных задач? Ну здесь наводит на мысль то, что сопрограмма сберегает некоторый контекст исполнения, что для асинхронности весьма значимо. Именно это и буду я применять: если сопрограмме понадобится исполнить асинхронную операцию, то я легко вызову асинхронный способ и выйду из сопрограммы. А обработчик по заключению асинхронной операции легко продолжит исполнение нашей сопрограммы с места последнего вызова той самой асинхронной операции. Т.е. каждая чумазая работа по сохранению контекста ложится на плечи реализации сопрограмм.

И вот здесь как раз и начинаются задачи. Дело в том, что помощь сопрограмм на стороне языков и процессоров — дела давным-давно минувших дней. Для реализации переключения контекстов исполнения сегодня нужно проделать уйма операций: сберечь состояния регистров, переключить стек и заполнить некоторые служебные поля для правильной работы среды исполнения (скажем, для исключений, TLS и др.). Больше того, реализация зависит не только от архитектуры процессора, но еще и от компилятора и операционной системы. Звучит как конечный гвоздь в крышку гроба…

К счастью, есть boost.context, которая и реализует все, что нужно для поддержки определенной платформы. Написано все на ассемблере, в наилучших традициях. Дозволено, безусловно, применять boost.coroutine, но для чего, когда есть boost.context? Огромнее ада и угара!

Реализация сопрограмм

Выходит, для наших целей напишем свои сопрограммы. Интерфейс будет такой:

// выход из сопрограммы
void yield();

// проверка того, что мы находимся внутри сопрограммы
bool isInsideCoro();

// сопрограмма
struct Coro
{
    // на каждый случай, немного ли
    friend void yield();

    Coro();

    // создание и запуск обработчика
    Coro(Handler);

    // без комментариев
    ~Coro();

    // запуск обработчика
    void start(Handler);

    // продолжение сопрограммы (только если она завершилась yield)
    void resume();

    // проверка того, что сопрограмму дозволено продолжить
    bool isStarted() const;

private:
    ...
};

Вот такой нехитрый интерфейс. Ну и сразу вариант применения:

void coro()
{
    std::cout << '2';
    yield();
    std::cout << '4';
}
std::cout << '1';
Coro c(coro);
std::cout << '3';
c.resume();
std::cout << '5';

Должен выдать на экран:

12345

Начнем с способа start:

void Coro::start(Handler handler)
{
    VERIFY(!isStarted(), "Trying to start already started coro");
    context = boost::context::make_fcontext(&stack.back(), stack.size(), &starterWrapper0);
    jump0(reinterpret_cast<intptr_t>(&handler));
}

Тут boost::context::make_fcontext создает нам контекст и передает в качестве стартовой функции статический способ starterWrapper0:

TLS Coro* t_coro;
void Coro::starterWrapper0(intptr_t p)
{
    t_coro->starter0(p);
}

тот, что легко перенаправляет на способ starter0, извлекая нынешний экземпляр Coro из TLS. Каждая магия по переключению контекстов находится в приватном способе jump0:

void Coro::jump0(intptr_t p)
{
    Coro* old = this;
    std::swap(old, t_coro);
    running = true;
    boost::context::jump_fcontext(&savedContext, context, p);
    running = false;
    std::swap(old, t_coro);
    if (exc != std::exception_ptr())
        std::rethrow_exception(exc);
}

Тут мы заменяем ветхое TLS значение t_coro на новое (необходимо для рекурasync::Acceptor, описанные в пункте про асинхронность.

Применение

Перейдем к применению нашего функционала. Здесь все значительно проще и изысканнее:

Acceptor acceptor(8800);
LOG("accepting");
go([&acceptor] {
    while (true)
    {
        Socket* toAccept = new Socket;
        acceptor.accept(*toAccept);
        LOG("accepted");
        go([toAccept] {
            try
            {
                Socket socket = std::move(*toAccept);
                delete toAccept;
                Buffer buffer;
                while (true)
                {
                    buffer.resize(4000);
                    socket.readUntil(buffer, HTTP_DELIM_BODY);
                    socket.write(httpContent("<h1>Hello synca!</h1>"));
                }
            }
            catch (std::exception& e)
            {
                LOG("error: " << e.what());
            }
        });
    }
});
dispatch();

Приведенный код что-то напоминает… Верно! Это же фактически наш синхронный код:

sync synca
Acceptor acceptor(8800);
LOG("accepting");

while (true)
{
  Socket* toAccept = new Socket;
  acceptor.accept(*toAccept);
  LOG("accepted");
  go([toAccept] {
    try
    {
      Socket socket = std::move(*toAccept);
      delete toAccept;
      Buffer buffer;
      while (true)
      {
        buffer.resize(4000);
        socket.readUntil(buffer, HTTP_DELIM_BODY);
        socket.write(httpContent(
          "<h1>Hello sync multithread!</h1>"));
      }
    }
    catch (std::exception& e)
    {
      LOG("error: " << e.what());
    }
  });
}

Acceptor acceptor(8800);
LOG("accepting");
go([&acceptor] {
  while (true)
  {
    Socket* toAccept = new Socket;
    acceptor.accept(*toAccept);
    LOG("accepted");
    go([toAccept] {
      try
      {
        Socket socket = std::move(*toAccept);
        delete toAccept;
        Buffer buffer;
        while (true)
        {
          buffer.resize(4000);
          socket.readUntil(buffer, HTTP_DELIM_BODY);
          socket.write(httpContent(
            "<h1>Hello synca!</h1>"));
        }
      }
      catch (std::exception& e)
      {
          LOG("error: " << e.what());
      }
    });
  }
});
dispatch();

Здесь ровно одно различие: в синхронной реализации принятие сокета происходит в основном потоке, а потому отсутствует dispatch. Впрочем, если задаться целью, дозволено было бы эти подходы сделать всецело одинаковыми: для этого в синхронной реализации также сделать принятие сокетов в отдельном потоке, применяя go, а функция dispatch тогда легко бы ожидала заключения всех потоков.

Но различие в реализации носит твердый нрав: получившийся код использует асинхронное сетевое взаимодействие, а значит является значительно больше результативной реализацией. Собственно на этом наша цель достигнута: сделать симбиоз синхронного и асинхронного подходов, взяв из них самое лучшее, т.е. простоту синхронного и продуктивность асинхронного.

Совершенствование

Опишу некоторое совершенствование для процесса принятия сокетов. Зачастую, позже принятия происходит разветвление исполнения: тот, кто принимал, будет продолжает принимать, а новейший сокет будет обрабатываться в отдельном контексте исполнения. Следственно сделаем новейший способ goAccept:

async::IoHandler onCompleteGoHandler(coro::Coro* coro, Handler handler)
{
    return [coro, handler](const Error& error) {
        if (!error)
            go(std::move(handler));
        onComplete(coro, error);
    };
}

struct Acceptor
{
    typedef std::function<void(Socket&)> Handler;
    // ...
};

void Acceptor::goAccept(Handler handler)
{
    VERIFY(coro::isInsideCoro(), "goAccept must be called inside coro");
    defer([this, handler](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "goAccept completion must be called outside coro");
        Socket* socket = new Socket;
        acceptor.accept(socket->socket, onCompleteGoHandler(coro, [socket, handler] {
            Socket s = std::move(*socket);
            delete socket;
            handler(s);
        }));
        LOG("accept scheduled");
    });
}

И тогда наш сервер перепишется в виде:

Acceptor acceptor(8800);
LOG("accepting");
go([&acceptor] {
    while (true)
    {
        acceptor.goAccept([](Socket& socket) {
            try
            {
                Buffer buffer;
                while (true)
                {
                    buffer.resize(4000);
                    socket.readUntil(buffer, HTTP_DELIM_BODY);
                    socket.write(httpContent("<h1>Hello synca!</h1>"));
                }
            }
            catch (std::exception& e)
            {
                LOG("error: " << e.what());
            }
        });
    }
});
dispatch();

Что значительно проще для понимания и применения.

Вопрос 1. А что с эффективностью?

Подлинно, различие от чисто асинхронного подхода в том, что здесь появляются добавочные убыточные расходы на создание/переключение контекстов и смежной атрибутики.

Вначале я было хотел проверить предельные нагрузки, но потом оказалось, что даже в одном (!!!) потоке загружается скорее гигабитная сеть, нежели процессор. Следственно я провел дальнейший тест:

  1. Сервер работает под непрерывной нагрузкой 30K RPS (т.е. 30 тыщ запросов в секунду).
  2. Глядим на загрузку процессора в случае async и synca.

Итоги приведены в таблице:

Способ число запросов в секунду число потоков Загруженность процессорного ядра
async 30000 1 75±5%
synca 30000 1 80±5%

Подмечу, что погрешность полученных значений связана с колебаниями значений в ходе одного испытания. Скорее каждого это связанно с неравномерностью нагрузки канала и обработки.

Тем не менее видно, что не смотря на присутствие добавочного переключения контекстов, а также пробрасыванием исключений взамен кодов возврата (исключение генерится всякий раз при закрытии сокета, т.е. всякий раз на новом запросе) убыточные расходы пренебрежимо малы. А если еще добавить код, тот, что бы Добросовестно парсил HTTP сообщение, а также код, тот, что бы не менее Добросовестно обрабатывал запросы и делал что-нибудь главное и надобное, то дозволено заявить отважно, что различие в продуктивности не будет вообще.

Вопрос 2. Ну возможен. А дозволено ли таким методом решать больше трудные асинхронные задачи?

Теорема. Всякую асинхронную задачу дозволено решить с поддержкой сопрограмм.

Подтверждение.
Сначала возьмем функцию, которая использует асинхронные вызовы. Всякую функцию дозволено превратить в сопрограмму, т.к. функция является частным случаем сопрограммы. Дальше возьмем какой-либо асинхронный вызов в такой преобразованной сопрограмме. Такой вызов дозволено представить в дальнейшем виде:

// код до вызова
async(..., handler);
// код позже вызова

Разглядим случай, когда у нас отсутствует код позже вызова:

// код до вызова
async(..., handler);

Такой код с точки зрения сопрограммы равнозначен дальнейшему:

// код до вызова
synca(...);
handler();

Т.е. внутри synca мы вызываем соответствующую асинхронную функцию async, тот, что нам возвращает управление в сопрограмму по заключению операции, и после этого вызывается обработчик handler()очевидно. Итог ровно один и тот же.

Сейчас осталось разглядеть больше всеобщий случай, когда у нас присутствует код позже асинхронного вызова. Такой код равнозначен:

// код до вызова
go {
    async(..., handler);
}
// код позже вызова

Применяя то, что у нас сейчас отсутствует код позже вызова async внутри go, получаем:

// код до вызова
go {
    synca(...);
    handler();
}
// код позже вызова

Т.е. на один асинхронный вызов стало поменьше. Применяя такой подход к всякому асинхронному вызову функции и к всякой функции мы перепишем каждый код на сопрограммах. Ч.т.д.

Итоги

Асинхронность быстрым домкратом врывается в нашу программистскую жизнь. Трудности, которые появляются при написании кода, способны привести в дрожь даже самых ярых и закаленных специалистов. Впрочем, не стоит списывать со счетов ветхий добросердечный синхронный подход: в умелых руках асинхронность превращается в изящные сопрограммы.

В дальнейшей статье будет рассмотрен значительно больше трудный пример, тот, что раскроет всю мощь и потенциал сопрограмм!

До новых встреч в эфире!

P.S. Каждый код дозволено обнаружить тут: bitbucket:gridem/synca

 

Источник: programmingmaster.ru

 

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB