change - broker 向指定主题推送设备接入状态

dev
Wen JY 1 year ago
parent 2431cdee2e
commit 42d8d624ca

@ -16,9 +16,12 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.domain.AjaxResult;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,6 +32,11 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
*
*
@ -44,6 +52,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
private final ApplicationContext context;
private final MicaRedisCache redisCache;
private MqttServerCreator serverCreator;
private MqttServerTemplate mqttServerTemplate;
public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) {
this.context = context;
@ -54,12 +63,14 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisCache.sAdd(getRedisKey(), clientId);
pushConnectStatus(clientId,1);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisCache.sRem(getRedisKey(), clientId);
pushConnectStatus(clientId,2);
}
/**
@ -74,6 +85,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void afterSingletonsInstantiated() {
this.serverCreator = context.getBean(MqttServerCreator.class);
this.mqttServerTemplate = context.getBean(MqttServerTemplate.class);
}
@Override
@ -81,4 +93,25 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
// 停机时删除集合
redisCache.del(getRedisKey());
}
/**
*
* @param clientId
* @param connectStatus
*/
public void pushConnectStatus(String clientId,Integer connectStatus){
Map<String,Object> entityMap = new HashMap<>();
entityMap.put("msg","设备设备连接状态信息");
entityMap.put("deviceType","edge");
entityMap.put("deviceCode",clientId);
entityMap.put("connectStatus",connectStatus);
entityMap.put("statusTime",System.currentTimeMillis());
String jsonString = JSONArray.toJSONString(entityMap);
boolean result = mqttServerTemplate.publishAll("/device/status/v1", jsonString.getBytes(StandardCharsets.UTF_8));
if(result){
logger.info("客户端:"+clientId+""+ (connectStatus == 1 ? "连接" :"断开") +"状态推送成功");
}else {
logger.info("客户端:"+clientId+""+ (connectStatus == 1 ? "连接" :"断开") +"状态推送失败");
}
}
}

Loading…
Cancel
Save