作者:张富春(ahfuzhang),转载时请注明作者和引用链接,谢谢!
- cnblogs博客
- zhihu
- Github
- 公众号:一本正经的瞎扯
以最经典的计算 qps 的曲线为例,vmselect 内部是如何计算的?
1 grafana 通过 query_range 接口发起请求
通常会在 grafana 中配置一个 line chart,然后使用以下的 promql 表达式来计算每分钟的请求量:- sum by (path) (increase(http_request_total{job="myApp"}[1m]))
复制代码 grafana 会向所配置数据源的 vmselect 发送类似的请求:- POST /select/0/prometheus/api/v1/query_range HTTP/1.1
- Host: xxx
- Content-Type: application/x-www-form-urlencoded
- start=${开始时间}&end=${结束时间}&step=15s&query=sum by (path) (increase(http_request_total{job="myApp"}[1m]))
复制代码
- queuy_range 的 API 格式请看:https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2. vmselect 中的处理流程
2.1 函数调用过程:
TL;DR
可以直接跳到下一节看源码分析
文件函数调用代码说明app/vmselect/main.gofunc main()Main 函数go httpserver.Serve(listenAddrs, requestHandler启动http 服务func requestHandlerhttp的callback 函数return selectHandler(qt, startTime, w, r, p, at)执行 http://vmselect:8481/select/ 这个路径func selectHandler查询的处理函数prometheus.QueryRangeHandler(qt, startTime, at, w, r)promql 查询的处理函数
/query_range 这条API 的处理代码app/vmselect/prometheus/
prometheus.gofunc QueryRangeHandlerqueryRangeHandler(qt, startTime, at, w, query从http协议中取出参数,执行范围查询func queryRangeHandlerresult, err := promql.Exec(qt, ec, query, false)组织好 promql.EvalConfig 对象app/vmselect/promql/exec.gofunc Exec执行查询表达式的函数e, err := parsePromQLWithCache(q)解析查询表达式qid := activeQueriesV.Add(ec, q)记录当前正在查询哪个表达式rv, err := evalExpr(qt, ec, e)执行解析后的表达式, metricsql.Expr对象app/vmselect/promql/eval.gofunc evalExprevalExpr会根据 promql的结构嵌套执行,直到叶子节点。rv, err := evalExprInternal(qt, ec, e)func evalExprInternal逐个种类判断,一共八个种类。 是哪种表达式,就执行对应的分支rv, err := evalAggrFunc(qtChild, ec, ae)执行聚合表达式。(选择最常见的一种表达式来分析)func evalAggrFunccallbacks := getIncrementalAggrFuncCallbacks(ae.Name)根据表达式中的聚合函数名,找到对应的执行代码。 例如:函数 sum() 对应着一个 golang 的 funcfe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)如果 sum() 里面还有类似 increase() 这样的 rollup 函数,则执行这一步args, re, err := evalRollupFuncArgs(qt, ec, fe)先执行 rollup() 函数里面的表达式rf, err := nrf(args)得到表达式的结果后,再执行 rollup() 函数func evalRollupFuncArgsts, err := evalExpr(qt, ec, arg)嵌套执行表达式,又回到函数 func evalExpr内层一般都是 metrics 表达式这里开始展示执行到了叶子节点的情况。rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)func evalRollupFuncreturn evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)func evalRollupFuncWithoutAtrvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)func evalRollupFuncWithMetricExprtss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries)func evalRollupFuncNoCachetfss := searchutil.ToTagFilterss(me.LabelFilterss)把 metrics 相关的表达式,变成标签过滤的对象sq = storage.NewMultiTenantSearchQuery(ts, minTimestamp, ec.End, tfss, ec.MaxSeries)把 [][]storage.TagFilter 构造成 SearchQuery 对象rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline)把请求发到 storage 节点,得到了 Results 对象evalRollupNoIncrementalAggregate(qt, funcName, keepMetricNames, rss, rcs, preFunc, sharedTimestamps)func evalRollupNoIncrementalAggregaterss.RunParallel(qt, ...当 vmstorage 返回 metricBlock 数据块后,开始并行执行,做各种聚合运算。app/vmselect/netstorage/
netstorage.gofunc (rss *Results) RunParallelrowsProcessedTotal, err := rss.runParallel(qt, f)在与核数相匹配的协程中并行执行func (rss *Results) runParallelerr = tsw.do(&tmpResult.rs, 0)每个 time series的数据上调用 do 方法func (tsw *timeseriesWork) doerr := tsw.pts.Unpack(r, rss.tbfs, rss.tr)把 metricBlock 数据进行反序列化,变成与 data point 数量相等的 []timestamp 和 []valuesfunc (pts *packedTimeseries) UnpackdedupInterval := storage.GetDedupInterval()查询时,拉取全局的 dedup 间隔配置mergeSortBlocks(dst, sbh, dedupInterval)去重逻辑。去掉重复的 timestamp2.2 查询过程概述
- 服务启动流程
- main() 中启动了http服务
- 提供 callback 函数来对应到 /select/0/prometheus/api/v1/xxx 下面的查询
- 最终请求触发时,走到 QueryRangeHandler 中进行处理
- 查询过程
- 通过 Exec() 函数来处理查询
- 使用 parsePromQL() 来解析 promql 表达式,把表达式变成 8 中基本语句的嵌套。8种语句包含:
- metricsql.MetricExpr: metric 的过滤表达式,主要是 tag 层面的过滤
- metricsql.RollupExpr: 可以理解为 increase(), rate() 这样的区间聚合函数
- metricsql.FuncExpr: 执行 metricsQL 内部提供的函数,例如 label_replace() 等
- metricsql.AggrFuncExpr: 执行聚合函数,例如 max, sum, avg 等
- metricsql.BinaryOpExpr: 执行布尔表达式的运算,主要有:and / or / unless
- metricsql.NumberExpr: 数值常量表达式
- metricsql.StringExpr: 把字符串看成一个独立的时间序列
- metricsql.DurationExpr: 产生新的 timestamp 的序列
- 根据 promQL 表达式的结构,在不同的部分嵌套执行 evalExpr(), 直到执行完成整个表达式
- 调用存储后端:
- 当 promql 的表达式是 MetricExpr 时,通过 vm-select 与 vm-storage 之间的二进制 rpc 协议来通讯
- vmselect 向所有的 vmstorage 广播
- 发送到 vmstorage 上的请求都是要求返回 metricName + timestamp + value 这样形式的请求
- vmstorage 根据 tag 的过滤表达式,先在索引中找到符合条件的 tsid
- 再根据 tsid 在数据去寻找每个 tsid 对应的 block,然后直接返回未解码的 block 数据
- vmselect 会收到来自 vmstorage 的 MetricBlock 结构
- 最终收到的数据类似这样的结构: map[metricName] -> block list
- 数据反序列化及其去重
- 当收到多个 vmstorage 节点返回的数据后,创建 N 个协程(N一般与CPU核数相等)来做反序列化、dedup 和 表达式计算
- 每条 metricName 下的 block list 会逐个调用 unpack 来变成 []timestamp 和 []values 的数组
- 所有的 block list 会建立成一个堆,按照 timestamp 来排序
- 以 dedupInterval 为时间窗口,逐个检查 timestamp。在 dedupInterval 时间窗口内的多条数据,只取一条
- 表达式计算
- 重新回到 evalExpr() 函数,做 sum 等表达式的计算
- 全部计算完成后返回 json 数据给 grafana
3. 总结
- vmselect / vmstorage 是存算分离的架构
- vmstorage 内部组织了索引和数据两部分
- vmstorage 通过 tag 来在索引中过滤,找到后返回整个 block 数据
- vmselect 把查询广播到所有的 vmstorage,得到 metricBlock 后进行 promql 表达式的计算
- promQL 被解析为一颗树,通过递归执行。
最后,为了便于分析 VictoriaMetrics 系列的源码,我又专门建立了一个源码仓库来存放增加了注释的源码:
https://github.com/ahfuzhang/code_comments
Have Fun.
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |