StatisticNode 结构
关键的引用树栈
从图上可知,StatisticNode 的调用栈的顺序,和几个类的关系,今天我们开始分析这几个类的结构和功能点
父类及其接口
Node, 用来统计的接口, 其方法大部分都是数据汇总类
/**
* Get incoming request per minute ({@code pass + block}).
*
* @return total request count per minute
*/
long totalRequest();
/**
* Get pass count per minute.
*
* @return total passed request count per minute
* @since 1.5.0
*/
long totalPass();
/**
* Get {@link Entry#exit()} count per minute.
*
* @return total completed request count per minute
*/
long totalSuccess();
/**
* Get blocked request count per minute (totalBlockRequest).
*
* @return total blocked request count per minute
*/
long blockRequest();
/**
* Get exception count per minute.
*
* @return total business exception count per minute
*/
long totalException();
/**
* Get pass request per second.
*
* @return QPS of passed requests
*/
double passQps();
/**
* Get block request per second.
*
* @return QPS of blocked requests
*/
double blockQps();
/**
* Get {@link #passQps()} + {@link #blockQps()} request per second.
*
* @return QPS of passed and blocked requests
*/
double totalQps();
// 还有很多 ...
成员变量和参数
/**
秒级的滑动时间窗口(时间窗口单位500ms), 1000ms / 2 , sampleCode 为 2
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
最近1分钟内的统计记录 , 最小时间单位 1s
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* The counter for thread count. 访问线程计数
*/
private LongAdder curThreadNum = new LongAdder();
/**
* The last timestamp when metrics were fetched.
最近一次统计时间
*/
private long lastFetchTime = -1;
@Override
public Map<Long, MetricNode> metrics() {
// The fetch operation is thread-safe under a single-thread scheduler pool.
long currentTime = TimeUtil.currentTimeMillis();
// 当前时间戳
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
// 从每秒的滑动窗口中获取指标list
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
// 滚动更新各个统计Node
for (MetricNode node : nodesOfEverySecond) {
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
metrics.put(node.getTimestamp(), node);
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
}
}
// 更新最新的时间
lastFetchTime = newLastFetchTime;
// 返回当前统计实体
return metrics;
}
// 后面都是接口的实现类,其计算逻辑都是依托于上面的2个滑动窗口计算而来
子类
DefaultNode结构
public class DefaultNode extends StatisticNode {
/**
* 一个resource 同一个context 下绑定一个 defaultNode
*/
private ResourceWrapper id;
/**
* Node下对应的执行子节点
*/
private volatile Set<Node> childList = new HashSet<>();
/**
* 如果该值为空, 会在ClusterBuilderSlot.entry中会进行初始化。
*/
private ClusterNode clusterNode;
}
ArrayMetric 滑动窗口结构
public class ArrayMetric implements Metric {
// 指标桶 数组
private final LeapArray<MetricBucket> data;
//举个例子
@Override
public long maxSuccess() {
// 返回当前时间戳的桶
data.currentWindow();
long success = 0;
// 循环当前所有桶内的数据
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
// 桶内的成功数据 拿到最大值
//。桶内的数据分步有个枚举类型 com.alibaba.csp.sentinel.slots.statistic.MetricEvent
if (window.success() > success) {
success = window.success();
}
}
return Math.max(success, 1);
}
// 后续全部都是在这个array中进行处理
...
}
这里面比较重要的2个方法
-
根据提供的时间戳获取桶项
-
判断桶内的成功数据
可以看看在代码中是如何实现的 -
根据提供的时间戳获取桶项
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根据当前时间戳 , 除以当前设置的时间窗口单位 再用长度取模运算 得到的一个桶id
//
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// 计算当前滑动窗口的开始位置, 用时间戳 % 窗口时间 算出当前窗口的时间长度 , 再时间戳减去当前长度, 得到开始的位置
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
// 看有没有初始化过该时间戳窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*在滑动窗口最开始的地方创建桶,cas操作保证只有1个能更新成功.
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
//先初始化一个当前窗口单位,开始时间为滑动窗口开始时间
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// cas 更新到当前array中
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// 资源锁竞争失败 再进行下一轮执行的线程竞争,再继续往下走
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 如果老的桶的开始时间等于当前的开始时间, 说明就是最新的桶了, 直接返回就行
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
// 如果窗口的开始时间是大于老桶的开始时间, 说明老的桶的数据需要重置了
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
// 使用同步锁(默认非公平锁),只有在过期的情况下才进去拿更新锁 大部分情况下不会有性能损耗
if (updateLock.tryLock()) {
try {
// 重置,分钟的时间窗口和秒级别的时间窗口是不一样的
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 不可能到这里,直接返回新的就行
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
- 获取桶里面成功的数据
获取当前滑动窗口的所有成功的数据,本质上就是将滑动窗口内所有的时间窗口单位全部累加,调用链如下所示
枚举
/**
这个枚举就决定了MetricBucket 里面的longAdder的数组大小
*/
public enum MetricEvent {
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/280530.html