add 增加 ruoyi-common-sse 模块 支持SSE推送 比ws更轻量更稳定的推送

2.X
疯狂的狮子Li 7 months ago
parent a9d5f166f9
commit 7f4445eb50

@ -24,9 +24,14 @@ spring:
# username: ${datasource.system-postgres.username}
# password: ${datasource.system-postgres.password}
# 默认/推荐使用sse推送
sse:
enabled: true
path: /sse
websocket:
# 如果关闭 需要和前端开关一起关闭
enabled: true
enabled: false
# 路径
path: /websocket
# 设置访问源地址

@ -44,6 +44,7 @@
<module>ruoyi-common-social</module>
<module>ruoyi-common-nacos</module>
<module>ruoyi-common-bus</module>
<module>ruoyi-common-sse</module>
</modules>
<artifactId>ruoyi-common</artifactId>

@ -250,6 +250,13 @@
<version>${revision}</version>
</dependency>
<!-- sse -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sse</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@ -27,17 +27,6 @@
<groupId>cn.dev33</groupId>
<artifactId>sa-token-jwt</artifactId>
<version>${satoken.version}</version>
<exclusions>
<exclusion>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-jwt</artifactId>
</dependency>
<!-- RuoYi Api System -->

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ruoyi-common-sse</artifactId>
<description>
ruoyi-common-sse 模块
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-core</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-redis</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-satoken</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,28 @@
package org.dromara.common.sse.config;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.listener.SseTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
* @author Lion Li
*/
@AutoConfiguration
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
@EnableConfigurationProperties(SseProperties.class)
public class SseAutoConfiguration {
@Bean
public SseEmitterManager sseEmitterManager() {
return new SseEmitterManager();
}
@Bean
public SseTopicListener sseTopicListener() {
return new SseTopicListener();
}
}

@ -0,0 +1,21 @@
package org.dromara.common.sse.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* SSE
*
* @author Lion Li
*/
@Data
@ConfigurationProperties("sse")
public class SseProperties {
private Boolean enabled;
/**
*
*/
private String path;
}

@ -0,0 +1,52 @@
package org.dromara.common.sse.controller;
import cn.dev33.satoken.stp.StpUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.R;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
@RestController
@RequiredArgsConstructor
public class SseController {
private final SseEmitterManager sseEmitterManager;
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
return sseEmitterManager.connect(userId, tokenValue);
}
@GetMapping(value = "${sse.path}/close")
public R<Void> close() {
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
sseEmitterManager.disconnect(userId, tokenValue);
return R.ok();
}
@GetMapping(value = "${sse.path}/send")
public R<Void> send(Long userId, String msg) {
SseMessageDto dto = new SseMessageDto();
dto.setUserIds(List.of(userId));
dto.setMessage(msg);
sseEmitterManager.publishMessage(dto);
return R.ok();
}
@GetMapping(value = "${sse.path}/sendAll")
public R<Void> send(String msg) {
sseEmitterManager.publishAll(msg);
return R.ok();
}
}

@ -0,0 +1,135 @@
package org.dromara.common.sse.core;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
public class SseEmitterManager {
/**
*
*/
private final static String SSE_TOPIC = "global:sse";
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
public SseEmitter connect(Long userId, String token) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
SseEmitter emitter = new SseEmitter(0L);
emitters.put(token, emitter);
emitter.onCompletion(() -> emitters.remove(token));
emitter.onTimeout(() -> emitters.remove(token));
emitter.onError((e) -> emitters.remove(token));
try {
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
emitters.remove(token);
}
return emitter;
}
public void disconnect(Long userId, String token) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (emitters != null) {
try {
emitters.get(token).send(SseEmitter.event().comment("disconnected"));
} catch (IOException ignore) {
}
emitters.remove(token);
}
}
/**
* SSE
*
* @param consumer SSE
*/
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
}
/**
*
*
* @param userId id
* @param message
*/
public void sendMessage(Long userId, String message) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (emitters != null) {
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try {
entry.getValue().send(SseEmitter.event()
.name("message")
.reconnectTime(-1L)
.data(message));
} catch (Exception e) {
emitters.remove(entry.getKey());
}
}
}
}
/**
*
*
* @param message
*/
public void sendMessage(String message) {
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
sendMessage(userId, message);
}
}
/**
* SSE
*
* @param sseMessageDto SSE
*/
public void publishMessage(SseMessageDto sseMessageDto) {
List<Long> unsentUserIds = new ArrayList<>();
// 当前服务内用户,直接发送消息
for (Long userId : sseMessageDto.getUserIds()) {
if (USER_TOKEN_EMITTERS.containsKey(userId)) {
sendMessage(userId, sseMessageDto.getMessage());
continue;
}
unsentUserIds.add(userId);
}
// 不在当前服务内用户,发布订阅消息
if (CollUtil.isNotEmpty(unsentUserIds)) {
SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setMessage(sseMessageDto.getMessage());
broadcastMessage.setUserIds(unsentUserIds);
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage());
});
}
}
/**
* ()
*
* @param message
*/
public void publishAll(String message) {
SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
});
}
}

@ -0,0 +1,29 @@
package org.dromara.common.sse.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
/**
* dto
*
* @author zendwang
*/
@Data
public class SseMessageDto implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* session key
*/
private List<Long> userIds;
/**
*
*/
private String message;
}

@ -0,0 +1,48 @@
package org.dromara.common.sse.listener;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.sse.core.SseEmitterManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
/**
* SSE
*
* @author Lion Li
*/
@Slf4j
public class SseTopicListener implements ApplicationRunner, Ordered {
@Autowired
private SseEmitterManager sseEmitterManager;
/**
* Spring BootSSE
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sseEmitterManager.subscribeMessage((message) -> {
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.getMessage());
});
} else {
sseEmitterManager.sendMessage(message.getMessage());
}
});
log.info("初始化SSE主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}

@ -0,0 +1,58 @@
package org.dromara.common.sse.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.dto.SseMessageDto;
/**
*
*
* @author Lion Li
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SseMessageUtils {
private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class);
/**
* WebSocket
*
* @param userId id
* @param message
*/
public static void sendMessage(Long userId, String message) {
MANAGER.sendMessage(userId, message);
}
/**
*
*
* @param message
*/
public static void sendMessage(String message) {
MANAGER.sendMessage(message);
}
/**
* SSE
*
* @param sseMessageDto SSE
*/
public static void publishMessage(SseMessageDto sseMessageDto) {
MANAGER.publishMessage(sseMessageDto);
}
/**
* ()
*
* @param message
*/
public static void publishAll(String message) {
MANAGER.publishAll(message);
}
}

@ -108,6 +108,11 @@
<artifactId>ruoyi-common-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sse</artifactId>
</dependency>
<!-- RuoYi Api System -->
<dependency>
<groupId>org.dromara</groupId>

@ -3,8 +3,8 @@ package org.dromara.resource.dubbo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboService;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.common.sse.dto.SseMessageDto;
import org.dromara.common.sse.utils.SseMessageUtils;
import org.dromara.resource.api.RemoteMessageService;
import org.springframework.stereotype.Service;
@ -29,10 +29,10 @@ public class RemoteMessageServiceImpl implements RemoteMessageService {
*/
@Override
public void publishMessage(Long sessionKey, String message) {
WebSocketMessageDto dto = new WebSocketMessageDto();
SseMessageDto dto = new SseMessageDto();
dto.setMessage(message);
dto.setSessionKeys(List.of(sessionKey));
WebSocketUtils.publishMessage(dto);
dto.setUserIds(List.of(sessionKey));
SseMessageUtils.publishMessage(dto);
}
/**
@ -42,7 +42,7 @@ public class RemoteMessageServiceImpl implements RemoteMessageService {
*/
@Override
public void publishAll(String message) {
WebSocketUtils.publishAll(message);
SseMessageUtils.publishAll(message);
}
}

Loading…
Cancel
Save