永远不要跟别人比幸运,我从来没想过我比别人幸运,我也许比他们更有毅力,在最困难的时候,他们熬不住了,我可以多熬一秒钟、两秒钟,甚至更久。

SpringBoot集成WebSocket

Java 新民 344℃ 已收录 0评论

在项目研发发中由于调用了第三方的支付接口,支付接口只有一个异步回调的通知。由于前端和后端是分开部署的,所以引发了一个问题,支付成功后异步回调后端会处理逻辑,但前端并不知道支付的接果。为了解决研发成本,所以就选择了WebSocket。下面主要介绍SpringBoot是怎样集成WebSocket的?

首先呢,我们先引入一个WebSocket依赖。代码如下:

   <dependency>
		<groupId>org.springframework.boot </groupId>
		<artifactId>spring-boot-starter-websocket </artifactId>
	</dependency>

加完依赖后,就可以开始编码了,在SpringBoot中添加WebSocket的配置,配置如下:

   
	import org.springframework.context.annotation.Configuration;
	import org.springframework.web.socket.config.annotation.EnableWebSocket;
	import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
	import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

	/**
	 * websocket 配置
	 * @author zxm
	 *
	 */
	@Configuration 
	@EnableWebSocket
	public class WebSocketH5Config implements WebSocketConfigurer {

		@Override
		public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
			// TODO Auto-generated method stub
			
			registry.addHandler(new WebSocketH5Handler(), "/service/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor());
		}
	}

1.@Configuration:注解标识该类为Spring的配置类

2.@EnableWebSocket:开启注解接收和发送消息

3.实现WebSocketConfigurer接口,重写registerWebSocketHandlers方法,这是一个核心实现方法,配置websocket入口,允许访问的域、注册Handler、定义拦截器。客户端通过“/service/{ID}”直接访问Handler核心类,进行socket的连接、接收、发送等操作,这里由于还加了个拦截器,所以建立新的socket访问时,都先进来拦截器再进去Handler类,“new WebSocketInterceptor()”是我实现的拦截器,“new WebSocketInterceptor()”是我实现的一个Handler类

上面提到了WebSocketInterceptor拦截器的实现,下面来看下是如何实现的,代码如下:

   
	import org.springframework.http.server.ServerHttpRequest;
	import org.springframework.http.server.ServerHttpResponse;
	import org.springframework.http.server.ServletServerHttpRequest;
	import org.springframework.web.socket.WebSocketHandler;
	import org.springframework.web.socket.server.HandshakeInterceptor;

	import lombok.extern.log4j.Log4j;

	/**
	 * websocket 拦截器
	 * @author zxm
	 *
	 */
	@Log4j
	public class WebSocketInterceptor implements HandshakeInterceptor {

		@Override
		public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
				Map<String, Object> attributes) throws Exception {
			// TODO Auto-generated method stub
			if(request instanceof ServletServerHttpRequest) {
				String ID = request.getURI().toString().split("ID=")[1];
				log.info("当前的sessionID="+ID);
				ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
				HttpSession session = serverHttpRequest.getServletRequest().getSession();
				attributes.put("WEBSOCKET_USERID", ID);
			}
			return true;
		}

		@Override
		public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
				Exception exception) {
			// TODO Auto-generated method stub
			log.info("后置拦截");
			
		}
	}

上述代码实现了HandshakeInterceptor 接口,并实现了beforeHandshake该方法,该方法是在进入Handler核心类之前进行拦截。

这里主要实现的逻辑是:

截取客户端建立webSocket连接时发送的URL地址字符串,并通过对该字符串进行特殊标识截取操作,获取客户端发送的唯一标识(由自己定义的,一般是系统用户ID唯一标识,用以标识该用户),并把它以键值对的形式放到Session里,这样后期可以通过该session获取它对应的用户ID了。【一个session对应着一个webSocketSession】

下面再看看handler处理器的具体实现,代码如下:

	import java.io.IOException;
	import java.util.HashMap;
	import java.util.Map;
	import java.util.Set;

	import org.springframework.stereotype.Component;
	import org.springframework.util.StringUtils;
	import org.springframework.web.socket.CloseStatus;
	import org.springframework.web.socket.TextMessage;
	import org.springframework.web.socket.WebSocketMessage;
	import org.springframework.web.socket.WebSocketSession;

	import com.alibaba.fastjson.JSON;
	import com.cupiday.common.utils.PublicDictUtil;
	import com.google.common.collect.Maps;

	import lombok.extern.log4j.Log4j;

	/**
	 * websocket 消息处理器
	 * @author zxm
	 *
	 */
	@Log4j
	@Component
	public class WebSocketH5Handler implements org.springframework.web.socket.WebSocketHandler {
		
		private  static final   Map<String, WebSocketSession> users = Maps.newConcurrentMap();

		@Override
		public void afterConnectionEstablished(WebSocketSession session) throws Exception {
			// TODO Auto-generated method stub
			log.info("成功建立连接");
			String ID = session.getUri().toString().split("ID=")[1];
			log.info("连接ID="+ID);
			if(ID != null) {
				users.put(ID, session);
				HashMap<String,Object> message = Maps.newHashMap();
				message.put(PublicDictUtil.KEY	, PublicDictUtil.SUCCESS_VALUE);
				message.put(PublicDictUtil.MSG_KEY, "成功建立socket连接");
				session.sendMessage(new TextMessage(JSON.toJSONString(message)));
				log.info("当前session="+session);
			}
			log.info("当前在线人数:"+users.size());
		}

		@Override
		public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
			// TODO Auto-generated method stub
			log.info(message.getPayload().toString());
			log.info(message.getPayload().toString()+" :来自"+(String)session.getAttributes().get("WEBSOCKET_USERID")+"的消息");
			sendMessageToUser(session.getAttributes().get("WEBSOCKET_USERID").toString(),new TextMessage("服务器收到了,hello"));
		}
		
		/**
		 * 发送消息给指定连接
		 * @param clientId
		 * @param message
		 * @return
		 */
		public boolean sendMessageToUser(String clientId,TextMessage message) {
			if(!StringUtils.hasLength(clientId)) {
				return false;
			}
			
			WebSocketSession webSocketSession = users.get(clientId);
			
			if(webSocketSession == null) {
				return false;
			}
			
			if(!webSocketSession.isOpen()) {
				return false;
			}
			try {
				webSocketSession.sendMessage(message);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				return false;
			}
			return true;
		}
		
		
		/**
		 * 广播消息
		 * @param message
		 * @return
		 */
		public boolean sendMessageToAllUsers(TextMessage message) {
			boolean allSendSuccess = true;
			Set clientIds = users.keySet();
			WebSocketSession session = null;
			for (String clientId : clientIds) {
				try {
					session = users.get(clientId);
					if (session.isOpen()) {
						session.sendMessage(message);
					}
				} catch (IOException e) {
					e.printStackTrace();
					allSendSuccess = false;
				}
			}
			return true;
		}
		
		/**
		 * 关闭
		 * @param clientId
		 */
		public void close(String clientId) {
			WebSocketSession webSocketSession = users.get(clientId);
			if(webSocketSession != null) {
				try {
					webSocketSession.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
		

		@Override
		public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
			// TODO Auto-generated method stub
			if(session.isOpen()) {
				session.close();
			}
			log.info("连接出错");
			users.remove(getClientId(session));
		}

		@Override
		public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
			// TODO Auto-generated method stub
			log.info("连接已经关闭:"+closeStatus);
			users.remove(getClientId(session));
		}

		@Override
		public boolean supportsPartialMessages() {
			// TODO Auto-generated method stub
			return false;
		}
		
		/**
		 * 获取连接id
		 * @param session
		 * @return
		 */
		private String getClientId(WebSocketSession session) {
			try {
				String id = (String)session.getAttributes().get("WEBSOCKET_USERID");
				return id;
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				
			}
			return null;
		}   

嗯,代码有点长啊,来具体说说这些代码都是干啥的:
1.实现了WebSocketHandler接口,并实现了关键的几个方法。

① afterConnectionEstablished(接口提供的):建立新的socket连接后回调的方法。主要逻辑是:将成功建立连接的webSocketSssion放到定义好的常量[private static final Map<String, WebSocketSession> users;]中去。这里也截取客户端访问的URL的字符串,拿到标识,以键值对的形式讲每一个webSocketSession存到users里,以记录每个Socket。

② handleMessage(接口提供的):接收客户端发送的Socket。主要逻辑是:获取客户端发送的信息。这里之所以可以获取本次Socket的ID,是因为客户端在第一次进行连接时,拦截器进行拦截后,设置好ID,这样也说明,双方在相互通讯的时候,只是对第一次建立好的socket持续进行操作。

③ sendMessageToUser(自己定义的):发送给指定用户信息。主要逻辑是:根据用户ID从常量users(记录每一个Socket)中,获取Socket,往该Socket里发送消息,只要客户端还在线,就能收到该消息。

④sendMessageToAllUsers (自己定义的):这个广播消息,发送信息给所有socket。主要逻辑是:跟③类型,只不过是遍历整个users获取每一个socket,给每一个socket发送消息即可完广播发送

⑤handleTransportError(接口提供的):连接出错时,回调的方法。主要逻辑是:一旦有连接出错的Socket,就从users里进行移除,有提供该Socket的参数,可直接获取ID,进行移除。这个在客户端没有正常关闭连接时,会进来,所以在开发客户端时,记得关闭连接

⑥afterConnectionClosed(接口提供的):连接关闭时,回调的方法。主要逻辑:一旦客户端/服务器主动关闭连接时,将个socket从users里移除,有提供该Socket的参数,可直接获取ID,进行移除。

至此,后端的开发工作已经完了,剩下的就是前端的实现了,前端JavaScript代码如下:

	var ID = 12345865546544; // 随机数
	//建立webSocket连接,若服务器端开通了https协议,则客户端必须使用wss协议
    var   websocket = new WebSocket("ws://192.168.1.18:8080/service/ID="+ID);
      
	//打开webSokcet连接时,回调该函数
   websocket.onopen = function () {      
	console.log("onpen");  
   }
   
   //关闭webSocket连接时,回调该函数
   websocket.onclose = function () {
   //关闭连接    
	console.log("onclose");
   }

   //接收信息
   websocket.onmessage = function (msg) {
	console.log(msg.data);
   }

怎么样,比较简单吧。至此,SpringBoot 与 WebSocket 已经集成完了。

本站文章如未注明,均为原创丨本网站采用BY-NC-SA协议进行授权,转载请注明转自:http://www.snowruin.com/?p=1747
喜欢 (1)or分享 (0)
发表我的评论
取消评论
表情 代码 贴图 加粗 链接 私信 删除线 签到

Hi,请填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址