找回密码
 立即注册
首页 业界区 业界 Jetlinks 物联网平台 开源版学习源码分析

Jetlinks 物联网平台 开源版学习源码分析

吕梓美 2025-11-29 01:40:13
2022-06-25
Jetlinks

Jetlinks 是一个非常优秀物联网基础平台, 还支持开源二次开发, 且他们的开发团队还非常友好的, 即使你使用的是开源的版本还挺愿意帮你解决问题 (当然我司也购买了企业版, 但不能分享学习笔记)
文档&地址

社区版源码仓库
前端源码 gitee
前端源码 github
文档-源码启动
文档2-物模型
新文档-协议开发
[[N_JetlinksPro2.0 源码分析]]
[[N_Reactor 响应式框架编程]]
设备接入

设备接入流程图
1.png

网络 > 协议 > 网关

网络组件 (org.jetlinks.community.network.Network)

真正与设备连接交互的网络层, 用于管理各种网络服务(MQTT,TCP等),动态配置, 启停. 只负责接收/发送报文,不负责任何处理逻辑。
社区版, 网络组件的实现有四类:

  • org.jetlinks.community.network.tcp.server.TcpServer // 作为服务器接受设备端连接
  • org.jetlinks.community.network.tcp.client.TcpClient // 主动tcp连接设备端
  • org.jetlinks.community.network.mqtt.client.MqttClient //使用客户端连接第三方的MQTT服务器
  • org.jetlinks.community.network.mqtt.server.MqttServer //使用的本机MQTT服务, 接受设备端连接
网络组件, 支持提供关键的两个接口

  • org.jetlinks.community.network.Network
  1. public interface Network {
  2.     /**
  3.      * ID唯一标识
  4.      *
  5.      * @return ID
  6.      */
  7.     String getId();
  8.     /**
  9.      * @return 网络类型
  10.      * @see DefaultNetworkType
  11.      */
  12.     NetworkType getType();
  13.     /**
  14.      * 关闭网络组件
  15.      */
  16.     void shutdown();
  17.     /**
  18.      * @return 是否存活
  19.      */
  20.     boolean isAlive();
  21.     /**
  22.      * 当{@link Network#isAlive()}为false是,是否自动重新加载.
  23.      * @return 是否重新加载
  24.      * @see NetworkProvider#reload(Network, Object)
  25.      */
  26.     boolean isAutoReload();
  27. }
复制代码

  • org.jetlinks.community.network.NetworkProvider
  1. public interface NetworkProvider<P> {
  2.     /**
  3.      * @return 类型
  4.      * @see DefaultNetworkType
  5.      */
  6.     @Nonnull
  7.     NetworkType getType();
  8.     /**
  9.      * 使用配置创建一个网络组件
  10.      * @param properties 配置信息
  11.      * @return 网络组件
  12.      */
  13.     @Nonnull
  14.     Network createNetwork(@Nonnull P properties);
  15.     /**
  16.      * 重新加载网络组件
  17.      * @param network    网络组件
  18.      * @param properties 配置信息
  19.      */
  20.     void reload(@Nonnull Network network, @Nonnull P properties);
  21.     /**
  22.      * @return 配置定义元数据
  23.      */
  24.     @Nullable
  25.     ConfigMetadata getConfigMetadata();
  26.     /**
  27.      * 根据可序列化的配置信息创建网络组件配置
  28.      * @param properties 原始配置信息
  29.      * @return 网络配置信息
  30.      */
  31.     @Nonnull
  32.     Mono<P> createConfig(@Nonnull NetworkProperties properties);
  33.     ...
复制代码

  • 每一个网络组件(org.jetlinks.community.network.Network) 对应有一个组件提供器对应 (org.jetlinks.community.network.NetworkProvider)
  • 最终网络组件统一由 org.jetlinks.community.network.NetworkManager 管理;
  • 默认实现是org.jetlinks.community.network.DefaultNetworkManager(用Spring BeanPostProcessor hook 加载的)
  • 调用其org.jetlinks.community.network.DefaultNetworkManager#register方法, 传递 NetworkProvider 可以注册一个网络组件
  • 实例组件数据是存在数据库的 network_config 表
协议相关 (org.jetlinks.core.ProtocolSupport)

用于自定义消息解析规则,用于认证、将设备发送给平台报文解析为平台统一的报文,以及处理平台下发给设备的指令。
协议(org.jetlinks.core.ProtocolSupport)主要由: 认证器(Authenticator), 消息编解码器(DeviceMessageCodec),消息发送拦截器(DeviceMessageSenderInterceptor) 以及配置元数据(ConfigMetadata)组成.
org.jetlinks.core.defaults.Authenticator // Authenticator
org.jetlinks.core.codec.defaults.DeviceMessageCodec //DeviceMessageCodec
org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor //DeviceMessageSenderInterceptor
org.jetlinks.core.ProtocolSupport + org.jetlinks.core.metadata.DeviceMetadataCodec //ConfigMetadata
其默认自带的JetLinks V1.0 协议,在org.jetlinks.supports.official.JetLinksProtocolSupportProvider 提供

  • 每一个协议(org.jetlinks.core.ProtocolSupport) 对应有一个组件提供器对应 (org.jetlinks.core.spi.ProtocolSupportProvider)
  • 自定义协议, 即实现 org.jetlinks.core.spi.ProtocolSupportProvider 这个接口;
  • 统一由org.jetlinks.supports.protocol.management.ProtocolSupportManager 管理;
  • 默认实现是org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager(用Spring BeanPostProcessor hook自动加载 2.0是org.jetlinks.community.protocol.configuration.ProtocolAutoConfiguration配置类加载的)
  • 调用其org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager#store方法传递 ProtocolSupportDefinition 进行注册;
注意: 注册后只存起来, 顾名思义, 还跟集群的其他节点有关, 而且跟协议的实现类型(jar,js)有关, 加载也不一样; 参考 org.jetlinks.supports.protocol.management.ProtocolSupportDefinition的属性
  1. public class ProtocolSupportDefinition implements Serializable {
  2.     private static final long serialVersionUID = -1;
  3.     private String id;//ID ; 数据库存的就是本地对象的数据
  4.     private String name;
  5.     private String description;
  6.     private String provider;//jar script 协议实现类型, 目前只有 jar, js 脚本
  7.     private byte state;//协议状态
  8.     private Map<String,Object> configuration;//配置元数据, jar 的话会存ProtocolSupportProvider类全限定名, jar包路径等
  9. }
复制代码

  • 实例组件数据是存在数据库 dev_protocol 表
参考下: [自定义协议开发]
网关组件 (org.jetlinks.community.gateway.DeviceGateway)

设备上报数据的处理逻辑入口,  网关代表接入方式需要选择网络组件,  它关联协议, 配置协议
负责平台侧统一的设备接入, 使用网络组件处理对应的请求以及报文, 使用配置的协议解析为平台统一的设备消息(DeviceMessage),然后推送到事件总线。
org.jetlinks.community.gateway.supports.DeviceGateway 网关接口抽象;
设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关. 实现统一管理网关配置,动态创建设备网关.
社区版, 网关的实现有三个

  • 在 mqtt-component 项目有两个
org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway
使用MQTT客户端, 连接第三方MQTT服务器; 例如 emqx
org.jetlinks.community.network.mqtt.gateway.device.MqttServerDeviceGateway
本机作为MQTT服务端, 内置启用MQTT服务器, 接受设备接入;

  • 在 tcp-component 项目有一个
org.jetlinks.community.network.tcp.device.TcpServerDeviceGateway
本机作为TCP服务器, 监听端口, 接受设备接入;
类似 桥接网络组件和协议组件(作为一个中介者)

  • 统一由org.jetlinks.community.gateway.DeviceGatewayManager管理;
  • 默认实现是 org.jetlinks.community.gateway.supports.DefaultDeviceGatewayManager(用Spring BeanPostProcessor hook自动加载; 2.0是org.jetlinks.community.gateway.GatewayConfiguration配置类加载的)
  • 调用其org.jetlinks.community.gateway.DeviceGatewayManager#start传递一个网关实例ID 启动网关; 其实是改变了一下网关实例的状态, 在有新消息时根据自身状态决定是否分发消息, 在此之前已经调了协议解析, 所有暂停/停止网关不会影响协议跟设备的交互;
    参考: org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway 这个网关实现
  • 实例组件是存在数据库 device_gateway 表
关键的对象概览

DeviceRegistry 设备注册中心

org.jetlinks.core.device.DeviceRegistry
设备注册中心, 用于统一管理设备以及产品的基本信息,缓存,进行设备指令下发等操作.
例如: 获取设备以及设备的配置缓存信息
  1. registry
  2.     .getDevice(deviceId)
  3.     .flatMap(device->device.getSelfConfig("my-config"))
  4.     .flatMap(conf-> doSomeThing(...))
复制代码
相当于总的设备管理器
根据设备ID. 通过 org.jetlinks.core.device.DeviceRegistry#getDevice  可以获取设备操作对象(DeviceOperator)
以 org.jetlinks.supports.cluster.ClusterDeviceRegistry#getDevice 为例:
  1. public Mono<DeviceOperator> getDevice(String deviceId) {
  2.         if (StringUtils.isEmpty(deviceId)) {
  3.             return Mono.empty();
  4.         } else {
  5.             //先从缓存获取
  6.             Mono<DeviceOperator> deviceOperator = (Mono)this.operatorCache.getIfPresent(deviceId);
  7.             if (null != deviceOperator) {
  8.                 return deviceOperator;
  9.             } else {
  10.                 //创建 DeviceOperator
  11.                 DeviceOperator deviceOperator = this.createOperator(deviceId);
  12.                 return deviceOperator.getSelfConfig(DeviceConfigKey.productId).doOnNext((r) -> {
  13.                     //放到缓存
  14.                     this.operatorCache.put(deviceId, Mono.just(deviceOperator).filterWhen((device) -> {
  15.                         return device.getSelfConfig(DeviceConfigKey.productId).hasElement();
  16.                     }));
  17.                 }).map((ignore) -> {
  18.                     return deviceOperator;
  19.                 });
  20.             }
  21.         }
  22.     }
  23.     private DefaultDeviceOperator createOperator(String deviceId) {
  24.         DefaultDeviceOperator device = new DefaultDeviceOperator(deviceId, this.supports, this.manager, this.handler, this, this.interceptor, this.stateChecker);
  25.         if (this.rpcChain != null) {
  26.             device.setRpcChain(this.rpcChain);
  27.         }
  28.         return device;
  29.     }
复制代码
DeviceOperator 设备操作接口

org.jetlinks.core.device.DeviceOperator
设备操作接口,通过DeviceRegister.getDevice(deviceId)获取,用于对设备进行相关操作,如获取配置,发送消息等.
//通过getConfig 可以获取配置的协议元数据
DeviceOperator#getConfig
它是如何创建的?
org.jetlinks.core.defaults.DefaultDeviceOperator为例
从注册中心拿的时候, 如果不存在, 则创建DeviceOperator
  1.   @Override
  2.     public Mono<DeviceOperator> getDevice(String deviceId) {
  3.         if (StringUtils.isEmpty(deviceId)) {
  4.             return Mono.empty();
  5.         }
  6.         {
  7.             Mono<DeviceOperator> deviceOperator = operatorCache.getIfPresent(deviceId);
  8.             if (null != deviceOperator) {
  9.                 return deviceOperator;
  10.             }
  11.         }
  12.         //创建  DeviceOperator
  13.         DeviceOperator deviceOperator = createOperator(deviceId);
  14.         return deviceOperator
  15.                 //有productId说明是存在的设备
  16.                 .getSelfConfig(DeviceConfigKey.productId)
  17.                 .doOnNext(r -> operatorCache.put(deviceId, Mono
  18.                         .just(deviceOperator)
  19.                         .filterWhen(device -> device.getSelfConfig(DeviceConfigKey.productId).hasElement())
  20.                         //设备被注销了?则移除之
  21.                         .switchIfEmpty(Mono.fromRunnable(() -> operatorCache.invalidate(deviceId)))
  22.                 ))
  23.                 .map(ignore -> deviceOperator);
  24.     }
复制代码
DeviceProductOperator 产品操作接口

DeviceProductOperator: 产品操作接口,通过DeviceProductOperator.getProduct(productId)获取.
DeviceGateway 设备接入网关接口

DeviceGateway : 设备接入网关接口,利用网络组件来接入设备消息.
DeviceMessageBusinessHandler

DeviceMessageBusinessHandler: 处理设备状态数据库同步,设备自动注册等逻辑等类.
LocalDeviceInstanceService

LocalDeviceInstanceService: 设备实例管理服务类.
DeviceSessionManager

DeviceSessionManager: 设备会话管理器,可获取当前服务的会话信息.
管理会话, 设备的上线下线;
参考: org.jetlinks.community.standalone.configuration.DefaultDeviceSessionManager
在 init
  1. Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(30), Schedulers.newSingle("device-session-checker"))
  2.     .flatMap(i -> this
  3.         .checkSession()//周期性 调用 checkSession() 检查状态
  4.         .onErrorContinue((err, val) -> log.error(err.getMessage(), err)))
  5.     .subscribe();
复制代码
DeviceDataStoragePolicy 设备数据存储策略(行式/列式/不存储)

DeviceDataStoragePolicy: 设备存储策略接口,实现此接口来进行自定义设备数据存储策略.
DeviceGatewayHelper

DeviceGatewayHelper: 统一处理设备消息,创建Session等操作的逻辑.
DecodedClientMessageHandler

DecodedClientMessageHandler: 解码后的平台消息处理器,如果是自定义实现网关或者在协议包里手动回复消息等处理, 则可以使用此接口直接将设备消息交给平台.(如果调用了DeviceGatewayHelper则不需要此操作).
EventBus 事件总线

EventBus: 事件总线,通过事件总线去订阅设备数据来实现解耦.(也可以用过@Subscribe()注解订阅).
DeviceMessageConnector

DeviceMessageConnector: 负责将设备消息转发到事件总线.
DeviceMessageSender 消息发送器

org.jetlinks.core.device.DeviceMessageSender
消息发送器,用于发送消息给设备.
DeviceMessage 消息对象

所有设备消息(即设备上报转换后, 平台可识别的消息) 派生自这个 org.jetlinks.core.message.DeviceMessage 接口
EncodedMessage 消息对象

设备端原始的消息, (下发/上报给的原始消息)
源码大体流程研究

先了解消息的组成

消息主要由 deviceId, messageId, headers, timestamp 组成.
deviceId为设备的唯一标识, messageId为消息的唯一标识,headers为消息头,通常用于对自定义消息处理的行为,如是否异步消息, 是否分片消息等.
常用的Headers org.jetlinks.core.message.Headers
async 是否异步,boolean类型.
timeout 指定超时时间. 毫秒.
frag_msg_id 分片主消息ID,为下发消息的messageId
frag_num 分片总数
frag_part 当前分片索引
frag_last 是否为最后一个分片,当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置:frag_last=true来完成返回.
keepOnline 与DeviceOnlineMessage配合使用,在TCP短链接,保持设备一直在线状态,连接断开不会设置设备离线.
keepOnlineTimeoutSeconds 指定在线超时时间,在短链接时,如果超过此间隔没有收到消息则认为设备离线.
ignoreStorage 不存储此消息数据,如: 读写属性回复默认也会记录到属性时序数据库中,设置为true后,将不记录.(1.9版本后支持)
ignoreLog 不记录此消息到日志,如: 设置为true,将不记录此消息的日志.
mergeLatest 是否合并最新属性数据,设置此消息头后,将会把最新的消息合并到消息体里( 需要开启最新数据存储)//jetlinks.device.storage.enable-last-data-in-db=true 是否将设备最新到数据存储到数据库
网络组件(Network) 真正与设备连接交互的网络层, 但是它只负责接收/发送报文,不负责任何处理逻辑, 所以最佳的调试地方是网关组件(DeviceGateway)入口处.
消息类型

所有消息类型参考 org.jetlinks.community.device.enums.DeviceLogType
属性相关消息

获取设备属性(ReadPropertyMessage)对应设备回复的消息 ReadPropertyMessageReply.
修改设备属性(WritePropertyMessage)对应设备回复的消息 WritePropertyMessageReply.
设备上报属性(ReportPropertyMessage) 由设备上报.
功能相关消息

调用设备功能到消息(FunctionInvokeMessage)由平台发往设备,对应到返回消息 FunctionInvokeMessageReply.
事件消息

org.jetlinks.core.message.event.EventMessage
  1. EventMessage eventMessage = new EventMessage();
  2. eventMessage.setDeviceId(deviceId);
  3. eventMessage.setMessageId(fromDevice.path(MessageConstant.MESSAGE_KEY_MESSAGE_SIGN).asText() );
  4. eventMessage.event(eventId);
  5. HashMap data = JacksonUtils.jsonToBean(output.toString(), HashMap.class);
  6. eventMessage.setData(data);
复制代码
其他消息

DeviceOnlineMessage 设备上线消息,通常用于网关代理的子设备的上线操作.
DeviceOfflineMessage 设备离线消息,通常用于网关代理的子设备的下线操作.
ChildDeviceMessage 子设备消息,通常用于网关代理的子设备的消息.
ChildDeviceMessageReply 子设备消息回复,用于平台向网关代理的子设备发送消息后设备回复给平台的结果.
UpdateTagMessage 更新设备标签.
DerivedMetadataMessage 更新设备独立物模型.
设备自注册消息

DeviceRegisterMessage 设备注册消息,通过设置消息头message.addHeader("deviceName","设备名称");和 message.addHeader("productId","产品ID")可实现设备自动注册.
如果配置了状态自管理,在检查子设备状态时,会发送指令ChildDeviceMessage, 网关需要返回ChildDeviceMessageReply.
自定义协议包将消息解析为 DeviceRegisterMessage,
并设置header:productId(必选),deviceName(必选),configuration(可选)。
平台将自动添加设备信息到设备实例中。如果是注册子设备,则解析为 ChildDeviceMessage即可
消息上报 (MQTT Broker)

设备不是直接接入平台, 而是通过第三方MQTT服务, 如:emqx. 消息编解码与MQTT服务一样,从消息协议中使用 DefaultTransport.MQTT 来获取消息编解码器.
网关处理逻辑

org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway
  1. public class MqttClientDeviceGateway extends AbstractDeviceGateway {
  2. public MqttClientDeviceGateway(String id,
  3.                                 MqttClient mqttClient,
  4.                                 DeviceRegistry registry,
  5.                                 ProtocolSupports protocolSupport,
  6.                                 String protocol,
  7.                                 DeviceSessionManager sessionManager,
  8.                                 DecodedClientMessageHandler clientMessageHandler,
  9.                                 List<String> topics,
  10.                                 int qos) {
  11.     super(id);
  12.     // mqtt的客户端
  13.     this.mqttClient = Objects.requireNonNull(mqttClient, "mqttClient");
  14.     //DeviceRegistry : 设备注册中心,  用于统一管理设备以及产品的基本信息,缓存,进行设备指令下发等操作(约等于设备统一管理器)
  15.     this.registry = Objects.requireNonNull(registry, "registry");
  16.     this.protocolSupport = Objects.requireNonNull(protocolSupport, "protocolSupport");
  17.     this.protocol = Objects.requireNonNull(protocol, "protocol");
  18.     this.topics = Objects.requireNonNull(topics, "topics");
  19.     this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);
  20.     this.qos = qos;
  21. }
  22. private void doStart() {
  23.     if (disposable != null) {
  24.         disposable.dispose();
  25.     }
  26.     disposable = mqttClient
  27.         .subscribe(topics, qos)//关注MQTT主题, 当有新的消息时
  28.         .filter((msg) -> isStarted())//需当前网关 处于启动状态
  29.         .flatMap(mqttMessage -> {
  30.             AtomicReference<Duration> timeoutRef = new AtomicReference<>();
  31.             return this
  32.                 //注意这里是根据自定义协议 ProtocolSupportProvider::create 返回的 ProtocolSupport id 去匹配的
  33.                 .getProtocol()
  34.                 //通过 ProtocolSupport 获取其 DeviceMessageCodec
  35.                 .flatMap(codec -> codec.getMessageCodec(getTransport()))
  36.                 //使用消息编码器 解码消息
  37.                 .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(
  38.                     new UnknownDeviceMqttClientSession(getId() + ":unknown", mqttClient) {//实际给到协议编解码器的Session 总是这个..
  39.                         @Override
  40.                         public Mono<Boolean> send(EncodedMessage encodedMessage) {
  41.                             return super
  42.                                 .send(encodedMessage)
  43.                                 .doOnSuccess(r -> monitor.sentMessage());
  44.                         }
  45.                         @Override
  46.                         public void setKeepAliveTimeout(Duration timeout) {
  47.                             timeoutRef.set(timeout);
  48.                         }
  49.                     }
  50.                     , mqttMessage, registry)
  51.                 ))
  52.                 .doOnError((err) -> log.error("解码MQTT客户端消息失败 {}:{}",       //发生了错误
  53.                                                 mqttMessage.getTopic(),
  54.                                                 mqttMessage
  55.                                                     .getPayload()
  56.                                                     .toString(StandardCharsets.UTF_8),
  57.                                                 err))
  58.                 //消息向上转型
  59.                 .cast(DeviceMessage.class)
  60.                 .flatMap(message -> {
  61.                     //设备网关监控 (主要是监控消息数量等指标的)
  62.                     monitor.receivedMessage();
  63.                     return helper//设备网关消息处理,会话管理工具类,用于统一封装对设备消息和会话的处理逻辑
  64.                         .handleDeviceMessage(message,//最终主要源码见下:  org.jetlinks.community.network.utils.DeviceGatewayHelper#handleDeviceMessage
  65.                                                 device -> createDeviceSession(device, mqttClient),//<!> 注意这个会话在不存在时回调; 见下: [创建会话的回调]
  66.                                                 ignore->{},//会话自定义回调,处理会话时用来自定义会话,比如重置连接信
  67.                                                 () -> log.warn("无法从MQTT[{}]消息中获取设备信息:{}", mqttMessage.print(), message)//当设备在平台不存在时
  68.                         );
  69.                 })
  70.                 .then()
  71.                 //错误处理, 返回 empty
  72.                 .onErrorResume((err) -> {
  73.                     log.error("处理MQTT消息失败:{}", mqttMessage, err);
  74.                     return Mono.empty();
  75.                 });
  76.         }, Integer.MAX_VALUE)
  77.         .onErrorContinue((err, ms) -> log.error("处理MQTT客户端消息失败", err))
  78.         .subscribe();//Flux的API  触发计算
  79. }
  80. }
复制代码
创建会话的回调
  1. private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) {
  2.         return new MqttClientSession(device.getDeviceId(), device, client, monitor);
  3. }
复制代码
DeviceGatewayHelper 处理

主要处理消息分支 子设备分支, 上线/离线 处理....

//如果设备状态为'离线' 则会先构造一个设备上线的消息 publish, 然后在 publish 设备原消息
org.jetlinks.community.network.utils.DeviceGatewayHelper#handleDeviceMessage
  1. public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
  2.                                                 Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,  //会话构造器,在会话不存在时,创建会话
  3.                                                 Function<DeviceSession, Mono<Void>> sessionConsumer,//会话自定义回调, 见上
  4.                                                 Supplier<Mono<DeviceOperator>> deviceNotFoundCallback//设备不存在的监听器回调
  5.                                                 ) {
  6.     String deviceId = message.getDeviceId();
  7.     if (!StringUtils.hasText(deviceId)) {
  8.         return Mono.empty();
  9.     }
  10.     Mono<DeviceOperator> then = null;
  11.     boolean doHandle = true;
  12.     // ........
  13.     // 忽略.. 子设备消息,子设备消息回复, 设备离线消息, 设备上线,  平台处理的消息流程分支
  14.     // ........
  15.    
  16.     if (then == null) {
  17.         then = registry.getDevice(deviceId);
  18.     }
  19.     if (doHandle) {
  20.         //<!> 真正主要处理消息部分!  从 设备注册中心 拿到 DeviceOperator 然后在 org.jetlinks.community.device.message.DeviceMessageConnector#handleMessage 处理
  21.         //<!> 见下 [发布消息总线]
  22.         then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt));
  23.     }
  24.     return this
  25.         // 创建或者更新 Session  
  26.         //<!> 见下[会话的创建]
  27.         .createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
  28.         .flatMap(sessionConsumer)
  29.         .then(then)//不在意流元素, 在流结束后返回 'then' (即: Mono<DeviceOperator> then 上面定义的)
  30.         .contextWrite(Context.of(DeviceMessage.class, message));//往 context 中写入 DeviceMessage; key是DeviceMessage.class; (Context 类似Map其内部用于传递数据, 注意contextWrite读取值时, 下游优先;从下往上)
  31. }
复制代码
会话的创建逻辑

org.jetlinks.community.network.utils.DeviceGatewayHelper#createOrUpdateSession
  1. private Mono<DeviceSession> createOrUpdateSession(String deviceId,
  2.                                                       DeviceMessage message,
  3.                                                       Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
  4.                                                       Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
  5.         return sessionManager
  6.             .getSession(deviceId, false)
  7.             .filterWhen(DeviceSession::isAliveAsync)
  8.             .map(old -> {
  9.                 //需要更新会话时才进行更新
  10.                 if (needUpdateSession(old, message)) {
  11.                     return sessionManager
  12.                         .compute(deviceId, null, session -> updateSession(session, message, sessionBuilder));
  13.                 }
  14.                 applySessionKeepaliveTimeout(message, old);
  15.                 old.keepAlive();
  16.                 return Mono.just(old);
  17.             })
  18.             //会话不存在 则尝试创建或者更新
  19.             .defaultIfEmpty(Mono.defer(() -> sessionManager //<!> 注意 注意 注意 这个会话管理 `sessionManager.compute` 见下 [设备会话注册 (上线事件)]
  20.                 .compute(deviceId,
  21.                          createNewSession(//<!> 见下 <<<<<<<创建会话>>>>>>>
  22.                              deviceId,
  23.                              message,
  24.                              sessionBuilder,
  25.                              () -> {// deviceNotFoundCallback 回调
  26.                                  //设备注册
  27.                                  if (isDoRegister(message)) {
  28.                                      return messageHandler
  29.                                          .handleMessage(null, message)
  30.                                          //延迟2秒后尝试重新获取设备并上线
  31.                                          .then(Mono.delay(Duration.ofSeconds(2)))
  32.                                          .then(registry.getDevice(deviceId));
  33.                                  }
  34.                                  if (deviceNotFoundCallback != null) {
  35.                                      return deviceNotFoundCallback.get();
  36.                                  }
  37.                                  return Mono.empty();
  38.                              }),
  39.                         //compute 的第二个参数
  40.                          session -> updateSession(session, message, sessionBuilder))))
  41.             .flatMap(Function.identity());
  42.     }
  43.     //org.jetlinks.community.network.utils.DeviceGatewayHelper#createNewSession
  44.     //<<<<<<<创建会话>>>>>>>
  45.     private Mono<DeviceSession> createNewSession(String deviceId,
  46.                                                  DeviceMessage message,
  47.                                                  Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
  48.                                                  Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
  49.         return registry
  50.             .getDevice(deviceId)//从注册中心 拿 DeviceOperator
  51.             .switchIfEmpty(Mono.defer(deviceNotFoundCallback))
  52.             .flatMap(device -> sessionBuilder //给设备 回调 SessionBuilder 创建会话  见上: [创建会话的回调]
  53.                 .apply(device)
  54.                 .map(newSession -> {
  55.                     //保持在线,在低功率设备上,可能无法保持长连接,通过keepOnline的header来标识让设备保持在线
  56.                     if (message.getHeader(Headers.keepOnline).orElse(false)) {
  57.                         int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
  58.                         newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
  59.                     }
  60.                     return newSession;
  61.                 }));
  62.     }
复制代码
设备会话注册 (上线事件)

当设备离线时, 设备上报消息, 会触发上线事件; 这个不在消息处理分支里面; 而在创建 Session 过程中.
org.jetlinks.supports.device.session.AbstractDeviceSessionManager#compute(java.lang.String, reactor.core.publisher.Mono, java.util.function.Function)
  1. public Mono<DeviceSession> compute(@Nonnull String deviceId,
  2.                                        Mono<DeviceSession> creator,
  3.                                        Function<DeviceSession, Mono<DeviceSession>> updater) {
  4.         Mono<DeviceSession> ref = localSessions
  5.                 .compute(deviceId, (_id, old) -> {
  6.                     Mono<DeviceSession> operator;
  7.                     if (old == null) {
  8.                         if (creator == null) {
  9.                             return null;
  10.                         }
  11.                         //创建新会话
  12.                         operator = creator
  13.                                 .flatMap(this::doRegister)//创建完成, 注册会话 见下
  14.                                 .doOnNext(this::replaceSession);
  15.                     } else {
  16.     ... 省略 ...
复制代码
org.jetlinks.supports.device.session.AbstractDeviceSessionManager#doRegister
  1.   private Mono<DeviceSession> doRegister(DeviceSession session) {
  2.         if (session.getOperator() == null) {
  3.             return Mono.empty();
  4.         }
  5.         return this
  6.                 .remoteSessionIsAlive(session.getDeviceId())
  7.                 .flatMap(alive -> session
  8.                         .getOperator()//调 DeviceOperator 的 online
  9.                         .online(getCurrentServerId(), session.getId(), session
  10.                                 .getClientAddress()
  11.                                 .map(InetSocketAddress::toString)
  12.                                 .orElse(null))
  13.                         .then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.register, session, alive))))//触发事件 见下
  14.                 .thenReturn(session);
  15.     }
复制代码
  1. protected Mono<Void> fireEvent(DeviceSessionEvent event) {
  2.         if (sessionEventHandlers.isEmpty()) {
  3.             return Mono.empty();
  4.         }
  5.         return Flux
  6.                 .fromIterable(sessionEventHandlers)
  7.                 .flatMap(handler -> Mono
  8.                         .defer(() -> handler.apply(event))
  9.                         .onErrorResume(err -> {
  10.                             log.error("fire session event error {}", event, err);
  11.                             return Mono.empty();
  12.                         }))
  13.                 .then();
  14.     }
复制代码
典型的观察者模式,  那么是谁订阅了, 在哪里处理这个事件?
上线/离线 事件总线发布

org.jetlinks.community.device.message.DeviceMessageConnector#DeviceMessageConnector
  1.   public DeviceMessageConnector(EventBus eventBus,
  2.                                   DeviceRegistry registry,
  3.                                   MessageHandler messageHandler,
  4.                                   DeviceSessionManager sessionManager) {
  5.         this.registry = registry;
  6.         this.eventBus = eventBus;
  7.         this.messageHandler = messageHandler;
  8.         sessionManager.listenEvent(event->{// sessionManager 即是 AbstractDeviceSessionManager 的派生类; 订阅上线/离线事件
  9.             if(event.isClusterExists()){
  10.                 return Mono.empty();
  11.             }
  12.             //从会话管理器里监听会话注册,转发为设备离线消息
  13.             if(event.getType()== DeviceSessionEvent.Type.unregister){
  14.                 return this.handleSessionUnregister(event.getSession());
  15.             }
  16.             //从会话管理器里监听会话注销,转发为设备上线消息
  17.             if(event.getType()== DeviceSessionEvent.Type.register){
  18.                 return this.handleSessionRegister(event.getSession());//见下
  19.             }
  20.             return Mono.empty();
  21.         });
  22.     }
复制代码
  1. protected Mono<Void> handleSessionRegister(DeviceSession session) {
  2.         DeviceOnlineMessage message = new DeviceOnlineMessage();
  3.         message.addHeader("from", "session-register");
  4.         //添加客户端地址信息
  5.         message.addHeader("address", session.getClientAddress().map(InetSocketAddress::toString).orElse(""));
  6.         message.setDeviceId(session.getDeviceId());
  7.         message.setTimestamp(System.currentTimeMillis());
  8.         // 最终殊途同归 调到 org.jetlinks.community.device.message.DeviceMessageConnector#onMessage 发布事件总线
  9.         return this
  10.             .onMessage(message)
  11.             .onErrorResume(doOnError);
  12.     }
复制代码
同步设备状态到数据库

在 org.jetlinks.community.device.service.DeviceMessageBusinessHandler#init 方法
使用代码订阅消息事件总线, 设备上下线消息...
  1. @PostConstruct
  2. public void init() {
  3.         Subscription subscription = Subscription
  4.             .builder()
  5.             .subscriberId("device-state-synchronizer")
  6.             .topics("/device/*/*/online", "/device/*/*/offline")
  7.             .justLocal()//只订阅本地
  8.             .build();
  9.         //订阅设备上下线消息,同步数据库中的设备状态,
  10.         //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
  11.         //如果2条消息间隔大于0.8秒则不缓冲直接更新
  12.         //否则缓冲,数量超过500后批量更新
  13.         //无论缓冲区是否超过500条,都每2秒更新一次.
  14.         FluxUtils.bufferRate(eventBus
  15.                                  .subscribe(subscription, DeviceMessage.class)
  16.                                  .map(DeviceMessage::getDeviceId),
  17.                              800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
  18.                  .onBackpressureBuffer(64,
  19.                                        list -> log.warn("无法处理更多设备状态同步!"),
  20.                                        BufferOverflowStrategy.DROP_OLDEST)
  21.                  .publishOn(Schedulers.boundedElastic(), 64)
  22.                  .concatMap(list -> deviceService.syncStateBatch(Flux.just(list), false).map(List::size))
  23.                  .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
  24.                  .subscribe((i) -> log.info("同步设备状态成功:{}", i));
  25.     }
复制代码
发布消息总线

org.jetlinks.community.device.message.DeviceMessageConnector#handleMessage
org.jetlinks.community.device.message.DeviceMessageConnector#onMessage
  1. public Mono<Void> onMessage(Message message) {
  2.     if (null == message) {
  3.         return Mono.empty();
  4.     }
  5.     message.addHeader(PropertyConstants.uid, IDGenerator.SNOW_FLAKE_STRING.generate());
  6.     return this
  7.         //处理子设备消息 和 <>盲猜提取事件总线的匹配路径 @Subscribe("/device/*/*/online")
  8.         .getTopic(message)
  9.         //事件总线发布, //基于订阅发布的事件总线,可用于事件传递,消息转发等.
  10.         //见下[消息总线发布过程]
  11.         .flatMap(topic -> eventBus.publish(topic, message).then())
  12.         .onErrorResume(doOnError)//发生错误 返回一个  Mono.empty();
  13.         .then();
  14. }
复制代码
消息总线发布过程

org.jetlinks.core.event.EventBus#publish(java.lang.String, org.jetlinks.core.Payload)
org.jetlinks.supports.event.BrokerEventBus#publish(java.lang.String, org.jetlinks.core.codec.Encoder, T)
org.jetlinks.supports.event.BrokerEventBus#publish(java.lang.String, org.jetlinks.core.codec.Encoder, T, reactor.core.scheduler.Scheduler)
  1.    @Override
  2.     public <T> Mono<Long> publish(String topic, Encoder<T> encoder, T payload, Scheduler scheduler) {
  3.         return TraceHolder
  4.                 //写入跟踪信息到header中
  5.                 .writeContextTo(TopicPayload.of(topic, Payload.of(payload, encoder)), TopicPayload::addHeader)
  6.                 .map(pld -> {
  7.                     long subs = this
  8.                             //见下[过滤/处理共享订阅]
  9.                             .doPublish(pld.getTopic(),
  10.                                        sub -> !sub.isLocal() || sub.hasFeature(Subscription.Feature.local),
  11.                                        //见下: 生成推送
  12.                                        sub -> doPublish(pld.getTopic(), sub, pld)
  13.                             );
  14.                     if (log.isTraceEnabled()) {
  15.                         log.trace("topic [{}] has {} subscriber", pld.getTopic(), subs);
  16.                     }
  17.                     ReferenceCountUtil.safeRelease(pld);
  18.                     return subs;
  19.                 });
  20.     }
复制代码
过滤/处理共享订阅
org.jetlinks.supports.event.BrokerEventBus#doPublish(java.lang.String, java.util.function.Predicate, java.util.function.Consumer)
  1. private long doPublish(String topic,
  2.                         Predicate<SubscriptionInfo> predicate,
  3.                         Consumer<SubscriptionInfo> subscriberConsumer) {
  4.     //共享订阅, (有多个订阅者)只有一个订阅者能收到
  5.     Map<String, List<SubscriptionInfo>> sharedMap = new HashMap<>();
  6.     //去重
  7.     Set<Object> distinct = new HashSet<>(64);
  8.     //从订阅表中查找topic
  9.     root.findTopic(topic, subs -> {
  10.         for (SubscriptionInfo sub : subs.getSubscribers()) {
  11.             //broker已经失效则不推送
  12.             if (sub.isBroker() && !sub.getEventConnection().isAlive()) {
  13.                 sub.dispose();
  14.                 continue;
  15.             }
  16.             if (!predicate.test(sub) || !distinct.add(sub.sink)) {
  17.                 continue;
  18.             }
  19.             //共享订阅时,添加到缓存,最后再处理
  20.             if (sub.hasFeature(Subscription.Feature.shared)) {
  21.                 sharedMap
  22.                         .computeIfAbsent(sub.subscriber, ignore -> new ArrayList<>(8))
  23.                         .add(sub);
  24.                 continue;
  25.             }
  26.             subscriberConsumer.accept(sub);
  27.         }
  28.     }, () -> {
  29.         //处理共享订阅
  30.         for (List<SubscriptionInfo> value : sharedMap.values()) {
  31.             subscriberConsumer.accept(value.get(ThreadLocalRandom.current().nextInt(0, value.size())));
  32.         }
  33.     });
  34.     return distinct.size();
  35. }
复制代码
生成推送
  1. private boolean doPublish(String topic, SubscriptionInfo info, TopicPayload payload) {
  2.         try {
  3.             //已经取消订阅则不推送
  4.             if (info.sink.isCancelled()) {
  5.                 return false;
  6.             }
  7.             payload.retain();
  8.             info.sink.next(payload);
  9.             if (log.isDebugEnabled()) {
  10.                 log.debug("publish [{}] to [{}] complete", topic, info);
  11.             }
  12.             return true;
  13.         } catch (Throwable error) {
  14.             log.error("publish [{}] to [{}] event error", topic, info, error);
  15.             ReferenceCountUtil.safeRelease(payload);
  16.         }
  17.         return false;
  18.     }
复制代码
关于消息总线

关于事件总线和消息总线
@Subscribe("/device/**")
采用树结构来定义topic如:/device/id/message/type . topic支持路径通配符,如:/device/** 或者/device//message/.
设备消息主题

也可参考 org.jetlinks.pro.utils.TopicUtils
消息的topic 的前缀均为: /device/{productId}/{deviceId}.
如:产品product-1下的设备device-1上线消息: /device/product-1/device-1/online.
可通过通配符订阅所有设备的指定消息,如:/device///online,或者订阅所有消息:/device/**.
topic类型说明/onlineDeviceOnlineMessage设备上线/offlineDeviceOfflineMessage设备离线/message/event/DeviceEventMessage设备事件/message/property/reportReportPropertyMessage设备上报属性/message/send/property/readReadPropertyMessage平台下发读取消息指令/message/send/property/writeWritePropertyMessage平台下发修改消息指令/message/property/read/replyReadPropertyMessageReply读取属性回复/message/property/write/replyWritePropertyMessageReply修改属性回复/message/send/functionFunctionInvokeMessage平台下发功能调用/message/function/replyFunctionInvokeMessageReply调用功能回复/registerDeviceRegisterMessage设备注册,通常与子设备消息配合使用/unregisterDeviceUnRegisterMessage设备注销,同上/message/children/{childrenDeviceId}/ChildDeviceMessage子设备消息,{topic}为子设备消息对应的topic/message/children/reply/{childrenDeviceId}/ChildDeviceMessage子设备回复消息,同上/message/directDirectDeviceMessage透传消息/message/tags/updateUpdateTagMessage更新标签消息 since 1.5/firmware/pullRequestFirmwareMessage拉取固件请求 (设备->平台)/firmware/pull/replyRequestFirmwareMessageReply拉取固件请求回复 (平台->设备)/firmware/reportReportFirmwareMessage上报固件信息/firmware/progressUpgradeFirmwareProgressMessage上报更新固件进度/firmware/pushUpgradeFirmwareMessage推送固件更新/firmware/push/replyUpgradeFirmwareMessageReply固件更新回复/message/logDeviceLogMessage设备日志/message/tags/updateUpdateTagMessage更新标签/metadata/derivedDerivedMetadataMessage更新物模型消息入库 订阅

org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector
  1. /**
  2. * 订阅设备消息 入库
  3. * @param message 设备消息
  4. * @return void
  5. */
  6. @Subscribe(topics = "/device/**", id = "device-message-ts-writer")
  7. public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
  8.     return dataService.saveDeviceMessage(message);
  9. }
复制代码
org.jetlinks.community.device.service.data.DefaultDeviceDataService#saveDeviceMessage(org.jetlinks.core.message.DeviceMessage)
策略模式; 目前有两类, 行式存储&列式存储
  1. /**
  2. * 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后
  3. * 再进行批量写出,具体由不同对存储策略实现。
  4. * <p>
  5. * 如果保存失败,在这里不会得到错误信息.
  6. * @param message 设备消息
  7. * @return void
  8. */
  9. @Nonnull
  10. @Override
  11. public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
  12.     return this
  13.         .getDeviceStrategy(message.getDeviceId())
  14.         .flatMap(strategy -> strategy.saveDeviceMessage(message));
  15. }
复制代码
org.jetlinks.community.device.service.data.AbstractDeviceDataStoragePolicy#saveDeviceMessage(org.jetlinks.core.message.DeviceMessage)
org.jetlinks.community.elastic.search.timeseries.ElasticSearchTimeSeriesService#commit(org.jetlinks.community.timeseries.TimeSeriesData)
  1. //最终最终在 ElasticSearchTimeSeriesService 这里提交
  2. @Override
  3. public Mono<Void> commit(TimeSeriesData data) {
  4.     Map<String, Object> mapData = data.getData();
  5.     mapData.put("timestamp", data.getTimestamp());
  6.     return elasticSearchService.commit(index[0], mapData);
  7. }
复制代码
2.png

平台消息下发

HTTP 接口

主要有三种类型消息
//发送设置属性指令到设备
org.jetlinks.community.device.web.DeviceInstanceController::writeProperties
//发送调用设备功能指令到设备
org.jetlinks.community.device.web.DeviceInstanceController::invokedFunction
//发送指令到设备
org.jetlinks.community.device.web.DeviceInstanceController::sendMessage
参数验证和物模型

org.jetlinks.community.device.service.LocalDeviceInstanceService#invokeFunction
  1. @SneakyThrows
  2. public Flux<?> invokeFunction(String deviceId,
  3.                                 String functionId,
  4.                                 Map<String, Object> properties) {
  5.     return registry
  6.         .getDevice(deviceId)//通过 设备注册中心 找 DeviceOperator
  7.         .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
  8.         .flatMap(operator -> operator
  9.             .messageSender()//拿到 消息发送器, 用于发送消息给设备.
  10.             .invokeFunction(functionId)//new 了一个消息对象; new DefaultFunctionInvokeMessageSender(operator, function);
  11.             .messageId(IDGenerator.SNOW_FLAKE_STRING.generate())//生成唯一消息id
  12.             .setParameter(properties)//设置入参 org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender#setParameter
  13.             .validate()//验证入参 org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender#validate
  14.         )
  15.         .flatMapMany(FunctionInvokeMessageSender::send)//调用发送  org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender#send 见下:
  16.         .flatMap(mapReply(FunctionInvokeMessageReply::getOutput));//处理回复
  17. }
复制代码
org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender#send
org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender#doSend
  1. private Flux<FunctionInvokeMessageReply> doSend() {
  2.     //拿到(消息发送器)发送 org.jetlinks.core.defaults.DefaultDeviceMessageSender 见下
  3.     return this.operator.messageSender().send(Mono.just(this.message));
  4. }
复制代码
核心逻辑


org.jetlinks.core.defaults.DefaultDeviceMessageSender#send(org.reactivestreams.Publisher)
org.jetlinks.core.defaults.DefaultDeviceMessageSender#send(org.reactivestreams.Publisher

相关推荐

您需要登录后才可以回帖 登录 | 立即注册