Skip to content

Commit 04002f1

Browse files
author
Nathan Marz
committed
starting on revamped clojure dsl
1 parent 195ddaf commit 04002f1

File tree

1 file changed

+91
-12
lines changed

1 file changed

+91
-12
lines changed

src/clj/backtype/storm/clojure.clj

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
(ns backtype.storm.clojure
22
(:use [clojure.contrib.def :only [defnk defalias]])
3+
(:use [clojure.contrib.seq-utils :only [find-first]])
34
(:use [backtype.storm bootstrap util])
45
(:import [backtype.storm StormSubmitter])
56
(:import [backtype.storm.generated StreamInfo])
@@ -41,24 +42,99 @@
4142
(concat [name args] impl)
4243
))
4344

44-
(defmacro bolt [& body]
45-
(let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
46-
fns (normalize-fns bolt-fns)]
47-
`(reify IBolt
45+
(defn mk-concise-reify [klass body]
46+
(let [[reify-fns other-fns] (split-with #(not (symbol? %)) body)
47+
fns (normalize-fns reify-fns)]
48+
`(reify ~klass
4849
~@fns
4950
~@other-fns)))
5051

51-
(defmacro bolt-execute [& body]
52-
`(bolt
53-
(~'execute ~@body)))
52+
(defmacro bolt [& body]
53+
(mk-concise-reify 'IBolt body))
5454

5555
(defmacro spout [& body]
56-
(let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
57-
fns (normalize-fns spout-fns)]
58-
`(reify ISpout
59-
~@fns
60-
~@other-fns)))
56+
(mk-concise-reify 'ISpout body))
6157

58+
{:keys [url service_description]
59+
:or {service_description "Ganglia Web Frontend"}
60+
:as options}
61+
62+
(defn gen-prep-args [klass-sym prep-sym collector-sym]
63+
(let [params (->> klass-sym
64+
resolve
65+
.getMethods
66+
(filter #(= (.getName %) (str prep-sym)))
67+
first
68+
.getParameterTypes)]
69+
(for [p params]
70+
(if (.contains (.getSimpleName p) "Collector")
71+
collector-sym
72+
(gensym)
73+
))))
74+
75+
(defn mk-delegate-class [name main-interface prepare-method delegate-maker committer?]
76+
(let [methods (-> main-interface resolve .getMethods)
77+
interfaces (if committer? [main-interface 'ICommitter] [main-interface])
78+
prefix (str name "-")
79+
implementations (for [m methods]
80+
(let []
81+
`(defn ~(str prefix (.getName m))
82+
)))]
83+
`(do
84+
;; TODO: Might need to auto-ns qualify name
85+
;; need a constructor whichs stores an object reference
86+
;; need to type hint the object as "main-interface" to avoid reflection
87+
(gen-class
88+
:name ~name
89+
:implements ~interfaces
90+
:prefix ~prefix)
91+
~@implementations
92+
)))
93+
94+
;; TODO: need something else that will auto-gen "bolt", "spout"
95+
;; want it to work for batchbolt, richbolt, richspout, basicbolt
96+
;; need to be able to add extra interfaces
97+
(defn mk-executor [main-interface ;; e.g., IRichBolt, IRichSpout
98+
prepare-method
99+
reify-sym
100+
[name output-spec & [opts & impl :as all]]
101+
& {:keys [default-method prepare-default]
102+
:or [prepare-default true]
103+
:as executor-opts}]
104+
(if-not (map? opts)
105+
(apply mk-executor main-interface prepare-method reify-sym
106+
(concat [name output-spec {}] all)
107+
(mapcat identity executor-opt))
108+
(let [worker-name (symbol (str name "__"))
109+
conf-fn-name (symbol (str name "__conf__"))
110+
params (:params opts)
111+
conf-code (:conf opts)
112+
committer? (:committer opts) ;; kind of a hack...
113+
prepare? (if (contains? opts :prepare) (:prepare opts) prepare-default)
114+
fn-body (if prepare?
115+
(cons 'fn impl)
116+
(let [[args & impl-body] impl
117+
coll-sym (last args)
118+
args (butlast args)
119+
prepargs (gen-prep-args main-interface prepare-method coll-sym)]
120+
`(fn ~(vec prepargs) (~reify-sym (~default-method ~(vec args) ~@impl-body)))))
121+
definer (if params
122+
`(defn ~name [& args#]
123+
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
124+
`(def ~name
125+
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
126+
)
127+
]
128+
`(do
129+
;; TODO: need to do a gen-class for this object using "name"
130+
(defn ~conf-fn-name ~(if params params [])
131+
~conf-code
132+
)
133+
(defn ~worker-name ~(if params params [])
134+
~fn-body
135+
)
136+
~definer
137+
))))
62138
(defmacro defbolt [name output-spec & [opts & impl :as all]]
63139
(if-not (map? opts)
64140
`(defbolt ~name ~output-spec {} ~@all)
@@ -71,6 +147,9 @@
71147
(let [[args & impl-body] impl
72148
coll-sym (nth args 1)
73149
args (vec (take 1 args))
150+
;; unify by taking collector as last arg
151+
;; (batch bolt should force prepare...)
152+
;; need options to implement other interfaces
74153
prepargs [(gensym "conf") (gensym "context") coll-sym]]
75154
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
76155
definer (if params

0 commit comments

Comments
 (0)