找回密码
 立即注册
首页 业界区 业界 致敬1024,《手搓》轻量级EventBus

致敬1024,《手搓》轻量级EventBus

喜及眩 前天 09:25
一、MediatR



  • .NET事件总线一般使用MediatR
  • 或者基于MediatR二次封装
  • 笔者手搓事件总线和MediatR对比一下
二、事件处理的基本概念

1. 事件



  • 表示已经发生的事情,需要通知其他模块进行处理
2. 事件发布器



  • 负责发布事件的对象
3. 事件处理器



  • 实际接收到通知并处理事件的对象
4. 事件分发器



  • 负责将事件通知给对应的事件处理器
5. 事件总线



  • 用于协调各个组件
三、 先看一下MediatR的Case



  • 用INotification来表示事件
  • 用INotificationHandler来表示事件处理器
  • IMediator双重身份,既是事件发布器,又是事件总线
  • INotificationPublisher是事件分发器
1. 测试代码



  • 使用GenericNotification表示事件
  • CreateNotification()表示主业务,使用异步处理
  • A和B两个处理器表示其他模块业务订阅该事件,并进行异步处理
  • 以上模拟一个简单事件使用的过程
  1. record GenericNotification(string Name) : INotification;
  2. static async Task<GenericNotification> CreateNotification()
  3. {
  4.     var notification = new GenericNotification("GenericNotification");
  5.     await Task.Delay(1000);
  6.     return await Task.FromResult(notification);
  7. }
  8. class GenericANotificationHandler : INotificationHandler<GenericNotification>
  9. {
  10.     public async Task Handle(GenericNotification notification, CancellationToken cancellationToken)
  11.     {
  12.         Console.WriteLine($"GenericANotificationHandler {notification.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  13.         await Task.Delay(1000, cancellationToken);
  14.     }
  15. }
  16. class GenericBNotificationHandler : INotificationHandler<GenericNotification>
  17. {
  18.     public async Task Handle(GenericNotification notification, CancellationToken cancellationToken)
  19.     {
  20.         Console.WriteLine($"GenericBNotificationHandler {notification.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  21.         await Task.Delay(2000, cancellationToken);
  22.     }
  23. }
复制代码
2. 运行一下



  • 使用MediatR的Publish方法异步发布事件
  1. var mediator = serviceCollection.GetRequiredService<IMediator>();
  2. var sw = Stopwatch.StartNew();
  3. var notification = await CreateNotification();
  4. Console.WriteLine($"Publish {notification.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  5. await mediator.Publish(notification);
  6. Console.WriteLine($"总耗时:{sw.ElapsedMilliseconds}ms");
  7. // Publish GenericNotification,ThreadId:11
  8. // GenericANotificationHandler GenericNotification,ThreadId:11
  9. // GenericBNotificationHandler GenericNotification,ThreadId:11
  10. // 总耗时:4015ms
复制代码
3. 分析一下结果



  • 首先业务需求是很好的实现了
  • 虽然全都用了异步,却感觉是同步执行
  • 连实际线程ID都是一样一样的
  • 耗时更是惨不忍睹,如果发布一个事件会拖累主业务,那谁还敢用事件呢?
  • 有人可能会说去掉Publish的await就行了,不await你能保证处理器都全部能执行完吗?
  • 为此笔者手搓一个事件总线,来解决这个问题
四、手搓事件总线

1. 测试代码



  • 还是使用GenericNotification表示事件
  • 依然使用CreateNotification()表示主业务
  • A和B两个处理器表示其他模块业务订阅该事件,并进行异步处理
  • 这次用的是ITaskEventHandler接口,表示事件异步处理器
  1. record GenericNotification(string Name) : INotification;
  2. static async Task<GenericNotification> CreateNotification()
  3. {
  4.     var notification = new GenericNotification("GenericNotification");
  5.     await Task.Delay(1000);
  6.     return await Task.FromResult(notification);
  7. }
  8. internal class AEventHandler : ITaskEventHandler<GenericNotification>
  9. {
  10.     public async Task TaskHandle(GenericNotification @event, CancellationToken cancellationToken)
  11.     {
  12.         Console.WriteLine($"AEventHandler {@event.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  13.         await Task.Delay(1000, cancellationToken);
  14.     }
  15. }
  16. internal class BEventHandler : ITaskEventHandler<GenericNotification>
  17. {
  18.     public async Task TaskHandle(GenericNotification @event, CancellationToken cancellationToken)
  19.     {
  20.         Console.WriteLine($"BEventHandler {@event.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  21.         await Task.Delay(1000, cancellationToken);
  22.     }
  23. }
复制代码
2. 运行一下



  • 使用IEventBus的Publish同步方法发布事件
  1. var eventBus = serviceCollection.GetRequiredService<IEventBus>();
  2. var sw = Stopwatch.StartNew();
  3. var notification = await CreateNotification();
  4. Console.WriteLine($"Publish {notification.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  5. eventBus.Publish(notification);
  6. Console.WriteLine($"总耗时:{sw.ElapsedMilliseconds}ms");
  7. // Publish GenericNotification,ThreadId:11
  8. // 总耗时:1008ms
  9. // AEventHandler GenericNotification,ThreadId:10
  10. // BEventHandler GenericNotification,ThreadId:11
复制代码
3. 分析一下结果



  • 业务需求同样是很好的实现了
  • IMediator换成了IEventBus
  • 但耗时只有1秒钟,也就是说没有拖慢主业务
  • 前面显示事件处理的日志都在主业务之后,更说明是异步处理的
五. 揭秘手搓事件总线

1. IEventBus的Publish是同步方法



  • 但实际内部是异步处理的
  • 是调用自定义线程池来执行的
  • 是不是有点拗口,同步方法实际是异步处理
  • MediatR的Publish异步方法,实际要同步处理
  1. public interface IEventBus
  2. {
  3.     void Publish<TEvent>(TEvent @event);
  4. }
复制代码
2. EventBus配置代码



  • 使用ScanEventHandler扫描并注册事件处理器
  • 通过EventBusOptions配置EventBus内置线程池的并发
  • 通过内置线程池给事件处理单独提供了线程资源和并发控制
  • 既可以保障事件处理,还可以避免事件处理耗完整个程序CPU资源
  • 最后注册EventBus
  1. var serviceCollection = new ServiceCollection()
  2.     .ScanEventHandler(ServiceLifetime.Scoped, Assembly.GetExecutingAssembly())
  3.     .AddSingleton(new EventBusOptions { ConcurrencyLevel = 10 })
  4.     .AddEventBus<EventBus>();
复制代码
3. 支持同步事件处理



  • IEventHandler是同步事件处理接口
  • 一般最好涉及IO操作才用异步
  • 只是简单操作用同步性能更好
  • 如果是CPU密集操作就需要单独调用自定义线程池来排队执行
  • 避免拖垮整个程序
  1. internal class CEventHandler : IEventHandler<GenericNotification>
  2. {
  3.     public void Handle(GenericNotification @event)
  4.     {
  5.         Console.WriteLine($"CEventHandler {@event.Name},ThreadId:{Environment.CurrentManagedThreadId}");
  6.     }
  7. }
复制代码
4. 同步执行结果
  1. Publish GenericNotification,ThreadId:11
  2. 总耗时:1009ms
  3. AEventHandler GenericNotification,ThreadId:10
  4. BEventHandler GenericNotification,ThreadId:11
  5. CEventHandler GenericNotification,ThreadId:10
复制代码
5. 手搓事件总线的边缘



  • 支持net4.5+和netstandard1.1+,也就是说非.net core也支持
  • 但是ServiceCollection支持的没有这么全啊
  • 因此提供了Hand.EventDictionaryProvider,不依赖IOC环境
5.1 使用代码



  • EventBus也支持EventHandlerDictionaryProvider
  • 只是注册事件处理器没有IOC方便,但至少能用
  1. EventHandlerDictionaryProvider provider = new();
  2. var handler = new Handler();
  3. provider.AddHandler(handler);
  4. EventBus bus = new(provider, new EventBusOptions { ConcurrencyLevel = 1 });
复制代码
6. 手搓事件总线并发控制测试

6.1 事件处理代码



  • Handler是同步事件处理器
  • TaskHandler是异步事件处理器
  • 事件是string类型,手搓事件总线的事件类型没有限制
  • 这也是和MediatR不一样的地方,MediatR只能用INotification接口表示事件
  • 当然实际项目最好定义事件接口或者基类,便于管理和维护
  • 从技术角度是不需要限制事件的类型
  1. internal class Handler(ITestOutputHelper output) : IEventHandler<string>
  2. {
  3.     private readonly ITestOutputHelper _output = output;
  4.     public void Handle(string @event)
  5.     {
  6.         Thread.Sleep(10);
  7.         _output.WriteLine($"Thread{Environment.CurrentManagedThreadId} Handler: {@event},{DateTime.Now:HH:mm:ss.fff}");
  8.     }
  9. }
  10. internal class TaskHandler(ITestOutputHelper output) : ITaskEventHandler<string>
  11. {
  12.     private readonly ITestOutputHelper _output = output;
  13.     public async Task TaskHandle(string @event, CancellationToken cancellationToken)
  14.     {
  15.         await Task.Delay(10, cancellationToken);
  16.         _output.WriteLine($"Thread{Environment.CurrentManagedThreadId} TaskHandler: {@event},{DateTime.Now:HH:mm:ss.fff}");
  17.     }
  18. }
复制代码
6.2 单并发测试



  • ConcurrencyLevel配置为1,连续发布20个事件
  • Handler都是Thread10在执行
  • TaskHandler虽然在多个线程上执行,但基本和Handler是交替执行
  • 由于TaskHandler异步线程是由系统默认线程池执行的,无法完全控制线程ID
  • 但是内置的线程池控制了异步线程的开始和结束,所以也控制住了异步事件处理器的并发
  1. EventHandlerDictionaryProvider provider = new();
  2. EventBus bus = new(provider, new EventBusOptions { ConcurrencyLevel = 1 });
  3. var handler = new Handler(_output);
  4. provider.AddHandler(handler);
  5. var taskHandler = new TaskHandler(_output);
  6. provider.AddEventHandler(taskHandler);
  7. for (int i = 0; i < 20; i++)
  8.     bus.Publish("Event" + i);
  9. _output.WriteLine($"Thread{Environment.CurrentManagedThreadId} Sleep");
  10. // Thread15 Sleep
  11. // Thread11 TaskHandler: Event0,01:41:41.633
  12. // Thread10 Handler: Event0,01:41:41.647
  13. // Thread10 Handler: Event1,01:41:41.663
  14. // Thread11 TaskHandler: Event1,01:41:41.663
  15. // Thread10 Handler: Event2,01:41:41.679
  16. // Thread11 TaskHandler: Event2,01:41:41.679
  17. // Thread10 Handler: Event3,01:41:41.695
  18. // Thread11 TaskHandler: Event3,01:41:41.695
  19. // Thread10 Handler: Event4,01:41:41.711
  20. // Thread11 TaskHandler: Event4,01:41:41.711
  21. // Thread10 Handler: Event5,01:41:41.726
  22. // Thread11 TaskHandler: Event5,01:41:41.726
  23. // Thread10 Handler: Event6,01:41:41.742
  24. // Thread11 TaskHandler: Event6,01:41:41.742
  25. // Thread10 Handler: Event7,01:41:41.758
  26. // Thread11 TaskHandler: Event7,01:41:41.758
  27. // Thread10 Handler: Event8,01:41:41.774
  28. // Thread11 TaskHandler: Event8,01:41:41.774
  29. // Thread10 Handler: Event9,01:41:41.790
  30. // Thread11 TaskHandler: Event9,01:41:41.790
  31. // Thread10 Handler: Event10,01:41:41.806
  32. // Thread11 TaskHandler: Event10,01:41:41.806
  33. // Thread10 Handler: Event11,01:41:41.822
  34. // Thread11 TaskHandler: Event11,01:41:41.822
  35. // Thread10 Handler: Event12,01:41:41.838
  36. // Thread11 TaskHandler: Event12,01:41:41.838
  37. // Thread10 Handler: Event13,01:41:41.854
  38. // Thread11 TaskHandler: Event13,01:41:41.854
  39. // Thread10 Handler: Event14,01:41:41.870
  40. // Thread11 TaskHandler: Event14,01:41:41.870
  41. // Thread10 Handler: Event15,01:41:41.886
  42. // Thread11 TaskHandler: Event15,01:41:41.886
  43. // Thread10 Handler: Event16,01:41:41.902
  44. // Thread32 TaskHandler: Event16,01:41:41.902
  45. // Thread10 Handler: Event17,01:41:41.918
  46. // Thread32 TaskHandler: Event17,01:41:41.918
  47. // Thread10 Handler: Event18,01:41:41.934
  48. // Thread32 TaskHandler: Event18,01:41:41.934
  49. // Thread10 Handler: Event19,01:41:41.950
  50. // Thread32 TaskHandler: Event19,01:41:41.950
复制代码
6.3 多并发测试



  • 这次ConcurrencyLevel配置为4,连续发布100个事件
  • Handler都是8、11、31和32等4个线程在执行,且每个线程都执行了25次
  • TaskHandler和Handler还是交替执行,说明并发控制没有问题
  • 另外笔者抽空再补一篇文章,专门介绍手搓线程池的使用和原理
  1. EventHandlerDictionaryProvider provider = new();
  2. EventBus bus = new(provider, new EventBusOptions { ConcurrencyLevel = 4 });
  3. var handler = new Handler(_output);
  4. provider.AddHandler(handler);
  5. var taskHandler = new TaskHandler(_output);
  6. provider.AddEventHandler(taskHandler);
  7. for (int i = 0; i < 100; i++)
  8.     bus.Publish("Event" + i);
  9. _output.WriteLine($"Thread{Environment.CurrentManagedThreadId} Sleep");
  10. // Thread16 Sleep
  11. // Thread34 TaskHandler: Event3,01:52:53.795
  12. // Thread31 Handler: Event2,01:52:53.794
  13. // Thread36 TaskHandler: Event0,01:52:53.795
  14. // Thread32 Handler: Event1,01:52:53.794
  15. // Thread8 Handler: Event3,01:52:53.794
  16. // Thread33 TaskHandler: Event1,01:52:53.795
  17. // Thread35 TaskHandler: Event2,01:52:53.795
  18. // Thread11 Handler: Event0,01:52:53.794
  19. // Thread8 Handler: Event4,01:52:53.810
  20. // Thread32 Handler: Event6,01:52:53.810
  21. // Thread31 Handler: Event5,01:52:53.810
  22. // Thread11 Handler: Event7,01:52:53.810
  23. // Thread33 TaskHandler: Event7,01:52:53.810
  24. // Thread33 TaskHandler: Event4,01:52:53.810
  25. // Thread34 TaskHandler: Event5,01:52:53.810
  26. // Thread35 TaskHandler: Event6,01:52:53.810
  27. // Thread8 Handler: Event8,01:52:53.826
  28. // Thread32 Handler: Event11,01:52:53.826
  29. // Thread31 Handler: Event10,01:52:53.826
  30. // Thread11 Handler: Event9,01:52:53.826
  31. // Thread34 TaskHandler: Event9,01:52:53.826
  32. // Thread33 TaskHandler: Event10,01:52:53.826
  33. // Thread34 TaskHandler: Event8,01:52:53.826
  34. // Thread35 TaskHandler: Event11,01:52:53.826
  35. // Thread31 Handler: Event14,01:52:53.841
  36. // Thread11 Handler: Event13,01:52:53.841
  37. // Thread8 Handler: Event12,01:52:53.841
  38. // Thread32 Handler: Event15,01:52:53.841
  39. // Thread35 TaskHandler: Event13,01:52:53.841
  40. // Thread35 TaskHandler: Event12,01:52:53.841
  41. // Thread33 TaskHandler: Event15,01:52:53.841
  42. // Thread34 TaskHandler: Event14,01:52:53.841
  43. // Thread32 Handler: Event19,01:52:53.857
  44. // Thread34 TaskHandler: Event16,01:52:53.857
  45. // Thread31 Handler: Event16,01:52:53.857
  46. // Thread8 Handler: Event18,01:52:53.857
  47. // Thread11 Handler: Event17,01:52:53.857
  48. // Thread33 TaskHandler: Event19,01:52:53.857
  49. // Thread34 TaskHandler: Event18,01:52:53.857
  50. // Thread35 TaskHandler: Event17,01:52:53.857
  51. // Thread11 Handler: Event22,01:52:53.873
  52. // Thread31 Handler: Event23,01:52:53.873
  53. // Thread8 Handler: Event21,01:52:53.873
  54. // Thread35 TaskHandler: Event23,01:52:53.873
  55. // Thread32 Handler: Event20,01:52:53.873
  56. // Thread35 TaskHandler: Event20,01:52:53.873
  57. // Thread33 TaskHandler: Event22,01:52:53.873
  58. // Thread34 TaskHandler: Event21,01:52:53.873
  59. // Thread8 Handler: Event25,01:52:53.889
  60. // Thread11 Handler: Event26,01:52:53.889
  61. // Thread32 Handler: Event27,01:52:53.889
  62. // Thread31 Handler: Event24,01:52:53.889
  63. // Thread33 TaskHandler: Event27,01:52:53.889
  64. // Thread34 TaskHandler: Event25,01:52:53.889
  65. // Thread35 TaskHandler: Event26,01:52:53.889
  66. // Thread36 TaskHandler: Event24,01:52:53.889
  67. // Thread11 Handler: Event29,01:52:53.905
  68. // Thread31 Handler: Event28,01:52:53.905
  69. // Thread8 Handler: Event30,01:52:53.905
  70. // Thread32 Handler: Event31,01:52:53.905
  71. // Thread36 TaskHandler: Event31,01:52:53.905
  72. // Thread36 TaskHandler: Event29,01:52:53.905
  73. // Thread35 TaskHandler: Event28,01:52:53.905
  74. // Thread34 TaskHandler: Event30,01:52:53.905
  75. // Thread32 Handler: Event32,01:52:53.920
  76. // Thread11 Handler: Event35,01:52:53.920
  77. // Thread31 Handler: Event34,01:52:53.920
  78. // Thread8 Handler: Event33,01:52:53.920
  79. // Thread34 TaskHandler: Event35,01:52:53.920
  80. // Thread36 TaskHandler: Event34,01:52:53.920
  81. // Thread35 TaskHandler: Event32,01:52:53.920
  82. // Thread33 TaskHandler: Event33,01:52:53.920
  83. // Thread32 Handler: Event39,01:52:53.935
  84. // Thread35 TaskHandler: Event39,01:52:53.935
  85. // Thread8 Handler: Event38,01:52:53.935
  86. // Thread11 Handler: Event37,01:52:53.935
  87. // Thread31 Handler: Event36,01:52:53.935
  88. // Thread33 TaskHandler: Event38,01:52:53.935
  89. // Thread36 TaskHandler: Event36,01:52:53.935
  90. // Thread37 TaskHandler: Event37,01:52:53.936
  91. // Thread11 Handler: Event42,01:52:53.951
  92. // Thread8 Handler: Event40,01:52:53.951
  93. // Thread32 Handler: Event43,01:52:53.951
  94. // Thread37 TaskHandler: Event41,01:52:53.951
  95. // Thread31 Handler: Event41,01:52:53.951
  96. // Thread33 TaskHandler: Event42,01:52:53.951
  97. // Thread37 TaskHandler: Event40,01:52:53.951
  98. // Thread36 TaskHandler: Event43,01:52:53.951
  99. // Thread11 Handler: Event45,01:52:53.967
  100. // Thread31 Handler: Event47,01:52:53.967
  101. // Thread32 Handler: Event46,01:52:53.967
  102. // Thread35 TaskHandler: Event44,01:52:53.967
  103. // Thread8 Handler: Event44,01:52:53.967
  104. // Thread36 TaskHandler: Event47,01:52:53.967
  105. // Thread37 TaskHandler: Event46,01:52:53.967
  106. // Thread33 TaskHandler: Event45,01:52:53.967
  107. // Thread11 Handler: Event48,01:52:53.983
  108. // Thread8 Handler: Event49,01:52:53.983
  109. // Thread31 Handler: Event50,01:52:53.983
  110. // Thread33 TaskHandler: Event51,01:52:53.983
  111. // Thread32 Handler: Event51,01:52:53.983
  112. // Thread33 TaskHandler: Event48,01:52:53.983
  113. // Thread36 TaskHandler: Event49,01:52:53.983
  114. // Thread37 TaskHandler: Event50,01:52:53.983
  115. // Thread11 Handler: Event53,01:52:53.999
  116. // Thread31 Handler: Event55,01:52:53.999
  117. // Thread32 Handler: Event54,01:52:53.999
  118. // Thread8 Handler: Event52,01:52:53.999
  119. // Thread33 TaskHandler: Event55,01:52:53.999
  120. // Thread33 TaskHandler: Event53,01:52:53.999
  121. // Thread37 TaskHandler: Event54,01:52:53.999
  122. // Thread36 TaskHandler: Event52,01:52:53.999
  123. // Thread31 Handler: Event58,01:52:54.015
  124. // Thread11 Handler: Event59,01:52:54.015
  125. // Thread8 Handler: Event57,01:52:54.015
  126. // Thread32 Handler: Event56,01:52:54.015
  127. // Thread37 TaskHandler: Event56,01:52:54.015
  128. // Thread36 TaskHandler: Event59,01:52:54.015
  129. // Thread37 TaskHandler: Event57,01:52:54.015
  130. // Thread33 TaskHandler: Event58,01:52:54.015
  131. // Thread8 Handler: Event61,01:52:54.031
  132. // Thread32 Handler: Event63,01:52:54.031
  133. // Thread31 Handler: Event60,01:52:54.031
  134. // Thread11 Handler: Event62,01:52:54.031
  135. // Thread33 TaskHandler: Event63,01:52:54.032
  136. // Thread36 TaskHandler: Event60,01:52:54.032
  137. // Thread37 TaskHandler: Event61,01:52:54.032
  138. // Thread35 TaskHandler: Event62,01:52:54.032
  139. // Thread11 Handler: Event64,01:52:54.046
  140. // Thread31 Handler: Event67,01:52:54.046
  141. // Thread8 Handler: Event65,01:52:54.046
  142. // Thread35 TaskHandler: Event67,01:52:54.047
  143. // Thread37 TaskHandler: Event65,01:52:54.047
  144. // Thread32 Handler: Event66,01:52:54.046
  145. // Thread36 TaskHandler: Event66,01:52:54.047
  146. // Thread33 TaskHandler: Event64,01:52:54.047
  147. // Thread31 Handler: Event68,01:52:54.062
  148. // Thread32 Handler: Event71,01:52:54.062
  149. // Thread11 Handler: Event69,01:52:54.062
  150. // Thread8 Handler: Event70,01:52:54.062
  151. // Thread35 TaskHandler: Event71,01:52:54.062
  152. // Thread36 TaskHandler: Event68,01:52:54.063
  153. // Thread33 TaskHandler: Event70,01:52:54.063
  154. // Thread37 TaskHandler: Event69,01:52:54.063
  155. // Thread32 Handler: Event72,01:52:54.078
  156. // Thread11 Handler: Event75,01:52:54.078
  157. // Thread8 Handler: Event73,01:52:54.078
  158. // Thread31 Handler: Event74,01:52:54.078
  159. // Thread33 TaskHandler: Event75,01:52:54.078
  160. // Thread37 TaskHandler: Event74,01:52:54.078
  161. // Thread36 TaskHandler: Event73,01:52:54.078
  162. // Thread34 TaskHandler: Event72,01:52:54.078
  163. // Thread8 Handler: Event79,01:52:54.094
  164. // Thread32 Handler: Event78,01:52:54.094
  165. // Thread31 Handler: Event77,01:52:54.094
  166. // Thread11 Handler: Event76,01:52:54.094
  167. // Thread34 TaskHandler: Event79,01:52:54.094
  168. // Thread36 TaskHandler: Event76,01:52:54.094
  169. // Thread34 TaskHandler: Event77,01:52:54.094
  170. // Thread37 TaskHandler: Event78,01:52:54.094
  171. // Thread32 Handler: Event82,01:52:54.110
  172. // Thread8 Handler: Event80,01:52:54.110
  173. // Thread11 Handler: Event83,01:52:54.110
  174. // Thread31 Handler: Event81,01:52:54.110
  175. // Thread37 TaskHandler: Event83,01:52:54.110
  176. // Thread36 TaskHandler: Event81,01:52:54.110
  177. // Thread34 TaskHandler: Event82,01:52:54.110
  178. // Thread33 TaskHandler: Event80,01:52:54.110
  179. // Thread11 Handler: Event87,01:52:54.126
  180. // Thread31 Handler: Event85,01:52:54.126
  181. // Thread8 Handler: Event86,01:52:54.126
  182. // Thread32 Handler: Event84,01:52:54.126
  183. // Thread36 TaskHandler: Event86,01:52:54.126
  184. // Thread34 TaskHandler: Event87,01:52:54.126
  185. // Thread33 TaskHandler: Event84,01:52:54.126
  186. // Thread37 TaskHandler: Event85,01:52:54.126
  187. // Thread11 Handler: Event90,01:52:54.142
  188. // Thread31 Handler: Event89,01:52:54.142
  189. // Thread8 Handler: Event88,01:52:54.142
  190. // Thread32 Handler: Event91,01:52:54.142
  191. // Thread33 TaskHandler: Event89,01:52:54.142
  192. // Thread37 TaskHandler: Event91,01:52:54.142
  193. // Thread34 TaskHandler: Event88,01:52:54.142
  194. // Thread36 TaskHandler: Event90,01:52:54.142
  195. // Thread11 Handler: Event94,01:52:54.158
  196. // Thread8 Handler: Event92,01:52:54.158
  197. // Thread31 Handler: Event95,01:52:54.158
  198. // Thread32 Handler: Event93,01:52:54.158
  199. // Thread37 TaskHandler: Event94,01:52:54.158
  200. // Thread36 TaskHandler: Event95,01:52:54.158
  201. // Thread34 TaskHandler: Event93,01:52:54.158
  202. // Thread33 TaskHandler: Event92,01:52:54.159
  203. // Thread8 Handler: Event98,01:52:54.174
  204. // Thread11 Handler: Event99,01:52:54.174
  205. // Thread32 Handler: Event97,01:52:54.174
  206. // Thread31 Handler: Event96,01:52:54.174
  207. // Thread33 TaskHandler: Event97,01:52:54.174
  208. // Thread34 TaskHandler: Event99,01:52:54.174
  209. // Thread37 TaskHandler: Event96,01:52:54.174
  210. // Thread36 TaskHandler: Event98,01:52:54.175
复制代码
7. 集成事件的探讨



  • 集成事件就是分布式事件,跨进程的事件
  • 这种事件肯定要借助消息队列中间件
  • 笔者以为做一个事件转发器(继承事件处理接口)就可以了,手写也并不复杂
  • 如何封装事件转发器笔者还没考虑清楚
  • 下次考虑清楚了再发一篇文章
8. 总结



  • IEventBus作为事件总线和事件发布者
  • IEventHandler和ITaskEventHandler作为
  • IEventHandlerProvider用于查找事件处理器,支持非IOC环境
  • EventDispatcher是事件发布者包含在EventBus内部
  • 用于调用内置线程池发布事件和跟踪异步事件执行完成()
六. 手搓事件总线非常轻量级

1. MediatR已经非常轻量级了



  • MediatR.Contracts.dll是7K
  • MediatR.dll是76K
2. 手搓事件总线更轻量级



  • 本示例用到手搓的6个类库,共61K
  • Hand.EventBuses.dll是9K
  • Hand.EventServiceProvider.dll是6K
  • Hand.Core.dll是14K
  • Hand.Tasks.dll是13K
  • Hand.Reflection.dll是11K
  • Hand.Job.dll是9K
  • 其中只有Hand.EventBuses.dll和Hand.EventServiceProvider.dll是专用事件总线的,共15K
  • 其他4个类库是《手搓》系列的通用类库,可以复用与其他场景
七、插一个异步的题外话

1. 模拟一个异步业务场景



  • 有3个异步逻辑的结果聚合
  • 每个异步逻辑都需要1秒钟
  1. public static async Task<int> Sum(params int[] input)
  2. {
  3.     await Task.Delay(1000);
  4.     return input.Sum();
  5. }
  6. public static async Task<int> One()
  7. {
  8.     await Task.Delay(1000);
  9.     return 1;
  10. }
  11. public static async Task<int> Two()
  12. {
  13.     await Task.Delay(1000);
  14.     return 2;
  15. }
  16. public static async Task<int> Tree()
  17. {
  18.     await Task.Delay(1000);
  19.     return 3;
  20. }
复制代码
2. 先给一个错误的示范
  1. var sw = Stopwatch.StartNew();
  2. var one = await One();
  3. var two = await Two();
  4. var tree = await Tree();
  5. var sum = await Sum(one, two, tree);
  6. Assert.Equal(6, sum);
  7. sw.Stop();
  8. _output.WriteLine($"Async Total Time: {sw.ElapsedMilliseconds} ms");
  9. // Async Total Time: 4020 ms
复制代码


  • 以上代码看着很漂亮,但是实际执行时间是4秒
  • 因为每个异步逻辑都是顺序执行的
  • 这显然不是我们想要的结果
3. 可以这么优化
  1. var sw = Stopwatch.StartNew();
  2. var oneTask = One();
  3. var twoTask = Two();
  4. var treeTask = Tree();
  5. var list = await Task.WhenAll(oneTask, twoTask, treeTask);
  6. var sum = await Sum(list);
  7. Assert.Equal(6, sum);
  8. sw.Stop();
  9. _output.WriteLine($"WhenAll Async Total Time: {sw.ElapsedMilliseconds} ms");
  10. // WhenAll Async Total Time: 2016 ms
复制代码


  • 以上代码耗时2秒
  • 因为我们将异步逻辑并行执行了
  • 很多时候不要看到Task就加await
  • 更别觉得有Task有await,就是异步了
4. 不用WhenAll可以这么写
  1. var sw = Stopwatch.StartNew();
  2. var oneTask = One();
  3. var twoTask = Two();
  4. var treeTask = Tree();
  5. var one = await oneTask;
  6. var two = await twoTask;
  7. var tree = await treeTask;
  8. var sum = await Sum(one, two, tree);
  9. Assert.Equal(6, sum);
  10. sw.Stop();
  11. _output.WriteLine($"Parallel Async Total Time: {sw.ElapsedMilliseconds} ms");
  12. // Parallel Async Total Time: 2017 ms
复制代码


  • 为什么要说这个题外话
  • 因为我看到太多项目是这样子,甚至在循环里面逐个await
  • 虽然看上去是一手漂亮的代码,但是执行效率却很低
  • 希望说破这个事情,能帮到一部分人
另外源码托管地址: https://github.com/donetsoftwork/HandCore.net ,欢迎大家直接查看源码。
gitee同步更新:https://gitee.com/donetsoftwork/HandCore.net
如果大家喜欢请动动您发财的小手手帮忙点一下Star,谢谢!!!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册