Skip to content

Commit f592566

Browse files
author
Nathan Marz
committed
refactor acker -- doesn't need to be serializable
1 parent fefef8f commit f592566

File tree

3 files changed

+13
-35
lines changed

3 files changed

+13
-35
lines changed

src/clj/backtype/storm/daemon/acker.clj

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
(ns backtype.storm.daemon.acker
2-
(:import [backtype.storm.task OutputCollector IBolt TopologyContext])
2+
(:import [backtype.storm.task OutputCollector TopologyContext])
33
(:import [backtype.storm.tuple Tuple Fields])
44
(:import [backtype.storm.utils TimeCacheMap])
5+
(:import [backtype.storm.topology IRichBolt])
56
(:import [java.util List Map])
6-
(:use [backtype.storm config])
7-
(:gen-class
8-
:init init
9-
:implements [backtype.storm.topology.IRichBolt]
10-
:constructors {[] []}
11-
:state state ))
7+
(:use [backtype.storm config]))
128

139
(def ACKER-COMPONENT-ID "__acker")
1410
(def ACKER-INIT-STREAM-ID "__ack_init")
@@ -24,11 +20,10 @@
2420
(.emitDirect collector task stream values)
2521
)
2622

27-
;;TODO: this approach doesn't work... need to set it during prepare
2823
(defn mk-acker-bolt []
2924
(let [output-collector (atom nil)
3025
pending (atom nil)]
31-
(reify IBolt
26+
(reify IRichBolt
3227
(^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
3328
(reset! output-collector collector)
3429
(reset! pending (TimeCacheMap. (int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))))
@@ -67,23 +62,7 @@
6762
))
6863
(^void cleanup [this]
6964
)
65+
(declareOutputFields [this declarer]
66+
(.declareStream declarer ACKER-ACK-STREAM-ID true (Fields. ["id"]))
67+
(.declareStream declarer ACKER-FAIL-STREAM-ID true (Fields. ["id"])))
7068
)))
71-
72-
(defn -init []
73-
[[] (mk-acker-bolt)]
74-
)
75-
76-
(defn -prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
77-
(.prepare ^IBolt (. this state) storm-conf context collector))
78-
79-
(defn -execute [this ^Tuple tuple]
80-
(.execute ^IBolt (. this state) tuple))
81-
82-
(defn -cleanup [this]
83-
(.cleanup ^IBolt (. this state)))
84-
85-
(defn -declareOutputFields [this declarer]
86-
(.declareStream declarer ACKER-ACK-STREAM-ID true (Fields. ["id"]))
87-
(.declareStream declarer ACKER-FAIL-STREAM-ID true (Fields. ["id"])))
88-
89-

src/clj/backtype/storm/daemon/common.clj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
(:import [backtype.storm.generated StormTopology])
55
(:import [backtype.storm.utils Utils])
66
(:require [backtype.storm.daemon.acker :as acker])
7-
(:import [backtype.storm.daemon acker])
87
(:require [backtype.storm.thrift :as thrift])
98
)
109

@@ -109,7 +108,7 @@
109108
[id ACKER-FAIL-STREAM-ID] ["id"]}
110109
))
111110
acker-bolt (thrift/mk-bolt-spec (merge spout-inputs bolt-inputs)
112-
(backtype.storm.daemon.acker.)
111+
(acker/mk-acker-bolt)
113112
:p (storm-conf TOPOLOGY-ACKERS))]
114113
(.put_to_bolts ret "__acker" acker-bolt)
115114
(dofor [[_ bolt] (.get_bolts ret)

src/clj/backtype/storm/testing.clj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
SpoutTracker BoltTracker TrackerAggregator])
1515
(:require [backtype.storm [zookeeper :as zk]])
1616
(:require [backtype.storm.messaging.loader :as msg-loader])
17+
(:require [backtype.storm.daemon.acker :as acker])
1718
(:use [clojure.contrib.def :only [defnk]])
1819
(:use [clojure.contrib.seq :only [find-first]])
1920
(:use [backtype.storm cluster util thrift config log]))
@@ -410,17 +411,16 @@
410411
}))
411412

412413
(defmacro with-tracked-cluster [cluster-args & body]
413-
;;TODO: need an alternative approach to this
414414
`(with-var-roots [task/outbound-components (let [old# task/outbound-components]
415415
(fn [& args#]
416416
(merge (apply old# args#)
417417
{TrackerAggregator/TRACK_STREAM
418418
{TRACKER-BOLT-ID (fn [& args#] 0)}}
419419
)))
420-
task/mk-acker-bolt (let [old# task/mk-acker-bolt]
421-
(fn [& args#]
422-
(BoltTracker. (apply old# args#))
423-
))
420+
acker/mk-acker-bolt (let [old# acker/mk-acker-bolt]
421+
(fn [& args#]
422+
(BoltTracker. (apply old# args#))
423+
))
424424
]
425425
(with-local-cluster ~cluster-args
426426
~@body

0 commit comments

Comments
 (0)