源码分析 Sentinel 实时数据采集实现原理

点击上方 “中间件兴趣圈” 选择 “设为星标”

做积极的人,越努力越幸运!

本篇将重点关注 Sentienl 实时数据收集,即 Sentienl 具体是如何收集调用信息,以此来判断是否需要触发限流或熔断。

Sentienl 实时数据收集的入口类为 StatisticSlot。

我们先简单来看一下 StatisticSlot 该类的注释,来看一下该类的整体定位。

StatisticSlot,专用于实时统计的 slot。在进入一个资源时,在执行 Sentienl 的处理链条中会进入到该 slot 中,需要完成如下计算任务:

  • 集群维度计算资源的总统计信息,用于集群限流,后续文章将详细探讨。

  • 来自不同调用方/来源的群集节点的统计信息。

  • 特定调用上下文环境的统计信息。

  • 统计所有入口的统计信息。

接下来用源码分析的手段来详细分析 StatisticSlot 的实现原理。

1、源码分析 StatisticSlot

1.1 StatisticSlot entry 详解

StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { 
    try {
        // Do some checking.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);  // @1
            // Request passed, add thread count and pass count.
            node.increaseThreadNum();                                                             // @2
               node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {                           // @3
            // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }
        if (resourceWrapper.getEntryType() == EntryType.IN) {                // @4
            // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                   Constants.ENTRY_NODE.addPassRequest(count);
            }
        // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {   // @5
                    handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {                                                                                                                                // @6
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
               }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {     // @7                                                                                                              
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);
        // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseBlockQps(count);
            }
            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }
        throw e;
        } catch (Throwable e) {   // @8
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);
        // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
                    Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
}

代码@1:首先调用 fireEntry,先调用 Sentinel Slot Chain 中其他的处理器,执行完其他处理器的逻辑,例如 FlowSlot、DegradeSlot,因为 StatisticSlot 的职责是收集统计信息。

代码@2:如果后续处理器成功执行,则将正在执行线程数统计指标加一,并将通过的请求数量指标增加对应的值。下文会对 Sentinel Node 体系进行详细的介绍,在 Sentinel 中使用 Node 来表示调用链中的某一个节点,每个节点关联一个资源,资源的实时统计信息就存储在 Node 中,故该部分也是调用 DefaultNode 的相关方法来改变线程数等,将在下文会向详细介绍。

代码@3:如果上下文环境中保存了调用的源头(调用方)的节点信息不为空,则更新该节点的统计数据:线程数与通过数量。

代码@4:如果资源的进入类型为 EntryType.IN,表示入站流量,更新入站全局统计数据(集群范围 ClusterNode)。

代码@5:执行注册的进入Handler,可以通过 StatisticSlotCallbackRegistry 的 addEntryCallback 注册相关监听器。

代码@6:如果捕获到 PriorityWaitException ,则认为是等待过一定时间,但最终还是算通过,只需增加线程的个数,但无需增加节点通过的数量,具体原因我们在详细分析限流部分时会重点讨论,也会再次阐述 PriorityWaitException 的含义。

代码@7:如果捕获到 BlockException,则主要增加阻塞的数量。

代码@8:如果是系统异常,则增加异常数量。

我想上面的代码应该不难理解,但涉及到统计指标数据的变化,都是调用 DefaultNode node 相关的方法,从这里也可以看出,Node 将是实时统计数据的直接持有者,那毋容置疑接下来将重点来学习 Node,为了知识体系的完备性,我们先来看一下 StatisticSlot 的 exit 方法。

1.2 StatisticSlot exit 详解

StatisticSlot#exit

public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode)context.getCurNode();
    if (context.getCurEntry().getError() == null) {         // @1
        // Calculate response time (max RT is TIME_DROP_VALVE).
        long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
        if (rt > Constants.TIME_DROP_VALVE) {
            rt = Constants.TIME_DROP_VALVE;
            }
        // Record response time and success count.
        node.addRtAndSuccess(rt, count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
                   }
        node.decreaseThreadNum();
           if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().decreaseThreadNum();
                }
           if (resourceWrapper.getEntryType() == EntryType.IN) {
                   Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                   Constants.ENTRY_NODE.decreaseThreadNum();
               }
        } else {
            // Error may happen.
        }
    // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {      // @2
        handler.onExit(context, resourceWrapper, count, args);
         }
    fireExit(context, resourceWrapper, count);     // @3
}

代码@1:成功执行,则重点关注响应时间,其实现亮点如下:

计算本次响应时间,将本次响应时间收集到 Node 中。

将当前活跃线程数减一。

代码@2:执行退出时的 callback。可以通过 StatisticSlotCallbackRegistry 的 addExitCallback 方法添加退出回调函数。

代码@3:传播 exit 事件。

接下来我们将重点介绍 DefaultNode,即 Sentinel 的 Node 体系,持有资源的实时调用信息。

2、Sentienl Node 体系

2.1 Node 类体系图

我们先简单介绍一下上述核心类的作用与核心接口或核心属性的含义。

  • OccupySupport 支持抢占未来的时间窗口,有点类似借用“未来”的令牌。其核心方法如下:

    • long tryOccupyNext(long currentTime, int acquireCount, double threshold)

      尝试抢占未来的令牌,返回值为调用该方法的线程应该 sleep 的时间。

      1、long currentTime

      当前时间。

      2、int acquireCount

      本次需要申请的令牌个数。

      3、double threshold

      设置的阔值。

  • long waiting()

    获取当前已申请的未来的令牌的个数。

  • void addWaitingRequest(long futureTime, int acquireCount)

    申请未来时间窗口中的令牌。

  • void addOccupiedPass(int acquireCount)

    增加申请未来令牌通过的个数。

  • double occupiedPassQps()

    当前抢占未来令牌的QPS。

  • Node

    持有实时统计信息的节点。定义了收集统计信息与获取统计信息的接口,上面方法根据方法名称即可得知其含义,故这里就不一一罗列了。

  • StatisticNode

    实现统计信息的默认实现类。

  • DefaultNode

    用于在特定上下文环境中保存某一个资源的实时统计信息。

  • ClusterNode

    实现基于集群限流模式的节点,将在集群限流模式部分详细介绍。

  • EntranceNode

    用来表示调用链入口的节点信息。

本文将详细介绍 DefaultNode 与  StatisticNode,重点阐述调用树与实时统计信息。DefaultNode 是 StatisticNode 的子类,我们先从 StatisticNode 开始 Node 体系的探究。

3、StatisticNode 详解

3.1 核心类图

我们对其核心属性进行一一解读:

  • Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL)

    每秒的实时统计信息,使用 ArrayMetric 实现,即基于滑动窗口实现,正是上篇文章详细介绍的,默认1s 采样 2次。即一个统计周期中包含两个滑动窗口。

  • Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false)

    每分钟实时统计信息,同样使用 ArrayMetric  实现,即基于滑动窗口实现。每1分钟,抽样60次,即包含60个滑动窗口,每一个窗口的时间间隔为 1s 。

  • LongAdder curThreadNum = new LongAdder()

    当前线程计数器。

  • long lastFetchTime = -1

    上一次获取资源的有效统计数据的时间,即调用 Node 的 metrics() 方法的时间。

关于 ArrayMetric 滑动窗口设计与实现原理,请参考笔者的另一篇博文: Alibaba Seninel 滑动窗口实现原理(文末附原理图)

接下来我们挑选几个具有代表性的方法进行探究。

3.2 addPassRequest

public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

增加通过请求数量。即将实时调用信息向滑动窗口中进行统计。addPassRequest 即报告成功的通过数量。就是分别调用 秒级、分钟即对应的滑动窗口中添加数量,然后限流规则、熔断规则将基于滑动窗口中的值进行计算。

3.3 totalRequest

public long totalRequest() {
    return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
}

获取当前时间戳的总请求数,获取分钟级时间窗口中的统计信息。

3.4 successQps

public double successQps() {
    return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
}

成功TPS,用秒级统计滑动窗口中统计的个数 除以 窗口的间隔得出其 tps,即抽样个数越大,其统计越精确。

温馨提示:上面的方法在学习了上文的滑动窗口设计原理后将显得非常简单,大家在学习的过程中,可以总结出一个规律,什么时候时候使用秒级滑动窗口,什么时候使用分钟级滑动窗口。

3.5 metrics

由于 Sentienl 基于滑动窗口来实时收集统计信息,并存储在内存中,并随着时间的推移,旧的滑动窗口将失效,故需要提供一个方法,及时将所有的统计信息进行汇总输出,供监控客户端定时拉取,转储都其他客户端,例如数据库,方便监控数据的可视化,这也通常是中间件用于监控指标的监控与采集的通用设计方法。

public Map<Long, MetricNode> metrics() {
    long currentTime = TimeUtil.currentTimeMillis();
    currentTime = currentTime - currentTime % 1000;   // @1
    Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
    List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();   // @2
    long newLastFetchTime = lastFetchTime;
    // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
    for (MetricNode node : nodesOfEverySecond) { 
        if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {    // @3
        metrics.put(node.getTimestamp(), node);
            newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
        }
    }
    lastFetchTime = newLastFetchTime;
    return metrics;
}

代码@1:获取当前时间对应的滑动窗口的开始时间,可以对比上文计算滑动窗口的算法。

代码@2:获取一分钟内的所有滑动窗口中的统计数据,使用 MetricNode 表示。

代码@3:遍历所有节点,刷选出不是当前滑动窗口外的所有数据。这里的重点是方法:isNodeInTime。

private boolean isNodeInTime(MetricNode node, long currentTime) {
    return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
}

这里只刷选出不是当前窗口的数据,即 metrics 方法返回的是“过去”的统计数据。

接下来我们再来看看 DefaultNode 相关的几个特性方法。

4、DefaultNode  详解

4.1 类图

DefaultNode 是 StatisticNode 的子类,其额外增加的属性如下:

  • private ResourceWrapper id

    资源id,即 DefaultNode 才真正与资源挂钩,可以将 DefaultNode 看出是调用链中的一个节点,并且与资源关联。

  • private volatile Set< Node > childList

    子节点结合。以此来维持其调用链。

  • private ClusterNode clusterNode

    集群节点,同样为 StatisticNode 的子类,表示与资源集群相关的环境。

接下来我们将来看一下 DefaultNode 的核心方法。

4.2 increaseBlockQps

public void increaseBlockQps(int count) {
    super.increaseBlockQps(count);
    this.clusterNode.increaseBlockQps(count);
}

DefaultNode 的此类方法,通常是先调用 StatisticNode 的方法,然后再调用 clusterNode 的相关方法,最终就是使用在对应的滑动窗口中增加或减少计量值。

其他方法也比较简单,就不再细看了,我们可以通过 DefaultNode 的 printDefaultNode 方法来打印该节点的调用链。

本文就介绍到这里了,本文详细介绍了 Sentinel 实时数据收集的统一入口 StatisticSlot,并且介绍了 Seninel Node 体系,即调用链中的每一个节点,每一个节点对一个资源的实时统计信息。

如果您喜欢这篇文章,点【在看】与转发是一种美德,期待您的认可与鼓励,越努力越幸运。

欢迎加入我的知识星球,一起交流源码,探讨架构 ,打造高质量的技术交流圈, 长按如下二维码

中间件兴趣圈 知识星球 正在对如下话题展开如火如荼的讨论:

1、【让天下没有难学的Netty-网络通道篇】

1、Netty4 Channel概述( 已发表

2、Netty4 ChannelHandler概述( 已发表

3、Netty4事件处理传播机制( 已发表

4、Netty4服务端启动流程( 已发表

5、Netty4 NIO 客户端启动流程

6、Netty4 NIO线程模型分析

7、Netty4编码器、解码器实现原理

8、Netty4 读事件处理流程

9、Netty4 写事件处理流程

10、Netty4 NIO Channel其他方法详解

2、Java 并发框架(JUC) 探讨【 面试神器

3、源码分析Alibaba Sentienl 专栏背后的

写作与学习技巧

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章