Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Организация вебсокетного взаимодействия с приложением Spring

Anna | 1.06.2014 | нет комментариев

Скажу сразу, что стандартная реализация такого взаимодействия — существует.

Впрочем, от того что эта статья — продолжение темы “Примитивный вызов удалённых сервисных способов в одностраничных приложениях“, тут будет приведена альтернативная схема взаимодействия, нужная для замены ajax на вебсокеты, в контексте подхода(jrspc), описанного в вышеупомянутой теме.

В первой статье — был описан механизм вызова сервисных способов, с применением ajax.

В этой статье — описано, как дозволено реализовать данный механизм, с заменой ajax на вебсокеты,
не меняя код бизнес-логики приложения.

Такая замена даёт больше стремительное соединение(тесты в конце), экономию серверной памяти, и добавляет вероятность вызывать способы заказчика с сервера.

Для демонстрации, написано малое чат-приложение, с начальным кодом на гитхабе.
на примере разбора которого, я попытаюсь объяснить, как реализованы клиентская и серверная части такого взаимодействия.
Приложение работает на сервере tomcat 7.042.
Поддерживает https и wss (сертификат неподтверждённый), и не ведёт логов на сервере.

Серверная часть

Основным вопросом, возникшем при организации вебсокетного взаимодействия с приложением Spring, был вопрос — как обеспечить вызов компонентов Spring, из его областей видимостей, которые привязаны к http-сессии, из объекта StreamInbound, возвращаемого способом createWebSocketInbound классаWebSocketServlet, тот, что к сессии не привязан?

Дабы обеспечить требуемую функциональность по вызову способов серверных компонентов, нам необходимо как-то получить доступ к рабочему ApplicationContext из преемника StreamInbound.

Если попытаемся заавтоваерить ApplicationContext, Дабы с его поддержкой получать надобные нам компоненты, в преемника WebSocketServlet либо StreamInbound — нас ждёт разочарование, так как он не проинициализируется, что безусловно законно.

Для того, Дабы из вебсокетного обработчика получить доступ к компонентам из контекстов Spring, которые связаны с http-сессией, нам необходимо сделать объект, тот, что бы был сессионным спринговым бином, и тот, что бы хранился в статическом объекте класса-хранилища, доступ к которому, имел бы преемник StreamInbound.

Данный сессионный объект (назовём его ClientManager), создаётся в процессе установки http соединения.

Соответственно, заказчик, раньше чем начинать взаимодействовать с сервером через вебсокет, должен сделать один http handshake-запрос, в итоге которого, он должен получить ид своего ClientManager.

Итог этого запроса дозволено передать в код заказчика двумя методами — вставить clientManagerId в отдаваемую сгенерированную страницу, либо через ajax запрос, со статической страницы (тут — реализован вариант через ajax).

Обработка этого запроса производится в способе initializeClientManager сессионного контроллера:

@Controller
@Scope("session")
public class ClientManagerController {

    @Autowired   
    private ClientManager clientManager;                     

    @RequestMapping(value = "/init", method = RequestMethod.POST)
    @ResponseBody
    private String initializeClientManager(HttpSession session) {  

      JSONObject result = new JSONObject();       
        try{
            boolean loged = ClientManagersStorage.checkClientManager(clientManager, session) ;
            result.put("loged", loged);        
            result.put("clientManagerId", clientManager.getId());      
        }catch(Throwable th){
            result.put("error", th.toString()); 
        }         
        return result.toString();         
    }

ClientManagersStorage — это хранилище наших сессионных администраторов заказчиков, имеющее способы по проверке администратора на null, созданию нового, добавлению в хранилище, поиску и удалению.

public class ClientManagersStorage {

    final static private  Map<String, ClientManager> clientManagers = new ConcurrentHashMap <String, ClientManager>();

    public static boolean checkClientManager(ClientManager clientManager, HttpSession session) {
        ClientManager registeredClientManager = clientManagers.get(clientManager.getId());
        if (registeredClientManager == null) {
            clientManager.setSession(session);
            addClientManager(clientManager);   
            registeredClientManager = clientManager;       
        }
        return registeredClientManager.getUser() != null;        
    }
    ...
}

(вопрос управления жизненным циклом сессии будет рассмотрен немножко ниже)

Как видно, администраторы хранятся в статической мапе, по ключу, являющемуся его hashCode, и когда пользователь перезагружает страницу — ему назначается тот же самый администратор.

Ид этого администратора — передаётся заказчику в переменной clientManagerId результата.

Позже того, как заказчик получил ид своего администратора — он может открывать вебсокетное соединение, передавая свой clientManagerId в исключительном параметре запроса на установку соединения.

Запрос на открытие этого соединения обрабатывается в способе createWebSocketInbound класса WebSocketConnectorServlet — имплементации абстрактного WebSocketServlet.

@Override
	protected StreamInbound createWebSocketInbound(String paramString, HttpServletRequest request) {
		String clientManagerId = request.getParameter("clientManagerId");		
		ClientManager clientManager = ClientManagersStorage.findClientManager(clientManagerId);
		if(clientManager == null){
		    return new WebSocketConnection(null);
		}		
		log.debug("new connection");
		return new WebSocketConnection(clientManager);
	}

в нём, из запроса достаётся clientManagerId, по нему находится ClientManager, и создаётся объект WebSocketConnection (являющийся StreamInbound), к которому привязан ClientManager.

Так как ClientManager — сессионный, и был сделан в «типичном» http запросе, то из него будут доступны все спринговые бины, через автоваеренный в него ApllicationContext, тот, что, тут, будет проинициализирован верно).

При открытии нового соединения с заказчиком, контейнером вызывается способ onOpen класса WebSocketConnection, в котором привязанный к нему ClientManager, добавляет данный WebSocketConnection, в свою мапу соединений, по ид(хэшкоду) объекта.

   @Override
    protected void onOpen(WsOutbound outbound) {
        if(clientManager != null){
           clientManager.addConnection(this);
        }    
    }

(Помощь множества соединений нужна, Дабы пользователь мог открывать приложение в нескольких окнах, всякое из которых будет создавать своё вебсокетное соединение.)

Открыв соединение, заказчик может слать запросы на вызовы серверных способов, которые будут обрабатываться в переопределённом способе onTextMessage класса WebSocketConnection.

    @Override
    protected void onTextMessage(CharBuffer message) throws IOException {
        try {                   
            String connectionId = String.valueOf(this.hashCode());
            String request = message.toString();
            clientManager.handleClientRequest(request, connectionId);            
        } catch (Throwable th) {
            log.error("in onTextMessage: "   th);
        }
    }

способ handleClientRequest класса ClientManager — обрабатывает запрос, и пишет итог в соединение:

@Autowired
    private RequestHandler requestHandler;

    public void handleClientRequest(String request, String connectionId) {
        log.debug("handleClientRequest request="   request);
        log.debug("handleClientRequest user="   getUser());   
        /** handleRequest - never throws exceptions ! */
        JSONObject response = requestHandler.handleRequest(request, this);        
        String responseJson = response.toString();
        CharBuffer buffer = CharBuffer.wrap(responseJson);
        WebSocketConnection connection = connections.get(connectionId);
        try {
            connection.getWsOutbound().writeTextMessage(buffer);
        } catch (IOException ioe) {
            log.error("in handleClientRequest: in writeTextMessage: "   ioe);
        }
    }

requestHandler — автоваеренный компонент, отвечающий за обработку запросов.
В него вварен ApllicationContext, при помощи которого он находит объекты сервисов.

Его способ handleRequest, ищет компонент обслуживания, и вызывает на нём способы необходимый заказчику, верно так же, как способ processAjaxRequest из класса CommonServiceController, из предыдущей статьи.

Такова всеобщая схема взаимодействия.

Сейчас разглядим подробнее момент инициализации ClientManager’а http сессией.

Сессия имеет качество отваливаться по таймауту, тот, что по умолчанию равен 30 минутам.
Дабы избежать этого — выставляем его значение на максимум, и инвалидируем сессию когда нам это необходимо — а именно, в 2-х случаях: 1-й случай — когда кто-то сделал запрос не из приложения, и 2-й, когда заказчик закрыл страницу приложения.

1-й случай обрабатывается прямо в способе инициализации:

  public class ClientManager{

	    public void setSession(HttpSession session) {
	        /** session will be invalidated at connection removing */
	        session.setMaxInactiveInterval(Integer.MAX_VALUE);//69.04204112011317 years
	        this.session = session;
	        new Thread(new Runnable() {            
	            @Override
	            public void run() {
	                /** Giving time to client, for establish websocket connection. */
	                try {Thread.sleep(60000);} catch (InterruptedException ignored) {}
	                /** if client not connected via websocket until this time - it is bot */
	                if (connections.size() == 0) {removeMe();}                                
	            }            
	        }).start();        
	    }	    
	    private void removeMe() {ClientManagersStorage.removeClientManager(this);}

    	...
  }  

а 2-й — в способе onClose класса WebSocketConnection:

public class WebSocketConnection{
	   @Override
	    protected void onClose(int status) {
	        if(clientManager != null){
	            clientManager.removeConnection(this);
	        }        
	    }
        ...
   }

   public class ClientManager{ 

	   public void removeConnection(WebSocketConnection webSocketConnection) {
	        String connectionId = getObjectHash(webSocketConnection);
	        connections.remove(connectionId);
	        if (connections.size() == 0) {
	            log.debug("removeConnection before wait:  connections.size()="   connections.size());
	            /** may be client just reload page? */
	            try {Thread.sleep(waitForReloadTime);} catch (Throwable ignored) {}            
	            if (connections.size() == 0) {
	                /** no, client leave us (page closed in browser)*/      
	                ClientManagersStorage.removeClientManager(this); 
	                log.debug("client "   getId()   " disconnected");                    
	            }
	        }
	    }   
         ...
   }  
 

Сессия инвалидируется в способе removeClientManager класса ClientManagersStorage:

public static void removeClientManager(ClientManager clientManager) {        
        ClientManager removed  = clientManagers.remove(clientManager.getId());    
        if(removed == null){return;}
        User user = removed.getUser();
        if(user != null){                
            Broadcaster.broadcastCommand("userPanel.setLogedCount", UserService.logedCount.decrementAndGet());   
        }                
        Broadcaster.broadcastCommand("userPanel.setOnlineCount", ClientManagersStorage.getClientManagersCount());            
        try {
            clientManager.getSession().invalidate();
            clientManager.setSession(null);     
        } catch (Throwable th) {
            log.error("at removeClientManager: "   th);
        }        
    }  

Из этого же способа делается уведомление пользователей о том, что изменилось число посетителей страницы (обработка этих уведомлений на заказчике — описана ниже).

Для уведомления пользователей о событиях на сервере — применяется класс Broadcaster, имеющий два способа: broadcastCommand и sendCommandToUser:

public class Broadcaster{

    public static void broadcastCommand(String method, Object params) {
        for (ClientManager clientManager : ClientManagersStorage.getClientManagers().values()) {
            clientManager.sendCommandToClient(method, params);
        }
    }    

    public static void sendCommandToUser(Long userId, String method, Object params) {     
        List<ClientManager> userClientManagers = ClientManagersStorage.findUserClientManagers(userId);
        for(ClientManager clientManager: userClientManagers){
            clientManager.sendCommandToClient(method, params);
        }        
    }
  }  

Способ sendCommandToClient класса СlientManager — работает так:

public void sendCommandToClient(String method, Object params) {
        for(WebSocketConnection connection: connections.values()){
            sendCommandToClientConnection(connection, method, params);             
        }        
    }    

    private void sendCommandToClientConnection(WebSocketConnection connection, String method, Object params) {
        JSONObject commandBody = new JSONObject();
        if(params == null){params = new JSONObject();}
        commandBody.put("method", method);
        commandBody.put("params", params);        
        CharBuffer buffer = CharBuffer.wrap(commandBody.toString());
        try {
            connection.getWsOutbound().writeTextMessage(buffer);                     
        } catch (IOException ioe) {
            log.error("in sendCommandToClient: in writeTextMessage: "   ioe);
        }                
    }

На этом, с серверной частью завершим, и перейдём к клиентской.

Клиентская часть

Клиентская часть должна реализовать три функциональности:

первая — handshake на ajax, для инициализации сессионного СlientManager’а, вторая — вебсокетный транспорт, для отправки запросов jsrpc и приобретения на них результатов, и третья — вызов функций на заказчике, с сервера.

Первая часть — самая простая:

От того что мы используем Ангуляр, для инициализирующего http-сессию запроса ajax, применяется $http:

	var appName = "jrspc-ws"; 
	var secured = document.location.protocol == "https:" ? "s" : "";
	var HttpSessionInitializer = {url: "http" secured "://"  document.location.host  "/" appName "/init"};

	/** called from root-controller.js after its initialization */	
	HttpSessionInitializer.init = function($http) {	    	    	
	    	$http.post(this.url, "").success(function(response){
					if (response.error) {
						error(response.error);
					} else {					
						loged = response.loged;					
						Server.initialize("ws" secured "://"  document.location.host  "/" appName "/ws?clientManagerId=" response.clientManagerId);
						if(loged){Listeners.notify("onLogin");}					
					}    		
	    	}).error(function() {error("network error!");});    	    	    	
	}

На сервере, данный запрос обрабатывается в способе initializeClientManager класса ClientManagerController, код которого приведён выше, в изложении серверной части.

Инициализация сокетного соединения происходит в функции Server.initialize:

  	connector.initialize = function(url) {
		connector.url = url;
		try {
			connector.connect(url);
			return true;
		} catch (ex) {
			p("in connector.initialize: "   ex);
			return false;
		}
	}

connector — внутренний объект Server, тот, что отвечает за вебсокетное соединение (его полный код находится в файле ws-connector.js)

Код из ws-connector.js, тот, что отвечает за образование запроса jrspc:

  	Server.socketRequests = {};

	var requestId = 0;

	function sendSocket(service, method, params, successCallback, errorCallback, control) {
		if (!checkSocket()) {return;}
		requestId  ;

		if(!params){params = [];}
		if(!isArray(params)){params = [params];}

		var data = {
			service : service,
			method : method,
			params : params,
			requestId : requestId
		};
		Server.socketRequests["request_"   requestId] = {
			successCallback : successCallback,
			errorCallback : errorCallback,
			control : control
		};

		if (control) {control.disabled = true;}

		var message = JSON.stringify(data);
		log("sendSocket: " message);
		connector.socket.send(message);
	}
	...
	Server.call = sendSocket;

Код из ws-connector.js, тот, что отвечает за обработку результатов на запросы, и обработку серверных команд:

 		connector.socket.onmessage = function(message) {
			var data = message.data;
			var response = JSON.parse(data);
			var requestId = response.requestId;
			if (requestId) {/** server return response */					
				var control = Server.socketRequests["request_"   requestId].control;
				if (control) {control.disabled = false;}									
				if (response.error) {
					var errorCallback = Server.socketRequests["request_"   requestId].errorCallback;
					if (errorCallback) {
						try {
							errorCallback(response.error);
						} catch (ex) {
							error("in connector.socket.onmessage errorCallback: "   ex   ", data="   data);
						}
					}else{
						error(response.error);
					}
				} else {	
					var successCallback = Server.socketRequests["request_"   requestId].successCallback;
					if (successCallback) {
						try {
							successCallback(response.result);
						} catch (ex) {
							error("in connector.socket.onmessage successCallback: "   ex   ", data="   data);
						}
					}
				}
				delete Server.socketRequests["request_"   requestId];
			} else {
				/** server call client or broadcast */
				var method = eval(response.method);
				var params = response.params;
				try {
					method(params);
				} catch (ex) {
					error("in connector.socket.onmessage call method: "   ex   ", data="   data);
				}
			}
		}; 

Использование вышеописанного фреймворка, разрешает реализовать всю бизнес логику, отвечающую за функциональность чата — в 2-х функциях на заказчике (chat-controller.js):

		self.sendMessage = function(command){
	    	var message = {to: (self.sendPrivate ? self.privateTo : "all"), from: userPanel.user.login, text: self.newMessage, clientTime: new Date().getTime()};
	     	Server.call("chatService", "dispatchMessage", message,
	    	function(){	self.newMessage = ""; self.$digest(); }, function(error){self.onError(error);}, command);
	    } 

	    /** called from server */
	    self.onChatMessage = function (message){ 	
	    	message.isPrivate = (message.to != "all");
	    	self.messages.push(message);
	    	self.$digest();	
	    	chatConsole.scrollTop = chatConsole.clientHeight   chatConsole.scrollHeight;    	
	    } 

и одном серверном способе:

		@Component
	    public class ChatService extends AbstractService{

		    @Autowired
		    private UserManager userManager;

		    @Secured("User")
		    @Remote
		    public void dispatchMessage(ChatMessage message){ 
		        message.setServerTime(new Date().getTime());  
		        String to = message.getTo();
		        if("ALL".equalsIgnoreCase(to)){                   
		            Broadcaster.broadcastCommand("chatPanel.onChatMessage", message);
		        }else{            
		            User fromUser = getUser();
		            message.setFrom(fromUser.getLogin());
		            User toUser = userManager.findByLogin(to);    
		            if(toUser == null){throw new RuntimeException("User " to " not found!");}             
		            Broadcaster.sendCommandToUser(toUser.getId(), "chatPanel.onChatMessage", message);        
		            Broadcaster.sendCommandToUser(fromUser.getId(), "chatPanel.onChatMessage", message);     
		        }                
		    }                       
	    }

Тесты на скорость, при последовательной и параллельной отправке 1000 запросов, для ajax и websockets:

ступенчато: ajax (3474, 3380, 3377) ws (1299, 1113, 1054)
параллельно: ajax (1502, 1515, 1469) ws (616, 637, 632)

код тестов

		function testController($scope){	
			var self = $scope;	

		    self.maxIterations = 1000;
		    self.testIterations = self.maxIterations;
		    self.testStart = 0;
		    self.testEnd = 0;

		    self.testForSpeedSerial = function(command){
		    	if(self.testStart == 0){self.testStart = now();}
		    	if(--self.testIterations <= 0){
		    		var duration = now() - self.testStart;
		    		alert("testForSpeedSerial duration=" duration);    
		    		self.testStart = 0;
		    		self.testIterations = self.maxIterations;
		    		return;
		    	}
		    	Server.call("userService", "testForSpeed", "", function(){ self.testForSpeedSerial(command); }, error, command);    	
		    }

		    self.testForSpeedParallelResponses = 0;

		    self.testForSpeedParallel = function(command){	
		    	self.testStart = now();    	
		    	for(var i = 0; i < self.testIterations; i  ){
		    		Server.call("userService", "testForSpeed", "", 
		    				function(){
		    			       self.testForSpeedParallelResponses   ; 
		    			       if(self.testForSpeedParallelResponses >= self.maxIterations){
		    			    	      	var duration = now() - self.testStart;
								    	alert("testForSpeedParallel duration=" duration);    		
								    	self.testForSpeedParallelResponses = 0;
		    			       }
		    				}, error, command); 
		    	}    	
		    } 
		}

серверный способ testForSpeed:

     @Remote public void testForSpeed(){} 

Все скептические примечания и указания на ошибки будут приняты с благодарностью.

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB