add 新增 websocket 群发功能

2.X
疯狂的狮子Li 1 year ago
parent 3d6badb748
commit af2f1820f6

@ -5,6 +5,7 @@ import lombok.NoArgsConstructor;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -31,6 +32,10 @@ public class WebSocketSessionHolder {
return USER_SESSION_MAP.get(sessionKey);
}
public static Set<Long> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
public static Boolean existSession(Long sessionKey) {
return USER_SESSION_MAP.containsKey(sessionKey);
}

@ -19,13 +19,18 @@ public class WebSocketTopicListener implements ApplicationRunner, Ordered {
@Override
public void run(ApplicationArguments args) throws Exception {
WebSocketUtils.subscribeMessage((message) -> {
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.getMessage());
}
});
} else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.getMessage());
});
}
});
log.info("初始化WebSocket主题订阅监听器成功");

@ -77,6 +77,22 @@ public class WebSocketUtils {
}
}
/**
* ()
*
* @param message
*/
public static void publishAll(String message) {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message);
});
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
});
}
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}

Loading…
Cancel
Save