Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

«Boost.Asio C Network Programming». Глава 5: Синхронное вопреки асинхронного

Anna | 24.06.2014 | нет комментариев

Продолжаю перевод книги John Torjo «Boost.Asio C Network Programming».

Оглавление:

Авторы Boost.Asio сделали восхитительную работу, давая нам вероятность предпочесть то, что огромнее удовлетворяет нашим приложениям, предпочтя синхронный либо асинхронный путь.
В предыдущей главе мы видели каркасы для всех типов приложений, таких как синхронный заказчик, синхронный сервер, а так же их асинхронные варианты. Вы можете применять всякий из них в качестве основы для вашего приложения. Если же возникнет надобность вникать в подробности о всяком типе приложения, то читаем дальше.

Смешивание синхронного и асинхронного программирования

Библиотека Boost.Asio разрешает смешивать синхронное и асинхронное программирование. Лично я думаю, что это плохая идея, но Boost.Asio, как и С в целом, разрешает выстрелить себе в ногу, если вы того захотите.
Вы легко можете попасть в западню, исключительно если ваше приложение работает асинхронно. Скажем, в результат на асинхронную операцию записи вы, скажем, делаете асинхронную операцию чтения:

io_service service;
ip::tcp::socket sock(service);
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void on_write(boost::system::error_code err, size_t bytes) 
{
	char read_buff[512];
	read(sock, buffer(read_buff));
}
async_write(sock, buffer("echo"), on_write);

Наверно синхронная операция чтения будет блокировать нынешний поток, таким образом, всякие другие незавершенные асинхронные операции будут находиться в режиме ожидания (для этого потока). Это дрянной код и может привести к тому, что приложение начнет тормозить либо вообще заблокируется (каждый толк применения асинхронного подхода — это Дабы избежать блокировок, следственно применяя синхронные операции, вы опровергаете это). Если у вас есть синхронное приложение, то маловероятно, что вы будете применять асинхронные операции чтения либо записи, так как думать синхронно теснее значит думать линейным образом (сделать А, потом В, потом С и так дальше).
Исключительный случай, по моему суждению, когда синхронные и асинхронные операции могут трудиться совместно, это когда они всецело отделены друг от друга, скажем, синхронная сеть и асинхронные операции ввода и итога из базы данных.

Доставка сообщений от заказчика серверу и напротив

Дюже значимая часть отменного клиент/серверного приложения это доставка сообщений туда и обратно (от сервера к заказчику и от заказчика к серверу). Нужно указать, что идентифицирует сообщение. Другими словами, когда происходит чтение входящего сообщения, как мы можем узнать, что сообщение прочитано всецело?
Вам нужно определить конец сообщения (предисловие определить легко, это 1-й пришедший байт позже конца последнего сообщения), но вы удостоверитесь, что это не так легко.
Вы можете:

  • Сделать сообщение фиксированного размера (это не дюже отличная идея; что делать, когда вам потребуется отправить огромнее данных?)
  • Сделать определенный символ, завершающий сообщение, такой как ‘n’ либо ‘’
  • Указать длину сообщения в качестве префикса сообщения и так дальше

На протяжении каждой книги, я решил применять «символ ‘n’ в качестве конца всякого сообщения». Так, чтение сообщений будет показывать дальнейший фрагмент кода:

char buff_[512];
// synchronous read
read(sock_, buffer(buff_), boost::bind(&read_complete, this, _1, _2));
// asynchronous read
async_read(sock_   buffer(buff_),MEM_FN2(read_complete,_1,_2), 
	MEM_FN2(on_read,_1,_2));
size_t read_complete(const boost::system::error_code & err, size_t bytes) 
{
	if ( err) 
		return 0;
	already_read_ = bytes;
	bool found = std::find(buff_, buff_   bytes, 'n') < buff_   bytes;
	// we read one-by-one until we get to enter, no buffering
	return found ? 0 : 1;
}

Оставим указание длины в качестве префикса сообщения в качестве упражнения для читателя, это достаточно легко.

Синхронный ввод/вывод в клиентских приложениях

Синхронный заказчик, как правило, бывает 2-х видов:

  • Он запрашивает что-то от сервера, читает и обрабатывает результат. После этого запрашивает что-то еще и так дальше. Это, по сути, синхронный заказчик, тот, что рассматривался в предыдущей главе.
  • Читает входящее сообщение от сервера, обрабатывает его и пишет результат. После этого читает следующее входящее сообщение и так дальше.

Оба сценария применяют следующую тактику: сделать запрос – прочитать результат. Другими словами, одна сторона делает запрос, на тот, что иная сторона отвечает в результат. Это примитивный метод реализации клиент/серверного приложения и это то, что я рекомендую вам.
Вы неизменно можете сделать Mambo Jambo клиент/сервер, где всякая сторона пишет когда желательно, но, крайне возможно, что данный путь приведет к катастрофе (как вы узнаете что случилось, когда заказчик либо сервер заблокируется?).
Предыдущие сценарии могут выглядеть идентично, но, они дюже различные:

  • В первом случае сервер реагирует на запросы (сервер ожидает запросы от заказчиков и отвечает на них). Это дерганное (pull-like) соединение, когда заказчик получает по запросу от сервера то, что ему нужно.
  • В последнем случае сервер посылает заказчику события, на которые тот реагирует. Это толчковое (push-like) соединение, когда сервер проталкивает уведомления/события заказчикам.

В основном вы будете сталкиваться с pull-like клиент/серверными приложениями, которые облегчают разработку, а так же, как правило, являются нормой.
Вы можете смешивать эти два подхода: получить по запросу (заказчик-сервер) и протолкнуть запрос (сервер-заказчик), впрочем, это трудно и отменнее этого избежать. Есть задача смешивания 2-х этих подходов, если вы используете тактику сделать запрос – прочитать результат; может случиться следующее:

  • Заказчик пишет (делает запрос)
  • Сервер пишет (посылает уведомление заказчику)
  • Заказчик читает то, что написал сервер и интерпретирует это в качестве результата на свой запрос
  • Сервер блокирует ожидание результата от заказчика, тот, что придет, когда заказчик сделает новейший запрос
  • Заказчик пишет (делает новейший запрос)
  • Сервер будет интерпретировать данный запрос в качестве результата, тот, что он ожидал
  • Заказчик блокируется (сервер не посылает обратного результата, потому что он интерпретировал запрос заказчика в качестве результата на свое уведомление).

В pull-like клиент/серверном приложении дозволено было легко избежать предыдущего сценария. Вы можете моделировать push-like поведение путем реализации процесса пинговки, когда заказчик проверяет связь с сервером, скажем, всякие 5 секунд. Сервер может ответить что-то типа ping_ok, если нечего осведомить либо ping_[event_name], если есть событие для оповещения. Потом заказчик может инициировать новейший запрос для обработки этого события.
Повторим, предшествующий сценарий иллюстрирует синхронный заказчик из предыдущей главы. Его стержневой цикл:

void loop() 
{
	// read answer to our login
	write("login "   username_   "n");
	read_answer();
	while ( started_) 
	{
		write_request();
		read_answer();
		...
	}
}

Дозвольте изменить его, Дабы соответствовать последнему сценарию:

void loop() 
{
	while ( started_) 
	{
		read_notification();
		write_answer();
	}
}
void read_notification() 
{
	already_read_ = 0;
	read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
	process_notification();
}
void process_notification() 
{
	// ... see what the notification is, and prepare answer
}

Синхронный ввод/вывод в серверных приложениях

Серверы, как и заказчики, ной операции нужно начинать иную асинхронную операцию, Дабы service.run()не прекращала своего действия.
Вот каркас кода, тот, что урезан. Ниже приведены все члены класса talk_to_client:

void start() 
{
	...
	do_read(); // first, we wait for client to login
}
void on_read(const error_code & err, size_t bytes) 
{
	std::string msg(read_buffer_, bytes);
	if ( msg.find("login ") == 0) 
		on_login(msg);
	else if ( msg.find("ping") == 0) 
		on_ping();
	else 
		...
}
void on_login(const std::string & msg) 
{
	std::istringstream in(msg);
	in >> username_ >> username_;
	do_write("login okn");
}
void do_write(const std::string & msg) 
{
	std::copy(msg.begin(), msg.end(), write_buffer_);
	sock_.async_write_some( buffer(write_buffer_, msg.size()), 
		MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) 
{
	do_read();
}

В 2-х словах, мы неизменно ожидаем операцию чтения, как только она завершится, мы обрабатываем сообщение и отвечаем обратно заказчику.
Преобразуем предшествующий код в push сервер:

void start() 
{
	...
	on_new_client_event();
}
void on_new_client_event() 
{
	std::ostringstream msg;
	msg << "client count " << clients.size();
	for ( array::const_iterator b = clients.begin(), e = clients.end();b != e;   b)
		(*b)->do_write(msg.str());
}
void on_read(const error_code & err, size_t bytes) 
{
	std::string msg(read_buffer_, bytes);
	// basically here, we only acknowledge
	// that our clients received our notifications
}
void do_write(const std::string & msg) 
{
	std::copy(msg.begin(), msg.end(), write_buffer_);
	sock_.async_write_some( buffer(write_buffer_, msg.size()), 
		MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) 
{
	do_read();
}

Когда происходит событие, скажем, on_new_client_event, каждому заказчикам, которые обязаны быть проинформированы об этом событии, будут отправлены сообщения. Когда они ответят, мы осознаем, что они обработали полученное событие. Обратите внимание, что мы никогда не завершим асинхронно ожидать событий (следственно service.run() не завершит трудиться), так как мы неизменно ожидаем новых заказчиков.

Потоки в асинхронном сервере

Асинхронный сервер был показан в главе 4, он одно-поточный, так как там все происходит в функции main():

int main() 
{
	talk_to_client::ptr client = talk_to_client::new_();
	acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
	service.run();
}

Красота асинхронного подхода заключается в простоте перехода от одно-поточного к много-поточному варианту. Вы неизменно можете пойти одно-поточным путем, по крайней мере, пока ваших заказчиков не будет больше 200 единовременно либо около того. Тогда, Дабы перейти от одного потока к 100 потокам, вам нужно будет применять дальнейший фрагмент кода:

boost::thread_group threads;
void listen_thread() 
{
	service.run();
}
void start_listen(int thread_count) 
{
	for ( int i = 0; i < thread_count;   i)
		threads.create_thread( listen_thread);
}
int main(int argc, char* argv[]) 
{
	talk_to_client::ptr client = talk_to_client::new_();
	acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
	start_listen(100);
	threads.join_all();
}

Безусловно, как только вы начинаете применять много-поточность, вы обязаны думать о потоко-безопасности. Даже если вы вызовете async_* в потоке А, то процедура ее заключения может быть вызвана в потоке В (до тех пор пока поток В вызывает service.run()). Само по себе это не является задачей. До тех пор, пока вы будете следовать логической последовательности, то есть, от async_read() к on_read(), от on_read() кprocess_request, от process_request к async_write(), от async_write() к on_write(), от on_write() кasync_read() и нет никаких public функций, которые вызывали бы ваш класс talk_to_client, правда различные функции могут быть вызваны в различных потоках, они все равно будут вызваны ступенчато. Таким образом, мьютексы не необходимы.
Это, впрочем, обозначает, что для заказчика может быть только одна асинхронная операция в ожидании. Если в какой-то момент у заказчика имеется две отложенные асинхронные функции, то вам потребуются мьютексы. Потому что две отложенные операции могут кончаться приблизительно в одно время и в финальном результате мы моглибы вызвать их обработчики единовременно в 2-х различных потоках. Таким образом, есть надобность в потоко-безопасности, таким образом, в мьютексах.
В нашем асинхронном сервере на самом деле есть единовременно две отложенные операции:

void do_read() 
{
	async_read(sock_, buffer(read_buffer_), 
		MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
	post_check_ping();
}
void post_check_ping() 
{
	timer_.expires_from_now(boost::posix_time::millisec(5000));
	timer_.async_wait( MEM_FN(on_check_ping));
}

При выполнении операции чтения мы будем асинхронно ожидать ее заключения в течение некоторого периода. Таким образом, существует надобность в потоко-безопасности. Мой совет, если вы планируете, что вы предпочтете много-поточный вариант, то сделайте ваш класс потоко-безвредным с самого начала. Это, как правило, не повредит продуктивности (вы, безусловно, можете проверить это). Помимо того, если вы планируете пойти много-поточным путем, то идите по нему с самого начала. Таким образом, вы столкнетесь с допустимыми задачами на ранней стадии. Как только вы найдете загвоздку, первое, что вы обязаны проверить происходит ли это при одном запущенном потоке? Если да, то это легко, легко отладьте. В отвратном случае вы, видимо, позабыли залочить (mutex) какую-то функцию.
От того что наш пример нуждается в потоко-безопасности, мы изменили talk_to_client, применяя мьютексы. Помимо того, у нас есть массив заказчиков, на тот, что мы ссылаемся несколько раз в коде, тот, что также нуждается в собственном мьютексе.
Избежать дедлока и повреждения памяти не так легко. Вот как нужно было изменить функциюupdate_clients_changed():

void update_clients_changed() 
{
	array copy;
	{ 
		boost::recursive_mutex::scoped_lock lk(clients_cs);
		copy = clients; 
	}
	for( array::iterator b = copy.begin(), e = copy.end(); b != e;   b)
		(*b)->set_clients_changed();
}

То чего мы хотим избежать это, Дабы два мьютекса были заблокированы в одно и то же время (что может привести к обстановки взаимной блокировки). В нашем случае, мы не хотим, Дабы clients_cs и клиентскийcs_ мьютекс были заблокированы единовременно.

Асинхронные операции

Boost.Asio так же разрешает исполнять всякие ваши функции асинхронно. Легко используйте дальнейший фрагмент кода:

void my_func() 
{
	...
}
service.post(my_func);

Можете удостовериться, что my_func вызывается в одном из потоков, которые вызывают service.run(). Вы также можете запустить асинхронную функцию и сделать завершающий обработчик, тот, что известит вам, когда функция завершится. Псевдокод будет выглядеть дальнейшим образом:

void on_complete() 
{
	...
}
void my_func() 
{
	...
	service.post(on_complete);
}
async_call(my_func);

Тут нет функции async_call, вам придется сделать свою собственную. К счастью это не так трудно. Глядите дальнейший фрагмент кода:

struct async_op : boost::enable_shared_from_this<async_op>, ... 
{
	typedef boost::function<void(boost::system::error_code)> completion_func;
	typedef boost::function<boost::system::error_code ()> op_func;
	struct operation { ... };
	void start() 
	{
		{ 
			boost::recursive_mutex::scoped_lock lk(cs_);
			if ( started_) 
				return; 
			started_ = true;
		 }
		boost::thread t( boost::bind(&async_op::run,this));
	}
	void add(op_func op, completion_func completion, io_service &service) 
	{
		self_ = shared_from_this();
		boost::recursive_mutex::scoped_lock lk(cs_);
		ops_.push_back( operation(service, op, completion));
		if ( !started_) 
			start();
	}
	void stop() 
	{
		boost::recursive_mutex::scoped_lock lk(cs_);
		started_ = false; 
		ops_.clear();
	}
private:
	boost::recursive_mutex cs_;
	std::vector<operation> ops_; 
	bool started_; 
	ptr self_;
};

В структуре async_op создается фоновый поток, тот, что будет трудиться (run()) со всеми асинхронными функциями, которые вы добавляете (add()) к нему. Для меня это не представляется чем-то трудным, так как для всякой операции выполняется следующее:

  • Функция вызывается асинхронно.
  • completion функция вызывается при первом заключении функции
  • Экземпляр io_service, тот, что будет исполнять completion фуннадежда.
  • Ради простоты, разглядим дальнейший фрагмент кода, он не потоко-неопасный:
    class proxy : public boost::enable_shared_from_this<proxy> 
    {
    	proxy(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_server) : ... {}
    public:
    	static ptr start(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_svr) 
    	{
    		ptr new_(new proxy(ep_client, ep_svr));
    		// ... connect to both endpoints
    		return new_;
    	}
    	void stop() 
    	{
    		// ... stop both connections
    	}
    	bool started() 
    	{ 
    		return started_ == 2; 
    	}
    private:
    	void on_connect(const error_code & err) 
    	{
    		if ( !err) 
    		{
    			if (   started_ == 2) 
    				on_start();
    		} 
    		else 
    			stop();
    	}
    	void on_start() 
    	{
    		do_read(client_, buff_client_);
    		do_read(server_, buff_server_);
    	}
    	...
    private:
    	ip::tcp::socket client_, server_;
    	enum { max_msg = 1024 };
    	char buff_client_[max_msg], buff_server_[max_msg];
    	int started_;
    };
    

Это дюже примитивный прокси. При подключении на обоих концах он начинает читать на обоих соединениях (функция on_start()):

class proxy : public boost::enable_shared_from_this<proxy> 
{
	...
	void on_read(ip::tcp::socket & sock, const error_code& err, size_t bytes) 
	{
		char * buff = &sock == &client_ ? buff_client_ : buff_server_;
		do_write(&sock == &client_ ? server_ : client_, buff, bytes);
	}
	void on_write(ip::tcp::socket & sock, const error_code &err, size_t bytes)
	{
		if ( &sock == &client_) 
			do_read(server_, buff_server_);
		else 
			do_read(client_, buff_client_);
	}
	void do_read(ip::tcp::socket & sock, char* buff) 
	{
		async_read(sock, buffer(buff, max_msg), 
			MEM_FN3(read_complete,ref(sock),_1,_2), 
			MEM_FN3(on_read,ref(sock),_1,_2));
	}
	void do_write(ip::tcp::socket & sock, char * buff, size_t size) 
	{
		sock.async_write_some(buffer(buff,size),
			MEM_FN3(on_write,ref(sock),_1,_2));
	}
	size_t read_complete(ip::tcp::socket & sock, const error_code & err, size_t bytes) 
	{
		if ( sock.available() > 0) 
			return sock.available();
		return bytes > 0 ? 0 : 1;
	}
};

Позже всякого удачного чтения (on_read) он передает сообщение иной стороне. Как только сообщение было удачно передано (on_write), мы вновь начинаем читать.
Дабы это работало, используйте дальнейший фрагмент кода:

int main(int argc, char* argv[]) 
{
	ip::tcp::endpoint ep_c( ip::address::from_string("127.0.0.1"), 8001);
	ip::tcp::endpoint ep_s( ip::address::from_string("127.0.0.1"), 8002);
	proxy::start(ep_c, ep_s);
	service.run();
}

Вы подметили, что я вторично использую буферы (buff_client_ и buff_server_) для чтения и записи. Это повторное применение типично, потому что прочитанное сообщение от заказчика написано серверу раньше, чем новое сообщение будет прочитано от заказчика и напротив. Это также обозначает, что эта определенная реализация страдает от задачи живого отклика. В то время, когда мы находимся в процессе записи на стороне В, мы не читаем от стороны А (мы перезапускаем чтение от стороны А позже заключения операции записи на стороне В). Вы можете изменить реализацию, Дабы одолеть эту загвоздку, исполнив следующие действия:

  • Вы обязаны применять несколько буферов для чтения
  • Позже всякой удачной операции чтения, помимо асинхронной записи на иной стороне, сделать дополнительное асинхронно чтение (в новейший буфер).
  • Позже всякой удачной операции записи истребить (либо вторично применять) буфер.

Оставим это в качестве упражнения для вас.

Резюме

Есть еще много пророческой, которые следует рассматривать при выборе, в каком направлении идти: синхронном либо асинхронном.
В этой главе мы разглядели:

  • Как легко реализовать, тестировать и отлаживать всякий тип приложения
  • Как потоки влияют на ваше приложение
  • Как поведение приложения (pull-like либо push-like) влияет на его реализацию
  • Как вы можете подключить личные асинхронные операции, когда вы реализуете асинхронное приложение

Дальше мы собираемся разглядеть несколько не столь знаменитых особенностей Boost.Asio, а так же мою любимую специфика Boost.Asio – co-routines, которая дозволит вам применять все плюсы асинхронного подхода.

Источники к этой статье: ссылка

Каждому огромное спасибо за внимание, до новых встреч!

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB