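Overview

FlowSlot sits right after AuthoritySlot in the slot chain. It checks requests against the configured flow rules and is the core of rate limiting in Sentinel.

Analysis

The entry() method

entry() validates the FlowRules of the current resource. The check of each individual rule is delegated to FlowRuleChecker.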

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // Check all flow rules first; a FlowException aborts the slot chain here
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    // Flow rule map cannot be null.
    Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();

    List<FlowRule> rules = flowRules.get(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            // Every rule must pass; the first failing rule blocks the request
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
    return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
}
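The rule loop in checkFlow() can be illustrated with a self-contained sketch. SimpleRule, BlockedByRule and CheckFlowSketch are hypothetical stand-ins, not Sentinel classes, and each rule here only compares QPS against a threshold. The point is the control flow: all rules of a resource must pass, and the first failing rule aborts the entry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FlowRule: a named QPS threshold.
final class SimpleRule {
    final String name;
    final double count;

    SimpleRule(String name, double count) {
        this.name = name;
        this.count = count;
    }

    boolean canPass(double currentQps, int acquireCount) {
        return currentQps + acquireCount <= count;
    }
}

// Hypothetical stand-in for FlowException: remembers which rule blocked the call.
final class BlockedByRule extends Exception {
    final SimpleRule rule;

    BlockedByRule(SimpleRule rule) {
        super("blocked by " + rule.name);
        this.rule = rule;
    }
}

final class CheckFlowSketch {
    // Mirrors checkFlow(): look up the resource's rules and throw on the first failure.
    static void checkFlow(Map<String, List<SimpleRule>> flowRules, String resource,
                          double currentQps, int acquireCount) throws BlockedByRule {
        List<SimpleRule> rules = flowRules.get(resource);
        if (rules == null) {
            return; // no rule configured for this resource
        }
        for (SimpleRule rule : rules) {
            if (!rule.canPass(currentQps, acquireCount)) {
                throw new BlockedByRule(rule);
            }
        }
    }

    // Returns "pass" or "blocked:<rule name>" so the outcome is easy to inspect.
    static String outcome(Map<String, List<SimpleRule>> flowRules, String resource,
                          double currentQps, int acquireCount) {
        try {
            checkFlow(flowRules, resource, currentQps, acquireCount);
            return "pass";
        } catch (BlockedByRule e) {
            return "blocked:" + e.rule.name;
        }
    }

    static Map<String, List<SimpleRule>> demoRules() {
        Map<String, List<SimpleRule>> map = new HashMap<>();
        map.put("orderApi", Arrays.asList(new SimpleRule("loose", 100), new SimpleRule("strict", 10)));
        return map;
    }

    public static void main(String[] args) {
        Map<String, List<SimpleRule>> rules = demoRules();
        System.out.println(outcome(rules, "orderApi", 5, 1));   // pass: within both thresholds
        System.out.println(outcome(rules, "orderApi", 50, 1));  // blocked:strict
        System.out.println(outcome(rules, "otherApi", 500, 1)); // pass: no rules at all
    }
}
```

Because the rules are combined with AND, adding a stricter rule to a resource can only ever block more requests, never fewer.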

FlowRuleChecker

Rule check entry point: passCheck()

  1. If the rule has no limitApp, return true directly;
  2. If cluster mode is enabled, call the cluster check passClusterCheck(); otherwise use passLocalCheck().
static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                         boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    // Cluster mode
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // Local (non-cluster) mode
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
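The order of the branches in passCheck() matters: a rule without limitApp is skipped before cluster mode is even looked at. A tiny routing function makes that decision table explicit; PassCheckRoute and routeOf() are illustrative names, not Sentinel API.

```java
// Hypothetical outcome of the routing step in passCheck().
enum PassCheckRoute { SKIP_RULE, CLUSTER_CHECK, LOCAL_CHECK }

final class PassCheckRouting {
    // Same branch order as passCheck(): limitApp first, then cluster mode.
    static PassCheckRoute routeOf(String limitApp, boolean clusterMode) {
        if (limitApp == null) {
            return PassCheckRoute.SKIP_RULE; // passCheck() returns true right away
        }
        return clusterMode ? PassCheckRoute.CLUSTER_CHECK : PassCheckRoute.LOCAL_CHECK;
    }

    public static void main(String[] args) {
        System.out.println(routeOf(null, true));       // SKIP_RULE, even in cluster mode
        System.out.println(routeOf("default", true));  // CLUSTER_CHECK
        System.out.println(routeOf("default", false)); // LOCAL_CHECK
    }
}
```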

Local (non-cluster) check: passLocalCheck()

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    // No node was selected for this rule, so it does not apply: let the request pass
    if (selectedNode == null) {
        return true;
    }
    // Delegate to the rule's rater (its TrafficShapingController)
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

The core step is to take the rater from the rule and call its canPass() method, which applies the rule's traffic-shaping logic.

getRater():

TrafficShapingController getRater() {
    return controller;
}

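getRater() returns an implementation of TrafficShapingController. Which implementation it is depends on the rule's grade and controlBehavior.

How the rater is initialized:

  1. First configure the grade (and the controlBehavior), e.g. rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
  2. Hand the rule over to FlowRuleManager; along the following call chain, generateRater() creates the controller and setRater() attaches it to the rule: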

    com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager#loadRules ->
    com.alibaba.csp.sentinel.property.DynamicSentinelProperty#updateValue ->
    com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener#configUpdate ->
    com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#buildFlowRuleMap(java.util.List<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>) ->
    com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#buildFlowRuleMap(java.util.List<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>, com.alibaba.csp.sentinel.util.function.Function<com.alibaba.csp.sentinel.slots.block.flow.FlowRule,K>, com.alibaba.csp.sentinel.util.function.Predicate<com.alibaba.csp.sentinel.slots.block.flow.FlowRule>, boolean) ->
    com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#generateRater ->
    com.alibaba.csp.sentinel.slots.block.flow.FlowRule#setRater

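The main TrafficShapingController implementations are:

  • DefaultController: plain threshold check on QPS or thread count (fast reject)
  • RateLimiterController: queueing (traffic is released at an even pace)
  • WarmUpController: warm-up / cold start
  • WarmUpRateLimiterController: warm-up combined with queueing

The implementation chosen for each grade and controlBehavior: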

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    return new DefaultController(rule.getCount(), rule.getGrade());
}
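Because generateRater() runs when rules are loaded and its result is stored with setRater(), the per-request path only calls getRater().canPass() and never re-decides which controller to use. The sketch below illustrates that pattern in isolation. RaterSketch, LoadedRule and RuleLoaderSketch are hypothetical, and only a fast-reject rater is modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TrafficShapingController.
interface RaterSketch {
    boolean canPass(double passQps, int acquireCount);
}

// Hypothetical stand-in for FlowRule: the rater is attached once, at load time.
final class LoadedRule {
    final String resource;
    final double count;
    RaterSketch rater;

    LoadedRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }
}

final class RuleLoaderSketch {
    static int raterBuilds = 0; // how many raters have been constructed so far

    // Plays the role of generateRater(): only fast reject is modeled here.
    static RaterSketch generateRater(LoadedRule rule) {
        raterBuilds++;
        final double limit = rule.count;
        return (passQps, acquireCount) -> passQps + acquireCount <= limit;
    }

    // Plays the role of buildFlowRuleMap(): group rules by resource and attach raters.
    static Map<String, List<LoadedRule>> load(List<LoadedRule> rules) {
        Map<String, List<LoadedRule>> byResource = new HashMap<>();
        for (LoadedRule rule : rules) {
            rule.rater = generateRater(rule);
            byResource.computeIfAbsent(rule.resource, k -> new ArrayList<>()).add(rule);
        }
        return byResource;
    }

    public static void main(String[] args) {
        Map<String, List<LoadedRule>> map = load(Arrays.asList(
                new LoadedRule("orderApi", 20), new LoadedRule("payApi", 5)));
        LoadedRule order = map.get("orderApi").get(0);
        for (int i = 0; i < 1000; i++) {
            order.rater.canPass(10, 1); // per-request calls reuse the stored rater
        }
        System.out.println(raterBuilds); // 2: one per rule, none per request
    }
}
```

Building the controller once per rule keeps the hot path to a field read plus one virtual call, and lets stateful controllers (such as the token bucket in WarmUpController) keep their state across requests.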

DefaultController

Depending on the grade, canPass() compares either the current thread count or the pass QPS against the threshold.

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Current usage: thread count or pass QPS, depending on grade
        int curCount = avgUsedTokens(node);
        // Passing acquireCount more would exceed the limit of the current time window
        if (curCount + acquireCount > count) {
            // Only prioritized requests under a QPS rule may borrow from later windows
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                // Try to borrow pass quota from upcoming time windows; returns the time to wait
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                // The wait is shorter than the maximum occupy timeout
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    // Register the borrowed quota, then wait until it becomes valid
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

    private void sleep(long timeMillis) {
        try {
            Thread.sleep(timeMillis);
        } catch (InterruptedException e) {
            // Ignore.
        }
    }
}
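Leaving the prioritized borrowing path aside (it relies on Node#tryOccupyNext, which is not reproduced here), the decision in DefaultController boils down to one comparison. The hypothetical FastRejectSketch below takes the node's thread count and pass QPS as plain arguments; its grade constants are local to the sketch. Note the (int) cast copied from avgUsedTokens(): a pass QPS of 19.9 counts as 19.

```java
// Hypothetical re-implementation of DefaultController's non-prioritized path.
final class FastRejectSketch {
    static final int GRADE_THREAD = 0; // local constants for this sketch only
    static final int GRADE_QPS = 1;

    private final double count;
    private final int grade;

    FastRejectSketch(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    boolean canPass(int curThreadNum, double passQps, int acquireCount) {
        // Same choice as avgUsedTokens(): thread count, or QPS truncated to int
        int curCount = grade == GRADE_THREAD ? curThreadNum : (int) passQps;
        return curCount + acquireCount <= count;
    }

    public static void main(String[] args) {
        FastRejectSketch qpsRule = new FastRejectSketch(20, GRADE_QPS);
        System.out.println(qpsRule.canPass(0, 19.9, 1)); // true: 19 + 1 <= 20
        System.out.println(qpsRule.canPass(0, 20.0, 1)); // false: 20 + 1 > 20

        FastRejectSketch threadRule = new FastRejectSketch(5, GRADE_THREAD);
        System.out.println(threadRule.canPass(4, 1000, 1)); // true: QPS is ignored
        System.out.println(threadRule.canPass(5, 0, 1));    // false: 6 threads > 5
    }
}
```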
WarmUpController

https://github.com/alibaba/Sentinel/wiki/%E9%99%90%E6%B5%81---%E5%86%B7%E5%90%AF%E5%8A%A8

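Warm Up (RuleConstant.CONTROL_BEHAVIOR_WARM_UP) is the warm-up, or cold-start, mode. When a system has been running at low load for a long time and traffic suddenly surges, pulling it straight up to high load can overwhelm it instantly. With a "cold start", the allowed traffic increases gradually and reaches the threshold only after a set period, giving the cold system time to warm up instead of being crushed.
This mainly targets cases where start-up carries extra overhead, such as establishing database connections.
The implementation builds on Guava's algorithm. Guava, however, mainly regulates the interval between requests, which is closer to a leaky bucket, whereas Sentinel controls QPS: it only needs the number of requests passed per second to stay within the limit and does not care about the spacing of individual requests. In other words, Sentinel behaves more like a token bucket.
The tokens left in the bucket quantify system utilization. Suppose the system can process b requests per second. Each processed request takes one token from the bucket, and b tokens drop into the bucket every second. The fuller the bucket, the lower the utilization; once the number of tokens exceeds a certain threshold, the bucket is called "saturated".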
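When the bucket is saturated, Guava's derivation leads to the following formula:
rate(c) = m * c + rate(stable)

Here rate is the interval between the current request and the previous one, c is the number of tokens in the bucket above the warning threshold, and m is the slope, so rate grows linearly with c. rate(stable) = 1/count is the interval at the threshold itself. The interval is largest when the bucket is full; that maximum, the cold rate, is normally coldFactor * rate(stable).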
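Writing the relation out with the symbols used in WarmUpController makes the two endpoints easy to check. The derivation assumes, as the code in canPass() does, that the interval is aboveToken * slope + 1/count with c = restToken - warningToken; the second line is the slope expression from construct().

```latex
\begin{aligned}
I(c) &= m\,c + \frac{1}{\mathrm{count}}, \qquad c = \mathrm{restToken} - \mathrm{warningToken} \ge 0 \\
m &= \frac{\mathrm{coldFactor}/\mathrm{count} - 1/\mathrm{count}}{\mathrm{maxToken} - \mathrm{warningToken}}
   = \frac{(\mathrm{coldFactor} - 1)/\mathrm{count}}{\mathrm{maxToken} - \mathrm{warningToken}} \\
I(0) &= \frac{1}{\mathrm{count}}, \qquad
I(\mathrm{maxToken} - \mathrm{warningToken}) = \frac{\mathrm{coldFactor}}{\mathrm{count}} \\
\mathrm{QPS}(c) &= \frac{1}{I(c)} \in \left[\frac{\mathrm{count}}{\mathrm{coldFactor}},\ \mathrm{count}\right]
\end{aligned}
```

So at the warning line the full threshold count applies, and with a full bucket the allowed QPS drops to count / coldFactor.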

A simple configuration:

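import java.util.Collections;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

FlowRule rule = new FlowRule();
rule.setResource(resourceName);
rule.setCount(20);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP); // warm-up / cold start
rule.setWarmUpPeriodSec(10); // time to reach the steady state (warm-up duration, in seconds)
FlowRuleManager.loadRules(Collections.singletonList(rule));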

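The warning line warningToken is set to 1/(coldFactor - 1) of the total traffic allowed within warmUpPeriodInSec seconds: warningToken = warmUpPeriodInSec * count / (coldFactor - 1). For the configuration above with the default coldFactor of 3, that is 10 * 20 / 2 = 100.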
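To see what those parameters mean in numbers, the sketch below repeats the three formulas from WarmUpController#construct and the limit check from its canPass(), with count = 20, warmUpPeriodInSec = 10 and coldFactor = 3. WarmUpCurveSketch and allowedQps() are hypothetical names; only the arithmetic is taken from the source.

```java
// Hypothetical helper reproducing WarmUpController's parameter math.
final class WarmUpCurveSketch {
    final double count;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpCurveSketch(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        this.count = count;
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    // QPS limit that canPass() applies for a given number of stored tokens.
    double allowedQps(long restToken) {
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        }
        return count;
    }

    public static void main(String[] args) {
        WarmUpCurveSketch c = new WarmUpCurveSketch(20, 10, 3);
        System.out.println(c.warningToken + " " + c.maxToken); // 100 200
        for (long tokens = c.maxToken; tokens >= 50; tokens -= 50) {
            System.out.printf("tokens=%d allowedQps=%.2f%n", tokens, c.allowedQps(tokens));
        }
    }
}
```

With a full bucket (200 tokens) only about 6.67 QPS, i.e. count / coldFactor, is allowed; at 150 tokens the limit is 10 QPS, and from the warning line down it is the full 20 QPS.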

Source code:

public class WarmUpController implements TrafficShapingController {

    protected double count;
    private int coldFactor;
    protected int warningToken = 0;
    private int maxToken;
    protected double slope;

    protected AtomicLong storedTokens = new AtomicLong(0);
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    // count: QPS threshold; warmUpPeriodInSec: warm-up duration in seconds;
    // coldFactor: cold factor (cold interval = coldFactor * stable interval)
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // e.g. count = 20, warmUpPeriodInSec = 10, coldFactor = 3: warningToken = 100
        // Warning line: 1/(coldFactor - 1) of the total traffic allowed in warmUpPeriodInSec seconds
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // same example: maxToken = 200
        // Upper bound = warning line + 2 * (total traffic in warmUpPeriodInSec seconds) / (coldFactor + 1)
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        // Slope of the interval line between the warning line and maxToken
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // Compute the slope-based limit:
        // once the stored tokens reach the warning line, lower the allowed QPS
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // Above the warning line the allowed QPS lies between count / coldFactor and count,
            // and it shrinks as more tokens are stored (the colder the system, the lower the QPS)
            // current interval = aboveToken * slope + 1 / count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        // Align to the start of the current second
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        // Tokens are synced at most once per second
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // Deduct the tokens consumed by the previous second's traffic
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

    // Refill: new tokens = elapsed time (ms) * count / 1000
    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // Tokens are added only when:
        // 1. the stored tokens are below the warning line (the system is already warm), or
        // 2. they are above it but traffic is low (passQps < count / coldFactor), so the system keeps cooling down
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }
}
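The refill and drain rules of syncToken() and coolDownTokens() are easiest to follow with a fake clock. The hypothetical TokenBucketSketch below is a single-threaded copy of that logic (the AtomicLong compare-and-set is dropped), fed with explicit timestamps and previous-second QPS values; warningToken = 100 and maxToken = 200 correspond to count = 20, warmUpPeriodInSec = 10, coldFactor = 3. It also shows why a fresh controller starts cold: lastFilledTime begins at 0, so the first sync sees a huge elapsed time and fills the bucket to maxToken.

```java
// Hypothetical single-threaded copy of WarmUpController#syncToken / #coolDownTokens.
final class TokenBucketSketch {
    final double count;
    final int coldFactor;
    final long warningToken;
    final long maxToken;
    long storedTokens = 0;
    long lastFilledTime = 0;

    TokenBucketSketch(double count, int coldFactor, long warningToken, long maxToken) {
        this.count = count;
        this.coldFactor = coldFactor;
        this.warningToken = warningToken;
        this.maxToken = maxToken;
    }

    // nowMs replaces TimeUtil.currentTimeMillis(); previousQps is last second's pass QPS.
    void syncToken(long nowMs, long previousQps) {
        long currentTime = nowMs - nowMs % 1000; // align to the second
        if (currentTime <= lastFilledTime) {
            return; // already synced in this second
        }
        long oldValue = storedTokens;
        long refilled = (long) (oldValue + (currentTime - lastFilledTime) * count / 1000);
        long newValue = oldValue;
        if (oldValue < warningToken) {
            newValue = refilled; // below the warning line: always refill
        } else if (oldValue > warningToken && previousQps < (int) count / coldFactor) {
            newValue = refilled; // above it: refill only while traffic stays low
        }
        newValue = Math.min(newValue, maxToken);
        storedTokens = Math.max(0L, newValue - previousQps); // consume last second's tokens
        lastFilledTime = currentTime;
    }

    public static void main(String[] args) {
        TokenBucketSketch b = new TokenBucketSketch(20, 3, 100, 200);
        long t0 = 1_700_000_000_000L; // arbitrary epoch-millis timestamp on a second boundary
        long[][] steps = {{0, 0}, {1000, 3}, {2000, 50}, {3000, 50}, {4000, 10}, {4500, 999}};
        for (long[] s : steps) {
            b.syncToken(t0 + s[0], s[1]);
            System.out.println("t+" + s[0] + "ms previousQps=" + s[1] + " -> storedTokens=" + b.storedTokens);
        }
    }
}
```

The printed token counts are 200, 197, 147, 97, 107 and 107: the bucket starts full (cold), light traffic above the warning line keeps it topped up, heavy traffic drains it, below the warning line it refills by count tokens per second, and a second call within the same second changes nothing.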
RateLimiterController

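Queueing: requests are released at an even pace.

  1. Compute the time costTime that acquireCount requests need at the configured rate;
  2. Compute how long the request must still wait after the current time (waitTime);
  3. If waitTime does not exceed the maximum queueing time maxQueueingTimeMs, sleep(waitTime) and let the request pass; otherwise reject it.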
public class RateLimiterController implements TrafficShapingController {

    private final int maxQueueingTimeMs;
    private final double count;

    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests,
        // i.e. the time acquireCount requests need at the configured rate.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();
        // Enough time has passed since the last request: pass immediately
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // Reject if the wait would exceed the maximum queueing time
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // Reserve the next slot in the queue
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    // Re-check after reserving: other threads may have queued in between
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    // Sleep until it is this request's turn
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                    // Interrupted while queueing: fall through and reject
                }
            }
        }
        return false;
    }

}
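The queueing arithmetic can be replayed deterministically by passing the current time explicitly and returning the wait instead of sleeping. QueueingSketch and tryAcquire() are hypothetical, and because the sketch is single-threaded, the second check RateLimiterController performs after addAndGet() (which guards against concurrent callers) is omitted. With count = 10 QPS each request costs 100 ms, so with maxQueueingTimeMs = 250 a burst of five simultaneous requests is admitted with waits of 0, 100 and 200 ms, and the last two are rejected.

```java
import java.util.Arrays;

// Hypothetical single-threaded model of RateLimiterController's queueing math.
final class QueueingSketch {
    private final long maxQueueingTimeMs;
    private final double count;
    private long latestPassedTime = -1;

    QueueingSketch(long maxQueueingTimeMs, double count) {
        this.maxQueueingTimeMs = maxQueueingTimeMs;
        this.count = count;
    }

    // Returns the wait in ms (0 = pass immediately), or -1 if the request is rejected.
    long tryAcquire(long nowMs, int acquireCount) {
        if (acquireCount <= 0) {
            return 0;
        }
        if (count <= 0) {
            return -1;
        }
        long costTime = Math.round(1.0 * acquireCount / count * 1000);
        long expectedTime = costTime + latestPassedTime;
        if (expectedTime <= nowMs) {
            latestPassedTime = nowMs;
            return 0;
        }
        long waitTime = expectedTime - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            return -1;
        }
        latestPassedTime += costTime; // reserve the next slot
        return latestPassedTime - nowMs;
    }

    public static void main(String[] args) {
        QueueingSketch limiter = new QueueingSketch(250, 10);
        long now = 1_000L;
        long[] waits = new long[5];
        for (int i = 0; i < waits.length; i++) {
            waits[i] = limiter.tryAcquire(now, 1);
        }
        System.out.println(Arrays.toString(waits)); // [0, 100, 200, -1, -1]
    }
}
```

Unlike DefaultController, this controller never lets a burst through at once: excess requests are either spread out at 1/count intervals or rejected, which is why it suits smoothing traffic to a downstream system.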

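Cluster check: passClusterCheck()

  1. Obtain the TokenService;
  2. If none is available, go to step 3; otherwise go to step 4;
  3. If fallbackToLocalWhenFail is true, fall back to the local check; otherwise let the request pass directly;
  4. Call TokenService.requestToken() to get the flow-control result (if this throws, step 3 applies as well);
  5. Parse the result with applyTokenResult() and, depending on it, pass, block, wait, or fall back to the local check.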
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // Token service
        TokenService clusterService = pickClusterService();
        // No token service available: fall back to the local check (or pass)
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        // Request a token and get the result
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

// Fallback handling
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                             boolean prioritized) {
    if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    } else {
        // The rule won't be activated, just pass.
        return true;
    }
}

// Handle the TokenService response
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context, DefaultNode node,
                                        int acquireCount, boolean prioritized) {
    switch (result.getStatus()) {
        case TokenResultStatus.OK:
            return true;
        case TokenResultStatus.SHOULD_WAIT:
            // Wait for next tick.
            try {
                Thread.sleep(result.getWaitInMs());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        case TokenResultStatus.NO_RULE_EXISTS:
        case TokenResultStatus.BAD_REQUEST:
        case TokenResultStatus.FAIL:
        case TokenResultStatus.TOO_MANY_REQUEST:
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        case TokenResultStatus.BLOCKED:
        default:
            return false;
    }
}
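The outcome table of passClusterCheck() and applyTokenResult() can be summarized as a pure function. TokenStatusSketch copies the TokenResultStatus names used in the code above but is a hypothetical local type; the local check is passed in as a boolean instead of calling passLocalCheck(), and the SHOULD_WAIT sleep is left out.

```java
// Hypothetical local copy of the status names handled by applyTokenResult().
enum TokenStatusSketch { OK, SHOULD_WAIT, NO_RULE_EXISTS, BAD_REQUEST, FAIL, TOO_MANY_REQUEST, BLOCKED }

final class ClusterDecisionSketch {
    // Mirrors fallbackToLocalOrPass(): either the local result, or pass.
    static boolean fallbackToLocalOrPass(boolean fallbackToLocalWhenFail, boolean localPass) {
        return fallbackToLocalWhenFail ? localPass : true;
    }

    // status == null models "no token service" or "requestToken() threw".
    static boolean decide(TokenStatusSketch status, boolean fallbackToLocalWhenFail, boolean localPass) {
        if (status == null) {
            return fallbackToLocalOrPass(fallbackToLocalWhenFail, localPass);
        }
        switch (status) {
            case OK:
            case SHOULD_WAIT: // the real code sleeps getWaitInMs() first, then passes
                return true;
            case NO_RULE_EXISTS:
            case BAD_REQUEST:
            case FAIL:
            case TOO_MANY_REQUEST:
                return fallbackToLocalOrPass(fallbackToLocalWhenFail, localPass);
            case BLOCKED:
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(TokenStatusSketch.BLOCKED, true, true)); // false: the server said no
        System.out.println(decide(TokenStatusSketch.FAIL, false, false));  // true: no fallback, just pass
        System.out.println(decide(TokenStatusSketch.FAIL, true, false));   // false: the local check blocks
        System.out.println(decide(null, true, true));                      // true: the local check passes
    }
}
```

Only BLOCKED rejects unconditionally; every failure of the cluster machinery itself either degrades to the local rule or, without fallback, lets the request through.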

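The exit() method

exit() performs no flow check of its own; it simply hands the call to the next slot with fireExit().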

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    fireExit(context, resourceWrapper, count, args);
}