找回密码
 立即注册
首页 业界区 业界 dotnetty 新的篇章- 开源

dotnetty 新的篇章- 开源

水苯 2025-9-25 10:58:40
一、前言

      因为微服务引擎依赖于dotnetty组件,很多协议都是针对于dotnetty 进行扩展,然后对于老版本https://github.com/azure/dotnetty 停止更新后,本人下载源码进行修改更新,并且大家要求独立仓库进行开源,所以今天整理了代码开源至https://github.com/microsurging/DotNetty,  也希望大家一起贡献代码,让dotnetty 生态更强大。
HttpFlv:http://demo.kayakiot.cn:281/httpflv.html  (黑衣人)
 HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html  (大红包)
HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html  (鹿鼎记)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2   (黑衣人)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3   (大红包)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)
注:测试服务器带宽只有8MB, httpflv  缓冲做的没有rtmp好,然后httpflv卡就多刷新几次
  凯亚 (Kayak) 是什么?
       凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
     凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly  密码:123456)
    链路跟踪Skywalking V8:http://117.72.121.2:8080/
      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、ValueTask扩展支持

IValueTaskPromise:
  1.     public interface IValueTaskPromise: IPromise
  2.     {   
  3.             bool IsVoid { get; }
  4.             bool IsCompleted { get; }
  5.             bool IsSuccess { get; }
  6.             bool IsFaulted { get; }
  7.             bool IsCanceled { get; }
  8.             bool TryComplete();
  9.             void Complete();
  10.             bool TrySetException(Exception exception);
  11.             bool TrySetException(IEnumerable<Exception> exceptions);
  12.             void SetException(Exception exception);
  13.             void SetException(IEnumerable<Exception> exceptions);
  14.             bool TrySetCanceled();
  15.             void SetCanceled();
  16.             bool SetUncancellable();
  17.              IPromise Unvoid();
  18.         }
复制代码
DefaultValueTaskPromise:
  1.     public class DefaultValueTaskPromise: IValueTaskPromise
  2.     {
  3.         private readonly CancellationToken _token;
  4. #if NET
  5.         private readonly TaskCompletionSource _tcs;
  6. #else
  7.         private readonly ManualResetValueTaskSource<object> _tcs;
  8. #endif
  9.         private int v_uncancellable = SharedConstants.False;
  10.         public DefaultValueTaskPromise()
  11.         {
  12.             _token = CancellationToken.None;
  13. #if NET
  14.             _tcs = new TaskCompletionSource();
  15. #else
  16.             _tcs = new ManualResetValueTaskSource<object>();
  17. #endif
  18.         }
  19.         public DefaultValueTaskPromise(object state)
  20.         {
  21. #if NET
  22.             _tcs = new TaskCompletionSource(state);
  23. #else
  24.             _tcs = new ManualResetValueTaskSource<object>(state);
  25. #endif
  26.         }
  27.         public DefaultValueTaskPromise(CancellationToken cancellationToken)
  28.         {
  29.             _token= cancellationToken;
  30.         }
  31.         public ValueTask ValueTask
  32.         {
  33.             [MethodImpl(InlineMethod.AggressiveOptimization)]
  34.             get => _tcs.AwaitVoid(_token);
  35.         }
  36.         public bool IsVoid => false;
  37.         public bool IsSuccess => ValueTask.IsCompletedSuccessfully;
  38.         public bool IsCompleted => ValueTask.IsCompleted;
  39.         public bool IsFaulted => ValueTask.IsFaulted;
  40.         public bool IsCanceled => ValueTask.IsCanceled;
  41.        public  Task  Task => ValueTask.AsTask();
  42.         public virtual bool TryComplete()
  43.         {
  44. #if NET
  45.             return _tcs.TrySetResult();
  46. #else
  47.             return _tcs.SetResult(0);
  48. #endif
  49.         }
  50.         public virtual void Complete()
  51.         {
  52. #if NET
  53.             _tcs.SetResult();
  54. #else
  55.             _tcs.SetResult(0);
  56. #endif
  57.         }
  58.         public virtual void SetCanceled()
  59.         {
  60.             if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return; }
  61.             _tcs.SetCanceled();
  62.         }
  63.         public virtual void SetException(Exception exception)
  64.         {
  65.             if (exception is AggregateException aggregateException)
  66.             {
  67.                 SetException(aggregateException.InnerExceptions);
  68.                 return;
  69.             }
  70.             _tcs.SetException(exception);
  71.         }
  72.         public virtual void SetException(IEnumerable<Exception> exceptions)
  73.         {
  74.             _tcs.SetException(exceptions.FirstOrDefault());
  75.         }
  76.         public virtual bool TrySetCanceled()
  77.         {
  78.             if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return false; }
  79.               _tcs.SetCanceled();
  80.             return true;
  81.         }
  82.         public virtual bool TrySetException(Exception exception)
  83.         {
  84.             if (exception is AggregateException aggregateException)
  85.             {
  86.                 return TrySetException(aggregateException.InnerExceptions);
  87.             }
  88.               _tcs.SetException(exception);
  89.             return true;
  90.         }
  91.         public virtual bool TrySetException(IEnumerable<Exception> exceptions)
  92.         {
  93.               _tcs.SetException(exceptions.FirstOrDefault());
  94.             return true;
  95.         }
  96.         public bool SetUncancellable()
  97.         {
  98.             if (SharedConstants.False >= (uint)Interlocked.CompareExchange(ref v_uncancellable, SharedConstants.True, SharedConstants.False))
  99.             {
  100.                 return true;
  101.             }
  102.             return !IsCompleted;
  103.         }
  104.         public override string ToString() => "TaskCompletionSource[status: " + ValueTask.AsTask().Status.ToString() + "]";
  105.         public IPromise Unvoid() => this;
  106.          
  107.     }
复制代码
ManualResetValueTaskSource:
  1. internal interface IStrongBox<T>
  2. {
  3.      ref T Value { get; }
  4.      bool RunContinuationsAsynchronously { get; set; }
  5. }
  6. public enum ContinuationOptions
  7. {
  8.      None,
  9.      ForceDefaultTaskScheduler
  10. }
  11. public class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource
  12. {
  13.      private ManualResetValueTaskSourceLogic<T> _logic;
  14.      private readonly Action _cancellationCallback;
  15.      [MethodImpl(MethodImplOptions.AggressiveInlining)]
  16.      public ManualResetValueTaskSource(ContinuationOptions options = ContinuationOptions.None)
  17.      {
  18.          _logic = new ManualResetValueTaskSourceLogic<T>(this, options,null);
  19.          _cancellationCallback = SetCanceled;
  20.      }
  21.      public ManualResetValueTaskSource(object state, ContinuationOptions options = ContinuationOptions.None)
  22.      {
  23.          _logic = new ManualResetValueTaskSourceLogic<T>(this, options,state);
  24.          _cancellationCallback = SetCanceled;
  25.      }
  26.      public short Version => _logic.Version;
  27.      [MethodImpl(MethodImplOptions.AggressiveInlining)]
  28.      public bool SetResult(T result)
  29.      {
  30.          lock (_cancellationCallback)
  31.          {
  32.              if (_logic.Completed)
  33.              {
  34.                  return false;
  35.              }
  36.              _logic.SetResult(result);
  37.              return true;
  38.          }
  39.      }
  40.      [MethodImpl(MethodImplOptions.AggressiveInlining)]
  41.      public void SetException(Exception error)
  42.      {
  43.          if (Monitor.TryEnter(_cancellationCallback))
  44.          {
  45.              if (_logic.Completed)
  46.              {
  47.                  Monitor.Exit(_cancellationCallback);
  48.                  return;
  49.              }
  50.              _logic.SetException(error);
  51.              Monitor.Exit(_cancellationCallback);
  52.          }
  53.      }
  54.      public void SetCanceled() => SetException(new TaskCanceledException());
  55.      public T GetResult(short token) => _logic.GetResult(token);
  56.      void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);
  57.      public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token);
  58.      public bool RunContinuationsAsynchronously { get; set; } = true;
  59.      public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags);
  60.      ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic;
  61.      [MethodImpl(MethodImplOptions.AggressiveInlining)]
  62.      public ValueTask<T> AwaitValue(CancellationToken cancellation)
  63.      {
  64.          CancellationTokenRegistration? registration = cancellation == CancellationToken.None
  65.              ? (CancellationTokenRegistration?)null
  66.              : cancellation.Register(_cancellationCallback);
  67.          return _logic.AwaitValue(this, registration);
  68.      }
  69.      public ValueTask AwaitVoid(CancellationToken cancellation)
  70.      {
  71.          CancellationTokenRegistration? registration = cancellation == CancellationToken.None
  72.              ? (CancellationTokenRegistration?)null
  73.              : cancellation.Register(_cancellationCallback);
  74.          return _logic.AwaitVoid(this, registration);
  75.      }
  76.      public void Reset() => _logic.Reset();
  77. }
  78. internal struct ManualResetValueTaskSourceLogic<TResult>
  79. {
  80.      private static readonly Action<object> s_sentinel = s => throw new InvalidOperationException();
  81.      private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;
  82.      private readonly ContinuationOptions _options;
  83.      private Action<object> _continuation;
  84.      private object _continuationState;
  85.      private object _capturedContext;
  86.      private ExecutionContext _executionContext;
  87.      private bool _completed;
  88.      private TResult _result;
  89.      private ExceptionDispatchInfo _error;
  90.      private CancellationTokenRegistration? _registration;
  91.       
  92.      public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent, ContinuationOptions options,object state)
  93.      {
  94.          _parent = parent ?? throw new ArgumentNullException(nameof(parent));
  95.          _options = options;
  96.          _continuation = null;
  97.          _continuationState = null;
  98.          _capturedContext = null;
  99.          _executionContext = null;
  100.          _completed = state != null;
  101.          _result =state==null? default(TResult): (TResult)state;
  102.          _error = null;
  103.          Version = 0;
  104.          _registration = null;
  105.      }
  106.      public short Version { get; private set; }
  107.      public bool Completed => _completed;
  108.      private void ValidateToken(short token)
  109.      {
  110.          if (token != Version)
  111.          {
  112.              throw new InvalidOperationException();
  113.          }
  114.      }
  115.      public ValueTaskSourceStatus GetStatus(short token)
  116.      {
  117.         // ValidateToken(token);
  118.          return
  119.              !_completed ? ValueTaskSourceStatus.Pending :
  120.              _error == null ? ValueTaskSourceStatus.Succeeded :
  121.              _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
  122.              ValueTaskSourceStatus.Faulted;
  123.      }
  124.      public TResult GetResult(short token)
  125.      {
  126.         // ValidateToken(token);
  127.          if (!_completed)
  128.          {
  129.              return _result;
  130.          }
  131.          TResult result = _result;
  132.          ExceptionDispatchInfo error = _error;
  133.          Reset();
  134.          error?.Throw();
  135.          return result;
  136.      }
  137.      public void Reset()
  138.      {
  139.          Version++;
  140.          _registration?.Dispose();
  141.          _completed = false;
  142.          _continuation = null;
  143.          _continuationState = null;
  144.          _result = default(TResult);
  145.          _error = null;
  146.          _executionContext = null;
  147.          _capturedContext = null;
  148.          _registration = null;
  149.      }
  150.      public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
  151.      {
  152.          if (continuation == null)
  153.          {
  154.              throw new ArgumentNullException(nameof(continuation));
  155.          }
  156.          ValidateToken(token);
  157.          if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
  158.          {
  159.              _executionContext = ExecutionContext.Capture();
  160.          }
  161.          if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
  162.          {
  163.              SynchronizationContext sc = SynchronizationContext.Current;
  164.              if (sc != null && sc.GetType() != typeof(SynchronizationContext))
  165.              {
  166.                  _capturedContext = sc;
  167.              }
  168.              else
  169.              {
  170.                  TaskScheduler ts = TaskScheduler.Current;
  171.                  if (ts != TaskScheduler.Default)
  172.                  {
  173.                      _capturedContext = ts;
  174.                  }
  175.              }
  176.          }
  177.          _continuationState = state;
  178.          if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
  179.          {
  180.              _executionContext = null;
  181.              object cc = _capturedContext;
  182.              _capturedContext = null;
  183.              switch (cc)
  184.              {
  185.                  case null:
  186.                      Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
  187.                      break;
  188.                  case SynchronizationContext sc:
  189.                      sc.Post(s =>
  190.                      {
  191.                          var tuple = (Tuple, object>)s;
  192.                          tuple.Item1(tuple.Item2);
  193.                      }, Tuple.Create(continuation, state));
  194.                      break;
  195.                  case TaskScheduler ts:
  196.                      Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
  197.                      break;
  198.              }
  199.          }
  200.      }
  201.      public void SetResult(TResult result)
  202.      {
  203.          _result = result;
  204.          SignalCompletion();
  205.      }
  206.      [MethodImpl(MethodImplOptions.AggressiveInlining)]
  207.      public void SetException(Exception error)
  208.      {
  209.          _error = ExceptionDispatchInfo.Capture(error);
  210.          SignalCompletion();
  211.      }
  212.      private void SignalCompletion()
  213.      {
  214.          if (_completed)
  215.          {
  216.              throw new InvalidOperationException("Double completion of completion source is prohibited");
  217.          }
  218.          _completed = true;
  219.          if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)
  220.          {
  221.              if (_executionContext != null)
  222.              {
  223.                  ExecutionContext.Run(
  224.                      _executionContext,
  225.                      s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),
  226.                      _parent ?? throw new InvalidOperationException());
  227.              }
  228.              else
  229.              {
  230.                  InvokeContinuation();
  231.              }
  232.          }
  233.      }
  234.      private void InvokeContinuation()
  235.      {
  236.          object cc = _capturedContext;
  237.          _capturedContext = null;
  238.          if (_options == ContinuationOptions.ForceDefaultTaskScheduler)
  239.          {
  240.              cc = TaskScheduler.Default;
  241.          }
  242.          switch (cc)
  243.          {
  244.              case null:
  245.                  if (_parent.RunContinuationsAsynchronously)
  246.                  {
  247.                      var c = _continuation;
  248.                      if (_executionContext != null)
  249.                      {
  250.                          ThreadPool.QueueUserWorkItem(s => c(s), _continuationState);
  251.                      }
  252.                      else
  253.                      {
  254.                          ThreadPool.UnsafeQueueUserWorkItem(s => c(s),  _continuationState);
  255.                      }
  256.                  }
  257.                  else
  258.                  {
  259.                      _continuation(_continuationState);
  260.                  }
  261.                  break;
  262.              case SynchronizationContext sc:
  263.                  sc.Post(s =>
  264.                  {
  265.                      ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;
  266.                      logicRef._continuation(logicRef._continuationState);
  267.                  }, _parent ?? throw new InvalidOperationException());
  268.                  break;
  269.              case TaskScheduler ts:
  270.                  Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
  271.                  break;
  272.          }
  273.      }
  274.      public ValueTask<T> AwaitValue<T>(IValueTaskSource<T> source, CancellationTokenRegistration? registration)
  275.      {
  276.          _registration = registration;
  277.          return new ValueTask<T>(source, Version);
  278.      }
  279.      public ValueTask AwaitVoid(IValueTaskSource source, CancellationTokenRegistration? registration)
  280.      {
  281.          _registration = registration;
  282.          return new ValueTask(source, Version);
  283.      }
  284. }
复制代码
然后把DefaultPromise 替换成DefaultValueTaskPromise,如下图所示
1.png

三、扩展支持rtmp编解码

如下图所示 :
2.png

四、demo展示

开启了三个通道进行推流,cpu,内存都比较稳定
3.png

 在凯亚物联网平台你也可以创建rtmp组件
4.png

 
视频中心
5.png

 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册