diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index 270ad4dc..c0bb7442 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -39,6 +39,7 @@ ruoyi-common-json ruoyi-common-encrypt ruoyi-common-tenant + ruoyi-common-websocket ruoyi-common diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml index 43901e3d..94206101 100644 --- a/ruoyi-common/ruoyi-common-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-bom/pom.xml @@ -199,6 +199,12 @@ ${revision} + + org.dromara + ruoyi-common-websocket + ${revision} + + diff --git a/ruoyi-common/ruoyi-common-websocket/pom.xml b/ruoyi-common/ruoyi-common-websocket/pom.xml new file mode 100644 index 00000000..5b9fb379 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/pom.xml @@ -0,0 +1,41 @@ + + + + org.dromara + ruoyi-common + ${revision} + ../pom.xml + + 4.0.0 + + ruoyi-common-websocket + + + ruoyi-common-websocket 模块 + + + + + org.dromara + ruoyi-common-core + + + org.dromara + ruoyi-common-redis + + + org.dromara + ruoyi-common-satoken + + + org.dromara + ruoyi-common-json + + + org.springframework.boot + spring-boot-starter-websocket + + + diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java new file mode 100644 index 00000000..30d109e2 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java @@ -0,0 +1,60 @@ +package org.dromara.common.websocket.config; + +import cn.hutool.core.util.StrUtil; +import org.dromara.common.websocket.config.properties.WebSocketProperties; +import org.dromara.common.websocket.handler.PlusWebSocketHandler; +import org.dromara.common.websocket.interceptor.PlusWebSocketInterceptor; +import org.dromara.common.websocket.listener.WebSocketTopicListener; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.server.HandshakeInterceptor; + +/** + * WebSocket 配置 + * + * @author zendwang + */ +@AutoConfiguration +@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") +@EnableConfigurationProperties(WebSocketProperties.class) +@EnableWebSocket +public class WebSocketConfig { + + @Bean + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, + WebSocketHandler webSocketHandler, + WebSocketProperties webSocketProperties) { + if (StrUtil.isBlank(webSocketProperties.getPath())) { + webSocketProperties.setPath("/websocket"); + } + + if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { + webSocketProperties.setAllowedOrigins("*"); + } + + return registry -> registry + .addHandler(webSocketHandler, webSocketProperties.getPath()) + .addInterceptors(handshakeInterceptor) + .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); + } + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new PlusWebSocketInterceptor(); + } + + @Bean + public WebSocketHandler webSocketHandler() { + return new PlusWebSocketHandler(); + } + + @Bean + public WebSocketTopicListener topicListener() { + return new WebSocketTopicListener(); + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java new file mode 100644 index 00000000..d629fe55 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java @@ -0,0 +1,26 @@ +package org.dromara.common.websocket.config.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * WebSocket 配置项 + * + * @author zendwang + */ +@ConfigurationProperties("websocket") +@Data +public class WebSocketProperties { + + private Boolean enabled; + + /** + * 路径 + */ + private String path; + + /** + * 设置访问源地址 + */ + private String allowedOrigins; +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java new file mode 100644 index 00000000..54eb4470 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java @@ -0,0 +1,28 @@ +package org.dromara.common.websocket.constant; + +/** + * websocket的常量配置 + * + * @author zendwang + */ +public interface WebSocketConstants { + /** + * websocketSession中的参数的key + */ + String LOGIN_USER_KEY = "loginUser"; + + /** + * 订阅的频道 + */ + String WEB_SOCKET_TOPIC = "global:websocket"; + + /** + * 前端心跳检查的命令 + */ + String PING = "ping"; + + /** + * 服务端心跳恢复的字符串 + */ + String PONG = "pong"; +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDto.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDto.java new file mode 100644 index 00000000..e2d4456a --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDto.java @@ -0,0 +1,29 @@ +package org.dromara.common.websocket.dto; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; + +/** + * 消息的dto + * + * @author zendwang + */ +@Data +public class WebSocketMessageDto implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 需要推送到的session key 列表 + */ + private List sessionKeys; + + /** + * 需要发送的消息 + */ + private String message; +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java new file mode 100644 index 00000000..00776cc6 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java @@ -0,0 +1,101 @@ +package org.dromara.common.websocket.handler; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; +import org.dromara.common.websocket.utils.WebSocketUtils; +import org.dromara.system.api.model.LoginUser; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.AbstractWebSocketHandler; + +import java.util.List; + +import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; + +/** + * WebSocketHandler 实现类 + * + * @author zendwang + */ +@Slf4j +public class PlusWebSocketHandler extends AbstractWebSocketHandler { + + /** + * 连接成功后 + */ + @Override + public void afterConnectionEstablished(WebSocketSession session) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + WebSocketSessionHolder.addSession(loginUser.getUserId(), session); + log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); + } + + /** + * 处理发送来的文本消息 + * + * @param session + * @param message + * @throws Exception + */ + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + log.info("PlusWebSocketHandler, 连接:" + session.getId() + ",已收到消息:" + message.getPayload()); + List userIds = List.of(loginUser.getUserId()); + WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); + webSocketMessageDto.setSessionKeys(userIds); + webSocketMessageDto.setMessage(message.getPayload()); + WebSocketUtils.publishMessage(webSocketMessageDto); + } + + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { + super.handleBinaryMessage(session, message); + } + + /** + * 心跳监测的回复 + * + * @param session + * @param message + * @throws Exception + */ + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + WebSocketUtils.sendPongMessage(session); + } + + /** + * 连接出错时 + * + * @param session + * @param exception + * @throws Exception + */ + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); + } + + /** + * 连接关闭后 + * + * @param session + * @param status + */ + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + WebSocketSessionHolder.removeSession(loginUser.getUserId()); + log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); + } + + /** + * 是否支持分片消息 + */ + @Override + public boolean supportsPartialMessages() { + return false; + } + +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java new file mode 100644 index 00000000..9b931938 --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java @@ -0,0 +1,37 @@ +package org.dromara.common.websocket.holder; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * WebSocketSession 用于保存当前所有在线的会话信息 + * + * @author zendwang + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class WebSocketSessionHolder { + + private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); + + public static void addSession(Long sessionKey, WebSocketSession session) { + USER_SESSION_MAP.put(sessionKey, session); + } + + public static void removeSession(Long sessionKey) { + if (USER_SESSION_MAP.containsKey(sessionKey)) { + USER_SESSION_MAP.remove(sessionKey); + } + } + + public static WebSocketSession getSessions(Long sessionKey) { + return USER_SESSION_MAP.get(sessionKey); + } + + public static Boolean existSession(Long sessionKey) { + return USER_SESSION_MAP.containsKey(sessionKey); + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java new file mode 100644 index 00000000..fda4b59f --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java @@ -0,0 +1,51 @@ +package org.dromara.common.websocket.interceptor; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.system.api.model.LoginUser; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + +import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; + +/** + * WebSocket握手请求的拦截器 + * + * @author zendwang + */ +@Slf4j +public class PlusWebSocketInterceptor implements HandshakeInterceptor { + + /** + * 握手前 + * + * @param request request + * @param response response + * @param wsHandler wsHandler + * @param attributes attributes + * @return 是否握手成功 + */ + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { + LoginUser loginUser = LoginHelper.getLoginUser(); + attributes.put(LOGIN_USER_KEY, loginUser); + return true; + } + + /** + * 握手后 + * + * @param request request + * @param response response + * @param wsHandler wsHandler + * @param exception 异常 + */ + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java new file mode 100644 index 00000000..be385a7b --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java @@ -0,0 +1,38 @@ +package org.dromara.common.websocket.listener; + +import cn.hutool.core.collection.CollUtil; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; +import org.dromara.common.websocket.utils.WebSocketUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; + +/** + * WebSocket 主题订阅监听器 + * + * @author zendwang + */ +@Slf4j +public class WebSocketTopicListener implements ApplicationRunner, Ordered { + + @Override + public void run(ApplicationArguments args) throws Exception { + WebSocketUtils.subscribeMessage((message) -> { + log.info("WebSocket主题订阅收到消息session keys={} message={}!", message.getSessionKeys(), message.getMessage()); + if (CollUtil.isNotEmpty(message.getSessionKeys())) { + message.getSessionKeys().forEach(key -> { + if (WebSocketSessionHolder.existSession(key)) { + WebSocketUtils.sendMessage(key, message.getMessage()); + } + }); + } + }); + log.info("初始化WebSocket主题订阅监听器成功"); + } + + @Override + public int getOrder() { + return -1; + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java new file mode 100644 index 00000000..59f9530e --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java @@ -0,0 +1,102 @@ +package org.dromara.common.websocket.utils; + +import cn.hutool.core.collection.CollUtil; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.holder.WebSocketSessionHolder; +import org.dromara.system.api.model.LoginUser; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; +import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; + +/** + * 工具类 + * + * @author zendwang + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class WebSocketUtils { + + /** + * 发送消息 + * + * @param sessionKey session主键 一般为用户id + * @param message 消息文本 + */ + public static void sendMessage(Long sessionKey, String message) { + WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); + sendMessage(session, message); + } + + /** + * 订阅消息 + * + * @param consumer 自定义处理 + */ + public static void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); + } + + /** + * 发布订阅的消息 + * + * @param webSocketMessage 消息对象 + */ + public static void publishMessage(WebSocketMessageDto webSocketMessage) { + List unsentSessionKeys = new ArrayList<>(); + // 当前服务内session,直接发送消息 + for (Long sessionKey : webSocketMessage.getSessionKeys()) { + if (WebSocketSessionHolder.existSession(sessionKey)) { + WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); + continue; + } + unsentSessionKeys.add(sessionKey); + } + // 不在当前服务内session,发布订阅消息 + if (CollUtil.isNotEmpty(unsentSessionKeys)) { + WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); + broadcastMessage.setMessage(webSocketMessage.getMessage()); + broadcastMessage.setSessionKeys(unsentSessionKeys); + RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { + log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", + WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); + }); + } + } + + public static void sendPongMessage(WebSocketSession session) { + sendMessage(session, new PongMessage()); + } + + public static void sendMessage(WebSocketSession session, String message) { + sendMessage(session, new TextMessage(message)); + } + + private static void sendMessage(WebSocketSession session, WebSocketMessage message) { + if (session == null || !session.isOpen()) { + log.error("[send] session会话已经关闭"); + } else { + try { + // 获取当前会话中的用户 + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + session.sendMessage(message); + log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message); + } catch (IOException e) { + log.error("[send] session({}) 发送消息({}) 异常", session, message, e); + } + } + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..c3a7305a --- /dev/null +++ b/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.common.websocket.config.WebSocketConfig