WebSocket from Basics to Practice: Building a Real-Time Communication System with Spring Boot
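1. Why Do We Need WebSocket?
1.1 Limitations of HTTP
Before getting into WebSocket, let's look at where plain HTTP falls short.
Take a chat room: user A sends a message to user B. Over plain HTTP, where the server cannot initiate a request to the client, it usually works like this: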
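[Sequence diagram: User A sends a message for B to the server (HTTP request) and gets a success response. User B polls the server in a loop: "Any new messages?" (HTTP request), "No" (response). After N rounds of polling: "Any new messages?", "Yes, here is your message."]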
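The pain points of polling are obvious:
- Wasted resources: most requests are empty polls that return nothing
- Poor real-time behavior: the polling interval puts a floor on latency, since a message can wait up to one full interval
- Server pressure: large numbers of useless requests tie up connection resources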
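To put rough numbers on these pain points, here is a back-of-the-envelope sketch. The class name, the 2-second interval, and the message count are illustrative assumptions, not measurements from a real system.

```java
// Hypothetical helper: estimates the cost of short polling for a single client.
final class PollingCost {

    /** Requests one client issues in the window when polling every intervalMs. */
    static long totalPolls(long windowMs, long intervalMs) {
        return windowMs / intervalMs;
    }

    /** Polls that come back empty, assuming each message is picked up by exactly one poll. */
    static long emptyPolls(long windowMs, long intervalMs, long messagesInWindow) {
        return Math.max(0, totalPolls(windowMs, intervalMs) - messagesInWindow);
    }

    /** Average delivery delay if messages arrive at uniformly random times. */
    static double averageDelayMs(long intervalMs) {
        return intervalMs / 2.0;
    }

    public static void main(String[] args) {
        long hourMs = 3_600_000L;
        System.out.println("polls per hour = " + totalPolls(hourMs, 2_000));
        System.out.println("empty polls    = " + emptyPolls(hourMs, 2_000, 30));
        System.out.println("average delay  = " + averageDelayMs(2_000) + " ms");
    }
}
```

Under these assumptions a client that receives 30 messages in an hour still sends 1800 requests, 1770 of them for nothing, and waits about one second per message on average. Shortening the interval lowers the delay but raises the request count proportionally.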
1.2 How WebSocket Solves This
WebSocket addresses these problems directly:
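[Sequence diagram: User A and User B each establish a WebSocket connection to the server (HTTP upgrade request, answered with 101 Switching Protocols). The connections stay open for two-way communication at any time: A sends a message for B, the server pushes it to B, and B optionally confirms receipt.]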
One connection, two-way communication: the server can push data to the client on its own initiative.
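The connection starts life as an ordinary HTTP request that asks to be upgraded; the server proves it understood by answering 101 Switching Protocols with a `Sec-WebSocket-Accept` header derived from the client's `Sec-WebSocket-Key`. The sketch below shows that derivation as RFC 6455 describes it (SHA-1 over the key plus a fixed GUID, then Base64). The class and method names are made up for illustration; a framework such as Spring does this for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Illustrative only: computes the Sec-WebSocket-Accept value for a handshake.
final class HandshakeAccept {

    // Fixed GUID that RFC 6455 defines for the opening handshake.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from the RFC 6455 handshake example.
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

For the sample key used in the RFC, the result is `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`. The value is not a security measure; it only shows that the server really speaks WebSocket rather than echoing an arbitrary HTTP response.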
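2. WebSocket Basics
2.1 What Is WebSocket?
WebSocket is a protocol for full-duplex communication over a single TCP connection. It is defined in RFC 6455 and lets the server push data to the client without waiting for a request.
Key characteristics:
- Full duplex: client and server can send messages at the same time
- Low latency: once the connection is established, no further handshakes are needed
- Lightweight: frame headers are small (2 to 14 bytes), so per-message overhead is low
- Cross-origin: connections are not restricted by the browser's same-origin policy, so the server should validate the Origin header itself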
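The "lightweight" point can be made concrete by counting header bytes per frame. This is a small sketch based on the frame layout in RFC 6455, section 5.2; the class name is hypothetical and the function ignores extensions such as compression.

```java
// Illustrative only: header size of a single WebSocket frame.
final class FrameOverhead {

    /** Header bytes for one frame carrying payloadLen bytes of payload. */
    static int headerBytes(long payloadLen, boolean masked) {
        int size = 2;                 // FIN/opcode byte + mask bit with 7-bit length
        if (payloadLen > 65_535) {
            size += 8;                // 64-bit extended payload length
        } else if (payloadLen > 125) {
            size += 2;                // 16-bit extended payload length
        }
        if (masked) {
            size += 4;                // masking key, required for client-to-server frames
        }
        return size;
    }

    public static void main(String[] args) {
        System.out.println("server -> client, 100 B payload: " + headerBytes(100, false) + " B header");
        System.out.println("client -> server, 100 B payload: " + headerBytes(100, true) + " B header");
    }
}
```

A short chat message therefore costs 2 header bytes from server to client and 6 from client to server, compared with the hundreds of bytes of headers that a typical HTTP request carries.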
2.2 Connection States
A WebSocket connection is always in one of four states (the `readyState` values of the browser API):
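```javascript
// 0: CONNECTING - the connection is being established
// 1: OPEN       - connected and ready to communicate
// 2: CLOSING    - the closing handshake is in progress
// 3: CLOSED     - the connection is closed
console.log(WebSocket.CONNECTING); // 0
console.log(WebSocket.OPEN);       // 1
console.log(WebSocket.CLOSING);    // 2
console.log(WebSocket.CLOSED);     // 3
```
3. Hands-On: An Online Chat Room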
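We'll build a complete online chat room with the following features:
- User connect/disconnect management
- Group chat broadcast
- Private chat
- Online user list
- Heartbeat detection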
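Before bringing in Spring, it can help to see the routing rules behind these features in isolation. The following is a framework-free sketch, not the implementation used later: `ChatRoomSketch` and its methods are hypothetical, and each user's "inbox" list stands in for a WebSocket session.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of the chat room's routing rules.
final class ChatRoomSketch {

    private final Map<String, List<String>> inboxes = new ConcurrentHashMap<>();

    /** Connect: register the user, then tell everyone (including the newcomer). */
    void join(String userId) {
        inboxes.putIfAbsent(userId, new CopyOnWriteArrayList<>());
        broadcast("system", userId + " joined");
    }

    /** Disconnect: drop the user, then tell whoever is still online. */
    void leave(String userId) {
        if (inboxes.remove(userId) != null) {
            broadcast("system", userId + " left");
        }
    }

    /** Group chat: every online user, sender included, receives the message. */
    void broadcast(String from, String text) {
        inboxes.values().forEach(inbox -> inbox.add(from + ": " + text));
    }

    /** Private chat: only the receiver gets it; false means the receiver is offline. */
    boolean sendPrivate(String from, String to, String text) {
        List<String> inbox = inboxes.get(to);
        if (inbox == null) {
            return false;
        }
        inbox.add(from + " (private): " + text);
        return true;
    }

    /** Online user list. */
    Set<String> onlineUsers() {
        return Set.copyOf(inboxes.keySet());
    }

    List<String> inboxOf(String userId) {
        return List.copyOf(inboxes.getOrDefault(userId, List.of()));
    }
}
```

In the Spring version these roles are split up: the STOMP broker takes over `broadcast`, user destinations take over `sendPrivate`, and a session service keeps the online list.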
3.1 Overall Architecture
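[Architecture diagram: client layer (Browser 1, Browser 2, ... Browser N) → access layer (WebSocket server, port 8080, endpoint /ws) → business layer (user management, message dispatch, session management) → data layer (Redis, MySQL)]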
3.2 Environment Setup
Maven dependencies:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>websocket-chat</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- WebSocket starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Web support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Messaging (AMQP) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Redis support (for cluster mode) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- JSON processing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <!-- Utilities -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>
```
Configuration file application.yml:
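```yaml
server:
  port: 8080

spring:
  application:
    name: websocket-chat
  # Redis settings (used in cluster mode)
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

# Custom WebSocket settings
websocket:
  # Endpoint path
  endpoint: /ws
  # Allowed origins
  allowed-origins: "*"
  # Heartbeat interval (seconds)
  heartbeat-interval: 30
  # Message size limit (bytes)
  max-message-size: 65536
  # Send buffer size (bytes)
  send-buffer-size: 524288
  # Send timeout (milliseconds)
  send-timeout: 20000
```
3.3 Core Data Structures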
First, define the message format:
```java
package com.example.websocket.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * Chat message entity
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {

    /** Message type */
    private MessageType type;

    /** Sender ID */
    private String senderId;

    /** Sender nickname */
    private String senderName;

    /** Receiver ID (only meaningful for private chat) */
    private String receiverId;

    /** Message content */
    private String content;

    /** Time the message was sent */
    private LocalDateTime timestamp;

    /** Message ID */
    private String messageId;

    /** Message type enum */
    public enum MessageType {
        CHAT,          // chat message
        JOIN,          // user joined the room
        LEAVE,         // user left the room
        SYSTEM,        // system message
        HEARTBEAT,     // heartbeat
        ONLINE_USERS,  // online user list
        ERROR          // error message
    }
}
```
User session information:
```java
package com.example.websocket.model;

import lombok.Builder;
import lombok.Data;
import org.springframework.web.socket.WebSocketSession;

import java.time.LocalDateTime;

/**
 * WebSocket session information
 */
@Data
@Builder
public class WebSocketSessionInfo {

    /** Session ID */
    private String sessionId;

    /** User ID */
    private String userId;

    /** User name */
    private String username;

    /** Time the connection was established */
    private LocalDateTime connectTime;

    /** Time of the last heartbeat */
    private LocalDateTime lastHeartbeatTime;

    /** Client IP */
    private String clientIp;

    /** Underlying session object (excluded from serialization) */
    private transient WebSocketSession session;
}
```
3.4 Configuration Class
The WebSocket configuration is the core of the setup. We use the STOMP protocol:
```java
package com.example.websocket.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

/**
 * WebSocket configuration
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${websocket.endpoint:/ws}")
    private String endpoint;

    @Value("${websocket.allowed-origins:*}")
    private String allowedOrigins;

    @Value("${websocket.max-message-size:65536}")
    private int maxMessageSize;

    @Value("${websocket.send-buffer-size:524288}")
    private int sendBufferSize;

    // int rather than long: setSendTimeLimit takes an int (milliseconds)
    @Value("${websocket.send-timeout:20000}")
    private int sendTimeout;

    /**
     * Configures the message broker, i.e. the destination prefixes used for routing.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable the simple in-memory broker; messages sent to these prefixes
        // are delivered to the clients subscribed to them
        registry.enableSimpleBroker("/topic", "/queue", "/user");
        // Application destination prefix: messages from clients to the server's
        // @MessageMapping methods must start with /app
        registry.setApplicationDestinationPrefixes("/app");
        // User destination prefix, used for point-to-point messages
        registry.setUserDestinationPrefix("/user");
    }

    /**
     * Registers the STOMP endpoints, i.e. the entry points clients connect to.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // WebSocket endpoint with SockJS fallback enabled
        registry.addEndpoint(endpoint)
                .setAllowedOriginPatterns(allowedOrigins)
                .withSockJS();
        // Native WebSocket endpoint (no fallback)
        registry.addEndpoint(endpoint + "-raw")
                .setAllowedOriginPatterns(allowedOrigins);
    }

    /**
     * Configures transport parameters: message size limit, buffer size, send timeout.
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(maxMessageSize)
                .setSendBufferSizeLimit(sendBufferSize)
                .setSendTimeLimit(sendTimeout);
    }
}
```
Why STOMP?
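STOMP (Simple Text Oriented Messaging Protocol) is a text-based messaging protocol that can run on top of WebSocket as a subprotocol. Raw WebSocket only defines how frames are carried, not what a message means; STOMP adds:
- A message format: a command, headers, and a body
- Destinations: similar to Topic/Queue in a message queue
- Subscriptions: clients subscribe to the destinations they care about
- Extensibility: transactions, authentication, and other advanced features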
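To make the message format tangible, here is a minimal sketch of how a STOMP frame is laid out on the wire: a command line, `name:value` header lines, a blank line, the body, and a terminating NUL byte. `StompFrameSketch` is a hypothetical name; the code deliberately skips header escaping and `content-length` handling, which a real client or Spring's own codec takes care of.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a bare-bones STOMP frame encoder/parser.
final class StompFrameSketch {

    final String command;
    final Map<String, String> headers;
    final String body;

    StompFrameSketch(String command, Map<String, String> headers, String body) {
        this.command = command;
        this.headers = new LinkedHashMap<>(headers);
        this.body = body;
    }

    /** Serializes the frame: command, headers, blank line, body, NUL terminator. */
    String encode() {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> sb.append(name).append(':').append(value).append('\n'));
        return sb.append('\n').append(body).append('\0').toString();
    }

    /** Parses a single frame produced by encode(). */
    static StompFrameSketch parse(String raw) {
        int end = raw.indexOf('\0');
        String frame = end >= 0 ? raw.substring(0, end) : raw;
        int split = frame.indexOf("\n\n");
        String head = split >= 0 ? frame.substring(0, split) : frame;
        String body = split >= 0 ? frame.substring(split + 2) : "";
        String[] lines = head.split("\n");
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // If a header repeats, the first occurrence wins
                headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
            }
        }
        return new StompFrameSketch(lines[0], headers, body);
    }
}
```

A client sending a chat message to the `/app/chat.send` destination would thus produce a `SEND` frame whose `destination` header carries that path and whose body carries the JSON payload.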
3.5 Message Handler
Create the controller that handles messages:
```java
package com.example.websocket.controller;

import com.example.websocket.model.ChatMessage;
import com.example.websocket.service.UserSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;

/**
 * WebSocket message controller
 */
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {

    private final SimpMessagingTemplate messagingTemplate;
    private final UserSessionService sessionService;

    /**
     * Handles public chat messages.
     * Listens on: /app/chat.send
     */
    @MessageMapping("/chat.send")
    @SendTo("/topic/public")
    public ChatMessage sendPublicMessage(@Payload ChatMessage message,
                                         SimpMessageHeaderAccessor headerAccessor) {
        log.info("Public chat message received: {} -> {}", message.getSenderName(), message.getContent());
        // Fill in server-side fields
        message.setMessageId(UUID.randomUUID().toString());
        message.setTimestamp(LocalDateTime.now());
        message.setType(ChatMessage.MessageType.CHAT);
        return message;
    }

    /**
     * Handles private chat messages.
     * Listens on: /app/chat.private
     */
    @MessageMapping("/chat.private")
    public void sendPrivateMessage(@Payload ChatMessage message,
                                   SimpMessageHeaderAccessor headerAccessor) {
        log.info("Private message: {} -> {}: {}",
                message.getSenderName(), message.getReceiverId(), message.getContent());
        // Fill in server-side fields
        message.setMessageId(UUID.randomUUID().toString());
        message.setTimestamp(LocalDateTime.now());
        message.setType(ChatMessage.MessageType.CHAT);

        // Send to the target user; clients subscribe to /user/queue/private.
        // Note: convertAndSendToUser resolves the user by Principal name, so this only
        // reaches the receiver if each connection is given a Principal whose name equals
        // the userId (for example in a custom HandshakeHandler, not shown in this article).
        messagingTemplate.convertAndSendToUser(message.getReceiverId(), "/queue/private", message);
        // Also send a copy back to the sender as a receipt
        messagingTemplate.convertAndSendToUser(message.getSenderId(), "/queue/private", message);
    }

    /**
     * A user joins the room.
     * Listens on: /app/chat.join
     */
    @MessageMapping("/chat.join")
    @SendTo("/topic/public")
    public ChatMessage userJoin(@Payload ChatMessage message,
                                SimpMessageHeaderAccessor headerAccessor) {
        // Store the user info in the session attributes
        String sessionId = headerAccessor.getSessionId();
        headerAccessor.getSessionAttributes().put("username", message.getSenderName());
        headerAccessor.getSessionAttributes().put("userId", message.getSenderId());

        // Register the user session. SimpMessageHeaderAccessor does not expose the
        // underlying WebSocketSession (there is no getSession() method), so null is
        // passed here; UserSessionService tolerates a null session.
        sessionService.registerSession(
                message.getSenderId(), message.getSenderName(), sessionId, null);
        log.info("User joined: {} ({})", message.getSenderName(), message.getSenderId());

        // Build the system message
        return ChatMessage.builder()
                .type(ChatMessage.MessageType.JOIN)
                .senderId("system")
                .senderName("System")
                .content(message.getSenderName() + " joined the chat room")
                .timestamp(LocalDateTime.now())
                .messageId(UUID.randomUUID().toString())
                .build();
    }

    /**
     * Handles heartbeats.
     * Listens on: /app/heartbeat
     */
    @MessageMapping("/heartbeat")
    public void handleHeartbeat(@Payload ChatMessage message,
                                SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        sessionService.updateHeartbeat(sessionId);
        log.debug("Heartbeat received: {} - {}", sessionId, message.getSenderId());
    }

    /**
     * Returns the online user list to the requester.
     * Listens on: /app/users.online
     */
    @MessageMapping("/users.online")
    public void getOnlineUsers(SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        // Address the reply to this session only; the session ID header lets the
        // user destination resolve even without an authenticated Principal
        SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        replyHeaders.setSessionId(sessionId);
        replyHeaders.setLeaveMutable(true);
        messagingTemplate.convertAndSendToUser(
                sessionId,
                "/queue/users",
                Map.of("type", "ONLINE_USERS", "users", sessionService.getAllOnlineUsers()),
                replyHeaders.getMessageHeaders());
    }

    /**
     * Handles exceptions thrown by the methods above.
     * Note: @SendTo broadcasts the error to every subscriber of /queue/errors.
     */
    @MessageExceptionHandler
    @SendTo("/queue/errors")
    public ChatMessage handleException(Exception e) {
        log.error("Exception while handling message", e);
        return ChatMessage.builder()
                .type(ChatMessage.MessageType.ERROR)
                .senderId("system")
                .senderName("System")
                .content("Message handling failed: " + e.getMessage())
                .timestamp(LocalDateTime.now())
                .messageId(UUID.randomUUID().toString())
                .build();
    }
}
```
3.6 Session Management Service
This service keeps track of all WebSocket connections:
```java
package com.example.websocket.service;

import com.example.websocket.model.ChatMessage;
import com.example.websocket.model.WebSocketSessionInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * User session management service
 */
@Slf4j
@Service
public class UserSessionService {

    // All online sessions, keyed by session ID
    private final Map<String, WebSocketSessionInfo> sessions = new ConcurrentHashMap<>();
    // Mapping from user ID to session ID
    private final Map<String, String> userToSession = new ConcurrentHashMap<>();

    private final SimpMessagingTemplate messagingTemplate;

    @Value("${websocket.heartbeat-interval:30}")
    private int heartbeatInterval;

    public UserSessionService(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    /** Registers a new session. */
    public void registerSession(String userId, String username, String sessionId, WebSocketSession session) {
        WebSocketSessionInfo sessionInfo = WebSocketSessionInfo.builder()
                .sessionId(sessionId)
                .userId(userId)
                .username(username)
                .connectTime(LocalDateTime.now())
                .lastHeartbeatTime(LocalDateTime.now())
                .clientIp(getClientIp(session))
                .session(session)
                .build();
        sessions.put(sessionId, sessionInfo);
        userToSession.put(userId, sessionId);
        log.info("User session registered: {} - {} - {}", userId, username, sessionId);
        // Broadcast the updated online user list
        broadcastOnlineUsers();
    }

    /** Removes a session. */
    public void removeSession(String sessionId) {
        WebSocketSessionInfo sessionInfo = sessions.remove(sessionId);
        if (sessionInfo != null) {
            userToSession.remove(sessionInfo.getUserId());
            log.info("User session removed: {} - {}", sessionInfo.getUserId(), sessionInfo.getUsername());
            // Broadcast the "user left" message
            ChatMessage leaveMessage = ChatMessage.builder()
                    .type(ChatMessage.MessageType.LEAVE)
                    .senderId("system")
                    .senderName("System")
                    .content(sessionInfo.getUsername() + " left the chat room")
                    .timestamp(LocalDateTime.now())
                    .build();
            messagingTemplate.convertAndSend("/topic/public", leaveMessage);
            // Broadcast the updated online user list
            broadcastOnlineUsers();
        }
    }

    /** Updates the last heartbeat time. */
    public void updateHeartbeat(String sessionId) {
        WebSocketSessionInfo sessionInfo = sessions.get(sessionId);
        if (sessionInfo != null) {
            sessionInfo.setLastHeartbeatTime(LocalDateTime.now());
        }
    }

    /** Looks up a session by session ID. */
    public WebSocketSessionInfo getSession(String sessionId) {
        return sessions.get(sessionId);
    }

    /** Looks up a session by user ID. */
    public WebSocketSessionInfo getUserSession(String userId) {
        String sessionId = userToSession.get(userId);
        return sessionId != null ? sessions.get(sessionId) : null;
    }

    /** Returns all online users. */
    public List<Map<String, Object>> getAllOnlineUsers() {
        return sessions.values().stream()
                .map(info -> Map.<String, Object>of(
                        "userId", info.getUserId(),
                        "username", info.getUsername(),
                        "connectTime", info.getConnectTime()))
                .collect(Collectors.toList());
    }

    /** Returns the number of online users. */
    public int getOnlineCount() {
        return sessions.size();
    }

    /** Broadcasts the online user list. */
    private void broadcastOnlineUsers() {
        messagingTemplate.convertAndSend("/topic/users.online", Map.of(
                "type", "ONLINE_USERS",
                "count", getOnlineCount(),
                "users", getAllOnlineUsers()));
    }

    /** Reads the client IP stored by the handshake interceptor. */
    private String getClientIp(WebSocketSession session) {
        try {
            return (String) session.getAttributes().get("clientIp");
        } catch (Exception e) {
            return "unknown";
        }
    }

    /**
     * Heartbeat check: runs every 10 seconds and evicts sessions that have timed out.
     * Scheduling must be switched on with @EnableScheduling on a configuration class.
     */
    @Scheduled(fixedDelay = 10000)
    public void heartbeatCheck() {
        LocalDateTime now = LocalDateTime.now();
        sessions.entrySet().removeIf(entry -> {
            WebSocketSessionInfo info = entry.getValue();
            long seconds = ChronoUnit.SECONDS.between(info.getLastHeartbeatTime(), now);
            if (seconds > heartbeatInterval * 3) {
                log.warn("Session timed out, closing: {} - {}, last heartbeat: {}",
                        info.getUserId(), info.getUsername(), info.getLastHeartbeatTime());
                // Close the session
                try {
                    if (info.getSession() != null && info.getSession().isOpen()) {
                        info.getSession().close();
                    }
                } catch (Exception e) {
                    log.error("Failed to close session", e);
                }
                userToSession.remove(info.getUserId());
                return true;
            }
            return false;
        });
    }
}
```
3.7 Event Listener
Listen for WebSocket connection events:
```java
package com.example.websocket.listener;

import com.example.websocket.service.UserSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;

/**
 * WebSocket event listener
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketEventListener {

    private final UserSessionService sessionService;

    /** Connection established */
    @EventListener
    public void handleSessionConnected(SessionConnectedEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        log.info("WebSocket connection established: {}", sessionId);
    }

    /** Connection closed */
    @EventListener
    public void handleSessionDisconnect(SessionDisconnectEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        log.info("WebSocket connection closed: {}", sessionId);
        // Clean up the session
        sessionService.removeSession(sessionId);
    }

    /** Subscription */
    @EventListener
    public void handleSessionSubscribe(SessionSubscribeEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        String destination = accessor.getDestination();
        log.info("Client subscribed: {} - {}", sessionId, destination);
    }

    /** Unsubscription */
    @EventListener
    public void handleSessionUnsubscribe(SessionUnsubscribeEvent event) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String sessionId = accessor.getSessionId();
        log.info("Client unsubscribed: {}", sessionId);
    }
}
```
3.8 Connection Interceptor
Add an interceptor for authentication and logging:
```java
package com.example.websocket.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpServletRequest;
import java.util.Map;

/**
 * WebSocket handshake interceptor
 */
@Slf4j
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {

    /** Runs before the handshake. */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
                                   ServerHttpResponse response,
                                   WebSocketHandler wsHandler,
                                   Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
            // Read the token and userId request parameters
            String token = servletRequest.getParameter("token");
            String userId = servletRequest.getParameter("userId");

            // Basic token check
            if (token == null || token.isEmpty()) {
                log.warn("WebSocket handshake failed: token is empty");
                return false;
            }
            // Token validation logic...
            if (!validateToken(token, userId)) {
                log.warn("WebSocket handshake failed: token is invalid");
                return false;
            }

            // Store user info in attributes so it is available from the session later
            attributes.put("userId", userId);
            attributes.put("token", token);
            attributes.put("clientIp", getClientIp(servletRequest));
            log.info("WebSocket handshake succeeded: userId={}, ip={}", userId, attributes.get("clientIp"));
        }
        return true;
    }

    /** Runs after the handshake. */
    @Override
    public void afterHandshake(ServerHttpRequest request,
                               ServerHttpResponse response,
                               WebSocketHandler wsHandler,
                               Exception exception) {
        // Post-handshake processing
    }

    /** Placeholder token validation. */
    private boolean validateToken(String token, String userId) {
        // A real project should call the authentication service here
        return token != null && !token.isEmpty();
    }

    /** Determines the client IP, preferring proxy headers over the remote address. */
    private String getClientIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip;
    }
}
```
Register the interceptor in the configuration class:
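```java
// webSocketHandshakeInterceptor is the WebSocketHandshakeInterceptor bean,
// injected into the configuration class as a field
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint(endpoint)
            .setAllowedOriginPatterns(allowedOrigins)
            .addInterceptors(webSocketHandshakeInterceptor) // add the interceptor
            .withSockJS();
}
```
3.9 Frontend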
Create the frontend page chat.html:
```html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat Room</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'Microsoft YaHei', sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; padding: 20px; }
#login-container, #chat-container { max-width: 800px; margin: 0 auto; background: white; border-radius: 10px; box-shadow: 0 15px 35px rgba(0,0,0,0.2); overflow: hidden; }
#login-container { max-width: 400px; padding: 30px; }
.header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; text-align: center; }
.content { padding: 20px; }
.form-group { margin-bottom: 20px; }
.form-group label { display: block; margin-bottom: 5px; color: #333; font-weight: 500; }
.form-group input { width: 100%; padding: 12px; border: 2px solid #e0e0e0; border-radius: 5px; font-size: 16px; transition: border-color 0.3s; }
.form-group input:focus { outline: none; border-color: #667eea; }
button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border: none; padding: 12px 24px; border-radius: 5px; font-size: 16px; cursor: pointer; transition: transform 0.2s; width: 100%; }
button:hover { transform: translateY(-2px); }
button:disabled { opacity: 0.6; cursor: not-allowed; }
.chat-header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 15px 20px; display: flex; justify-content: space-between; align-items: center; }
.chat-main { display: flex; height: 500px; }
.sidebar { width: 200px; background: #f8f9fa; border-right: 1px solid #e0e0e0; padding: 15px; }
.sidebar h3 { color: #333; margin-bottom: 15px; font-size: 16px; }
.user-list { list-style: none; max-height: 400px; overflow-y: auto; }
.user-list li { padding: 8px 10px; margin-bottom: 5px; background: white; border-radius: 5px; box-shadow: 0 2px 5px rgba(0,0,0,0.05); cursor: pointer; transition: all 0.3s; display: flex; align-items: center; }
.user-list li:hover { background: #e3f2fd; transform: translateX(5px); }
.user-list li.selected { background: #bbdefb; border-left: 3px solid #2196f3; }
.user-list .status { width: 8px; height: 8px; border-radius: 50%; margin-right: 8px; }
.status.online { background: #4caf50; }
.message-area { flex: 1; display: flex; flex-direction: column; }
.messages { flex: 1; padding: 15px; overflow-y: auto; background: #f5f5f5; }
.message { margin-bottom: 15px; animation: fadeIn 0.3s; }
.message.system { text-align: center; color: #999; font-style: italic; }
.message.private { background: #e8f5e8; }
.message-header { display: flex; justify-content: space-between; margin-bottom: 5px; font-size: 12px; }
.message-sender { font-weight: bold; color: #333; }
.message-time { color: #999; }
.message-content { background: white; padding: 10px 15px; border-radius: 15px; display: inline-block; max-width: 70%; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }
.message.own .message-content { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; }
.input-area { padding: 15px; background: white; border-top: 1px solid #e0e0e0; display: flex; gap: 10px; }
.input-area input { flex: 1; padding: 12px; border: 2px solid #e0e0e0; border-radius: 5px; font-size: 14px; }
.input-area input:focus { outline: none; border-color: #667eea; }
.input-area button { width: auto; padding: 12px 30px; }
@keyframes fadeIn { from { opacity: 0; transform: translateY(10px); } to { opacity: 1; transform: translateY(0); } }
</style>
<!-- SockJS and STOMP clients -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.1/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.29.1/moment.min.js"></script>
</head>
<body>
<!-- Login view -->
<div id="login-container" style="display: block;">
  <div class="header"><h2>WebSocket Chat Room</h2></div>
  <div class="content">
    <div class="form-group">
      <label>User ID</label>
      <input type="text" id="userId" placeholder="Enter a user ID">
    </div>
    <div class="form-group">
      <label>Nickname</label>
      <input type="text" id="username" placeholder="Enter a nickname">
    </div>
    <div class="form-group">
      <label>Token</label>
      <input type="text" id="token" value="test-token">
    </div>
    <button onclick="connect()">Join chat room</button>
  </div>
</div>

<!-- Chat view -->
<div id="chat-container" style="display: none;">
  <div class="chat-header">
    <div>
      <span id="currentUser"></span>
      <span style="margin-left: 15px; font-size: 14px;">Online: <span id="onlineCount">0</span></span>
    </div>
    <button onclick="disconnect()" style="width: auto; padding: 8px 20px;">Disconnect</button>
  </div>
  <div class="chat-main">
    <!-- Sidebar with the user list -->
    <div class="sidebar">
      <h3>Online users</h3>
      <ul class="user-list" id="userList"></ul>
    </div>
    <!-- Message area -->
    <div class="message-area">
      <div class="messages" id="messages"></div>
      <div class="input-area">
        <input type="text" id="messageInput" placeholder="Type a message... click a user first for private chat"
               onkeydown="if (event.key === 'Enter') sendMessage()">
        <button onclick="sendMessage()">Send</button>
      </div>
    </div>
  </div>
</div>

<script>
// Global state
let stompClient = null;
let currentUser = null;
let selectedUserId = null;
let heartbeatInterval = null;

// Pre-fill random demo values (an HTML attribute cannot evaluate JavaScript)
document.getElementById('userId').value = 'user' + Math.floor(Math.random() * 1000);
document.getElementById('username').value = 'User' + Math.floor(Math.random() * 1000);

// Escape user-supplied text before it is inserted via innerHTML
function escapeHtml(text) {
  const div = document.createElement('div');
  div.textContent = text == null ? '' : String(text);
  return div.innerHTML;
}

// Connect to the WebSocket endpoint
function connect() {
  const userId = document.getElementById('userId').value.trim();
  const username = document.getElementById('username').value.trim();
  const token = document.getElementById('token').value.trim();
  if (!userId || !username || !token) {
    alert('Please fill in all fields');
    return;
  }
  currentUser = { userId, username, token };

  // Create the SockJS connection. token and userId go in the query string because
  // the handshake interceptor reads them as HTTP request parameters.
  const socket = new SockJS('http://localhost:8080/ws?token=' + encodeURIComponent(token) +
                            '&userId=' + encodeURIComponent(userId));
  stompClient = Stomp.over(socket);

  stompClient.connect(
    { token: token, userId: userId },
    // Success callback
    function (frame) {
      console.log('Connected:', frame);
      // Hide the login view, show the chat view
      document.getElementById('login-container').style.display = 'none';
      document.getElementById('chat-container').style.display = 'block';
      document.getElementById('currentUser').textContent = `Current user: ${username}`;

      // Public channel
      stompClient.subscribe('/topic/public', function (message) {
        displayMessage(JSON.parse(message.body));
      });
      // Private channel
      stompClient.subscribe('/user/queue/private', function (message) {
        displayMessage(JSON.parse(message.body));
      });
      // Online user updates
      stompClient.subscribe('/topic/users.online', function (message) {
        updateUserList(JSON.parse(message.body));
      });
      // Error channel
      stompClient.subscribe('/queue/errors', function (message) {
        console.error('Error received:', message.body);
        alert('System error: ' + message.body);
      });

      // Announce that we joined
      stompClient.send('/app/chat.join', {}, JSON.stringify({
        senderId: userId,
        senderName: username,
        content: username + ' joined the chat room',
        type: 'JOIN'
      }));
      // Start the heartbeat
      startHeartbeat();
      displaySystemMessage('Connected!');
      // Request the online user list
      stompClient.send('/app/users.online', {});
    },
    // Failure callback
    function (error) {
      console.error('Connection failed:', error);
      alert('Connection failed, please try again');
    }
  );
}

// Disconnect
function disconnect() {
  if (stompClient !== null) {
    // Send a leave message (the server shown above has no /chat.leave handler;
    // the leave broadcast is triggered by the disconnect event instead)
    stompClient.send('/app/chat.leave', {}, JSON.stringify({
      senderId: currentUser.userId,
      senderName: currentUser.username
    }));
    stompClient.disconnect();
    stopHeartbeat();
    console.log('Disconnected');
    // Show the login view again
    document.getElementById('login-container').style.display = 'block';
    document.getElementById('chat-container').style.display = 'none';
    // Clear messages and the user list
    document.getElementById('messages').innerHTML = '';
    document.getElementById('userList').innerHTML = '';
    stompClient = null;
  }
}

// Send a message
function sendMessage() {
  const input = document.getElementById('messageInput');
  const content = input.value.trim();
  if (!content || !stompClient) return;

  if (selectedUserId) {
    // Private message
    stompClient.send('/app/chat.private', {}, JSON.stringify({
      senderId: currentUser.userId,
      senderName: currentUser.username,
      receiverId: selectedUserId,
      content: content,
      type: 'CHAT'
    }));
    // Show it locally. The server also echoes private messages back to the sender,
    // so keep only one of the two if messages appear twice.
    displayMessage({
      senderId: currentUser.userId,
      senderName: currentUser.username + ' (private)',
      receiverId: selectedUserId,
      content: content,
      timestamp: new Date().toISOString(),
      type: 'CHAT'
    });
  } else {
    // Public message
    stompClient.send('/app/chat.send', {}, JSON.stringify({
      senderId: currentUser.userId,
      senderName: currentUser.username,
      content: content,
      type: 'CHAT'
    }));
  }
  input.value = '';
}

// Render a message
function displayMessage(message) {
  const messagesDiv = document.getElementById('messages');
  const isOwn = message.senderId === currentUser.userId;
  const messageDiv = document.createElement('div');
  messageDiv.className = ['message',
    message.type === 'SYSTEM' ? 'system' : '',
    message.receiverId ? 'private' : '',
    isOwn ? 'own' : ''].filter(Boolean).join(' ');

  const time = message.timestamp
    ? moment(message.timestamp).format('HH:mm:ss')
    : moment().format('HH:mm:ss');
  const content = escapeHtml(message.content);
  const sender = escapeHtml(message.senderName);

  let html = '';
  if (message.type === 'SYSTEM' || message.type === 'JOIN' || message.type === 'LEAVE') {
    html = `<div>[${time}] ${content}</div>`;
  } else if (message.receiverId) {
    const target = message.receiverId === currentUser.userId ? 'me' : 'private';
    html = `
      <div class="message-header">
        <span class="message-sender">${sender} → ${target}</span>
        <span class="message-time">${time}</span>
      </div>
      <div class="message-content">${content}</div>`;
  } else {
    html = `
      <div class="message-header">
        <span class="message-sender">${sender}</span>
        <span class="message-time">${time}</span>
      </div>
      <div class="message-content">${content}</div>`;
  }
  messageDiv.innerHTML = html;
  messagesDiv.appendChild(messageDiv);
  messagesDiv.scrollTop = messagesDiv.scrollHeight;
}

// Render a system message
function displaySystemMessage(content) {
  displayMessage({ type: 'SYSTEM', content: content, timestamp: new Date().toISOString() });
}

// Refresh the user list
function updateUserList(data) {
  const userList = document.getElementById('userList');
  document.getElementById('onlineCount').textContent = data.count || 0;
  userList.innerHTML = '';
  (data.users || []).forEach(user => {
    if (user.userId === currentUser.userId) return;
    const li = document.createElement('li');
    li.dataset.userId = user.userId;
    if (selectedUserId === user.userId) li.classList.add('selected');
    const status = document.createElement('span');
    status.className = 'status online';
    li.appendChild(status);
    li.appendChild(document.createTextNode(user.username));
    li.addEventListener('click', () => selectUser(user.userId, user.username));
    userList.appendChild(li);
  });
}

// Select (or deselect) a user for private chat
function selectUser(userId, username) {
  if (selectedUserId === userId) {
    selectedUserId = null;
    document.getElementById('messageInput').placeholder = 'Type a message...';
  } else {
    selectedUserId = userId;
    document.getElementById('messageInput').placeholder = `Private chat with ${username}...`;
  }
  // Update the selection highlight
  document.querySelectorAll('#userList li').forEach(item => {
    item.classList.toggle('selected', item.dataset.userId === selectedUserId);
  });
}

// Start the heartbeat
function startHeartbeat() {
  heartbeatInterval = setInterval(() => {
    if (stompClient && stompClient.connected) {
      stompClient.send('/app/heartbeat', {}, JSON.stringify({
        senderId: currentUser.userId,
        type: 'HEARTBEAT'
      }));
    }
  }, 30000); // one heartbeat every 30 seconds
}

// Stop the heartbeat
function stopHeartbeat() {
  if (heartbeatInterval) {
    clearInterval(heartbeatInterval);
    heartbeatInterval = null;
  }
}

// Disconnect before the page unloads
window.addEventListener('beforeunload', function () {
  if (stompClient && stompClient.connected) {
    stompClient.send('/app/chat.leave', {}, JSON.stringify({
      senderId: currentUser.userId,
      senderName: currentUser.username
    }));
  }
});
</script>
</body>
</html>
```
3.10 Running and Testing
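Steps:
- Start the Spring Boot application
- Open http://localhost:8080/chat.html in a browser
- Open several tabs to simulate multiple users
- Test the features:
  - User connect/disconnect
  - Group chat messages
  - Private chat messages
  - Online user list
  - Heartbeat detection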
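Heartbeat detection is the hardest item on this list to check by hand, because you would have to wait out a timeout in the browser. One way around that, sketched here under the assumption that the eviction rule is "no heartbeat for more than three intervals", is to pull the timing logic into a small class that takes the current time as a parameter. `HeartbeatTracker` is a hypothetical name and is not part of the project code above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free version of the heartbeat timeout rule.
final class HeartbeatTracker {

    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();
    private final Duration timeout;

    HeartbeatTracker(Duration heartbeatInterval, int missedBeatsAllowed) {
        this.timeout = heartbeatInterval.multipliedBy(missedBeatsAllowed);
    }

    /** Records a heartbeat (or the initial connect) for a session. */
    void beat(String sessionId, Instant now) {
        lastSeen.put(sessionId, now);
    }

    /** Removes and returns the sessions whose last heartbeat is older than the timeout. */
    List<String> evictExpired(Instant now) {
        List<String> expired = new ArrayList<>();
        lastSeen.entrySet().removeIf(entry -> {
            boolean stale = Duration.between(entry.getValue(), now).compareTo(timeout) > 0;
            if (stale) {
                expired.add(entry.getKey());
            }
            return stale;
        });
        return expired;
    }

    int size() {
        return lastSeen.size();
    }
}
```

With a 30-second interval and three allowed misses, a session last seen at time 0 survives a check at 90 seconds and is evicted by a check at 91 seconds. Passing `now` in explicitly is the design choice that makes this testable without sleeping.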
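4. Advanced Features
4.1 Cluster Deployment
Horizontal scaling is a challenge for WebSocket: each connection lives on exactly one server instance, so a message handled on one node cannot directly reach clients connected to another. The usual solution is an external message broker that relays messages to every node, plus shared storage for session state.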
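The idea can be shown with a toy model before any real middleware is involved. In this sketch, which is an illustration rather than how Spring or any particular broker is implemented, `Broker` stands in for the external middleware and each `Node` only knows about its own clients; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy model of cross-node fan-out through a shared broker.
final class ClusterFanOutSketch {

    /** Stand-in for the external message middleware. */
    static final class Broker {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> node) {
            subscribers.add(node);
        }

        void publish(String message) {
            subscribers.forEach(subscriber -> subscriber.accept(message));
        }
    }

    /** One application instance; it holds only its own clients' connections. */
    static final class Node {
        private final Broker broker;
        private final List<List<String>> localClients = new ArrayList<>();

        Node(Broker broker) {
            this.broker = broker;
            broker.subscribe(this::deliverLocally);
        }

        /** Simulates a client connecting to this node; the returned list is its inbox. */
        List<String> connectClient() {
            List<String> inbox = new ArrayList<>();
            localClients.add(inbox);
            return inbox;
        }

        /** A client on this node sent a message: hand it to the broker, not to local clients. */
        void onClientMessage(String message) {
            broker.publish(message);
        }

        private void deliverLocally(String message) {
            localClients.forEach(inbox -> inbox.add(message));
        }
    }
}
```

The important detail is that `onClientMessage` never delivers directly. Every node, including the one that received the message, gets it back from the broker, which is what keeps delivery uniform regardless of where a client happens to be connected.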
[Cluster diagram: clients (User 1, User 2, User 3) → load balancer → server cluster (Server1, Server2, Server3, each running WebSocket) → message middleware (RabbitMQ/Redis) and shared storage (Redis)]
Clustering with an external STOMP broker. Note that `enableStompBrokerRelay` needs a broker that speaks STOMP; port 61613 below is the default of RabbitMQ's STOMP plugin. Redis does not speak STOMP, so it cannot be used as the relay target and serves as shared storage instead:
```java
package com.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

// Use this broker setup in place of enableSimpleBroker(...) in WebSocketConfig;
// the two are alternatives, not meant to be enabled together.
@Configuration
public class ClusterWebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Relay /topic and /queue to an external STOMP broker (e.g. RabbitMQ)
        registry.enableStompBrokerRelay("/topic", "/queue")
                .setRelayHost("localhost")
                .setRelayPort(61613)
                .setClientLogin("guest")
                .setClientPasscode("guest")
                .setSystemLogin("guest")
                .setSystemPasscode("guest")
                .setSystemHeartbeatSendInterval(10000)
                .setSystemHeartbeatReceiveInterval(10000);
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }
}
```
4.2 Security
Add Spring Security support (this requires the spring-security-messaging module on the classpath):
```java
package com.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry;
import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer;

@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
                // Anyone may connect and disconnect; heartbeats and unsubscribes carry
                // no destination and would otherwise hit the final denyAll
                .simpTypeMatchers(SimpMessageType.CONNECT, SimpMessageType.DISCONNECT,
                        SimpMessageType.HEARTBEAT, SimpMessageType.UNSUBSCRIBE).permitAll()
                // Subscribing requires authentication (including /user/queue/... destinations)
                .simpDestMatchers("/topic/**", "/queue/**", "/user/**").authenticated()
                // Sending messages requires authentication
                .simpDestMatchers("/app/**").authenticated()
                // Everything else is denied
                .anyMessage().denyAll();
    }

    @Override
    protected boolean sameOriginDisabled() {
        // Disables the CSRF token check on CONNECT so that cross-origin clients can connect
        return true;
    }
}
```
4.3 Performance Monitoring
Add metrics collection:
```java
package com.example.websocket.service;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class WebSocketMetricsService {

    private final MeterRegistry meterRegistry;
    private final UserSessionService sessionService;

    private Counter messageCounter;
    private Counter connectionCounter;
    private AtomicInteger activeConnections;

    public WebSocketMetricsService(MeterRegistry meterRegistry, UserSessionService sessionService) {
        this.meterRegistry = meterRegistry;
        this.sessionService = sessionService;
    }

    @PostConstruct
    public void init() {
        // Total number of messages
        messageCounter = Counter.builder("websocket.messages")
                .description("Total number of WebSocket messages")
                .register(meterRegistry);
        // Total number of connections
        connectionCounter = Counter.builder("websocket.connections")
                .description("Total number of WebSocket connections")
                .register(meterRegistry);
        // Number of currently active connections
        activeConnections = meterRegistry.gauge("websocket.active.connections", new AtomicInteger(0));
    }

    public void incrementMessageCount() {
        messageCounter.increment();
    }

    public void incrementConnectionCount() {
        connectionCounter.increment();
        activeConnections.incrementAndGet();
    }

    public void decrementConnectionCount() {
        activeConnections.decrementAndGet();
    }
}
```
5. Common Problems and Solutions
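5.1 Connection Timeouts
Symptom: connections frequently drop on their own
Solution: choose the heartbeat interval carefully, so that the client sends heartbeats more often than the server-side timeout expires
```javascript
// Frontend heartbeat: send JSON so the server can bind it to ChatMessage
setInterval(() => {
  if (stompClient && stompClient.connected) {
    stompClient.send('/app/heartbeat', {}, JSON.stringify({
      senderId: currentUser.userId,
      type: 'HEARTBEAT'
    }));
  }
}, 25000); // one heartbeat every 25 seconds
```
```java
// Backend heartbeat check (requires java.io.IOException to be imported)
@Scheduled(fixedDelay = 30000)
public void checkHeartbeats() {
    LocalDateTime now = LocalDateTime.now();
    sessions.entrySet().removeIf(entry -> {
        WebSocketSessionInfo info = entry.getValue();
        // Evict sessions that have been silent for more than 90 seconds
        if (info.getLastHeartbeatTime().plusSeconds(90).isBefore(now)) {
            try {
                if (info.getSession() != null) {
                    info.getSession().close();
                }
            } catch (IOException e) {
                log.error("Failed to close timed-out session", e);
            }
            return true;
        }
        return false;
    });
}
```
5.2 Memory Leaks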
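Symptom: memory usage keeps growing the longer the application runs
Solution: make sure stale sessions are cleaned up promptly
```java
@Slf4j
@Component
public class SessionCleanupTask {

    // Assumed registry of raw sessions keyed by session ID, filled in on connect
    // elsewhere; the original snippet references it without showing its declaration
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 60000) // runs once a minute
    public void cleanupStaleSessions() {
        log.info("Cleaning up stale sessions, current count: {}", sessions.size());
        sessions.entrySet().removeIf(entry -> {
            WebSocketSession session = entry.getValue();
            if (!session.isOpen()) {
                log.info("Removing closed session: {}", entry.getKey());
                return true;
            }
            return false;
        });
        log.info("Cleanup finished, current count: {}", sessions.size());
    }
}
```
5.3 Lost Messages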
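Symptom: the client does not receive some messages
Solution: implement a message acknowledgement mechanism. The handler here confirms to the sender that the server accepted and forwarded the message; confirming that every recipient actually received it would additionally need acknowledgements from the receiving clients. It is an alternative to the `/chat.send` handler in ChatController, not an addition to it.
```java
@MessageMapping("/chat.send")
public void sendMessageWithAck(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
    String sessionId = headerAccessor.getSessionId();
    // Map.of rejects null values, so make sure the message has an ID first
    if (message.getMessageId() == null) {
        message.setMessageId(UUID.randomUUID().toString());
    }
    // Address replies to the sender's session only
    SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    replyHeaders.setSessionId(sessionId);
    replyHeaders.setLeaveMutable(true);
    try {
        // Send the message
        messagingTemplate.convertAndSend("/topic/public", message);
        // Send the acknowledgement (DELIVERED here means "handed to the broker")
        messagingTemplate.convertAndSendToUser(sessionId, "/queue/ack",
                Map.of("messageId", message.getMessageId(), "status", "DELIVERED"),
                replyHeaders.getMessageHeaders());
    } catch (Exception e) {
        // Notify the sender of the failure
        messagingTemplate.convertAndSendToUser(sessionId, "/queue/error",
                Map.of("messageId", message.getMessageId(), "error", String.valueOf(e.getMessage())),
                replyHeaders.getMessageHeaders());
    }
}
```
6. Summary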
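6.1 Where WebSocket Fits
- Real-time chat: IM, customer service systems
- Real-time notifications: system alerts, message push
- Real-time data: stock quotes, game state
- Collaborative editing: online documents, whiteboards
- IoT: device status monitoring
6.2 Technology Selection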
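| Scenario | Recommended approach | Reason |
|---|---|---|
| Simple applications | Native WebSocket | Lightweight, flexible |
| Enterprise applications | STOMP + Spring | Full-featured, easy to integrate |
| High concurrency | Netty | Excellent performance |
| Fallback support needed | SockJS | Good compatibility |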
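6.3 Best Practices
- Connection management: manage sessions in one place and clean up dead connections promptly
- Heartbeats: keep connections alive and detect dropped ones
- Message compression: enable compression for large messages
- Flow control: limit message size and frequency
- Monitoring and alerting: watch connection counts and message volume in real time
- Graceful shutdown: make sure resources are released correctly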
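Flow control is the practice on this list that the chat room above does not implement. As one possible approach, here is a token-bucket sketch that enforces both a size limit and a per-connection rate limit. The class name and the parameter values are assumptions for illustration; the caller supplies the current time, which keeps the logic deterministic.

```java
// Hypothetical per-connection limiter: size check plus token-bucket rate limit.
final class MessageRateLimiter {

    private final int capacity;          // maximum burst size, in messages
    private final double refillPerMs;    // tokens regained per millisecond
    private final int maxMessageBytes;   // hard limit on a single message
    private double tokens;
    private long lastRefillMs;

    MessageRateLimiter(int capacity, double refillPerSecond, int maxMessageBytes, long nowMs) {
        this.capacity = capacity;
        this.refillPerMs = refillPerSecond / 1000.0;
        this.maxMessageBytes = maxMessageBytes;
        this.tokens = capacity;
        this.lastRefillMs = nowMs;
    }

    /** Returns true if the message may be processed, false if it should be rejected. */
    synchronized boolean tryAccept(int messageBytes, long nowMs) {
        if (messageBytes > maxMessageBytes) {
            return false;                              // too large, regardless of rate
        }
        // Refill according to elapsed time, never beyond the bucket capacity
        tokens = Math.min(capacity, tokens + (nowMs - lastRefillMs) * refillPerMs);
        lastRefillMs = nowMs;
        if (tokens < 1.0) {
            return false;                              // rate exceeded
        }
        tokens -= 1.0;
        return true;
    }
}
```

One limiter instance would be kept per session and consulted before a message is dispatched, for example with `System.currentTimeMillis()` as the clock. Rejected messages can be dropped or answered with an error, depending on how strict the application needs to be.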
If you found this article helpful, feel free to like, bookmark, and follow! Questions are welcome in the comments.