找回密码
 立即注册
首页 业界区 业界 凯亚物联网平台如何通过MQTT网络组件接入设备 ...

凯亚物联网平台如何通过MQTT网络组件接入设备

闾丘婉奕 2025-6-2 22:09:50
一、概述

    有人提议我用kestrel代替Dotnetty ,那是不可能的, 物联网平台MQTT,rtmp,rtsp,httpflv,tcp,udp,rpc 都是基于dotnetty实现,压测没有问题,每秒可以达到20w/s,当中因为SingleThreadEventExecutor的问题 导致每天内存会增加的问题,我会通过源码修复,修复完成我会开通MQTT,http,tcp,udp,coap端口提供给大家测试。
 
      凯亚 (Kayak) 是什么?
       凯亚(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
     凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly  密码:123456)
    链路跟踪Skywalking V8:http://117.72.121.2:8080/
      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、网络组件

1.编辑创建MQTT协议的网络组件,可以选择共享配置和独立配置(独立配置是集群模式)
1.png

 
三、自定义协议


  • 如何创建自定义协议模块
如果是网络编程开发,必然会涉及到协议报文的编码解码处理,那么对于平台也是做到了灵活处理,首先是协议模块创建,通过以下代码看出协议模块可以添加协议说明md文档, 身份鉴权处理,HTTP路由,消息编解码,元数据配置。下面一一介绍如何进行编写
2.gif
  1.   public class Demo5ProtocolSupportProvider : ProtocolSupportProvider
  2.     {<br>             private readonly DefaultConfigMetadata _mqttConfig = new DefaultConfigMetadata(<br>        "Mqtt认证配置"<br>        , "secureId以及secureKey在创建设备产品或设备实例时进行配置.\r\n    timestamp为当前时间戳(毫秒), 与服务器时间不能相差5分钟.\r\n        md5为32位, 不区分大小写")<br>        .Add("secureId", "secureId", "用户唯一标识编号", StringType.Instance)<br>        .Add("secureKey", "secureKey", "密钥", StringType.Instance);<br>
  3.         public override IObservable<ProtocolSupport> Create(ProtocolContext context)
  4.         {<br>            var support = new ComplexProtocolSupport();<br>              support.Id = "demo5";<br>              support.Name = "演示协议5";<br>              support.Description = "演示协议5";
  5.                support.AddDocument(MessageTransport.Mqtt, "Document/document-mqtt.md");
  6.            support.AddAuthenticator(MessageTransport.Mqtt, new DefaultAuthenticator());<br> support.AddRoutes(MessageTransport.Mqtt, new List<TopicMessageCodec>() {<br>    TopicMessageCodec.DeviceOnline,<br>     TopicMessageCodec.ReportProperty,<br>     TopicMessageCodec.WriteProperty,<br>      TopicMessageCodec.ReadProperty,<br>        TopicMessageCodec.Event<br> }.Select(p => MqttDescriptor.Instance(p.Pattern)<br>     .GroupName(p.Route.GroupName())<br>     .Path(p.Pattern)<br>     .ContentType(MediaType.ToString(MediaType.ApplicationJson))<br>     .Description(p.Route.Description())<br>     .Example(p.Route.Example())<br>).ToList());
  7.            support.AddMessageCodecSupport(MessageTransport.Mqtt, () => Observable.Return(new ScriptDeviceMessageCodec(support.Script)));<br>        support.AddConfigMetadata(MessageTransport.Mqtt, _mqttConfig);
  8.        return Observable.Return(support);
  9.                
  10.         }
  11.      
  12.      }               
复制代码
3.gif
1. 添加协议说明文档如代码: support.AddDocument(MessageTransport.Http, "Document/document-http.md");,文档仅支持 markdown文件,如下所示
 
4.gif
  1. ### 认证说明<br><br>CONNECT报文:<br>```text<br>clientId: 设备ID<br>username: secureId+"&"+timestamp<br>password: md5(secureId+"&"+timestamp+"&"+secureKey)<br> ```<br><br>说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.<br>    timestamp为当前时间戳(毫秒), 与服务器时间不能相差5分钟.<br>        md5为32位, 不区分大小写.
复制代码
5.gif
 
2. 添加身份鉴权如代码:  support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator()) ,自定义身份鉴权Demo5Authenticator 代码如下:

 
6.gif
  1.     public class DefaultAuthenticator : IAuthenticator<br>    {<br>        public IObservable Authenticate(IAuthenticationRequest request, IDeviceOperator deviceOperation)<br>        {<br>            var result = Observable.Return(default);<br>            if (request is DefaultAuthRequest)<br>            {<br>                var authRequest = request as DefaultAuthRequest;  <br>                var username = authRequest.UserName; <br>                var password = authRequest.Password;<br>                String[] arr = username.Split("&");<br>                if (arr.Length <= 1)<br>                {<br>                    return Observable.Return(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "用户名格式错误"));<br>                }<br>                var requestSecureId = arr[0];<br>                long.TryParse(arr[1], out long time);<br>                if (Math.Abs(Utility.CurrentTimeMillis() - time) > TimeSpan.FromMinutes(10).TotalMilliseconds)<br>                {<br>                    return Observable.Return(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "时间不一致"));<br>                }<br>                var configs = deviceOperation.GetConfigs("secureId", "secureKey").Subscribe(p =><br>                {<br>                    try<br>                    {<br>                        var secureId = p.GetValue("secureId").Convert<string>();<br>                        var secureKey = p.GetValue("secureKey").Convert<string>();<br>                        var encryptStr = $"{username}&{secureKey}".GetMd5Hash();<br>                        if (requestSecureId.Equals(secureId) && encryptStr.Equals(password))<br>                        {<br>                            result= result.Publish(AuthenticationResult.Success(deviceOperation.GetDeviceId()));<br>                        }<br>                        else<br>                        {<br>                            result= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密钥错误"));<br><br>                        }<br>                    }<br>                    catch (Exception ex)<br>                    {<br>                        result = result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "请求参数格式错误"));<br>                    }<br>                });<br>            }<br>            else<br>            result = Observable.Return(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "不支持请求参数类型"));<br>            return result;<br>        }<br><br>        public IObservable Authenticate(IAuthenticationRequest request, IDeviceRegistry registry)<br>        {<br>            var result = Observable.Return(default);<br>            var authRequest = request as DefaultAuthRequest;<br>            registry<br>              .GetDevice(authRequest.DeviceId)<br>              .Subscribe( p =>Authenticate(request, p).Subscribe(authResult => result = result.Publish(authResult)));<br>            return result;<br>        }<br>    }
复制代码
7.gif
 
3. 添加Http路由代码support.AddRoutes,那么如何配置呢,代码如下:
 
  1.     public static BasicMessageCodec ReportProperty =>
  2. new BasicMessageCodec("/*/properties/report", typeof(ReadPropertyMessage), route => route.GroupName("属性上报")
  3.                      .Description("上报物模型属性数据")
  4.                      .Example("{"properties":{"属性ID":"属性值"}}"));
复制代码
 

4.添加元数据配置代码 support.AddConfigMetadata(MessageTransport.Http, _httpConfig);  _httpConfig代码如下
  1.         private readonly DefaultConfigMetadata _mqttConfig = new DefaultConfigMetadata(
  2. "Mqtt认证配置"<br>, "secureId以及secureKey在创建设备产品或设备实例时进行配置.\r\n    timestamp为当前时间戳(毫秒), 与服务器时间不能相差5分钟.\r\n        md5为32位, 不区分大小写")<br>.Add("secureId", "secureId", "用户唯一标识编号", StringType.Instance)<br>.Add("secureKey", "secureKey", "密钥", StringType.Instance);<br>
复制代码

  • 如何加载协议模块,协议模块包含了协议模块支持添加引用加载和上传热部署加载。                           
   引用加载模块
8.png

 
 上传热部署协议模块
9.png

 四、设备网关

创建设备网关

 五、产品管理

以下是添加产品。
11.png

  设备接入
12.png

 
 六、设备管理

添加设备
13.png

 mqtt 认证配置
14.png

 
创建告警阈值
15.png

 七、测试

利用工具进行连接MQTT
16.png

 然后可以在平台看到设备日志,看下连接失败成功情况
17.png

 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册