小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT
安装EMQX
采用docker-compose一键式启动!!!
还没有安装docker朋友,参考文章下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
- services:
- emqx:
- image: emqx/emqx:5.4.1
- container_name: emqx
- # 保持容器在没有守护程序的情况下运行
- tty: true
- restart: always
- privileged: true
- ports:
- - "1883:1883"
- - "8083:8083"
- - "8883:8883"
- - "18083:18083"
- environment:
- - TZ=Asia/Shanghai
- volumes:
- # 挂载数据存储
- - ./emqx/data:/opt/emqx/data
- # 挂载日志文件
- - ./emqx/log:/opt/emqx/log
- networks:
- - laokou_network
- networks:
- laokou_network:
- driver: bridge
复制代码 访问 http://127.0.0.1:18083 设置密码
EMQX MQTT【摘抄自官方文档】
EMQX官方文档
MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极轻量的发布/订阅消息传输协议,非常适合以较小的代码占用空间和极低的网络带宽连接远程设备。MQTT 目前广泛应用于汽车、制造、电信、石油和天然气等众多行业。
EMQX 完全兼容 MQTT 5.0 和 3.x,本节将介绍 MQTT 相关功能的基本配置项,包括基本 MQTT 设置、订阅设置、会话设置、强制关闭设置和强制垃圾回收设置等
客户端对接
本文章采用三种客户端对接
维度PahoHivemq-MQTT-ClientVert.x MQTT Client协议支持MQTT 3.1.1(5.0 实验性)MQTT 5.0 完整支持MQTT 5.0(较新版本)性能中(同步模式)高(异步非阻塞)极高(响应式架构)依赖复杂度低中(仅 Netty)高(需 Vert.x 生态)社区资源丰富较少中等适用场景传统 IoT、跨语言项目企业级 MQTT 5.0、高吞吐响应式系统、高并发微服务Paho【不推荐,连接不稳定】
Paho代码地址
引入依赖
- <dependencies>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- org.eclipse.paho.mqttv5.client</artifactId>
- <version>1.2.5</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.5</version>
- </dependency>
- </dependencies>
复制代码 项目集成
PahoProperties- /**
- * @author laokou
- */
- @Data
- public class PahoProperties {
- private boolean auth = true;
- private String username = "emqx";
- private String password = "laokou123";
- private String host = "127.0.0.1";
- private int port = 1883;
- private String clientId;
- private int subscribeQos = 1;
- private int publishQos = 0;
- private int willQos = 1;
- private int connectionTimeout = 60;
- private boolean manualAcks = false;
- // @formatter:off
- /**
- * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.
- * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.
- * ...
- */
- // @formatter:on
- private boolean clearStart = false;
- private int receiveMaximum = 10000;
- private int maximumPacketSize = 10000;
- // @formatter:off
- /**
- * 默认会话保留一天.
- * 最大值,4294967295L,会话过期时间【永不过期,单位秒】.
- * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).
- */
- private long sessionExpiryInterval = 86400L;
- // @formatter:on
- /**
- * 心跳包每隔60秒发一次.
- */
- private int keepAliveInterval = 60;
- private boolean automaticReconnect = true;
- private Set<String> topics = new HashSet<>(0);
- }
复制代码 PahoMqttClientMessageCallbackV5- /**
- * @author laokou
- */
- @Slf4j
- @RequiredArgsConstructor
- public class PahoMqttClientMessageCallbackV5 implements MqttCallback {
- private final List<MessageHandler> messageHandlers;
- @Override
- public void disconnected(MqttDisconnectResponse disconnectResponse) {
- log.error("【Paho-V5】 => MQTT关闭连接");
- }
- @Override
- public void mqttErrorOccurred(MqttException ex) {
- log.error("【Paho-V5】 => MQTT报错,错误信息:{}", ex.getMessage());
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- for (MessageHandler messageHandler : messageHandlers) {
- if (messageHandler.isSubscribe(topic)) {
- log.info("【Paho-V5】 => MQTT接收到消息,Topic:{}", topic);
- messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
- }
- }
- }
- @Override
- public void deliveryComplete(IMqttToken token) {
- log.info("【Paho-V5】 => MQTT消息发送成功,消息ID:{}", token.getMessageId());
- }
- @Override
- public void connectComplete(boolean reconnect, String uri) {
- if (reconnect) {
- log.info("【Paho-V5】 => MQTT重连成功,URI:{}", uri);
- }
- else {
- log.info("【Paho-V5】 => MQTT建立连接,URI:{}", uri);
- }
- }
- @Override
- public void authPacketArrived(int reasonCode, MqttProperties properties) {
- log.info("【Paho-V5】 => 接收到身份验证数据包:{}", reasonCode);
- }
- }
复制代码 PahoV5MqttClientTest- /**
- * @author laokou
- */
- @SpringBootTest
- @RequiredArgsConstructor
- @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
- class PahoV5MqttClientTest {
- private final List<MessageHandler> messageHandlers;
- @Test
- void testMqttClient() throws InterruptedException {
- ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
- PahoProperties pahoProperties = new PahoProperties();
- pahoProperties.setClientId("test-client-3");
- pahoProperties.setTopics(Set.of("/test-topic-3/#"));
- PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService);
- pahoMqttClientV5.open();
- Thread.sleep(1000);
- pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes());
- }
- }
复制代码 PahoMqttClientMessageCallbackV3- /**
- * @author laokou
- */
- @Slf4j
- @RequiredArgsConstructor
- public class PahoMqttClientMessageCallbackV3 implements MqttCallback {
- private final List<MessageHandler> messageHandlers;
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- log.info("【Paho-V3】 => MQTT消息发送成功,消息ID:{}", iMqttDeliveryToken.getMessageId());
- }
- @Override
- public void connectionLost(Throwable throwable) {
- log.error("【Paho-V3】 => MQTT关闭连接");
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- for (MessageHandler messageHandler : messageHandlers) {
- if (messageHandler.isSubscribe(topic)) {
- log.info("【Paho-V3】 => MQTT接收到消息,Topic:{}", topic);
- messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
- }
- }
- }
- }
复制代码 PahoV3MqttClientTest- /**
- * @author laokou
- */
- @SpringBootTest
- @RequiredArgsConstructor
- @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
- class PahoV3MqttClientTest {
- private final List<MessageHandler> messageHandlers;
- @Test
- void testMqttClient() throws InterruptedException {
- ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
- PahoProperties pahoProperties2 = new PahoProperties();
- pahoProperties2.setClientId("test-client-4");
- pahoProperties2.setTopics(Set.of("/test-topic-4/#"));
- PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService);
- pahoMqttClientV3.open();
- Thread.sleep(1000);
- pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes());
- }
- }
复制代码 Hivemq-MQTT-Client【不推荐】
注意:订阅一段时间收不到数据,标准mqtt5.0协议,不兼容emqx broker mqtt5.0
Hivemq代码地址
引入依赖
- <dependencies>
- <dependency>
- <groupId>com.hivemq</groupId>
- hivemq-mqtt-client-reactor</artifactId>
- <version>1.3.5</version>
- </dependency>
- <dependency>
- <groupId>com.hivemq</groupId>
- hivemq-mqtt-client-epoll</artifactId>
- <version>1.3.5</version>
- <type>pom</type>
- </dependency>
- <dependencies>
复制代码 项目集成
HivemqProperties- /**
- * @author laokou
- */
- @Data
- public class HivemqProperties {
- private boolean auth = true;
- private String username = "emqx";
- private String password = "laokou123";
- private String host = "127.0.0.1";
- private int port = 1883;
- private String clientId;
- private int subscribeQos = 1;
- private int publishQos = 0;
- private int willQos = 1;
- // @formatter:off
- /**
- * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.
- * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.
- * ...
- */
- // @formatter:on
- private boolean clearStart = false;
- private int receiveMaximum = 10000;
- private int sendMaximum = 10000;
- private int maximumPacketSize = 10000;
- private int sendMaximumPacketSize = 10000;
- private int topicAliasMaximum = 1024;
- private int sendTopicAliasMaximum = 2048;
- private long messageExpiryInterval = 86400L;
- private boolean requestProblemInformation = true;
- private boolean requestResponseInformation = true;
- // @formatter:off
- /**
- * 默认会话保留一天.
- * 最大值,4294967295L,会话过期时间【永不过期,单位秒】.
- * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).
- */
- private long sessionExpiryInterval = 86400L;
- // @formatter:on
- /**
- * 心跳包每隔60秒发一次.
- */
- private int keepAliveInterval = 60;
- private boolean automaticReconnect = true;
- private long automaticReconnectMaxDelay = 5;
- private long automaticReconnectInitialDelay = 1;
- private Set<String> topics = new HashSet<>(0);
- private int nettyThreads = 32;
- private boolean retain = false;
- private boolean noLocal = false;
- }
复制代码 HivemqClientV5
[code]/** * @author laokou */@Slf4jpublic class HivemqClientV5 { /** * 响应主题. */ private final String RESPONSE_TOPIC = "response/topic"; /** * 服务下线数据. */ private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8); /** * 相关数据. */ private final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8); private final HivemqProperties hivemqProperties; private final List messageHandlers; private volatile Mqtt5RxClient client; private final Object lock = new Object(); private volatile Disposable connectDisposable; private volatile Disposable subscribeDisposable; private volatile Disposable unSubscribeDisposable; private volatile Disposable publishDisposable; private volatile Disposable disconnectDisposable; private volatile Disposable consumeDisposable; public HivemqClientV5(HivemqProperties hivemqProperties, List messageHandlers) { this.hivemqProperties = hivemqProperties; this.messageHandlers = messageHandlers; } public void open() { if (Objects.isNull(client)) { synchronized (lock) { if (Objects.isNull(client)) { client = getMqtt5ClientBuilder().buildRx(); } } } connect(); consume(); } public void close() { if (!Objects.isNull(client)) { disconnectDisposable = client.disconnectWith() .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()) .applyDisconnect() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(() -> log.info("【Hivemq-V5】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()), e -> log.error("【Hivemq-V5】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e)); } } public void subscribe() { String[] topics = getTopics(); subscribe(topics, getQosArray(topics)); } public String[] getTopics() { return hivemqProperties.getTopics().toArray(String[]::new); } public int[] getQosArray(String[] topics) { return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray(); } public void subscribe(String[] topics, int[] qosArray) { checkTopicAndQos(topics, qosArray); if (!Objects.isNull(client)) { List subscriptions = new ArrayList(topics.length); for (int i = 0; i < topics.length; i++) { subscriptions.add(Mqtt5Subscription.builder() .topicFilter(topics) .qos(getMqttQos(qosArray)) .retainAsPublished(hivemqProperties.isRetain()) .noLocal(hivemqProperties.isNoLocal()) .build()); } subscribeDisposable = client.subscribeWith() .addSubscriptions(subscriptions) .applySubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log .error("【Hivemq-V5】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void unSubscribe() { String[] topics = hivemqProperties.getTopics().toArray(String[]::new); unSubscribe(topics); } public void unSubscribe(String[] topics) { checkTopic(topics); if (!Objects.isNull(client)) { List matchedTopics = new ArrayList(topics.length); for (String topic : topics) { matchedTopics.add(MqttTopicFilter.of(topic)); } unSubscribeDisposable = client.unsubscribeWith() .addTopicFilters(matchedTopics) .applyUnsubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log .error("【Hivemq-V5】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void publish(String topic, byte[] payload, int qos) { if (!Objects.isNull(client)) { publishDisposable = client .publish(Flowable.just(Mqtt5Publish.builder() .topic(topic) .qos(getMqttQos(qos)) .payload(payload) .noMessageExpiry() .retain(hivemqProperties.isRetain()) .messageExpiryInterval(hivemqProperties.getMessageExpiryInterval()) .correlationData(CORRELATION_DATA) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic(RESPONSE_TOPIC) .build())) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT消息发布成功,topic:{}", topic), e -> log.error("【Hivemq-V5】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e)); } } public void publish(String topic, byte[] payload) { publish(topic, payload, hivemqProperties.getPublishQos()); } public void dispose(Disposable disposable) { if (!Objects.isNull(disposable) && !disposable.isDisposed()) { // 显式取消订阅 disposable.dispose(); } } public void dispose() { dispose(connectDisposable); dispose(subscribeDisposable); dispose(unSubscribeDisposable); dispose(publishDisposable); dispose(consumeDisposable); dispose(disconnectDisposable); } public void reSubscribe() { log.info("【Hivemq-V5】 => MQTT重新订阅开始"); dispose(subscribeDisposable); subscribe(); log.info("【Hivemq-V5】 => MQTT重新订阅结束"); } private MqttQos getMqttQos(int qos) { return MqttQos.fromCode(qos); } private void connect() { connectDisposable = client.connectWith() .keepAlive(hivemqProperties.getKeepAliveInterval()) .cleanStart(hivemqProperties.isClearStart()) .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()) .willPublish() .topic("will/topic") .payload(WILL_PAYLOAD) .qos(getMqttQos(hivemqProperties.getWillQos())) .retain(true) .messageExpiryInterval(100) .delayInterval(10) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic(RESPONSE_TOPIC) .correlationData(CORRELATION_DATA) .applyWillPublish() .restrictions() .receiveMaximum(hivemqProperties.getReceiveMaximum()) .sendMaximum(hivemqProperties.getSendMaximum()) .maximumPacketSize(hivemqProperties.getMaximumPacketSize()) .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()) .topicAliasMaximum(hivemqProperties.getTopicAliasMaximum()) .sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum()) .requestProblemInformation(hivemqProperties.isRequestProblemInformation()) .requestResponseInformation(hivemqProperties.isRequestResponseInformation()) .applyRestrictions() .applyConnect() .toFlowable() .firstElement() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe( ack -> log.info("【Hivemq-V5】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(), hivemqProperties.getPort(), hivemqProperties.getClientId()), e -> log.error("【Hivemq-V5】 => MQTT连接失败,错误信息:{}", e.getMessage(), e)); } private void consume() { if (!Objects.isNull(client)) { consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL) .onBackpressureBuffer(8192) .observeOn(Schedulers.computation(), false, 8192) .doOnSubscribe(subscribe -> { log.info("【Hivemq-V5】 => MQTT开始订阅消息,请稍候。。。。。。"); reSubscribe(); }) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(publish -> { for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(publish.getTopic().toString())) { log.info("【Hivemq-V5】 => MQTT接收到消息,Topic:{}", publish.getTopic()); messageHandler .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString())); } } }, e -> log.error("【Hivemq-V5】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e), () -> log.info("【Hivemq-V5】 => MQTT订阅消息结束,请稍候。。。。。。")); } } private Mqtt5ClientBuilder getMqtt5ClientBuilder() { Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> { Optional |