找回密码
 立即注册
首页 业界区 业界 Spring Cloud Gateway实现分布式限流和熔断降级 ...

Spring Cloud Gateway实现分布式限流和熔断降级

郏琼芳 2025-6-15 14:06:40
小伙伴们,你们好呀!我是老寇!一起学习学习gateway限流和熔断降级
一、限流

思考:为啥需要限流?
在一个流量特别大的业务场景中,如果不进行限流,会造成系统宕机,当大批量的请求到达后端服务时,会造成资源耗尽【CPU、内存、线程、网络带宽、数据库连接等是有限的】,进而拖垮系统。
1.常见限流算法


  • 漏桶算法
  • 令牌桶算法
1.1漏桶算法(不推荐)


1.1.1.原理

将请求缓存到一个队列中,然后以固定的速度处理,从而达到限流的目的
1.1.2.实现

将请求装到一个桶中,桶的容量为固定的一个值,当桶装满之后,就会将请求丢弃掉,桶底部有一个洞,以固定的速率流出。
1.1.3.举例

桶的容量为1W,有10W并发请求,最多只能将1W请求放入桶中,其余请求全部丢弃,以固定的速度处理请求
1.1.4.缺点

处理突发流量效率低(处理请求的速度不变,效率很低)
1.2.令牌桶算法(推荐)


1.2.1.原理

将请求放在一个缓冲队列中,拿到令牌后才能进行处理
1.2.2.实现

装令牌的桶大小固定,当令牌装满后,则不能将令牌放入其中;每次请求都会到桶中拿取一个令牌才能放行,没有令牌时即丢弃请求/继续放入缓存队列中等待
1.2.3.举例

桶的容量为10w个,生产1w个/s,有10W的并发请求,以每秒10W个/s速度处理,随着桶中的令牌很快用完,速度又慢慢降下来啦,而生产令牌的速度趋于一致1w个/s
1.2.4.缺点

处理突发流量提供了系统性能,但是对系统造成了一定的压力,桶的大小不合理,甚至会压垮系统(处理1亿的并发请求,将桶的大小设置为1,这个系统一下就凉凉啦)
2.网关限流(Spring Cloud Gateway + Redis实战)

2.1.pom.xml配置
  1.         <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             spring-boot-starter-data-redis-reactive</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.springframework.cloud</groupId>
  7.             spring-cloud-starter-gateway</artifactId>
  8.             <exclusions>
  9.                 <exclusion>
  10.                     <groupId>org.springframework.boot</groupId>
  11.                     spring-boot-starter-web</artifactId>
  12.                 </exclusion>
  13.             </exclusions>
  14.         </dependency>
  15.     <dependency>
  16.         <groupId>org.apache.httpcomponents</groupId>
  17.         httpclient</artifactId>
  18.     </dependency>
复制代码
2.2.yaml配置
  1. spring:
  2.   application:
  3.     name: laokou-gateway
  4.   cloud:
  5.     gateway:
  6.       routes:
  7.         - id: LAOKOU-SSO-DEMO
  8.           uri: lb://laokou-sso-demo
  9.           predicates:
  10.           - Path=/sso/**
  11.           filters:
  12.           - StripPrefix=1
  13.           - name: RequestRateLimiter #请求数限流,名字不能乱打
  14.             args:
  15.               key-resolver: "#{@ipKeyResolver}"
  16.               redis-rate-limiter.replenishRate: 1 #生成令牌速率-设为1方便测试
  17.               redis-rate-limiter.burstCapacity: 1 #令牌桶容量-设置1方便测试
  18.   redis:
  19.     database: 0
  20.     cluster:
  21.       nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005
  22.     password: laokou #密码
  23.     timeout: 6000ms #连接超时时长(毫秒)
  24.     jedis:
  25.       pool:
  26.         max-active: -1 #连接池最大连接数(使用负值表示无极限)
  27.         max-wait: -1ms #连接池最大阻塞等待时间(使用负值表示没有限制)
  28.         max-idle: 10 #连接池最大空闲连接
  29.         min-idle: 5 #连接池最小空间连接
复制代码
2.3.创建bean
  1. @Configuration
  2. public class RequestRateLimiterConfig {
  3.     @Bean(value = "ipKeyResolver")
  4.     public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {
  5.             return exchange -> Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());
  6.     }
  7.     @Bean
  8.     public RemoteAddressResolver remoteAddressResolver() {
  9.             // 远程地址解析器
  10.             return XForwardedRemoteAddressResolver.trustAll();
  11.     }
  12. }
复制代码
3.测试限流(编写java并发测试)
  1. @Slf4j
  2. public class HttpUtil {
  3. public static void apiConcurrent(String url,Map<String,String> params) {
  4.         Integer count = 200;
  5.         //创建线程池
  6.         ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
  7.         //同步工具
  8.         CountDownLatch latch = new CountDownLatch(count);
  9.         Map<String,String> dataMap = new HashMap<>(1);
  10.         dataMap.put("authorize","XXXXXXX");
  11.         for (int i = 0; i < count; i++) {
  12.             pool.execute(() -> {
  13.                 try {
  14.                     //访问网关的API接口
  15.                     HttpUtil.doGet("http://localhost:1234/sso/laokou-demo/user",dataMap);
  16.                 } catch (IOException e) {
  17.                     e.printStackTrace();
  18.                 }finally {
  19.                     latch.countDown();
  20.                 }
  21.             });
  22.         }
  23.         try {
  24.             latch.await();
  25.         } catch (InterruptedException e) {
  26.             e.printStackTrace();
  27.         }
  28.     }
  29. public static String doGet(String url, Map<String, String> params) throws IOException {
  30.         //创建HttpClient对象
  31.         CloseableHttpClient httpClient = HttpClients.createDefault();
  32.         String resultString = "";
  33.         CloseableHttpResponse response = null;
  34.         try {
  35.             //创建uri
  36.             URIBuilder builder = new URIBuilder(url);
  37.             if (!params.isEmpty()) {
  38.                 for (Map.Entry<String, String> entry : params.entrySet()) {
  39.                     builder.addParameter(entry.getKey(), entry.getValue());
  40.                 }
  41.             }
  42.             URI uri = builder.build();
  43.             //创建http GET请求
  44.             HttpGet httpGet = new HttpGet(uri);
  45.             List<NameValuePair> paramList = new ArrayList<>();
  46.             RequestBuilder requestBuilder = RequestBuilder.get().setUri(new URI(url));
  47.             requestBuilder.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
  48.             httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=UTF-8"));
  49.             httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));
  50.             //执行请求
  51.             response = httpClient.execute(httpGet);
  52.             //判断返回状态是否是200
  53.             if (response.getStatusLine().getStatusCode() == 200) {
  54.                 resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
  55.             }
  56.         } catch (Exception e) {
  57.             log.info("调用失败:{}",e);
  58.         } finally {
  59.             if (response != null) {
  60.                 response.close();
  61.             }
  62.             httpClient.close();
  63.         }
  64.         log.info("打印:{}",resultString);
  65.         return resultString;
  66.     }
  67. }
复制代码


说明这个网关限流配置是没有问题的
4.源码查看

Spring Cloud Gateway RequestRateLimiter GatewayFilter Factory文档地址
工厂 RequestRateLimiter GatewayFilter使用一个RateLimiter实现来判断当前请求是否被允许继续。如果不允许,HTTP 429 - Too Many Requests则返回默认状态。
4.1.查看 RequestRateLimiterGatewayFilterFactory
  1.         @Override
  2.         public GatewayFilter apply(Config config) {
  3.                 KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
  4.                 RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
  5.                 boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
  6.                 HttpStatusHolder emptyKeyStatus = HttpStatusHolder
  7.                                 .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
  8.                 return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
  9.                         if (EMPTY_KEY.equals(key)) {
  10.                                 if (denyEmpty) {
  11.                                         setResponseStatus(exchange, emptyKeyStatus);
  12.                                         return exchange.getResponse().setComplete();
  13.                                 }
  14.                                 return chain.filter(exchange);
  15.                         }
  16.                         String routeId = config.getRouteId();
  17.                         if (routeId == null) {
  18.                                 Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
  19.                                 routeId = route.getId();
  20.                         }
  21.                  // 执行限流
  22.                         return limiter.isAllowed(routeId, key).flatMap(response -> {
  23.                                 for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
  24.                                         exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
  25.                                 }
  26.                                 if (response.isAllowed()) {
  27.                                         return chain.filter(exchange);
  28.                                 }
  29.                                 setResponseStatus(exchange, config.getStatusCode());
  30.                                 return exchange.getResponse().setComplete();
  31.                         });
  32.                 });
  33.         }
复制代码
4.2.查看 RedisRateLimiter
  1.         @Override
  2.         @SuppressWarnings("unchecked")
  3.         public Mono<Response> isAllowed(String routeId, String id) {
  4.                 if (!this.initialized.get()) {
  5.                         throw new IllegalStateException("RedisRateLimiter is not initialized");
  6.                 }
  7.         // 这里如何加载配置?请思考
  8.                 Config routeConfig = loadConfiguration(routeId);
  9.         // 令牌桶每秒产生令牌数量
  10.                 int replenishRate = routeConfig.getReplenishRate();
  11.         // 令牌桶容量
  12.                 int burstCapacity = routeConfig.getBurstCapacity();
  13.         // 请求消耗的令牌数
  14.                 int requestedTokens = routeConfig.getRequestedTokens();
  15.                 try {
  16.                   // 键
  17.                         List<String> keys = getKeys(id);
  18.                   // 参数
  19.                         List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
  20.                         // 调用lua脚本
  21.                         Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
  22.                         return flux.onErrorResume(throwable -> {
  23.                                 if (log.isDebugEnabled()) {
  24.                                         log.debug("Error calling rate limiter lua", throwable);
  25.                                 }
  26.                                 return Flux.just(Arrays.asList(1L, -1L));
  27.                         }).reduce(new ArrayList<Long>(), (longs, l) -> {
  28.                                 longs.addAll(l);
  29.                                 return longs;
  30.                         }).map(results -> {
  31.                           // 判断是否等于1,1表示允许通过,0表示不允许通过
  32.                                 boolean allowed = results.get(0) == 1L;
  33.                                 Long tokensLeft = results.get(1);
  34.                                 Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
  35.                                 if (log.isDebugEnabled()) {
  36.                                         log.debug("response: " + response);
  37.                                 }
  38.                                 return response;
  39.                         });
  40.                 }
  41.                 catch (Exception e) {
  42.                         log.error("Error determining if user allowed from redis", e);
  43.                 }
  44.                 return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
  45.         }
  46.         static List<String> getKeys(String id) {
  47.                 String prefix = "request_rate_limiter.{" + id;
  48.                 String tokenKey = prefix + "}.tokens";
  49.                 String timestampKey = prefix + "}.timestamp";
  50.                 return Arrays.asList(tokenKey, timestampKey);
  51.         }
复制代码
思考:redis限流配置是如何加载?
其实就是监听动态路由的事件并把配置存起来

4.3.重点来了,令牌桶 /META-INF/scripts/request_rate_limiter.lua 脚本剖析
  1. -- User Request Rate Limiter filter
  2. -- See https://stripe.com/blog/rate-limiters
  3. -- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34
  4. -- 令牌桶算法工作原理
  5. -- 1.系统以恒定速率往桶里面放入令牌
  6. -- 2.请求需要被处理,则需要从桶里面获取一个令牌
  7. -- 3.如果桶里面没有令牌可获取,则可以选择等待或直接拒绝并返回
  8. -- 令牌桶算法工作流程
  9. -- 1.计算填满令牌桶所需要的时间(填充时间 = 桶容量 / 速率)
  10. -- 2.设置存储数据的TTL(过期时间),为填充时间的两倍(存储时间 = 填充时间 * 2)
  11. -- 3.从Redis获取当前令牌的剩余数量和上一次调用的时间戳
  12. -- 4.计算距离上一次调用的时间间隔(时间间隔 = 当前时间 - 上一次调用时间)
  13. -- 5.计算填充的令牌数量(填充令牌数量 = 时间间隔 * 速率)【前提:桶容量是固定的,不存在无限制的填充】
  14. -- 6.判断是否有足够多的令牌满足请求【 (填充令牌数量 + 剩余令牌数量) >= 请求数量 && (填充令牌数量 + 剩余令牌数量) <= 桶容量 】
  15. -- 7.如果请求被允许,则从桶里面取出相应数据的令牌
  16. -- 8.如果TTL为正,则更新Redis键中的令牌和时间戳
  17. -- 9.返回两个两个参数(allowed_num:请求被允许标志。1允许,0不允许)、(new_tokens:填充令牌后剩余的令牌数据)
  18. -- 随机写入
  19. redis.replicate_commands()
  20. -- 令牌桶Key -> 存储当前可用令牌的数量(剩余令牌数量)
  21. local tokens_key = KEYS[1]
  22. -- 时间戳Key -> 存储上次令牌刷新的时间戳
  23. local timestamp_key = KEYS[2]
  24. -- 令牌填充速率
  25. local rate = tonumber(ARGV[1])
  26. -- 令牌桶容量
  27. local capacity = tonumber(ARGV[2])
  28. -- 当前时间
  29. local now = tonumber(ARGV[3])
  30. -- 请求数量
  31. local requested = tonumber(ARGV[4])
  32. -- 填满令牌桶所需要的时间
  33. local fill_time = capacity / rate
  34. -- 设置key的过期时间(填满令牌桶所需时间的2倍)
  35. local ttl = math.floor(fill_time * 2)
  36. -- 判断当前时间,为空则从redis获取
  37. if now == nil then
  38.     now = redis.call('TIME')[1]
  39. end
  40. -- 获取当前令牌的剩余数量
  41. local last_tokens = tonumber(redis.call("get", tokens_key))
  42. if last_tokens == nil then
  43.     last_tokens = capacity
  44. end
  45. -- 获取上一次调用的时间戳
  46. local last_refreshed = tonumber(redis.call('get', timestamp_key))
  47. if last_refreshed == nil then
  48.     last_refreshed = 0
  49. end
  50. -- 计算距离上一次调用的时间间隔
  51. local delta = math.max(0, now - last_refreshed)
  52. -- 当前的令牌数量(剩余 + 填充 <= 桶容量)
  53. local now_tokens = math.min(capacity, last_refreshed + (rate * delta))
  54. -- 判断是否有足够多的令牌满足请求
  55. local allowed = now_tokens >= requested
  56. -- 定义当前令牌的剩余数量
  57. local new_tokens = now_tokens
  58. -- 定义被允许标志
  59. local allowed_num = 0
  60. if allowed then
  61.     new_tokens = now_tokens - requested
  62.     -- 允许访问
  63.     allowed_num = 1
  64. end
  65. -- ttl > 0,将当前令牌的剩余数量和当前时间戳存入redis
  66. if ttl > 0 then
  67.     redis.call('setex', tokens_key, ttl, new_tokens)
  68.     redis.call('setex', timestamp_key, ttl, now)
  69. end
  70. -- 返回参数
  71. return { allowed_num, new_tokens }
复制代码
4.4.查看 GatewayRedisAutoConfiguration 脚本初始化
  1.         @Bean
  2.         @SuppressWarnings("unchecked")
  3.         public RedisScript redisRequestRateLimiterScript() {
  4.                 DefaultRedisScript redisScript = new DefaultRedisScript<>();
  5.                 redisScript.setScriptSource(
  6.                           // 根据指定路径获取lua脚本来初始化配置
  7.                                 new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
  8.                 redisScript.setResultType(List.class);
  9.                 return redisScript;
  10.         }
  11.         @Bean
  12.         @ConditionalOnMissingBean
  13.         public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
  14.                         @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
  15.                         ConfigurationService configurationService) {
  16.                 return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
  17.         }
复制代码
思考:请求限流过滤器是如何开启?
1.通过yaml配置开启
  1. spring:
  2.   cloud:
  3.     gateway:
  4.       server:
  5.         webflux:
  6.           filter:
  7.             request-rate-limiter:
  8.               enabled: true
复制代码
2.GatewayAutoConfiguration自动注入bean
  1. @Bean
  2. @ConditionalOnBean({ RateLimiter.class, KeyResolver.class })
  3. @ConditionalOnEnabledFilter
  4. public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,
  5.        KeyResolver resolver) {
  6.     return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
  7. }
复制代码
重点来了,真正加载这个bean的是 @ConditionalOnEnabledFilter 注解进行判断
  1. @Retention(RetentionPolicy.RUNTIME)
  2. @Target({ ElementType.TYPE, ElementType.METHOD })
  3. @Documented
  4. @Conditional(OnEnabledFilter.class)
  5. public @interface ConditionalOnEnabledFilter {
  6.     // 这里value是用来指定满足条件的某些类,换一句话说,就是这些类都加载或注入到ioc容器,这个注解修饰的自动装配类才会生效
  7.     Class<? extends GatewayFilterFactory<?>> value() default OnEnabledFilter.DefaultValue.class;
  8. }
复制代码
我们继续跟进代码,查看@Conditional(OnEnabledFilter.class)
众所周知,@Conditional可以用来加载满足条件的bean,所以,我们分析一下OnEnabledFilter
  1. public class OnEnabledFilter extends OnEnabledComponent<GatewayFilterFactory<?>> {}
复制代码
二、熔断降级

思考:为什么需要熔断降级?
当某个服务发生故障时(超时,响应慢,宕机),上游服务无法及时获取响应,进而也导致故障,出现服务雪崩【服务雪崩是指故障像滚雪球一样沿着调用链向上游扩展,进而导致整个系统瘫痪】
熔断降级的目标就是在故障发生时,快速隔离问题服务【快速失败,防止资源耗尽】,保护系统资源不被耗尽,防止故障扩散,保护核心业务可用性。
1.技术选型

1.1.熔断降级框架选型对比表

对比维度Hystrix (Netflix)Sentinel (Alibaba)Resilience4j当前状态❌ 停止更新 (维护模式)✅ 持续更新✅ 持续更新熔断机制滑动窗口计数响应时间/异常比例/QPS错误率/响应时间阈值流量控制❌ 仅基础隔离✅ QPS/并发数/热点参数/集群流控✅ RateLimiter隔离策略线程池(开销大)/信号量并发线程数(无线程池开销)信号量/Bulkhead降级能力Fallback 方法Fallback + 系统规则自适应Fallback + 自定义组合策略实时监控✅ Hystrix Dashboard✅ 原生控制台(可视化动态规则)❌ 需整合 Prometheus/Grafana动态配置❌ 依赖 Archaius✅ 控制台实时推送✅ 需编码实现(如Spring Cloud Config)生态集成✅ Spring Cloud Netflix✅ Spring Cloud Alibaba/多语言网关✅ Spring Boot/响应式编程性能开销高(线程池隔离)低(无额外线程)极低(纯函数式)适用场景遗留系统维护高并发控制/秒杀/热点防护云原生/轻量级微服务推荐指数⭐⭐ (不推荐新项目)⭐⭐⭐⭐⭐ (Java高并发首选)⭐⭐⭐⭐⭐ (云原生/响应式首选)1.2选型决策指南

需求场景推荐方案原因电商秒杀/API高频调用管控✅ Sentinel精细流量控制+热点防护+实时看板Kubernetes云原生微服务✅ Resilience4j轻量化+无缝集成Prometheus+响应式支持Spring Cloud Netflix旧系统⚠️ Hystrix兼容现存代码(短期过渡)多语言混合架构(如Go+Java)✅ Sentinel通过Sidecar代理支持非Java服务响应式编程(WebFlux)✅ Resilience4j原生Reactive API支持2.Resilience4j使用

Resilience4j官方文档
Resilience4j 可以看作是 Hystrix 的替代品,Resilience4j支持 熔断器和单机限流
Resilience4j 是一个专为函数式编程设计的轻量级容错库。Resilience4j 提供高阶函数(装饰器),可通过断路器、速率限制器、重试或隔离功能增强任何函数式接口、lambda 表达式或方法引用。您可以在任何函数式接口、lambda 表达式或方法引用上堆叠多个装饰器。这样做的好处是,您可以只选择所需的装饰器,而无需考虑其他因素。
2.1.网关熔断降级(Spring Cloud Gateway + Resilience4j实战)

2.1.1.pom依赖
  1. public abstract class OnEnabledComponent<T> extends SpringBootCondition implements ConfigurationCondition {
  2.     private static final String PREFIX = "spring.cloud.gateway.server.webflux.";
  3.     private static final String SUFFIX = ".enabled";
  4.     private ConditionOutcome determineOutcome(Class<? extends T> componentClass, PropertyResolver resolver) {
  5.        // 拼接完整名称
  6.        // 例如 => spring.cloud.gateway.server.webflux.request-rate-limiter.enabled
  7.        String key = PREFIX + normalizeComponentName(componentClass) + SUFFIX;
  8.        ConditionMessage.Builder messageBuilder = forCondition(annotationClass().getName(), componentClass.getName());
  9.        if ("false".equalsIgnoreCase(resolver.getProperty(key))) {
  10.           // 不满足条件不加载bean
  11.           return ConditionOutcome.noMatch(messageBuilder.because("bean is not available"));
  12.        }
  13.        // 满足条件加载bean
  14.        return ConditionOutcome.match();
  15.     }
  16. }
复制代码
2.1.2.yaml配置
  1. @Getter
  2. @ConfigurationProperties("spring.cloud.gateway.server.webflux.filter.request-rate-limiter")
  3. public class RequestRateLimiterGatewayFilterFactory
  4.        extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
  5.     private static final String EMPTY_KEY = "____EMPTY_KEY__";
  6.     private final RateLimiter<?> defaultRateLimiter;
  7.     private final KeyResolver defaultKeyResolver;
  8.     /**
  9.      * Switch to deny requests if the Key Resolver returns an empty key, defaults to true.
  10.      */
  11.     @Setter
  12.     private boolean denyEmptyKey = true;
  13.     /** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */
  14.     @Setter
  15.     private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();
  16.     public RequestRateLimiterGatewayFilterFactory(RateLimiter<?> defaultRateLimiter, KeyResolver defaultKeyResolver) {
  17.        super(Config.class);
  18.        this.defaultRateLimiter = defaultRateLimiter;
  19.        this.defaultKeyResolver = defaultKeyResolver;
  20.     }
  21.     @Override
  22.     public GatewayFilter apply(Config config) {
  23.        KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
  24.        RateLimiter<?> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
  25.        boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
  26.        HttpStatusHolder emptyKeyStatus = HttpStatusHolder
  27.           .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
  28.        return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
  29.           if (EMPTY_KEY.equals(key)) {
  30.              if (denyEmpty) {
  31.                 setResponseStatus(exchange, emptyKeyStatus);
  32.                 return exchange.getResponse().setComplete();
  33.              }
  34.              return chain.filter(exchange);
  35.           }
  36.           String routeId = config.getRouteId();
  37.           if (routeId == null) {
  38.              Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
  39.              Assert.notNull(route, "Route is null");
  40.              routeId = route.getId();
  41.           }
  42.           return limiter.isAllowed(routeId, key).flatMap(response -> {
  43.              for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
  44.                 exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
  45.              }
  46.              if (response.isAllowed()) {
  47.                 return chain.filter(exchange);
  48.              }
  49.              // 主要修改这行
  50.              return responseOk(exchange, Result.fail("Too_Many_Requests", "请求太频繁"));
  51.           });
  52.        });
  53.     }
  54.    
  55.     private Mono<Void> responseOk(ServerWebExchange exchange, Object data) {
  56.         return responseOk(exchange, JacksonUtils.toJsonStr(data), MediaType.APPLICATION_JSON);
  57.     }
  58.     private Mono<Void> responseOk(ServerWebExchange exchange, String str, MediaType contentType) {
  59.         DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(str.getBytes(StandardCharsets.UTF_8));
  60.         ServerHttpResponse response = exchange.getResponse();
  61.         response.setStatusCode(HttpStatus.OK);
  62.         response.getHeaders().setContentType(contentType);
  63.         response.getHeaders().setContentLength(str.getBytes(StandardCharsets.UTF_8).length);
  64.         return response.writeWith(Flux.just(buffer));
  65.     }
  66.     private <T> T getOrDefault(T configValue, T defaultValue) {
  67.        return (configValue != null) ? configValue : defaultValue;
  68.     }
  69.     public static class Config implements HasRouteId {
  70.        @Getter
  71.        private KeyResolver keyResolver;
  72.        @Getter
  73.        private RateLimiter<?> rateLimiter;
  74.        @Getter
  75.        private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;
  76.        @Getter
  77.        private Boolean denyEmptyKey;
  78.        @Getter
  79.        private String emptyKeyStatus;
  80.        private String routeId;
  81.        public Config setKeyResolver(KeyResolver keyResolver) {
  82.           this.keyResolver = keyResolver;
  83.           return this;
  84.        }
  85.        public Config setRateLimiter(RateLimiter<?> rateLimiter) {
  86.           this.rateLimiter = rateLimiter;
  87.           return this;
  88.        }
  89.        public Config setStatusCode(HttpStatus statusCode) {
  90.           this.statusCode = statusCode;
  91.           return this;
  92.        }
  93.        public Config setDenyEmptyKey(Boolean denyEmptyKey) {
  94.           this.denyEmptyKey = denyEmptyKey;
  95.           return this;
  96.        }
  97.        public Config setEmptyKeyStatus(String emptyKeyStatus) {
  98.           this.emptyKeyStatus = emptyKeyStatus;
  99.           return this;
  100.        }
  101.        @Override
  102.        public void setRouteId(String routeId) {
  103.           this.routeId = routeId;
  104.        }
  105.        @Override
  106.        public String getRouteId() {
  107.           return this.routeId;
  108.        }
  109.     }
  110. }
复制代码
2.1.3.CircuitBreakerConfig配置
  1. <dependency>
  2.   <groupId>org.springframework.cloud</groupId>
  3.   spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
  4. </dependency>
复制代码
我是老寇,我们下次再见啦!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册