Skip to content

Commit c6cbcfc

Browse files
author
Jason Jackson
committed
Added SystemBolt.
SystemBolt always has exactly one executor per worker, which makes it ideal for exporting worker-related metrics data.
1 parent cbd217c commit c6cbcfc

File tree

10 files changed

+291
-82
lines changed

10 files changed

+291
-82
lines changed

bin/build_release.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ DIR=_release/storm-$RELEASE
1414

1515
rm -rf _release
1616
rm -f *.zip
17-
$LEIN with-profile release clean
18-
$LEIN with-profile release deps
19-
$LEIN with-profile release jar
20-
$LEIN with-profile release pom
21-
mvn dependency:copy-dependencies
17+
$LEIN with-profile release clean || exit 1
18+
$LEIN with-profile release deps || exit 1
19+
$LEIN with-profile release jar || exit 1
20+
$LEIN with-profile release pom || exit 1
21+
mvn dependency:copy-dependencies || exit 1
2222

2323
mkdir -p $DIR/lib
2424
cp target/storm-*.jar $DIR/storm-${RELEASE}.jar

src/clj/backtype/storm/daemon/common.clj

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
(:import [backtype.storm.utils Utils])
66
(:import [backtype.storm.task WorkerTopologyContext])
77
(:import [backtype.storm Constants])
8-
(:import [backtype.storm.spout NoOpSpout])
8+
(:import [backtype.storm.metric SystemBolt])
99
(:require [clojure.set :as set])
1010
(:require [backtype.storm.daemon.acker :as acker])
1111
(:require [backtype.storm.thrift :as thrift])
@@ -241,8 +241,9 @@
241241
(number-duplicates)
242242
(map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
243243

244-
(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
245-
(let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
244+
(defn metrics-consumer-bolt-specs [storm-conf topology]
245+
(let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
246+
inputs (->> (for [comp-id component-ids-that-emit-metrics]
246247
{[comp-id METRICS-STREAM-ID] :shuffle})
247248
(into {}))
248249

@@ -261,27 +262,28 @@
261262
(metrics-consumer-register-ids storm-conf)
262263
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
263264

264-
(defn add-metric-components! [storm-conf ^StormTopology topology]
265-
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
265+
(defn add-metric-components! [storm-conf ^StormTopology topology]
266+
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
266267
(.put_to_bolts topology comp-id bolt-spec)))
267268

268-
(defn add-system-components! [^StormTopology topology]
269-
(let [system-spout (thrift/mk-spout-spec*
270-
(NoOpSpout.)
271-
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
272-
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
273-
:p 0
274-
:conf {TOPOLOGY-TASKS 0})]
275-
(.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
269+
(defn add-system-components! [conf ^StormTopology topology]
270+
(let [system-bolt-spec (thrift/mk-bolt-spec*
271+
{}
272+
(SystemBolt.)
273+
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
274+
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
275+
:p 0
276+
:conf {TOPOLOGY-TASKS 0})]
277+
(.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
276278

277279
(defn system-topology! [storm-conf ^StormTopology topology]
278280
(validate-basic! topology)
279281
(let [ret (.deepCopy topology)]
280282
(add-acker! storm-conf ret)
281-
(add-metric-components! storm-conf ret)
282-
(add-metric-streams! ret)
283+
(add-metric-components! storm-conf ret)
284+
(add-system-components! storm-conf ret)
285+
(add-metric-streams! ret)
283286
(add-system-streams! ret)
284-
(add-system-components! ret)
285287
(validate-structure! ret)
286288
ret
287289
))

src/clj/backtype/storm/daemon/executor.clj

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@
111111
(defn executor-type [^WorkerTopologyContext context component-id]
112112
(let [topology (.getRawTopology context)
113113
spouts (.get_spouts topology)
114-
bolts (.get_bolts topology)
115-
]
114+
bolts (.get_bolts topology)]
116115
(cond (contains? spouts component-id) :spout
117116
(contains? bolts component-id) :bolt
118117
:else (throw-runtime "Could not find " component-id " in topology " topology))))
@@ -182,7 +181,7 @@
182181
(this task tuple nil)
183182
)))
184183

185-
(defn executor-data [worker executor-id]
184+
(defn mk-executor-data [worker executor-id]
186185
(let [worker-context (worker-context worker)
187186
task-ids (executor-id->tasks executor-id)
188187
component-id (.getComponentId worker-context (first task-ids))
@@ -253,7 +252,7 @@
253252
(fn []
254253
(disruptor/publish
255254
receive-queue
256-
[[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
255+
[[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
257256

258257
(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
259258
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
@@ -293,11 +292,11 @@
293292
(fn []
294293
(disruptor/publish
295294
receive-queue
296-
[[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
295+
[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
297296
)))))))
298297

299298
(defn mk-executor [worker executor-id]
300-
(let [executor-data (executor-data worker executor-id)
299+
(let [executor-data (mk-executor-data worker executor-id)
301300
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
302301
task-datas (->> executor-data
303302
:task-ids

src/clj/backtype/storm/daemon/supervisor.clj

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@
8585
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
8686
(and local-assignment
8787
(= (:storm-id worker-heartbeat) (:storm-id local-assignment))
88-
(= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
89-
))
88+
(= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
89+
(set (:executors local-assignment))))))
9090

9191
(defn read-allocated-workers
9292
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
@@ -117,7 +117,12 @@
117117
(defn- wait-for-worker-launch [conf id start-time]
118118
(let [state (worker-state conf id)]
119119
(loop []
120-
(let [hb (.get state LS-WORKER-HEARTBEAT)]
120+
(let [hb (try (.get state LS-WORKER-HEARTBEAT)
121+
(catch java.io.FileNotFoundException e
122+
;; This solves race condition in unit tests if you try to shutdown
123+
;; a worker which cleans up worker state while you also try to wait
124+
;; for worker to launch by reading the same state.
125+
nil))]
121126
(when (and
122127
(not hb)
123128
(<

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@
99

1010
(defmulti mk-suicide-fn cluster-mode)
1111

12-
(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
12+
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
1313
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
1414
(doall
15+
(concat
16+
[Constants/SYSTEM_EXECUTOR_ID]
1517
(mapcat (fn [[executor loc]]
16-
(if (= loc [assignment-id port])
17-
[executor]
18-
))
19-
assignment))
20-
))
18+
(if (= loc [assignment-id port])
19+
[executor]
20+
))
21+
assignment)))))
2122

2223
(defnk do-executor-heartbeats [worker :executors nil]
2324
;; stats is how we know what executors are assigned to this worker
@@ -144,7 +145,7 @@
144145
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
145146
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
146147
storm-conf (read-supervisor-storm-conf conf storm-id)
147-
executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
148+
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
148149
transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
149150
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
150151
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)

src/jvm/backtype/storm/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package backtype.storm;
22

33
import backtype.storm.coordination.CoordinatedBolt;
4+
import clojure.lang.RT;
45

56

67
public class Constants {
78
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
89

10+
public static final long SYSTEM_TASK_ID = -1;
11+
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
912
public static final String SYSTEM_COMPONENT_ID = "__system";
1013
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
1114
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package backtype.storm.metric;
2+
3+
import backtype.storm.Config;
4+
import backtype.storm.metric.api.AssignableMetric;
5+
import backtype.storm.metric.api.IMetric;
6+
import backtype.storm.task.IBolt;
7+
import backtype.storm.task.OutputCollector;
8+
import backtype.storm.task.TopologyContext;
9+
import backtype.storm.tuple.Tuple;
10+
import clojure.lang.AFn;
11+
import clojure.lang.IFn;
12+
import clojure.lang.RT;
13+
import com.google.common.collect.ImmutableMap;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
import java.lang.management.*;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
22+
// There is one task inside one executor for each worker of the topology.
23+
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
24+
// This bolt was conceived to export worker stats via metrics api.
25+
public class SystemBolt implements IBolt {
26+
private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
27+
private static boolean _prepareWasCalled = false;
28+
29+
private static class MemoryUsageMetric implements IMetric {
30+
IFn _getUsage;
31+
public MemoryUsageMetric(IFn getUsage) {
32+
_getUsage = getUsage;
33+
}
34+
@Override
35+
public Object getValueAndReset() {
36+
MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
37+
return ImmutableMap.builder()
38+
.put("maxBytes", memUsage.getMax())
39+
.put("committedBytes", memUsage.getCommitted())
40+
.put("initBytes", memUsage.getInit())
41+
.put("usedBytes", memUsage.getUsed())
42+
.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed())
43+
.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed())
44+
.build();
45+
}
46+
}
47+
48+
// canonically the metrics data exported is time bucketed when doing counts.
49+
// convert the absolute values here into time buckets.
50+
private static class GarbageCollectorMetric implements IMetric {
51+
GarbageCollectorMXBean _gcBean;
52+
Long _collectionCount;
53+
Long _collectionTime;
54+
public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
55+
_gcBean = gcBean;
56+
}
57+
@Override
58+
public Object getValueAndReset() {
59+
Long collectionCountP = _gcBean.getCollectionCount();
60+
Long collectionTimeP = _gcBean.getCollectionCount();
61+
62+
Map ret = null;
63+
if(_collectionCount!=null && _collectionTime!=null) {
64+
ret = ImmutableMap.builder()
65+
.put("count", collectionCountP - _collectionCount)
66+
.put("timeMs", collectionTimeP - _collectionTime)
67+
.build();
68+
}
69+
70+
_collectionCount = collectionCountP;
71+
_collectionTime = collectionTimeP;
72+
return ret;
73+
}
74+
}
75+
76+
@Override
77+
public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
78+
if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") {
79+
throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
80+
}
81+
_prepareWasCalled = true;
82+
83+
int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
84+
85+
final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
86+
87+
context.registerMetric("uptimeSecs", new IMetric() {
88+
@Override
89+
public Object getValueAndReset() {
90+
return jvmRT.getUptime()/1000.0;
91+
}
92+
}, bucketSize);
93+
94+
// You can calculate topology percent uptime between T_0 to T_1 using this metric data:
95+
// let s = sum topologyPartialUptimeSecs for each worker for each time buckets between T_0 and T_1
96+
// topology percent uptime = s/(T_1-T_0)
97+
// Even if the number of workers change over time the value is still correct because I divide by TOPOLOGY_WORKERS.
98+
context.registerMetric("topologyPartialUptimeSecs", new IMetric() {
99+
private long _prevUptime = jvmRT.getUptime();
100+
private final double NUM_WORKERS = RT.doubleCast(stormConf.get(Config.TOPOLOGY_WORKERS));
101+
@Override
102+
public Object getValueAndReset() {
103+
long _nowUptime = jvmRT.getUptime();
104+
double ret = (_nowUptime - _prevUptime)/1000.0/NUM_WORKERS;
105+
_prevUptime = _nowUptime;
106+
return ret;
107+
}
108+
}, bucketSize);
109+
110+
context.registerMetric("startTimeSecs", new IMetric() {
111+
@Override
112+
public Object getValueAndReset() {
113+
return jvmRT.getStartTime()/1000.0;
114+
}
115+
}, bucketSize);
116+
117+
context.registerMetric("newWorkerEvent", new IMetric() {
118+
boolean doEvent = true;
119+
120+
@Override
121+
public Object getValueAndReset() {
122+
if (doEvent) {
123+
doEvent = false;
124+
return 1;
125+
} else return 0;
126+
}
127+
}, bucketSize);
128+
129+
// This is metric data global to topology, but we don't support this concept, so put it here.
130+
// It's very useful to have time series of TOPOLOGY_WORKERS to compare actual worker count.
131+
context.registerMetric("configTopologyWorkers", new IMetric() {
132+
@Override
133+
public Object getValueAndReset() {
134+
return stormConf.get(Config.TOPOLOGY_WORKERS);
135+
}
136+
}, bucketSize);
137+
138+
final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
139+
140+
context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
141+
public Object invoke() {
142+
return jvmMemRT.getHeapMemoryUsage();
143+
}
144+
}), bucketSize);
145+
context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
146+
public Object invoke() {
147+
return jvmMemRT.getNonHeapMemoryUsage();
148+
}
149+
}), bucketSize);
150+
151+
for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
152+
context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
153+
}
154+
}
155+
156+
@Override
157+
public void execute(Tuple input) {
158+
throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
159+
}
160+
161+
@Override
162+
public void cleanup() {
163+
}
164+
}

src/jvm/backtype/storm/spout/NoOpSpout.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

0 commit comments

Comments
 (0)