扎先 发表于 2025-5-29 10:46:21

物联网之使用Vertx实现MQTT-Server最佳实践【响应式】

小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现MQTT-Server
实现MQTT-Server【响应式】

vertx-mqtt地址
实现思路

1.启动多个MQTT Server实例,每个实例绑定一个端口并把端口记录到缓存;服务注册到Nacos,客户端通过接口获取IP和端口【负载均衡】
2.MQTT Client连接MQTT Server并上报数据
3.MQTT Server接收到数据并通过MQ转发出去
代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

实现过程

查看源码
kafka安装

采用docker-compose一键式启动!!!
还没有安装docker的朋友,请参考下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
services:
    kafka:
      image: bitnami/kafka:4.0.0
      container_name: kafka
      tty: true
      ports:
      - '9092:9092'
      - '9093:9093'
      environment:
      # 节点ID
      - KAFKA_BROKER_ID=1
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 定义kafka服务端socket监听端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 外网访问地址
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      # 节点ID
      - KAFKA_CFG_NODE_ID=1
      restart: always
      privileged: true
      networks:
      - laokou_network
networks:
    laokou_network:
      driver: bridge

# 创建topic【进入bin目录执行】 => 每个topic 3个分区和一个副本
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_report --partitions 3 --replication-factor 1

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_reply --partitions 3 --replication-factor 1

kafka【响应式】

1.依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.23</version>
</dependency>

2.代码
KafkaAutoConfig
/**
* @author laokou
*/
@Configuration
public class KafkaAutoConfig {

    @Bean("defaultKafkaTemplate")
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
    public DefaultKafkaTemplate defaultKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
       return new DefaultKafkaTemplate(kafkaTemplate);
    }

    @Bean(value = "reactiveKafkaSender")
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public KafkaSender reactiveKafkaSender(SenderOptions<String, String> senderOptions) {
       return new ReactiveKafkaSender(
             new reactor.kafka.sender.internals.DefaultKafkaSender<>(ProducerFactory.INSTANCE, senderOptions));
    }

    @Bean
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public SenderOptions<String, String> senderOptions(KafkaProperties kafkaProperties) {
       Map<String, Object> props = new HashMap<>();
       KafkaProperties.Producer producer = kafkaProperties.getProducer();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
       props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
       props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
       props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) producer.getBatchSize().toBytes());
       props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (int) producer.getBufferMemory().toBytes());
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return SenderOptions.create(props);
    }

    @Bean
    @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
    public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
       Map<String, Object> props = new HashMap<>();
       KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
       props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
       props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return ReceiverOptions.create(props);
    }

}

KafkaSender
/**
* @author laokou
*/
public interface KafkaSender {

    /**
     * Sends one message to a Kafka topic.
     *
     * @param topic the Kafka topic to publish to
     * @param payload the message body
     * @return a {@code Flux} of send outcomes; in {@code ReactiveKafkaSender} each element
     * is {@code true} when the send result carries no exception and {@code false} otherwise
     */
    Flux<Boolean> send(String topic, String payload);

}

ReactiveKafkaSender
/**
* @author laokou
*/
@Slf4j
@RequiredArgsConstructor
public class ReactiveKafkaSender implements KafkaSender {

    private final DefaultKafkaSender<String, String> defaultKafkaSender;

    @Override
    public Flux<Boolean> send(String topic, String payload) {
       return defaultKafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, payload, null)))
          .map(result -> {
             Exception exception = result.exception();
             if (ObjectUtils.isNotNull(exception)) {
                log.error("【Kafka】 => 发送消息失败,错误信息:{}", exception.getMessage(), exception);
                return false;
             }
             else {
                return true;
             }
          });
    }

}

3.yaml配置【自动批量提交】
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: laokou-mqtt
      # 启用自动提交(按周期)已消费offset
      enable-auto-commit: true
      # 单次poll()调用返回的记录数
      max-poll-records: 50
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 发生错误后,消息重发的次数。
      retries: 5
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 0
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      # 批量提交模式
      ack-mode: batch
      # 批量batch类型
      type: batch
      # topic不存在报错
      missing-topics-fatal: false
    admin:
      auto-create: false

mqtt-server【响应式】

依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>4.5.14</version>
</dependency>

VertxConfig
/**
* @author laokou
*/
@Configuration
public class VertxConfig {

    @Bean(destroyMethod = "close")
    public Vertx vertx() {
       VertxOptions vertxOptions = new VertxOptions();
       vertxOptions.setMaxEventLoopExecuteTime(30);
       vertxOptions.setWorkerPoolSize(40);
       vertxOptions.setMaxWorkerExecuteTime(30);
       vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
       vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
       vertxOptions.setPreferNativeTransport(true);
       vertxOptions.setInternalBlockingPoolSize(40);
       vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors()));
       return Vertx.vertx(vertxOptions);
    }

}

MqttServerProperties【配置了账号和密码】
/**
* @author laokou
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt-server")
public class MqttServerProperties {

    // When true, VertxMqttServer.authHandler rejects connections whose credentials
    // are missing or do not match username/password below.
    private boolean auth = true;

    // Expected MQTT user name (compared in VertxMqttServer.authHandler).
    private String username = "vertx";

    // Expected MQTT password.
    // NOTE(review): this default is hard-coded in source; override it via configuration.
    private String password = "laokou123";

    // Bind host, used both for the port probe and for the server options.
    private String host = "0.0.0.0";

    // Port the probe socket binds to in VertxMqttServer.detectAvailablePort;
    // 0 asks the OS for any free port.
    private int port = 0;

    // Number of MQTT server instances to start; the effective count is the larger
    // of this value and the CPU core count (see VertxMqttServer.getMqttServerOptions).
    private int threadSize = 32;

    // The remaining fields are copied one-to-one onto the same-named
    // MqttServerOptions setters in VertxMqttServer.getMqttServerOption.

    // NOTE(review): 8196 is not a power of two; 8192 may have been intended — confirm.
    private int maxMessageSize = 8196;

    private boolean isAutoClientId = true;

    private int maxClientIdLength = 30;

    private int timeoutOnConnect = 90;

    private boolean useWebSocket = false;

    private int webSocketMaxFrameSize = 65536;

    private boolean perFrameWebSocketCompressionSupported = true;

    private boolean perMessageWebSocketCompressionSupported = true;

    private int webSocketCompressionLevel = 6;

    private boolean webSocketAllowServerNoContext = false;

    private boolean webSocketPreferredClientNoContext = false;

    private boolean tcpNoDelay = true;

    private boolean tcpKeepAlive = false;

    private int tcpKeepAliveIdleSeconds = -1;

    private int tcpKeepAliveCount = -1;

    private int tcpKeepAliveIntervalSeconds = -1;

    private int soLinger = -1;

    private int idleTimeout = 0;

    private int readIdleTimeout = 0;

    private int writeIdleTimeout = 0;

    // Unit applied to the three idle timeouts above.
    private TimeUnit idleTimeoutUnit = TimeUnit.SECONDS;

    private boolean ssl = false;

    private boolean tcpFastOpen = false;

    private boolean tcpCork = false;

    private boolean tcpQuickAck = false;

    private int tcpUserTimeout = 0;

}

VertxMqttServer
/**
* @author laokou
*/
@Slf4j
public final class VertxMqttServer {

    // Hand-off point between the MQTT publish handlers (producers) and publish() (consumer).
    // Multicast sink with a buffer size of Integer.MAX_VALUE and autoCancel disabled.
    private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
       .multicast()
       .onBackpressureBuffer(Integer.MAX_VALUE, false);

    // Assigned by start(); stop() subscribes to it to close the servers.
    private volatile Flux<MqttServer> mqttServer;

    private final Vertx vertx;

    private final MqttServerProperties properties;

    // Handlers consulted by publish() for every incoming MQTT message.
    private final List<ReactiveMessageHandler> reactiveMessageHandlers;

    // Set by stop(); the listen callback in start() returns early once it is true.
    private volatile boolean isClosed = false;

    public VertxMqttServer(final Vertx vertx, final MqttServerProperties properties,
          List<ReactiveMessageHandler> reactiveMessageHandlers) {
       this.properties = properties;
       this.vertx = vertx;
       this.reactiveMessageHandlers = reactiveMessageHandlers;
    }

    public Flux<MqttServer> start() {
       return mqttServer = getMqttServerOptions().map(mqttServerOption -> MqttServer.create(vertx, mqttServerOption)
          .exceptionHandler(
                error -> log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,错误信息:{}", error.getMessage(), error))
          .endpointHandler(endpoint -> Optional.ofNullable(authHandler(endpoint))
             .ifPresent(e -> e.closeHandler(close -> log.info("【Vertx-MQTT-Server】 => MQTT客户端断开连接"))
                .subscribeHandler(subscribe -> {
                   for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
                      log.info("【Vertx-MQTT-Server】 => MQTT客户端订阅主题:{}", topicSubscription.topicName());
                   }
                })
                .disconnectHandler(disconnect -> log.info("【Vertx-MQTT-Server】 => MQTT客户端主动断开连接"))
                .pingHandler(ping -> log.info("【Vertx-MQTT-Server】 => MQTT客户端发送心跳"))
                .publishHandler(messageSink::tryEmitNext)
                // 不保留会话
                .accept(false)))
          .listen(mqttServerOption.getPort(), mqttServerOption.getHost(), asyncResult -> {
             if (isClosed) {
                return;
             }
             if (asyncResult.succeeded()) {
                log.info("【Vertx-MQTT-Server】 => MQTT服务启动成功,主机:{},端口:{}", mqttServerOption.getHost(),
                      mqttServerOption.getPort());
                // 写入缓存
                PortCache.add(mqttServerOption.getPort());
             }
             else {
                log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,主机:{},端口:{},错误信息:{}", mqttServerOption.getHost(),
                      mqttServerOption.getPort(), asyncResult.cause().getMessage(), asyncResult.cause());
             }
          }));
    }

    public Flux<MqttServer> stop() {
       isClosed = true;
       return mqttServer.doOnNext(server -> server.close(completionHandler -> {
          if (completionHandler.succeeded()) {
             log.info("【Vertx-MQTT-Server】 => MQTT服务停止成功");
          }
          else {
             log.error("【Vertx-MQTT-Server】 => MQTT服务停止失败,错误信息:{}", completionHandler.cause().getMessage(),
                   completionHandler.cause());
          }
       }));
    }

    public Flux<Boolean> publish() {
       return messageSink.asFlux().flatMap(message -> {
          // @formatter:off
             // log.info("【Vertx-MQTT-Server】 => MQTT服务接收到消息,主题:{},内容:{}", message.topicName(), message.payload().toString());
             // @formatter:on
          return Flux
             .fromStream(reactiveMessageHandlers.stream()
                .filter(reactiveMessageHandler -> reactiveMessageHandler.isSubscribe(message.topicName())))
             .flatMap(reactiveMessageHandler -> reactiveMessageHandler
                .handle(new MqttMessage(message.payload(), message.topicName())));
       });
    }

    private int detectAvailablePort(String host) {
       try (ServerSocket socket = SSLServerSocketFactory.getDefault().createServerSocket()) {
          socket.bind(new InetSocketAddress(host, properties.getPort()));
          return socket.getLocalPort();
       }
       catch (IOException e) {
          throw new RuntimeException("Port auto-detection failed", e);
       }
    }

    private Flux<MqttServerOptions> getMqttServerOptions() {
       return Flux.range(1, Math.max(properties.getThreadSize(), CpuCoreSensor.availableProcessors()))
          .map(item -> getMqttServerOption());
    }

    /**
   * 认证.
   */
    private MqttEndpoint authHandler(MqttEndpoint endpoint) {
       MqttAuth mqttAuth = endpoint.auth();
       if (properties.isAuth()) {
          if (ObjectUtils.isNull(mqttAuth)) {
             endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
             return null;
          }
          if (!ObjectUtils.equals(mqttAuth.getUsername(), properties.getUsername())
                || !ObjectUtils.equals(mqttAuth.getPassword(), properties.getPassword())) {
             endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
             return null;
          }
       }
       return endpoint;
    }

    // @formatter:off
    private MqttServerOptions getMqttServerOption() {
       MqttServerOptions mqttServerOptions = new MqttServerOptions();
       mqttServerOptions.setHost(properties.getHost());
       mqttServerOptions.setPort(detectAvailablePort(properties.getHost()));
       mqttServerOptions.setMaxMessageSize(properties.getMaxMessageSize());
       mqttServerOptions.setAutoClientId(properties.isAutoClientId());
       mqttServerOptions.setMaxClientIdLength(properties.getMaxClientIdLength());
       mqttServerOptions.setTimeoutOnConnect(properties.getTimeoutOnConnect());
       mqttServerOptions.setUseWebSocket(properties.isUseWebSocket());
       mqttServerOptions.setWebSocketMaxFrameSize(properties.getWebSocketMaxFrameSize());
       mqttServerOptions.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
       mqttServerOptions.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
       mqttServerOptions.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
       mqttServerOptions.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
       mqttServerOptions.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
       mqttServerOptions.setTcpNoDelay(properties.isTcpNoDelay());
       mqttServerOptions.setTcpKeepAlive(properties.isTcpKeepAlive());
       mqttServerOptions.setTcpKeepAliveIdleSeconds(properties.getTcpKeepAliveIdleSeconds());
       mqttServerOptions.setTcpKeepAliveIntervalSeconds(properties.getTcpKeepAliveIntervalSeconds());
       mqttServerOptions.setTcpKeepAliveCount(properties.getTcpKeepAliveCount());
       mqttServerOptions.setSoLinger(properties.getSoLinger());
       mqttServerOptions.setIdleTimeout(properties.getIdleTimeout());
       mqttServerOptions.setReadIdleTimeout(properties.getReadIdleTimeout());
       mqttServerOptions.setWriteIdleTimeout(properties.getWriteIdleTimeout());
       mqttServerOptions.setIdleTimeoutUnit(properties.getIdleTimeoutUnit());
       mqttServerOptions.setSsl(properties.isSsl());
       mqttServerOptions.setTcpFastOpen(properties.isTcpFastOpen());
       mqttServerOptions.setTcpCork(properties.isTcpCork());
       mqttServerOptions.setTcpQuickAck(properties.isTcpQuickAck());
       mqttServerOptions.setTcpUserTimeout(properties.getTcpUserTimeout());
       return mqttServerOptions;
    }
    // @formatter:on

}

PortCache【缓存端口】
/**
* @author laokou
*/
public final class PortCache {

    // Static holder only; the private constructor prevents instantiation.
    private PortCache() {
    }

    // Ports of the MQTT servers that started listening successfully
    // (added in VertxMqttServer.start, cleared in the MqttServerApp shutdown hook).
    public static final List<Integer> PORTS = new CopyOnWriteArrayList<>();

    /**
     * Records a port.
     *
     * @param port the port to remember
     */
    public static void add(int port) {
       PORTS.add(port);
    }

    /**
     * Returns the recorded ports.
     *
     * @return the backing list itself (not a copy), so later changes are visible through it
     */
    public static List<Integer> get() {
       return PORTS;
    }

    /**
     * Removes all recorded ports.
     */
    public static void clear() {
       PORTS.clear();
    }

}

ReactiveMessageHandler【消息处理,没啥好说的,就是用来转发消息到MQ】
/**
* @author laokou
*/
public interface ReactiveMessageHandler {

    /**
     * Tells whether this handler wants messages published on the given MQTT topic.
     *
     * @param topic the topic name of an incoming MQTT message
     * @return {@code true} if {@link #handle} should be called for that message
     */
    boolean isSubscribe(String topic);

    /**
     * Processes one MQTT message. VertxMqttServer.publish only calls this for messages
     * whose topic was accepted by {@link #isSubscribe}.
     *
     * @param mqttMessage the message payload together with its topic
     * @return a Flux of processing outcomes
     */
    Flux<Boolean> handle(MqttMessage mqttMessage);

}

/**
* 属性回复消息处理器.
*
* @author laokou
*/
@Component
@RequiredArgsConstructor
public class ReactivePropertyReplyMessageHandler implements ReactiveMessageHandler {

    private final KafkaSender kafkaSender;

    @Override
    public boolean isSubscribe(String topic) {
       return TopicUtils.match("/+/+/property/reply", topic);
    }

    @Override
    public Flux<Boolean> handle(MqttMessage mqttMessage) {
       return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPLY, mqttMessage.getPayload().toString());
    }

}

/**
* 属性上报消息处理.
*
* @author laokou
*/
@Component
@RequiredArgsConstructor
public class ReactivePropertyReportMessageHandler implements ReactiveMessageHandler {

    private final KafkaSender kafkaSender;

    @Override
    public boolean isSubscribe(String topic) {
       return TopicUtils.match("/+/+/property/report", topic);
    }

    @Override
    public Flux<Boolean> handle(MqttMessage mqttMessage) {
       return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPORT, mqttMessage.getPayload().toString());
    }

}

配置yaml
spring:
  application:
    name: ${SERVICE_ID:laokou-mqtt}
  threads:
    virtual:
      enabled: true
  mqtt-server:
    auth: true
    username: vertx
    password: laokou123
    # 开启8196个端口
    thread-size: 8196

启动MQTT-Server
/**
* @author laokou
*/
@Slf4j
@EnableDiscoveryClient
@RequiredArgsConstructor
@EnableConfigurationProperties
@SpringBootApplication(scanBasePackages = "org.laokou")
public class MqttServerApp implements CommandLineRunner {

    private final Vertx vertx;

    private final MqttServerProperties properties;

    private final List<ReactiveMessageHandler> reactiveMessageHandlers;

    private final ExecutorService virtualThreadExecutor;

    @Override
    public void run(String... args) {
       virtualThreadExecutor.execute(this::listenMessage);
    }

    private void listenMessage() {
       VertxMqttServer vertxMqttServer = new VertxMqttServer(vertx, properties, reactiveMessageHandlers);
       // 启动服务
       vertxMqttServer.start().subscribeOn(Schedulers.boundedElastic()).subscribe();
       // 推送数据
       vertxMqttServer.publish().subscribeOn(Schedulers.boundedElastic()).subscribe();
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          // 清除缓存
          PortCache.clear();
          // 停止服务
          vertxMqttServer.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
       }));
    }

}

启动好之后,请自行测试,这个东西没啥好说,vertx帮我们都实现了,就是简单调用API,自己玩吧~
我是老寇,我们下次再见啦~

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】