概述

最近了解了 Sentinel 框架的使用,为了在团队内做 Sentinel 的分享,顺便学习一下其中的原理。

源码解析

看一下简单的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Demo of guarding a piece of code with Sentinel.
 *
 * <p>Loads a single flow rule for the resource {@code "run"} (grade {@code FLOW_GRADE_QPS},
 * count 3, limit app {@code "default"}), then enters the resource once, simulates 100 ms of
 * work and exits. When the entry is rejected with a {@link BlockException}, the rejection is
 * logged and counted in {@code longAdder}.
 */
public void entry() {
    // Build and publish the flow rule for resource "run".
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule("run");
    rule.setCount(3);
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule.setLimitApp("default");
    rules.add(rule);
    FlowRuleManager.loadRules(rules);

    Entry entry = null;
    try {
        logger.info("run");
        entry = SphU.entry("run");
        try {
            // Simulated business work.
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of printing the stack trace, so the
            // caller (or the owning executor) can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        logger.info("run end");
    } catch (BlockException e) {
        // The request was rejected by a Sentinel rule: log it and count it.
        logger.info("限流");
        longAdder.increment();
    } finally {
        // Only exit when the entry was actually acquired.
        if (entry != null) {
            entry.exit();
        }
    }
}

从上面的代码可以看出,Sentinel 对代码的限流/熔断都以 SphU 类和 Entry 类作为入口,所以源码解读可以从这两个类开始。

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责。

Entry类

Entry 类是一个抽象类

继承关系

1
/**
 * Abstract base type of a resource entry. It implements {@link AutoCloseable},
 * so an instance can be managed by a try-with-resources statement.
 */
public abstract class Entry implements AutoCloseable {
}

成员变量

1
2
3
4
5
6
7
8
9
10
/** Shared zero-length argument array. */
private static final Object[] OBJECTS0 = new Object[0];

/** Creation time of this entry. NOTE(review): unit and clock source are not visible here — confirm. */
private long createTime;
/** Current {@link Node} attached to this entry. */
private Node curNode;
/**
* {@link Node} of the specific origin, Usually the origin is the Service Consumer.
*/
private Node originNode;
/** Error recorded on this entry, if any. */
private Throwable error;
/** Wrapper of the resource this entry belongs to; accessible to subclasses. */
protected ResourceWrapper resourceWrapper;

方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * Returns the resource wrapper held by this entry.
 *
 * @return the value of the {@code resourceWrapper} field
 */
public ResourceWrapper getResourceWrapper() {
    return this.resourceWrapper;
}
/**
 * Called when leaving the protected resource: completes the current resource entry and
 * restores the entry stack in context. Delegates to {@link #exit(int, Object...)} with a
 * count of 1 and the shared {@code OBJECTS0} array.
 *
 * @throws ErrorEntryFreeException if entry in current context does not match current entry
 */
public void exit() throws ErrorEntryFreeException {
    this.exit(1, OBJECTS0);
}

/**
 * Exits this entry with the given count, delegating to {@link #exit(int, Object...)}
 * with the shared {@code OBJECTS0} array as the extra arguments.
 *
 * @param count value forwarded to {@link #exit(int, Object...)}
 * @throws ErrorEntryFreeException propagated from {@link #exit(int, Object...)}
 */
public void exit(int count) throws ErrorEntryFreeException {
    this.exit(count, OBJECTS0);
}

/**
 * Same as calling {@link #exit()}; present so that an entry can be used in a
 * try-with-resources statement (available since JDK 1.7).
 *
 * @since 1.5.0
 */
@Override
public void close() {
    this.exit();
}

/**
* Exit this entry. This method should be invoked exactly once, at the end of the resource protection.
*
* @param count tokens to release.
* @param args extra parameters
* @throws ErrorEntryFreeException if {@link Context#getCurEntry()} is not this entry.
*/
public abstract void exit(int count, Object... args) throws ErrorEntryFreeException;

/**
* Exit this entry.
*
* @param count tokens to release.
* @param args extra parameters
* @return next available entry after exit, that is the parent entry.
* @throws ErrorEntryFreeException if {@link Context#getCurEntry()} is not this entry.
*/
protected abstract Entry trueExit(int count, Object... args) throws ErrorEntryFreeException;

/**
* Get related {@link Node} of the parent {@link Entry}.
*
* @return the {@link Node} related to the parent {@link Entry}
*/
public abstract Node getLastNode();

/**
 * Returns the creation time stored on this entry.
 *
 * @return the value of the {@code createTime} field
 */
public long getCreateTime() {
    return this.createTime;
}

/**
 * Returns the current {@link Node} attached to this entry.
 *
 * @return the value of the {@code curNode} field
 */
public Node getCurNode() {
    return this.curNode;
}

/**
* Sets the current {@link Node} of this entry.
*
* @param node the node to store in {@code curNode}
*/
public void setCurNode(Node node) {
this.curNode = node;
}

/**
 * Returns the error recorded on this entry.
 *
 * @return the value of the {@code error} field
 */
public Throwable getError() {
    return this.error;
}

/**
* Records an error on this entry.
*
* @param error the throwable to store in the {@code error} field
*/
public void setError(Throwable error) {
this.error = error;
}

/**
 * Returns the origin {@link Node} of this {@link Entry}.
 *
 * @return the origin {@link Node} of this {@link Entry}; may be null if no origin was specified by
 * {@link ContextUtil#enter(String name, String origin)}.
 */
public Node getOriginNode() {
    return this.originNode;
}

/**
* Sets the origin {@link Node} of this entry.
*
* @param originNode the node to store in the {@code originNode} field
*/
public void setOriginNode(Node originNode) {
this.originNode = originNode;
}

SphU.entry

SphU.entry 是最开始的入口,委托了 Env.sph.entry 执行

1
2
3
4
5
/** Shared zero-length array passed as the extra arguments when the caller supplies none. */
private static final Object[] OBJECTS0 = new Object[0];

/**
 * Enters the resource identified by {@code name}. The call is delegated to {@code Env.sph}
 * with {@code EntryType.OUT}, a count of 1 and the shared empty argument array.
 *
 * @param name name of the resource to enter
 * @return the entry returned by {@code Env.sph}
 * @throws BlockException propagated from {@code Env.sph.entry}
 */
public static Entry entry(String name) throws BlockException {
    final Entry guarded = Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    return guarded;
}

Env 类

sph 的实现类是 CtSph

1
2
3
4
5
6
7
8
9
/**
* Holds the shared {@link Sph} instance and triggers initialization via
* {@code InitExecutor.doInit()} when this class is initialized.
*/
public class Env {

/** The shared {@link Sph} instance, created as a {@link CtSph}. */
public static final Sph sph = new CtSph();

static {
// Static initializers run in textual order, so sph is assigned before doInit() is called.
// If init fails, the process will exit.
InitExecutor.doInit();
}
}

Sph

Sph 用于对资源的保护,如果发生 block 就会抛出 BlockException,成功就会返回一个 entry。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* API for entering a protected resource. Every method returns an entry object on success and
* declares {@link BlockException} for the case where the call is blocked.
*
* <p>The resource is identified either by a name or by a {@link Method}. Longer overloads add
* an {@link EntryType}, a {@code count} and extra {@code args}.
* NOTE(review): the defaults applied by the shorter overloads are not visible here — confirm
* against the implementation.
*/
public interface Sph {

// Resource identified by name or by method, no further options.
Entry entry(String name) throws BlockException;

Entry entry(Method method) throws BlockException;

// Variants that take an explicit count.
Entry entry(Method method, int count) throws BlockException;

Entry entry(String name, int count) throws BlockException;

// Variants that take an explicit entry type.
Entry entry(Method method, EntryType type) throws BlockException;

Entry entry(String name, EntryType type) throws BlockException;

// Variants that take both an entry type and a count.
Entry entry(Method method, EntryType type, int count) throws BlockException;

Entry entry(String name, EntryType type, int count) throws BlockException;

// Variants that also take extra arguments.
Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException;

Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;

// Same parameters as the fullest name-based entry(), but returns an AsyncEntry.
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;

// Variants that additionally carry a prioritized flag.
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException;

Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized, Object... args)
throws BlockException;
}

CtSph

继承关系

1
/**
 * Implementation of the {@link Sph} interface.
 */
public class CtSph implements Sph {
}

成员变量

1
2
3
4
5
/** Shared zero-length argument array. */
private static final Object[] OBJECTS0 = new Object[0];
/**
* Slot chain per resource. The field is volatile and starts as an empty {@link HashMap}.
*/
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();

/** Monitor object for synchronized sections of this class. */
private static final Object LOCK = new Object();

核心方法

entryWithPriority()

所有同步方式的 entry 调用最终都会走到 entryWithPriority,区别只在于是否带优先级。

执行逻辑

  1. 获取上下文Context;
  2. 如果上下文是NullContext,则返回new CtEntry(resourceWrapper, null, context);
  3. 如果上下文为空,生成默认上下文;
  4. 如果Constants.ON为false,则返回new CtEntry(resourceWrapper, null, context);
  5. 获取插槽链;
  6. 如果插槽链为Null,则返回new CtEntry(resourceWrapper, null, context);
  7. 生成带插槽链的Entry;
  8. 执行插槽链的entry方法;
  9. 返回Entry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Creates an {@link Entry} for the given resource and, when rule checking applies, runs the
 * resource's slot chain.
 *
 * <p>An entry without a slot chain is returned (no rule checking) when the current context is a
 * {@link NullContext}, when {@code Constants.ON} is false, or when no slot chain could be
 * obtained for the resource.
 *
 * @param resourceWrapper the resource being entered
 * @param count forwarded to the slot chain, and to {@code exit} when the request is blocked
 * @param prioritized forwarded to the slot chain
 * @param args extra arguments forwarded to the slot chain
 * @return the created entry
 * @throws BlockException if the slot chain blocks the request; the entry is exited before the
 *         exception is rethrown
 */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    // Current context (the original note says it is read from a ThreadLocal).
    Context ctx = ContextUtil.getContext();
    if (ctx instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so only the entry is initialised here. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, ctx);
    }

    if (ctx == null) {
        // No context yet: fall back to the default context.
        ctx = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is off: no rule checking.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, ctx);
    }

    // Look up (or create) the slot chain for this resource.
    ProcessorSlot<Object> slotChain = lookProcessChain(resourceWrapper);

    /*
     * A null chain means the amount of resources (slot chains) exceeds
     * {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done.
     */
    if (slotChain == null) {
        return new CtEntry(resourceWrapper, null, ctx);
    }

    // Entry bound to its slot chain.
    Entry guarded = new CtEntry(resourceWrapper, slotChain, ctx);
    try {
        // Run the slot chain for this entry.
        slotChain.entry(ctx, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException blocked) {
        guarded.exit(count, args);
        throw blocked;
    } catch (Throwable unexpected) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", unexpected);
    }
    return guarded;
}
插槽链 lookProcessChain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Returns the slot chain registered for the given resource, creating and registering one on
* first use.
*
* <p>The map is read first without locking. On a miss the lookup is repeated while holding
* {@code LOCK}, and a new chain is only created if it is still absent. Registration does not
* mutate the existing map: a new map is built from the old entries plus the new one and then
* assigned to {@code chainMap}.
*
* @param resourceWrapper the resource whose slot chain is requested
* @return the slot chain for the resource, or {@code null} when the map already holds
* {@code Constants.MAX_SLOT_CHAIN_SIZE} or more chains and none exists for this resource
*/
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// First lookup, without holding the lock.
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
// Look again under the lock before creating a chain.
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}

chain = SlotChainProvider.newSlotChain();
// Build a replacement map with the old entries plus the new chain, then swap it in.
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}