找回密码
 立即注册
首页 业界区 业界 【RabbitMQ】RPC模式(请求/回复)

【RabbitMQ】RPC模式(请求/回复)

普料飕 10 小时前
本章目标


  • 理解RabbitMQ RPC模式的工作原理和适用场景。
  • 掌握回调队列(Callback Queue)和关联ID(Correlation Id)的使用。
  • 实现基于RabbitMQ的异步RPC调用。
  • 学习RPC模式下的错误处理和超时机制。
  • 构建完整的微服务间同步通信解决方案。
一、理论部分

1. RPC模式简介

RPC(Remote Procedure Call)模式允许客户端应用程序调用远程服务器上的方法,就像调用本地方法一样。在RabbitMQ中,RPC是通过消息队列实现的异步RPC。
与传统HTTP RPC的区别:

  • HTTP RPC:同步,直接连接,需要服务端在线
  • 消息队列RPC:异步,通过消息代理,支持解耦和负载均衡
2. RabbitMQ RPC核心组件


  • 请求队列(Request Queue):客户端发送请求的队列
  • 回复队列(Reply Queue):服务器返回响应的队列
  • 关联ID(Correlation Id):匹配请求和响应的唯一标识
  • 消息属性:使用IBasicProperties.ReplyTo和IBasicProperties.CorrelationId
3. RPC工作流程
  1. Client端:
  2. 1. 生成唯一CorrelationId
  3. 2. 创建临时回复队列
  4. 3. 发送请求到请求队列,设置ReplyTo和CorrelationId
  5. 4. 监听回复队列,等待匹配的CorrelationId
  6. Server端:
  7. 1. 监听请求队列
  8. 2. 处理请求
  9. 3. 将响应发送到请求中的ReplyTo队列
  10. 4. 设置相同的CorrelationId
  11. Client端:
  12. 5. 收到响应,根据CorrelationId匹配请求
  13. 6. 处理响应
复制代码
4. 适用场景


  • 需要同步响应的异步操作
  • 微服务间的同步通信
  • 计算密集型任务的分布式处理
  • 需要负载均衡的同步调用
二、实操部分:构建分布式计算服务

我们将创建一个分布式斐波那契数列计算服务,演示完整的RPC模式实现。
第1步:创建项目结构
  1. # 创建解决方案
  2. dotnet new sln -n RpcSystem
  3. # 创建项目
  4. dotnet new webapi -n RpcClient.API
  5. dotnet new classlib -n RpcClient.Core
  6. dotnet new classlib -n RpcServer.Service
  7. dotnet new classlib -n RpcShared
  8. # 添加到解决方案
  9. dotnet sln add RpcClient.API/RpcClient.API.csproj
  10. dotnet sln add RpcClient.Core/RpcClient.Core.csproj
  11. dotnet sln add RpcServer.Service/RpcServer.Service.csproj
  12. dotnet sln add RpcShared/RpcShared.csproj
  13. # 添加项目引用
  14. dotnet add RpcClient.API reference RpcClient.Core
  15. dotnet add RpcClient.API reference RpcShared
  16. dotnet add RpcClient.Core reference RpcShared
  17. dotnet add RpcServer.Service reference RpcShared
  18. # 添加NuGet包
  19. cd RpcClient.API
  20. dotnet add package RabbitMQ.Client
  21. cd ../RpcClient.Core
  22. dotnet add package RabbitMQ.Client
  23. cd ../RpcServer.Service
  24. dotnet add package RabbitMQ.Client
复制代码
第2步:定义共享模型(RpcShared)

Models/RpcRequest.cs
1.gif
2.gif
  1. using System.Text.Json.Serialization;
  2. namespace RpcShared.Models
  3. {
  4.     public class RpcRequest
  5.     {
  6.         [JsonPropertyName("requestId")]
  7.         public string RequestId { get; set; } = Guid.NewGuid().ToString();
  8.         [JsonPropertyName("method")]
  9.         public string Method { get; set; } = string.Empty;
  10.         [JsonPropertyName("parameters")]
  11.         public Dictionary<string, object> Parameters { get; set; } = new();
  12.         [JsonPropertyName("timestamp")]
  13.         public DateTime Timestamp { get; set; } = DateTime.UtcNow;
  14.         public RpcRequest WithParameter(string key, object value)
  15.         {
  16.             Parameters[key] = value;
  17.             return this;
  18.         }
  19.         public T? GetParameter<T>(string key)
  20.         {
  21.             if (Parameters.TryGetValue(key, out var value))
  22.             {
  23.                 try
  24.                 {
  25.                     return (T)Convert.ChangeType(value, typeof(T));
  26.                 }
  27.                 catch
  28.                 {
  29.                     return default;
  30.                 }
  31.             }
  32.             return default;
  33.         }
  34.     }
  35. }
复制代码
View CodeModels/RpcResponse.cs
3.gif
4.gif
  1. using System.Text.Json.Serialization;
  2. namespace RpcShared.Models
  3. {
  4.     public class RpcResponse
  5.     {
  6.         [JsonPropertyName("requestId")]
  7.         public string RequestId { get; set; } = string.Empty;
  8.         [JsonPropertyName("success")]
  9.         public bool Success { get; set; }
  10.         [JsonPropertyName("data")]
  11.         public object? Data { get; set; }
  12.         [JsonPropertyName("error")]
  13.         public string? Error { get; set; }
  14.         [JsonPropertyName("timestamp")]
  15.         public DateTime Timestamp { get; set; } = DateTime.UtcNow;
  16.         [JsonPropertyName("processingTimeMs")]
  17.         public long ProcessingTimeMs { get; set; }
  18.         public static RpcResponse SuccessResponse(string requestId, object data, long processingTimeMs = 0)
  19.         {
  20.             return new RpcResponse
  21.             {
  22.                 RequestId = requestId,
  23.                 Success = true,
  24.                 Data = data,
  25.                 ProcessingTimeMs = processingTimeMs
  26.             };
  27.         }
  28.         public static RpcResponse ErrorResponse(string requestId, string error, long processingTimeMs = 0)
  29.         {
  30.             return new RpcResponse
  31.             {
  32.                 RequestId = requestId,
  33.                 Success = false,
  34.                 Error = error,
  35.                 ProcessingTimeMs = processingTimeMs
  36.             };
  37.         }
  38.         public T? GetData<T>()
  39.         {
  40.             if (Data is JsonElement jsonElement)
  41.             {
  42.                 return jsonElement.Deserialize<T>();
  43.             }
  44.             return Data is T typedData ? typedData : default;
  45.         }
  46.     }
  47. }
复制代码
View CodeMessages/FibonacciRequest.cs
5.gif
6.gif
  1. namespace RpcShared.Messages
  2. {
  3.     public class FibonacciRequest
  4.     {
  5.         public int Number { get; set; }
  6.         public bool UseOptimizedAlgorithm { get; set; } = true;
  7.     }
  8.     public class FibonacciResponse
  9.     {
  10.         public long Result { get; set; }
  11.         public long CalculationTimeMs { get; set; }
  12.         public int InputNumber { get; set; }
  13.     }
  14. }
复制代码
View Code第3步:RPC客户端核心库(RpcClient.Core)

Services/IRpcClient.cs
7.gif
8.gif
  1. using RpcShared.Models;
  2. namespace RpcClient.Core.Services
  3. {
  4.     public interface IRpcClient : IDisposable
  5.     {
  6.         Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout);
  7.         Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class;
  8.     }
  9. }
复制代码
View CodeServices/RpcClient.cs
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using System.Text.Json;
  4. using Microsoft.Extensions.Logging;
  5. using RabbitMQ.Client;
  6. using RabbitMQ.Client.Events;
  7. using RpcShared.Models;
  8. namespace RpcClient.Core.Services
  9. {
  10.     public class RpcClient : IRpcClient
  11.     {
  12.         private readonly IConnection _connection;
  13.         private readonly IModel _channel;
  14.         private readonly ILogger<RpcClient> _logger;
  15.         private readonly string _replyQueueName;
  16.         private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;
  17.         private readonly AsyncEventingBasicConsumer _consumer;
  18.         private bool _disposed = false;
  19.         public RpcClient(
  20.             IConnectionFactory connectionFactory,
  21.             ILogger<RpcClient> logger)
  22.         {
  23.             _logger = logger;
  24.             _pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();
  25.             // 建立连接和通道
  26.             _connection = connectionFactory.CreateConnection();
  27.             _channel = _connection.CreateModel();
  28.             // 声明临时回复队列(排他性,连接关闭时自动删除)
  29.             _replyQueueName = _channel.QueueDeclare(
  30.                 queue: "",
  31.                 durable: false,
  32.                 exclusive: true,
  33.                 autoDelete: true,
  34.                 arguments: null).QueueName;
  35.             // 创建消费者监听回复队列
  36.             _consumer = new AsyncEventingBasicConsumer(_channel);
  37.             _consumer.Received += OnResponseReceived;
  38.             // 开始消费回复队列
  39.             _channel.BasicConsume(
  40.                 queue: _replyQueueName,
  41.                 autoAck: false,
  42.                 consumer: _consumer);
  43.             _logger.LogInformation("RPC Client initialized with reply queue: {ReplyQueue}", _replyQueueName);
  44.         }
  45.         public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
  46.         {
  47.             if (_disposed)
  48.                 throw new ObjectDisposedException(nameof(RpcClient));
  49.             var tcs = new TaskCompletionSource<RpcResponse>();
  50.             var cancellationTokenSource = new CancellationTokenSource(timeout);
  51.             // 注册超时取消
  52.             cancellationTokenSource.Token.Register(() =>
  53.             {
  54.                 if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs))
  55.                 {
  56.                     removedTcs.TrySetException(new TimeoutException($"RPC call timed out after {timeout.TotalSeconds} seconds"));
  57.                     _logger.LogWarning("RPC request {RequestId} timed out", request.RequestId);
  58.                 }
  59.             });
  60.             // 将请求添加到待处理字典
  61.             if (!_pendingRequests.TryAdd(request.RequestId, tcs))
  62.             {
  63.                 throw new InvalidOperationException($"Request with ID {request.RequestId} is already pending");
  64.             }
  65.             try
  66.             {
  67.                 // 序列化请求
  68.                 var requestJson = JsonSerializer.Serialize(request);
  69.                 var requestBody = Encoding.UTF8.GetBytes(requestJson);
  70.                 // 设置消息属性
  71.                 var properties = _channel.CreateBasicProperties();
  72.                 properties.ReplyTo = _replyQueueName;
  73.                 properties.CorrelationId = request.RequestId;
  74.                 properties.Persistent = true;
  75.                 _logger.LogDebug("Sending RPC request {RequestId} to queue: rpc_queue", request.RequestId);
  76.                 // 发布请求到RPC队列
  77.                 _channel.BasicPublish(
  78.                     exchange: "",
  79.                     routingKey: "rpc_queue",
  80.                     basicProperties: properties,
  81.                     body: requestBody);
  82.                 _logger.LogInformation("RPC request {RequestId} sent successfully", request.RequestId);
  83.                 // 等待响应
  84.                 return await tcs.Task;
  85.             }
  86.             catch (Exception ex)
  87.             {
  88.                 // 发生异常时移除待处理请求
  89.                 _pendingRequests.TryRemove(request.RequestId, out _);
  90.                 _logger.LogError(ex, "Error sending RPC request {RequestId}", request.RequestId);
  91.                 throw;
  92.             }
  93.         }
  94.         public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class
  95.         {
  96.             var response = await CallAsync(request, timeout);
  97.             
  98.             if (!response.Success)
  99.             {
  100.                 throw new InvalidOperationException($"RPC call failed: {response.Error}");
  101.             }
  102.             return response.GetData<TResponse>();
  103.         }
  104.         private async Task OnResponseReceived(object sender, BasicDeliverEventArgs ea)
  105.         {
  106.             var responseBody = ea.Body.ToArray();
  107.             var responseJson = Encoding.UTF8.GetString(responseBody);
  108.             var correlationId = ea.BasicProperties.CorrelationId;
  109.             _logger.LogDebug("Received RPC response for correlation ID: {CorrelationId}", correlationId);
  110.             try
  111.             {
  112.                 var response = JsonSerializer.Deserialize<RpcResponse>(responseJson);
  113.                 if (response == null)
  114.                 {
  115.                     _logger.LogError("Failed to deserialize RPC response for correlation ID: {CorrelationId}", correlationId);
  116.                     return;
  117.                 }
  118.                 // 查找匹配的待处理请求
  119.                 if (_pendingRequests.TryRemove(correlationId, out var tcs))
  120.                 {
  121.                     tcs.TrySetResult(response);
  122.                     _logger.LogDebug("RPC response for {CorrelationId} delivered to waiting task", correlationId);
  123.                 }
  124.                 else
  125.                 {
  126.                     _logger.LogWarning("Received response for unknown correlation ID: {CorrelationId}", correlationId);
  127.                 }
  128.                 // 手动确认消息
  129.                 _channel.BasicAck(ea.DeliveryTag, false);
  130.             }
  131.             catch (Exception ex)
  132.             {
  133.                 _logger.LogError(ex, "Error processing RPC response for correlation ID: {CorrelationId}", correlationId);
  134.                
  135.                 // 处理失败时拒绝消息(不重新入队)
  136.                 _channel.BasicNack(ea.DeliveryTag, false, false);
  137.                
  138.                 // 如果反序列化失败,仍然通知等待的任务
  139.                 if (_pendingRequests.TryRemove(correlationId, out var tcs))
  140.                 {
  141.                     tcs.TrySetException(new InvalidOperationException("Failed to process RPC response"));
  142.                 }
  143.             }
  144.             await Task.CompletedTask;
  145.         }
  146.         public void Dispose()
  147.         {
  148.             if (!_disposed)
  149.             {
  150.                 _disposed = true;
  151.                 // 取消所有待处理的请求
  152.                 foreach (var (requestId, tcs) in _pendingRequests)
  153.                 {
  154.                     tcs.TrySetCanceled();
  155.                 }
  156.                 _pendingRequests.Clear();
  157.                 _channel?.Close();
  158.                 _channel?.Dispose();
  159.                 _connection?.Close();
  160.                 _connection?.Dispose();
  161.                 _logger.LogInformation("RPC Client disposed");
  162.             }
  163.         }
  164.     }
  165. }
复制代码
Services/FibonacciRpcClient.cs
  1. using RpcClient.Core.Services;
  2. using RpcShared.Messages;
  3. using RpcShared.Models;
  4. namespace RpcClient.Core.Services
  5. {
  6.     public class FibonacciRpcClient
  7.     {
  8.         private readonly IRpcClient _rpcClient;
  9.         private readonly ILogger<FibonacciRpcClient> _logger;
  10.         public FibonacciRpcClient(IRpcClient rpcClient, ILogger<FibonacciRpcClient> logger)
  11.         {
  12.             _rpcClient = rpcClient;
  13.             _logger = logger;
  14.         }
  15.         public async Task<long> CalculateFibonacciAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
  16.         {
  17.             var request = new RpcRequest
  18.             {
  19.                 Method = "fibonacci.calculate",
  20.                 Timestamp = DateTime.UtcNow
  21.             }
  22.             .WithParameter("number", number)
  23.             .WithParameter("useOptimized", useOptimized);
  24.             timeout ??= TimeSpan.FromSeconds(30);
  25.             try
  26.             {
  27.                 _logger.LogInformation("Calculating Fibonacci({Number}) with timeout {Timeout}s",
  28.                     number, timeout.Value.TotalSeconds);
  29.                 var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);
  30.                
  31.                 if (response != null)
  32.                 {
  33.                     _logger.LogInformation(
  34.                         "Fibonacci({Number}) = {Result} (calculated in {Time}ms)",
  35.                         number, response.Result, response.CalculationTimeMs);
  36.                     
  37.                     return response.Result;
  38.                 }
  39.                 throw new InvalidOperationException("Received null response from RPC server");
  40.             }
  41.             catch (TimeoutException ex)
  42.             {
  43.                 _logger.LogError(ex, "Fibonacci calculation timed out for number {Number}", number);
  44.                 throw;
  45.             }
  46.             catch (Exception ex)
  47.             {
  48.                 _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
  49.                 throw;
  50.             }
  51.         }
  52.         public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)
  53.         {
  54.             var request = new RpcRequest
  55.             {
  56.                 Method = "fibonacci.calculate",
  57.                 Timestamp = DateTime.UtcNow
  58.             }
  59.             .WithParameter("number", number)
  60.             .WithParameter("useOptimized", useOptimized);
  61.             timeout ??= TimeSpan.FromSeconds(30);
  62.             var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);
  63.             return response ?? throw new InvalidOperationException("Received null response from RPC server");
  64.         }
  65.     }
  66. }
复制代码
第4步:RPC客户端API(RpcClient.API)

Program.cs
9.gif
10.gif
  1. using RpcClient.API.Services;
  2. using RpcClient.Core.Services;
  3. using RpcShared.Models;
  4. using RabbitMQ.Client;
  5. var builder = WebApplication.CreateBuilder(args);
  6. // Add services to the container.
  7. builder.Services.AddControllers();
  8. builder.Services.AddEndpointsApiExplorer();
  9. builder.Services.AddSwaggerGen();
  10. // Configure RabbitMQ
  11. builder.Services.AddSingleton<IConnectionFactory>(sp =>
  12. {
  13.     var configuration = sp.GetRequiredService<IConfiguration>();
  14.     return new ConnectionFactory
  15.     {
  16.         HostName = configuration["RabbitMQ:HostName"],
  17.         UserName = configuration["RabbitMQ:UserName"],
  18.         Password = configuration["RabbitMQ:Password"],
  19.         Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
  20.         VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
  21.         DispatchConsumersAsync = true
  22.     };
  23. });
  24. // Register RPC services
  25. builder.Services.AddSingleton<IRpcClient, RpcClient>();
  26. builder.Services.AddScoped<FibonacciRpcClient>();
  27. builder.Services.AddScoped<IMathRpcService, MathRpcService>();
  28. // Add health checks
  29. builder.Services.AddHealthChecks()
  30.     .AddRabbitMQ(provider =>
  31.     {
  32.         var factory = provider.GetRequiredService<IConfiguration>();
  33.         return factory.CreateConnection();
  34.     });
  35. var app = builder.Build();
  36. // Configure the HTTP request pipeline.
  37. if (app.Environment.IsDevelopment())
  38. {
  39.     app.UseSwagger();
  40.     app.UseSwaggerUI();
  41. }
  42. app.UseHttpsRedirection();
  43. app.UseAuthorization();
  44. app.MapControllers();
  45. app.MapHealthChecks("/health");
  46. app.Run();
复制代码
View CodeServices/IMathRpcService.cs
11.gif
12.gif
  1. using RpcShared.Messages;
  2. namespace RpcClient.API.Services
  3. {
  4.     public interface IMathRpcService
  5.     {
  6.         Task<long> CalculateFibonacciAsync(int number);
  7.         Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number);
  8.         Task<bool> HealthCheckAsync();
  9.     }
  10. }
复制代码
View CodeServices/MathRpcService.cs
13.gif
14.gif
  1. using RpcClient.Core.Services;
  2. using RpcShared.Messages;
  3. namespace RpcClient.API.Services
  4. {
  5.     public class MathRpcService : IMathRpcService
  6.     {
  7.         private readonly FibonacciRpcClient _fibonacciClient;
  8.         private readonly ILogger<MathRpcService> _logger;
  9.         public MathRpcService(FibonacciRpcClient fibonacciClient, ILogger<MathRpcService> logger)
  10.         {
  11.             _fibonacciClient = fibonacciClient;
  12.             _logger = logger;
  13.         }
  14.         public async Task<long> CalculateFibonacciAsync(int number)
  15.         {
  16.             if (number < 0)
  17.                 throw new ArgumentException("Number must be non-negative", nameof(number));
  18.             if (number > 50)
  19.                 throw new ArgumentException("Number too large for demonstration", nameof(number));
  20.             return await _fibonacciClient.CalculateFibonacciAsync(number);
  21.         }
  22.         public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number)
  23.         {
  24.             if (number < 0)
  25.                 throw new ArgumentException("Number must be non-negative", nameof(number));
  26.             if (number > 50)
  27.                 throw new ArgumentException("Number too large for demonstration", nameof(number));
  28.             return await _fibonacciClient.CalculateFibonacciDetailedAsync(number);
  29.         }
  30.         public async Task<bool> HealthCheckAsync()
  31.         {
  32.             try
  33.             {
  34.                 // 简单的健康检查:计算 Fibonacci(1)
  35.                 var result = await _fibonacciClient.CalculateFibonacciAsync(1, timeout: TimeSpan.FromSeconds(5));
  36.                 return result == 1;
  37.             }
  38.             catch (Exception ex)
  39.             {
  40.                 _logger.LogWarning(ex, "RPC health check failed");
  41.                 return false;
  42.             }
  43.         }
  44.     }
  45. }
复制代码
View CodeControllers/MathController.cs
15.gif
16.gif
  1. using Microsoft.AspNetCore.Mvc;
  2. using RpcClient.API.Services;
  3. using RpcShared.Messages;
  4. namespace RpcClient.API.Controllers
  5. {
  6.     [ApiController]
  7.     [Route("api/[controller]")]
  8.     public class MathController : ControllerBase
  9.     {
  10.         private readonly IMathRpcService _mathService;
  11.         private readonly ILogger<MathController> _logger;
  12.         public MathController(IMathRpcService mathService, ILogger<MathController> logger)
  13.         {
  14.             _mathService = mathService;
  15.             _logger = logger;
  16.         }
  17.         [HttpGet("fibonacci/{number}")]
  18.         public async Task> CalculateFibonacci(int number)
  19.         {
  20.             try
  21.             {
  22.                 _logger.LogInformation("Calculating Fibonacci({Number}) via RPC", number);
  23.                 var result = await _mathService.CalculateFibonacciAsync(number);
  24.                 return Ok(result);
  25.             }
  26.             catch (ArgumentException ex)
  27.             {
  28.                 return BadRequest(ex.Message);
  29.             }
  30.             catch (TimeoutException ex)
  31.             {
  32.                 _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
  33.                 return StatusCode(408, "Calculation timed out");
  34.             }
  35.             catch (Exception ex)
  36.             {
  37.                 _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
  38.                 return StatusCode(500, "Internal server error");
  39.             }
  40.         }
  41.         [HttpGet("fibonacci/{number}/detailed")]
  42.         public async Task> CalculateFibonacciDetailed(int number)
  43.         {
  44.             try
  45.             {
  46.                 _logger.LogInformation("Calculating Fibonacci({Number}) with details via RPC", number);
  47.                 var result = await _mathService.CalculateFibonacciDetailedAsync(number);
  48.                 return Ok(result);
  49.             }
  50.             catch (ArgumentException ex)
  51.             {
  52.                 return BadRequest(ex.Message);
  53.             }
  54.             catch (TimeoutException ex)
  55.             {
  56.                 _logger.LogWarning(ex, "Fibonacci calculation timed out for number {Number}", number);
  57.                 return StatusCode(408, "Calculation timed out");
  58.             }
  59.             catch (Exception ex)
  60.             {
  61.                 _logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);
  62.                 return StatusCode(500, "Internal server error");
  63.             }
  64.         }
  65.         [HttpGet("health")]
  66.         public async Task HealthCheck()
  67.         {
  68.             var isHealthy = await _mathService.HealthCheckAsync();
  69.             return isHealthy ? Ok("RPC service is healthy") : StatusCode(503, "RPC service is unavailable");
  70.         }
  71.     }
  72. }
复制代码
View Code第5步:RPC服务器(RpcServer.Service)

Program.cs
17.gif
18.gif
  1. using RpcServer.Service.Services;
  2. using RpcShared.Models;
  3. using RabbitMQ.Client;
  4. var builder = Host.CreateApplicationBuilder(args);
  5. builder.Services.AddHostedService<FibonacciRpcServer>();
  6. // Configure RabbitMQ
  7. builder.Services.AddSingleton<IConnectionFactory>(sp =>
  8. {
  9.     var configuration = sp.GetRequiredService<IConfiguration>();
  10.     return new ConnectionFactory
  11.     {
  12.         HostName = configuration["RabbitMQ:HostName"],
  13.         UserName = configuration["RabbitMQ:UserName"],
  14.         Password = configuration["RabbitMQ:Password"],
  15.         Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
  16.         VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
  17.         DispatchConsumersAsync = true
  18.     };
  19. });
  20. builder.Services.AddSingleton<FibonacciCalculator>();
  21. var host = builder.Build();
  22. host.Run();
复制代码
View CodeServices/FibonacciCalculator.cs
19.gif
20.gif
  1. using RpcShared.Messages;
  2. namespace RpcServer.Service.Services
  3. {
  4.     public class FibonacciCalculator
  5.     {
  6.         private readonly ILogger<FibonacciCalculator> _logger;
  7.         private readonly Dictionary<int, long> _cache = new();
  8.         public FibonacciCalculator(ILogger<FibonacciCalculator> logger)
  9.         {
  10.             _logger = logger;
  11.         }
  12.         public FibonacciResponse Calculate(int number, bool useOptimized = true)
  13.         {
  14.             var startTime = DateTime.UtcNow;
  15.             try
  16.             {
  17.                 _logger.LogInformation("Calculating Fibonacci({Number}) with optimized: {Optimized}",
  18.                     number, useOptimized);
  19.                 long result;
  20.                 if (useOptimized)
  21.                 {
  22.                     result = CalculateOptimized(number);
  23.                 }
  24.                 else
  25.                 {
  26.                     result = CalculateNaive(number);
  27.                 }
  28.                 var calculationTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
  29.                 _logger.LogInformation(
  30.                     "Fibonacci({Number}) = {Result} (calculated in {Time}ms)",
  31.                     number, result, calculationTime);
  32.                 return new FibonacciResponse
  33.                 {
  34.                     Result = result,
  35.                     CalculationTimeMs = (long)calculationTime,
  36.                     InputNumber = number
  37.                 };
  38.             }
  39.             catch (Exception ex)
  40.             {
  41.                 _logger.LogError(ex, "Error calculating Fibonacci({Number})", number);
  42.                 throw;
  43.             }
  44.         }
  45.         private long CalculateOptimized(int n)
  46.         {
  47.             if (n < 0) throw new ArgumentException("Number must be non-negative");
  48.             if (n <= 1) return n;
  49.             // 检查缓存
  50.             if (_cache.TryGetValue(n, out var cachedResult))
  51.             {
  52.                 _logger.LogDebug("Cache hit for Fibonacci({Number})", n);
  53.                 return cachedResult;
  54.             }
  55.             long a = 0, b = 1;
  56.             for (int i = 2; i <= n; i++)
  57.             {
  58.                 var temp = a + b;
  59.                 a = b;
  60.                 b = temp;
  61.                 // 缓存中间结果
  62.                 if (i % 10 == 0) // 每10个数缓存一次以减少内存使用
  63.                 {
  64.                     _cache[i] = b;
  65.                 }
  66.             }
  67.             // 缓存最终结果
  68.             _cache[n] = b;
  69.             return b;
  70.         }
  71.         private long CalculateNaive(int n)
  72.         {
  73.             if (n < 0) throw new ArgumentException("Number must be non-negative");
  74.             if (n <= 1) return n;
  75.             // 模拟计算密集型任务
  76.             Thread.Sleep(100);
  77.             return CalculateNaive(n - 1) + CalculateNaive(n - 2);
  78.         }
  79.         public void ClearCache()
  80.         {
  81.             _cache.Clear();
  82.             _logger.LogInformation("Fibonacci cache cleared");
  83.         }
  84.     }
  85. }
复制代码
View Code第6步:高级特性 - 带重试的RPC客户端

Services/ResilientRpcClient.cs
21.gif
22.gif
  1. using System.Text;
  2. using System.Text.Json;
  3. using Microsoft.Extensions.Options;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using RpcShared.Messages;
  7. using RpcShared.Models;
  8. namespace RpcServer.Service.Services
  9. {
  10.     public class FibonacciRpcServer : BackgroundService
  11.     {
  12.         private readonly IConnection _connection;
  13.         private readonly IModel _channel;
  14.         private readonly FibonacciCalculator _calculator;
  15.         private readonly ILogger<FibonacciRpcServer> _logger;
  16.         private const string QueueName = "rpc_queue";
  17.         public FibonacciRpcServer(
  18.             IConnectionFactory connectionFactory,
  19.             FibonacciCalculator calculator,
  20.             ILogger<FibonacciRpcServer> logger)
  21.         {
  22.             _calculator = calculator;
  23.             _logger = logger;
  24.             // 建立连接和通道
  25.             _connection = connectionFactory.CreateConnection();
  26.             _channel = _connection.CreateModel();
  27.             InitializeQueue();
  28.         }
  29.         private void InitializeQueue()
  30.         {
  31.             // 声明RPC请求队列(持久化)
  32.             _channel.QueueDeclare(
  33.                 queue: QueueName,
  34.                 durable: true,
  35.                 exclusive: false,
  36.                 autoDelete: false,
  37.                 arguments: null);
  38.             // 设置公平分发,每次只处理一个请求
  39.             _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  40.             _logger.LogInformation("Fibonacci RPC Server initialized and listening on queue: {QueueName}", QueueName);
  41.         }
  42.         protected override Task ExecuteAsync(CancellationToken stoppingToken)
  43.         {
  44.             stoppingToken.ThrowIfCancellationRequested();
  45.             var consumer = new AsyncEventingBasicConsumer(_channel);
  46.             consumer.Received += async (model, ea) =>
  47.             {
  48.                 RpcResponse response = null;
  49.                 string correlationId = ea.BasicProperties.CorrelationId;
  50.                 string replyTo = ea.BasicProperties.ReplyTo;
  51.                 _logger.LogDebug("Received RPC request with correlation ID: {CorrelationId}", correlationId);
  52.                 try
  53.                 {
  54.                     // 处理请求
  55.                     response = await ProcessRequestAsync(ea.Body.ToArray());
  56.                     response.RequestId = correlationId;
  57.                 }
  58.                 catch (Exception ex)
  59.                 {
  60.                     _logger.LogError(ex, "Error processing RPC request {CorrelationId}", correlationId);
  61.                     response = RpcResponse.ErrorResponse(correlationId, ex.Message);
  62.                 }
  63.                 finally
  64.                 {
  65.                     // 发送响应
  66.                     if (!string.IsNullOrEmpty(replyTo))
  67.                     {
  68.                         SendResponse(replyTo, correlationId, response ??
  69.                             RpcResponse.ErrorResponse(correlationId, "Unknown error occurred"));
  70.                     }
  71.                     // 确认消息处理完成
  72.                     _channel.BasicAck(ea.DeliveryTag, false);
  73.                 }
  74.             };
  75.             _channel.BasicConsume(
  76.                 queue: QueueName,
  77.                 autoAck: false, // 手动确认
  78.                 consumer: consumer);
  79.             _logger.LogInformation("Fibonacci RPC Server started successfully");
  80.             return Task.CompletedTask;
  81.         }
  82.         private async Task<RpcResponse> ProcessRequestAsync(byte[] body)
  83.         {
  84.             var startTime = DateTime.UtcNow;
  85.             try
  86.             {
  87.                 var requestJson = Encoding.UTF8.GetString(body);
  88.                 var request = JsonSerializer.Deserialize<RpcRequest>(requestJson);
  89.                 if (request == null)
  90.                 {
  91.                     return RpcResponse.ErrorResponse("unknown", "Invalid request format");
  92.                 }
  93.                 _logger.LogInformation("Processing RPC request {RequestId}, Method: {Method}",
  94.                     request.RequestId, request.Method);
  95.                 // 根据方法名路由到不同的处理逻辑
  96.                 object result = request.Method.ToLowerInvariant() switch
  97.                 {
  98.                     "fibonacci.calculate" => ProcessFibonacciRequest(request),
  99.                     "ping" => new { message = "pong", timestamp = DateTime.UtcNow },
  100.                     _ => throw new NotSupportedException($"Method {request.Method} is not supported")
  101.                 };
  102.                 var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
  103.                 return RpcResponse.SuccessResponse(
  104.                     request.RequestId,
  105.                     result,
  106.                     (long)processingTime);
  107.             }
  108.             catch (Exception ex)
  109.             {
  110.                 var processingTime = (DateTime.UtcNow - startTime).TotalMilliseconds;
  111.                 _logger.LogError(ex, "Error processing RPC request");
  112.                
  113.                 return RpcResponse.ErrorResponse(
  114.                     "unknown",
  115.                     ex.Message,
  116.                     (long)processingTime);
  117.             }
  118.         }
  119.         private FibonacciResponse ProcessFibonacciRequest(RpcRequest request)
  120.         {
  121.             var number = request.GetParameter<int>("number");
  122.             var useOptimized = request.GetParameter<bool>("useOptimized") ?? true;
  123.             if (number < 0)
  124.             {
  125.                 throw new ArgumentException("Fibonacci number must be non-negative");
  126.             }
  127.             // 防止过大的计算消耗资源
  128.             if (number > 50)
  129.             {
  130.                 throw new ArgumentException("Number too large for calculation");
  131.             }
  132.             return _calculator.Calculate(number, useOptimized);
  133.         }
  134.         private void SendResponse(string replyTo, string correlationId, RpcResponse response)
  135.         {
  136.             try
  137.             {
  138.                 var responseJson = JsonSerializer.Serialize(response);
  139.                 var responseBody = Encoding.UTF8.GetBytes(responseJson);
  140.                 var properties = _channel.CreateBasicProperties();
  141.                 properties.CorrelationId = correlationId;
  142.                 properties.Persistent = true;
  143.                 _channel.BasicPublish(
  144.                     exchange: "",
  145.                     routingKey: replyTo,
  146.                     basicProperties: properties,
  147.                     body: responseBody);
  148.                 _logger.LogDebug("Sent RPC response for correlation ID: {CorrelationId}", correlationId);
  149.             }
  150.             catch (Exception ex)
  151.             {
  152.                 _logger.LogError(ex, "Error sending RPC response for correlation ID: {CorrelationId}", correlationId);
  153.             }
  154.         }
  155.         public override void Dispose()
  156.         {
  157.             _channel?.Close();
  158.             _channel?.Dispose();
  159.             _connection?.Close();
  160.             _connection?.Dispose();
  161.             
  162.             _logger.LogInformation("Fibonacci RPC Server disposed");
  163.             
  164.             base.Dispose();
  165.         }
  166.     }
  167. }
复制代码
View Code第7步:运行与测试


  • 启动服务
    1. using Microsoft.Extensions.Logging;
    2. using Polly;
    3. using Polly.Retry;
    4. using RpcShared.Models;
    5. namespace RpcClient.Core.Services
    6. {
    7.     public class ResilientRpcClient : IRpcClient
    8.     {
    9.         private readonly IRpcClient _innerClient;
    10.         private readonly ILogger<ResilientRpcClient> _logger;
    11.         private readonly AsyncRetryPolicy _retryPolicy;
    12.         public ResilientRpcClient(IRpcClient innerClient, ILogger<ResilientRpcClient> logger)
    13.         {
    14.             _innerClient = innerClient;
    15.             _logger = logger;
    16.             // 配置重试策略
    17.             _retryPolicy = Policy
    18.                 .Handle<TimeoutException>()
    19.                 .Or<InvalidOperationException>()
    20.                 .WaitAndRetryAsync(
    21.                     retryCount: 3,
    22.                     sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
    23.                     onRetry: (exception, delay, retryCount, context) =>
    24.                     {
    25.                         _logger.LogWarning(
    26.                             "RPC call failed. Retry {RetryCount} after {Delay}ms. Error: {Error}",
    27.                             retryCount, delay.TotalMilliseconds, exception.Message);
    28.                     });
    29.         }
    30.         public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)
    31.         {
    32.             return await _retryPolicy.ExecuteAsync(async () =>
    33.             {
    34.                 return await _innerClient.CallAsync(request, timeout);
    35.             });
    36.         }
    37.         public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class
    38.         {
    39.             return await _retryPolicy.ExecuteAsync(async () =>
    40.             {
    41.                 return await _innerClient.CallAsync<TResponse>(request, timeout);
    42.             });
    43.         }
    44.         public void Dispose()
    45.         {
    46.             _innerClient?.Dispose();
    47.         }
    48.     }
    49. }
    复制代码
  • 测试API
    1. # 终端1:启动RPC服务器
    2. cd RpcServer.Service
    3. dotnet run
    4. # 终端2:启动RPC客户端API
    5. cd RpcClient.API
    6. dotnet run
    复制代码
  • 测试错误场景
    1. # 计算斐波那契数列
    2. curl -X GET "https://localhost:7000/api/math/fibonacci/10"
    3. curl -X GET "https://localhost:7000/api/math/fibonacci/20/detailed"
    4. # 健康检查
    5. curl -X GET "https://localhost:7000/api/math/health"
    复制代码
  • 观察日志输出

    • 客户端发送请求,生成CorrelationId
    • 服务器接收请求,处理计算
    • 服务器发送响应,使用相同的CorrelationId
    • 客户端接收响应,匹配CorrelationId

第8步:性能测试和监控

创建性能测试控制器
  1. # 测试超时(设置很小的超时时间)
  2. # 测试无效输入
  3. curl -X GET "https://localhost:7000/api/math/fibonacci/-5"
  4. curl -X GET "https://localhost:7000/api/math/fibonacci/100"
复制代码
本章总结

在这一章中,我们完整实现了RabbitMQ的RPC模式:

  • RPC核心概念:理解了回调队列、关联ID、请求-响应模式。
  • 客户端实现:创建了能够发送请求并异步等待响应的RPC客户端。
  • 服务器实现:构建了处理请求并返回响应的RPC服务器。
  • 错误处理:实现了超时控制、异常处理和重试机制。
  • 性能优化:使用缓存和优化算法提高计算效率。
  • ** resilience**:通过Polly实现了弹性重试策略。
RPC模式为微服务架构提供了强大的同步通信能力,结合消息队列的异步特性,既保持了系统的解耦性,又提供了同步调用的便利性。这种模式特别适合需要等待计算结果的分布式任务。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册