Spring Boot 提供了 Websocket 组件,直接使用即可。Spring 同时也封装了 SockJs 的消息传递功能,使用起来较为方便,开发过程中推荐使用。本文只讨论 SockJs 的方式。
1. 添加依赖
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 自定义消息处理器
Spring Boot 提供了一个 WebSocketHandler 消息处理接口,源码如下:
public interface WebSocketHandler {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
boolean supportsPartialMessages();
}
WebSocketHandler
提供了4个方法:
afterConnectionEstablished
:当客户端连接成功后调用handleMessage
:当接收到客户端消息后调用handleTransportError
:当与客户端连接失败时调用afterConnectionClosed
:当与客户端断开连接后调用
而真正用于处理消息的是 TextWebSocketHandler
,继承结构图如下:
其中 AbstractWebSocketHandler
提供了处理各种消息的方法:
......
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message);
}
else if (message instanceof BinaryMessage) {
handleBinaryMessage(session, (BinaryMessage) message);
}
else if (message instanceof PongMessage) {
handlePongMessage(session, (PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
}
}
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
}
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
}
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
}
......
接下来我们创建 MyTextWebSocketHandler
继承 TextWebSocketHandler
:
@Component
public class MyTextWebSocketHandler extends TextWebSocketHandler {
private final Logger logger = LoggerFactory.getLogger(MyTextWebSocketHandler.class);
private final AtomicInteger onlineCount = new AtomicInteger(0);
private static final CopyOnWriteArraySet<WebSocketSession> SESSIONS = new CopyOnWriteArraySet<>();
/**
* 连接建立后调用的方法
*
* @param session 客户端 session
* @throws Exception 异常
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
SESSIONS.add(session);
logger.info("client {} join in, now online number is : {}", session.getId(), onlineCount.incrementAndGet());
session.sendMessage(new TextMessage("当前客户端会话 ID 为: " + session.getId()));
groupSendMessage(session.getId(), session.getId() + " 进入聊天室");
}
/**
* 连接断开后调用的方法
*
* @param session 客户端 session
* @param status 关闭状态, 1000 为正常关闭
* @throws Exception 异常
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
logger.info("client {} was closed, now online number is : {}", session.getId(), onlineCount.decrementAndGet());
groupSendMessage(session.getId(), session.getId() + " 已断开连接");
}
/**
* 获取到客户端发送的文本消息时调用的方法
*
* @param session 客户端 session
* @param message 消息内容
* @throws Exception 异常
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String textMessage = message.getPayload();
logger.info("message from client {}: {}", session.getId(), message);
session.sendMessage(new TextMessage("我: " + textMessage));
// 发送消息给其他客户端
groupSendMessage(session.getId(), session.getId() + " 说: " + textMessage);
}
/**
* 客户端连接异常时调用的方法
*
* @param session 客户端 session
* @param exception 异常信息
* @throws Exception 异常
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
session.close(CloseStatus.SERVER_ERROR);
logger.error("server error, Session ID: {}, error info: {}", session.getId(), exception.getMessage());
}
/**
* 向指定客户端发送消息
*
* @param sessionId 被指定客户端 session
* @param message 消息内容
*/
public void sendMessage(String sessionId, String message) {
WebSocketSession targetSession = SESSIONS.stream()
.filter(session -> sessionId.equals(session.getId()))
.findAny()
.orElse(null);
if (targetSession == null) {
logger.warn("{} is offline", sessionId);
return;
}
try {
targetSession.sendMessage(new TextMessage(message));
} catch (IOException e) {
logger.error("client {} send message fail, error: {}", sessionId, e.getMessage());
}
}
/**
* 群发消息
*
* @param sourceSessionId 发出消息的 session 的 id
* @param message 消息内容
*/
public void groupSendMessage(String sourceSessionId, String message) {
SESSIONS.stream()
.filter(WebSocketSession::isOpen)
.filter(session -> !session.getId().equals(sourceSessionId))
.forEach(session -> sendMessage(session.getId(), message));
}
}
3. 使用 JavaConfig 方式注册自定义消息处理器
创建 WebsocketConfig
类实现 WebSocketConfigurer
接口:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private MyTextWebSocketHandler myTextWebSocketHandler;
/**
* 注册 websocket 处理器
* @param registry WebSocketHandlerRegistry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myTextWebSocketHandler, "/connect").withSockJS();
}
// /**
// * WebSocket服务器端点
// * @return ServerEndpointExporter
// */
// @Bean
// public ServerEndpointExporter serverEndpointExporter() {
// return new ServerEndpointExporter();
// }
}
使用 @EnableWebSocket
开启 WebSocket
功能,也就是使 Spring 容器初始化 WebSocket
相关组件。至于是如何注册以及初始化这些组件的,可以参考 Spring Boot 深入学习(3) —— 理解 Enable 模块驱动 。
withSockJS
表示前端客户端通信功能是由 SockJS
实现的,当然也可以不加,使用默认方式实现。
使用 WebSocket 默认的实现方式需要创建一个 服务端点,并在配置类中注册一个 ServerEndpointExporter,这里不做展开。
4. 客户端实现
页面以及样式借用网上的源码:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket客户端</title>
<link href="https://cdn.bootcss.com/twitter-bootstrap/4.4.1/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<style>
.jumbotron {
width: 100%;
}
#text {
height: 3rem;
font-size: 1rem;
line-height: 3rem;
margin: 1rem;
}
.btn {
margin-right: 5px;
}
#connect {
margin-left: 1rem;
}
#log {
margin: 1rem 0 0 1rem;
}
</style>
<div class="container">
<div class="row">
<div class="jumbotron">
<input type="text" placeholder="请输入你想传输的内容" id="text" class="col-lg-12"/>
<input type="button" value="连接" class="btn btn-info" id="connect" onclick="connect()"/>
<input type="button" value="发送" class="btn btn-success" id="sent" disabled="disabled" onclick="sent()"/>
<input type="button" value="断开" class="btn btn-danger" id="disconnect" disabled="disabled"
onclick="disconnect()"/>
<div id="log">
<p>聊天记录:</p>
</div>
</div>
</div>
</div>
我们同样可以使用默认方式 和 SockJS 方式创建 WebSocket 连接:
使用 SockJS 方式需要引入 sockjs 库
<script src="https://cdn.bootcss.com/sockjs-client/0.3.4/sockjs.min.js"></script>
function connect() {
// ws = new WebSocket("ws://localhost:8080/ws");
ws = new SockJS("/connect");
ws.onopen = function () {
setConnected(true);
log('和服务端连接成功!');
};
ws.onmessage = function (event) {
log(event.data);
};
ws.onclose = function () {
setConnected(false);
log('和服务端断开连接!')
}
}
客户端方法实现:
<script type="text/javascript">
let text = document.querySelector('#text');
let connectBtn = document.querySelector("#connect");
let sentBtn = document.querySelector("#sent");
let disconnectBtn = document.querySelector("#disconnect");
let logDiv = document.querySelector("#log");
/**
* 发送消息
*/
function sent() {
if (ws != null) {
ws.send(text.value);
// log('客户端说:' + text.value);
} else {
log('请先建立连接!')
}
}
/**
* 断开连接
*/
function disconnect() {
if (ws != null) {
ws.close();
ws = null;
}
setConnected(false);
}
/**
* 记录聊天记录
* @param value 聊天内容
*/
function log(value) {
let content = document.createElement('p');
content.innerHTML = value;
logDiv.appendChild(content);
text.value = '';
}
function setConnected(connected) {
connectBtn.disabled = connected;
disconnectBtn.disabled = !connected;
sentBtn.disabled = !connected;
}
</script>
5. 测试聊天
开启两个页面:
分别连接服务端,查看控制台输出:
client aj7_nwbk join in, now online number is : 1
client 180j6end join in, now online number is : 2
发送消息,查看控制台输出:
message from client aj7_nwbk: TextMessage payload=[大家好呀], byteCount=12, last=true]
message from client 180j6end: TextMessage payload=[今天要来点兔子吗?], byteCount=27, last=true]
message from client aj7_nwbk: TextMessage payload=[不要兔子,来头🐂,..], byteCount=37, last=true]
断开连接,查看控制台输出:
client aj7_nwbk was closed, now online number is : 1
client 180j6end was closed, now online number is : 0
6. 设置多房间
上面我们所有用户的连接都相当于是在同一个聊天室中的,但在实际开发过程中可能存在多个群组,也就说要有多个聊天室。
应对这种情况,只需稍微做下改动即可。
为了更贴切实际应用场景,先创建一个 WebSocketRoom
聊天室类,模拟数据库查询聊天室信息:
public class WebSocketRoom implements Serializable {
private static final long serialVersionUID = 1564587746789703609L;
private Integer id;
private String room;
private String connectPath;
public WebSocketRoom(Integer id, String room, String connectPath) {
this.id = id;
this.room = room;
this.connectPath = connectPath;
}
// 省略 setter、getter、equals、hashCode
}
@Service
public class WebSocketRoomServiceImpl implements WebSocketRoomService {
@Override
public Set<WebSocketRoom> findAll() {
Set<WebSocketRoom> rooms = new HashSet<>();
rooms.add(new WebSocketRoom(1, "001", "/connect/001"));
rooms.add(new WebSocketRoom(2, "002", "/connect/002"));
rooms.add(new WebSocketRoom(3, "003", "/connect/003"));
rooms.add(new WebSocketRoom(4, "004", "/connect/004"));
rooms.add(new WebSocketRoom(5, "005", "/connect/005"));
rooms.add(new WebSocketRoom(6, "006", "/connect/006"));
return rooms;
}
}
然后修改 WebSocketConfig
中注册 websocket 处理器的方法:
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
Set<WebSocketRoom> rooms = webSocketRoomService.findAll();
List<String> roomConnects = rooms.stream().map(WebSocketRoom::getConnectPath).collect(Collectors.toList());
registry.addHandler(myTextWebSocketHandler, roomConnects.toArray(new String[] {}))
.withSockJS();
}
修改 MyTextWebSocketHandler 中对应的方法,在前端建立连接时传入对应的房间连接即可,代码这里就不贴了,详情可参照源码。
源码地址:https://github.com/NekoChips/SpringDemo/tree/master/19.springboot-websocket
关于作者:NekoChips
本文地址:https://chenyangjie.com.cn/articles/2020/04/26/1587891946175.html
版权声明:本篇所有文章仅用于学习和技术交流,本作品采用 BY-NC-SA 4.0 许可协议,如需转载请注明出处!
许可协议:
