一、概述
周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。
木舟 (Kayak) 是什么?
木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly 密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决)
链路跟踪Skywalking V8:http://117.72.121.2:8080/
surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、集成SuperSocket
作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换
创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:- public class SuperSocketServerMessageListener : IMessageListener, IDisposable
- {
- public event ReceivedDelegate Received;
- private readonly ILogger<SuperSocketServerMessageListener> _logger;
- private readonly ITransportMessageDecoder _transportMessageDecoder;
- private readonly ITransportMessageEncoder _transportMessageEncoder;
- private readonly IServiceEngineLifetime _serviceEngineLifetime;
- public SuperSocketServerMessageListener(ILogger<SuperSocketServerMessageListener> logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime)
- {
- _logger = logger;
- _transportMessageEncoder = codecFactory.GetEncoder();
- _transportMessageDecoder = codecFactory.GetDecoder();
- _serviceEngineLifetime = serviceEngineLifetime;
- }
- public async Task StartAsync(EndPoint endPoint)
- {
- _serviceEngineLifetime.ServiceEngineStarted.Register(async () =>
- {
- try
- {
- var ipEndPoint = endPoint as IPEndPoint;
- var host = SuperSocketHostBuilder.Create<TransportMessage, TransportMessagePipelineFilter>()
-
- .UsePackageHandler( (s, p) =>
- {
- Task.Run(async () =>
- {
- var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s);
- await OnReceived(sender, p);
- });
- return ValueTask.CompletedTask;
- })
- .ConfigureSuperSocket(options =>
- {
- options.Name = "Echo Server";
- options.Logger = _logger;
- options.AddListener(new ListenOptions
- {
- Ip = ipEndPoint.Address.ToString(),
- Port = ipEndPoint.Port,
-
- }
- );
- })
- .ConfigureLogging((hostCtx, loggingBuilder) =>
- {
- loggingBuilder.AddConsole();
- })
- .Build();
- await host.RunAsync();
- }
- catch (Exception ex)
- {
- _logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 ");
- }
- });
- }
-
- public async Task OnReceived(IMessageSender sender, TransportMessage message)
- {
- if (Received == null)
- return;
- await Received(sender, message);
- }
- public void Dispose()
- {
- }
- }
复制代码 创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:
- internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable
- {
- private readonly ITransportMessageEncoder _transportMessageEncoder;
- private readonly ITransportMessageDecoder _transportMessageDecoder;
- private readonly ILogger<SuperSocketTransportClientFactory> _logger;
- private readonly IServiceExecutor _serviceExecutor;
- private readonly IHealthCheckService _healthCheckService;
- private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>();
- public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger)
- : this(codecFactory, healthCheckService, logger, null)
- {
- }
- public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger, IServiceExecutor serviceExecutor)
- {
- _transportMessageEncoder = codecFactory.GetEncoder();
- _transportMessageDecoder = codecFactory.GetDecoder();
- _logger = logger;
- _serviceExecutor = serviceExecutor;
- _healthCheckService = healthCheckService;
- }
- public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint)
- {
- var key = endPoint;
- if (_logger.IsEnabled(LogLevel.Debug))
- _logger.LogDebug($"准备为服务端地址:{key}创建客户端。");
- try
- {
- return await _clients.GetOrAdd(key
- , k => new Lazy<Task<ITransportClient>>(async () =>
- {
- //客户端对象
- var client = new EasyClient<TransportMessage>(new TransportMessagePipelineFilter()).AsClient();
- var messageListener = new MessageListener();
- var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client);
- await client.ConnectAsync(endPoint);
- client.PackageHandler += async (sender, package) =>
- {
- await messageListener.OnReceived(messageSender, package);
- };
- client.StartReceive();
- //创建客户端
- var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);
- return transportClient;
- }
- )).Value;//返回实例
- }
- catch
- {
- //移除
- _clients.TryRemove(key, out var value);
- var ipEndPoint = endPoint as IPEndPoint;
- //标记这个地址是失败的请求
- if (ipEndPoint != null)
- await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port));
- throw;
- }
- }
- public void Dispose()
- {
- foreach (var client in _clients.Values)
- {
- (client as IDisposable)?.Dispose();
- }
- }
- }
复制代码 注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:- public class SuperSocketModule : EnginePartModule
- {
- public override void Initialize(AppModuleContext context)
- {
- base.Initialize(context);
- }
- /// <summary>
- /// Inject dependent third-party components
- /// </summary>
- /// <param name="builder"></param>
- protected override void RegisterBuilder(ContainerBuilderWrapper builder)
- {
- base.RegisterBuilder(builder);
- builder.Register(provider =>
- {
- IServiceExecutor serviceExecutor = null;
- if (provider.IsRegistered(typeof(IServiceExecutor)))
- serviceExecutor = provider.Resolve<IServiceExecutor>();
- return new SuperSocketTransportClientFactory(provider.Resolve<ITransportMessageCodecFactory>(),
- provider.Resolve<IHealthCheckService>(),
- provider.Resolve<ILogger<SuperSocketTransportClientFactory>>(),
- serviceExecutor);
- }).As(typeof(ITransportClientFactory)).SingleInstance();
- if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp ||
- AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
- {
- RegisterDefaultProtocol(builder);
- }
- }
- private void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
- {
- builder.Register(provider =>
- {
- return new SuperSocketServerMessageListener(provider.Resolve<ILogger<SuperSocketServerMessageListener>>(),
- provider.Resolve<ITransportMessageCodecFactory>(),
- provider.Resolve<IServiceEngineLifetime>());
- }).SingleInstance();
- builder.Register(provider =>
- {
- var serviceExecutor = provider.ResolveKeyed<IServiceExecutor>(CommunicationProtocol.Tcp.ToString());
- var messageListener = provider.Resolve<SuperSocketServerMessageListener>();
- return new DefaultServiceHost(async endPoint =>
- {
- await messageListener.StartAsync(endPoint);
- return messageListener;
- }, serviceExecutor);
- }).As<IServiceHost>();
- }
- }
复制代码 客户端服务端消息发送,需要继承IMessageSender, 代码如下:
- public abstract class SuperSocketMessageSender
- {
- private readonly ITransportMessageEncoder _transportMessageEncoder;
- protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder)
- {
- _transportMessageEncoder = transportMessageEncoder;
- }
- protected byte[] GetByteBuffer(TransportMessage message)
- {
- var data = _transportMessageEncoder.Encode(message).ToList();
- data.AddRange(Encoding.UTF8.GetBytes("\r\n"));
- //var buffer = PooledByteBufferAllocator.Default.Buffer();
- return data.ToArray();
- }
- }
- public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender
- {
- private readonly IEasyClient<TransportMessage> _client;
- public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient<TransportMessage> client) : base(transportMessageEncoder)
- {
- _client = client;
- }
- /// <summary>
- /// 发送消息。
- /// </summary>
- /// <param name="message">消息内容。</param>
- /// <returns>一个任务。</returns>
- public async Task SendAsync(TransportMessage message)
- {
- var buffer = GetByteBuffer(message);
- await _client.SendAsync(buffer);
- }
- /// <summary>
- /// 发送消息并清空缓冲区。
- /// </summary>
- /// <param name="message">消息内容。</param>
- /// <returns>一个任务。</returns>
- public async Task SendAndFlushAsync(TransportMessage message)
- {
- var buffer = GetByteBuffer(message);
- await _client.SendAsync(buffer);
- // _client.StartReceive();
- //var p= await _client.ReceiveAsync();
- }
- }
- #region Implementation of IMessageSender
- public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender
- {
- private readonly IAppSession _session;
- public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder)
- {
- _session = session;
- }
- /// <summary>
- /// 发送消息。
- /// </summary>
- /// <param name="message">消息内容。</param>
- /// <returns>一个任务。</returns>
- public async Task SendAsync(TransportMessage message)
- {
- var buffer = GetByteBuffer(message);
- await _session.SendAsync(buffer);
- }
- /// <summary>
- /// 发送消息并清空缓冲区。
- /// </summary>
- /// <param name="message">消息内容。</param>
- /// <returns>一个任务。</returns>
- public async Task SendAndFlushAsync(TransportMessage message)
- {
- var buffer = GetByteBuffer(message);
- await _session.SendAsync(buffer);
- }
- }
- #endregion
复制代码
SuperSocket过滤器,需要继承TerminatorPipelineFilter, 代码如下:
- public class TransportMessagePipelineFilter : TerminatorPipelineFilter<TransportMessage>
- {
- private readonly ITransportMessageDecoder _transportMessageDecoder;
- public TransportMessagePipelineFilter() : base(new[] { (byte)'\r', (byte)'\n' })
- {
- _transportMessageDecoder = ServiceLocator.GetService<ITransportMessageCodecFactory>().GetDecoder();
- }
- public override TransportMessage Filter(ref SequenceReader<byte> bufferStream)
- {
- try
- {
- var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray();
- var transportMessage = _transportMessageDecoder.Decode(bytes);
- return transportMessage;
- }
- finally
- {
- bufferStream.Advance(bufferStream.Length);
- }
- }
- }
复制代码
三、如何加载SuperSocket引擎组件
第一种方式:
去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket
第二种方式
添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule
四、结果
可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧
五、总结
因为这段时间比较忙,还需要协助客户拆分服务,缓存降级,消息队列,等忙完这段时间,线上物联网平台会开通端口给用户进行测试,我也会努力把物联网进行完善,让微服务物联网平台能走向新的高度。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |