找回密码
 立即注册
首页 业界区 业界 物联网之对接MQTT最佳实践

物联网之对接MQTT最佳实践

琶轮 2025-6-2 23:50:21
小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT
安装EMQX

采用docker-compose一键式启动!!!
还没有安装docker朋友,参考文章下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!

使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!

使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
  1. services:
  2.     emqx:
  3.       image: emqx/emqx:5.4.1
  4.       container_name: emqx
  5.       # 保持容器在没有守护程序的情况下运行
  6.       tty: true
  7.       restart: always
  8.       privileged: true
  9.       ports:
  10.         - "1883:1883"
  11.         - "8083:8083"
  12.         - "8883:8883"
  13.         - "18083:18083"
  14.       environment:
  15.         - TZ=Asia/Shanghai
  16.       volumes:
  17.         # 挂载数据存储
  18.         - ./emqx/data:/opt/emqx/data
  19.         # 挂载日志文件
  20.         - ./emqx/log:/opt/emqx/log
  21.       networks:
  22.         - laokou_network
  23. networks:
  24.   laokou_network:
  25.     driver: bridge
复制代码
访问 http://127.0.0.1:18083 设置密码

EMQX MQTT【摘抄自官方文档】

EMQX官方文档
MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极轻量的发布/订阅消息传输协议,非常适合以较小的代码占用空间和极低的网络带宽连接远程设备。MQTT 目前广泛应用于汽车、制造、电信、石油和天然气等众多行业。
EMQX 完全兼容 MQTT 5.0 和 3.x,本节将介绍 MQTT 相关功能的基本配置项,包括基本 MQTT 设置、订阅设置、会话设置、强制关闭设置和强制垃圾回收设置等
客户端对接

本文章采用三种客户端对接
维度PahoHivemq-MQTT-ClientVert.x MQTT Client协议支持MQTT 3.1.1(5.0 实验性)MQTT 5.0 完整支持MQTT 5.0(较新版本)性能中(同步模式)高(异步非阻塞)极高(响应式架构)依赖复杂度低中(仅 Netty)高(需 Vert.x 生态)社区资源丰富较少中等适用场景传统 IoT、跨语言项目企业级 MQTT 5.0、高吞吐响应式系统、高并发微服务Paho【不推荐,连接不稳定】

Paho代码地址
引入依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.eclipse.paho</groupId>
  4.         org.eclipse.paho.mqttv5.client</artifactId>
  5.         <version>1.2.5</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>org.eclipse.paho</groupId>
  9.         org.eclipse.paho.client.mqttv3</artifactId>
  10.         <version>1.2.5</version>
  11.     </dependency>
  12. </dependencies>
复制代码
项目集成

PahoProperties
  1. /**
  2. * @author laokou
  3. */
  4. @Data
  5. public class PahoProperties {
  6.     private boolean auth = true;
  7.     private String username = "emqx";
  8.     private String password = "laokou123";
  9.     private String host = "127.0.0.1";
  10.     private int port = 1883;
  11.     private String clientId;
  12.     private int subscribeQos = 1;
  13.     private int publishQos = 0;
  14.     private int willQos = 1;
  15.     private int connectionTimeout = 60;
  16.     private boolean manualAcks = false;
  17.     // @formatter:off
  18.     /**
  19.      * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.
  20.      * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.
  21.      * ...
  22.      */
  23.     // @formatter:on
  24.     private boolean clearStart = false;
  25.     private int receiveMaximum = 10000;
  26.     private int maximumPacketSize = 10000;
  27.     // @formatter:off
  28.     /**
  29.      * 默认会话保留一天.
  30.      * 最大值,4294967295L,会话过期时间【永不过期,单位秒】.
  31.      * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).
  32.      */
  33.     private long sessionExpiryInterval = 86400L;
  34.     // @formatter:on
  35.     /**
  36.      * 心跳包每隔60秒发一次.
  37.      */
  38.     private int keepAliveInterval = 60;
  39.     private boolean automaticReconnect = true;
  40.     private Set<String> topics = new HashSet<>(0);
  41. }
复制代码
PahoMqttClientMessageCallbackV5
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. @RequiredArgsConstructor
  6. public class PahoMqttClientMessageCallbackV5 implements MqttCallback {
  7.     private final List<MessageHandler> messageHandlers;
  8.     @Override
  9.     public void disconnected(MqttDisconnectResponse disconnectResponse) {
  10.        log.error("【Paho-V5】 => MQTT关闭连接");
  11.     }
  12.     @Override
  13.     public void mqttErrorOccurred(MqttException ex) {
  14.        log.error("【Paho-V5】 => MQTT报错,错误信息:{}", ex.getMessage());
  15.     }
  16.     @Override
  17.     public void messageArrived(String topic, MqttMessage message) {
  18.        for (MessageHandler messageHandler : messageHandlers) {
  19.           if (messageHandler.isSubscribe(topic)) {
  20.              log.info("【Paho-V5】 => MQTT接收到消息,Topic:{}", topic);
  21.              messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
  22.           }
  23.        }
  24.     }
  25.     @Override
  26.     public void deliveryComplete(IMqttToken token) {
  27.        log.info("【Paho-V5】 => MQTT消息发送成功,消息ID:{}", token.getMessageId());
  28.     }
  29.     @Override
  30.     public void connectComplete(boolean reconnect, String uri) {
  31.        if (reconnect) {
  32.           log.info("【Paho-V5】 => MQTT重连成功,URI:{}", uri);
  33.        }
  34.        else {
  35.           log.info("【Paho-V5】 => MQTT建立连接,URI:{}", uri);
  36.        }
  37.     }
  38.     @Override
  39.     public void authPacketArrived(int reasonCode, MqttProperties properties) {
  40.        log.info("【Paho-V5】 => 接收到身份验证数据包:{}", reasonCode);
  41.     }
  42. }
复制代码
PahoV5MqttClientTest
  1. /**
  2. * @author laokou
  3. */
  4. @SpringBootTest
  5. @RequiredArgsConstructor
  6. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
  7. class PahoV5MqttClientTest {
  8.     private final List<MessageHandler> messageHandlers;
  9.     @Test
  10.     void testMqttClient() throws InterruptedException {
  11.         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
  12.         PahoProperties pahoProperties = new PahoProperties();
  13.         pahoProperties.setClientId("test-client-3");
  14.         pahoProperties.setTopics(Set.of("/test-topic-3/#"));
  15.         PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService);
  16.         pahoMqttClientV5.open();
  17.         Thread.sleep(1000);
  18.         pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes());
  19.     }
  20. }
复制代码
PahoMqttClientMessageCallbackV3
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. @RequiredArgsConstructor
  6. public class PahoMqttClientMessageCallbackV3 implements MqttCallback {
  7.     private final List<MessageHandler> messageHandlers;
  8.     @Override
  9.     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  10.        log.info("【Paho-V3】 => MQTT消息发送成功,消息ID:{}", iMqttDeliveryToken.getMessageId());
  11.     }
  12.     @Override
  13.     public void connectionLost(Throwable throwable) {
  14.        log.error("【Paho-V3】 => MQTT关闭连接");
  15.     }
  16.     @Override
  17.     public void messageArrived(String topic, MqttMessage message) throws Exception {
  18.        for (MessageHandler messageHandler : messageHandlers) {
  19.           if (messageHandler.isSubscribe(topic)) {
  20.              log.info("【Paho-V3】 => MQTT接收到消息,Topic:{}", topic);
  21.              messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
  22.           }
  23.        }
  24.     }
  25. }
复制代码
PahoV3MqttClientTest
  1. /**
  2. * @author laokou
  3. */
  4. @SpringBootTest
  5. @RequiredArgsConstructor
  6. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
  7. class PahoV3MqttClientTest {
  8.     private final List<MessageHandler> messageHandlers;
  9.     @Test
  10.     void testMqttClient() throws InterruptedException {
  11.         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
  12.         PahoProperties pahoProperties2 = new PahoProperties();
  13.         pahoProperties2.setClientId("test-client-4");
  14.         pahoProperties2.setTopics(Set.of("/test-topic-4/#"));
  15.         PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService);
  16.         pahoMqttClientV3.open();
  17.         Thread.sleep(1000);
  18.         pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes());
  19.     }
  20. }
复制代码
Hivemq-MQTT-Client【不推荐】

注意:订阅一段时间收不到数据,标准mqtt5.0协议,不兼容emqx broker mqtt5.0
Hivemq代码地址
引入依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>com.hivemq</groupId>
  4.         hivemq-mqtt-client-reactor</artifactId>
  5.         <version>1.3.5</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>com.hivemq</groupId>
  9.         hivemq-mqtt-client-epoll</artifactId>
  10.         <version>1.3.5</version>
  11.         <type>pom</type>
  12.     </dependency>
  13. <dependencies>
复制代码
项目集成

HivemqProperties
  1. /**
  2. * @author laokou
  3. */
  4. @Data
  5. public class HivemqProperties {
  6.     private boolean auth = true;
  7.     private String username = "emqx";
  8.     private String password = "laokou123";
  9.     private String host = "127.0.0.1";
  10.     private int port = 1883;
  11.     private String clientId;
  12.     private int subscribeQos = 1;
  13.     private int publishQos = 0;
  14.     private int willQos = 1;
  15.     // @formatter:off
  16.     /**
  17.      * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.
  18.      * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.
  19.      * ...
  20.      */
  21.     // @formatter:on
  22.     private boolean clearStart = false;
  23.     private int receiveMaximum = 10000;
  24.     private int sendMaximum = 10000;
  25.     private int maximumPacketSize = 10000;
  26.     private int sendMaximumPacketSize = 10000;
  27.     private int topicAliasMaximum = 1024;
  28.     private int sendTopicAliasMaximum = 2048;
  29.     private long messageExpiryInterval = 86400L;
  30.     private boolean requestProblemInformation = true;
  31.     private boolean requestResponseInformation = true;
  32.     // @formatter:off
  33.     /**
  34.      * 默认会话保留一天.
  35.      * 最大值,4294967295L,会话过期时间【永不过期,单位秒】.
  36.      * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).
  37.      */
  38.     private long sessionExpiryInterval = 86400L;
  39.     // @formatter:on
  40.     /**
  41.      * 心跳包每隔60秒发一次.
  42.      */
  43.     private int keepAliveInterval = 60;
  44.     private boolean automaticReconnect = true;
  45.     private long automaticReconnectMaxDelay = 5;
  46.     private long automaticReconnectInitialDelay = 1;
  47.     private Set<String> topics = new HashSet<>(0);
  48.     private int nettyThreads = 32;
  49.     private boolean retain = false;
  50.     private boolean noLocal = false;
  51. }
复制代码
HivemqClientV5
[code]/** * @author laokou */@Slf4jpublic class HivemqClientV5 {    /**     * 响应主题.     */    private final String RESPONSE_TOPIC = "response/topic";    /**     * 服务下线数据.     */    private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);    /**     * 相关数据.     */    private final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8);    private final HivemqProperties hivemqProperties;    private final List messageHandlers;    private volatile Mqtt5RxClient client;    private final Object lock = new Object();    private volatile Disposable connectDisposable;    private volatile Disposable subscribeDisposable;    private volatile Disposable unSubscribeDisposable;    private volatile Disposable publishDisposable;    private volatile Disposable disconnectDisposable;    private volatile Disposable consumeDisposable;    public HivemqClientV5(HivemqProperties hivemqProperties, List messageHandlers) {        this.hivemqProperties = hivemqProperties;        this.messageHandlers = messageHandlers;    }    public void open() {        if (Objects.isNull(client)) {            synchronized (lock) {                if (Objects.isNull(client)) {                    client = getMqtt5ClientBuilder().buildRx();                }            }        }        connect();        consume();    }    public void close() {        if (!Objects.isNull(client)) {            disconnectDisposable = client.disconnectWith()                    .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval())                    .applyDisconnect()                    .subscribeOn(Schedulers.io())                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                            .takeWhile(retryCount -> retryCount != -1)                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                    .subscribe(() -> log.info("【Hivemq-V5】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()),                            e -> log.error("【Hivemq-V5】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e));        }    }    public void subscribe() {        String[] topics = getTopics();        subscribe(topics, getQosArray(topics));    }    public String[] getTopics() {        return hivemqProperties.getTopics().toArray(String[]::new);    }    public int[] getQosArray(String[] topics) {        return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();    }    public void subscribe(String[] topics, int[] qosArray) {        checkTopicAndQos(topics, qosArray);        if (!Objects.isNull(client)) {            List subscriptions = new ArrayList(topics.length);            for (int i = 0; i < topics.length; i++) {                subscriptions.add(Mqtt5Subscription.builder()                        .topicFilter(topics)                        .qos(getMqttQos(qosArray))                        .retainAsPublished(hivemqProperties.isRetain())                        .noLocal(hivemqProperties.isNoLocal())                        .build());            }            subscribeDisposable = client.subscribeWith()                    .addSubscriptions(subscriptions)                    .applySubscribe()                    .subscribeOn(Schedulers.io())                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                            .takeWhile(retryCount -> retryCount != -1)                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                    .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log                            .error("【Hivemq-V5】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));        }    }    public void unSubscribe() {        String[] topics = hivemqProperties.getTopics().toArray(String[]::new);        unSubscribe(topics);    }    public void unSubscribe(String[] topics) {        checkTopic(topics);        if (!Objects.isNull(client)) {            List matchedTopics = new ArrayList(topics.length);            for (String topic : topics) {                matchedTopics.add(MqttTopicFilter.of(topic));            }            unSubscribeDisposable = client.unsubscribeWith()                    .addTopicFilters(matchedTopics)                    .applyUnsubscribe()                    .subscribeOn(Schedulers.io())                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                            .takeWhile(retryCount -> retryCount != -1)                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                    .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log                            .error("【Hivemq-V5】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));        }    }    public void publish(String topic, byte[] payload, int qos) {        if (!Objects.isNull(client)) {            publishDisposable = client                    .publish(Flowable.just(Mqtt5Publish.builder()                            .topic(topic)                            .qos(getMqttQos(qos))                            .payload(payload)                            .noMessageExpiry()                            .retain(hivemqProperties.isRetain())                            .messageExpiryInterval(hivemqProperties.getMessageExpiryInterval())                            .correlationData(CORRELATION_DATA)                            .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)                            .contentType("text/plain")                            .responseTopic(RESPONSE_TOPIC)                            .build()))                    .subscribeOn(Schedulers.io())                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                            .takeWhile(retryCount -> retryCount != -1)                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                    .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT消息发布成功,topic:{}", topic),                            e -> log.error("【Hivemq-V5】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e));        }    }    public void publish(String topic, byte[] payload) {        publish(topic, payload, hivemqProperties.getPublishQos());    }    public void dispose(Disposable disposable) {        if (!Objects.isNull(disposable) && !disposable.isDisposed()) {            // 显式取消订阅            disposable.dispose();        }    }    public void dispose() {        dispose(connectDisposable);        dispose(subscribeDisposable);        dispose(unSubscribeDisposable);        dispose(publishDisposable);        dispose(consumeDisposable);        dispose(disconnectDisposable);    }    public void reSubscribe() {        log.info("【Hivemq-V5】 => MQTT重新订阅开始");        dispose(subscribeDisposable);        subscribe();        log.info("【Hivemq-V5】 => MQTT重新订阅结束");    }    private MqttQos getMqttQos(int qos) {        return MqttQos.fromCode(qos);    }    private void connect() {        connectDisposable = client.connectWith()                .keepAlive(hivemqProperties.getKeepAliveInterval())                .cleanStart(hivemqProperties.isClearStart())                .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval())                .willPublish()                .topic("will/topic")                .payload(WILL_PAYLOAD)                .qos(getMqttQos(hivemqProperties.getWillQos()))                .retain(true)                .messageExpiryInterval(100)                .delayInterval(10)                .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)                .contentType("text/plain")                .responseTopic(RESPONSE_TOPIC)                .correlationData(CORRELATION_DATA)                .applyWillPublish()                .restrictions()                .receiveMaximum(hivemqProperties.getReceiveMaximum())                .sendMaximum(hivemqProperties.getSendMaximum())                .maximumPacketSize(hivemqProperties.getMaximumPacketSize())                .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize())                .topicAliasMaximum(hivemqProperties.getTopicAliasMaximum())                .sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum())                .requestProblemInformation(hivemqProperties.isRequestProblemInformation())                .requestResponseInformation(hivemqProperties.isRequestResponseInformation())                .applyRestrictions()                .applyConnect()                .toFlowable()                .firstElement()                .subscribeOn(Schedulers.io())                .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                        .takeWhile(retryCount -> retryCount != -1)                        .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                .subscribe(                        ack -> log.info("【Hivemq-V5】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(),                                hivemqProperties.getPort(), hivemqProperties.getClientId()),                        e -> log.error("【Hivemq-V5】 => MQTT连接失败,错误信息:{}", e.getMessage(), e));    }    private void consume() {        if (!Objects.isNull(client)) {            consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL)                    .onBackpressureBuffer(8192)                    .observeOn(Schedulers.computation(), false, 8192)                    .doOnSubscribe(subscribe -> {                        log.info("【Hivemq-V5】 => MQTT开始订阅消息,请稍候。。。。。。");                        reSubscribe();                    })                    .subscribeOn(Schedulers.io())                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)                            .takeWhile(retryCount -> retryCount != -1)                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))                    .subscribe(publish -> {                                for (MessageHandler messageHandler : messageHandlers) {                                    if (messageHandler.isSubscribe(publish.getTopic().toString())) {                                        log.info("【Hivemq-V5】 => MQTT接收到消息,Topic:{}", publish.getTopic());                                        messageHandler                                                .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));                                    }                                }                            }, e -> log.error("【Hivemq-V5】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e),                            () -> log.info("【Hivemq-V5】 => MQTT订阅消息结束,请稍候。。。。。。"));        }    }    private Mqtt5ClientBuilder getMqtt5ClientBuilder() {        Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> {                    Optional

相关推荐

您需要登录后才可以回帖 登录 | 立即注册