找回密码
 立即注册
首页 业界区 业界 哎呀,我老大写Bug啦——记一次MessageQueue的优化 ...

哎呀,我老大写Bug啦——记一次MessageQueue的优化

嗳歉楞 2025-5-29 05:02:24
  MessageQueue,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。
1.png

之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,
                                                                                     
2.png

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。
                                                                                    
3.gif

都说祸不单行,古人是不会骗我们的,Bug怎么会修得完呢?天真,要是Bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,
4.png
5.png

what is it ? 

 
6.png

 
来了,今天的主角登场了,我也要开始加班了。
RabbitMQ

  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:
7.png

 
RabbitMQ内存暴涨!使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才1.3G,吃个午饭回来,就2.3G了,可怕不可怕?咋回事?
老板喊你回来加班啦

  先不管了,线上优先解决,手动先Reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitMQ的配置有没有问题,路径在
 C:\Users\Administrator\AppData\Roaming\RabbitMQ 
8.png

完全是默认的配置,完全ojbk啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用
9.png

10.png

好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,
11.png

这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用
12.png

主要的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:
13.gif
14.gif
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using MessageLib.ClassBean;
  7. using EasyNetQ;
  8. using System.Threading;
  9. namespace MessageLib
  10. {
  11.     public static class MessageQueue
  12.     {
  13.         public static IBus bus = MQBusBuilder.CreateMessageBus();
  14.         //消息队列
  15.         private static Queue<Item> NoticQueue = new Queue<Item>(5000);
  16.         //日志队列
  17.         private static Queue<Item> LogQueue = new Queue<Item>(5000);
  18.         //队列数目发布数量
  19.         private static int max_count_to_pulish = 1000;
  20.         /// <summary>
  21.         /// 可供外部使用的消息入列操作
  22.         /// </summary>
  23.         public static void push(Item item)
  24.         {
  25.             if (item.type == ItemType.notic)
  26.             {
  27.                 NoticQueue.Enqueue(item);
  28.             }
  29.             if (item.type == ItemType.log)
  30.             {
  31.                 LogQueue.Enqueue(item);
  32.             }
  33.         }
  34.         /// <summary>
  35.         /// 监听后需要调用的发布接口
  36.         /// </summary>
  37.         private static void Pulish(object source, System.Timers.ElapsedEventArgs e)
  38.         {
  39.             if (NoticQueue.Count > 0 || LogQueue.Count > 0)
  40.             {
  41.                 if (bus == null || !bus.IsConnected)
  42.                 {
  43.                     bus = MQBusBuilder.CreateMessageBus();
  44.                 }
  45.                 if (bus.IsConnected)
  46.                 {
  47.                     Send(ItemType.notic);
  48.                     Send(ItemType.log);
  49.                 }
  50.             }
  51.         }
  52.         /// <summary>
  53.         /// 程序自运行并开始监听
  54.         /// </summary>
  55.         public static void Run()
  56.         {
  57.             System.Timers.Timer timer = new System.Timers.Timer();
  58.             timer.Interval = 1000;
  59.             timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;   
  60.             timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);   
  61.             timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;   
  62.         }
  63.         /// <summary>
  64.         /// 启动线程异步调用
  65.         /// </summary>
  66.         /// <param name="channelType"></param>
  67.         private static void Send(string channelType)
  68.         {
  69.             Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
  70.             thread.IsBackground = true;
  71.             thread.Start(channelType);
  72.         }
  73.         /// <summary>
  74.         /// 调用发布日志及提醒两个接口
  75.         /// </summary>
  76.         /// <param name="channel"></param>
  77.         private static void PublishAction(object channel)
  78.         {
  79.             PublisLog();
  80.             PublisNotic();
  81.         }
  82.         /// <summary>
  83.         /// 日志消息发送至RabbitMQ指定exchange、Queue
  84.         /// </summary>
  85.         private static void PublisLog()
  86.         {
  87.             string channelName = ItemType.log;
  88.             try
  89.             {
  90.                 var routingKey = channelName;
  91.                 var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  92.                 var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");
  93.                 var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
  94.                 while (LogQueue.Count > 0)
  95.                 {
  96.                     Item item = LogQueue.Dequeue();
  97.                     if (item != null)
  98.                     {
  99.                         var properties = new MessageProperties();
  100.                         var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  101.                         Message.Properties.AppId = item.appid;
  102.                         bus.Advanced.Publish(exchange, routingKey, false, Message);
  103.                     }
  104.                 }
  105.             }
  106.             catch (Exception ex)
  107.             {
  108.                 throw ex;
  109.             }
  110.         }
  111.         /// <summary>
  112.         /// 提醒消息发送至RabbitMQ指定exchange、Queue
  113.         /// </summary>
  114.         private static void PublisNotic()
  115.         {
  116.             string channelName = ItemType.notic;
  117.             var routingKey = channelName;
  118.             var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  119.             var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
  120.             var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
  121.             while(NoticQueue.Count > 0)
  122.             {
  123.                 Item item = NoticQueue.Dequeue();
  124.                 if (item != null)
  125.                 {
  126.                     var properties = new MessageProperties();
  127.                     var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  128.                     Message.Properties.AppId = item.appid;
  129.                     bus.Advanced.Publish(exchange, routingKey, false, Message);
  130.                 }
  131.             }
  132.         }
  133.     }
  134. }
复制代码
View Code然后我就发现了这一段代码!
  1.         /// <summary>
  2.         /// 程序自运行并开始监听
  3.         /// </summary>
  4.         public static void Run()
  5.         {
  6.             System.Timers.Timer timer = new System.Timers.Timer();
  7.             timer.Interval = 1000;
  8.             timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;   
  9.             timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);   
  10.             timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;   
  11.         }
复制代码
  1.         /// <summary>
  2.         /// 启动线程异步调用
  3.         /// </summary>
  4.         /// <param name="channelType"></param>
  5.         private static void Send(string channelType)
  6.         {
  7.             Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
  8.             thread.IsBackground = true;
  9.             thread.Start(channelType);
  10.         }
复制代码
15.png

  老老大写Bug了,当Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息Send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了N多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是Run()调用多次,后果更加不堪设想。
加班改起来

  开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类QueueTask.cs:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MessageLib.Core;
  8. using MessageLib.Core.ClassBean;
  9. using EasyNetQ;
  10. using EasyNetQ.Topology;
  11. using System.Linq.Expressions;
  12. namespace MessageLib.Core
  13. {
  14.     public class QueueTask
  15.     {
  16.         private Queue<Item> QueueData = new Queue<Item>(5000);
  17.         //队列数目发布数量
  18.         private int max_count_to_pulish = 1000;
  19.         public  bool isRunning = false;
  20.         private string itemType = ItemType.info;
  21.         private string MessageRouter = ItemType.info;
  22.         public QueueTask(string itemType,string MessageRouter)
  23.         {
  24.             this.itemType = itemType;
  25.             this.MessageRouter = MessageRouter;
  26.         }
  27.         /// <summary>
  28.         /// 可供外部使用的消息入列操作
  29.         /// </summary>
  30.         public void Push(Item item, IBus IBus)
  31.         {
  32.             QueueData.Enqueue(item);
  33.             if (!isRunning)
  34.                 Run(IBus);
  35.         }
  36.         public void Run(IBus IBus)
  37.         {
  38.             if (!isRunning)
  39.             {
  40.                 Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
  41.                 isRunning = true;
  42.             }
  43.         }
  44.         private void PulishMsg(object state)
  45.         {
  46.             IBus IBus = state as IBus;
  47.             if (QueueData.Count > 0)
  48.             {
  49.                 PublisMsg(itemType, IBus);
  50.             }
  51.         }
  52.         private void PublisMsg(object channel, IBus BusInstance)
  53.         {
  54.             try
  55.             {
  56.                 string channelName = channel as string;
  57.                 if (QueueData.Count > 0)
  58.                 {
  59.                     var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  60.                     var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
  61.                     var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
  62.                     while (QueueData.Count > 0)
  63.                     {
  64.                         Item item = QueueData.Dequeue();
  65.                         if (item != null)
  66.                         {
  67.                             var properties = new MessageProperties();
  68.                             var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  69.                             Message.Properties.AppId = item.appid;
  70.                             BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);
  71.                         }
  72.                     }
  73.                 }
  74.             }
  75.             catch (Exception ex)
  76.             {
  77.                 Console.WriteLine("PublisMsg error:" + ex.Message);
  78.             }
  79.         }
  80.         public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
  81.         {
  82.             try
  83.             {
  84.                 string channelName = itemType;
  85.                 var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  86.                 var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
  87.                 var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
  88.                 var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>
  89.                 {
  90.                     registration.Add<string>((message, info) =>
  91.                     {
  92.                         Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
  93.                         dealAction(data);
  94.                     });
  95.                 }));
  96.             }
  97.             catch (Exception ex)
  98.             {
  99.                 Console.WriteLine("Read error:" + ex.Message);
  100.             }
  101.         }
  102.     }
  103. }
复制代码
 
然后,在MessageQueue.cs修改为单例模式:
16.gif
17.gif
  1.     public static class MessageQueue
  2.     {
  3.         /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/
  4.         private static IBus bus = null;
  5.         public static bool isRunning = false;
  6.         //消息队列
  7.         private static QueueTask NoticQueue = null;
  8.         //日志队列
  9.         private static QueueTask LogQueue = null;
  10.         //自定义
  11.         private static QueueTask InfoQueue = null;
  12.         #region 同步锁
  13.         private static readonly object obj = new object();
  14.         #endregion
  15.         public static void Init(string Connection, string routeKey)
  16.         {
  17.             if (NoticQueue == null)
  18.                 NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);
  19.             if (LogQueue == null)
  20.                 LogQueue = new QueueTask(ItemType.error, ItemType.error);
  21.             if (InfoQueue == null)
  22.                 InfoQueue = new QueueTask(ItemType.info, routeKey);
  23.             if (string.IsNullOrEmpty(MQBusBuilder.Connnection))
  24.                 MQBusBuilder.Connnection = Connection;
  25.         }
  26.         public static IBus BusInstance
  27.         {
  28.             get
  29.             {
  30.                 if (bus == null)
  31.                 {
  32.                     lock (obj)
  33.                     {
  34.                         if (bus == null|| !bus.IsConnected)
  35.                         {
  36.                             bus = MQBusBuilder.CreateMessageBus();
  37.                         }
  38.                     }
  39.                 }
  40.                 return bus;
  41.             }
  42.         }
  43.         /// <summary>
  44.         /// 可供外部使用的消息入列操作
  45.         /// </summary>
  46.         public static void PushAndRun(Item item)
  47.         {
  48.             if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)
  49.                 return;
  50.             if (item.type == ItemType.notic)
  51.             {
  52.                 NoticQueue.Push(item, BusInstance);
  53.             }
  54.             if (item.type == ItemType.error)
  55.             {
  56.                 LogQueue.Push(item, BusInstance);
  57.             }
  58.             if (item.type == ItemType.info)
  59.             {
  60.                 InfoQueue.Push(item, BusInstance);
  61.             }
  62.         }
  63.         public static void Read(string itemType, Action<Item> dealAction)
  64.         {
  65.             if (itemType == ItemType.notic)
  66.             {
  67.                 NoticQueue.Read<NoticItem>(BusInstance, dealAction);
  68.             }
  69.             if (itemType == ItemType.error)
  70.             {
  71.                 LogQueue.Read<ErrorItem>(BusInstance, dealAction);
  72.             }
  73.             if (itemType == ItemType.info)
  74.             {
  75.                 InfoQueue.Read<Message>(BusInstance, dealAction);
  76.             }
  77.         }
  78.     }
复制代码
View Code每次推送消息的时候,每个QueueTask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。
18.png

19.png

 
事件二

  事情过后,B端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程暴涨!因为订单是从B端推送过来的,B端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。
  1. https://github.com/EasyNetQ/EasyNetQ
复制代码
20.png

翻来覆去,看到这个Consume方法,继承的是IDisposable接口,得勒,知道咋回事了。
21.png

Consume.Dispose(); 多个消费者的情况下,用完请记得主动释放啊。
这回真的可以浪了。
 
22.gif

总结

  遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢Debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好EasyNetQ是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。
  别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!
                                                                                             
23.jpeg

 补充


2018-12-25  鉴于大伙私信我想看看原来的bug修复后的情况,毕竟是公司代码不适合完全开源,我单独把例子源码做过修改的发布出来,思路都差不多的,对比一下文章中原来的有问题的代码就可以了吧。因为都已经修复掉了,修改后的在这里。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册