Skip to content

Commit f96e10a

Browse files
author
Nathan Marz
committed
optimize and precompute remaining maps in TopologyContexts
1 parent 925d7dc commit f96e10a

File tree

7 files changed

+29
-34
lines changed

7 files changed

+29
-34
lines changed

src/clj/backtype/storm/daemon/common.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@
232232
(:storm-conf worker)
233233
(:task->component worker)
234234
(:component->sorted-tasks worker)
235+
(:component->stream->fields worker)
235236
(:storm-id worker)
236237
(supervisor-storm-resources-path
237238
(supervisor-stormdist-root (:conf worker) (:storm-id worker)))

src/clj/backtype/storm/daemon/task.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
(:storm-conf worker)
1919
(:task->component worker)
2020
(:component->sorted-tasks worker)
21+
(:component->stream->fields worker)
2122
(:storm-id worker)
2223
(supervisor-storm-resources-path
2324
(supervisor-stormdist-root conf (:storm-id worker)))

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,19 @@
106106
(into {})
107107
))
108108

109+
(defn- stream->fields [^StormTopology topology component]
110+
(->> (ThriftTopologyUtils/getComponentCommon topology component)
111+
.get_streams
112+
(map (fn [[s info]] [s (Fields. (.get_output_fields info))]))
113+
(into {})
114+
(HashMap.)))
115+
116+
(defn component->stream->fields [^StormTopology topology]
117+
(->> (ThriftTopologyUtils/getComponentIds topology)
118+
(map (fn [c] [c (stream->fields topology c)]))
119+
(into {})
120+
(HashMap.)))
121+
109122
(defn worker-data [conf mq-context storm-id supervisor-id port worker-id]
110123
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
111124
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
@@ -142,7 +155,8 @@
142155
(log-error t "Error when processing event")
143156
(halt-process! 20 "Error when processing an event")
144157
))
145-
:task->component (storm-task-info topology storm-conf)
158+
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
159+
:component->stream->fields (component->stream->fields (:system-topology <>))
146160
:component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
147161
:endpoint-socket-lock (mk-rw-lock)
148162
:node+port->socket (atom {})

src/clj/backtype/storm/testing.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,6 @@
547547
spout-spec (mk-spout-spec* (TestWordSpout.)
548548
{stream fields})
549549
topology (StormTopology. {component spout-spec} {} {})
550-
context (TopologyContext. topology (read-storm-config) {(int 1) component} {component [(int 1)]} "test-storm-id" nil nil (int 1) nil [(int 1)])]
550+
context (TopologyContext. topology (read-storm-config) {(int 1) component} {component [(int 1)]} {component {stream (Fields. fields)}} "test-storm-id" nil nil (int 1) nil [(int 1)])]
551551
(TupleImpl. context values 1 stream)
552552
))

src/jvm/backtype/storm/task/GeneralTopologyContext.java

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import backtype.storm.generated.GlobalStreamId;
66
import backtype.storm.generated.Grouping;
77
import backtype.storm.generated.StormTopology;
8-
import backtype.storm.generated.StreamInfo;
98
import backtype.storm.tuple.Fields;
109
import backtype.storm.utils.ThriftTopologyUtils;
1110
import backtype.storm.utils.Utils;
@@ -17,45 +16,24 @@
1716
import org.json.simple.JSONValue;
1817
import org.json.simple.JSONAware;
1918

20-
/**
21-
* A TopologyContext is given to bolts and spouts in their "prepare" and "open"
22-
* methods, respectively. This object provides information about the component's
23-
* place within the topology, such as task ids, inputs and outputs, etc.
24-
*
25-
* <p>The TopologyContext is also used to declare ISubscribedState objects to
26-
* synchronize state with StateSpouts this object is subscribed to.</p>
27-
*/
2819
public class GeneralTopologyContext implements JSONAware {
2920
private StormTopology _topology;
3021
private Map<Integer, String> _taskToComponent;
3122
private Map<String, List<Integer>> _componentToTasks;
32-
private Map<String, Map<String, Fields>> _componentToStreamToFields = new HashMap();
23+
private Map<String, Map<String, Fields>> _componentToStreamToFields;
3324
private String _stormId;
3425
protected Map _stormConf;
3526

3627
// pass in componentToSortedTasks for the case of running tons of tasks in single executor
3728
public GeneralTopologyContext(StormTopology topology, Map stormConf,
38-
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, String stormId) {
29+
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
30+
Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) {
3931
_topology = topology;
4032
_stormConf = stormConf;
4133
_taskToComponent = taskToComponent;
4234
_stormId = stormId;
4335
_componentToTasks = componentToSortedTasks;
44-
45-
//precompute this because getComponentOutputFields is called on the critical path
46-
//of tuple creation
47-
for(String component: getComponentIds()) {
48-
Map<String, Fields> streamToFields = _componentToStreamToFields.get(component);
49-
if(streamToFields==null) {
50-
streamToFields = new HashMap();
51-
_componentToStreamToFields.put(component, streamToFields);
52-
}
53-
ComponentCommon common = getComponentCommon(component);
54-
for(String stream: common.get_streams().keySet()) {
55-
StreamInfo info = common.get_streams().get(stream);
56-
streamToFields.put(stream, new Fields(info.get_output_fields()));
57-
}
58-
}
36+
_componentToStreamToFields = componentToStreamToFields;
5937
}
6038

6139
/**

src/jvm/backtype/storm/task/TopologyContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class TopologyContext extends WorkerTopologyContext {
3232

3333

3434
public TopologyContext(StormTopology topology, Map stormConf,
35-
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
35+
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
36+
Map<String, Map<String, Fields>> componentToStreamToFields,
3637
String stormId, String codeDir, String pidDir, Integer taskId,
3738
Integer workerPort, List<Integer> workerTasks) {
38-
super(topology, stormConf, taskToComponent, componentToSortedTasks, stormId, codeDir, pidDir, workerPort, workerTasks);
39+
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks);
3940
_taskId = taskId;
4041
}
4142

src/jvm/backtype/storm/task/WorkerTopologyContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package backtype.storm.task;
22

33
import backtype.storm.generated.StormTopology;
4+
import backtype.storm.tuple.Fields;
45
import java.io.File;
56
import java.io.IOException;
6-
import java.util.ArrayList;
7-
import java.util.Collections;
87
import java.util.List;
98
import java.util.Map;
109

@@ -15,10 +14,11 @@ public class WorkerTopologyContext extends GeneralTopologyContext {
1514
private String _pidDir;
1615

1716
public WorkerTopologyContext(StormTopology topology, Map stormConf,
18-
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, String stormId,
17+
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
18+
Map<String, Map<String, Fields>> componentToStreamToFields, String stormId,
1919
String codeDir, String pidDir, Integer workerPort,
2020
List<Integer> workerTasks) {
21-
super(topology, stormConf, taskToComponent, componentToSortedTasks, stormId);
21+
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId);
2222
_codeDir = codeDir;
2323
try {
2424
if(pidDir!=null) {

0 commit comments

Comments
 (0)