|
1 | 1 | (ns backtype.storm.clojure
|
2 | 2 | (:use [clojure.contrib.def :only [defnk defalias]])
|
| 3 | + (:use [clojure.contrib.seq-utils :only [find-first]]) |
3 | 4 | (:use [backtype.storm bootstrap util])
|
4 | 5 | (:import [backtype.storm StormSubmitter])
|
5 | 6 | (:import [backtype.storm.generated StreamInfo])
|
|
41 | 42 | (concat [name args] impl)
|
42 | 43 | ))
|
43 | 44 |
|
44 |
| -(defmacro bolt [& body] |
45 |
| - (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body) |
46 |
| - fns (normalize-fns bolt-fns)] |
47 |
| - `(reify IBolt |
| 45 | +(defn mk-concise-reify [klass body] |
| 46 | + (let [[reify-fns other-fns] (split-with #(not (symbol? %)) body) |
| 47 | + fns (normalize-fns reify-fns)] |
| 48 | + `(reify ~klass |
48 | 49 | ~@fns
|
49 | 50 | ~@other-fns)))
|
50 | 51 |
|
51 |
| -(defmacro bolt-execute [& body] |
52 |
| - `(bolt |
53 |
| - (~'execute ~@body))) |
| 52 | +(defmacro bolt [& body] |
| 53 | + (mk-concise-reify 'IBolt body)) |
54 | 54 |
|
55 | 55 | (defmacro spout [& body]
|
56 |
| - (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body) |
57 |
| - fns (normalize-fns spout-fns)] |
58 |
| - `(reify ISpout |
59 |
| - ~@fns |
60 |
| - ~@other-fns))) |
| 56 | + (mk-concise-reify 'ISpout body)) |
61 | 57 |
|
| 58 | +{:keys [url service_description] |
| 59 | + :or {service_description "Ganglia Web Frontend"} |
| 60 | + :as options} |
| 61 | + |
| 62 | +(defn gen-prep-args [klass-sym prep-sym collector-sym] |
| 63 | + (let [params (->> klass-sym |
| 64 | + resolve |
| 65 | + .getMethods |
| 66 | + (filter #(= (.getName %) (str prep-sym))) |
| 67 | + first |
| 68 | + .getParameterTypes)] |
| 69 | + (for [p params] |
| 70 | + (if (.contains (.getSimpleName p) "Collector") |
| 71 | + collector-sym |
| 72 | + (gensym) |
| 73 | + )))) |
| 74 | + |
| 75 | +(defn mk-delegate-class [name main-interface prepare-method delegate-maker committer?] |
| 76 | + (let [methods (-> main-interface resolve .getMethods) |
| 77 | + interfaces (if committer? [main-interface 'ICommitter] [main-interface]) |
| 78 | + prefix (str name "-") |
| 79 | + implementations (for [m methods] |
| 80 | + (let [] |
| 81 | + `(defn ~(str prefix (.getName m)) |
| 82 | + )))] |
| 83 | + `(do |
| 84 | + ;; TODO: Might need to auto-ns qualify name |
| 85 | + ;; need a constructor whichs stores an object reference |
| 86 | + ;; need to type hint the object as "main-interface" to avoid reflection |
| 87 | + (gen-class |
| 88 | + :name ~name |
| 89 | + :implements ~interfaces |
| 90 | + :prefix ~prefix) |
| 91 | + ~@implementations |
| 92 | + ))) |
| 93 | + |
| 94 | +;; TODO: need something else that will auto-gen "bolt", "spout" |
| 95 | +;; want it to work for batchbolt, richbolt, richspout, basicbolt |
| 96 | +;; need to be able to add extra interfaces |
| 97 | +(defn mk-executor [main-interface ;; e.g., IRichBolt, IRichSpout |
| 98 | + prepare-method |
| 99 | + reify-sym |
| 100 | + [name output-spec & [opts & impl :as all]] |
| 101 | + & {:keys [default-method prepare-default] |
| 102 | + :or [prepare-default true] |
| 103 | + :as executor-opts}] |
| 104 | + (if-not (map? opts) |
| 105 | + (apply mk-executor main-interface prepare-method reify-sym |
| 106 | + (concat [name output-spec {}] all) |
| 107 | + (mapcat identity executor-opt)) |
| 108 | + (let [worker-name (symbol (str name "__")) |
| 109 | + conf-fn-name (symbol (str name "__conf__")) |
| 110 | + params (:params opts) |
| 111 | + conf-code (:conf opts) |
| 112 | + committer? (:committer opts) ;; kind of a hack... |
| 113 | + prepare? (if (contains? opts :prepare) (:prepare opts) prepare-default) |
| 114 | + fn-body (if prepare? |
| 115 | + (cons 'fn impl) |
| 116 | + (let [[args & impl-body] impl |
| 117 | + coll-sym (last args) |
| 118 | + args (butlast args) |
| 119 | + prepargs (gen-prep-args main-interface prepare-method coll-sym)] |
| 120 | + `(fn ~(vec prepargs) (~reify-sym (~default-method ~(vec args) ~@impl-body))))) |
| 121 | + definer (if params |
| 122 | + `(defn ~name [& args#] |
| 123 | + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#)) |
| 124 | + `(def ~name |
| 125 | + (clojure-bolt ~output-spec ~worker-name ~conf-fn-name [])) |
| 126 | + ) |
| 127 | + ] |
| 128 | + `(do |
| 129 | + ;; TODO: need to do a gen-class for this object using "name" |
| 130 | + (defn ~conf-fn-name ~(if params params []) |
| 131 | + ~conf-code |
| 132 | + ) |
| 133 | + (defn ~worker-name ~(if params params []) |
| 134 | + ~fn-body |
| 135 | + ) |
| 136 | + ~definer |
| 137 | + )))) |
62 | 138 | (defmacro defbolt [name output-spec & [opts & impl :as all]]
|
63 | 139 | (if-not (map? opts)
|
64 | 140 | `(defbolt ~name ~output-spec {} ~@all)
|
|
71 | 147 | (let [[args & impl-body] impl
|
72 | 148 | coll-sym (nth args 1)
|
73 | 149 | args (vec (take 1 args))
|
| 150 | + ;; unify by taking collector as last arg |
| 151 | + ;; (batch bolt should force prepare...) |
| 152 | + ;; need options to implement other interfaces |
74 | 153 | prepargs [(gensym "conf") (gensym "context") coll-sym]]
|
75 | 154 | `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
|
76 | 155 | definer (if params
|
|
0 commit comments