命令模式是对一类对象公共操作的抽象:它们具有相同的方法签名,所以具有类似的操作,可以被抽象出来,成为一个抽象的命令对象。实际操作的调用者不再直接和一组对象打交道,而是只需要依赖这个命令对象的方法签名,并根据这个签名调用相关的方法。
以上是命令模式的大概含义,这里可以联想到事件驱动,command和handler,也可以联想到AOP的思想。联想到数据流的操作我就写了个数据流操作类库。
之前写了一些有关AOP的,但是感觉还是差点意思,补上这次的可能在项目中会弥补一些短板回来,就是灵活性。
但是该项目重点是数据流的处理,所以web端来实现只是一个例子,大量数据的处理最主要的是后台任务吧,通过接口调用只是一个实例展示。
有关数据流这块代码核心如下:
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Threading.Tasks.Dataflow;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Common.Bus.Core;
- using Common.Bus.Monitoring;
- namespace Common.Bus.Implementations
- {
- /// <summary>
- /// <see cref="ICommandBus"/> implementation built on a TPL Dataflow <see cref="ActionBlock{TInput}"/>.
- /// Commands are queued into a bounded block (back-pressure), executed with a bounded degree of
- /// parallelism, and counted so that <see cref="GetMetrics"/> can report throughput.
- /// </summary>
- public class DataflowCommandBus : ICommandBus, IDisposable
- {
-     private readonly IServiceProvider _provider;
-     private readonly ILogger<DataflowCommandBus>? _logger;
- 
-     // Closed generic pipeline entry points keyed by (command type, result type), so the reflection
-     // needed to close ProcessCommandPipelineBoxed<,> runs once per pair instead of once per command.
-     private readonly ConcurrentDictionary<(Type Command, Type Result), Func<DataflowCommandRequest, CancellationToken, Task<object>>> _pipelineCache = new();
- 
-     // Dataflow network: each queued item carries the request plus the caller's cancellation token.
-     private readonly ActionBlock<(DataflowCommandRequest Request, CancellationToken Token)> _commandProcessor;
- 
-     // Concurrency gate; its CurrentCount is also exposed through GetMetrics.
-     private readonly SemaphoreSlim _concurrencyLimiter;
-     private readonly int _maxConcurrency;
- 
-     // Monitoring counters (updated with Interlocked).
-     private long _processedCommands;
-     private long _failedCommands;
-     private long _totalProcessingTime;
- 
-     // 0 = live, 1 = Dispose has run.
-     private int _disposed;
- 
-     /// <summary>Creates the bus and its dataflow block.</summary>
-     /// <param name="serviceProvider">Root provider from which a scope is created for every command.</param>
-     /// <param name="logger">Optional logger for failures and cancellations.</param>
-     /// <param name="maxConcurrency">Maximum number of commands executed in parallel; defaults to twice the processor count.</param>
-     /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
-     /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is zero or negative.</exception>
-     public DataflowCommandBus(IServiceProvider serviceProvider, ILogger<DataflowCommandBus>? logger = null,
-         int? maxConcurrency = null)
-     {
-         _provider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
-         _logger = logger;
-         _maxConcurrency = maxConcurrency ?? Environment.ProcessorCount * 2;
-         if (_maxConcurrency <= 0)
-         {
-             throw new ArgumentOutOfRangeException(nameof(maxConcurrency), _maxConcurrency, "maxConcurrency must be greater than zero");
-         }
- 
-         _concurrencyLimiter = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
-         _commandProcessor = CreateDataflowNetwork();
-     }
- 
-     /// <summary>Builds the bounded, parallel block that executes queued commands.</summary>
-     private ActionBlock<(DataflowCommandRequest Request, CancellationToken Token)> CreateDataflowNetwork()
-     {
-         return new ActionBlock<(DataflowCommandRequest Request, CancellationToken Token)>(
-             async item => await ProcessQueuedCommandAsync(item.Request, item.Token),
-             new ExecutionDataflowBlockOptions
-             {
-                 MaxDegreeOfParallelism = _maxConcurrency,
-                 BoundedCapacity = _maxConcurrency * 2
-             });
-     }
- 
-     /// <summary>
-     /// Runs one queued command and completes its TaskCompletionSource. Never throws, so a failing
-     /// command cannot fault the block and stop the bus.
-     /// </summary>
-     private async Task ProcessQueuedCommandAsync(DataflowCommandRequest request, CancellationToken ct)
-     {
-         var completion = request.TaskCompletionSource;
- 
-         // The caller gave up while the command was still waiting in the queue: do not start it.
-         if (ct.IsCancellationRequested)
-         {
-             completion.TrySetCanceled(ct);
-             return;
-         }
- 
-         // Tracks whether the semaphore was actually entered, so finally never over-releases.
-         var slotTaken = false;
-         try
-         {
-             await _concurrencyLimiter.WaitAsync();
-             slotTaken = true;
- 
-             var stopwatch = System.Diagnostics.Stopwatch.StartNew();
-             var result = await ProcessCommandPipeline(request, ct);
- 
-             Interlocked.Add(ref _totalProcessingTime, stopwatch.Elapsed.Ticks);
-             Interlocked.Increment(ref _processedCommands);
- 
-             completion.TrySetResult(result);
-         }
-         catch (OperationCanceledException) when (ct.IsCancellationRequested)
-         {
-             // Cancellation requested by the caller is not counted as a failure.
-             completion.TrySetCanceled(ct);
-         }
-         catch (Exception ex)
-         {
-             Interlocked.Increment(ref _failedCommands);
-             _logger?.LogError(ex, "Command processing failed for {CommandType}", request.CommandType.Name);
-             completion.TrySetException(ex);
-         }
-         finally
-         {
-             if (slotTaken)
-             {
-                 _concurrencyLimiter.Release();
-             }
-         }
-     }
- 
-     /// <summary>Queues <paramref name="command"/> and asynchronously waits for its result.</summary>
-     /// <param name="command">The command to execute.</param>
-     /// <param name="ct">Cancels waiting for queue space, waiting for the result, and the handler itself.</param>
-     /// <returns>The value produced by the command's handler.</returns>
-     /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
-     /// <exception cref="InvalidOperationException">The block declined the command (the bus was disposed).</exception>
-     /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
-     public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default)
-         where TCommand : ICommand<TResult>
-     {
-         if (command is null)
-         {
-             throw new ArgumentNullException(nameof(command));
-         }
- 
-         var commandType = typeof(TCommand);
- 
-         // RunContinuationsAsynchronously keeps the caller's continuation off the worker that
-         // completes the task, which would otherwise still be holding a concurrency slot.
-         var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
-         var request = new DataflowCommandRequest(Guid.NewGuid(), commandType, typeof(TResult), command, tcs);
- 
-         try
-         {
-             // DataflowBlock.SendAsync waits for room in the bounded queue (Post would reject
-             // immediately when the queue is full); false means the block no longer accepts input.
-             var accepted = await _commandProcessor.SendAsync((request, ct), ct);
-             if (!accepted)
-             {
-                 throw new InvalidOperationException("Unable to queue command for processing - the command bus is no longer accepting commands");
-             }
- 
-             var result = await tcs.Task.WaitAsync(ct);
-             return (TResult)result!;
-         }
-         catch (OperationCanceledException) when (ct.IsCancellationRequested)
-         {
-             _logger?.LogWarning("Command {CommandType} was cancelled", commandType.Name);
-             throw;
-         }
-     }
- 
-     /// <summary>Dispatches the request to the generic pipeline closed over its command and result types.</summary>
-     private Task<object> ProcessCommandPipeline(DataflowCommandRequest request, CancellationToken ct)
-     {
-         var dispatcher = _pipelineCache.GetOrAdd((request.CommandType, request.ResultType), CreateDispatcher);
-         return dispatcher(request, ct);
-     }
- 
-     /// <summary>Closes ProcessCommandPipelineBoxed over the given types and binds it to this instance.</summary>
-     private Func<DataflowCommandRequest, CancellationToken, Task<object>> CreateDispatcher((Type Command, Type Result) key)
-     {
-         var openMethod = typeof(DataflowCommandBus).GetMethod(nameof(ProcessCommandPipelineBoxed), BindingFlags.NonPublic | BindingFlags.Instance)!;
-         var closedMethod = openMethod.MakeGenericMethod(key.Command, key.Result);
- 
-         return (Func<DataflowCommandRequest, CancellationToken, Task<object>>)closedMethod.CreateDelegate(
-             typeof(Func<DataflowCommandRequest, CancellationToken, Task<object>>), this);
-     }
- 
-     /// <summary>Adapter that exposes the typed pipeline result as object (a null result stays null).</summary>
-     private async Task<object> ProcessCommandPipelineBoxed<TCommand, TResult>(DataflowCommandRequest request, CancellationToken ct)
-         where TCommand : ICommand<TResult>
-     {
-         TResult result = await ProcessCommandPipelineGeneric<TCommand, TResult>(request, ct);
-         return result!;
-     }
- 
-     /// <summary>
-     /// Resolves the handler and behaviors from a scope that stays open until the command has
-     /// finished, then runs the behaviors in registration order around the handler.
-     /// </summary>
-     /// <exception cref="InvalidOperationException">No handler is registered for the command type.</exception>
-     private async Task<TResult> ProcessCommandPipelineGeneric<TCommand, TResult>(DataflowCommandRequest request, CancellationToken ct)
-         where TCommand : ICommand<TResult>
-     {
-         // The scope must outlive the awaited pipeline; disposing it earlier would dispose the
-         // handler's scoped dependencies before they are used.
-         using var scope = _provider.CreateScope();
-         var services = scope.ServiceProvider;
- 
-         var handler = services.GetService<ICommandHandler<TCommand, TResult>>()
-             ?? throw new InvalidOperationException($"No handler registered for {request.CommandType.Name}");
-         var behaviors = services.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
-         var command = (TCommand)request.Command;
- 
-         // Innermost step: the handler itself.
-         Func<Task<TResult>> pipeline = () => ExecuteHandler(handler, command, ct);
- 
-         // Wrap from the last behavior to the first so the first registered behavior runs outermost.
-         for (var i = behaviors.Length - 1; i >= 0; i--)
-         {
-             var behavior = behaviors[i];
-             var next = pipeline;
-             pipeline = () => ExecuteBehavior(behavior, command, next, ct);
-         }
- 
-         return await pipeline();
-     }
- 
-     /// <summary>
-     /// Invokes one behavior. Exceptions are wrapped in <see cref="InvalidOperationException"/>
-     /// naming the behavior type, except caller-requested cancellation, which propagates unchanged.
-     /// </summary>
-     private async Task<TResult> ExecuteBehavior<TCommand, TResult>(
-         ICommandPipelineBehavior<TCommand, TResult> behavior,
-         TCommand command,
-         Func<Task<TResult>> next,
-         CancellationToken ct)
-         where TCommand : ICommand<TResult>
-     {
-         try
-         {
-             return await behavior.Handle(command, next, ct);
-         }
-         catch (OperationCanceledException) when (ct.IsCancellationRequested)
-         {
-             throw;
-         }
-         catch (Exception ex)
-         {
-             throw new InvalidOperationException($"Error executing behavior {behavior.GetType().Name}: {ex.Message}", ex);
-         }
-     }
- 
-     /// <summary>Invokes the command handler with the caller's cancellation token.</summary>
-     private async Task<TResult> ExecuteHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler, TCommand command, CancellationToken ct)
-         where TCommand : ICommand<TResult>
-     {
-         return await handler.HandleAsync(command, ct);
-     }
- 
-     /// <summary>Returns a snapshot of the monitoring counters.</summary>
-     public DataflowMetrics GetMetrics()
-     {
-         // Read each counter once so the total and the average are derived from the same values.
-         var processed = Interlocked.Read(ref _processedCommands);
-         var failed = Interlocked.Read(ref _failedCommands);
-         var totalTicks = Interlocked.Read(ref _totalProcessingTime);
- 
-         return new DataflowMetrics
-         {
-             ProcessedCommands = processed,
-             FailedCommands = failed,
-             TotalProcessingTime = TimeSpan.FromTicks(totalTicks),
-             AverageProcessingTime = processed > 0
-                 ? TimeSpan.FromTicks(totalTicks / processed)
-                 : TimeSpan.Zero,
-             AvailableConcurrency = _concurrencyLimiter.CurrentCount,
-             MaxConcurrency = _maxConcurrency,
-             InputQueueSize = _commandProcessor.InputCount
-         };
-     }
- 
-     /// <summary>Drops the cached pipeline delegates; they are rebuilt on the next command.</summary>
-     public void ClearCache()
-     {
-         _pipelineCache.Clear();
-     }
- 
-     /// <summary>
-     /// Stops accepting new commands. Commands already queued still run; the semaphore is disposed
-     /// only after the block has drained, because in-flight commands still have to release it.
-     /// </summary>
-     public void Dispose()
-     {
-         if (Interlocked.Exchange(ref _disposed, 1) != 0)
-         {
-             return;
-         }
- 
-         _commandProcessor.Complete();
-         _commandProcessor.Completion.ContinueWith(_ => _concurrencyLimiter.Dispose(), TaskScheduler.Default);
-     }
- }
- // Helper type
- /// <summary>
- /// Immutable envelope for one command travelling through the dataflow block: the command
- /// object, its command/result types, and the completion source through which the outcome
- /// is reported.
- /// </summary>
- internal class DataflowCommandRequest
- {
-     /// <summary>Stores the supplied values in the corresponding read-only properties.</summary>
-     public DataflowCommandRequest(Guid id, Type commandType, Type resultType, object command, TaskCompletionSource<object> tcs)
-         => (Id, CommandType, ResultType, Command, TaskCompletionSource) = (id, commandType, resultType, command, tcs);
- 
-     /// <summary>Identifier given to this request by its creator.</summary>
-     public Guid Id { get; }
- 
-     /// <summary>Type of the command object.</summary>
-     public Type CommandType { get; }
- 
-     /// <summary>Type of the result associated with the command.</summary>
-     public Type ResultType { get; }
- 
-     /// <summary>The command object itself.</summary>
-     public object Command { get; }
- 
-     /// <summary>Completion source used to publish the result or failure of the request.</summary>
-     public TaskCompletionSource<object> TaskCompletionSource { get; }
- }
- }
复制代码
这里如果不是数据流方式可以使用通用模式:
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using Microsoft.Extensions.DependencyInjection;
- using Common.Bus.Core;
- namespace Common.Bus.Implementations
- {
- /// <summary>
- /// In-process <see cref="ICommandBus"/>: resolves the handler and the pipeline behaviors for a
- /// command from a dependency-injection scope and runs the behaviors around the handler.
- /// </summary>
- public class CommandBus : ICommandBus
- {
-     private readonly IServiceProvider _provider;
- 
-     /// <summary>Creates the bus.</summary>
-     /// <param name="serviceProvider">Root provider from which a scope is created for every command.</param>
-     /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
-     public CommandBus(IServiceProvider serviceProvider)
-     {
-         _provider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
-     }
- 
-     /// <summary>
-     /// Kept for source compatibility. Handlers and behaviors are now resolved for every command,
-     /// so there is no cached state left to clear.
-     /// </summary>
-     public void ClearCache()
-     {
-     }
- 
-     /// <summary>Executes <paramref name="command"/> through its behaviors and handler.</summary>
-     /// <param name="command">The command to execute.</param>
-     /// <param name="ct">Token passed to every behavior and to the handler.</param>
-     /// <returns>The value produced by the handler (possibly altered by behaviors).</returns>
-     /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
-     /// <exception cref="InvalidOperationException">No handler is registered for the command type.</exception>
-     public async Task<TResult> SendAsync<TCommand, TResult>(TCommand command, CancellationToken ct = default) where TCommand : ICommand<TResult>
-     {
-         if (command is null)
-         {
-             throw new ArgumentNullException(nameof(command));
-         }
- 
-         // The scope must stay open until the pipeline has completed; disposing it right after
-         // resolution would dispose the handler's scoped dependencies before they are used.
-         using var scope = _provider.CreateScope();
-         var services = scope.ServiceProvider;
- 
-         var handler = services.GetService<ICommandHandler<TCommand, TResult>>()
-             ?? throw new InvalidOperationException($"No handler registered for {typeof(TCommand).Name}");
- 
-         // Resolved per command so behaviors keep their registered lifetime instead of being
-         // captured once and reused forever.
-         var behaviors = services.GetServices<ICommandPipelineBehavior<TCommand, TResult>>().ToArray();
- 
-         return await ExecutePipeline(handler, command, behaviors, 0, ct);
-     }
- 
-     /// <summary>
-     /// Runs the behavior at <paramref name="behaviorIndex"/>, handing it a continuation that runs
-     /// the rest of the chain; once the index is past the last behavior the handler is invoked.
-     /// </summary>
-     private async Task<TResult> ExecutePipeline<TCommand, TResult>(
-         ICommandHandler<TCommand, TResult> handler,
-         TCommand command,
-         ICommandPipelineBehavior<TCommand, TResult>[] behaviors,
-         int behaviorIndex,
-         CancellationToken ct) where TCommand : ICommand<TResult>
-     {
-         if (behaviorIndex >= behaviors.Length)
-         {
-             return await handler.HandleAsync(command, ct);
-         }
- 
-         var behavior = behaviors[behaviorIndex];
-         return await behavior.Handle(command, () => ExecutePipeline(handler, command, behaviors, behaviorIndex + 1, ct), ct);
-     }
- }
- }
复制代码
其他批量操作、带监控等模式就参考其他代码:
exercisebook/AOP/EventBusAOP/AopNew at main · liuzhixin405/exercisebook
以下是项目更详细的介绍,如有错误请多多指正:
# CommandBus AOP 项目
这是一个基于AOP(面向切面编程)的CommandBus项目,使用TPL Dataflow进行数据流处理优化,支持多种CommandBus实现和实时监控。
## CommandBus实现类型
### 1. Standard CommandBus
- **类型**: `CommandBusType.Standard`
- **特点**: 标准同步处理,适合简单场景
- **控制器**: `StandardCommandBusController`

### 2. Dataflow CommandBus
- **类型**: `CommandBusType.Dataflow`
- **特点**: 基于TPL Dataflow的异步并发处理,适合高并发场景
- **控制器**: `DataflowCommandBusController`

### 3. Batch Dataflow CommandBus
- **类型**: `CommandBusType.BatchDataflow`
- **特点**: 支持批量处理,适合大批量数据场景
- **控制器**: `BatchDataflowCommandBusController`

### 4. Typed Dataflow CommandBus
- **类型**: `CommandBusType.TypedDataflow`
- **特点**: 强类型安全,适合复杂业务场景
- **控制器**: `TypedDataflowCommandBusController`
### 5. Monitored CommandBus
- **类型**: `CommandBusType.Monitored`
- **特点**: 包含性能监控,适合生产环境
- **控制器**: `MonitoredCommandBusController`

这里有一个扩展点behavior,可以注入前后事件,当前代码只做了业务前的拦截,业务后的可以如法炮制。这样的话就是一个aop,那么跟aop切面编程又有什么区别和共同点呢?
- // 构建处理管道
- Func<Task<TResult>> pipeline = () => ExecuteHandler<TCommand, TResult>(handler, (TCommand)request.Command);
-
- // 按顺序应用管道行为
- foreach (var behavior in behaviors.Reverse())
- {
- var currentBehavior = behavior;
- var currentPipeline = pipeline;
- pipeline = async () => (TResult)await ExecuteBehavior(currentBehavior, (TCommand)request.Command, currentPipeline);
- }
-
- return await pipeline();
- }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |