|
1 | 1 | (ns backtype.storm.daemon.acker
|
2 |
| - (:import [backtype.storm.task OutputCollector IBolt TopologyContext]) |
| 2 | + (:import [backtype.storm.task OutputCollector TopologyContext]) |
3 | 3 | (:import [backtype.storm.tuple Tuple Fields])
|
4 | 4 | (:import [backtype.storm.utils TimeCacheMap])
|
| 5 | + (:import [backtype.storm.topology IRichBolt]) |
5 | 6 | (:import [java.util List Map])
|
6 |
| - (:use [backtype.storm config]) |
7 |
| - (:gen-class |
8 |
| - :init init |
9 |
| - :implements [backtype.storm.topology.IRichBolt] |
10 |
| - :constructors {[] []} |
11 |
| - :state state )) |
| 7 | + (:use [backtype.storm config])) |
12 | 8 |
|
13 | 9 | (def ACKER-COMPONENT-ID "__acker")
|
14 | 10 | (def ACKER-INIT-STREAM-ID "__ack_init")
|
|
24 | 20 | (.emitDirect collector task stream values)
|
25 | 21 | )
|
26 | 22 |
|
27 |
| -;;TODO: this approach doesn't work... need to set it during prepare |
28 | 23 | (defn mk-acker-bolt []
|
29 | 24 | (let [output-collector (atom nil)
|
30 | 25 | pending (atom nil)]
|
31 |
| - (reify IBolt |
| 26 | + (reify IRichBolt |
32 | 27 | (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
|
33 | 28 | (reset! output-collector collector)
|
34 | 29 | (reset! pending (TimeCacheMap. (int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))))
|
|
67 | 62 | ))
|
68 | 63 | (^void cleanup [this]
|
69 | 64 | )
|
| 65 | + (declareOutputFields [this declarer] |
| 66 | + (.declareStream declarer ACKER-ACK-STREAM-ID true (Fields. ["id"])) |
| 67 | + (.declareStream declarer ACKER-FAIL-STREAM-ID true (Fields. ["id"]))) |
70 | 68 | )))
|
71 |
| - |
72 |
| -(defn -init [] |
73 |
| - [[] (mk-acker-bolt)] |
74 |
| - ) |
75 |
| - |
76 |
| -(defn -prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector] |
77 |
| - (.prepare ^IBolt (. this state) storm-conf context collector)) |
78 |
| - |
79 |
| -(defn -execute [this ^Tuple tuple] |
80 |
| - (.execute ^IBolt (. this state) tuple)) |
81 |
| - |
82 |
| -(defn -cleanup [this] |
83 |
| - (.cleanup ^IBolt (. this state))) |
84 |
| - |
85 |
| -(defn -declareOutputFields [this declarer] |
86 |
| - (.declareStream declarer ACKER-ACK-STREAM-ID true (Fields. ["id"])) |
87 |
| - (.declareStream declarer ACKER-FAIL-STREAM-ID true (Fields. ["id"]))) |
88 |
| - |
89 |
| - |
0 commit comments