Skip to content

Commit fbe2047

Browse files
author
Nathan Marz
committed
Merge branch '0.8.0' into scheduler
2 parents fce5783 + 2708e40 commit fbe2047

File tree

13 files changed

+454
-306
lines changed

13 files changed

+454
-306
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* Only track errors on a component-by-component basis to reduce the amount stored in Zookeeper (to speed up the UI). A side effect of this change is the removal of the task page in the UI.
1919
* Add TOPOLOGY-TICK-TUPLE-FREQ-SECS config to have Storm automatically send "tick" tuples to a bolt's execute method coming from the __system component and __tick stream at the configured frequency. Meant to be used as a component-specific configuration.
2020
* Upgrade Kryo to v2.04
21+
* Tuple is now an interface and is much cleaner. The Clojure DSL helpers have been moved to TupleImpl
2122

2223
## 0.7.2 (unreleased but release candidate available)
2324

conf/defaults.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,6 @@ topology.executor.send.buffer.size: 16384 #individual messages
9191
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
9292
topology.transfer.buffer.size: 32 # batched
9393
topology.tick.tuple.freq.secs: null
94+
topology.worker.shared.thread.pool.size: 4
9495

9596
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

src/clj/backtype/storm/daemon/common.clj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@
263263
(worker-pids-root (:conf worker) (:worker-id worker))
264264
(:port worker)
265265
(:task-ids worker)
266+
(:default-shared-resources worker)
267+
(:user-shared-resources worker)
266268
))
267269

268270

src/clj/backtype/storm/daemon/executor.clj

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@
140140
(log-error error)
141141
(cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) error))
142142

143+
;; in its own function so that it can be mocked out by tracked topologies
144+
(defn mk-executor-transfer-fn [batch-transfer->worker]
145+
(fn [task tuple]
146+
(disruptor/publish batch-transfer->worker [task tuple])))
147+
143148
(defn executor-data [worker executor-id]
144149
(let [worker-context (worker-context worker)
145150
task-ids (executor-id->tasks executor-id)
@@ -160,10 +165,10 @@
160165
:receive-queue ((:executor-receive-queue-map worker) executor-id)
161166
:storm-id (:storm-id worker)
162167
:conf (:conf worker)
168+
:shared-executor-data (HashMap.)
163169
:storm-active-atom (:storm-active-atom worker)
164170
:batch-transfer-queue batch-transfer->worker
165-
:transfer-fn (fn [task tuple]
166-
(disruptor/publish batch-transfer->worker [task tuple]))
171+
:transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
167172
:suicide-fn (:suicide-fn worker)
168173
:storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
169174
:type executor-type

src/clj/backtype/storm/daemon/supervisor.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@
466466
curr-id (if-let [id (.get state LS-ID)]
467467
id
468468
(generate-supervisor-id))]
469-
(.put state LS-ID curr-id)
469+
(.put state LS-ID curr-id)
470470
(reset! id-atom curr-id))
471471
)
472472
(confirmAssigned [this port]

src/clj/backtype/storm/daemon/task.clj

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
(bootstrap)
1212

13-
(defn mk-topology-context-builder [worker topology]
13+
(defn mk-topology-context-builder [worker executor-data topology]
1414
(let [conf (:conf worker)]
1515
#(TopologyContext.
1616
topology
@@ -25,17 +25,22 @@
2525
(int %)
2626
(:port worker)
2727
(:task-ids worker)
28+
(:default-shared-resources worker)
29+
(:user-shared-resources worker)
30+
(:shared-executor-data executor-data)
2831
)))
2932

30-
(defn system-topology-context [worker tid]
33+
(defn system-topology-context [worker executor-data tid]
3134
((mk-topology-context-builder
3235
worker
36+
executor-data
3337
(:system-topology worker))
3438
tid))
3539

36-
(defn user-topology-context [worker tid]
40+
(defn user-topology-context [worker executor-data tid]
3741
((mk-topology-context-builder
3842
worker
43+
executor-data
3944
(:topology worker))
4045
tid))
4146

@@ -138,8 +143,8 @@
138143
(recursive-map
139144
:executor-data executor-data
140145
:task-id task-id
141-
:system-context (system-topology-context (:worker executor-data) task-id)
142-
:user-context (user-topology-context (:worker executor-data) task-id)
146+
:system-context (system-topology-context (:worker executor-data) executor-data task-id)
147+
:user-context (user-topology-context (:worker executor-data) executor-data task-id)
143148
:tasks-fn (mk-tasks-fn <>)
144149
:object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))
145150
))

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
(:use [backtype.storm.daemon common])
33
(:use [backtype.storm bootstrap])
44
(:require [backtype.storm.daemon [executor :as executor]])
5+
(:import [java.util.concurrent Executors])
56
(:gen-class))
67

78
(bootstrap)
@@ -119,6 +120,19 @@
119120
(into {})
120121
(HashMap.)))
121122

123+
(defn- mk-default-resources [worker]
124+
(let [conf (:conf worker)
125+
thread-pool-size (int (conf TOPOLOGY-WORKER-SHARED-THREAD-POOL-SIZE))]
126+
{WorkerTopologyContext/SHARED_EXECUTOR (Executors/newFixedThreadPool thread-pool-size)}
127+
))
128+
129+
(defn- mk-user-resources [worker]
130+
;;TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources.
131+
;; this would be part of the initialization hook
132+
;; need to separate workertopologycontext into WorkerContext and WorkerUserContext.
133+
;; actually just do it via interfaces. just need to make sure to hide setResource from tasks
134+
{})
135+
122136
(defn worker-data [conf mq-context storm-id supervisor-id port worker-id]
123137
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
124138
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
@@ -170,6 +184,8 @@
170184
(HashMap.))
171185
:suicide-fn (mk-suicide-fn conf)
172186
:uptime (uptime-computer)
187+
:default-shared-resources (mk-default-resources <>)
188+
:user-shared-resources (mk-user-resources <>)
173189
:transfer-local-fn (mk-transfer-local-fn <>)
174190
:transfer-fn (mk-transfer-fn <>)
175191
)))
@@ -276,6 +292,12 @@
276292
(-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
277293
:kill-fn (fn [t] (halt-process! 11))))
278294

295+
(defn- close-resources [worker]
296+
(let [dr (:default-shared-resources worker)]
297+
(log-message "Shutting down default resources")
298+
(.shutdownNow (get dr WorkerTopologyContext/SHARED_EXECUTOR))
299+
(log-message "Shut down default resources")))
300+
279301
;; TODO: should worker even take the storm-id as input? this should be
280302
;; deducable from cluster state (by searching through assignments)
281303
;; what about if there's inconsistency in assignments? -> but nimbus
@@ -334,6 +356,11 @@
334356
(.join transfer-thread)
335357
(log-message "Shut down transfer thread")
336358
(cancel-timer (:timer worker))
359+
360+
(close-resources worker)
361+
362+
;; TODO: here need to invoke the "shutdown" method of WorkerHook
363+
337364
(.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id supervisor-id port)
338365
(log-message "Disconnecting from storm cluster state context")
339366
(.disconnect (:storm-cluster-state worker))

src/clj/backtype/storm/testing.clj

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
(:require [backtype.storm [process-simulator :as psim]])
99
(:import [org.apache.commons.io FileUtils])
1010
(:import [java.io File])
11+
(:import [java.util HashMap])
1112
(:import [java.util.concurrent.atomic AtomicInteger])
1213
(:import [java.util.concurrent ConcurrentHashMap])
1314
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
@@ -207,7 +208,6 @@
207208
`(with-simulated-time
208209
(with-local-cluster ~@args)))
209210

210-
;; TODO: should take in a port symbol and find available port automatically
211211
(defmacro with-inprocess-zookeeper [port-sym & body]
212212
`(with-local-tmp [tmp#]
213213
(let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
@@ -511,14 +511,20 @@
511511
(fn [& args#]
512512
(NonRichBoltTracker. (apply old# args#) id#)
513513
))
514-
worker/mk-transfer-fn (let [old# worker/mk-transfer-fn]
515-
(fn [& args#]
516-
(let [transferrer# (apply old# args#)]
517-
(fn [ser# batch#]
518-
;; (log-message "Transferring: " transfer-args#)
519-
(increment-global! id# "transferred" (count batch#))
520-
(transferrer# ser# batch#)
521-
))))
514+
;; critical that this particular function is overridden here,
515+
;; since the transferred stat needs to be incremented at the moment
516+
;; of tuple emission (and not on a separate thread later) for
517+
;; topologies to be tracked correctly. This is because "transferred" *must*
518+
;; be incremented before "processing".
519+
executor/mk-executor-transfer-fn
520+
(let [old# executor/mk-executor-transfer-fn]
521+
(fn [& args#]
522+
(let [transferrer# (apply old# args#)]
523+
(fn [& args2#]
524+
;; (log-message "Transferring: " transfer-args#)
525+
(increment-global! id# "transferred" 1)
526+
(apply transferrer# args2#)
527+
))))
522528
]
523529
(with-local-cluster [~cluster-sym ~@cluster-args]
524530
(let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
@@ -558,6 +564,20 @@
558564
spout-spec (mk-spout-spec* (TestWordSpout.)
559565
{stream fields})
560566
topology (StormTopology. {component spout-spec} {} {})
561-
context (TopologyContext. topology (read-storm-config) {(int 1) component} {component [(int 1)]} {component {stream (Fields. fields)}} "test-storm-id" nil nil (int 1) nil [(int 1)])]
567+
context (TopologyContext.
568+
topology
569+
(read-storm-config)
570+
{(int 1) component}
571+
{component [(int 1)]}
572+
{component {stream (Fields. fields)}}
573+
"test-storm-id"
574+
nil
575+
nil
576+
(int 1)
577+
nil
578+
[(int 1)]
579+
{}
580+
{}
581+
(HashMap.))]
562582
(TupleImpl. context values 1 stream)
563583
))

src/jvm/backtype/storm/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,12 @@ public class Config extends HashMap<String, Object> {
463463
*/
464464
public static String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
465465

466+
/**
467+
* The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
468+
* via the TopologyContext.
469+
*/
470+
public static String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
471+
466472
/**
467473
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
468474
*/

src/jvm/backtype/storm/task/TopologyContext.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77
import backtype.storm.state.ISubscribedState;
88
import backtype.storm.tuple.Fields;
99
import backtype.storm.utils.Utils;
10-
import java.io.File;
11-
import java.io.IOException;
1210
import java.util.ArrayList;
1311
import java.util.Collection;
1412
import java.util.Collections;
13+
import java.util.HashMap;
1514
import java.util.List;
1615
import java.util.Map;
1716
import java.util.Set;
@@ -27,17 +26,22 @@
2726
*/
2827
public class TopologyContext extends WorkerTopologyContext {
2928
private Integer _taskId;
30-
private Object _taskData = null;
29+
private Map<String, Object> _taskData = new HashMap<String, Object>();
3130
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
31+
private Map<String, Object> _executorData;
3232

3333

3434
public TopologyContext(StormTopology topology, Map stormConf,
3535
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
3636
Map<String, Map<String, Fields>> componentToStreamToFields,
3737
String stormId, String codeDir, String pidDir, Integer taskId,
38-
Integer workerPort, List<Integer> workerTasks) {
39-
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks);
38+
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
39+
Map<String, Object> userResources, Map<String, Object> executorData) {
40+
super(topology, stormConf, taskToComponent, componentToSortedTasks,
41+
componentToStreamToFields, stormId, codeDir, pidDir,
42+
workerPort, workerTasks, defaultResources, userResources);
4043
_taskId = taskId;
44+
_executorData = executorData;
4145
}
4246

4347
/**
@@ -162,13 +166,21 @@ public Map<String, Map<String, Grouping>> getThisTargets() {
162166
return getTargets(getThisComponentId());
163167
}
164168

165-
public void setTaskData(Object data) {
166-
_taskData = data;
169+
public void setTaskData(String name, Object data) {
170+
_taskData.put(name, data);
167171
}
168172

169-
public Object getTaskData() {
170-
return _taskData;
173+
public Object getTaskData(String name) {
174+
return _taskData.get(name);
171175
}
176+
177+
public void setExecutorData(String name, Object data) {
178+
_executorData.put(name, data);
179+
}
180+
181+
public Object getExecutorData(String name) {
182+
return _executorData.get(name);
183+
}
172184

173185
public void addTaskHook(ITaskHook hook) {
174186
hook.prepare(_stormConf, this);

src/jvm/backtype/storm/task/WorkerTopologyContext.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,36 @@
66
import java.io.IOException;
77
import java.util.List;
88
import java.util.Map;
9+
import java.util.concurrent.ExecutorService;
910

1011
public class WorkerTopologyContext extends GeneralTopologyContext {
12+
public static final String SHARED_EXECUTOR = "executor";
13+
1114
private Integer _workerPort;
1215
private List<Integer> _workerTasks;
1316
private String _codeDir;
1417
private String _pidDir;
18+
Map<String, Object> _userResources;
19+
Map<String, Object> _defaultResources;
1520

16-
public WorkerTopologyContext(StormTopology topology, Map stormConf,
17-
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
18-
Map<String, Map<String, Fields>> componentToStreamToFields, String stormId,
19-
String codeDir, String pidDir, Integer workerPort,
20-
List<Integer> workerTasks) {
21+
public WorkerTopologyContext(
22+
StormTopology topology,
23+
Map stormConf,
24+
Map<Integer, String> taskToComponent,
25+
Map<String, List<Integer>> componentToSortedTasks,
26+
Map<String, Map<String, Fields>> componentToStreamToFields,
27+
String stormId,
28+
String codeDir,
29+
String pidDir,
30+
Integer workerPort,
31+
List<Integer> workerTasks,
32+
Map<String, Object> defaultResources,
33+
Map<String, Object> userResources
34+
) {
2135
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId);
2236
_codeDir = codeDir;
37+
_defaultResources = defaultResources;
38+
_userResources = userResources;
2339
try {
2440
if(pidDir!=null) {
2541
_pidDir = new File(pidDir).getCanonicalPath();
@@ -62,4 +78,12 @@ public String getCodeDir() {
6278
public String getPIDDir() {
6379
return _pidDir;
6480
}
81+
82+
public Object getResource(String name) {
83+
return _userResources.get(name);
84+
}
85+
86+
public ExecutorService getSharedExecutor() {
87+
return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR);
88+
}
6589
}

0 commit comments

Comments
 (0)