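Overview

ClusterBuilderSlot is the slot that comes immediately after NodeSelectorSlot in the slot chain. It is responsible for building and collecting the cluster nodes (ClusterNode instances).

Analysis

ClusterBuilderSlot has a field of type ClusterNode, so the analysis starts with ClusterNode.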

ClusterNode

Inheritance hierarchy
[Figure: ClusterNode inheritance diagram]

Member variables

private Map<String, StatisticNode> originCountMap = new HashMap<>();
private final ReentrantLock lock = new ReentrantLock();

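ClusterNode adds three methods.

getOrCreateOriginNode: get or create the origin node

In the entry method of ClusterBuilderSlot, if the context carries a non-empty origin, this method is called to get or create the StatisticNode for that origin.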

public Node getOrCreateOriginNode(String origin) {
    // Fast path: read the current map without locking.
    StatisticNode statisticNode = originCountMap.get(origin);
    if (statisticNode == null) {
        // Acquire the lock before the try block, so that unlock() in finally
        // only runs when the lock is actually held.
        lock.lock();
        try {
            // Re-check under the lock: another thread may have created the node.
            statisticNode = originCountMap.get(origin);
            if (statisticNode == null) {
                statisticNode = new StatisticNode();
                // Copy-on-write: build a new map and swap the reference.
                HashMap<String, StatisticNode> newMap = new HashMap<>(originCountMap.size() + 1);
                newMap.putAll(originCountMap);
                newMap.put(origin, statisticNode);
                originCountMap = newMap;
            }
        } finally {
            lock.unlock();
        }
    }

    return statisticNode;
}
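
The lookup pattern above (lock-free read, re-check under the lock, then replace the whole map) can be tried in isolation. The following is a minimal sketch, not Sentinel code: OriginStats and OriginRegistry are hypothetical stand-ins for StatisticNode and ClusterNode, and the volatile modifier on the map is an assumption of this sketch rather than something the field shown above declares.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for StatisticNode: just a holder for a counter.
class OriginStats {
    long passCount;
}

// Hypothetical registry mirroring the getOrCreateOriginNode lookup pattern.
class OriginRegistry {
    // volatile is an assumption of this sketch: it lets lock-free readers
    // see the most recently published map.
    private volatile Map<String, OriginStats> originMap = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    OriginStats getOrCreate(String origin) {
        OriginStats stats = originMap.get(origin); // fast path, no lock
        if (stats == null) {
            lock.lock();
            try {
                stats = originMap.get(origin);     // re-check under the lock
                if (stats == null) {
                    stats = new OriginStats();
                    // Copy-on-write: never mutate a map that readers may hold.
                    Map<String, OriginStats> newMap = new HashMap<>(originMap.size() + 1);
                    newMap.putAll(originMap);
                    newMap.put(origin, stats);
                    originMap = newMap;
                }
            } finally {
                lock.unlock();
            }
        }
        return stats;
    }

    int size() {
        return originMap.size();
    }
}
```

Calling getOrCreate twice with the same origin returns the same object, while a new origin produces a new entry. The trade-off of this design is that every insertion copies the map, which is acceptable when the set of origins is small and reads far outnumber writes.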

trace

public void trace(Throwable throwable, int count) {
    if (count > 0) {
        // Only non-block (business) exceptions are added to the exception QPS.
        if (!BlockException.isBlockException(throwable)) {
            this.increaseExceptionQps(count);
        }
    }
}
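
To make the filtering rule in trace concrete, here is a small self-contained sketch. FlowBlockedException and ExceptionCounter are hypothetical names introduced for illustration, and the instanceof check is a simplification standing in for BlockException.isBlockException.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for Sentinel's BlockException.
class FlowBlockedException extends Exception {
    FlowBlockedException(String message) {
        super(message);
    }
}

// Hypothetical counter that mirrors the shape of ClusterNode.trace.
class ExceptionCounter {
    private final LongAdder exceptionCount = new LongAdder();

    void trace(Throwable throwable, int count) {
        if (count <= 0) {
            return; // nothing to record
        }
        // Simplified check: a plain instanceof test on the throwable itself.
        if (!(throwable instanceof FlowBlockedException)) {
            exceptionCount.add(count);
        }
    }

    long total() {
        return exceptionCount.sum();
    }
}
```

With this sketch, tracing an IllegalStateException increases the total, whereas tracing a FlowBlockedException or passing a non-positive count leaves it unchanged.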

getOriginCountMap

public synchronized Map<String, StatisticNode> getOriginCountMap() {
    return this.originCountMap;
}

ClusterBuilderSlot

Inheritance hierarchy
[Figure: ClusterBuilderSlot inheritance diagram]

Member variables

private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

private static final Object lock = new Object();

private volatile ClusterNode clusterNode = null;

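The entry() method

  1. Check whether clusterNode is null. If it is:
    1. create a new ClusterNode and assign it to clusterNode;
    2. store it in clusterNodeMap, keyed by node.getId().
  2. Set clusterNode as the ClusterNode of node.
  3. If the origin in the context is not empty, get or create the origin node through clusterNode and set it on the current entry.
  4. Call fireEntry() to invoke entry() on the subsequent slots.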

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args)
    throws Throwable {
    if (clusterNode == null) {
        synchronized (lock) {
            if (clusterNode == null) {
                // Create the cluster node.
                clusterNode = new ClusterNode();
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);

                clusterNodeMap = newMap;
            }
        }
    }
    node.setClusterNode(clusterNode);

    /*
     * if context origin is set, we should get or create a new {@link Node} of
     * the specific origin.
     */
    if (!"".equals(context.getOrigin())) {
        Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
        context.getCurEntry().setOriginNode(originNode);
    }

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
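
The relationship between the per-instance clusterNode field and the static clusterNodeMap can be sketched as follows. This is an illustrative model only: ResourceStats and StatsBuilderSlot are hypothetical names, resources are identified by a plain String instead of ResourceWrapper, and the sketch assumes one slot instance per resource, which is what caching the node in an instance field suggests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterNode.
class ResourceStats {
    final String resource;

    ResourceStats(String resource) {
        this.resource = resource;
    }
}

// Hypothetical slot; assumed to be instantiated once per resource.
class StatsBuilderSlot {
    // Shared by all slot instances; replaced wholesale, never mutated in place.
    private static volatile Map<String, ResourceStats> registry = new HashMap<>();
    private static final Object LOCK = new Object();

    // Cached per slot instance, so later entries skip the lock entirely.
    private volatile ResourceStats stats;

    ResourceStats entry(String resource) {
        if (stats == null) {
            synchronized (LOCK) {
                if (stats == null) {
                    stats = new ResourceStats(resource);
                    Map<String, ResourceStats> newMap =
                        new HashMap<>(Math.max(registry.size(), 16));
                    newMap.putAll(registry);
                    newMap.put(resource, stats);
                    registry = newMap;
                }
            }
        }
        return stats;
    }

    // Global lookup by resource name, independent of any slot instance.
    static ResourceStats lookup(String resource) {
        return registry.get(resource);
    }
}
```

The instance field gives each slot a cheap path after the first call, while the static map lets other components find the statistics of any resource without holding a reference to its slot.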

The exit() method

exit() does no extra work of its own; it simply hands the call over to the next slot.

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    fireExit(context, resourceWrapper, count, args);
}
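
The forwarding behaviour of fireEntry() and fireExit() can be illustrated with a minimal chain of responsibility. The sketch below is an assumption-laden model rather than Sentinel's actual slot classes: LinkedSlot, BuilderSlot and TailSlot are hypothetical names, and a list of strings stands in for the real context so that the call order can be observed.

```java
import java.util.List;

// Hypothetical base class: each slot knows only its successor.
abstract class LinkedSlot {
    private LinkedSlot next;

    LinkedSlot setNext(LinkedSlot next) {
        this.next = next;
        return next;
    }

    abstract void entry(String resource, List<String> log);

    abstract void exit(String resource, List<String> log);

    // Forward to the next slot, if there is one.
    void fireEntry(String resource, List<String> log) {
        if (next != null) {
            next.entry(resource, log);
        }
    }

    void fireExit(String resource, List<String> log) {
        if (next != null) {
            next.exit(resource, log);
        }
    }
}

// Does its own work on entry, but is a pure pass-through on exit.
class BuilderSlot extends LinkedSlot {
    @Override
    void entry(String resource, List<String> log) {
        log.add("builder:entry " + resource);
        fireEntry(resource, log);
    }

    @Override
    void exit(String resource, List<String> log) {
        fireExit(resource, log);
    }
}

// A later slot that records both directions.
class TailSlot extends LinkedSlot {
    @Override
    void entry(String resource, List<String> log) {
        log.add("tail:entry " + resource);
        fireEntry(resource, log);
    }

    @Override
    void exit(String resource, List<String> log) {
        log.add("tail:exit " + resource);
        fireExit(resource, log);
    }
}
```

Wiring a BuilderSlot in front of a TailSlot and calling entry and then exit on the head records the builder's entry, the tail's entry and the tail's exit, with nothing logged by the builder on the way out.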