Storm source code analysis: Stats (backtype.storm.stats)
Published: 2019-06-29


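Storm currently ships two metrics systems: the metrics framework and the stats framework.

Both are registered side by side everywhere. It looks as though metrics is meant to replace stats eventually, but in the current version the UI still uses stats.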

 

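How is the data collected by this module used?

1. Each worker periodically calls do-executor-heartbeats to sync its heartbeat (hb) to ZooKeeper.

As the code shows, the stats are written to ZooKeeper as part of the heartbeat.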

(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)}]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)))
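The two branches that build the stats map can be tried on plain data. The following is a minimal sketch, not Storm code: the executor ids (written here as [start-task end-task] vectors) and the literal stats map standing in for executor/render-stats are assumptions made for illustration.

```clojure
;; Hypothetical executor ids; real ones come from the worker.
(def executor-ids [[1 1] [2 3]])

;; Branch 1: no executors were passed, so every id maps to nil stats.
(def placeholder-stats
  (into {} (map (fn [e] {e nil}) executor-ids)))

;; Branch 2: build one single-entry map per executor and merge them.
;; {:emitted {}} is a stand-in for the result of executor/render-stats.
(def rendered-stats
  (->> executor-ids
       (map (fn [e] {e {:emitted {}}}))
       (apply merge)))
```

In both cases the keys of the map identify the executors assigned to the worker, which is what the comment in the original function refers to.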

2. Anyone can then retrieve this information through the Nimbus Thrift interface.

(^TopologyInfo getTopologyInfo [this ^String storm-id]
  ;; ... (abridged excerpt; the surrounding bindings are omitted)
  beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
  ;; ...
  stats (:stats heartbeat))

3. The most direct consumer is the Storm UI: when it prepares the topology page, it calls getTopologyInfo to fetch the data.

(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ;; ... (rest of the function omitted in this excerpt)
      )))

 

Stats

Spouts and bolts use this module to collect sampled statistics. The specific metrics collected are:

(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;; acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;; acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])

 

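The sampling ratio is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.

Why does each update add rate rather than 1?

Because the statistics are sampled: if the sampling ratio is 10%, each sampled tuple stands for 1/(10%) = 10 tuples, so the counter is increased by 10.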

(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
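The arithmetic behind adding rate instead of 1 can be checked with a small sketch. The function names below are hypothetical, and the sample ratio is passed in directly rather than read from a conf map.

```clojure
;; Same computation as sampling-rate, taking the ratio as an argument.
(defn sampling-rate-sketch [sample-ratio]
  (int (/ 1 sample-ratio)))

;; Each sampled tuple adds `rate` to the counter, so the counter
;; approximates the number of tuples actually processed.
(defn estimated-count [sampled-tuples rate]
  (reduce + (repeat sampled-tuples rate)))

(def rate (sampling-rate-sketch 0.1))   ; 10% sampling
(def estimate (estimated-count 3 rate)) ; 3 sampled tuples
```

With a 10% ratio the rate is 10, so three sampled tuples are counted as 30. Note that int truncates, so a ratio whose inverse is not a whole number gives a rounded-down rate.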

 

Statistics are kept per time window. The default bucket and time-window definitions are:

(def NUM-STAT-BUCKETS 20) ;; number of buckets per window
;; 10 minutes, 3 hours, 1 day (the three time windows)
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes: 30, 540 and 4320 seconds
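The three window lengths follow from multiplying each bucket size by the number of buckets. A quick check, using lowercase names so that nothing from Storm is redefined:

```clojure
(def num-stat-buckets 20)
(def stat-buckets [30 540 4320])

;; Window length in seconds = bucket size * number of buckets.
(def window-lengths-secs
  (mapv #(* num-stat-buckets %) stat-buckets))
;; 600 s = 10 minutes, 10800 s = 3 hours, 86400 s = 1 day
```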

 

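The core data structure is RollingWindowSet, which contains:

the functions needed to compute the statistics, updater and extractor; they are needed at this level too because the set itself maintains the all-time statistics
a group of rolling windows, by default three time windows: 10 minutes, 3 hours, 1 day
all-time, the statistics over the entire time range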

(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater extractor
                     (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets))
                     nil))

 

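Next, the definition of a rolling window:

the core data, buckets, a hash map keyed by time bucket whose values are {streamid, data} maps, initialized to {}
the functions needed to compute the data: updater, merger, extractor
the time window: the bucket size and the number of buckets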

(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

 

1. mk-stats

The stats are created in mk-executor-data:

mk-executor-stats <> (sampling-rate storm-conf)

 

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))

(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

The first argument is ignored; the methods simply call stats/mk-spout-stats or stats/mk-bolt-stats. Each of these creates one rolling-window-set for every metric that needs to be tracked.

(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                rate))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                       (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))

 

2. Updating the data

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms))

(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
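To see what the macro's path handling does, here is a simplified sketch under stated assumptions: collectify-sketch is a guess at what Storm's collectify helper does (wrap a non-sequential value in a vector), the update function is passed in explicitly instead of being fixed to update-rolling-window-set, and the stats value is a plain map of atoms rather than a record.

```clojure
;; Assumed behaviour of collectify: leave sequences alone, wrap anything else.
(defn collectify-sketch [x]
  (if (sequential? x) x [x]))

;; Same shape as update-executor-stat!, with the update function `f` as a parameter.
(defmacro update-stat-sketch! [stats path f & args]
  (let [path (collectify-sketch path)]
    `(swap! (-> ~stats ~@path) ~f ~@args)))

;; Hypothetical stats holder: one atom per metric, some nested under :common.
(def stats {:common {:emitted (atom {})}
            :acked  (atom {})})

;; A single keyword path threads as (-> stats :acked).
(update-stat-sketch! stats :acked update "default" (fnil + 0) 10)

;; A vector path threads as (-> stats :common :emitted).
(update-stat-sketch! stats [:common :emitted] update "default" (fnil + 0) 10)
```

Because the path is spliced into a -> form at macro-expansion time, it has to be a literal keyword or vector at the call site, as it is in spout-acked-tuple!.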

Take (update-executor-stat! stats :acked stream (stats-rate stats)) as an example of how this works.

It takes from SpoutExecutorStats the rolling-window-set that records spout acks,

then uses update-rolling-window-set to swap! that atom.

How is the rolling-window-set that records acks defined?

keyed-counter-rolling-window-set predefines the updater, merger and extractor:

updater: incr-val [amap key amt], adds amt to the value stored under key in amap
merger: (partial merge-with +), merges maps using +, so values under the same key are summed
extractor: counter-extract, (if v v {}), returns v if present, otherwise {}
windows: a list of rolling-windows
all-time: initialized to nil

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
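The three functions described above are small enough to try in isolation. The definitions below are sketches written from those descriptions, not copies of the Storm source; the -sketch names are hypothetical.

```clojure
;; updater: add amt to the value under key, treating a missing key as 0.
(defn incr-val-sketch [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; merger: sum the values of keys that appear in more than one map.
(def merger-sketch (partial merge-with +))

;; extractor: return the value if there is one, otherwise an empty map.
(defn counter-extract-sketch [v]
  (if v v {}))

;; Starting from nil works because assoc and get both accept nil.
(def counts
  (-> nil
      (incr-val-sketch "default" 10)
      (incr-val-sketch "default" 10)
      (incr-val-sketch "errors" 10)))

(def merged (merger-sketch {"default" 20} {"default" 10 "errors" 10}))
```

Here counts ends up as {"default" 20, "errors" 10} and merged as {"default" 30, "errors" 10}. The nil starting point matters because all-time is initialized to nil.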

 

Now, when spout-acked-tuple! updates :acked, how does the update work?

First every rolling-window is updated, and the updated windows are stored back under :windows.

Then :all-time is updated: (apply (:updater rws) (:all-time rws) args)
updater: incr-val [amap key amt]
args: stream id, rate
all-time records the statistics for a given stream over the entire time range

(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
     (let [now (current-time-secs)
           new-windows (dofor [w (:windows rws)]
                         (apply update-rolling-window w now args))]
       (assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args)))))
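The fan-out to every window plus the all-time total can be sketched on plain maps. Everything here is a simplified stand-in: the window update ignores time bucketing and keeps a single :data map per window, the timestamp is passed in rather than read from the clock, and the -sketch names are hypothetical.

```clojure
(defn incr-val-sketch [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; Stand-in for update-rolling-window: ignores `now` and time buckets.
(defn update-window-sketch [w _now & args]
  (update w :data #(apply incr-val-sketch % args)))

;; Same shape as update-rolling-window-set: update every window,
;; then fold the same args into :all-time.
(defn update-set-sketch [rws now & args]
  (assoc rws
         :windows (mapv #(apply update-window-sketch % now args) (:windows rws))
         :all-time (apply (:updater rws) (:all-time rws) args)))

(def rws {:updater incr-val-sketch
          :windows [{:data nil} {:data nil}]
          :all-time nil})

;; Two sampled acks on stream "default" with rate 10.
(def after-two
  (-> rws
      (update-set-sketch 100 "default" 10)
      (update-set-sketch 125 "default" 10)))
```

After the two updates, each window and :all-time hold {"default" 20}. In the real structure the windows later drop old buckets while :all-time keeps growing, which is why it is tracked separately.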

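Now for how a single rolling-window is updated:

compute from now which bucket the current time belongs to (time-bucket)
fetch buckets and update the corresponding bucket with :updater; the operation is again adding rate to the value stored for the stream id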

(defn update-rolling-window
  ([^RollingWindow rw time-secs & args]
     ;; this is 2.5x faster than using update-in...
     (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
           buckets (:buckets rw)
           curr (get buckets time-bucket)
           curr (apply (:updater rw) curr args)]
       (assoc rw :buckets (assoc buckets time-bucket curr)))))
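Two details of this function can be illustrated separately. The sketch below assumes that curr-time-bucket rounds the timestamp down to a multiple of the bucket size (its definition is not shown in this article), and it compares the explicit get/assoc style with the update-in style mentioned in the code comment. All -sketch names are hypothetical.

```clojure
;; Assumed behaviour: round down to the start of the enclosing bucket.
(defn curr-time-bucket-sketch [time-secs bucket-size-secs]
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn incr-val-sketch [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; The get/assoc style used by update-rolling-window.
(defn update-bucket-explicit [buckets k updater & args]
  (assoc buckets k (apply updater (get buckets k) args)))

;; The update-in style that the source comment describes as slower.
(defn update-bucket-update-in [buckets k updater & args]
  (apply update-in buckets [k] updater args))

(def k (curr-time-bucket-sketch 100 30))
(def a (update-bucket-explicit {} k incr-val-sketch "default" 10))
(def b (update-bucket-update-in {} k incr-val-sketch "default" 10))
```

With 30-second buckets, timestamps 90 through 119 all map to bucket 90, so a and b are both {90 {"default" 10}}. The two styles produce the same map; the original only prefers get/assoc for speed.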
 
This article is excerpted from cnblogs (博客园); original publication date: 2013-07-29

Reposted from: http://qxebm.baihongyu.com/
