From 7f4445eb505bcc024d3b2e90fdc7b73d3c846305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?= <15040126243@163.com> Date: Fri, 26 Jul 2024 17:38:00 +0800 Subject: [PATCH] =?UTF-8?q?add=20=E5=A2=9E=E5=8A=A0=20ruoyi-common-sse=20?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=20=E6=94=AF=E6=8C=81SSE=E6=8E=A8=E9=80=81=20?= =?UTF-8?q?=E6=AF=94ws=E6=9B=B4=E8=BD=BB=E9=87=8F=E6=9B=B4=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E7=9A=84=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/nacos/ruoyi-resource.yml | 7 +- ruoyi-common/pom.xml | 1 + ruoyi-common/ruoyi-common-bom/pom.xml | 7 + ruoyi-common/ruoyi-common-satoken/pom.xml | 11 -- ruoyi-common/ruoyi-common-sse/pom.xml | 40 ++++++ .../sse/config/SseAutoConfiguration.java | 28 ++++ .../common/sse/config/SseProperties.java | 21 +++ .../common/sse/controller/SseController.java | 52 +++++++ .../common/sse/core/SseEmitterManager.java | 135 ++++++++++++++++++ .../dromara/common/sse/dto/SseMessageDto.java | 29 ++++ .../common/sse/listener/SseTopicListener.java | 48 +++++++ .../common/sse/utils/SseMessageUtils.java | 58 ++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + ruoyi-modules/ruoyi-resource/pom.xml | 5 + .../dubbo/RemoteMessageServiceImpl.java | 12 +- 15 files changed, 437 insertions(+), 18 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-sse/pom.xml create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/config/nacos/ruoyi-resource.yml b/config/nacos/ruoyi-resource.yml index 9c539725..0352230f 100644 --- a/config/nacos/ruoyi-resource.yml +++ b/config/nacos/ruoyi-resource.yml @@ -24,9 +24,14 @@ spring: # username: ${datasource.system-postgres.username} # password: ${datasource.system-postgres.password} +# 默认/推荐使用sse推送 +sse: + enabled: true + path: /sse + websocket: # 如果关闭 需要和前端开关一起关闭 - enabled: true + enabled: false # 路径 path: /websocket # 设置访问源地址 diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index ff6f5581..48e00955 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -44,6 +44,7 @@ ruoyi-common-social ruoyi-common-nacos ruoyi-common-bus + ruoyi-common-sse ruoyi-common diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml index dcafd753..5b4a26e7 100644 --- a/ruoyi-common/ruoyi-common-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-bom/pom.xml @@ -250,6 +250,13 @@ ${revision} + + + org.dromara + ruoyi-common-sse + ${revision} + + diff --git a/ruoyi-common/ruoyi-common-satoken/pom.xml b/ruoyi-common/ruoyi-common-satoken/pom.xml index ecdb7630..5643acb8 100644 --- a/ruoyi-common/ruoyi-common-satoken/pom.xml +++ b/ruoyi-common/ruoyi-common-satoken/pom.xml @@ -27,17 +27,6 @@ cn.dev33 sa-token-jwt ${satoken.version} - - - cn.hutool - hutool-all - - - - - - cn.hutool - hutool-jwt diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml new file mode 100644 index 00000000..88f89a14 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/pom.xml @@ -0,0 +1,40 @@ + + + + org.dromara + ruoyi-common + ${revision} + + 4.0.0 + + ruoyi-common-sse + + + ruoyi-common-sse 模块 + + + + + org.dromara + ruoyi-common-core + + + org.dromara + ruoyi-common-redis + + + org.dromara + ruoyi-common-satoken + + + org.dromara + ruoyi-common-json + + + org.springframework + spring-webmvc + + + diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java new file mode 100644 index 00000000..de5afa9a --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java @@ -0,0 +1,28 @@ +package org.dromara.common.sse.config; + +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.listener.SseTopicListener; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +/** + * @author Lion Li + */ +@AutoConfiguration +@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") +@EnableConfigurationProperties(SseProperties.class) +public class SseAutoConfiguration { + + @Bean + public SseEmitterManager sseEmitterManager() { + return new SseEmitterManager(); + } + + @Bean + public SseTopicListener sseTopicListener() { + return new SseTopicListener(); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java new file mode 100644 index 00000000..ce4e1732 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java @@ -0,0 +1,21 @@ +package org.dromara.common.sse.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * SSE 配置项 + * + * @author Lion Li + */ +@Data +@ConfigurationProperties("sse") +public class SseProperties { + + private Boolean enabled; + + /** + * 路径 + */ + private String path; +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java new file mode 100644 index 00000000..57c7c1e8 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java @@ -0,0 +1,52 @@ +package org.dromara.common.sse.controller; + +import cn.dev33.satoken.stp.StpUtil; +import lombok.RequiredArgsConstructor; +import org.dromara.common.core.domain.R; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.dto.SseMessageDto; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.List; + +@RestController +@RequiredArgsConstructor +public class SseController { + + private final SseEmitterManager sseEmitterManager; + + @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter connect() { + String tokenValue = StpUtil.getTokenValue(); + Long userId = LoginHelper.getUserId(); + return sseEmitterManager.connect(userId, tokenValue); + } + + @GetMapping(value = "${sse.path}/close") + public R close() { + String tokenValue = StpUtil.getTokenValue(); + Long userId = LoginHelper.getUserId(); + sseEmitterManager.disconnect(userId, tokenValue); + return R.ok(); + } + + @GetMapping(value = "${sse.path}/send") + public R send(Long userId, String msg) { + SseMessageDto dto = new SseMessageDto(); + dto.setUserIds(List.of(userId)); + dto.setMessage(msg); + sseEmitterManager.publishMessage(dto); + return R.ok(); + } + + @GetMapping(value = "${sse.path}/sendAll") + public R send(String msg) { + sseEmitterManager.publishAll(msg); + return R.ok(); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java new file mode 100644 index 00000000..276df102 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -0,0 +1,135 @@ +package org.dromara.common.sse.core; + +import cn.hutool.core.collection.CollUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.common.sse.dto.SseMessageDto; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +@Slf4j +public class SseEmitterManager { + /** + * 订阅的频道 + */ + private final static String SSE_TOPIC = "global:sse"; + + private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + + public SseEmitter connect(Long userId, String token) { + Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + SseEmitter emitter = new SseEmitter(0L); + + emitters.put(token, emitter); + + emitter.onCompletion(() -> emitters.remove(token)); + emitter.onTimeout(() -> emitters.remove(token)); + emitter.onError((e) -> emitters.remove(token)); + + try { + emitter.send(SseEmitter.event().comment("connected")); + } catch (IOException e) { + emitters.remove(token); + } + return emitter; + } + + public void disconnect(Long userId, String token) { + Map emitters = USER_TOKEN_EMITTERS.get(userId); + if (emitters != null) { + try { + emitters.get(token).send(SseEmitter.event().comment("disconnected")); + } catch (IOException ignore) { + } + emitters.remove(token); + } + } + + /** + * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息 + * + * @param consumer 处理SSE消息的消费者函数 + */ + public void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer); + } + + /** + * 向指定的用户会话发送消息 + * + * @param userId 要发送消息的用户id + * @param message 要发送的消息内容 + */ + public void sendMessage(Long userId, String message) { + Map emitters = USER_TOKEN_EMITTERS.get(userId); + if (emitters != null) { + for (Map.Entry entry : emitters.entrySet()) { + try { + entry.getValue().send(SseEmitter.event() + .name("message") + .reconnectTime(-1L) + .data(message)); + } catch (Exception e) { + emitters.remove(entry.getKey()); + } + } + } + } + + /** + * 本机全用户会话发送消息 + * + * @param message 要发送的消息内容 + */ + public void sendMessage(String message) { + for (Long userId : USER_TOKEN_EMITTERS.keySet()) { + sendMessage(userId, message); + } + } + + /** + * 发布SSE订阅消息 + * + * @param sseMessageDto 要发布的SSE消息对象 + */ + public void publishMessage(SseMessageDto sseMessageDto) { + List unsentUserIds = new ArrayList<>(); + // 当前服务内用户,直接发送消息 + for (Long userId : sseMessageDto.getUserIds()) { + if (USER_TOKEN_EMITTERS.containsKey(userId)) { + sendMessage(userId, sseMessageDto.getMessage()); + continue; + } + unsentUserIds.add(userId); + } + // 不在当前服务内用户,发布订阅消息 + if (CollUtil.isNotEmpty(unsentUserIds)) { + SseMessageDto broadcastMessage = new SseMessageDto(); + broadcastMessage.setMessage(sseMessageDto.getMessage()); + broadcastMessage.setUserIds(unsentUserIds); + RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", + SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); + }); + } + } + + /** + * 向所有的用户发布订阅的消息(群发) + * + * @param message 要发布的消息内容 + */ + public void publishAll(String message) { + SseMessageDto broadcastMessage = new SseMessageDto(); + broadcastMessage.setMessage(message); + RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); + }); + } +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java new file mode 100644 index 00000000..a2e1210c --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java @@ -0,0 +1,29 @@ +package org.dromara.common.sse.dto; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; + +/** + * 消息的dto + * + * @author zendwang + */ +@Data +public class SseMessageDto implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 需要推送到的session key 列表 + */ + private List userIds; + + /** + * 需要发送的消息 + */ + private String message; +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java new file mode 100644 index 00000000..7a4dff13 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java @@ -0,0 +1,48 @@ +package org.dromara.common.sse.listener; + +import cn.hutool.core.collection.CollUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.sse.core.SseEmitterManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; + +/** + * SSE 主题订阅监听器 + * + * @author Lion Li + */ +@Slf4j +public class SseTopicListener implements ApplicationRunner, Ordered { + + @Autowired + private SseEmitterManager sseEmitterManager; + + /** + * 在Spring Boot应用程序启动时初始化SSE主题订阅监听器 + * + * @param args 应用程序参数 + * @throws Exception 初始化过程中可能抛出的异常 + */ + @Override + public void run(ApplicationArguments args) throws Exception { + sseEmitterManager.subscribeMessage((message) -> { + log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage()); + // 如果key不为空就按照key发消息 如果为空就群发 + if (CollUtil.isNotEmpty(message.getUserIds())) { + message.getUserIds().forEach(key -> { + sseEmitterManager.sendMessage(key, message.getMessage()); + }); + } else { + sseEmitterManager.sendMessage(message.getMessage()); + } + }); + log.info("初始化SSE主题订阅监听器成功"); + } + + @Override + public int getOrder() { + return -1; + } +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java new file mode 100644 index 00000000..4334e98b --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java @@ -0,0 +1,58 @@ +package org.dromara.common.sse.utils; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.dto.SseMessageDto; + +/** + * 工具类 + * + * @author Lion Li + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class SseMessageUtils { + + private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class); + + /** + * 向指定的WebSocket会话发送消息 + * + * @param userId 要发送消息的用户id + * @param message 要发送的消息内容 + */ + public static void sendMessage(Long userId, String message) { + MANAGER.sendMessage(userId, message); + } + + /** + * 本机全用户会话发送消息 + * + * @param message 要发送的消息内容 + */ + public static void sendMessage(String message) { + MANAGER.sendMessage(message); + } + + /** + * 发布SSE订阅消息 + * + * @param sseMessageDto 要发布的SSE消息对象 + */ + public static void publishMessage(SseMessageDto sseMessageDto) { + MANAGER.publishMessage(sseMessageDto); + } + + /** + * 向所有的用户发布订阅的消息(群发) + * + * @param message 要发布的消息内容 + */ + public static void publishAll(String message) { + MANAGER.publishAll(message); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..b8097139 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.common.sse.config.SseAutoConfiguration diff --git a/ruoyi-modules/ruoyi-resource/pom.xml b/ruoyi-modules/ruoyi-resource/pom.xml index 5e20b624..c679acd3 100644 --- a/ruoyi-modules/ruoyi-resource/pom.xml +++ b/ruoyi-modules/ruoyi-resource/pom.xml @@ -108,6 +108,11 @@ ruoyi-common-websocket + + org.dromara + ruoyi-common-sse + + org.dromara diff --git a/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java b/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java index f3e5e3f8..393f0f5b 100644 --- a/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java +++ b/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java @@ -3,8 +3,8 @@ package org.dromara.resource.dubbo; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboService; -import org.dromara.common.websocket.dto.WebSocketMessageDto; -import org.dromara.common.websocket.utils.WebSocketUtils; +import org.dromara.common.sse.dto.SseMessageDto; +import org.dromara.common.sse.utils.SseMessageUtils; import org.dromara.resource.api.RemoteMessageService; import org.springframework.stereotype.Service; @@ -29,10 +29,10 @@ public class RemoteMessageServiceImpl implements RemoteMessageService { */ @Override public void publishMessage(Long sessionKey, String message) { - WebSocketMessageDto dto = new WebSocketMessageDto(); + SseMessageDto dto = new SseMessageDto(); dto.setMessage(message); - dto.setSessionKeys(List.of(sessionKey)); - WebSocketUtils.publishMessage(dto); + dto.setUserIds(List.of(sessionKey)); + SseMessageUtils.publishMessage(dto); } /** @@ -42,7 +42,7 @@ public class RemoteMessageServiceImpl implements RemoteMessageService { */ @Override public void publishAll(String message) { - WebSocketUtils.publishAll(message); + SseMessageUtils.publishAll(message); } }