From a6baa9f8aed43997084f6fbac88a3a0d86103feb Mon Sep 17 00:00:00 2001 From: zch Date: Mon, 9 Dec 2024 17:37:56 +0800 Subject: [PATCH] =?UTF-8?q?add(framework):=20=E6=B7=BB=E5=8A=A0=20WebSocke?= =?UTF-8?q?t=20=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 pom.xml 中添加 Spring Boot WebSocket 依赖 - 新增 SemaphoreUtils 类用于信号量处理 - 添加 WebSocketConfig 配置类 - 实现 WebSocketServer 类处理 WebSocket 消息 - 创建 WebSocketUsers 类管理客户端用户 --- ruoyi-framework/pom.xml | 9 +- .../framework/websocket/SemaphoreUtils.java | 58 ++++++++ .../framework/websocket/WebSocketConfig.java | 20 +++ .../framework/websocket/WebSocketServer.java | 106 +++++++++++++ .../framework/websocket/WebSocketUsers.java | 140 ++++++++++++++++++ 5 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/SemaphoreUtils.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketServer.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketUsers.java diff --git a/ruoyi-framework/pom.xml b/ruoyi-framework/pom.xml index c4ba93b..848b053 100644 --- a/ruoyi-framework/pom.xml +++ b/ruoyi-framework/pom.xml @@ -59,6 +59,13 @@ ruoyi-system + + + org.springframework.boot + spring-boot-starter-websocket + + + - \ No newline at end of file + diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/SemaphoreUtils.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/SemaphoreUtils.java new file mode 100644 index 0000000..57567f3 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/SemaphoreUtils.java @@ -0,0 +1,58 @@ +package com.ruoyi.framework.websocket; + +import java.util.concurrent.Semaphore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 信号量相关处理 + * + * @author ruoyi + */ +public class SemaphoreUtils +{ + /** + * SemaphoreUtils 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class); + + /** + * 获取信号量 + * + * @param semaphore + * @return + */ + public static boolean tryAcquire(Semaphore semaphore) + { + boolean flag = false; + + try + { + flag = semaphore.tryAcquire(); + } + catch (Exception e) + { + LOGGER.error("获取信号量异常", e); + } + + return flag; + } + + /** + * 释放信号量 + * + * @param semaphore + */ + public static void release(Semaphore semaphore) + { + + try + { + semaphore.release(); + } + catch (Exception e) + { + LOGGER.error("释放信号量异常", e); + } + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java new file mode 100644 index 0000000..be63f35 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java @@ -0,0 +1,20 @@ +package com.ruoyi.framework.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * websocket 配置 + * + * @author ruoyi + */ +@Configuration +public class WebSocketConfig +{ + @Bean + public ServerEndpointExporter serverEndpointExporter() + { + return new ServerEndpointExporter(); + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketServer.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketServer.java new file mode 100644 index 0000000..4ce319c --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketServer.java @@ -0,0 +1,106 @@ +package com.ruoyi.framework.websocket; + +import java.util.concurrent.Semaphore; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * websocket 消息处理 + * + * @author ruoyi + */ +@Component +@ServerEndpoint("/websocket/message") +public class WebSocketServer +{ + /** + * WebSocketServer 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class); + + /** + * 默认最多允许同时在线人数100 + */ + public static int socketMaxOnlineCount = 100; + + private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount); + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session) throws Exception + { + boolean semaphoreFlag = false; + // 尝试获取信号量 + semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore); + if (!semaphoreFlag) + { + // 未获取到信号量 + LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount); + WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount); + session.close(); + } + else + { + // 添加用户 + WebSocketUsers.put(session.getId(), session); + LOGGER.info("\n 建立连接 - {}", session); + LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size()); + WebSocketUsers.sendMessageToUserByText(session, "连接成功"); + } + } + + /** + * 连接关闭时处理 + */ + @OnClose + public void onClose(Session session) + { + LOGGER.info("\n 关闭连接 - {}", session); + // 移除用户 + boolean removeFlag = WebSocketUsers.remove(session.getId()); + if (!removeFlag) + { + // 获取到信号量则需释放 + SemaphoreUtils.release(socketSemaphore); + } + } + + /** + * 抛出异常时处理 + */ + @OnError + public void onError(Session session, Throwable exception) throws Exception + { + if (session.isOpen()) + { + // 关闭连接 + session.close(); + } + String sessionId = session.getId(); + LOGGER.info("\n 连接异常 - {}", sessionId); + LOGGER.info("\n 异常信息 - {}", exception); + // 移出用户 + WebSocketUsers.remove(sessionId); + // 获取到信号量则需释放 + SemaphoreUtils.release(socketSemaphore); + } + + /** + * 服务器接收到客户端消息时调用的方法 + */ + @OnMessage + public void onMessage(String message, Session session) + { + String msg = message.replace("你", "我").replace("吗", ""); + WebSocketUsers.sendMessageToUserByText(session, msg); + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketUsers.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketUsers.java new file mode 100644 index 0000000..81e549e --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketUsers.java @@ -0,0 +1,140 @@ +package com.ruoyi.framework.websocket; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.websocket.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * websocket 客户端用户集 + * + * @author ruoyi + */ +public class WebSocketUsers +{ + /** + * WebSocketUsers 日志控制器 + */ + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class); + + /** + * 用户集 + */ + private static Map USERS = new ConcurrentHashMap(); + + /** + * 存储用户 + * + * @param key 唯一键 + * @param session 用户信息 + */ + public static void put(String key, Session session) + { + USERS.put(key, session); + } + + /** + * 移除用户 + * + * @param session 用户信息 + * + * @return 移除结果 + */ + public static boolean remove(Session session) + { + String key = null; + boolean flag = USERS.containsValue(session); + if (flag) + { + Set> entries = USERS.entrySet(); + for (Map.Entry entry : entries) + { + Session value = entry.getValue(); + if (value.equals(session)) + { + key = entry.getKey(); + break; + } + } + } + else + { + return true; + } + return remove(key); + } + + /** + * 移出用户 + * + * @param key 键 + */ + public static boolean remove(String key) + { + LOGGER.info("\n 正在移出用户 - {}", key); + Session remove = USERS.remove(key); + if (remove != null) + { + boolean containsValue = USERS.containsValue(remove); + LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功"); + return containsValue; + } + else + { + return true; + } + } + + /** + * 获取在线用户列表 + * + * @return 返回用户集合 + */ + public static Map getUsers() + { + return USERS; + } + + /** + * 群发消息文本消息 + * + * @param message 消息内容 + */ + public static void sendMessageToUsersByText(String message) + { + Collection values = USERS.values(); + for (Session value : values) + { + sendMessageToUserByText(value, message); + } + } + + /** + * 发送文本消息 + * + * @param userName 自己的用户名 + * @param message 消息内容 + */ + public static void sendMessageToUserByText(Session session, String message) + { + if (session != null) + { + try + { + session.getBasicRemote().sendText(message); + } + catch (IOException e) + { + LOGGER.error("\n[发送消息异常]", e); + } + } + else + { + LOGGER.info("\n[你已离线]"); + } + } +}