愤血冒 发表于 2025-10-1 17:17:19

命令模式的深度解析:从标准实现到TPL Dataflow高性能架构

命令模式是对一类对象公共操作的抽象,它们具有相同的方法签名,所以具有类似的操作,可以被抽象出来,成为一个抽象的命令对象。实际操作的调用者就不是和一组对象打交道,它是需要以来这个命令对象的方法签名,并根据这个签名调用相关的方法。
以上是命令模式的大概含义,这里可以联想到事件驱动,command和handler,也可以联想到AOP的思想。联想到数据流的操作我就写了个数据流操作类库。


之前写了一些有关AOP的,但是感觉还是差点意思,补上这次的可能在项目中会弥补一些短板回来,就是灵活性。
但是该项目重点是数据流的处理,所以web端来实现只是一个例子,大量数据的处理最主要的是后台任务吧,通过接口调用只是一个实例展示。
有关数据流这块代码核心如下:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Common.Bus.Core;
using Common.Bus.Monitoring;

namespace Common.Bus.Implementations
{
    /// <summary>
    /// 基于TPL数据流的高性能CommandBus实现
    /// 支持并行处理、背压控制和监控
    /// </summary>
    public class DataflowCommandBus : ICommandBus, IDisposable
    {
      private readonly IServiceProvider _provider;
      private readonly ILogger<DataflowCommandBus>? _logger;
      private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
      private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
      
      // 数据流网络
      private ActionBlock<DataflowCommandRequest> _commandProcessor = null!;
      
      // 背压控制
      private readonly SemaphoreSlim _concurrencyLimiter;
      private readonly int _maxConcurrency;
      
      // 监控指标
      private long _processedCommands;
      private long _failedCommands;
      private long _totalProcessingTime;

      public DataflowCommandBus(IServiceProvider serviceProvider, ILogger<DataflowCommandBus>? logger = null,
            int? maxConcurrency = null)
      {
            _provider = serviceProvider;
            _logger = logger;
            _maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;
            _concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
            
            // 创建数据流网络
            CreateDataflowNetwork();
      }

      private void CreateDataflowNetwork()
      {
            // 创建命令处理器
            _commandProcessor = new ActionBlock<DataflowCommandRequest>(
                async request =>
                {
                  try
                  {
                        await _concurrencyLimiter.WaitAsync();
                        var startTime = DateTime.UtcNow;
                        
                        // 执行完整的命令处理管道
                        var result = await ProcessCommandPipeline(request);
                        
                        var processingTime = DateTime.UtcNow - startTime;
                        Interlocked.Add(ref _totalProcessingTime, processingTime.Ticks);
                        Interlocked.Increment(ref _processedCommands);
                        
                        request.TaskCompletionSource.SetResult(result);
                  }
                  catch (Exception ex)
                  {
                        Interlocked.Increment(ref _failedCommands);
                        _logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);
                        request.TaskCompletionSource.SetException(ex);
                  }
                  finally
                  {
                        _concurrencyLimiter.Release();
                  }
                },
                new ExecutionDataflowBlockOptions
                {
                  MaxDegreeOfParallelism = _maxConcurrency,
                  BoundedCapacity = _maxConcurrency * 2
                });
      }

      public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default)
            where TCommand : ICommand<TResult>
      {
            var commandType = typeof(TCommand);
            var requestId = Guid.NewGuid();
            var tcs = new TaskCompletionSource<object>();
            
            var request = new DataflowCommandRequest(requestId, commandType, typeof(TResult), command, tcs);
            
            // 发送到数据流网络
            if (!_commandProcessor.Post(request))
            {
                throw new InvalidOperationException("Unable to queue command for processing - system may be overloaded");
            }
            
            try
            {
                var result = await tcs.Task.WaitAsync(ct);
                return (TResult)result;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                _logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);
                throw;
            }
      }

      private async Task<object> ProcessCommandPipeline(DataflowCommandRequest request)
      {
            // 使用反射调用泛型方法
            var method = typeof(DataflowCommandBus).GetMethod(nameof(ProcessCommandPipelineGeneric), BindingFlags.NonPublic | BindingFlags.Instance);
            var genericMethod = method!.MakeGenericMethod(request.CommandType, request.ResultType);
            
            var task = (Task)genericMethod.Invoke(this, new object[] { request })!;
            await task;
            
            var resultProperty = task.GetType().GetProperty("Result");
            return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
      }

      private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request)
            where TCommand : ICommand<TResult>
      {
            // 获取处理器和行为的工厂函数
            var handlerFactory = GetCachedHandler<TCommand, TResult>(request.CommandType);
            var behaviorsFactory = GetCachedBehaviors<TCommand, TResult>(request.CommandType);
            
            // 创建处理器和行为的实例
            var handler = handlerFactory();
            var behaviors = behaviorsFactory();
            
            // 构建处理管道
            Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
            
            // 按顺序应用管道行为
            foreach (var behavior in behaviors.Reverse())
            {
                var currentBehavior = behavior;
                var currentPipeline = pipeline;
                pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
            }
            
            return await pipeline();
      }

      private async Task<object> ExecuteBehavior<TCommand, TResult>(
            ICommandPipelineBehavior<TCommand, TResult> behavior,
            TCommand command,
            Func<Task<TResult>> next)
            where TCommand : ICommand<TResult>
      {
            try
            {
                var result = await behavior.Handle(command, next, CancellationToken.None);
                return result!;
            }
            catch (Exception ex)
            {
                throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);
            }
      }

      private Func<ICommandHandler<TCommand, TResult>> GetCachedHandler<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
      {
            return (Func<ICommandHandler<TCommand, TResult>>)_handlerCache.GetOrAdd(commandType, _ =>
            {
                return new Func<ICommandHandler<TCommand, TResult>>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var handler = scope.ServiceProvider.GetService<ICommandHandler<TCommand, TResult>>();
                  if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                  return handler;
                });
            });
      }

      private Func<ICommandPipelineBehavior<TCommand, TResult>[]> GetCachedBehaviors<TCommand, TResult>(Type commandType)
            where TCommand : ICommand<TResult>
      {
            return (Func<ICommandPipelineBehavior<TCommand, TResult>[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
            {
                return new Func<ICommandPipelineBehavior<TCommand, TResult>[]>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                  return behaviors;
                });
            });
      }

      private async Task<TResult> ExecuteHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler, TCommand command)
            where TCommand : ICommand<TResult>
      {
            return await handler.HandleAsync(command, CancellationToken.None);
      }

      private async Task<object> ExecuteHandler(object handler, object command)
      {
            var handlerType = handler.GetType();
            var handleMethod = handlerType.GetMethod("HandleAsync");
            
            if (handleMethod == null)
                throw new InvalidOperationException($"Handler {handlerType.Name} does not have HandleAsync method");

            var task = (Task)handleMethod.Invoke(handler, new object[] { command, CancellationToken.None })!;
            await task;
            
            var resultProperty = task.GetType().GetProperty("Result");
            return resultProperty?.GetValue(task) ?? throw new InvalidOperationException("Failed to get result from task");
      }

      private Func<object> GetCachedHandler(Type commandType)
      {
            return _handlerCache.GetOrAdd(commandType, _ =>
            {
                // 获取命令类型实现的ICommand<TResult>接口
                var commandInterface = commandType.GetInterfaces()
                  .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
               
                if (commandInterface == null)
                  throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
               
                var resultType = commandInterface.GetGenericArguments();
                var handlerType = typeof(ICommandHandler<,>).MakeGenericType(commandType, resultType);
               
                // 返回一个工厂函数,而不是直接返回处理器实例
                return new Func<object>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var handler = scope.ServiceProvider.GetService(handlerType);
                  if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                  return handler;
                });
            });
      }

      private Func<object[]> GetCachedBehaviors(Type commandType)
      {
            return _behaviorsCache.GetOrAdd(commandType, _ =>
            {
                // 获取命令类型实现的ICommand<TResult>接口
                var commandInterface = commandType.GetInterfaces()
                  .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ICommand<>));
               
                if (commandInterface == null)
                  throw new InvalidOperationException($"Command type {commandType.Name} does not implement ICommand<TResult>");
               
                var resultType = commandInterface.GetGenericArguments();
                var behaviorType = typeof(ICommandPipelineBehavior<,>).MakeGenericType(commandType, resultType);
               
                // 返回一个工厂函数,而不是直接返回行为实例
                return new Func<object[]>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var behaviors = scope.ServiceProvider.GetServices(behaviorType).Where(b => b != null).ToArray();
                  return behaviors!;
                });
            });
      }

      // 监控和统计方法
      public DataflowMetrics GetMetrics()
      {
            return new DataflowMetrics
            {
                ProcessedCommands = Interlocked.Read(ref _processedCommands),
                FailedCommands = Interlocked.Read(ref _failedCommands),
                TotalProcessingTime = TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime)),
                AverageProcessingTime = _processedCommands > 0
                  ? TimeSpan.FromTicks(Interlocked.Read(ref _totalProcessingTime) / _processedCommands)
                  : TimeSpan.Zero,
                AvailableConcurrency = _concurrencyLimiter.CurrentCount,
                MaxConcurrency = _maxConcurrency,
                InputQueueSize = _commandProcessor.InputCount
            };
      }

      public void ClearCache()
      {
            _handlerCache.Clear();
            _behaviorsCache.Clear();
      }

      public void Dispose()
      {
            _commandProcessor?.Complete();
            _concurrencyLimiter?.Dispose();
      }
    }

    // 辅助类
    internal class DataflowCommandRequest
    {
      public Guid Id { get; }
      public Type CommandType { get; }
      public Type ResultType { get; }
      public object Command { get; }
      public TaskCompletionSource<object> TaskCompletionSource { get; }

      public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs)
      {
            Id = id;
            CommandType = commandType;
            ResultType = resultType;
            Command = command;
            TaskCompletionSource = tcs;
      }
    }


这里如果不是数据流方式可以使用通用模式:


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Common.Bus.Core;

namespace Common.Bus.Implementations
{
    public class CommandBus : ICommandBus
    {
      private readonly IServiceProvider _provider;
      private readonly ConcurrentDictionary<Type, Func<object>> _handlerCache = new();
      private readonly ConcurrentDictionary<Type, Func<object[]>> _behaviorsCache = new();
      private readonly ConcurrentDictionary<Type, Func<object, object, CancellationToken, Task<object>>> _pipelineCache = new();

      public CommandBus(IServiceProvider serviceProvider)
      {
            _provider = serviceProvider;
      }

      // 添加清理缓存的方法,用于测试或动态重新加载
      public void ClearCache()
      {
            _handlerCache.Clear();
            _behaviorsCache.Clear();
            _pipelineCache.Clear();
      }

      public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) where TCommand : ICommand<TResult>
      {
            var commandType = typeof(TCommand);
            
            // 获取缓存的Handler
            var handler = GetCachedHandler<TCommand, TResult>(commandType);
            
            // 获取缓存的Pipeline
            var pipeline = GetCachedPipeline<TCommand, TResult>(commandType);
            
            // 执行Pipeline
            var result = await pipeline(handler, command, ct);
            return (TResult)result;
      }

      private ICommandHandler<TCommand, TResult> GetCachedHandler<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
      {
            var handlerFactory = (Func<object>)_handlerCache.GetOrAdd(commandType, _ =>
            {
                return new Func<object>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var handler = scope.ServiceProvider.GetService(typeof(ICommandHandler<TCommand, TResult>));
                  if (handler == null)
                        throw new InvalidOperationException($"No handler registered for {commandType.Name}");
                  return handler;
                });
            });
            return (ICommandHandler<TCommand, TResult>)handlerFactory();
      }

      private ICommandPipelineBehavior<TCommand, TResult>[] GetCachedBehaviors<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
      {
            var behaviorsFactory = (Func<object[]>)_behaviorsCache.GetOrAdd(commandType, _ =>
            {
                return new Func<object[]>(() =>
                {
                  using var scope = _provider.CreateScope();
                  var behaviors = scope.ServiceProvider.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
                  return behaviors.Cast<object>().ToArray();
                });
            });
            return behaviorsFactory().Cast<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
      }

      private Func<object, object, CancellationToken, Task<object>> GetCachedPipeline<TCommand, TResult>(Type commandType) where TCommand : ICommand<TResult>
      {
            return _pipelineCache.GetOrAdd(commandType, _ =>
            {
                var behaviors = GetCachedBehaviors<TCommand, TResult>(commandType);
               
                // 预构建Pipeline,避免每次调用时重新构建
                return async (handler, command, ct) =>
                {
                  if (handler == null || command == null)
                        throw new ArgumentNullException("Handler or command cannot be null");
                        
                  var typedHandler = (ICommandHandler<TCommand, TResult>)handler;
                  var typedCommand = (TCommand)command;

                  // 如果没有behaviors,直接调用handler
                  if (behaviors.Length == 0)
                  {
                        var result = await typedHandler.HandleAsync(typedCommand, ct);
                        return (object)result!;
                  }

                  // 使用递归方式构建pipeline,减少委托创建
                  var pipelineResult = await ExecutePipeline(typedHandler, typedCommand, behaviors, 0, ct);
                  return (object)pipelineResult!;
                };
            });
      }

      private async Task<TResult> ExecutePipeline<TCommand, TResult>(
            ICommandHandler<TCommand, TResult> handler,
            TCommand command,
            ICommandPipelineBehavior<TCommand, TResult>[] behaviors,
            int behaviorIndex,
            CancellationToken ct) where TCommand : ICommand<TResult>
      {
            if (behaviorIndex >= behaviors.Length)
            {
                return await handler.HandleAsync(command, ct);
            }

            var behavior = behaviors;
            return await behavior.Handle(command, () => ExecutePipeline(handler, command, behaviors, behaviorIndex + 1, ct), ct);
      }
    }

其他批量操作、带监控等模式就参考其他代码:
exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook

一下是项目更详细介绍,如有错误多多指正:
# CommandBus AOP 项目
这是一个基于AOP(面向切面编程)的CommandBus项目,使用TPL Dataflow进行数据流处理优化,支持多种CommandBus实现和实时监控。
## CommandBus实现类型
### 1. Standard CommandBus- **类型**: `CommandBusType.Standard`- **特点**: 标准同步处理,适合简单场景- **控制器**: `StandardCommandBusController`
### 2. Dataflow CommandBus- **类型**: `CommandBusType.Dataflow`- **特点**: 基于TPL Dataflow的异步并发处理,适合高并发场景- **控制器**: `DataflowCommandBusController`
### 3. Batch Dataflow CommandBus- **类型**: `CommandBusType.BatchDataflow`- **特点**: 支持批量处理,适合大批量数据场景- **控制器**: `BatchDataflowCommandBusController`
### 4. Typed Dataflow CommandBus- **类型**: `CommandBusType.TypedDataflow`- **特点**: 强类型安全,适合复杂业务场景- **控制器**: `TypedDataflowCommandBusController`
### 5. Monitored CommandBus- **类型**: `CommandBusType.Monitored`- **特点**: 包含性能监控,适合生产环境- **控制器**: `MonitoredCommandBusController` 这里有一个扩展点behavior,可以注入前后时间,当前代代码只做了业务前的拦截,业务后的可以如法炮制。这样的话就是一个aop,那么跟aop切面编程又有什么区别和共同点呢?
            // 构建处理管道
            Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
            
            // 按顺序应用管道行为
            foreach (var behavior in behaviors.Reverse())
            {
                var currentBehavior = behavior;
                var currentPipeline = pipeline;
                pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
            }
            
            return await pipeline();
      } 
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 命令模式的深度解析:从标准实现到TPL Dataflow高性能架构