找回密码
 立即注册
首页 业界区 业界 也是出息了,业务代码里面也用上算法了。 ...

也是出息了,业务代码里面也用上算法了。

豌畔丛 昨天 19:20
你好呀,我是歪歪。
好消息,好消息,歪师傅最近写业务代码的时候,遇到一个可以优化的点。
然后,灵机一动,想到一个现成的算法可以拿来用。
业务代码中能用到算法,虽然不是头一遭,但是也真的是算难得了。
记录一下,分享一波。
走起。
1.png
场景

场景是这样的。
首先,我有一批数据要调用下游系统的一个统一的接口,去查询数据状态。
这一批数据,分属于不同的平台,所以调用下游查询接口的时候,我会告诉它有的数据要去 A 平台查询,有的数据要去 B 平台查询...
大概就是这个意思:
2.png
在这个场景下,我们还不用关心平台方的同步返回结果,因为最终的结果平台方会异步通知回来。
这样看起来是一个非常简单的场景对不对?
现在我们在这个基础上加一个小小的变化。
由于这个下游系统是一个非常重要的系统,承担着全公司所流量的出人口,可以说是咽喉要道。
所以,出于保护自身的目的,它对调用方的接口都做了限流。
对于我这个小卡拉米的、边边角角的查询动作,它给的限流就是一秒最多一笔。
也就是说,我调用查询接口的时候,线程池什么的就别想了,老老实实的排队,然后一秒一个的发请求,
大概就是这个意思:
3.png
这样看着也没有问题,一秒一个控制起来还不简单吗?
优雅一点的,你就上个限流器,八股文翻出来一看,什么信号量、令牌桶、Guava RateLimiter 随便掏一个出来用就行了。
糙一点的,你直接 for 循环里面 sleep 一秒也不是不可以。
我是一个糙人,所以我直接选择 sleep,大道至简。
伪代码大概是这样的:
  1. // 伪代码:从数据库获取某平台数据后,在循环中每秒调用一次下游接口
  2. public void processDataWithDelay() {
  3.     // 1. 从数据库获取数据(示例使用伪方法)
  4.     List<DataObject> dataList = database.fetchData("A"); // 假设返回数据列表
  5.     
  6.     // 2. 遍历每条数据
  7.     for (DataObject data : dataList) {
  8.         try {
  9.             // 3. 休眠1秒(1000毫秒)
  10.             Thread.sleep(1000); 
  11.             // 4. 调用下游系统查询接口
  12.             downstreamService.query(data.getId());
  13.         } catch (Exception e) {
  14.         }
  15.     }
  16. }
复制代码
整体来说就是先把一个平台的数据全部处理完成后,再处理下一个平台的数据。
用图说话大概就是这样的:
4.png
这样看着也没有毛病。
但是,有一天,A 平台找过来说:哥们,你们这个查询有点太快了,1s 一个查得我有点扛不住。能不能调一下,比如调成 6s 一次?
本来我想追问一下:就这么差劲儿吗,一个查询接口,一秒一个都扛不住?
但是本着程序员不难为程序员的原则,我还是忍住了。
6s 一次,这还不简单嘛。
我把前面伪代码中的 1000ms,修改成配置项,默认为 1000ms,如果某个平台有个性化需求,我直接给对应平台的配置一个专属的间隔时间就行了:
  1. // 伪代码:从数据库获取某平台数据后,在循环中每秒调用一次下游接口
  2. public void processDataWithDelay() {
  3.     // 1. 从数据库获取A平台数据(示例使用伪方法)
  4.     List<DataObject> dataList = database.fetchData("A平台"); // 假设返回数据列表
  5.     
  6.     // 2. 从数据库获取A平台休眠时间配置(毫秒)
  7.     int sleepInterval = getSleepIntervalFromConfig("A平台"); 
  8.     
  9.     for (DataObject data : dataList) {
  10.         try {
  11.             // 3. 休眠
  12.             Thread.sleep(sleepInterval); 
  13.             // 4. 调用下游系统查询接口
  14.             downstreamService.query(data.getId());
  15.         } catch (Exception e) {
  16.         }
  17.     }
  18. }
复制代码
用图说话就是这样的:
5.png
是不是完美的解决了 A 平台的问题?
但是,你想想这个调整之后,带来的新问题是什么?
A 平台假设 100 条数据,之前一秒一个,100 秒就发完了,然后 B、C 平台的数据就能接着处理了。
现在 A 平台的 100 条数据要发 600 秒,由于是排着队串行执行,导致 B、C 平台的数据,都因为 A 平台处理慢了,得跟着慢。
整个数据处理的事件周期就长了。
而且实际情况是更长,因为每次处理的时候,A 平台不止 100 条数据,基本上都是好几千条。平台也不只是 A、B、C 三家,而是有好几家。
怎么办?
遇到事情不要慌,三思而后行。

  • 能不能不做?
  • 能不能给别人做?
  • 能不能晚点做?
6.png
我也三思了,具体是这样的。
首先,能不能不做?
不能不做,因为已经有平台已经问过来了,为什么数据发的比之前晚了,能不能早点?
然后,能不能给别人做?
给别人做就是找下游把接口限流放大一点,但是我这个小卡拉米的、边边角角的查询动作,没有充足的理由。可以说一秒一个都是别人看着可怜,施舍来的。再要多点,就不礼貌了。所以不能给别人做。
最后,能不能晚点做?
也不能晚点做,因为这个查询动作背后有业务含义。业务说了,要尽快解决。技术是为了业务服务的,业务说了要尽快,就要尽快。
所以,只有自救。
思路

我们先捋一下当前的困难点。
第一个:下游系统限流,最多一秒一个卡的死死的。
第二个:有个平台觉得一秒一个太快了。
第三个:调整了这个平台的速率之后,影响到了其他平台的数据处理。
现在你想想,应该怎么做?
我想到的破题之道,就是这样的:
7.png
A 平台的间隔时间调整了之后,时间其实是被白白的浪费了。
中间间隔的 6s 完全可以继续按照一秒一个的频率发其他平台的数据嘛。
比如这样的:
8.png
我们仔细看看上面这个图片。
首先,一秒一个,不会触发下游系统的限流。
然后两个 A 平台的数据调用间隔,也是 6s,符合要求。
中间穿插着其他平台的数据,可以说是雨露均沾。
如果有一天 A 平台说:6s 我也扛不住不,10s 行不行?
行啊,怎么不行。
按照这个思路,我只要把中间的 10s 充分利用起来就行。
怎么实现

思路有了,按照这个思路,我最先想到的一个实现方案就是:加权轮询负载均衡策略算法。
如果你一时间关联不起来这二者之间的联系,那我给你捋一下我是怎么想的。
首先,之前是这样的:
9.png
各个平台的数据串起来,先把一个平台的数据全部处理完成后,再处理下一个平台的数据。
那我们是不是可以换一个思路,先获取待处理数据的平台集合,然后从平台集合中每隔一秒选一个平台出来,再获取这个平台的一条数据,调用查询接口:
10.png
从集合中选一个出来执行,这个“选”的动作,不就和负载均衡策略非常像吗?
然后,A 平台要间隔 6s 才能发一条过去,其他平台保持一秒一个。
说明什么?
说明 A 平台处理数据的能力不行。
在负载均衡策略中,遇到性能不好的机器,怎么选择算法?
是不是“加权轮询策略”就呼之欲出了?
假设,就是 A、B、C 三个平台,A 的服务水平差,它们的权重比是 A:B:C=1:2:3。
所以总权重为1 + 2 + 3 = 6。
加权轮询负载均衡算法会根据权重比例分配请求,生成一个调用序列。
假设,我们一共调用 6 次,那么经过加权轮询负载均衡算法,序列就是这样的:
11.png
假设,我们一共调用 12 次,那么序列就是这样的:
12.png
巧了,你看,两次 A 平台的数据之间刚好隔了 6s:
13.png
而在加权轮询负载均衡算法的加持下,这 6s 也没闲着,发了 5 个其他平台的数据出去,也没有触发下游系统的限流。
雨露均沾,舒服了。
啥,你说你没看懂?
那说明你不懂加权轮询负载均衡策略的底层原理。
可以去考古一下这个文章,看看多年前歪师傅稚嫩的文笔:《加权轮询负载均衡策略》
最后,在这里附上一个加权轮询负载均衡策略的代码实现,你粘过去就能跑:
  1. public class WeightedRoundRobin {
  2.     private List<Server> servers = new ArrayList<>();
  3.     private int index = -1;
  4.     private int remaining = 0;

  5.     static class Server {
  6.         String name;
  7.         int weight;
  8.         int current;
  9.         
  10.         Server(String name, int weight) {
  11.             this.name = name;
  12.             this.weight = weight;
  13.             this.current = weight;
  14.         }
  15.     }

  16.     public void addServer(String name, int weight) {
  17.         servers.add(new Server(name, weight));
  18.         remaining += weight;
  19.     }

  20.     public String getNext() {
  21.         if (remaining == 0) resetWeights();
  22.         
  23.         while (true) {
  24.             index = (index + 1) % servers.size();
  25.             Server server = servers.get(index);
  26.             if (server.current > 0) {
  27.                 server.current--;
  28.                 remaining--;
  29.                 return server.name;
  30.             }
  31.         }
  32.     }

  33.     private void resetWeights() {
  34.         for (Server s : servers) {
  35.             s.current = s.weight;
  36.             remaining += s.weight;
  37.         }
  38.     }
  39. }
复制代码
写个 main 方法跑一跑:
[code]    public static void main(String[] args) {
        WeightedRoundRobin wrr = new WeightedRoundRobin();
        wrr.addServer("A平台", 1);
        wrr.addServer("B平台", 2);
        wrr.addServer("C平台", 3);
        
        for (int i = 0; i 
您需要登录后才可以回帖 登录 | 立即注册