找回密码
 立即注册
首页 业界区 业界 命令模式的深度解析:从标准实现到TPL Dataflow高性能架 ...

命令模式的深度解析:从标准实现到TPL Dataflow高性能架构

愤血冒 2025-10-1 17:17:19
命令模式是对一类对象公共操作的抽象,它们具有相同的方法签名,所以具有类似的操作,可以被抽象出来,成为一个抽象的命令对象。实际操作的调用者就不是和一组对象打交道,它是需要以来这个命令对象的方法签名,并根据这个签名调用相关的方法。
以上是命令模式的大概含义,这里可以联想到事件驱动,command和handler,也可以联想到AOP的思想。联想到数据流的操作我就写了个数据流操作类库。
1.jpeg

2.jpeg

之前写了一些有关AOP的,但是感觉还是差点意思,补上这次的可能在项目中会弥补一些短板回来,就是灵活性。
但是该项目重点是数据流的处理,所以web端来实现只是一个例子,大量数据的处理最主要的是后台任务吧,通过接口调用只是一个实例展示。
有关数据流这块代码核心如下:
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reflection;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using System.Threading.Tasks.Dataflow;
  9. using Microsoft.Extensions.DependencyInjection;
  10. using Microsoft.Extensions.Logging;
  11. using Common.Bus.Core;
  12. using Common.Bus.Monitoring;
  13. namespace Common.Bus.Implementations
  14. {
  15.     /// <summary>
  16.     /// 基于TPL数据流的高性能CommandBus实现
  17.     /// 支持并行处理、背压控制和监控
  18.     /// </summary>
  19.     public class DataflowCommandBus : ICommandBus, IDisposable
  20.     {
  21.         private readonly IServiceProvider _provider;
  22.         private readonly ILogger<DataflowCommandBus>? _logger;
  23.         private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
  24.         private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
  25.         
  26.         // 数据流网络
  27.         private ActionBlock<DataflowCommandRequest> _commandProcessor = null!;
  28.         
  29.         // 背压控制
  30.         private readonly SemaphoreSlim _concurrencyLimiter;
  31.         private readonly int _maxConcurrency;
  32.         
  33.         // 监控指标
  34.         private long _processedCommands;
  35.         private long _failedCommands;
  36.         private long _totalProcessingTime;
  37.         public DataflowCommandBus(IServiceProvider serviceProvider, ILogger<DataflowCommandBus>? logger = null,
  38.             int? maxConcurrency = null)
  39.         {
  40.             _provider = serviceProvider;
  41.             _logger = logger;
  42.             _maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;
  43.             _concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
  44.             
  45.             // 创建数据流网络
  46.             CreateDataflowNetwork();
  47.         }
  48.         private void CreateDataflowNetwork()
  49.         {
  50.             // 创建命令处理器
  51.             _commandProcessor = new ActionBlock<DataflowCommandRequest>(
  52.                 async request =>
  53.                 {
  54.                     try
  55.                     {
  56.                         await _concurrencyLimiter.WaitAsync();
  57.                         var startTime = DateTime.UtcNow;
  58.                         
  59.                         // 执行完整的命令处理管道
  60.                         var result = await ProcessCommandPipeline(request);
  61.                         
  62.                         var processingTime = DateTime.UtcNow - startTime;
  63.                         Interlocked.Add(ref _totalProcessingTime, processingTime.Ticks);
  64.                         Interlocked.Increment(ref _processedCommands);
  65.                         
  66.                         request.TaskCompletionSource.SetResult(result);
  67.                     }
  68.                     catch (Exception ex)
  69.                     {
  70.                         Interlocked.Increment(ref _failedCommands);
  71.                         _logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);
  72.                         request.TaskCompletionSource.SetException(ex);
  73.                     }
  74.                     finally
  75.                     {
  76.                         _concurrencyLimiter.Release();
  77.                     }
  78.                 },
  79.                 new ExecutionDataflowBlockOptions
  80.                 {
  81.                     MaxDegreeOfParallelism = _maxConcurrency,
  82.                     BoundedCapacity = _maxConcurrency * 2
  83.                 });
  84.         }
  85.         public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default)
  86.             where TCommand : ICommand<TResult>
  87.         {
  88.             var commandType = typeof(TCommand);
  89.             var requestId = Guid.NewGuid();
  90.             var tcs = new TaskCompletionSource<object>();
  91.             
  92.             var request = new DataflowCommandRequest(requestId, commandType, typeof(TResult), command, tcs);
  93.             
  94.             // 发送到数据流网络
  95.             if (!_commandProcessor.Post(request))
  96.             {
  97.                 throw new InvalidOperationException("Unable to queue command for processing - system may be overloaded");
  98.             }
  99.             
  100.             try
  101.             {
  102.                 var result = await tcs.Task.WaitAsync(ct);
  103.                 return (TResult)result;
  104.             }
  105.             catch (OperationCanceledException) when (ct.IsCancellationRequested)
  106.             {
  107.                 _logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);
  108.                 throw;
  109.             }
  110.         }
  111.         private async Task<object> ProcessCommandPipeline(DataflowCommandRequest request)
  112.         {
  113.             // 使用反射调用泛型方法
  114.             var method = typeof(DataflowCommandBus).GetMethod(nameof(ProcessCommandPipelineGeneric), BindingFlags.NonPublic | BindingFlags.Instance);
  115.             var genericMethod = method!.MakeGenericMethod(request.CommandType, request.ResultType);
  116.             
  117.             var task = (Task)genericMethod.Invoke(this, new object[] { request })!;
  118.             await task;
  119.             
  120.             var resultProperty = task.GetType().GetProperty("Result");
  121.             return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
  122.         }
  123.         private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request)
  124.             where TCommand : ICommand<TResult>
  125.         {
  126.             // 获取处理器和行为的工厂函数
  127.             var handlerFactory = GetCachedHandler<TCommand, TResult>(request.CommandType);
  128.             var behaviorsFactory = GetCachedBehaviors<TCommand, TResult>(request.CommandType);
  129.             
  130.             // 创建处理器和行为的实例
  131.             var handler = handlerFactory();
  132.             var behaviors = behaviorsFactory();
  133.             
  134.             // 构建处理管道
  135.             Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
  136.             
  137.             // 按顺序应用管道行为
  138.             foreach (var behavior in behaviors.Reverse())
  139.             {
  140.                 var currentBehavior = behavior;
  141.                 var currentPipeline = pipeline;
  142.                 pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
  143.             }
  144.             
  145.             return await pipeline();
  146.         }
  147.         private async Task<object> ExecuteBehavior<TCommand, TResult>(
  148.             ICommandPipelineBehavior<TCommand, TResult> behavior,
  149.             TCommand command,
  150.             Func<Task<TResult>> next)
  151.             where TCommand : ICommand<TResult>
  152.         {
  153.             try
  154.             {
  155.                 var result = await behavior.Handle(command, next, CancellationToken.None);
  156.                 return result!;
  157.             }
  158.             catch (Exception ex)
  159.             {
  160.                 throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);
  161.             }
  162.         }
  163.         private Func<ICommandHandler<TCommand, TResult>> GetCachedHandler<TCommand, TResult>(Type commandType)
  164.             where TCommand : ICommand<TResult>
  165.         {
  166.             return (Func<ICommandHandler<TCommand, TResult>>)_handlerCache.GetOrAdd(commandType, _ =>
  167.             {
  168.                 return new Func<ICommandHandler<TCommand, TResult>>(() =>
  169.                 {
  170.                     using var scope = _provider.CreateScope();
  171.                     var handler = scope.ServiceProvider.GetService<ICommandHandler<TCommand, TResult>>();
  172.                     if (handler == null)
  173.                         throw new InvalidOperationException($"No handler registered for {commandType.Name}");
  174.                     return handler;
  175.                 });
  176.             });
  177.         }
  178.         private Func<ICommandPipelineBehavior<TCommand, TResult>[]> GetCachedBehaviors<TCommand, TResult>(Type commandType)
  179.             where TCommand : ICommand<TResult>
  180.         {
  181.             return (Func<ICommandPipelineBehavior<TCommand, TResult>[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
  182.             {
  183.                 return new Func<ICommandPipelineBehavior<TCommand, TResult>[]>(() =>
  184.                 {
  185.                     using var scope = _provider.CreateScope();
  186.                     var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
  187.                     return behaviors;
  188.                 });
  189.             });
  190.         }
  191.         private async Task<TResult> ExecuteHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler, TCommand command)
  192.             where TCommand : ICommand<TResult>
  193.         {
  194.             return await handler.HandleAsync(command, CancellationToken.None);
  195.         }
  196.         private async Task<object> ExecuteHandler(object handler, object command)
  197.         {
  198.             var handlerType = handler.GetType();
  199.             var handleMethod = handlerType.GetMethod("HandleAsync");
  200.             
  201.             if (handleMethod == null)
  202.                 throw new InvalidOperationException($"Handler {handlerType.Name} does not have HandleAsync method");
  203.             var task = (Task)handleMethod.Invoke(handler, new object[] { command, CancellationToken.None })!;
  204.             await task;
  205.             
  206.             var resultProperty = task.GetType().GetProperty("Result");
  207.             return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
  208.         }
  209.         private Func<object> GetCachedHandler(Type commandType)
  210.         {
  211.             return _handlerCache.GetOrAdd(commandType, _ =>
  212.             {
  213.                 // 获取命令类型实现的ICommand<TResult>接口
  214.                 var commandInterface = commandType.GetInterfaces()
  215.                     .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
  216.                
  217.                 if (commandInterface == null)
  218.                     throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
  219.                
  220.                 var resultType = commandInterface.GetGenericArguments()[0];
  221.                 var handlerType = typeof(ICommandHandler<,>).MakeGenericType(commandType, resultType);
  222.                
  223.                 // 返回一个工厂函数,而不是直接返回处理器实例
  224.                 return new Func<object>(() =>
  225.                 {
  226.                     using var scope = _provider.CreateScope();
  227.                     var handler = scope.ServiceProvider.GetService(handlerType);
  228.                     if (handler == null)
  229.                         throw new InvalidOperationException($"No handler registered for {commandType.Name}");
  230.                     return handler;
  231.                 });
  232.             });
  233.         }
  234.         private Func<object[]> GetCachedBehaviors(Type commandType)
  235.         {
  236.             return _behaviorsCache.GetOrAdd(commandType, _ =>
  237.             {
  238.                 // 获取命令类型实现的ICommand<TResult>接口
  239.                 var commandInterface = commandType.GetInterfaces()
  240.                     .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
  241.                
  242.                 if (commandInterface == null)
  243.                     throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
  244.                
  245.                 var resultType = commandInterface.GetGenericArguments()[0];
  246.                 var behaviorType = typeof(ICommandPipelineBehavior<,>).MakeGenericType(commandType, resultType);
  247.                
  248.                 // 返回一个工厂函数,而不是直接返回行为实例
  249.                 return new Func<object[]>(() =>
  250.                 {
  251.                     using var scope = _provider.CreateScope();
  252.                     var behaviors = scope.ServiceProvider.GetServices(behaviorType).Where(b => b != null).ToArray();
  253.                     return behaviors!;
  254.                 });
  255.             });
  256.         }
  257.         // 监控和统计方法
  258.         public DataflowMetrics GetMetrics()
  259.         {
  260.             return new DataflowMetrics
  261.             {
  262.                 ProcessedCommands = Interlocked.Read(ref _processedCommands),
  263.                 FailedCommands = Interlocked.Read(ref _failedCommands),
  264.                 TotalProcessingTime = TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime)),
  265.                 AverageProcessingTime = _processedCommands > 0
  266.                     ? TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime) / _processedCommands)
  267.                     : TimeSpan.Zero,
  268.                 AvailableConcurrency = _concurrencyLimiter.CurrentCount,
  269.                 MaxConcurrency = _maxConcurrency,
  270.                 InputQueueSize = _commandProcessor.InputCount
  271.             };
  272.         }
  273.         public void ClearCache()
  274.         {
  275.             _handlerCache.Clear();
  276.             _behaviorsCache.Clear();
  277.         }
  278.         public void Dispose()
  279.         {
  280.             _commandProcessor?.Complete();
  281.             _concurrencyLimiter?.Dispose();
  282.         }
  283.     }
  284.     // 辅助类
  285.     internal class DataflowCommandRequest
  286.     {
  287.         public Guid Id { get; }
  288.         public Type CommandType { get; }
  289.         public Type ResultType { get; }
  290.         public object Command { get; }
  291.         public TaskCompletionSource<object> TaskCompletionSource { get; }
  292.         public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs)
  293.         {
  294.             Id = id;
  295.             CommandType = commandType;
  296.             ResultType = resultType;
  297.             Command = command;
  298.             TaskCompletionSource = tcs;
  299.         }
  300.     }
  301. }
复制代码
 
这里如果不是数据流方式可以使用通用模式:

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using Microsoft.Extensions.DependencyInjection;
  8. using Common.Bus.Core;
  9. namespace Common.Bus.Implementations
  10. {
  11.     public class CommandBus : ICommandBus
  12.     {
  13.         private readonly IServiceProvider _provider;
  14.         private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
  15.         private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
  16.         private readonly ConcurrentDictionary<Type, Func<object, object, CancellationToken, Task<object>>> _pipelineCache = new();
  17.         public CommandBus(IServiceProvider serviceProvider)
  18.         {
  19.             _provider = serviceProvider;
  20.         }
  21.         // 添加清理缓存的方法,用于测试或动态重新加载
  22.         public void ClearCache()
  23.         {
  24.             _handlerCache.Clear();
  25.             _behaviorsCache.Clear();
  26.             _pipelineCache.Clear();
  27.         }
  28.         public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) where TCommand : ICommand<TResult>
  29.         {
  30.             var commandType = typeof(TCommand);
  31.             
  32.             // 获取缓存的Handler
  33.             var handler = GetCachedHandler<TCommand, TResult>(commandType);
  34.             
  35.             // 获取缓存的Pipeline
  36.             var pipeline = GetCachedPipeline<TCommand, TResult>(commandType);
  37.             
  38.             // 执行Pipeline
  39.             var result = await pipeline(handler, command, ct);
  40.             return (TResult)result;
  41.         }
  42.         private ICommandHandler<TCommand, TResult> GetCachedHandler<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
  43.         {
  44.             var handlerFactory = (Func<object>)_handlerCache.GetOrAdd(commandType, _ =>
  45.             {
  46.                 return new Func<object>(() =>
  47.                 {
  48.                     using var scope = _provider.CreateScope();
  49.                     var handler = scope.ServiceProvider.GetService(typeof(ICommandHandler<TCommand, TResult>));
  50.                     if (handler == null)
  51.                         throw new InvalidOperationException($"No handler registered for {commandType.Name}");
  52.                     return handler;
  53.                 });
  54.             });
  55.             return (ICommandHandler<TCommand, TResult>)handlerFactory();
  56.         }
  57.         private ICommandPipelineBehavior<TCommand, TResult>[] GetCachedBehaviors<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
  58.         {
  59.             var behaviorsFactory = (Func<object[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
  60.             {
  61.                 return new Func<object[]>(() =>
  62.                 {
  63.                     using var scope = _provider.CreateScope();
  64.                     var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
  65.                     return behaviors.Cast<object>().ToArray();
  66.                 });
  67.             });
  68.             return behaviorsFactory().Cast<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
  69.         }
  70.         private Func<object, object, CancellationToken, Task<object>> GetCachedPipeline<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
  71.         {
  72.             return _pipelineCache.GetOrAdd(commandType, _ =>
  73.             {
  74.                 var behaviors = GetCachedBehaviors<TCommand, TResult>(commandType);
  75.                
  76.                 // 预构建Pipeline,避免每次调用时重新构建
  77.                 return async (handler, command, ct) =>
  78.                 {
  79.                     if (handler == null || command == null)
  80.                         throw new ArgumentNullException("Handler or command cannot be null");
  81.                         
  82.                     var typedHandler = (ICommandHandler<TCommand, TResult>)handler;
  83.                     var typedCommand = (TCommand)command;
  84.                     // 如果没有behaviors,直接调用handler
  85.                     if (behaviors.Length == 0)
  86.                     {
  87.                         var result = await typedHandler.HandleAsync(typedCommand, ct);
  88.                         return (object)result!;
  89.                     }
  90.                     // 使用递归方式构建pipeline,减少委托创建
  91.                     var pipelineResult = await ExecutePipeline(typedHandler, typedCommand, behaviors, 0, ct);
  92.                     return (object)pipelineResult!;
  93.                 };
  94.             });
  95.         }
  96.         private async Task<TResult> ExecutePipeline<TCommand, TResult>(
  97.             ICommandHandler<TCommand, TResult> handler,
  98.             TCommand command,
  99.             ICommandPipelineBehavior<TCommand, TResult>[] behaviors,
  100.             int behaviorIndex,
  101.             CancellationToken ct) where TCommand : ICommand<TResult>
  102.         {
  103.             if (behaviorIndex >= behaviors.Length)
  104.             {
  105.                 return await handler.HandleAsync(command, ct);
  106.             }
  107.             var behavior = behaviors[behaviorIndex];
  108.             return await behavior.Handle(command, () => ExecutePipeline(handler, command, behaviors, behaviorIndex + 1, ct), ct);
  109.         }
  110.     }
  111. }
复制代码
 
其他批量操作、带监控等模式就参考其他代码:
exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook

一下是项目更详细介绍,如有错误多多指正:
# CommandBus AOP 项目
这是一个基于AOP(面向切面编程)的CommandBus项目,使用TPL Dataflow进行数据流处理优化,支持多种CommandBus实现和实时监控。
## CommandBus实现类型
### 1. Standard CommandBus- **类型**: `CommandBusType.Standard`- **特点**: 标准同步处理,适合简单场景- **控制器**: `StandardCommandBusController`
### 2. Dataflow CommandBus- **类型**: `CommandBusType.Dataflow`- **特点**: 基于TPL Dataflow的异步并发处理,适合高并发场景- **控制器**: `DataflowCommandBusController`
### 3. Batch Dataflow CommandBus- **类型**: `CommandBusType.BatchDataflow`- **特点**: 支持批量处理,适合大批量数据场景- **控制器**: `BatchDataflowCommandBusController`
### 4. Typed Dataflow CommandBus- **类型**: `CommandBusType.TypedDataflow`- **特点**: 强类型安全,适合复杂业务场景- **控制器**: `TypedDataflowCommandBusController`
### 5. Monitored CommandBus- **类型**: `CommandBusType.Monitored`- **特点**: 包含性能监控,适合生产环境- **控制器**: `MonitoredCommandBusController` 这里有一个扩展点behavior,可以注入前后时间,当前代代码只做了业务前的拦截,业务后的可以如法炮制。这样的话就是一个aop,那么跟aop切面编程又有什么区别和共同点呢?
  1.             // 构建处理管道
  2.             Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
  3.             
  4.             // 按顺序应用管道行为
  5.             foreach (var behavior in behaviors.Reverse())
  6.             {
  7.                 var currentBehavior = behavior;
  8.                 var currentPipeline = pipeline;
  9.                 pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
  10.             }
  11.             
  12.             return await pipeline();
  13.         }
复制代码
 
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册