Sentinel源码—7.参数限流和注解的实现
大纲1.参数限流的原理和源码
2.@SentinelResource注解的使用和实现
1.参数限流的原理和源码
(1)参数限流规则ParamFlowRule的配置Demo
(2)ParamFlowSlot根据参数限流规则验证请求
(1)参数限流规则ParamFlowRule的配置Demo
一.参数限流的应用场景
二.参数限流规则的属性
三.参数限流规则的配置Demo
一.参数限流的应用场景
传统的流量控制,一般是通过资源维度来限制某接口或方法的调用频率。但有时需要更细粒度地控制不同参数条件下的访问速率,即参数限流。参数限流允许根据不同的参数条件设置不同的流量控制规则,这种方式非常适合处理特定条件下的请求,因为能更加精细地管理流量。
假设有一个在线电影订票系统,某个接口允许用户查询电影的放映时间。但只希望每个用户每10秒只能查询接口1次,以避免过多的查询请求。这时如果直接将接口的QPS限制为5是不能满足要求的,因为需求是每个用户每5分钟只能查询1次,而不是每秒一共只能查询5次,因此参数限流就能派上用场了。
可以设置一个规则,根据用户ID来限制每个用户的查询频率,将限流的维度从资源维度细化到参数维度,从而实现每个用户每10秒只能查询接口1次。比如希望影院工作人员可以每秒查询10次,老板可以每秒查询100次,而购票者则只能每10秒查询一次,其中工作人员的userId值为100和200,老板的userId值为9999,那么可以如下配置:需要注意限流阈值是以秒为单位的,所以需要乘以统计窗口时长10。
二.参数限流规则的属性
public class ParamFlowRule extends AbstractRule {
...
//The threshold type of flow control (0: thread count, 1: QPS).
//流量控制的阈值类型(0表示线程数,1表示QPS)
private int grade = RuleConstant.FLOW_GRADE_QPS;
//Parameter index.
//参数下标
private Integer paramIdx;
//The threshold count.
//阈值
private double count;
//Original exclusion items of parameters.
//针对特定参数的流量控制规则列表
private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>();
//Indicating whether the rule is for cluster mode.
//是否集群
private boolean clusterMode = false;
...
}
//针对特定参数的流量控制规则
public class ParamFlowItem {
private String object;
private Integer count;
private String classType;
...
}三.参数限流规则的配置Demo
//This demo demonstrates flow control by frequent ("hot spot") parameters.
public class ParamFlowQpsDemo {
private static final int PARAM_A = 1;
private static final int PARAM_B = 2;
private static final int PARAM_C = 3;
private static final int PARAM_D = 4;
//Here we prepare different parameters to validate flow control by parameters.
private static final Integer[] PARAMS = new Integer[] {PARAM_A, PARAM_B, PARAM_C, PARAM_D};
private static final String RESOURCE_KEY = "resA";
public static void main(String[] args) throws Exception {
initParamFlowRules();
final int threadCount = 20;
ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120);
runner.tick();
Thread.sleep(1000);
runner.simulateTraffic();
}
private static void initParamFlowRules() {
//QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0)
.setGrade(RuleConstant.FLOW_GRADE_QPS)
.setCount(5);
//We can set threshold count for specific parameter value individually.
//Here we add an exception item. That means:
//QPS threshold of entries with parameter `PARAM_B` (type: int) in index 0 will be 10, rather than the global threshold (5).
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
//ParamFlowRuleManager类加载的一个时机是:它的静态方法被调用了
//所以下面会先初始化ParamFlowRuleManager,再执行loadRules()方法
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}
}
public final class ParamFlowRuleManager {
private static final Map<String, List<ParamFlowRule>> PARAM_FLOW_RULES = new ConcurrentHashMap<>();
private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener();
private static SentinelProperty<List<ParamFlowRule>> currentProperty = new DynamicSentinelProperty<>();
static {
currentProperty.addListener(PROPERTY_LISTENER);
}
//Load parameter flow rules. Former rules will be replaced.
public static void loadRules(List<ParamFlowRule> rules) {
try {
//设置规则的值为rules
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.info(" Failed to load rules", e);
}
}
static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> {
@Override
public void configUpdate(List<ParamFlowRule> list) {
Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list);
if (rules != null) {
PARAM_FLOW_RULES.clear();
PARAM_FLOW_RULES.putAll(rules);
}
RecordLog.info(" Parameter flow rules received: {}", PARAM_FLOW_RULES);
}
@Override
public void configLoad(List<ParamFlowRule> list) {
Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list);
if (rules != null) {
PARAM_FLOW_RULES.clear();
PARAM_FLOW_RULES.putAll(rules);
}
RecordLog.info(" Parameter flow rules received: {}", PARAM_FLOW_RULES);
}
...
}
...
}
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
private T value = null;
public DynamicSentinelProperty() {
}
//添加监听器到集合
@Override
public void addListener(PropertyListener<T> listener) {
listeners.add(listener);
//回调监听器的configLoad()方法初始化规则配置
listener.configLoad(value);
}
//更新值
@Override
public boolean updateValue(T newValue) {
//如果值没变化,直接返回
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info(" Config will be updated to: {}", newValue);
//如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值
value = newValue;
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
...
}
//A traffic runner to simulate flow for different parameters.
class ParamFlowQpsRunner<T> {
private final T[] params;
private final String resourceName;
private int seconds;
private final int threadCount;
private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>();
private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>();
private volatile boolean stop = false;
public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int seconds) {
this.params = params;
this.resourceName = resourceName;
this.seconds = seconds;
this.threadCount = threadCount;
for (T param : params) {
passCountMap.putIfAbsent(param, new AtomicLong());
blockCountMap.putIfAbsent(param, new AtomicLong());
}
}
public void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
public void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
}
final class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("Begin to run! Go go go!");
System.out.println("See corresponding metrics.log for accurate statistic data");
Map<T, Long> map = new HashMap<>(params.length);
for (T param : params) {
map.putIfAbsent(param, 0L);
}
while (!stop) {
sleep(1000);
//There may be a mismatch for time window of internal sliding window.
//See corresponding `metrics.log` for accurate statistic log.
for (T param : params) {
System.out.println(String.format(
"[%d][%d] Parameter flow metrics for resource %s: pass count for param <%s> is %d, block count: %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param,
passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0)
));
}
System.out.println("=============================");
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("Time cost: " + cost + " ms");
System.exit(0);
}
}
final class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
T param = generateParam();
try {
entry = SphU.entry(resourceName, EntryType.IN, 1, param);
//Add pass for parameter.
passFor(param);
} catch (BlockException e) {
//block.incrementAndGet();
blockFor(param);
} catch (Exception ex) {
//biz exception
ex.printStackTrace();
} finally {
//total.incrementAndGet();
if (entry != null) {
entry.exit(1, param);
}
}
sleep(ThreadLocalRandom.current().nextInt(0, 10));
}
}
}
//Pick one of provided parameters randomly.
private T generateParam() {
int i = ThreadLocalRandom.current().nextInt(0, params.length);
return params;
}
private void passFor(T param) {
passCountMap.get(param).incrementAndGet();
}
private void blockFor(T param) {
blockCountMap.get(param).incrementAndGet();
}
private void sleep(int timeMs) {
try {
TimeUnit.MILLISECONDS.sleep(timeMs);
} catch (InterruptedException e) {
}
}
}四.参数限流是如何进行数据统计
由于参数限流的数据统计需要细化到参数值的维度,所以使用参数限流时需要注意OOM问题。比如根据用户ID进行限流,且用户数量有几千万,那么CacheMap将会包含几千万个不会被移除的键值对,而且会随着进程运行时间的增长而不断增加,最后可能会导致OOM。
@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//1.如果没配置参数限流规则,直接触发下一个Slot
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
//2.如果配置了参数限流规则,则调用ParamFlowSlot的checkFlow()方法,该方法执行完成后再触发下一个Slot
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
int paramIdx = rule.getParamIdx();
if (paramIdx < 0) {
if (-paramIdx <= length) {
rule.setParamIdx(length + paramIdx);
} else {
//Illegal index, give it a illegal positive value, latter rule checking will pass.
rule.setParamIdx(-paramIdx);
}
}
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
//1.如果没传递参数,则直接放行,代表不做参数限流逻辑
if (args == null) {
return;
}
//2.如果没给resourceWrapper这个资源配置参数限流规则,则直接放行
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
//3.获取此资源的全部参数限流规则,规则可能会有很多个,所以是个List
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
//4.遍历获取到的参数限流规则
for (ParamFlowRule rule : rules) {
//进行参数验证
applyRealParamIdx(rule, args.length);
//Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//进行验证的核心方法:检查当前规则是否允许通过此请求,如果不允许,则抛出ParamFlowException异常
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args;
//Assign actual value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
}五.参数限流验证请求的流程图总结
2.@SentinelResource注解的使用和实现
(1)@SentinelResource注解的使用
(2)@SentinelResource注解和实现
(1)@SentinelResource注解的使用
一.引入Sentinel Spring Boot Starter依赖
//Rule checker for parameter flow control.
public final class ParamFlowChecker {
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, Object... args) {
if (args == null) {
return true;
}
//1.判断参数索引是否合法,这个就是配置参数限流时设置的下标,从0开始,也就是对应args里的下标
//比如0就代表args数组里的第一个参数,如果参数不合法直接放行,相当于参数限流没生效
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
//2.判断参数值是不是空,如果是空直接放行
//Get parameter value.
Object value = args;
//Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
//If value is null, then pass
if (value == null) {
return true;
}
//3.集群限流
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
//4.单机限流
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {//基本类型
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {//数组类型
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {//其他类型,也就是引用类型
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn(" Unexpected error", e);
}
return true;
}
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//类型是QPS
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
//流控效果为排队等待
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
//流控效果为直接拒绝
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {//类型是Thread
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
...
}二.为方法添加@SentinelResource注解
下面的代码为sayHello()方法添加了@SentinelResource注解,并指定了资源名称为sayHello以及熔断降级时的回调方法fallback()。这样在请求sayHello()方法后,就可以在Sentinel Dashboard上看到此资源,然后就可以针对此资源进行一系列的规则配置了。
//Rule checker for parameter flow control.
public final class ParamFlowChecker {
...
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
//Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
long lastPassTime = timeRecorder.get();
long expectedTime = lastPassTime + costTime;
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) {
//Should not be null.
return ParameterMetricStorage.getParamMetric(resourceWrapper);
}
}(2)@SentinelResource注解和实现
利用Spring AOP拦截@SentinelResource注解,最后调用SphU.entry()方法来进行处理。
//Aspect for methods with {@link SentinelResource} annotation.@Aspectpublic class SentinelResourceAspect extends AbstractSentinelAspectSupport { //SentinelResource注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { //获取方法 Method originMethod = resolveMethod(pjp); //获取方法上的SentinelResource注解,有了这个注解,就可以获取到注解的各种属性值了 SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { //Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } //获取资源名称 String resourceName = getResourceName(annotation.value(), originMethod); //获取资源类型 EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); //创建一个Entry对象,通过SphU.entry(resourceName)将当前方法纳入Sentinel的保护体系 //如果当前资源的调用未触发任何Sentinel规则,则正常执行被拦截的方法,否则将执行对应的限流、熔断降级等处理逻辑 Entry entry = null; try { entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); return pjp.proceed(); } catch (BlockException ex) { //发生异常时,通过反射执行在注解中设置的降级方法 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class
页:
[1]