Add - Mqtt broker

pull/1/head
Wen JY 1 year ago
parent 5b80094a3f
commit 372bbee821

@ -0,0 +1,136 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!--<parent>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-modules</artifactId>
<version>3.6.3</version>
</parent>-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.9</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>hw-mqtt-broker</artifactId>
<description>MQTT Broker</description>
<properties>
<revision>2.1.0</revision>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mica.version>2.7.9</mica.version>
<spring.boot.version>2.7.9</spring.boot.version>
<fastjson.version>1.2.83</fastjson.version>
<tinylog.version>2.6.0</tinylog.version>
<junit-jupiter.version>5.9.2</junit-jupiter.version>
</properties>
<dependencies>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-server-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-lite</artifactId>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-logging</artifactId>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-redis</artifactId>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-openapi</artifactId>
</dependency>
<!-- 开启 prometheus 指标收集,详见: http://localhost:30012/actuator/prometheus -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-bom</artifactId>
<version>${mica.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-server-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<!-- tinylog 内存占用更小、性能更好,适合边缘设备 -->
<dependency>
<groupId>org.tinylog</groupId>
<artifactId>slf4j-tinylog</artifactId>
<version>${tinylog.version}</version>
</dependency>
<dependency>
<groupId>org.tinylog</groupId>
<artifactId>tinylog-impl</artifactId>
<version>${tinylog.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,13 @@
package com.hw.mqtt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class HwMqttBrokerApplication {
public static void main(String[] args) {
SpringApplication.run(HwMqttBrokerApplication.class, args);
}
}

@ -0,0 +1,22 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
/**
* mqtt tcpwebsocket
* @author WenJY
* @date 2023-03-14 12:19
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttAuthHandler implements IMqttServerAuthHandler {
@Override
public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
// 客户端认证逻辑实现
return true;
}
}

@ -0,0 +1,37 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Configuration;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
/**
* mqtt http
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttHttpAuthFilter implements HttpFilter, InitializingBean {
@Override
public boolean filter(HttpRequest request) throws Exception {
// 自行实现逻辑
return true;
}
@Override
public HttpResponse response(HttpRequest request) {
// 认证不通过时的响应
return Result.fail(request, ResultCode.E103);
}
@Override
public void afterPropertiesSet() throws Exception {
MqttHttpRoutes.addFilter(this);
}
}

@ -0,0 +1,23 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
/**
*
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttSubscribeValidator implements IMqttServerSubscribeValidator {
@Override
public boolean isValid(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) {
// 校验客户端订阅的 topic校验成功返回 true失败返回 false
return true;
}
}

@ -0,0 +1,22 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
/**
* clientId
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttUniqueIdService implements IMqttServerUniqueIdService {
@Override
public String getUniqueId(ChannelContext context, String clientId, String userName, String password) {
// 返回的 uniqueId 会替代 mqtt client 传过来的 clientId请保证返回的 uniqueId 唯一。
return clientId;
}
}

@ -0,0 +1,81 @@
package com.hw.mqtt.controller;
import com.hw.mqtt.domain.AjaxResult;
import com.hw.mqtt.service.IMqttBrokerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* Broker Http
*/
@Tag(name = "Mqtt::服务端")
@RequestMapping("/mqtt/server")
@RestController
public class ServerController {
@Autowired
private IMqttBrokerService service;
@Operation(summary = "Broker发布消息")
@PostMapping("/publish")
public AjaxResult publish(String topic,String payload) {
AjaxResult ajaxResult;
boolean result = false;
try{
result = service.publish(topic,payload);
ajaxResult = AjaxResult.success(result);
}catch (Exception ex){
ajaxResult = AjaxResult.error("Broker发布消息异常"+ex.getMessage(),result);
}
return ajaxResult;
}
@Operation(summary = "获取在线客户端数量")
@GetMapping("/getOnlineClientSize")
public AjaxResult getOnlineClientSize() {
AjaxResult ajaxResult;
long onlineClientSize = 0;
try{
onlineClientSize = service.getOnlineClientSize();
ajaxResult = AjaxResult.success(onlineClientSize);
}catch (Exception ex){
ajaxResult = AjaxResult.error("获取在线客户端数量异常:"+ex.getMessage(),onlineClientSize);
}
return ajaxResult;
}
@Operation(summary = "获取在线客户端ID")
@GetMapping("/getOnlineClients")
public AjaxResult getOnlineClients() {
AjaxResult ajaxResult;
List<String> nodes = new ArrayList<>();
try{
nodes = service.getOnlineClients();
ajaxResult = AjaxResult.success(nodes);
}catch (Exception ex){
ajaxResult = AjaxResult.error("获取在线客户端ID异常"+ex.getMessage(),nodes);
}
return ajaxResult;
}
@Operation(summary = "主动关闭指定客户端连接")
@PostMapping("/closeClientById")
public AjaxResult closeClientById(String clientId) {
AjaxResult ajaxResult;
boolean result = false;
try{
result = service.closeClientById(clientId);
ajaxResult = AjaxResult.success(result);
}catch (Exception ex){
ajaxResult = AjaxResult.error("主动关闭指定客户端异常:"+ex.getMessage(),result);
}
return ajaxResult;
}
}

@ -0,0 +1,225 @@
package com.hw.mqtt.domain;
import java.util.HashMap;
import java.util.Objects;
/**
* @author Wen JY
* @description: TODO
* @date 2023-08-17 13:33:31
* @version: 1.0
*/
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
/**
*
*/
public enum Type
{
/** 成功 */
SUCCESS(0),
/** 警告 */
WARN(301),
/** 错误 */
ERROR(500);
private final int value;
Type(int value)
{
this.value = value;
}
public int value()
{
return this.value;
}
}
/**
* AjaxResult 使
*/
public AjaxResult()
{
}
/**
* AjaxResult
*
* @param type
* @param msg
*/
public AjaxResult(Type type, String msg)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
}
/**
* AjaxResult
*
* @param type
* @param msg
* @param data
*/
public AjaxResult(Type type, String msg, Object data)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
if (isNotNull(data))
{
super.put(DATA_TAG, data);
}
}
/**
*
*
* @return
*/
public static AjaxResult success()
{
return AjaxResult.success("操作成功");
}
/**
*
*
* @return
*/
public static AjaxResult success(Object data)
{
return AjaxResult.success("操作成功", data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult success(String msg)
{
return AjaxResult.success(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult success(String msg, Object data)
{
return new AjaxResult(Type.SUCCESS, msg, data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult warn(String msg)
{
return AjaxResult.warn(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult warn(String msg, Object data)
{
return new AjaxResult(Type.WARN, msg, data);
}
/**
*
*
* @return
*/
public static AjaxResult error()
{
return AjaxResult.error("操作失败");
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult error(String msg)
{
return AjaxResult.error(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult error(String msg, Object data)
{
return new AjaxResult(Type.ERROR, msg, data);
}
/**
*
*
* @return
*/
public boolean isSuccess()
{
return !isError();
}
/**
*
*
* @return
*/
public boolean isError()
{
return Objects.equals(Type.ERROR.value, this.get(CODE_TAG));
}
/**
* 便
*
* @param key
* @param value
* @return
*/
@Override
public AjaxResult put(String key, Object value)
{
super.put(key, value);
return this;
}
public static boolean isNotNull(Object object)
{
return !isNull(object);
}
public static boolean isNull(Object object)
{
return object == null;
}
}

@ -0,0 +1,25 @@
package com.hw.mqtt.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author WenJY
* @date 20230314 13:46
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerNode {
/**
*
*/
private String name;
/**
* ip:port
*/
private String peerHost;
}

@ -0,0 +1,58 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author WenJY
* @date 20230314 13:36
*/
@Getter
@RequiredArgsConstructor
public enum RedisKeys {
/**
* mqtt
*/
SERVER_NODES("mqtt:server:nodes:"),
/**
* mqtt <-> redis pug/sub
*/
REDIS_CHANNEL_EXCHANGE("mqtt:channel:exchange"),
/**
* -> redis pug/sub mq
*/
REDIS_CHANNEL_UP("mqtt:channel:up"),
/**
* -> redis pug/sub 广 mqtt
*/
REDIS_CHANNEL_DOWN("mqtt:channel:down"),
/**
*
*/
CONNECT_STATUS("mqtt:connect:status:"),
/**
*
*/
MESSAGE_STORE_WILL("mqtt:messages:will:"),
/**
*
*/
MESSAGE_STORE_RETAIN("mqtt:messages:retain:"),
;
private final String key;
/**
*
*
* @param suffix
* @return redis key
*/
public String getKey(String suffix) {
return this.key.concat(suffix);
}
}

@ -0,0 +1,83 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.listener;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
/**
*
*
* @author WenJY
* @date 2023-03-14 12:17
* @param null
* @return null
*/
@Service
public class MqttConnectStatusListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
private final ApplicationContext context;
private final MicaRedisCache redisCache;
private MqttServerCreator serverCreator;
public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) {
this.context = context;
this.redisCache = redisCache;
}
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisCache.sAdd(getRedisKey(), clientId);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisCache.sRem(getRedisKey(), clientId);
}
/**
* 线key :nodeName
*
* @return redis key
*/
private String getRedisKey() {
return RedisKeys.CONNECT_STATUS.getKey(serverCreator.getNodeName());
}
@Override
public void afterSingletonsInstantiated() {
this.serverCreator = context.getBean(MqttServerCreator.class);
}
@Override
public void destroy() throws Exception {
// 停机时删除集合
redisCache.del(getRedisKey());
}
}

@ -0,0 +1,40 @@
package com.hw.mqtt.listener;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
/**
*
* @author WenJY
* @date 2023-03-14 12:18
* @param null
* @return null
*/
@Service
public class MqttServerMessageListener implements IMqttMessageListener, SmartInitializingSingleton {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
@Autowired
private ApplicationContext applicationContext;
private MqttServerTemplate mqttServerTemplate;
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) {
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, ByteBufferUtil.toString(message.getPayload()));
}
@Override
public void afterSingletonsInstantiated() {
// 单利 bean 初始化完成之后从 ApplicationContext 中获取 bean
mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class);
}
}

@ -0,0 +1,62 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service;
import java.util.List;
/**
* mqtt broker
*
* @author L.cm
*/
public interface IMqttBrokerService {
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return long
*/
long getOnlineClientSize();
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return java.util.List<java.lang.String>
*/
List<String> getOnlineClients();
/**
*
* @author WenJY
* @date 2023-03-14 13:50
* @param topic
* @param payload
* @return boolean
*/
boolean publish(String topic,String payload);
/**
*
* @author WenJY
* @date 2023-03-18 9:23
* @param clientId
*/
boolean closeClientById(String clientId);
}

@ -0,0 +1,29 @@
package com.hw.mqtt.service;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
*
* @author WenJY
* @date 2023-03-14 12:19
* @param null
* @return null
*/
@Service
public class ServerService {
private static final Logger logger = LoggerFactory.getLogger(ServerService.class);
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
boolean result = server.publishAll("/test/message", body.getBytes(StandardCharsets.UTF_8));
logger.info("Mqtt publishAll result:{};payload:{}", result,body);
return result;
}
}

@ -0,0 +1,96 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service.impl;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.service.IMqttBrokerService;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.core.utils.StringPool;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* mqtt broker
*
* @author L.cm
*/
@Service
public class MqttBrokerServiceImpl implements IMqttBrokerService {
private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class);
@Autowired private MicaRedisCache redisCache;
@Autowired private MqttServerTemplate server;
@Override
public long getOnlineClientSize() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return 0L;
}
long result = 0;
for (String redisKey : keySet) {
Long count = redisCache.getSetOps().size(redisKey);
if (count != null) {
result += count;
}
}
return result;
}
@Override
public List<String> getOnlineClients() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return Collections.emptyList();
}
List<String> clientList = new ArrayList<>();
for (String redisKey : keySet) {
Set<String> members = redisCache.sMembers(redisKey);
if (members != null && !members.isEmpty()) {
clientList.addAll(members);
}
}
return clientList;
}
@Override
public boolean publish(String topic, String payload) {
boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload);
return result;
}
@Override
public boolean closeClientById(String clientId) {
try{
server.close(clientId);
}catch (Exception ex){
return false;
}
return true;
}
}

@ -0,0 +1,27 @@
package com.hw.mqtt.task;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
*
* @author WenJY
* @date 2023-03-14 12:16
* @param null
* @return null
*/
@Service
public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
@Scheduled(fixedDelay = 1000)
public void run() {
mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8));
}
}

@ -0,0 +1,40 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.util;
import net.dreamlu.mica.core.utils.CharPool;
/**
* redis
*
* @author L.cm
*/
public class RedisUtil {
/**
* redis pattern
*
* @return pattern
*/
public static String getTopicPattern(String topicFilter) {
// mqtt 分享主题 $share/{ShareName}/{filter}
return topicFilter
.replace(CharPool.PLUS, CharPool.STAR)
.replace(CharPool.HASH, CharPool.STAR);
}
}

@ -0,0 +1,54 @@
server:
port: 30013
spring:
# redis 配置
redis:
# 地址
host: huawei-redis
# 端口默认为6379
port: 6379
# 密码
password: admin123
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
mqtt:
server:
enabled: true # 是否开启服务端默认true
# ip: 0.0.0.0 # 服务端 ip 默认为空0.0.0.0,建议不要设置
port: 1883 # 端口默认1883
name: Mqtt-Broker # 名称默认Mica-Mqtt-Server
buffer-allocator: HEAP # 堆内存和堆外内存,默认:堆内存
heartbeat-timeout: 120000 # 心跳超时,单位毫秒,默认: 1000 * 120
read-buffer-size: 8KB # 接收数据的 buffer size默认8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度默认10M
auth:
enable: false # 是否开启 mqtt 认证
username: mica # mqtt 认证用户名
password: mica # mqtt 认证密码
debug: true # 如果开启 prometheus 指标收集建议关闭
stat-enable: true # 开启指标收集debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存
web-port: 8083 # http、websocket 端口默认8083
websocket-enable: true # 是否开启 websocket默认 true
http-enable: false # 是否开启 http api默认 false
http-basic-auth:
enable: false # 是否开启 http basic auth默认 false
username: mica # http basic auth 用户名
password: mica # http basic auth 密码
ssl: # mqtt tcp ssl 认证
enabled: false # 是否开启 ssl 认证2.1.0 开始支持双向认证
keystore-path: # 必须参数ssl keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 必选参数ssl keystore 密码
truststore-path: # 可选参数ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数ssl 双向认证 truststore 密码
client-auth: none # 是否需要客户端认证双向认证默认NONE不需要

@ -0,0 +1,33 @@
spring:
application:
name: Mqtt-Broker
profiles:
active: server
springdoc:
swagger-ui:
urls:
- name: swagger
url: /v3/api-docs
# actuator management
management:
info:
defaults:
enabled: true
metrics:
tags:
application: ${spring.application.name}
endpoint:
health:
show-details: ALWAYS
prometheus:
enabled: true
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: info
server: info # t-io ???????
org.tio: info # t-io ???????

@ -0,0 +1,10 @@
${AnsiColor.RED} ## ## ####### ######## ########
${AnsiColor.RED} ### ### ## ## ## ##
${AnsiColor.RED} #### #### ## ## ## ##
${AnsiColor.RED} ## ### ## ## ## ## ##
${AnsiColor.RED} ## ## ## ## ## ## ##
${AnsiColor.RED} ## ## ## ## ## ##
${AnsiColor.RED} ## ## ##### ## ## ##
${AnsiColor.BRIGHT_BLUE}:: ${spring.application.name} :: Running Spring Boot ${spring-boot.version} 🏃🏃🏃 ${AnsiColor.DEFAULT}

@ -13,6 +13,7 @@
<module>ruoyi-gen</module>
<module>ruoyi-job</module>
<module>ruoyi-file</module>
<module>hw-mqtt-broker</module>
</modules>
<artifactId>ruoyi-modules</artifactId>

Loading…
Cancel
Save