Sentinel源码—4.FlowSlot实现流控的原理
大纲1.FlowSlot根据流控规则对请求进行限流
2.FlowSlot实现流控规则的快速失败效果的原理
3.FlowSlot实现流控规则中排队等待效果的原理
4.FlowSlot实现流控规则中Warm Up效果的原理
1.FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
(2)注册流控监听器和加载流控规则
(3)FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
从下图可知,流控规则包含以下属性:
一.规则id、资源名、针对来源
这三个属性是所有规则共有的属性,会分别封装到AbstractRule的id、resource、limitApp字段中,各个具体的规则子类都会继承AbstractRule类。
二.阈值类型
阈值类型包括QPS和并发线程数两个选项,对应FlowRule中的字段为grade。
三.单机阈值
单机阈值也就是限流阈值。无论是基于QPS还是并发线程数,都要设置限流阈值,对应FlowRule中的字段为count。
四.是否集群
是否集群是一个boolean类型的字段,对应FlowRule中的字段为clusterMode。true表示开启集群模式,false表示单机模式;
五.流控模式
流控模式有三种:直接模式、关联模式和链路模式,对应FlowRule中的字段为strategy。
六.关联资源
当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源。所以仅当流控模式选择关联和链路时,才对应FlowRule中的字段为refResource。
七.流控效果
流控效果有三种:快速失败、Warm Up、排队等待。还有一个选项未在页面上显示,即Warm Up和排队等待的结合体。也就是Warm Up + 排队等待,对应FlowRule中的字段为controlBehavior。
八.流控控制器
由于流控有多种模式,而每种模式都会对应一个模式流量整形控制器。所以流控规则FlowRule中会有一个字段TrafficShapingController,用来实现不同流控模式下的不同流控效果。
九.预热时长
此选项仅在流控效果选择Warm Up时出现,表示Warm Up的时长,对应FlowRule中的字段为warmUpPeriodSec。
十.超时时间
此选项仅在流控效果选择排队等待时出现,表示超出流控阈值后,排队等待多久才抛出异常,对应FlowRule中的字段为maxQueueingTimeMs。
public abstract class AbstractRule implements Rule {
//rule id. 规则id
private Long id;
//Resource name. 资源名称
private String resource;
//针对来源,默认是default
//多个来源使用逗号隔开,比如黑名单规则,限制userId是1和3的访问,那么就设置limitApp为"1,3"
//Application name that will be limited by origin.
//The default limitApp is {@code default}, which means allowing all origin apps.
//For authority rules, multiple origin name can be separated with comma (',').
private String limitApp;
...
}
//规则id、资源名(resource)、针对来源(limitApp),这三个字段在父类AbstractRule里
//Each flow rule is mainly composed of three factors: grade, strategy and controlBehavior:
//The grade represents the threshold type of flow control (by QPS or thread count).
//The strategy represents the strategy based on invocation relation.
//The controlBehavior represents the QPS shaping behavior (actions on incoming request when QPS exceeds the threshold.
public class FlowRule extends AbstractRule {
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
public FlowRule(String resourceName) {
super();
setResource(resourceName);
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
//The threshold type of flow control (0: thread count, 1: QPS).
//阈值类型:1代表QPS;0代表并发线程数
private int grade = RuleConstant.FLOW_GRADE_QPS;
//Flow control threshold count.
//单机阈值:也就是限流数
private double count;
//Flow control strategy based on invocation chain.
//流控模式:0代表直接;1代表关联;2代表链路
//RuleConstant#STRATEGY_DIRECT for direct flow control (by origin);
//RuleConstant#STRATEGY_RELATE for relevant flow control (with relevant resource);
//RuleConstant#STRATEGY_CHAIN for chain flow control (by entrance resource).
private int strategy = RuleConstant.STRATEGY_DIRECT;
//关联资源,当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源
//Reference resource in flow control with relevant resource or context.
private String refResource;
//Rate limiter control behavior.
//0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
//流控效果:0代表快速失败, 1代表Warm Up, 2代表排队等待, 3代表Warm Up + 排队等待
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//预热时长:只有当流控效果选择为Warm Up时才会出现
private int warmUpPeriodSec = 10;
//Max queueing time in rate limiter behavior.
//超时时间:只有当流控效果选择排队等待时才会出现
private int maxQueueingTimeMs = 500;
//是否集群:默认false表示单机
private boolean clusterMode;
//Flow rule config for cluster mode.
//集群配置
private ClusterFlowConfig clusterConfig;
//The traffic shaping (throttling) controller.
//流量整形控制器:实现[流控效果]的四种不同模式
private TrafficShapingController controller;
...
}
public class FlowQpsDemo {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 32;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
//初始化QPS的流控规则
initFlowQpsRule();
//启动线程定时输出信息
tick();
//first make the system run on a very low condition
//模拟QPS为32时的访问场景
simulateTraffic();
System.out.println("===== begin to do flow control");
System.out.println("only 20 requests per second can pass");
}
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
//设置QPS的限制为20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
//加载流控规则
FlowRuleManager.loadRules(rules);
}
private static void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
}
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get());
System.exit(0);
}
}
static class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
try {
//调用entry()方法开始规则验证
entry = SphU.entry(KEY);
//token acquired, means pass
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
//biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
//完成规则验证调用exit()方法
entry.exit();
}
}
Random random2 = new Random();
try {
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
} catch (InterruptedException e) {
//ignore
}
}
}
}
}(2)流量整形控制器DefaultController执行分析
流控效果为快速失败时,会调用DefaultController的canPass()方法。
问题一:其中的入参Node是如何选择的
Node的选择主要依赖于三个参数:FlowRule的limitApp、FlowRule的strategy、Context的origin,这三个参数分别表示流控规则的针对来源、流控模式和当前请求的来源。
情形一:如果limitApp和origin相等并且limitApp不是默认值default
//One resources can have multiple rules.
//And these rules take effects in the following order:
//requests from specified caller
//no specified caller
public class FlowRuleManager {
//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
//饿汉式单例模式实例化流控规则的监听器对象
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//监听器对象的管理器
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
...
static {
//将流控规则监听器注册到监听器管理器中
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
//Load FlowRules, former rules will be replaced.
//加载流控规则
public static void loadRules(List<FlowRule> rules) {
//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置
//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules
currentProperty.updateValue(rules);
}
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public synchronized void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules = rules;
}
RecordLog.info(" Flow rules received: {}", rules);
}
@Override
public synchronized void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules = rules;
}
RecordLog.info(" Flow rules loaded: {}", rules);
}
}
...
}
public final class FlowRuleUtil {
private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() {
@Override
public String apply(FlowRule rule) {
return rule.getResource();
}
};
...
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) {
return buildFlowRuleMap(list, null);
}
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) {
return buildFlowRuleMap(list, filter, true);
}
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter, boolean shouldSort) {
return buildFlowRuleMap(list, extractResource, filter, shouldSort);
}
//Build the flow rule map from raw list of flow rules, grouping by provided group function.
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
...
//获取[流控效果]流量整形控制器TrafficShapingController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//获取资源名
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
//获取资源名对应的流控规则列表
Set<FlowRule> flowRules = tmpMap.get(key);
//将规则放到Map里,和当前资源绑定
if (flowRules == null) {
//Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
//Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
//判断只有当阈值类型为QPS时才生效
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//根据流控效果选择不同的流量整形控制器TrafficShapingController
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
//Warm Up预热模式——冷启动模式
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
//排队等待模式
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
//Warm Up + 排队等待模式
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
//快速失败模式——Default默认模式
default:
//Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
//默认模式:快速失败用的是DefaultController
return new DefaultController(rule.getCount(), rule.getGrade());
}
...
}情形二:limitApp值为默认的default
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
//Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
public FlowSlot() {
this(new FlowRuleChecker());
}
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//检查流控规则,count默认是1
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查,count默认是1
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
//Rule checker for flow control rules.
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//从Map中获取resource资源对应的流控规则列表
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//循环遍历每一个流控规则
for (FlowRule rule : rules) {
//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则
//传入的参数count默认是1
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//参数校验
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,则执行passClusterCheck()方法
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//如果是单机模式,则执行passLocalCheck()方法,acquireCount默认是1
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//选择Node作为限流计算的依据
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器
//然后调用TrafficShapingController.canPass()方法对请求进行检查,acquireCount默认是1
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
//选择Node作为限流计算的依据
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
//The limit app should not be empty.
//获取流控规则中配置的"针对来源",默认是default
String limitApp = rule.getLimitApp();
//获取流控规则中配置的"流控模式",0代表直接,1代表关联,2代表链路
int strategy = rule.getStrategy();
//从context对象中获取当前请求的来源
String origin = context.getOrigin();
//情形一:当流控规则的针对来源(limitApp)与当前请求的来源(origin)相同时
//这种情况表示该限流规则针对特定来源进行限流
//如果配置了针对app1进行限流,那么app2就不会生效,这就是针对特定来源进行限流
if (limitApp.equals(origin) && filterOrigin(origin)) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的Origin Node
//因为这种模式要求根据调用方的情况进行限流,而Origin Node包含了调用方的统计信息,所以选择Origin Node作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
//情况二:当流控规则的针对来源(limitApp)是默认值(RuleConstant.LIMIT_APP_DEFAULT)时
//这种情况表示该流控规则对所有来源都生效
else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回当前请求的集群节点ClusterNode
//因为此时要求的是根据被调用资源的情况进行限流,而集群节点包含了被调用资源的统计信息,所以选择集群节点作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//Return the cluster node.
return node.getClusterNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
//情况三:当流控规则的针对来源(limitApp)是其他(RuleConstant.LIMIT_APP_OTHER),且当前请求的来源(origin)与流控规则的资源名(rule.getResource())不同时
//这种情况表示该流控规则针对除默认来源以外的其他来源进行限流,可实现个性化限流
//比如可以对app1进行个性化限流,对其他所有app进行整体限流
else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的来源节点(Origin Node)
//因为这种"流控模式"要求根据调用方的情况进行限流,而来源节点包含了调用方的统计信息,所以选择来源节点作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
return null;
}
//如果流控规则中配置的"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果流控规则中配置的"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
//关联资源的ClusterNode
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
//No node.
return null;
}
private static boolean filterOrigin(String origin) {
// Origin cannot be `default` or `other`.
return !RuleConstant.LIMIT_APP_DEFAULT.equals(origin) && !RuleConstant.LIMIT_APP_OTHER.equals(origin);
}
...
}情形三:limitApp值为other且origin与FlowRule.getResource()不同
public class FlowRuleManager {
//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
//饿汉式单例模式实例化流控规则的监听器对象
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//监听器对象的管理器
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
...
static {
//将流控规则监听器注册到监听器管理器中
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
//Load FlowRules, former rules will be replaced.
//加载流控规则
public static void loadRules(List<FlowRule> rules) {
//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置
//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules
currentProperty.updateValue(rules);
}
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public synchronized void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules = rules;
}
RecordLog.info(" Flow rules received: {}", rules);
}
@Override
public synchronized void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules = rules;
}
RecordLog.info(" Flow rules loaded: {}", rules);
}
}
...
}
public final class FlowRuleUtil {
private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() {
@Override
public String apply(FlowRule rule) {
return rule.getResource();
}
};
...
//Build the flow rule map from raw list of flow rules, grouping by provided group function.
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
...
//获取[流控效果]流量整形控制器TrafficShapingController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//获取资源名
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
//获取资源名对应的流控规则列表
Set<FlowRule> flowRules = tmpMap.get(key);
//将规则放到Map里,和当前资源绑定
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
//Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
//判断只有当阈值类型为QPS时才生效
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//根据流控效果选择不同的流量整形控制器TrafficShapingController
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
//Warm Up预热模式——冷启动模式
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
//排队等待模式
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
//Warm Up + 排队等待模式
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
//快速失败模式——Default默认模式
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
//默认模式:快速失败用的是DefaultController
return new DefaultController(rule.getCount(), rule.getGrade());
}
...
}问题二:如何判断阈值类型是QPS还是线程
FlowRule规则类里有个名为grade的字段,代表着阈值类型。所以在初始化DefaultController时就会传入流控规则FlowRule的grade值,这样在DefaultController的avgUsedTokens()方法中,就可以根据grade字段的值判断出阈值类型是QPS还是线程数了。
问题三:其中的入参prioritized的作用是什么
DefaultController的canPass()的入参prioritized表示是否对请求设置优先级。当prioritized为true时,表示该请求具有较高优先级的请求。当prioritized为false时,表示该请求是普通请求。而高优先级的请求需要尽量保证其可以通过限流检查。不过一般情况下,prioritized的值默认为false,除非手动指定为true,毕竟限流的目的是不想让超出的流量通过。
(3)流控模式中的关联模式和链路模式说明
一.关联模式
关联流控模式中,可以将两个资源进行关联。当某个资源的流量超限时,可以触发其他资源的流控规则。
比如用户下单购物时会涉及下单资源和支付资源,如果支付资源达到流控阈值,那么应该要同时禁止下单,也就是通过支付资源来关联到下单资源。
注意:如果采取关联模式,那么设置的QPS阈值是被关联者的,而非关联者的。在如下图示中配置了QPS的阈值为3,这是针对testPay资源设置的,而不是针对testOrder资源设置的。也就是testOrder被流控的时机就是当testPay的QPS达到3的时候,3并不是testOrder所访问的次数,而是testPay这个接口被访问的次数。
二.链路模式
一个资源可能会被多个业务链路调用,不同的业务链路需要进行不同的流控,这时就可以使用链路模式。
下图便为testTrace资源创建了一条链路模式的流控规则,规则为QPS限制到1 ,且链路入口资源为/trace/test2。
这意味着:/trace/test1链路可以随便访问testTrace资源,不受任何限制。/trace/test2链路访问testTrace资源时会限制QPS为1,超出限制被流控。
3.FlowSlot实现流控规则中排队等待效果的原理
(1)实现排队等待流控效果的普通漏桶算法介绍
(2)RateLimiterController如何实现排队等待效果
(3)RateLimiterController如何判断是否超出阈值
(4)RateLimiterController如何实现排队等待
(5)RateLimiterController实现的漏桶算法与普通漏桶算法的区别
(6)RateLimiterController流控存在的问题
(1)实现排队等待流控效果的普通漏桶算法介绍
一.漏桶算法的基本原理
漏桶算法的核心思想是以固定速率流出。
假设有一个水桶,水桶底上有几个孔洞。因为孔洞个数和孔洞大小是固定的,因此水从水桶流出的速度是固定的。那么就会有以下两种情况:
情况一:往水桶注水的速度小于等于水从孔洞流出的速度,那么水桶中将不会有剩余的水,因为"消费大于等于生产"。
情况二:往水桶注水的速度大于水从孔洞流出的速度,那么随着时间推移,水桶会被装满,水会溢出,因为"生产大于消费"。
在请求 / 响应场景中,水桶可以被理解为系统处理请求的能力。水对应成请求,水桶对应成一个有限的缓冲区(请求队列)用于存储请求。那么水桶的容量就代表了系统能够处理的最大并发请求数,当水桶满时(请求队列达到上限),新请求将被拒绝,从而实现流量控制。
二.漏桶算法的处理流程
步骤一:当新的请求到达时,会将新的请求放入缓冲区(请求队列)中,类似于往水桶里注水。
步骤二:系统会以固定的速度处理缓冲区中的请求,类似于水从窟窿中以固定的速度流出,比如开启一个后台线程定时以固定的速度从缓冲区中取出请求然后进行分发处理。
步骤三:如果缓冲区已满,则新的请求将被拒绝或丢弃,类似于水溢出。
三.漏桶算法的主要特点
特点一:固定速率
水从桶底的孔洞中以固定速率流出,类似于网络中以固定速率发送数据包。但写入速度不固定,也就是请求不是匀速产生的。相当于生产者生产消息不固定,消费者消费消息是匀速消费的。
特点二:有限容量
桶的容量有限,当桶满时,新到达的水会溢出,即拒绝超过容量的请求。
特点三:先进先出(FIFO)
水按照先进先出的顺序从桶中流出,类似于请求的处理顺序。
四.漏桶算法的基本实现
应用场景一:假设有一个视频上传服务,为了确保服务器稳定,希望限制用户在1分钟内最多只能上传5个视频。此时可以使用漏桶算法,将每个视频上传请求视为一个"水滴"。桶的容量设为5个水滴,出水速率设为每分钟5个水滴。当用户上传速度超过限制时,多余的请求将被拒绝。
应用场景二:假设有一个向用户发送电子邮件的后台任务,为了确保邮件发送系统的稳定性,希望限制每秒钟最多发送10封邮件。此时可以使用漏桶算法来限制消费MQ的处理速率,将桶的容量设为10个水滴,出水速率设为每秒钟10个水滴。这样,邮件发送系统每秒最多只会处理10封电子邮件。
漏桶算法的代码实现如下:
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
//Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
...
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//检查流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
//Rule checker for flow control rules.
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//从Map中获取resource资源对应的流控规则列表
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//循环遍历每一个流控规则
for (FlowRule rule : rules) {
//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//参数校验
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,则执行passClusterCheck()方法
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//如果是单机模式,则执行passLocalCheck()方法
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//选择Node作为限流计算的依据
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器
//然后调用TrafficShapingController.canPass()方法对请求进行检查
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
...
}
//Default throttling controller (immediately reject strategy).
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前请求数
int curCount = avgUsedTokens(node);
//如果当前请求数 + 1超出阈值,那么返回失败
if (curCount + acquireCount > count) {
//进行优先级逻辑处理
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
//PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
//如果当前请求数+1没有超出阈值,则返回成功
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
private void sleep(long timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {
//Ignore.
}
}
}流程图如下:
(2)RateLimiterController如何实现排队等待效果
当流控规则中指定的流控效果是排队等待时,对应的流量整形控制器是RateLimiterController。
当调用FlowSlot的entry()方法对请求进行流控规则验证时,最终会调用RateLimiterController的canPass()方法来对请求进行检查。
RateLimiterController实现排队等待的效果时使用了漏桶算法。既然使用了漏桶算法,那么就一定包含如下字段:
字段一:count,表示QPS阈值,即QPS超出多少后会进行限流。
字段二:latestPassedTime,表示最近允许请求通过的时间。有了这个参数,就能计算出当前请求最早的预期通过时间。
字段三:maxQueueingTimeMs,表示排队时的最大等待时间。
(3)RateLimiterController如何判断是否超出阈值
在RateLimiterController的canPass()方法中,为了判断是否超出QPS阈值,通过原子类变量latestPassedTime简化成单线程让请求先后通过的处理模型。为了尽量让业务不受Sentinel影响,采用预估请求的被处理时间点的方式。也就是无需等前面的请求完全被处理完,才确定后面的请求被处理的时间。因为在普通的漏桶算法中,是处理完一个请求,才从漏桶取出水滴。而RateLimiterController的漏桶算法,则是假设请求已经被通过了。
具体的判断逻辑如下:首先获取系统的当前时间currentTime。然后计算在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小时间间隔costTime。接着计算当前请求最早的预期通过时间expectedTime,也就是此次请求预计会在几时几分几秒内通过。最后比较expectedTime和currentTime就可知当前请求是否允许通过了。
一.如果expectedTime小于等于currentTime
也就是当前请求最早的预期通过时间比系统当前时间小。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要晚,即当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值。于是返回true允许通过,同时更新最近允许请求通过的时间戳为当前时间。
二.如果expectedTime大于currentTime
也就是当前请求最早的预期通过时间比系统当前时间大。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要早,即当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小,所以此时必然会超QPS阈值。因此返回进行等待或者返回false不允许通过,等待的最小时间就是:最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间。
需要注意:Sentinel流量控制的漏桶算法,只能限制在costTime内的流量激增,限制不了costTime外的流量激增。比如系统启动完一瞬间就涌入大量并发请求,此时的流量激增限制不了。又比如系统处理完正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超QPS阈值的并发请求,此时也限制不了这种情况的流量激增。但如果系统处理完正常流量的最后一个请求,隔了costTime-的时间后,突然涌入超QPS阈值的并发请求,此时则可以限制这种情况的流量激增。
public class RateLimiterController implements TrafficShapingController { //排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间 private final int maxQueueingTimeMs; //流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制 private final double count; //最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间 //注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //Pass when acquire count is less or equal than 0. //acquireCount代表每次从桶底流出多少个请求 //如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1 if (acquireCount
页:
[1]