Skip to content

Commit 9336699

Browse files
author
Nathan Marz
committed
added shared executor data between tasks, make task data have a map-like interface
1 parent 109093d commit 9336699

File tree

4 files changed

+23
-7
lines changed

4 files changed

+23
-7
lines changed

src/clj/backtype/storm/daemon/executor.clj

+1
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@
160160
:receive-queue ((:executor-receive-queue-map worker) executor-id)
161161
:storm-id (:storm-id worker)
162162
:conf (:conf worker)
163+
:shared-executor-data (HashMap.)
163164
:storm-active-atom (:storm-active-atom worker)
164165
:batch-transfer-queue batch-transfer->worker
165166
:transfer-fn (fn [task tuple]

src/clj/backtype/storm/daemon/task.clj

+8-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
(bootstrap)
1212

13-
(defn mk-topology-context-builder [worker topology]
13+
(defn mk-topology-context-builder [worker executor-data topology]
1414
(let [conf (:conf worker)]
1515
#(TopologyContext.
1616
topology
@@ -27,17 +27,20 @@
2727
(:task-ids worker)
2828
(:default-shared-resources worker)
2929
(:user-shared-resources worker)
30+
(:shared-executor-data executor-data)
3031
)))
3132

32-
(defn system-topology-context [worker tid]
33+
(defn system-topology-context [worker executor-data tid]
3334
((mk-topology-context-builder
3435
worker
36+
executor-data
3537
(:system-topology worker))
3638
tid))
3739

38-
(defn user-topology-context [worker tid]
40+
(defn user-topology-context [worker executor-data tid]
3941
((mk-topology-context-builder
4042
worker
43+
executor-data
4144
(:topology worker))
4245
tid))
4346

@@ -140,8 +143,8 @@
140143
(recursive-map
141144
:executor-data executor-data
142145
:task-id task-id
143-
:system-context (system-topology-context (:worker executor-data) task-id)
144-
:user-context (user-topology-context (:worker executor-data) task-id)
146+
:system-context (system-topology-context (:worker executor-data) executor-data task-id)
147+
:user-context (user-topology-context (:worker executor-data) executor-data task-id)
145148
:tasks-fn (mk-tasks-fn <>)
146149
:object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))
147150
))

src/clj/backtype/storm/testing.clj

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
(:require [backtype.storm [process-simulator :as psim]])
99
(:import [org.apache.commons.io FileUtils])
1010
(:import [java.io File])
11+
(:import [java.util HashMap])
1112
(:import [java.util.concurrent.atomic AtomicInteger])
1213
(:import [java.util.concurrent ConcurrentHashMap])
1314
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
@@ -560,6 +561,7 @@
560561
nil
561562
[(int 1)]
562563
{}
563-
{})]
564+
{}
565+
(HashMap.))]
564566
(TupleImpl. context values 1 stream)
565567
))

src/jvm/backtype/storm/task/TopologyContext.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,20 @@ public class TopologyContext extends WorkerTopologyContext {
2828
private Integer _taskId;
2929
private Map<String, Object> _taskData = new HashMap<String, Object>();
3030
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
31+
private Map<String, Object> _executorData;
3132

3233

3334
public TopologyContext(StormTopology topology, Map stormConf,
3435
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
3536
Map<String, Map<String, Fields>> componentToStreamToFields,
3637
String stormId, String codeDir, String pidDir, Integer taskId,
3738
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
38-
Map<String, Object> userResources) {
39+
Map<String, Object> userResources, Map<String, Object> executorData) {
3940
super(topology, stormConf, taskToComponent, componentToSortedTasks,
4041
componentToStreamToFields, stormId, codeDir, pidDir,
4142
workerPort, workerTasks, defaultResources, userResources);
4243
_taskId = taskId;
44+
_executorData = executorData;
4345
}
4446

4547
/**
@@ -171,6 +173,14 @@ public void setTaskData(String name, Object data) {
171173
public Object getTaskData(String name) {
172174
return _taskData.get(name);
173175
}
176+
177+
public void setExecutorData(String name, Object data) {
178+
_executorData.put(name, data);
179+
}
180+
181+
public Object getExecutorData(String name) {
182+
return _executorData.get(name);
183+
}
174184

175185
public void addTaskHook(ITaskHook hook) {
176186
hook.prepare(_stormConf, this);

0 commit comments

Comments
 (0)