C#.Net,本地高速产生的数据,http client发送给服务端保存
高性能C#.NET HttpClient数据发送方案为了实现高速数据产生环境下稳定可靠地向服务端发送数据,同时不影响数据源性能和界面响应,我将设计一个基于生产者-消费者模式的异步解决方案。
方案设计
核心组件
[*]数据生产者:高速生成数据的模块
[*]并发队列:作为缓冲区,解耦生产者和消费者
[*]数据消费者:从队列取出数据并发送到服务端
[*]HTTP客户端池:优化HTTP连接管理
[*]故障处理机制:确保数据不丢失
完整实现代码
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Net.Http; using System.Threading; using System.Threading.Tasks; public class HighPerformanceDataSender : IDisposable { private readonly BlockingCollection _dataQueue; private readonly HttpClient _httpClient; private readonly string _apiEndpoint; private readonly CancellationTokenSource _cts; private readonly int _maxRetryCount; private readonly int _batchSize; private readonly TimeSpan _batchTimeout; private readonly Task _senderTask; // 统计数据 private long _totalSent; private long _totalErrors; private long _queueSize; public HighPerformanceDataSender(string apiEndpoint, int maxQueueSize = 100000, int maxRetryCount = 3, int batchSize = 100, TimeSpan? batchTimeout = null) { _dataQueue = new BlockingCollection(maxQueueSize); _httpClient = CreateHttpClient(); _apiEndpoint = apiEndpoint; _cts = new CancellationTokenSource(); _maxRetryCount = maxRetryCount; _batchSize = batchSize; _batchTimeout = batchTimeout ?? TimeSpan.FromMilliseconds(500); // 启动后台发送任务 _senderTask = Task.Run(() => ProcessQueueAsync(_cts.Token)); } private HttpClient CreateHttpClient() { var handler = new HttpClientHandler { MaxConnectionsPerServer = Environment.ProcessorCount * 2, UseProxy = false }; return new HttpClient(handler) { Timeout = TimeSpan.FromSeconds(30) }; } // 添加数据到队列(生产者调用) public bool EnqueueData(DataItem data) { try { bool added = _dataQueue.TryAdd(data, 50, _cts.Token); // 小超时防止无限阻塞 if (added) Interlocked.Increment(ref _queueSize); return added; } catch (OperationCanceledException) { return false; } } // 处理队列中的消息(消费者) private async Task ProcessQueueAsync(CancellationToken ct) { var batch = new List(_batchSize); var batchTimer = new System.Diagnostics.Stopwatch(); while (!ct.IsCancellationRequested) { try { batch.Clear(); batchTimer.Restart(); // 从队列中获取一批数据,或等待超时 while (batch.Count < _batchSize && batchTimer.Elapsed < _batchTimeout) { if (_dataQueue.TryTake(out DataItem item, (int)_batchTimeout.TotalMilliseconds, ct)) { batch.Add(item); Interlocked.Decrement(ref _queueSize); } } if (batch.Count > 0) { await SendBatchWithRetryAsync(batch, ct).ConfigureAwait(false); Interlocked.Add(ref _totalSent, batch.Count); } } catch (OperationCanceledException) { // 正常退出 break; } catch (Exception ex) { Interlocked.Increment(ref _totalErrors); // 记录错误,可以选择重试或将数据放回队列 LogError($"Error processing queue: {ex.Message}"); await Task.Delay(1000, ct).ConfigureAwait(false); // 出错后稍作暂停 } } } private async Task SendBatchWithRetryAsync(List batch, CancellationToken ct) { int retryCount = 0; while (retryCount < _maxRetryCount && !ct.IsCancellationRequested) { try { using var content = new StringContent(SerializeData(batch)); content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json"); using var response = await _httpClient.PostAsync(_apiEndpoint, content, ct).ConfigureAwait(false); if (response.IsSuccessStatusCode) { return; // 发送成功 } // 服务器错误,需要重试 var errorContent = await response.Content.ReadAsStringAsync(); throw new HttpRequestException($"Server error: {response.StatusCode} - {errorContent}"); } catch (Exception ex) when (retryCount < _maxRetryCount - 1) { retryCount++; LogError($"Attempt {retryCount} failed: {ex.Message}"); await Task.Delay(CalculateRetryDelay(retryCount), ct).ConfigureAwait(false); } } // 如果所有重试都失败,可以选择记录或采取其他措施 LogError($"Failed to send batch after {_maxRetryCount} attempts."); } private TimeSpan CalculateRetryDelay(int attempt) { // 指数退避策略 return TimeSpan.FromSeconds(Math.Pow(2, attempt)); } private string SerializeData(List batch) { // 使用System.Text.Json或Newtonsoft.Json进行序列化 return System.Text.Json.JsonSerializer.Serialize(batch); } private void LogError(string message) { // 实际项目中可以使用日志框架 Console.WriteLine($" {DateTime.UtcNow}: {message}"); } // 统计数据访问 public long TotalSent => _totalSent; public long TotalErrors => _totalErrors; public long CurrentQueueSize => _queueSize; // 优雅停止 public async Task StopAsync() { _cts.Cancel(); await _senderTask.ConfigureAwait(false); } public void Dispose() { StopAsync().Wait(); _httpClient.Dispose(); _cts.Dispose(); _dataQueue.Dispose(); } } // 数据项定义 public class DataItem { public DateTime Timestamp { get; set; } public string Payload { get; set; } // 其他必要字段... }
关键优化点
[*]生产者-消费者模式:
[*]使用BlockingCollection作为线程安全的缓冲区
[*]生产者和消费者完全解耦
[*]批量发送:
[*]按照时间或数量条件批量发送数据
[*]显著减少HTTP请求数量
[*]性能优化:
[*]使用HttpClient池化
[*]设置合理的连接限制
[*]异步操作避免阻塞
[*]可靠性:
[*]指数退避重试机制
[*]完善的错误处理
[*]避免数据丢失
[*]可观测性:
[*]内置性能计数器
[*]错误日志记录
使用示例
// 初始化发送器 var sender = new HighPerformanceDataSender( apiEndpoint: "https://api.example.com/data", maxQueueSize: 50000, batchSize: 200 ); // 模拟数据生产者 Task.Run(() => { var random = new Random(); while (true) { var data = new DataItem { Timestamp = DateTime.UtcNow, Payload = GenerateRandomData(random) }; if (!sender.EnqueueData(data)) { // 队列已满,处理策略... } Thread.Sleep(1); // 模拟高速数据产生 } }); // UI线程可以定期检查统计数据 Timer statsTimer = new Timer(_ => { Console.WriteLine($"Sent: {sender.TotalSent}, Errors: {sender.TotalErrors}, Queue: {sender.CurrentQueueSize}"); }, null, 1000, 1000);
扩展建议
[*]持久化队列:考虑使用磁盘备份队列防止程序崩溃丢失数据
[*]流量控制:实现背压机制防止生产过快消费过慢
[*]动态调整:根据网络条件自动调整批量大小和延迟
[*]监控集成:添加指标导出供Prometheus等监控系统使用
[*]多目的地支持:扩展支持多个服务端目标
这个方案提供了高吞吐量、低延迟的数据发送能力,同时确保不会拖慢数据产生端的性能或导致UI卡顿。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]