|
|
@ -13,8 +13,14 @@ import java.util.Map;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 管理 Server-Sent Events (SSE) 连接
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* @author Lion Li
|
|
|
|
|
|
|
|
*/
|
|
|
|
@Slf4j
|
|
|
|
@Slf4j
|
|
|
|
public class SseEmitterManager {
|
|
|
|
public class SseEmitterManager {
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 订阅的频道
|
|
|
|
* 订阅的频道
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -22,24 +28,44 @@ public class SseEmitterManager {
|
|
|
|
|
|
|
|
|
|
|
|
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
|
|
|
|
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 建立与指定用户的 SSE 连接
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* @param userId 用户的唯一标识符,用于区分不同用户的连接
|
|
|
|
|
|
|
|
* @param token 用户的唯一令牌,用于识别具体的连接
|
|
|
|
|
|
|
|
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
|
|
|
|
|
|
|
|
*/
|
|
|
|
public SseEmitter connect(Long userId, String token) {
|
|
|
|
public SseEmitter connect(Long userId, String token) {
|
|
|
|
|
|
|
|
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
|
|
|
|
|
|
|
|
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制
|
|
|
|
SseEmitter emitter = new SseEmitter(0L);
|
|
|
|
SseEmitter emitter = new SseEmitter(0L);
|
|
|
|
|
|
|
|
|
|
|
|
emitters.put(token, emitter);
|
|
|
|
emitters.put(token, emitter);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
|
|
|
|
emitter.onCompletion(() -> emitters.remove(token));
|
|
|
|
emitter.onCompletion(() -> emitters.remove(token));
|
|
|
|
emitter.onTimeout(() -> emitters.remove(token));
|
|
|
|
emitter.onTimeout(() -> emitters.remove(token));
|
|
|
|
emitter.onError((e) -> emitters.remove(token));
|
|
|
|
emitter.onError((e) -> emitters.remove(token));
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
|
|
|
|
// 向客户端发送一条连接成功的事件
|
|
|
|
emitter.send(SseEmitter.event().comment("connected"));
|
|
|
|
emitter.send(SseEmitter.event().comment("connected"));
|
|
|
|
} catch (IOException e) {
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
|
|
|
// 如果发送消息失败,则从映射表中移除 emitter
|
|
|
|
emitters.remove(token);
|
|
|
|
emitters.remove(token);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return emitter;
|
|
|
|
return emitter;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 断开指定用户的 SSE 连接
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* @param userId 用户的唯一标识符,用于区分不同用户的连接
|
|
|
|
|
|
|
|
* @param token 用户的唯一令牌,用于识别具体的连接
|
|
|
|
|
|
|
|
*/
|
|
|
|
public void disconnect(Long userId, String token) {
|
|
|
|
public void disconnect(Long userId, String token) {
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
|
|
|
if (emitters != null) {
|
|
|
|
if (emitters != null) {
|
|
|
|