Prometheus源码 – memSeries
Prolegomena
Prometheus无疑是一款优秀的开源系统监控报警框架,在云原生的时代发挥着重要作用。它提供近实时的、基于动态云环境和容器微服务、服务以及应用程序的内省监控。同时也用于监控传统架构的资源。Fortunately,笔者每天的工作都会与Prometheus打交道,在使用过程中它体现的高效无不让人着迷。同时,笔者对于这款CNCF设计思想产生了浓厚的兴趣,这个框架是如何做到单一节点可以处理数以百万的监控指标,每秒处理数十万的数据点? 带着这种疑惑与兴趣,开始了prometheus设计思想和工程实践的探索。它的高效必然离不开一款高效的时序数据库的支持TSDB。本文会对TSDB的其中一个memSeries模块进行源码层面的剖析。借此机会,笔者也分享下看源码的心得,对于这种体量级别的应用,看其源码无异于自己置身扁舟漂泊在太平洋,无头绪,无方向,充满挑战。Efficiently,我们可以找一篇写的很有总结性质的好文章(Prometheus时序数据库-内存中的存储结构),然后阅读下该模块所涉及到的技术论文(Gorilla: A Fast, Scalable, In-Memory Time Series Database),这样再去撸源码就事半功倍了。
Introduction
笔者主要是围绕memSeries源码(prometheus/prometheus-main/tsdb/head.go)进行,顺序性的解读。因为还没有系统性的将整个工程源码都读完,而且这类resource很少,无法站在更高的角度将memSeries的方法具体作用和Prometheus实际使用完美的串联起来。Hence,本文主要讲memSeries源码实现过程和方法的效果。后续笔者会不断的解读Prometheus其他模块,这样一步步的将其各个模块串联起来,读者可以关注后续的update。
MemSeries Attributes
type memSeries struct {
sync.RWMutex
ref uint64
lset labels.Labels
mmappedChunks []*mmappedChunk
headChunk *memChunk
chunkRange int64
firstChunkID int
nextAt int64 // Timestamp at which to cu? the next chunk.
sampleBuf [4]sample
pendingCommit bool // Whether there are samples waiting to be commi?ted to~ this series.
app chunkenc.Appender // Current appender for the chunk.
memChunkPool *sync.Pool
txs *txRing
}
-
ref
:每接受一个新的时间序列(e.g. ht?p_request??_total{path=”/”, method=”GET”},NTC. 时间序列=指标(e.g. h?tp_request_total) + 标签(k,v)) -
lset
:这个是识别这个series的标签集合。// 源码 lset labels.Labels | -> type Labels []Label | -> type Label struct { Name, Value string }
-
mmappedChunks
:type mmappedChunk struct { ref uint64 numSamples uint16 minTime, maxTime int64 }
sample(t,v)是这个时间序列某个时间的采集数据经过gorilla压缩后的数据。sample是被存储到memchunk中,其中达到120个(默认15s采集一次数据,生成一个sample。Hence,120*15s=30min Full chunk),或者
chunkRange
的两个小时(memSeries -> chunkRange 参数决定) 都可以叫做这个chunk是full(ref. Once the chunk fills till 120 samples (or) spans upto chunk/block range (let’s call it chunkRange
), which is 2h by default, a new chunk is cut and the old chunk is said to be “full”. Link: https://ganeshvernekar.com/blog/prometheus-tsdb-the-head-block/)
这个也可以在源码中考究:
// tsdb/head.go
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
-> const samplesPerChunk = 120
当 active chunk 写满sample后,就会使用 chunks.ChunkDiskMapper -> chunkDiskMapper.WriteChunk()
写入到磁盘中,同时生成 chunkRef
,这个值 represent :该时间序列磁盘中的chunk 在内存中的映射
。 mmappedChunks []*mmappedChunk
维护着该时间序列的所有的chunk。
源码探索:
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
...
// 将full chunk 写入到磁盘
chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)
// 建立内存映射
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
...
})
...
}
-
headChunk
:type memChunk struct { chunk chunkenc.Chunk minTime, maxTime int64 }
headChunk
一般都是active chunk 一直有samples写入。
Relevant Methods:
// 将目前的headchunk 写入磁盘同时在内存中建立映射保存元数据,memSeries的指针在指向新的headChunk
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk{ ... }
// 将headChunk 写入磁盘 建立M-map映射
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper){ ... }
那我们会疑惑一个问题:headChunk 满足什么条件才会触发memSeries指针指向新的headChunk,以及落盘和创建内存映射呢?这里,笔者画了一个flow chart. 关注两个指标即可
memSeries.chunkRange and const samplesPerChunk = 120
-
chunkRange
:type memSeries struct { ... chunkRange int64 ... }
它的作用可以看下该源码:
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { | -> s.nextAt = rangeForTimestamp(mint, s.chunkRange) | -> func rangeForTimestamp(t int64, width int64) (maxt int64) { return (t/width)*width + width} }
这个chunkRange的大小决定着 nextAt的值(default. 2H),这个值决定了新的headChunk什么时候被创建,也就是curChunk的什么时候full。
-
firstChunkID
: 因为每个memSeries中会有属于这个时间序列chunk的映射表mmappedChunks []*mmappedChunk
,这个主要是是为了找到对应chunk(一般是on-disk chunk)的metadatatype memSeries struct { ... mmappedChunks []*mmappedChunk |-> type mmappedChunk struct { ... } ... firstChunkID int ... }
实际应用:ix 表示chunk在mmappedChunks的索引,chunkID 是从1开始逐渐增长的。Hence,(id- s.firstChunkID)就是这个chunk在 mmappendChunks的索引。
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { ix := id - s.firstChunkID // chunk在mmappendChunks的索引 ... }
-
nextAt
:下一个headChunk的开始时间。 可以看下面这段源码, 设置一个最低的限制,下一个chunk必须创建的时间,其实是受const samplesPerChunk = 120(当sample 达到120个的时候回cutHeadChunk)
和chunkRange(default:2h)
影响的。
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
...
s.nextAt = rangeForTimestamp(mint, s.chunkRange)
| -> func rangeForTimestamp(t int64, width int64) (maxt int64) {
return (t/width)*width + width
}
...
}
-
sampleBuf
:type memSeries struct { ... sampleBuf [4]sample ... }
通过源码查看 其实就是headChunk 保存最新的4个sample,对于有什么用处,目前看memSeries源码还没发现
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
...
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
...
}
-
pendingCommit
: OFF: Whether there are samples waiting to be committed to this series.这个字段在
func (a *headAppender) Commit()
这个方法中会涉及到,目前不展开讲,等下一个Head系列会提到。 -
app
: 其实是一个空接口,但是当初始化memSeries的时候,会实例化它。这个设计模式值得学习(其实一个工厂模型,但是在源码中比较难找。。。),// Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) }
而它真正的实现是XOR算法模块,这个算法可以参考笔者另一篇文章:Gorilla Encoding. 在以后的Prometheus系列中,笔者会出一期关于XOR算法实现的分析锻炼下工程化落地能力。Anyway,memSeries如何使用这个接口的呢? 可以参考下面的源码片段
// XORChunk 结构体的这个方法返回一个xorAppender结构体
func (c *XORChunk) Appender() (Appender, error) {
...
a := &xorAppender{
...
}
...
return a, err
}
// xorAppender 这个结构体 有一个method Append 这样就具体实现了 Appender interface{} 这个接口
func (a *xorAppender) Append(t int64, v float64) {
...
}
// 初始化chunk的时候 chunk 为 XORChunk,并使用XORChunk中的Appender()方法 返回一个xorAppender 结
// 构体,这个结构体具体实现了Append方法,并赋值给 s.app。
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
...
s.headChunk = &memChunk{
chunk: chunkenc.NewXORChunk(), // 实例化 chunk为 XORChunk()
...
}
...
app, err := s.headChunk.chunk.Appender() // 实例化
if err != nil {
panic(err)
}
s.app = app // 实例化 s.app
return s.headChunk
}
所以它才会有如下这个操作:
// s.app.Append(t,v)
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
...
s.app.Append(t, v)
...
}
-
memChunkPool
: 由于golang内建的GC机制会影响应用的性能,为了减少GC,golang提供了对象重用的机制,也就是sync.Pool对象池。 sync.Pool是可伸缩的,并发安全的。其大小仅受限于内存的大小,可以被看作是一个存放可重用对象的值的容器。 设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。type memSeries struct { ... memChunkPool *sync.Pool ... }
-
txs
: 事务ID记录的一个结构体. 这个又是另一个模块(isolation)的知识体系,本文不细说。type memSeries struct { ... txs *txRing | —> type txRing struct { txIDs []uint64 txIDFirst int // Position of the first id in the ring. txIDCount int // How many ids in the ring. } ... }
MemSeries methods
当介绍memSeries结构体元素的时候,其实都穿插着讲了它的方法,所以这个版本会简单的介绍下几个方法的实现和作用。
-
func (s *memSeries) cutNewHeadChunk(...){...}
:当前的headChunk达到full chunk条件的时候,会使用该方法重新初始化一个新的headChunk,并将memSeries的headChunk指向该chunk。 -
func (s *memSeries) mmapCurrentHeadChunk(...){...}
: 将目前的headChunk 写入到磁盘中,同时会建立内存映射。内存映射主要是mmappedChunks []*mmappedChunk
进行内存中存储。 -
func (s *memSeries) chunk(id int, ...){...}
: 根据id,找到想查找的chunk在内存映射中的索引,从而找到该chunk。可以看下源码,这里面还是有一些trickfunc (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { ix := id - s.firstChunkID // ix是 该chunk在m-map中的索引. chunk的id 是从1开始逐渐增加的1,2,3,... if ix < 0 || ix > len(s.mmappedChunks) { return nil, false, storage.ErrNotFound } if ix == len(s.mmappedChunks) { // 查找的是active chunk 正在执行写入的headChunk if s.headChunk == nil { return nil, false, errors.New("invalid head chunk") } return s.headChunk, false, nil // 由于是active chunk 所以不能被GC 回收。 } chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) // 查找的chunk已经落盘,根据m-map的ref 在磁盘中查找对应的chunk 这个落盘的chunk是没有mint,maxt if err != nil { if _, ok := err.(*chunks.CorruptionErr); ok { panic(err) } return nil, false, err } mc := s.memChunkPool.Get().(*memChunk) // 这里是一个trick,从池子里拿一个memChunk内存空间 然后进行初始化,这个是并发安全的。 mc.chunk = chk mc.minTime = s.mmappedChunks[ix].minTime mc.maxTime = s.mmappedChunks[ix].maxTime return mc, true, nil // true: 表示该chunk使用后,可以被回收。 }
-
func (s *memSeries) truncateChunksBefore(mint int64) (removed int)
: 这个函数的作用是给一个时间,把这个时间点之前的chunk都从series中剔除掉。func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { // headchunk的最大时间小于给的这个时间段 说明 把这个series中所有的chunk都要清零。 if s.headChunk != nil && s.headChunk.maxTime < mint { removed = 1 + len(s.mmappedChunks) s.firstChunkID += removed s.headChunk = nil // 清空 s.mmappedChunks = nil // 清空 return removed } // 判断每一个落盘的chunk中的时间 与 截断时间对比 if len(s.mmappedChunks) > 0 { for i, c := range s.mmappedChunks { // 这个c其实不是chunk 只是磁盘某个chunk中在内存中的元数据 if c.maxTime >= mint { break } removed = i + 1 } s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...) s.firstChunkID += removed } return removed }
-
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (...)
:func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { // 这个就是定义了 一个full chunk的一个条件,当有120个samples 就证明full chunk,如果设置的采集时间是每15s采集一次,那么一个full chunk(每一个时间序列的)需要15s * 120 = 30min const samplesPerChunk = 120 c := s.head() if c == nil { if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { // 该headChunk的最大时间都大于插入的时间t了,所以这个sample无法被append return false, false } // len(s.mmappedChunks) = 0 and headChunk == nil 那么就创建一个新的HeadChunk 起始时 //间是t // c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } numSamples := c.chunk.NumSamples() if c.maxTime >= t { return false, chunkCreated } if numSamples == samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) // 推断并设置下一个headChunk创建时间。这个函数有一个分母有一个+1操作,是为了防止分母为0。 } if t >= s.nextAt { c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } s.app.Append(t, v) c.maxTime = t s.sampleBuf[0] = s.sampleBuf[1] s.sampleBuf[1] = s.sampleBuf[2] s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} if appendID > 0 { // 是否需要被隔离? 如果是appendID == 0 则不需要。 主要是在查询和插入的时候。 s.txs.add(appendID) } return true, chunkCreated }
-
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator{...}
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { c, garbageCollect, err := s.chunk(id, chunkDiskMapper) // 根据id 从 memseries中找到对应的c -> memchunk // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got // accessed. We must ensure to not garbage collect as long as any // readers still hold a reference. if err != nil { return chunkenc.NewNopIterator() } defer func() { if garbageCollect { // Set this to nil so that Go GC can collect it after it has been used. // This should be done always at the end. c.chunk = nil s.memChunkPool.Put(c) } }() ix := id - s.firstChunkID // ix: c 在 memseries中的索引值 numSamples := c.chunk.NumSamples() // c 有多少个numSamples stopAfter := numSamples if isoState != nil { totalSamples := 0 // Total samples in this series. previousSamples := 0 // Samples before this chunk. for j, d := range s.mmappedChunks { totalSamples += int(d.numSamples) if j < ix { previousSamples += int(d.numSamples) } } if s.headChunk != nil { totalSamples += s.headChunk.chunk.NumSamples() } // Removing the extra transactionIDs that are relevant for samples that // come after this chunk, from the total transactionIDs. appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) // 这个chunk 和它之前的所有 samples数量 // Iterate over the appendIDs, find the first one that the isolation state says not // to return. it := s.txs.iterator() for index := 0; index < appendIDsToConsider; index++ { appendID := it.At() // 没有初始化txs 所以就是postion=0 第一个位置的appendID if appendID <= isoState.maxAppendID { // Easy check first. if _, ok := isoState.incompleteAppends[appendID]; !ok { // 没有检测到这个某个sample的完成操作 it.Next() continue } } // Eq. index - previousSamples, 当小于0说明 index // 还没到目前chunk的上一个chunk或者到了上一个chunk但是没遍历完呢。 大于0 说明 index目前 //curChunk上 stopAfter = numSamples - (appendIDsToConsider - index) if stopAfter < 0 { // index还没遍历到 curChunk上 stopAfter = 0 // Stopped in a previous chunk. } break } } if stopAfter == 0 { return chunkenc.NewNopIterator() //NewNopIterator returns a new chunk iterator that does not hold any data. 目前index指针到了previous chunk末尾 或者curChunk开头,也就是stopAfter的pos所在,start = end ? 这样肯定返回一个没有数据的迭代器 } // 以下代码是根据 传入的参数 it chunkenc.Iterator 来选择返回哪种类型的Iterator if id-s.firstChunkID < len(s.mmappedChunks) { if stopAfter == numSamples { return c.chunk.Iterator(it) // index 到了curChunk的末尾,其中it 传入的object可以将Iterator实例化, } // 不同种类的Iterator if msIter, ok := it.(*stopIterator); ok { msIter.Iterator = c.chunk.Iterator(msIter.Iterator) msIter.i = -1 msIter.stopAfter = stopAfter return msIter } return &stopIterator{ Iterator: c.chunk.Iterator(it), i: -1, stopAfter: stopAfter, } } // Serve the last 4 samples for the last chunk from the sample buffer // as their compressed bytes may be mutated by added samples. if msIter, ok := it.(*memSafeIterator); ok { msIter.Iterator = c.chunk.Iterator(msIter.Iterator) msIter.i = -1 msIter.total = numSamples msIter.stopAfter = stopAfter msIter.buf = s.sampleBuf return msIter } return &memSafeIterator{ stopIterator: stopIterator{ Iterator: c.chunk.Iterator(it), i: -1, stopAfter: stopAfter, }, total: numSamples, buf: s.sampleBuf, } }
上述Iterator的关系
// Iterator interface
type Chunk interface {
...
Iterator(Iterator) Iterator
| -> type Iterator interface {Next() bool,Seek(t int64) bool,
At() (int64, float64), Err() error}
}
type stopIterator struct {
chunkenc.Iterator
i, stopAfter int
}
type memSafeIterator struct {
stopIterator
total int
buf [4]sample}
}
Conclusion
通过阅读源码,笔者不仅学习到了这些大神的代码风格,体会了prometheus TSDB的设计思想(这种设计风格建议总结下来,以后可以尝试造轮子),其中笔者更是对memSeries模块在脑海里有自己的认识和理解。后续笔者将会不断解剖Prometheus源码,大家可以关注下。
{{m.name}}
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/127752.html