当前位置: 首页 > news >正文

初探MQTT之安装配置及使用

  MQTT 是一种轻量级、基于发布/订阅模式的消息传输协议,旨在用极小的代码空间和网络带宽为物联网设备提供简单、可靠的消息传递服务。MQTT 经过多年的发展,如今已被广泛应用于各行各业,使得 MQTT 成为了物联网传输协议的事实标准。

  笔者公司近期让自己在微服务中添加一个MQTT模块,由于本人技术不是特别好,所以参考了很多博客,终于完成了一版初步使用的代码:


1、添加 mqtt属性配置类

点击查看代码
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;/*** @Desc: mqtt属性配置* @Author: w_jin* @Date: 2025/7/30*/
@Configuration
// 识别配置文件中,spring.mqtt下面的属性
@ConfigurationProperties("spring.mqtt")
public class MqttConfiguration {private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfiguration.class);/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息*/private static final byte[] WILL_DATA = "offline".getBytes();/*** 注册MQTT客户端工厂** @return MqttPahoClientFactory*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {// 客户端工厂DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();// 连接配置MqttConnectOptions options = new MqttConnectOptions();// 设置连接的用户名options.setUserName(username);// 设置连接的密码options.setPassword(password.toCharArray());// 设置连接的地址options.setServerURIs(new String[]{hostUrl});// 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持:// 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。// 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。options.setCleanSession(true);// 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。// 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线// 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0// 但这个方法并没有重连的机制options.setKeepAliveInterval(20);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。options.setWill("willTopic", WILL_DATA, 2, false);//自动重新连接options.setAutomaticReconnect(true);factory.setConnectionOptions(options);LOGGER.info("初始化 MQTT 配置: hostUrl={}, username={}, clientId={}", hostUrl, username, clientId);return factory;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}
}

2、添加 配置Mqtt消息发送通道

点击查看代码
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** @Desc: 配置Mqtt消息发送通道* @Author: w_jin* @Date: 2025/7/30*/
@Configuration
public class MqttOutboundConfiguration {@Resourceprivate MqttConfiguration mqttConfiguration;/*** MQTT信息通道(生产者)*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT消息处理器(生产者)*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {// 客户端idString clientId = mqttConfiguration.getClientId();// 默认主题String defaultTopic = mqttConfiguration.getDefaultTopic();MqttPahoClientFactory mqttPahoClientFactory = mqttConfiguration.mqttClientFactory();// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactoryMqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producers", mqttPahoClientFactory);// true,异步,发送消息时将不会阻塞。messageHandler.setAsync(true);
//        messageHandler.setDefaultTopic(defaultTopic);// 默认QoSmessageHandler.setDefaultQos(1);// Paho消息转换器DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();// defaultPahoMessageConverter.setPayloadAsBytes(true);// 发送默认按字节类型发送消息messageHandler.setConverter(defaultPahoMessageConverter);return messageHandler;}}

3、添加 mqtt消息发送接口

点击查看代码
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @Desc: mqtt消息发送接口* @Author: w_jin* @Date: 2025/7/30*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 发送mqtt消息** @param topic   主题* @param payload 内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送包含qos的消息** @param topic   主题* @param qos     对消息处理的几种机制。*                * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*                * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*                * 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);/*** 发送包含qos的消息** @param topic   主题* @param qos     对消息处理的几种机制。*                * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*                * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*                * 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);}

4、添加 mqtt消息发送者

点击查看代码
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** @Desc: mqtt消息发送者* @Author: w_jin* @Date: 2025/7/30*/
@Component
public class MqttMessageSender {private static final Logger LOGGER = LoggerFactory.getLogger(MqttMessageSender.class);@Resourceprivate MqttGateway mqttGateway;/*** 发送mqtt消息** @param topic   主题* @param message 内容* @return void*/public boolean send(String topic, String message) {LOGGER.info("给主题:{} 发送mqtt消息:{}", topic, message);try {mqttGateway.sendToMqtt(topic, message);return true;} catch (Exception e) {LOGGER.error("发送mqtt消息异常:{}", e.getMessage());return false;}}/*** 发送包含qos的消息** @param topic       主题* @param qos         质量* @param messageBody 消息体* @return void*/public boolean send(String topic, int qos, JSONObject messageBody) {LOGGER.info("给主题:{} 发送mqtt消息:{} 质量为:{}", topic, messageBody, qos);try {mqttGateway.sendToMqtt(topic, qos, messageBody.toString());return true;} catch (Exception e) {LOGGER.error("给主题:{} 发送mqtt消息:{} 失败", topic, messageBody, e);return false;}}/*** 发送包含qos的消息** @param topic   主题* @param qos     质量* @param message 消息体* @return void*/public boolean send(String topic, int qos, byte[] message) {LOGGER.info("给主题:{} 发送mqtt消息:{} 质量为:{}", topic, message, qos);try {mqttGateway.sendToMqtt(topic, qos, message);return true;} catch (Exception e) {LOGGER.error("给主题:{} 发送mqtt消息:{} 失败", topic, message);return false;}}
}

 这套代码还需要进行优化,优化的方向是组件化。

代码参考至:https://blog.csdn.net/m0_74825634/article/details/146431834

http://www.sczhlp.com/news/1875/

相关文章:

  • 7月30日
  • libjit的编译问题
  • 漏电断路检测中探头的选择与应用
  • 电机控制笔记:理解直流无刷电机矢量控制
  • 第二十七天
  • eBPF 赋能云原生: WizTelemetry 无侵入网络可观测实践
  • AIX 双网卡绑定
  • Redis(可视化)GUI连接工具推荐
  • 示波器电流探头5mv/A和10mv/A的区别
  • RN-async await
  • Nginx 日志轮转实战指南:按天、周、月、年自动切割与归档(附完整案例)
  • Python使用\N{ANIMAL_NAME}语法直接输出十二生肖
  • electron-egg实现全量更新和增量更新(上)
  • iOS WebView 加载失败与缓存刷新问题排查实战指南
  • 数字孪生技术是如何帮助物流行业发展的?
  • ​​Linux PR(Priority)​​ 和 ​​NI(Nice)进程优先级详解
  • 软考系统分析师每日学习卡 | [日期:2025-07-30] | [今日主题:进程管理(一)]
  • c#nopi读取excel内容
  • 容器云网络故障深度排查:POD访问SVC超时全解析
  • 一些图论进阶
  • Django模型关系:从一对多到多对多全解析
  • Higress curl测试Mcp
  • 2025年10款必须知道的项目管理软件推荐,好用的项目管理工具都在这里!
  • 数字时代的隐私盾牌:深度解析Seaoss临时邮箱如何重塑你的网络安全
  • day6
  • 好用的临时邮箱十分钟邮箱推荐(亲测)
  • 临时邮箱、tempmail、十分钟邮箱、24小时邮箱、可丢弃邮箱推荐
  • SSH连接服务器正常显示GUI程序
  • PY_0001:python的安装和打包exe程序
  • 亲测好用的临时邮箱推荐