Spring Boot 提供了 Websocket 组件,直接使用即可。Spring 同时也封装了 SockJs 的消息传递功能,使用起来较为方便,开发过程中推荐使用。本文只讨论 SockJs 的方式。

1. 添加依赖

<!-- websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2. 自定义消息处理器

Spring Boot 提供了一个 WebSocketHandler 消息处理接口,源码如下:

public interface WebSocketHandler {

	void afterConnectionEstablished(WebSocketSession session) throws Exception;

	void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;

	void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;

	void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;

	boolean supportsPartialMessages();

}

WebSocketHandler 提供了4个方法:

  • afterConnectionEstablished:当客户端连接成功后调用
  • handleMessage:当接收到客户端消息后调用
  • handleTransportError:当与客户端连接失败时调用
  • afterConnectionClosed:当与客户端断开连接后调用

而真正用于处理消息的是 TextWebSocketHandler ,继承结构图如下:

image.png

其中 AbstractWebSocketHandler 提供了处理各种消息的方法:

......
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
	if (message instanceof TextMessage) {
		handleTextMessage(session, (TextMessage) message);
	}
	else if (message instanceof BinaryMessage) {
		handleBinaryMessage(session, (BinaryMessage) message);
	}
	else if (message instanceof PongMessage) {
		handlePongMessage(session, (PongMessage) message);
	}
	else {
		throw new IllegalStateException("Unexpected WebSocket message type: " + message);
	}
}
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
}
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
}
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
}
......

接下来我们创建 MyTextWebSocketHandler 继承 TextWebSocketHandler

@Component
public class MyTextWebSocketHandler extends TextWebSocketHandler {

    private final Logger logger = LoggerFactory.getLogger(MyTextWebSocketHandler.class);

    private final AtomicInteger onlineCount = new AtomicInteger(0);

    private static final CopyOnWriteArraySet<WebSocketSession> SESSIONS = new CopyOnWriteArraySet<>();

    /**
     * 连接建立后调用的方法
     *
     * @param session 客户端 session
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        SESSIONS.add(session);
        logger.info("client {} join in, now online number is : {}", session.getId(), onlineCount.incrementAndGet());
        session.sendMessage(new TextMessage("当前客户端会话 ID 为: " + session.getId()));
        groupSendMessage(session.getId(), session.getId() + " 进入聊天室");
    }

    /**
     * 连接断开后调用的方法
     *
     * @param session 客户端 session
     * @param status  关闭状态, 1000 为正常关闭
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
        logger.info("client {} was closed, now online number is : {}", session.getId(), onlineCount.decrementAndGet());

        groupSendMessage(session.getId(), session.getId() + " 已断开连接");
    }

    /**
     * 获取到客户端发送的文本消息时调用的方法
     *
     * @param session 客户端 session
     * @param message 消息内容
     * @throws Exception 异常
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String textMessage = message.getPayload();
        logger.info("message from client {}: {}", session.getId(), message);
        session.sendMessage(new TextMessage("我: " + textMessage));

        // 发送消息给其他客户端
        groupSendMessage(session.getId(), session.getId() + " 说: " + textMessage);
    }

    /**
     * 客户端连接异常时调用的方法
     *
     * @param session   客户端 session
     * @param exception 异常信息
     * @throws Exception 异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        session.close(CloseStatus.SERVER_ERROR);
        logger.error("server error, Session ID: {}, error info: {}", session.getId(), exception.getMessage());
    }

    /**
     * 向指定客户端发送消息
     *
     * @param sessionId 被指定客户端 session
     * @param message   消息内容
     */
    public void sendMessage(String sessionId, String message) {
        WebSocketSession targetSession = SESSIONS.stream()
                .filter(session -> sessionId.equals(session.getId()))
                .findAny()
                .orElse(null);

        if (targetSession == null) {
            logger.warn("{} is offline", sessionId);
            return;
        }
        try {
            targetSession.sendMessage(new TextMessage(message));
        } catch (IOException e) {
            logger.error("client {} send message fail, error: {}", sessionId, e.getMessage());
        }
    }

    /**
     * 群发消息
     *
     * @param sourceSessionId 发出消息的 session 的 id
     * @param message       消息内容
     */
    public void groupSendMessage(String sourceSessionId, String message) {
        SESSIONS.stream()
                .filter(WebSocketSession::isOpen)
                .filter(session -> !session.getId().equals(sourceSessionId))
                .forEach(session -> sendMessage(session.getId(), message));
    }

}

3. 使用 JavaConfig 方式注册自定义消息处理器

创建 WebsocketConfig 类实现 WebSocketConfigurer 接口:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private MyTextWebSocketHandler myTextWebSocketHandler;

    /**
     * 注册 websocket 处理器
     * @param registry WebSocketHandlerRegistry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myTextWebSocketHandler, "/connect").withSockJS();
    }

//    /**
//     * WebSocket服务器端点
//     * @return ServerEndpointExporter
//     */
//    @Bean
//    public ServerEndpointExporter serverEndpointExporter() {
//        return new ServerEndpointExporter();
//    }

}

使用 @EnableWebSocket 开启 WebSocket 功能,也就是使 Spring 容器初始化 WebSocket 相关组件。至于是如何注册以及初始化这些组件的,可以参考 Spring Boot 深入学习(3) —— 理解 Enable 模块驱动

withSockJS 表示前端客户端通信功能是由 SockJS 实现的,当然也可以不加,使用默认方式实现。

使用 WebSocket 默认的实现方式需要创建一个 服务端点,并在配置类中注册一个 ServerEndpointExporter,这里不做展开。

4. 客户端实现

页面以及样式借用网上的源码:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客户端</title>
    <link href="https://cdn.bootcss.com/twitter-bootstrap/4.4.1/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<style>
    .jumbotron {
        width: 100%;
    }

    #text {
        height: 3rem;
        font-size: 1rem;
        line-height: 3rem;
        margin: 1rem;
    }

    .btn {
        margin-right: 5px;
    }

    #connect {
        margin-left: 1rem;
    }

    #log {
        margin: 1rem 0 0 1rem;
    }

</style>
<div class="container">
    <div class="row">
        <div class="jumbotron">
            <input type="text" placeholder="请输入你想传输的内容" id="text" class="col-lg-12"/>
            <input type="button" value="连接" class="btn btn-info" id="connect" onclick="connect()"/>
            <input type="button" value="发送" class="btn btn-success" id="sent" disabled="disabled" onclick="sent()"/>
            <input type="button" value="断开" class="btn btn-danger" id="disconnect" disabled="disabled"
                   onclick="disconnect()"/>

            <div id="log">
                <p>聊天记录:</p>
            </div>
        </div>
    </div>
</div>

我们同样可以使用默认方式SockJS 方式创建 WebSocket 连接:

使用 SockJS 方式需要引入 sockjs 库

<script src="https://cdn.bootcss.com/sockjs-client/0.3.4/sockjs.min.js"></script>
function connect() {
    // ws = new WebSocket("ws://localhost:8080/ws");
    ws = new SockJS("/connect");
    ws.onopen = function () {
        setConnected(true);
        log('和服务端连接成功!');
    };
    ws.onmessage = function (event) {
        log(event.data);
    };
    ws.onclose = function () {
        setConnected(false);
        log('和服务端断开连接!')
    }
}

客户端方法实现:

<script type="text/javascript">
    let  text = document.querySelector('#text');
    let connectBtn = document.querySelector("#connect");
    let sentBtn = document.querySelector("#sent");
    let disconnectBtn = document.querySelector("#disconnect");
    let logDiv = document.querySelector("#log");

    /**
     * 发送消息
     */
    function sent() {
        if (ws != null) {
            ws.send(text.value);
            // log('客户端说:' + text.value);
        } else {
            log('请先建立连接!')
        }
    }

    /**
     * 断开连接
     */
    function disconnect() {
        if (ws != null) {
            ws.close();
            ws = null;
        }
        setConnected(false);
    }

    /**
     * 记录聊天记录
     * @param value 聊天内容
     */
    function log(value) {
        let content = document.createElement('p');
        content.innerHTML = value;
        logDiv.appendChild(content);
        text.value = '';
    }

    function setConnected(connected) {
        connectBtn.disabled = connected;
        disconnectBtn.disabled = !connected;
        sentBtn.disabled = !connected;
    }
</script>

5. 测试聊天

开启两个页面:

image.png

分别连接服务端,查看控制台输出:

image.png

client aj7_nwbk join in, now online number is : 1
client 180j6end join in, now online number is : 2

发送消息,查看控制台输出:

image.png

message from client aj7_nwbk: TextMessage payload=[大家好呀], byteCount=12, last=true]
message from client 180j6end: TextMessage payload=[今天要来点兔子吗?], byteCount=27, last=true]
message from client aj7_nwbk: TextMessage payload=[不要兔子,来头🐂,..], byteCount=37, last=true]

断开连接,查看控制台输出:

image.png

client aj7_nwbk was closed, now online number is : 1
client 180j6end was closed, now online number is : 0

6. 设置多房间

上面我们所有用户的连接都相当于是在同一个聊天室中的,但在实际开发过程中可能存在多个群组,也就说要有多个聊天室。

应对这种情况,只需稍微做下改动即可。

为了更贴切实际应用场景,先创建一个 WebSocketRoom 聊天室类,模拟数据库查询聊天室信息:

public class WebSocketRoom implements Serializable {

    private static final long serialVersionUID = 1564587746789703609L;

    private Integer id;

    private String room;

    private String connectPath;

    public WebSocketRoom(Integer id, String room, String connectPath) {
        this.id = id;
        this.room = room;
        this.connectPath = connectPath;
    }

// 省略 setter、getter、equals、hashCode
}
@Service
public class WebSocketRoomServiceImpl implements WebSocketRoomService {
  
    @Override
    public Set<WebSocketRoom> findAll() {
        Set<WebSocketRoom> rooms = new HashSet<>();
        rooms.add(new WebSocketRoom(1, "001", "/connect/001"));
        rooms.add(new WebSocketRoom(2, "002", "/connect/002"));
        rooms.add(new WebSocketRoom(3, "003", "/connect/003"));
        rooms.add(new WebSocketRoom(4, "004", "/connect/004"));
        rooms.add(new WebSocketRoom(5, "005", "/connect/005"));
        rooms.add(new WebSocketRoom(6, "006", "/connect/006"));
        return rooms;
    }
}

然后修改 WebSocketConfig 中注册 websocket 处理器的方法:

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        Set<WebSocketRoom> rooms = webSocketRoomService.findAll();
        List<String> roomConnects = rooms.stream().map(WebSocketRoom::getConnectPath).collect(Collectors.toList());
        registry.addHandler(myTextWebSocketHandler, roomConnects.toArray(new String[] {}))
                .withSockJS();
    }

修改 MyTextWebSocketHandler 中对应的方法,在前端建立连接时传入对应的房间连接即可,代码这里就不贴了,详情可参照源码。

源码地址:https://github.com/NekoChips/SpringDemo/tree/master/19.springboot-websocket


关于作者:NekoChips
本文地址:https://chenyangjie.com.cn/articles/2020/04/26/1587891946175.html
版权声明:本篇所有文章仅用于学习和技术交流,本作品采用 BY-NC-SA 4.0 许可协议,如需转载请注明出处!
许可协议:知识共享许可协议