Sentinel原理十:FlowSlot
概述
FlowSlot 处于 AuthoritySlot 节点后面,负责处理流量的校验,是限流的核心。
解析
entry() 方法
FlowRule 限流规则校验,具体校验由FlowRuleChecker处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// Flow rule map cannot be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
List<FlowRule> rules = flowRules.get(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
}
FlowRuleChecker
规则校验入口 passCheck()
- 如果没有设置limitApp,则直接返回true;
- 如果开启了集群模式,则调用集群校验方法 passClusterCheck();不然使用 passLocalCheck()。
1 | static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, |
非集群模式校验 passLocalCheck()
1 | private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, |
核心是从rule中获取rater执行canPass方法(规则)。
getRater:1
2
3TrafficShapingController getRater() {
return controller;
}
getRater() 返回的是 TrafficShapingController 的实现类,这个是由 rule.setGrade 决定的
初始化过程:
- 首先配置Grade,例如rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
将 rule交由FlowRuleManager管理
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager#loadRules ->
com.alibaba.csp.sentinel.property.DynamicSentinelProperty#updateValue ->
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener#configUpdate ->
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#buildFlowRuleMap(java.util.List<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>) ->
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#buildFlowRuleMap(java.util.List<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>, com.alibaba.csp.sentinel.util.function.Function<com.alibaba.csp.sentinel.slots.block.flow.FlowRule,K>, com.alibaba.csp.sentinel.util.function.Predicate<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>, boolean) ->
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#generateRater ->
com.alibaba.csp.sentinel.slots.block.flow.FlowRule#setRaterTrafficShapingController 的实现类主要有3个:
- DefaultController 简单qps限制
- RateLimiterController 排队等待(流量更均匀)
- WarmUpController 冷启动
具体的grade对应的实现如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
return new DefaultController(rule.getCount(), rule.getGrade());
}
DefaultController
根据grade的类型决定canPass时是累加线程数还是QPS.
1 | public class DefaultController implements TrafficShapingController { |
WarmUpController
https://github.com/alibaba/Sentinel/wiki/%E9%99%90%E6%B5%81—%E5%86%B7%E5%90%AF%E5%8A%A8
Warm Up(
RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过”冷启动”,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
这个场景主要用于启动需要额外开销的场景,例如建立数据库连接等。
它的实现是在 Guava 的算法的基础上实现的。然而,和 Guava 的场景不同,Guava 的场景主要用于调节请求的间隔,即 Leaky Bucket,而 Sentinel 则主要用于控制每秒的 QPS,即我们满足每秒通过的 QPS 即可,我们不需要关注每个请求的间隔,换言之,我们更像一个 Token Bucket。
我们用桶里剩余的令牌来量化系统的使用率。假设系统每秒的处理能力为 b,系统每处理一个请求,就从桶中取走一个令牌;每秒这个令牌桶会自动掉落b个令牌。令牌桶越满,则说明系统的利用率越低;当令牌桶里的令牌高于某个阈值之后,我们称之为令牌桶”饱和”。
当令牌桶饱和的时候,基于 Guava 的计算上,我们可以推出下面的公式:
rate(c)=m*c+ coldrate其中,rate 为当前请求和上一个请求的间隔时间,而 rate 是和令牌桶中的高于阈值的令牌数量成线形关系的。cold rate 则为当桶满的时候,请求和请求的最大间隔。通常是
coldFactor * rate(stable)。
简单的配置
1 | FlowRule rule = new FlowRule(); |
对warmUpPeriodInSec秒内的总流量进行限流为coldFactor - 1分之一
源码:
1 | public class WarmUpController implements TrafficShapingController { |
RateLimiterController
排队等待
- 计算通过acquireCount个请求需要花费的时间间隔;
- 计算当前时间之后还要等待多久waitTime;
- 如果waitTime没有大于最大等待时间maxQueueingTimeMs,则sleep(waitTime)。
1 | public class RateLimiterController implements TrafficShapingController { |
集群模式校验 passClusterCheck()
- 获取TokenService对象;
- 如果获取不到则走3,不然走4;
- 如果fallbackToLocalWhenFail为true则走本地校验,不然直接通过;
- 通过TokenService的requestToken方法获取限流判断结果;
- 使用applyTokenResult方法解析返回的结果,根据结果判断是通过/不通过/等待/走降级到本地;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
// 令牌服务
TokenService clusterService = pickClusterService();
// 获取不到令牌服务则降级到本地处理
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
// 获取令牌结果
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
// 降级处理
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
return passLocalCheck(rule, context, node, acquireCount, prioritized);
} else {
// The rule won't be activated, just pass.
return true;
}
}
// 处理TokenService的响应
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true;
case TokenResultStatus.SHOULD_WAIT:
// Wait for next tick.
try {
Thread.sleep(result.getWaitInMs());
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
case TokenResultStatus.NO_RULE_EXISTS:
case TokenResultStatus.BAD_REQUEST:
case TokenResultStatus.FAIL:
case TokenResultStatus.TOO_MANY_REQUEST:
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
case TokenResultStatus.BLOCKED:
default:
return false;
}
}
exit() 方法
1 |
|