原创

WebSocket

温馨提示:
本文最后更新于 2024年08月01日,已超过 261 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

1.WebSocket

1.1简介

WebSocket 是一种网络通信协议,它提供了全双工的通信通道,即客户端和服务器可以双向发送数据。WebSocket 协议最初是为了克服 HTTP 协议的单向通信限制而设计的,HTTP 协议是基于请求-响应模型的。WebSocket 在 HTTP/HTTPS 上建立连接,并通过一个握手过程升级为持久连接。


1.2特点

全双工通信:
客户端和服务器都可以主动发起数据传输。
低延迟:
相比于轮询或长轮询,WebSocket 提供更低的延迟。
持久连接:
一旦连接建立,就可以持续保持,直到一方主动关闭。
多平台支持:
WebSocket 协议被广泛支持,包括浏览器、移动应用和桌面应用。
扩展性:
可以方便地集成到现有的 Web 应用中。

1.3作用

实时通信:
实现即时消息传递,如聊天应用、实时通知等。
支持游戏中的实时交互。
数据流传输:
实时股票报价、天气数据更新等。
远程控制:
控制远程设备或应用程序,如 IoT 设备管理。
文件传输:
实现实时文件传输功能。
在线协作:
实现多人在线编辑文档等功能。

1.4与 Socket 的区别

协议层次:
Socket:是操作系统提供的底层通信接口,用于建立网络连接。
WebSocket:基于 Socket 的高层协议,用于实现全双工通信。
握手过程:
Socket:通常没有特定的握手过程。
WebSocket:需要经过一个 HTTP 握手过程来升级到 WebSocket 连接。
通信模式:
Socket:可以是半双工或全双工,但通常需要客户端或服务器一方主动发起连接。
WebSocket:始终是全双工的,客户端和服务器都可以主动发送数据。
应用场景:
Socket:广泛应用于各种网络通信场景。
WebSocket:主要用于 Web 应用中的实时通信。

2.SpringBoot集成

WebSocket配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

/**
 * WebSocket 配置类
 */
@Configuration
public class WebSocketConfig {

    /**
     * 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * 配置 WebSocket 的最大文本消息和二进制消息缓冲区大小,以及会话空闲超时时间。
     * @return
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        // 设置最大文本消息缓冲区大小为 10MB
        container.setMaxTextMessageBufferSize(10240000);
        // 设置最大二进制消息缓冲区大小为 10MB
        container.setMaxBinaryMessageBufferSize(10240000);
        // 设置会话空闲超时时间为 15 分钟
        container.setMaxSessionIdleTimeout(15 * 60000L);
        return container;
    }
}

WebSocket的服务端

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket的服务端
 */
@Component
@Slf4j
@ServerEndpoint("/api/pushMessage/{userId}")
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。
     */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);
    /**
     * concurrent包的线程安全Map,用来存放每个客户端对应的WebSocket对象。
     */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    /**
     * 连接建立成功调用的方法
     * @param session 当前会话
     * @param userId 用户标识
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        // 保存当前会话
        this.session = session;
        // 设置当前连接的userId
        this.userId = userId;

        // 如果已经存在,则先移除
        webSocketMap.computeIfPresent(userId, (k, v) -> {
            webSocketMap.remove(userId);
            return this;
        });
        // 加入Map中
        webSocketMap.putIfAbsent(userId, this);
        // 在线数加1
        onlineCount.incrementAndGet();
        log.info("用户连接: {}, 当前在线人数为: {}", userId, onlineCount.get());
        sendMessage("连接成功");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketMap.remove(userId);
        onlineCount.decrementAndGet();
        log.info("用户退出: {}, 当前在线人数为: {}", userId, onlineCount.get());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + userId + ",报文:" + message);
        if (StringUtils.isNotBlank(message)) {
            try {
                // 解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                // 追加发送人(防止串改)
                jsonObject.put("fromUserId", this.userId);
                String toUserId = jsonObject.getString("toUserId");

                // 传送给对应toUserId用户的websocket
                WebSocketServer targetWebSocket = webSocketMap.get(toUserId);
                if (targetWebSocket != null) {
                    targetWebSocket.sendMessage(message);
                } else {
                    log.error("请求的userId: {} 不在该服务器上", toUserId);
                }
            } catch (Exception e) {
                log.error("处理消息时发生错误: {}", e.getMessage(), e);
            }
        }
    }


    /**
     * 发生错误时被调用
     * @param session 当前会话
     * @param error 引发的错误
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误: {}, 原因: {}", this.userId, error.getMessage(), error);
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("发送消息时发生错误: {}", e.getMessage(), e);
        }
    }

    /**
     * 发送自定义消息
     * @param message 消息内容
     * @param userId 接收消息的用户ID
     */
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到: {}, 报文: {}", userId, message);
        WebSocketServer webSocketServer = webSocketMap.get(userId);
        if (webSocketServer != null) {
            webSocketServer.sendMessage(message);
        } else {
            log.error("用户 {} 不在线!", userId);
        }
    }
}

3.测试

Apifox上新建WebSocket接口,输入ws://localhost:8080/api/pushMessage/1,即可进行测试。

正文到此结束