|
109 | 109 | (str id " is not a valid stream id"))))))
|
110 | 110 | ))
|
111 | 111 |
|
| 112 | +(defn all-components [^StormTopology topology] |
| 113 | + (apply merge {} |
| 114 | + (for [f thrift/STORM-TOPOLOGY-FIELDS] |
| 115 | + (.getFieldValue topology f) |
| 116 | + ))) |
| 117 | + |
| 118 | +(defn component-conf [component] |
| 119 | + (->> component |
| 120 | + .get_common |
| 121 | + .get_json_conf |
| 122 | + from-json)) |
| 123 | + |
112 | 124 | (defn validate-basic! [^StormTopology topology]
|
113 | 125 | (validate-ids! topology)
|
114 | 126 | (doseq [f thrift/SPOUT-FIELDS
|
115 | 127 | obj (->> f (.getFieldValue topology) vals)]
|
116 | 128 | (if-not (empty? (-> obj .get_common .get_inputs))
|
117 |
| - (throw (InvalidTopologyException. "May not declare inputs for a spout")) |
| 129 | + (throw (InvalidTopologyException. "May not declare inputs for a spout")))) |
| 130 | + (doseq [[comp-id comp] (all-components topology) |
| 131 | + :let [conf (component-conf comp) |
| 132 | + p (-> comp .get_common thrift/parallelism-hint)]] |
| 133 | + (when (and (> (conf TOPOLOGY-TASKS) 0) |
| 134 | + p |
| 135 | + (<= p 0)) |
| 136 | + (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0")) |
118 | 137 | )))
|
119 | 138 |
|
120 |
| -(defn all-components [^StormTopology topology] |
121 |
| - (apply merge {} |
122 |
| - (for [f thrift/STORM-TOPOLOGY-FIELDS] |
123 |
| - (.getFieldValue topology f) |
124 |
| - ))) |
125 |
| - |
126 | 139 | (defn validate-structure! [^StormTopology topology]
|
127 | 140 | ;; validate all the component subscribe from component+stream which actually exists in the topology
|
128 | 141 | ;; and if it is a fields grouping, validate the corresponding field exists
|
|
144 | 157 | (when-not (empty? diff-fields)
|
145 | 158 | (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
|
146 | 159 |
|
147 |
| -(defn component-conf [component] |
148 |
| - (->> component |
149 |
| - .get_common |
150 |
| - .get_json_conf |
151 |
| - from-json)) |
152 |
| - |
153 | 160 | (defn acker-inputs [^StormTopology topology]
|
154 | 161 | (let [bolt-ids (.. topology get_bolts keySet)
|
155 | 162 | spout-ids (.. topology get_spouts keySet)
|
|
0 commit comments