概述

StatisticSlot处于LogSlot 节点的下一位,作用是统计实时数据,例如线程数、异常请求数、通过请求数等,并执行注册的 ProcessorSlotEntryCallback 接口的回调方法。

解析

继承关系

image.png

entry() 方法

  1. fireEntry() 执行后续节点的操作;
  2. 线程数加一;
  3. 后续执行正常:
    1. 通过请求数加count;
    2. 如果有Origin, 则对应的 OriginNode 节点线程数加一,通过请求数加count;
    3. 如果资源类型是IN,则全局节点 ENTRY_NODE 节点线程数加一,通过请求数加count;
    4. 回调所有注册的 ProcessorSlotEntryCallback 接口的 onPass 方法;
    5. return;
  4. 后续执行抛出 PriorityWaitException 异常:
    1. 线程数加一;
    2. 如果有Origin, 则对应的 OriginNode 节点线程数加一;
    3. 如果资源类型是IN,则全局节点 ENTRY_NODE 节点线程数加一;
    4. 回调所有注册的 ProcessorSlotEntryCallback 接口的 onPass 方法;
    5. return;
  5. 后续执行抛出 BlockException 异常:
    1. 当前 curEntry 设置异常对象e;
    2. BlockQps 数加一;
    3. 如果有Origin, 则对应的 OriginNode 节点 BlockQps 数加一;
    4. 如果资源类型是IN,则全局节点 ENTRY_NODE 节点 BlockQps 数加一;
    5. 回调所有注册的 ProcessorSlotEntryCallback 接口的 onBlocked 方法;
    6. throw e;
  6. 后续执行抛出 Throwable 异常(其他异常):
    1. 当前 curEntry 设置异常对象e;
    2. ExceptionQps 数加一;
    3. 如果有Origin, 则对应的 OriginNode 节点 ExceptionQps 数加一;
    4. 如果资源类型是IN,则全局节点 ENTRY_NODE 节点 ExceptionQps 数加一;
    5. throw e;
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      @Override
      public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
      boolean prioritized, Object... args) throws Throwable {
      try {
      // Do some checking.
      fireEntry(context, resourceWrapper, node, count, prioritized, args);

      // Request passed, add thread count and pass count.
      node.increaseThreadNum();
      node.addPassRequest(count);

      if (context.getCurEntry().getOriginNode() != null) {
      // Add count for origin node.
      context.getCurEntry().getOriginNode().increaseThreadNum();
      context.getCurEntry().getOriginNode().addPassRequest(count);
      }

      if (resourceWrapper.getType() == EntryType.IN) {
      // Add count for global inbound entry node for global statistics.
      Constants.ENTRY_NODE.increaseThreadNum();
      Constants.ENTRY_NODE.addPassRequest(count);
      }

      // Handle pass event with registered entry callback handlers.
      for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
      handler.onPass(context, resourceWrapper, node, count, args);
      }
      } catch (PriorityWaitException ex) {
      node.increaseThreadNum();
      if (context.getCurEntry().getOriginNode() != null) {
      // Add count for origin node.
      context.getCurEntry().getOriginNode().increaseThreadNum();
      }

      if (resourceWrapper.getType() == EntryType.IN) {
      // Add count for global inbound entry node for global statistics.
      Constants.ENTRY_NODE.increaseThreadNum();
      }
      // Handle pass event with registered entry callback handlers.
      for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
      handler.onPass(context, resourceWrapper, node, count, args);
      }
      } catch (BlockException e) {
      // Blocked, set block exception to current entry.
      context.getCurEntry().setError(e);

      // Add block count.
      node.increaseBlockQps(count);
      if (context.getCurEntry().getOriginNode() != null) {
      context.getCurEntry().getOriginNode().increaseBlockQps(count);
      }

      if (resourceWrapper.getType() == EntryType.IN) {
      // Add count for global inbound entry node for global statistics.
      Constants.ENTRY_NODE.increaseBlockQps(count);
      }

      // Handle block event with registered entry callback handlers.
      for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
      handler.onBlocked(e, context, resourceWrapper, node, count, args);
      }

      throw e;
      } catch (Throwable e) {
      // Unexpected error, set error to current entry.
      context.getCurEntry().setError(e);

      // This should not happen.
      node.increaseExceptionQps(count);
      if (context.getCurEntry().getOriginNode() != null) {
      context.getCurEntry().getOriginNode().increaseExceptionQps(count);
      }

      if (resourceWrapper.getType() == EntryType.IN) {
      Constants.ENTRY_NODE.increaseExceptionQps(count);
      }
      throw e;
      }
      }

exit() 方法

  1. 如果执行过程中没有发生异常(curEntry中没有异常对象存储):
    1. rt = 当前时间减去createTime
    2. 如果 rt > TIME_DROP_VALVE,就让rt = TIME_DROP_VALVE;
    3. 设置rt和count;
    4. 如果有Origin, 则对应的 OriginNode 节点设置 rt,并增加通过数;
    5. 减去node的线程数;
    6. 如果有Origin, 则对应的 OriginNode 节点线程数减一;
    7. 如果资源类型是IN,则全局节点 ENTRY_NODE 节点 设置 rt、增加通过数、线程数减一;
  2. 回调所有注册的 ProcessorSlotEntryCallback 接口的 onExit 方法;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode)context.getCurNode();

    if (context.getCurEntry().getError() == null) {
    // Calculate response time (max RT is TIME_DROP_VALVE).
    long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
    if (rt > Constants.TIME_DROP_VALVE) {
    rt = Constants.TIME_DROP_VALVE;
    }

    // Record response time and success count.
    node.addRtAndSuccess(rt, count);
    if (context.getCurEntry().getOriginNode() != null) {
    context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
    }

    node.decreaseThreadNum();

    if (context.getCurEntry().getOriginNode() != null) {
    context.getCurEntry().getOriginNode().decreaseThreadNum();
    }

    if (resourceWrapper.getType() == EntryType.IN) {
    Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
    Constants.ENTRY_NODE.decreaseThreadNum();
    }
    } else {
    // Error may happen.
    }

    // Handle exit event with registered exit callback handlers.
    Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
    for (ProcessorSlotExitCallback handler : exitCallbacks) {
    handler.onExit(context, resourceWrapper, count, args);
    }

    fireExit(context, resourceWrapper, count);
    }