在项目研发发中由于调用了第三方的支付接口,支付接口只有一个异步回调的通知。由于前端和后端是分开部署的,所以引发了一个问题,支付成功后异步回调后端会处理逻辑,但前端并不知道支付的接果。为了解决研发成本,所以就选择了WebSocket。下面主要介绍SpringBoot是怎样集成WebSocket的?
首先呢,我们先引入一个WebSocket依赖。代码如下:
<dependency> <groupId>org.springframework.boot </groupId> <artifactId>spring-boot-starter-websocket </artifactId> </dependency>
加完依赖后,就可以开始编码了,在SpringBoot中添加WebSocket的配置,配置如下:
import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; /** * websocket 配置 * @author zxm * */ @Configuration @EnableWebSocket public class WebSocketH5Config implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // TODO Auto-generated method stub registry.addHandler(new WebSocketH5Handler(), "/service/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor()); } }
1.@Configuration:注解标识该类为Spring的配置类
2.@EnableWebSocket:开启注解接收和发送消息
3.实现WebSocketConfigurer接口,重写registerWebSocketHandlers方法,这是一个核心实现方法,配置websocket入口,允许访问的域、注册Handler、定义拦截器。客户端通过“/service/{ID}”直接访问Handler核心类,进行socket的连接、接收、发送等操作,这里由于还加了个拦截器,所以建立新的socket访问时,都先进来拦截器再进去Handler类,“new WebSocketInterceptor()”是我实现的拦截器,“new WebSocketInterceptor()”是我实现的一个Handler类
上面提到了WebSocketInterceptor拦截器的实现,下面来看下是如何实现的,代码如下:
import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import lombok.extern.log4j.Log4j; /** * websocket 拦截器 * @author zxm * */ @Log4j public class WebSocketInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // TODO Auto-generated method stub if(request instanceof ServletServerHttpRequest) { String ID = request.getURI().toString().split("ID=")[1]; log.info("当前的sessionID="+ID); ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; HttpSession session = serverHttpRequest.getServletRequest().getSession(); attributes.put("WEBSOCKET_USERID", ID); } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { // TODO Auto-generated method stub log.info("后置拦截"); } }
上述代码实现了HandshakeInterceptor 接口,并实现了beforeHandshake该方法,该方法是在进入Handler核心类之前进行拦截。
这里主要实现的逻辑是:
截取客户端建立webSocket连接时发送的URL地址字符串,并通过对该字符串进行特殊标识截取操作,获取客户端发送的唯一标识(由自己定义的,一般是系统用户ID唯一标识,用以标识该用户),并把它以键值对的形式放到Session里,这样后期可以通过该session获取它对应的用户ID了。【一个session对应着一个webSocketSession】
下面再看看handler处理器的具体实现,代码如下:
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import com.alibaba.fastjson.JSON; import com.cupiday.common.utils.PublicDictUtil; import com.google.common.collect.Maps; import lombok.extern.log4j.Log4j; /** * websocket 消息处理器 * @author zxm * */ @Log4j @Component public class WebSocketH5Handler implements org.springframework.web.socket.WebSocketHandler { private static final Map<String, WebSocketSession> users = Maps.newConcurrentMap(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { // TODO Auto-generated method stub log.info("成功建立连接"); String ID = session.getUri().toString().split("ID=")[1]; log.info("连接ID="+ID); if(ID != null) { users.put(ID, session); HashMap<String,Object> message = Maps.newHashMap(); message.put(PublicDictUtil.KEY , PublicDictUtil.SUCCESS_VALUE); message.put(PublicDictUtil.MSG_KEY, "成功建立socket连接"); session.sendMessage(new TextMessage(JSON.toJSONString(message))); log.info("当前session="+session); } log.info("当前在线人数:"+users.size()); } @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { // TODO Auto-generated method stub log.info(message.getPayload().toString()); log.info(message.getPayload().toString()+" :来自"+(String)session.getAttributes().get("WEBSOCKET_USERID")+"的消息"); sendMessageToUser(session.getAttributes().get("WEBSOCKET_USERID").toString(),new TextMessage("服务器收到了,hello")); } /** * 发送消息给指定连接 * @param clientId * @param message * @return */ public boolean sendMessageToUser(String clientId,TextMessage message) { if(!StringUtils.hasLength(clientId)) { return false; } WebSocketSession webSocketSession = users.get(clientId); if(webSocketSession == null) { return false; } if(!webSocketSession.isOpen()) { return false; } try { webSocketSession.sendMessage(message); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } return true; } /** * 广播消息 * @param message * @return */ public boolean sendMessageToAllUsers(TextMessage message) { boolean allSendSuccess = true; Set clientIds = users.keySet(); WebSocketSession session = null; for (String clientId : clientIds) { try { session = users.get(clientId); if (session.isOpen()) { session.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); allSendSuccess = false; } } return true; } /** * 关闭 * @param clientId */ public void close(String clientId) { WebSocketSession webSocketSession = users.get(clientId); if(webSocketSession != null) { try { webSocketSession.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { // TODO Auto-generated method stub if(session.isOpen()) { session.close(); } log.info("连接出错"); users.remove(getClientId(session)); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { // TODO Auto-generated method stub log.info("连接已经关闭:"+closeStatus); users.remove(getClientId(session)); } @Override public boolean supportsPartialMessages() { // TODO Auto-generated method stub return false; } /** * 获取连接id * @param session * @return */ private String getClientId(WebSocketSession session) { try { String id = (String)session.getAttributes().get("WEBSOCKET_USERID"); return id; } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; }
嗯,代码有点长啊,来具体说说这些代码都是干啥的:
1.实现了WebSocketHandler接口,并实现了关键的几个方法。
① afterConnectionEstablished(接口提供的):建立新的socket连接后回调的方法。主要逻辑是:将成功建立连接的webSocketSssion放到定义好的常量[private static final Map<String, WebSocketSession> users;]中去。这里也截取客户端访问的URL的字符串,拿到标识,以键值对的形式讲每一个webSocketSession存到users里,以记录每个Socket。
② handleMessage(接口提供的):接收客户端发送的Socket。主要逻辑是:获取客户端发送的信息。这里之所以可以获取本次Socket的ID,是因为客户端在第一次进行连接时,拦截器进行拦截后,设置好ID,这样也说明,双方在相互通讯的时候,只是对第一次建立好的socket持续进行操作。
③ sendMessageToUser(自己定义的):发送给指定用户信息。主要逻辑是:根据用户ID从常量users(记录每一个Socket)中,获取Socket,往该Socket里发送消息,只要客户端还在线,就能收到该消息。
④sendMessageToAllUsers (自己定义的):这个广播消息,发送信息给所有socket。主要逻辑是:跟③类型,只不过是遍历整个users获取每一个socket,给每一个socket发送消息即可完广播发送
⑤handleTransportError(接口提供的):连接出错时,回调的方法。主要逻辑是:一旦有连接出错的Socket,就从users里进行移除,有提供该Socket的参数,可直接获取ID,进行移除。这个在客户端没有正常关闭连接时,会进来,所以在开发客户端时,记得关闭连接
⑥afterConnectionClosed(接口提供的):连接关闭时,回调的方法。主要逻辑:一旦客户端/服务器主动关闭连接时,将个socket从users里移除,有提供该Socket的参数,可直接获取ID,进行移除。
至此,后端的开发工作已经完了,剩下的就是前端的实现了,前端JavaScript代码如下:
var ID = 12345865546544; // 随机数 //建立webSocket连接,若服务器端开通了https协议,则客户端必须使用wss协议 var websocket = new WebSocket("ws://192.168.1.18:8080/service/ID="+ID); //打开webSokcet连接时,回调该函数 websocket.onopen = function () { console.log("onpen"); } //关闭webSocket连接时,回调该函数 websocket.onclose = function () { //关闭连接 console.log("onclose"); } //接收信息 websocket.onmessage = function (msg) { console.log(msg.data); }
怎么样,比较简单吧。至此,SpringBoot 与 WebSocket 已经集成完了。