一、前言
因为微服务引擎依赖于dotnetty组件,很多协议都是针对于dotnetty 进行扩展,然后对于老版本https://github.com/azure/dotnetty 停止更新后,本人下载源码进行修改更新,并且大家要求独立仓库进行开源,所以今天整理了代码开源至https://github.com/microsurging/DotNetty, 也希望大家一起贡献代码,让dotnetty 生态更强大。
HttpFlv:http://demo.kayakiot.cn:281/httpflv.html (黑衣人)
HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html (大红包)
HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html (鹿鼎记)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2 (黑衣人)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3 (大红包)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)
注:测试服务器带宽只有8MB, httpflv 缓冲做的没有rtmp好,然后httpflv卡就多刷新几次
凯亚 (Kayak) 是什么?
凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly 密码:123456)
链路跟踪Skywalking V8:http://117.72.121.2:8080/
surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、ValueTask扩展支持
IValueTaskPromise:- public interface IValueTaskPromise: IPromise
- {
- bool IsVoid { get; }
- bool IsCompleted { get; }
- bool IsSuccess { get; }
- bool IsFaulted { get; }
- bool IsCanceled { get; }
- bool TryComplete();
- void Complete();
- bool TrySetException(Exception exception);
- bool TrySetException(IEnumerable<Exception> exceptions);
- void SetException(Exception exception);
- void SetException(IEnumerable<Exception> exceptions);
- bool TrySetCanceled();
- void SetCanceled();
- bool SetUncancellable();
- IPromise Unvoid();
- }
复制代码 DefaultValueTaskPromise:ManualResetValueTaskSource:- internal interface IStrongBox<T>
- {
- ref T Value { get; }
- bool RunContinuationsAsynchronously { get; set; }
- }
- public enum ContinuationOptions
- {
- None,
- ForceDefaultTaskScheduler
- }
- public class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource
- {
- private ManualResetValueTaskSourceLogic<T> _logic;
- private readonly Action _cancellationCallback;
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public ManualResetValueTaskSource(ContinuationOptions options = ContinuationOptions.None)
- {
- _logic = new ManualResetValueTaskSourceLogic<T>(this, options,null);
- _cancellationCallback = SetCanceled;
- }
- public ManualResetValueTaskSource(object state, ContinuationOptions options = ContinuationOptions.None)
- {
- _logic = new ManualResetValueTaskSourceLogic<T>(this, options,state);
- _cancellationCallback = SetCanceled;
- }
- public short Version => _logic.Version;
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool SetResult(T result)
- {
- lock (_cancellationCallback)
- {
- if (_logic.Completed)
- {
- return false;
- }
- _logic.SetResult(result);
- return true;
- }
- }
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void SetException(Exception error)
- {
- if (Monitor.TryEnter(_cancellationCallback))
- {
- if (_logic.Completed)
- {
- Monitor.Exit(_cancellationCallback);
- return;
- }
- _logic.SetException(error);
- Monitor.Exit(_cancellationCallback);
- }
- }
- public void SetCanceled() => SetException(new TaskCanceledException());
- public T GetResult(short token) => _logic.GetResult(token);
- void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);
- public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token);
- public bool RunContinuationsAsynchronously { get; set; } = true;
- public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags);
- ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic;
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public ValueTask<T> AwaitValue(CancellationToken cancellation)
- {
- CancellationTokenRegistration? registration = cancellation == CancellationToken.None
- ? (CancellationTokenRegistration?)null
- : cancellation.Register(_cancellationCallback);
- return _logic.AwaitValue(this, registration);
- }
- public ValueTask AwaitVoid(CancellationToken cancellation)
- {
- CancellationTokenRegistration? registration = cancellation == CancellationToken.None
- ? (CancellationTokenRegistration?)null
- : cancellation.Register(_cancellationCallback);
- return _logic.AwaitVoid(this, registration);
- }
- public void Reset() => _logic.Reset();
- }
- internal struct ManualResetValueTaskSourceLogic<TResult>
- {
- private static readonly Action<object> s_sentinel = s => throw new InvalidOperationException();
- private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;
- private readonly ContinuationOptions _options;
- private Action<object> _continuation;
- private object _continuationState;
- private object _capturedContext;
- private ExecutionContext _executionContext;
- private bool _completed;
- private TResult _result;
- private ExceptionDispatchInfo _error;
- private CancellationTokenRegistration? _registration;
-
- public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent, ContinuationOptions options,object state)
- {
- _parent = parent ?? throw new ArgumentNullException(nameof(parent));
- _options = options;
- _continuation = null;
- _continuationState = null;
- _capturedContext = null;
- _executionContext = null;
- _completed = state != null;
- _result =state==null? default(TResult): (TResult)state;
- _error = null;
- Version = 0;
- _registration = null;
- }
- public short Version { get; private set; }
- public bool Completed => _completed;
- private void ValidateToken(short token)
- {
- if (token != Version)
- {
- throw new InvalidOperationException();
- }
- }
- public ValueTaskSourceStatus GetStatus(short token)
- {
- // ValidateToken(token);
- return
- !_completed ? ValueTaskSourceStatus.Pending :
- _error == null ? ValueTaskSourceStatus.Succeeded :
- _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
- ValueTaskSourceStatus.Faulted;
- }
- public TResult GetResult(short token)
- {
- // ValidateToken(token);
- if (!_completed)
- {
- return _result;
- }
- TResult result = _result;
- ExceptionDispatchInfo error = _error;
- Reset();
- error?.Throw();
- return result;
- }
- public void Reset()
- {
- Version++;
- _registration?.Dispose();
- _completed = false;
- _continuation = null;
- _continuationState = null;
- _result = default(TResult);
- _error = null;
- _executionContext = null;
- _capturedContext = null;
- _registration = null;
- }
- public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
- {
- if (continuation == null)
- {
- throw new ArgumentNullException(nameof(continuation));
- }
- ValidateToken(token);
- if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
- {
- _executionContext = ExecutionContext.Capture();
- }
- if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
- {
- SynchronizationContext sc = SynchronizationContext.Current;
- if (sc != null && sc.GetType() != typeof(SynchronizationContext))
- {
- _capturedContext = sc;
- }
- else
- {
- TaskScheduler ts = TaskScheduler.Current;
- if (ts != TaskScheduler.Default)
- {
- _capturedContext = ts;
- }
- }
- }
- _continuationState = state;
- if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
- {
- _executionContext = null;
- object cc = _capturedContext;
- _capturedContext = null;
- switch (cc)
- {
- case null:
- Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
- break;
- case SynchronizationContext sc:
- sc.Post(s =>
- {
- var tuple = (Tuple, object>)s;
- tuple.Item1(tuple.Item2);
- }, Tuple.Create(continuation, state));
- break;
- case TaskScheduler ts:
- Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
- break;
- }
- }
- }
- public void SetResult(TResult result)
- {
- _result = result;
- SignalCompletion();
- }
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void SetException(Exception error)
- {
- _error = ExceptionDispatchInfo.Capture(error);
- SignalCompletion();
- }
- private void SignalCompletion()
- {
- if (_completed)
- {
- throw new InvalidOperationException("Double completion of completion source is prohibited");
- }
- _completed = true;
- if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)
- {
- if (_executionContext != null)
- {
- ExecutionContext.Run(
- _executionContext,
- s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),
- _parent ?? throw new InvalidOperationException());
- }
- else
- {
- InvokeContinuation();
- }
- }
- }
- private void InvokeContinuation()
- {
- object cc = _capturedContext;
- _capturedContext = null;
- if (_options == ContinuationOptions.ForceDefaultTaskScheduler)
- {
- cc = TaskScheduler.Default;
- }
- switch (cc)
- {
- case null:
- if (_parent.RunContinuationsAsynchronously)
- {
- var c = _continuation;
- if (_executionContext != null)
- {
- ThreadPool.QueueUserWorkItem(s => c(s), _continuationState);
- }
- else
- {
- ThreadPool.UnsafeQueueUserWorkItem(s => c(s), _continuationState);
- }
- }
- else
- {
- _continuation(_continuationState);
- }
- break;
- case SynchronizationContext sc:
- sc.Post(s =>
- {
- ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;
- logicRef._continuation(logicRef._continuationState);
- }, _parent ?? throw new InvalidOperationException());
- break;
- case TaskScheduler ts:
- Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
- break;
- }
- }
- public ValueTask<T> AwaitValue<T>(IValueTaskSource<T> source, CancellationTokenRegistration? registration)
- {
- _registration = registration;
- return new ValueTask<T>(source, Version);
- }
- public ValueTask AwaitVoid(IValueTaskSource source, CancellationTokenRegistration? registration)
- {
- _registration = registration;
- return new ValueTask(source, Version);
- }
- }
复制代码 然后把DefaultPromise 替换成DefaultValueTaskPromise,如下图所示
三、扩展支持rtmp编解码
如下图所示 :
四、demo展示
开启了三个通道进行推流,cpu,内存都比较稳定
在凯亚物联网平台你也可以创建rtmp组件
视频中心
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |