change - broker集群订阅Redis实现消息下发

dev
Wen JY 1 year ago
parent 689c39d829
commit a36ab5537e

@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rouyi-modules-mqttBroker</artifactId>
<artifactId>hw-mqtt-broker</artifactId>
<description>
海威物联网平台Broker模块
@ -22,11 +22,10 @@
<dependencies>
<!-- SpringCloud Alibaba Nacos -->
<!--<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
@ -47,12 +46,6 @@
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-redis</artifactId>
<version>2.7.9</version>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-logging</artifactId>

@ -0,0 +1,35 @@
package com.hw.mqtt.config;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.listener.RedisMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:35:20
* @version: 1.0
*/
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter msgIngoListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener配置不同的交换机
container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey()));
return container;
}
@Bean
MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "deviceCommand");
}
}

@ -1,225 +0,0 @@
package com.hw.mqtt.domain;
import java.util.HashMap;
import java.util.Objects;
/**
* @author Wen JY
* @description: TODO
* @date 2023-08-17 13:33:31
* @version: 1.0
*/
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
/**
*
*/
public enum Type
{
/** 成功 */
SUCCESS(0),
/** 警告 */
WARN(301),
/** 错误 */
ERROR(500);
private final int value;
Type(int value)
{
this.value = value;
}
public int value()
{
return this.value;
}
}
/**
* AjaxResult 使
*/
public AjaxResult()
{
}
/**
* AjaxResult
*
* @param type
* @param msg
*/
public AjaxResult(Type type, String msg)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
}
/**
* AjaxResult
*
* @param type
* @param msg
* @param data
*/
public AjaxResult(Type type, String msg, Object data)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
if (isNotNull(data))
{
super.put(DATA_TAG, data);
}
}
/**
*
*
* @return
*/
public static AjaxResult success()
{
return AjaxResult.success("操作成功");
}
/**
*
*
* @return
*/
public static AjaxResult success(Object data)
{
return AjaxResult.success("操作成功", data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult success(String msg)
{
return AjaxResult.success(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult success(String msg, Object data)
{
return new AjaxResult(Type.SUCCESS, msg, data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult warn(String msg)
{
return AjaxResult.warn(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult warn(String msg, Object data)
{
return new AjaxResult(Type.WARN, msg, data);
}
/**
*
*
* @return
*/
public static AjaxResult error()
{
return AjaxResult.error("操作失败");
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult error(String msg)
{
return AjaxResult.error(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult error(String msg, Object data)
{
return new AjaxResult(Type.ERROR, msg, data);
}
/**
*
*
* @return
*/
public boolean isSuccess()
{
return !isError();
}
/**
*
*
* @return
*/
public boolean isError()
{
return Objects.equals(Type.ERROR.value, this.get(CODE_TAG));
}
/**
* 便
*
* @param key
* @param value
* @return
*/
@Override
public AjaxResult put(String key, Object value)
{
super.put(key, value);
return this;
}
public static boolean isNotNull(Object object)
{
return !isNull(object);
}
public static boolean isNull(Object object)
{
return object == null;
}
}

@ -1,25 +0,0 @@
package com.hw.mqtt.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author WenJY
* @date 20230314 13:46
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerNode {
/**
*
*/
private String name;
/**
* ip:port
*/
private String peerHost;
}

@ -17,23 +17,20 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.domain.AjaxResult;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -50,26 +47,26 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
private final ApplicationContext context;
private final MicaRedisCache redisCache;
private final StringRedisTemplate redisTemplate;
private MqttServerCreator serverCreator;
private MqttServerTemplate mqttServerTemplate;
public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) {
public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) {
this.context = context;
this.redisCache = redisCache;
this.redisTemplate = redisTemplate;
}
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisCache.sAdd(getRedisKey(), clientId);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,1);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisCache.sRem(getRedisKey(), clientId);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,2);
}
@ -91,7 +88,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void destroy() throws Exception {
// 停机时删除集合
redisCache.del(getRedisKey());
redisTemplate.opsForSet().remove(getRedisKey());
}
/**

@ -0,0 +1,56 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.utils.StringUtils;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:28:18
* @version: 1.0
*/
@Component
public class RedisMessageListener {
private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
@Autowired
private MqttServerTemplate mqttServerTemplate;
/**
*
* @param message
*/
public void deviceCommand(String message){
try {
if(StringUtils.isEmpty(message)){
logger.warn("Redis订阅内容为空");
return;
}
logger.info("Redis订阅内容"+message);
JSONObject jsonObject = JSONObject.parseObject(message);
String topic = jsonObject.get("Topic").toString();
String payload = jsonObject.get("Payload").toString();
boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
if(publishResult){
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功");
}else {
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!");
}
}catch (Exception e){
logger.error("设备控制指令下发异常:"+e.getMessage());
}
}
}

@ -1,62 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service;
import java.util.List;
/**
* mqtt broker
*
* @author L.cm
*/
public interface IMqttBrokerService {
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return long
*/
long getOnlineClientSize();
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return java.util.List<java.lang.String>
*/
List<String> getOnlineClients();
/**
*
* @author WenJY
* @date 2023-03-14 13:50
* @param topic
* @param payload
* @return boolean
*/
boolean publish(String topic,String payload);
/**
*
* @author WenJY
* @date 2023-03-18 9:23
* @param clientId
*/
boolean closeClientById(String clientId);
}

@ -1,96 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service.impl;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.service.IMqttBrokerService;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.core.utils.StringPool;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* mqtt broker
*
* @author L.cm
*/
@Service
public class MqttBrokerServiceImpl implements IMqttBrokerService {
private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class);
@Autowired private MicaRedisCache redisCache;
@Autowired private MqttServerTemplate server;
@Override
public long getOnlineClientSize() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return 0L;
}
long result = 0;
for (String redisKey : keySet) {
Long count = redisCache.getSetOps().size(redisKey);
if (count != null) {
result += count;
}
}
return result;
}
@Override
public List<String> getOnlineClients() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return Collections.emptyList();
}
List<String> clientList = new ArrayList<>();
for (String redisKey : keySet) {
Set<String> members = redisCache.sMembers(redisKey);
if (members != null && !members.isEmpty()) {
clientList.addAll(members);
}
}
return clientList;
}
@Override
public boolean publish(String topic, String payload) {
boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload);
return result;
}
@Override
public boolean closeClientById(String clientId) {
try{
server.close(clientId);
}catch (Exception ex){
return false;
}
return true;
}
}

@ -1,27 +0,0 @@
package com.hw.mqtt.task;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
*
* @author WenJY
* @date 2023-03-14 12:16
* @param null
* @return null
*/
@Service
public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
@Scheduled(fixedDelay = 1000)
public void run() {
mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8));
}
}

@ -1,40 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.util;
import net.dreamlu.mica.core.utils.CharPool;
/**
* redis
*
* @author L.cm
*/
public class RedisUtil {
/**
* redis pattern
*
* @return pattern
*/
public static String getTopicPattern(String topicFilter) {
// mqtt 分享主题 $share/{ShareName}/{filter}
return topicFilter
.replace(CharPool.PLUS, CharPool.STAR)
.replace(CharPool.HASH, CharPool.STAR);
}
}
Loading…
Cancel
Save