找回密码
 立即注册
首页 业界区 业界 surging 集成SuperSocket预发布版本2.0

surging 集成SuperSocket预发布版本2.0

祝安芙 2025-6-2 21:47:00
一、概述

周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。
1.png

 
2.png

 
3.png

4.png

木舟 (Kayak) 是什么?
       木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
     凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly  密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决)
    链路跟踪Skywalking V8:http://117.72.121.2:8080/
      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
 
二、集成SuperSocket

作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换
创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:
  1.   public class SuperSocketServerMessageListener : IMessageListener, IDisposable
  2.   {
  3.       public event ReceivedDelegate Received;
  4.       private readonly ILogger<SuperSocketServerMessageListener> _logger;
  5.       private readonly ITransportMessageDecoder _transportMessageDecoder;
  6.       private readonly ITransportMessageEncoder _transportMessageEncoder;
  7.       private readonly IServiceEngineLifetime _serviceEngineLifetime;
  8.       public SuperSocketServerMessageListener(ILogger<SuperSocketServerMessageListener> logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime)
  9.       {
  10.           _logger = logger;
  11.           _transportMessageEncoder = codecFactory.GetEncoder();
  12.           _transportMessageDecoder = codecFactory.GetDecoder();
  13.           _serviceEngineLifetime = serviceEngineLifetime;
  14.       }
  15.       public async Task StartAsync(EndPoint endPoint)
  16.       {
  17.           _serviceEngineLifetime.ServiceEngineStarted.Register(async () =>
  18.           {
  19.               try
  20.               {
  21.                   var ipEndPoint = endPoint as IPEndPoint;
  22.                   var host = SuperSocketHostBuilder.Create<TransportMessage, TransportMessagePipelineFilter>()
  23.               
  24.                   .UsePackageHandler( (s, p) =>
  25.                   {
  26.                       Task.Run(async () =>
  27.                       {
  28.                           var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s);
  29.                           await OnReceived(sender, p);
  30.                       });
  31.                       return ValueTask.CompletedTask;
  32.                   })
  33.                   .ConfigureSuperSocket(options =>
  34.                   {
  35.                       options.Name = "Echo Server";
  36.                       options.Logger = _logger;
  37.                       options.AddListener(new ListenOptions
  38.                       {
  39.                           Ip = ipEndPoint.Address.ToString(),
  40.                           Port = ipEndPoint.Port,
  41.                         
  42.                       }
  43.                       );
  44.                   })
  45.                   .ConfigureLogging((hostCtx, loggingBuilder) =>
  46.                   {
  47.                       loggingBuilder.AddConsole();
  48.                   })
  49.                   .Build();
  50.                   await host.RunAsync();
  51.               }
  52.               catch (Exception ex)
  53.               {
  54.                   _logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 ");
  55.               }
  56.           });
  57.       }
  58.       
  59.       public async Task OnReceived(IMessageSender sender, TransportMessage message)
  60.       {
  61.           if (Received == null)
  62.               return;
  63.           await Received(sender, message);
  64.       }
  65.       public void Dispose()
  66.       {
  67.       }
  68.   }
复制代码
创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:
 
  1. internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable
  2. {
  3.      private readonly ITransportMessageEncoder _transportMessageEncoder;
  4.      private readonly ITransportMessageDecoder _transportMessageDecoder;
  5.      private readonly ILogger<SuperSocketTransportClientFactory> _logger;
  6.      private readonly IServiceExecutor _serviceExecutor;
  7.      private readonly IHealthCheckService _healthCheckService;
  8.      private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>();
  9.      public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger)
  10. : this(codecFactory, healthCheckService, logger, null)
  11.      {
  12.      }
  13.      public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger, IServiceExecutor serviceExecutor)
  14.      {
  15.          _transportMessageEncoder = codecFactory.GetEncoder();
  16.          _transportMessageDecoder = codecFactory.GetDecoder();
  17.          _logger = logger;
  18.          _serviceExecutor = serviceExecutor;
  19.          _healthCheckService = healthCheckService;
  20.      }
  21.      public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint)
  22.      {
  23.          var key = endPoint;
  24.          if (_logger.IsEnabled(LogLevel.Debug))
  25.              _logger.LogDebug($"准备为服务端地址:{key}创建客户端。");
  26.          try
  27.          {
  28.              return await _clients.GetOrAdd(key
  29.           , k => new Lazy<Task<ITransportClient>>(async () =>
  30.           {
  31.               //客户端对象
  32.               var client = new EasyClient<TransportMessage>(new TransportMessagePipelineFilter()).AsClient();
  33.               var messageListener = new MessageListener();
  34.               var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client);
  35.               await client.ConnectAsync(endPoint);
  36.               client.PackageHandler += async (sender, package) =>
  37.               {
  38.                   await messageListener.OnReceived(messageSender, package);
  39.               };
  40.               client.StartReceive();
  41.               //创建客户端
  42.               var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);
  43.               return transportClient;
  44.           }
  45.               )).Value;//返回实例
  46.          }
  47.          catch
  48.          {
  49.              //移除
  50.              _clients.TryRemove(key, out var value);
  51.              var ipEndPoint = endPoint as IPEndPoint;
  52.              //标记这个地址是失败的请求
  53.              if (ipEndPoint != null)
  54.                  await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port));
  55.              throw;
  56.          }
  57.      }
  58.      public void Dispose()
  59.      {
  60.          foreach (var client in _clients.Values)
  61.          {
  62.              (client as IDisposable)?.Dispose();
  63.          }
  64.      }
  65. }
复制代码
注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:
  1.   public class SuperSocketModule : EnginePartModule
  2.   {
  3.       public override void Initialize(AppModuleContext context)
  4.       {
  5.           base.Initialize(context);
  6.       }
  7.       /// <summary>
  8.       /// Inject dependent third-party components
  9.       /// </summary>
  10.       /// <param name="builder"></param>
  11.       protected override void RegisterBuilder(ContainerBuilderWrapper builder)
  12.       {
  13.           base.RegisterBuilder(builder);
  14.           builder.Register(provider =>
  15.           {
  16.               IServiceExecutor serviceExecutor = null;
  17.               if (provider.IsRegistered(typeof(IServiceExecutor)))
  18.                   serviceExecutor = provider.Resolve<IServiceExecutor>();
  19.               return new SuperSocketTransportClientFactory(provider.Resolve<ITransportMessageCodecFactory>(),
  20.                     provider.Resolve<IHealthCheckService>(),
  21.                   provider.Resolve<ILogger<SuperSocketTransportClientFactory>>(),
  22.                   serviceExecutor);
  23.           }).As(typeof(ITransportClientFactory)).SingleInstance();
  24.           if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp ||
  25.               AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
  26.           {
  27.               RegisterDefaultProtocol(builder);
  28.           }
  29.       }
  30.       private void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
  31.       {
  32.           builder.Register(provider =>
  33.           {
  34.               return new SuperSocketServerMessageListener(provider.Resolve<ILogger<SuperSocketServerMessageListener>>(),
  35.                     provider.Resolve<ITransportMessageCodecFactory>(),
  36.                        provider.Resolve<IServiceEngineLifetime>());
  37.           }).SingleInstance();
  38.           builder.Register(provider =>
  39.           {
  40.               var serviceExecutor = provider.ResolveKeyed<IServiceExecutor>(CommunicationProtocol.Tcp.ToString());
  41.               var messageListener = provider.Resolve<SuperSocketServerMessageListener>();
  42.               return new DefaultServiceHost(async endPoint =>
  43.               {
  44.                   await messageListener.StartAsync(endPoint);
  45.                   return messageListener;
  46.               }, serviceExecutor);
  47.           }).As<IServiceHost>();
  48.       }
  49.   }
复制代码
客户端服务端消息发送,需要继承IMessageSender, 代码如下:
 
  1.     public abstract class SuperSocketMessageSender
  2.     {
  3.         private readonly ITransportMessageEncoder _transportMessageEncoder;
  4.         protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder)
  5.         {
  6.             _transportMessageEncoder = transportMessageEncoder;
  7.         }
  8.         protected byte[] GetByteBuffer(TransportMessage message)
  9.         {
  10.             var data = _transportMessageEncoder.Encode(message).ToList();
  11.             data.AddRange(Encoding.UTF8.GetBytes("\r\n"));
  12.             //var buffer = PooledByteBufferAllocator.Default.Buffer();
  13.             return data.ToArray();
  14.         }
  15.     }
  16.     public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender
  17.     {
  18.         private readonly IEasyClient<TransportMessage> _client;
  19.         public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient<TransportMessage> client) : base(transportMessageEncoder)
  20.         {
  21.             _client = client;
  22.         }
  23.         /// <summary>
  24.         /// 发送消息。
  25.         /// </summary>
  26.         /// <param name="message">消息内容。</param>
  27.         /// <returns>一个任务。</returns>
  28.         public async Task SendAsync(TransportMessage message)
  29.         {
  30.             var buffer = GetByteBuffer(message);
  31.            await _client.SendAsync(buffer);
  32.         }
  33.         /// <summary>
  34.         /// 发送消息并清空缓冲区。
  35.         /// </summary>
  36.         /// <param name="message">消息内容。</param>
  37.         /// <returns>一个任务。</returns>
  38.         public async Task SendAndFlushAsync(TransportMessage message)
  39.         {
  40.             var buffer = GetByteBuffer(message);
  41.             await _client.SendAsync(buffer);
  42.             // _client.StartReceive();
  43.             //var p=  await _client.ReceiveAsync();
  44.         }
  45.     }
  46.     #region Implementation of IMessageSender
  47.     public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender
  48.     {
  49.         private readonly IAppSession _session;
  50.         public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder)
  51.         {
  52.             _session = session;
  53.         }
  54.         /// <summary>
  55.         /// 发送消息。
  56.         /// </summary>
  57.         /// <param name="message">消息内容。</param>
  58.         /// <returns>一个任务。</returns>
  59.         public async Task SendAsync(TransportMessage message)
  60.         {
  61.             var buffer = GetByteBuffer(message);
  62.            await _session.SendAsync(buffer);
  63.         }
  64.         /// <summary>
  65.         /// 发送消息并清空缓冲区。
  66.         /// </summary>
  67.         /// <param name="message">消息内容。</param>
  68.         /// <returns>一个任务。</returns>
  69.         public async Task SendAndFlushAsync(TransportMessage message)
  70.         {
  71.             var buffer = GetByteBuffer(message);
  72.            await _session.SendAsync(buffer);
  73.         }
  74.     }
  75.     #endregion
复制代码
 
SuperSocket过滤器,需要继承TerminatorPipelineFilter, 代码如下:
 
  1.     public class TransportMessagePipelineFilter : TerminatorPipelineFilter<TransportMessage>
  2.     {
  3.         private readonly ITransportMessageDecoder _transportMessageDecoder;
  4.         public TransportMessagePipelineFilter() : base(new[] { (byte)'\r', (byte)'\n' })
  5.         {
  6.             _transportMessageDecoder = ServiceLocator.GetService<ITransportMessageCodecFactory>().GetDecoder();
  7.         }
  8.         public override TransportMessage Filter(ref SequenceReader<byte> bufferStream)
  9.         {
  10.             try
  11.             {
  12.                 var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray();
  13.                 var transportMessage = _transportMessageDecoder.Decode(bytes);
  14.                 return transportMessage;
  15.             }
  16.             finally
  17.             {
  18.                 bufferStream.Advance(bufferStream.Length);
  19.             }
  20.         }
  21.     }
复制代码
 
三、如何加载SuperSocket引擎组件

第一种方式:
去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket
5.png

 第二种方式
添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule
6.png

四、结果

可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧
 
五、总结

因为这段时间比较忙,还需要协助客户拆分服务,缓存降级,消息队列,等忙完这段时间,线上物联网平台会开通端口给用户进行测试,我也会努力把物联网进行完善,让微服务物联网平台能走向新的高度。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册