|
109 | 109 | (str id " is not a valid stream id"))))))
|
110 | 110 | ))
|
111 | 111 |
|
| 112 | +(defn all-components [^StormTopology topology] |
| 113 | + (apply merge {} |
| 114 | + (for [f thrift/STORM-TOPOLOGY-FIELDS] |
| 115 | + (.getFieldValue topology f) |
| 116 | + ))) |
| 117 | + |
| 118 | +(defn component-conf [component] |
| 119 | + (->> component |
| 120 | + .get_common |
| 121 | + .get_json_conf |
| 122 | + from-json)) |
| 123 | + |
112 | 124 | (defn validate-basic! [^StormTopology topology]
|
113 | 125 | (validate-ids! topology)
|
114 | 126 | (doseq [f thrift/SPOUT-FIELDS
|
115 | 127 | obj (->> f (.getFieldValue topology) vals)]
|
116 | 128 | (if-not (empty? (-> obj .get_common .get_inputs))
|
117 |
| - (throw (InvalidTopologyException. "May not declare inputs for a spout")) |
| 129 | + (throw (InvalidTopologyException. "May not declare inputs for a spout")))) |
| 130 | + (doseq [[comp-id comp] (all-components topology) |
| 131 | + :let [conf (component-conf comp) |
| 132 | + p (-> comp .get_common thrift/parallelism-hint)]] |
| 133 | + (when (and (> (conf TOPOLOGY-TASKS) 0) |
| 134 | + p |
| 135 | + (<= p 0)) |
| 136 | + (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0")) |
118 | 137 | )))
|
119 | 138 |
|
120 |
| -(defn all-components [^StormTopology topology] |
121 |
| - (apply merge {} |
122 |
| - (for [f thrift/STORM-TOPOLOGY-FIELDS] |
123 |
| - (.getFieldValue topology f) |
124 |
| - ))) |
125 |
| - |
126 | 139 | (defn validate-structure! [^StormTopology topology]
|
127 | 140 | ;; validate all the component subscribe from component+stream which actually exists in the topology
|
128 | 141 | ;; and if it is a fields grouping, validate the corresponding field exists
|
|
144 | 157 | (when-not (empty? diff-fields)
|
145 | 158 | (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
|
146 | 159 |
|
147 |
| -(defn component-conf [component] |
148 |
| - (->> component |
149 |
| - .get_common |
150 |
| - .get_json_conf |
151 |
| - from-json)) |
152 |
| - |
153 | 160 | (defn acker-inputs [^StormTopology topology]
|
154 | 161 | (let [bolt-ids (.. topology get_bolts keySet)
|
155 | 162 | spout-ids (.. topology get_spouts keySet)
|
|
166 | 173 |
|
167 | 174 | (defn add-acker! [storm-conf ^StormTopology ret]
|
168 | 175 | (let [num-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)
|
169 |
| - num-tasks (storm-conf TOPOLOGY-ACKER-TASKS) |
170 | 176 | acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
|
171 | 177 | (new backtype.storm.daemon.acker)
|
172 | 178 | {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
|
173 | 179 | ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
|
174 | 180 | }
|
175 | 181 | :p num-executors
|
176 |
| - :conf {TOPOLOGY-TASKS num-tasks |
| 182 | + :conf {TOPOLOGY-TASKS num-executors |
177 | 183 | TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
|
178 | 184 | (dofor [[_ bolt] (.get_bolts ret)
|
179 | 185 | :let [common (.get_common bolt)]]
|
|
228 | 234 | ))
|
229 | 235 |
|
230 | 236 | (defn has-ackers? [storm-conf]
|
231 |
| - (let [tasks (storm-conf TOPOLOGY-ACKER-TASKS)] |
232 |
| - (and (or (nil? tasks) (> tasks 0)) |
233 |
| - (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0)))) |
| 237 | + (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0)) |
234 | 238 |
|
235 | 239 | (defn num-start-executors [component]
|
236 | 240 | (thrift/parallelism-hint (.get_common component)))
|
|
0 commit comments