StatisticNode 结构


StatisticNode 结构

关键的引用树栈

StatisticNode 结构

从图上可知,StatisticNode 的调用栈的顺序,和几个类的关系,今天我们开始分析这几个类的结构和功能点

父类及其接口

Node, 用来统计的接口, 其方法大部分都是数据汇总类


    /**
     * Get incoming request per minute ({@code pass + block}).
     *
     * @return total request count per minute
     */
    long totalRequest();

    /**
     * Get pass count per minute.
     *
     * @return total passed request count per minute
     * @since 1.5.0
     */
    long totalPass();

    /**
     * Get {@link Entry#exit()} count per minute.
     *
     * @return total completed request count per minute
     */
    long totalSuccess();

    /**
     * Get blocked request count per minute (totalBlockRequest).
     *
     * @return total blocked request count per minute
     */
    long blockRequest();

    /**
     * Get exception count per minute.
     *
     * @return total business exception count per minute
     */
    long totalException();

    /**
     * Get pass request per second.
     *
     * @return QPS of passed requests
     */
    double passQps();

    /**
     * Get block request per second.
     *
     * @return QPS of blocked requests
     */
    double blockQps();

    /**
     * Get {@link #passQps()} + {@link #blockQps()} request per second.
     *
     * @return QPS of passed and blocked requests
     */
    double totalQps();
  
   // 还有很多 ...

成员变量和参数

    /** 
     秒级的滑动时间窗口(时间窗口单位500ms),  1000ms / 2 , sampleCode 为 2
     * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     最近1分钟内的统计记录 , 最小时间单位 1s
      * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    /**
     * The counter for thread count. 访问线程计数
     */
    private LongAdder curThreadNum = new LongAdder();

    /**
     * The last timestamp when metrics were fetched.
       最近一次统计时间
     */
    private long lastFetchTime = -1;

    @Override
    public Map<Long, MetricNode> metrics() {
        // The fetch operation is thread-safe under a single-thread scheduler pool.
        long currentTime = TimeUtil.currentTimeMillis();
        // 当前时间戳
        currentTime = currentTime - currentTime % 1000;
        Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
        // 从每秒的滑动窗口中获取指标list
        List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
        
        long newLastFetchTime = lastFetchTime;
        // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
        //  滚动更新各个统计Node
        for (MetricNode node : nodesOfEverySecond) {
            if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
                metrics.put(node.getTimestamp(), node);
                newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
            }
        }
        // 更新最新的时间
        lastFetchTime = newLastFetchTime;
        // 返回当前统计实体
        return metrics;
    }
    // 后面都是接口的实现类,其计算逻辑都是依托于上面的2个滑动窗口计算而来

子类

DefaultNode结构

public class DefaultNode extends StatisticNode {

    /**
     * 一个resource 同一个context 下绑定一个 defaultNode 
     */
    private ResourceWrapper id;
    /**
     * Node下对应的执行子节点
     */
    private volatile Set<Node> childList = new HashSet<>();
    /**
     * 如果该值为空, 会在ClusterBuilderSlot.entry中会进行初始化。
     */
    private ClusterNode clusterNode;
}

ArrayMetric 滑动窗口结构

public class ArrayMetric implements Metric {
    // 指标桶 数组
    private final LeapArray<MetricBucket> data;

    //举个例子
    @Override
    public long maxSuccess() {
        //  返回当前时间戳的桶
        data.currentWindow();
        long success = 0;
        // 循环当前所有桶内的数据
        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            // 桶内的成功数据 拿到最大值
            //。桶内的数据分步有个枚举类型 com.alibaba.csp.sentinel.slots.statistic.MetricEvent
            if (window.success() > success) {
                success = window.success();
            }
        }
        return Math.max(success, 1);
    }
        // 后续全部都是在这个array中进行处理
    ...
}

这里面比较重要的2个方法

  1. 根据提供的时间戳获取桶项

  2. 判断桶内的成功数据
    可以看看在代码中是如何实现的

  3. 根据提供的时间戳获取桶项

public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根据当前时间戳 , 除以当前设置的时间窗口单位 再用长度取模运算 得到的一个桶id
        // 
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        // 计算当前滑动窗口的开始位置,   用时间戳 % 窗口时间 算出当前窗口的时间长度 , 再时间戳减去当前长度, 得到开始的位置 
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            // 看有没有初始化过该时间戳窗口 
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *在滑动窗口最开始的地方创建桶,cas操作保证只有1个能更新成功.
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                 //先初始化一个当前窗口单位,开始时间为滑动窗口开始时间
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                // cas 更新到当前array中
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // 资源锁竞争失败 再进行下一轮执行的线程竞争,再继续往下走
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
            
                // 如果老的桶的开始时间等于当前的开始时间, 说明就是最新的桶了, 直接返回就行
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                // 如果窗口的开始时间是大于老桶的开始时间, 说明老的桶的数据需要重置了
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                 // 使用同步锁(默认非公平锁),只有在过期的情况下才进去拿更新锁 大部分情况下不会有性能损耗
                if (updateLock.tryLock()) {
                    try {
                         // 重置,分钟的时间窗口和秒级别的时间窗口是不一样的
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 不可能到这里,直接返回新的就行
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
  1. 获取桶里面成功的数据

获取当前滑动窗口的所有成功的数据,本质上就是将滑动窗口内所有的时间窗口单位全部累加,调用链如下所示

StatisticNode 结构

枚举

/**
 这个枚举就决定了MetricBucket 里面的longAdder的数组大小
 */
public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/280530.html

(0)
上一篇 2022年8月14日
下一篇 2022年8月14日

相关推荐

发表回复

登录后才能评论