Skip to content

Commit 55ceb6d

Browse files
author
Nathan Marz
committed
fix conflicts
2 parents f84d181 + 7918fad commit 55ceb6d

35 files changed

+196
-114
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## Unreleased
22

3+
* Upgrade to Clojure 1.4 (thanks sorenmacbeth)
34
* Can override the hostname that supervisors report using "storm.local.hostname" config.
45
* Make request timeout within DRPC server configurable via "drpc.request.timeout.secs"
56
* Added "storm list" command to show running topologies at the command line (thanks xumingming)
@@ -8,7 +9,7 @@
89
* Added reportError to BatchOutputCollector
910
* Added close method to OpaqueTransactionalSpout coordinator
1011
* Added "storm dev-zookeeper" command for launching a local zookeeper server. Useful for testing a one node Storm cluster locally. Zookeeper dir configured with "dev.zookeeper.path"
11-
* Use new style classes for Python multilang adapter
12+
* Use new style classes for Python multilang adapter (thanks hellp)
1213
* Bug fix: Fixed criticial bug in opaque transactional topologies that would lead to duplicate messages when using pipelining
1314
* Bug fix: Workers will now die properly if a ShellBolt subprocess dies (thanks tomo)
1415
* Bug fix: Hide the BasicOutputCollector#getOutputter method, since it shouldn't be a publicly available method.

README.markdown

+2
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,5 @@ You must not remove this notice, or any other, from this software.
4141
* Thomas Jack ([@tomo](https://github.com/tomo))
4242
* Nicolas Yzet ([@nicoo](https://github.com/nicoo))
4343
* James Xu ([@xumingming](https://github.com/xumingming))
44+
* Fabian Neumann ([@hellp](https://github.com/hellp))
45+
* Soren Macbeth ([@sorenmacbeth](https://github.com/sorenmacbeth))

project.clj

+7-7
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66
:resources-path "conf"
77
:dev-resources-path "src/dev"
88
:repositories {"sonatype" "http://oss.sonatype.org/content/groups/public/"}
9-
:dependencies [[org.clojure/clojure "1.2.0"]
10-
[org.clojure/clojure-contrib "1.2.0"]
9+
:dependencies [[org.clojure/clojure "1.4.0"]
1110
[commons-io "1.4"]
1211
[org.apache.commons/commons-exec "1.1"]
1312
[storm/libthrift7 "0.7.0"]
14-
[clj-time "0.3.0"]
13+
[clj-time "0.4.1"]
1514
[log4j/log4j "1.2.16"]
1615
[com.netflix.curator/curator-framework "1.0.1"]
1716
[backtype/jzmq "2.1.0"]
@@ -20,16 +19,17 @@
2019
[compojure "0.6.4"]
2120
[hiccup "0.3.6"]
2221
[ring/ring-jetty-adapter "0.3.11"]
22+
[org.clojure/tools.logging "0.2.3"]
23+
[org.clojure/math.numeric-tower "0.0.1"]
2324
[org.slf4j/slf4j-log4j12 "1.5.8"]
24-
[storm/carbonite "1.0.0"]
25+
[storm/carbonite "1.0.1"]
2526
[org.yaml/snakeyaml "1.9"]
2627
[org.apache.httpcomponents/httpclient "4.1.1"]
28+
[org.clojure/tools.cli "0.2.1"]
2729
]
2830
:uberjar-exclusions [#"META-INF.*"]
2931
:dev-dependencies [
30-
[swank-clojure "1.2.1"]
31-
[lein-ring "0.4.5"]
32-
[lein-eclipse "1.0.0"]
32+
[swank-clojure "1.4.0-SNAPSHOT" :exclusions [org.clojure/clojure]]
3333
]
3434
:jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
3535
:ring {:handler backtype.storm.ui.core/app}

src/clj/backtype/storm/bootstrap.clj

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
(require (quote [backtype.storm.messaging.loader :as msg-loader]))
2121
(require (quote [backtype.storm.messaging.protocol :as msg]))
2222
(use (quote [backtype.storm config util log clojure timer]))
23-
(use (quote [clojure.contrib.seq :only [find-first]]))
2423
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
2524
[event :as event] [process-simulator :as psim]]))
2625
(require (quote [clojure.set :as set]))

src/clj/backtype/storm/clojure.clj

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
(ns backtype.storm.clojure
2-
(:use [clojure.contrib.def :only [defnk defalias]])
32
(:use [backtype.storm bootstrap util])
43
(:import [backtype.storm StormSubmitter])
54
(:import [backtype.storm.generated StreamInfo])
@@ -8,6 +7,7 @@
87
(:import [backtype.storm.spout SpoutOutputCollector ISpout])
98
(:import [backtype.storm.utils Utils])
109
(:import [backtype.storm.clojure ClojureBolt ClojureSpout])
10+
(:import [java.util List])
1111
(:require [backtype.storm [thrift :as thrift]]))
1212

1313

@@ -138,14 +138,14 @@
138138
(tuple-values [this collector stream]
139139
this))
140140

141-
(defnk emit-bolt! [collector ^TupleValues values
141+
(defnk emit-bolt! [collector values
142142
:stream Utils/DEFAULT_STREAM_ID :anchor []]
143143
(let [^List anchor (collectify anchor)
144144
values (tuple-values values collector stream) ]
145145
(.emit ^OutputCollector (:output-collector collector) stream anchor values)
146146
))
147147

148-
(defnk emit-direct-bolt! [collector task ^TupleValues values
148+
(defnk emit-direct-bolt! [collector task values
149149
:stream Utils/DEFAULT_STREAM_ID :anchor []]
150150
(let [^List anchor (collectify anchor)
151151
values (tuple-values values collector stream) ]
@@ -158,12 +158,12 @@
158158
(defn fail! [collector ^Tuple tuple]
159159
(.fail ^OutputCollector (:output-collector collector) tuple))
160160

161-
(defnk emit-spout! [collector ^TupleValues values
161+
(defnk emit-spout! [collector values
162162
:stream Utils/DEFAULT_STREAM_ID :id nil]
163163
(let [values (tuple-values values collector stream)]
164164
(.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
165165

166-
(defnk emit-direct-spout! [collector task ^TupleValues values
166+
(defnk emit-direct-spout! [collector task values
167167
:stream Utils/DEFAULT_STREAM_ID :id nil]
168168
(let [values (tuple-values values collector stream)]
169169
(.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))

src/clj/backtype/storm/cluster.clj

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException])
44
(:import [backtype.storm.utils Utils])
55
(:use [backtype.storm util log config])
6-
(:use [clojure.contrib.core :only [dissoc-in]])
76
(:require [backtype.storm [zookeeper :as zk]])
87

98
)
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
(ns backtype.storm.command.kill-topology
2-
(:use [clojure.contrib.command-line :only [with-command-line]])
2+
(:use [clojure.tools.cli :only [cli]])
33
(:use [backtype.storm thrift config log])
44
(:import [backtype.storm.generated KillOptions])
55
(:gen-class))
66

7-
87
(defn -main [& args]
9-
(with-command-line args
10-
"Kill a topology"
11-
[[wait w "Override the amount of time to wait after deactivating before killing" nil]
12-
posargs]
13-
(let [name (first posargs)
14-
opts (KillOptions.)]
15-
(if wait (.set_wait_secs opts (Integer/parseInt wait)))
16-
(with-configured-nimbus-connection nimbus
17-
(.killTopologyWithOpts nimbus name opts)
18-
(log-message "Killed topology: " name)
19-
))))
8+
(let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
9+
opts (KillOptions.)]
10+
(if wait (.set_wait_secs opts wait))
11+
(with-configured-nimbus-connection nimbus
12+
(.killTopologyWithOpts nimbus name opts)
13+
(log-message "Killed topology: " name)
14+
)))
+9-14
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
(ns backtype.storm.command.rebalance
2-
(:use [clojure.contrib.command-line :only [with-command-line]])
2+
(:use [clojure.tools.cli :only [cli]])
33
(:use [backtype.storm thrift config log])
44
(:import [backtype.storm.generated RebalanceOptions])
55
(:gen-class))
66

7-
8-
(defn -main [& args]
9-
(with-command-line args
10-
"Rebalance a topology"
11-
[[wait w "Override the amount of time to wait after deactivating before rebalancing" nil]
12-
posargs]
13-
(let [name (first posargs)
14-
opts (RebalanceOptions.)]
15-
(if wait (.set_wait_secs opts (Integer/parseInt wait)))
16-
(with-configured-nimbus-connection nimbus
17-
(.rebalance nimbus name opts)
18-
(log-message "Topology " name " is rebalancing")
19-
))))
7+
(defn -main [& args]
8+
(let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
9+
opts (RebalanceOptions.)]
10+
(if wait (.set_wait_secs opts wait))
11+
(with-configured-nimbus-connection nimbus
12+
(.rebalance nimbus name opts)
13+
(log-message "Topology " name " is rebalancing")
14+
)))

src/clj/backtype/storm/config.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
(:import [backtype.storm Config])
44
(:import [backtype.storm.utils Utils LocalState])
55
(:import [org.apache.commons.io FileUtils])
6-
(:require [clojure.contrib [str-utils2 :as str]])
6+
(:require [clojure [string :as str]])
77
(:use [backtype.storm util])
88
)
99

src/clj/backtype/storm/daemon/common.clj

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
(ns backtype.storm.daemon.common
2-
(:use [clojure.contrib.seq-utils :only [find-first]])
32
(:use [backtype.storm log config util])
43
(:import [backtype.storm.generated StormTopology
54
InvalidTopologyException GlobalStreamId])

src/clj/backtype/storm/daemon/nimbus.clj

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
(:import [java.nio ByteBuffer])
77
(:import [java.nio.channels Channels WritableByteChannel])
88
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails])
9-
(:use [backtype.storm bootstrap])
9+
(:use [backtype.storm bootstrap util])
1010
(:use [backtype.storm.daemon common])
11-
(:use [clojure.contrib.def :only [defnk]])
1211
(:gen-class
1312
:methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))
1413

@@ -433,7 +432,6 @@
433432
available-slots (available-slots nimbus callback topology-details)
434433
storm-conf (read-storm-conf conf storm-id)
435434
all-task-ids (set (.task-ids storm-cluster-state storm-id))
436-
437435
existing-assigned (reverse-map (:task->node+port existing-assignment))
438436
alive-ids (if scratch?
439437
all-task-ids

src/clj/backtype/storm/daemon/supervisor.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
))]
3333
(into {} (for [[port task-ids] port-tasks]
3434
;; need to cast to int b/c it might be a long (due to how yaml parses things)
35-
[(int port) (LocalAssignment. storm-id task-ids)]
35+
[(Integer. port) (LocalAssignment. storm-id task-ids)]
3636
))
3737
))
3838

src/clj/backtype/storm/daemon/task.clj

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
(ns backtype.storm.daemon.task
22
(:use [backtype.storm.daemon common])
33
(:use [backtype.storm bootstrap])
4-
(:use [clojure.contrib.seq :only [positions]])
54
(:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
65
(:import [backtype.storm.hooks ITaskHook])
76
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo

src/clj/backtype/storm/log.clj

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
(ns backtype.storm.log
2-
(:require [clojure.contrib [logging :as log]]))
2+
(:require [clojure.tools [logging :as log]]))
33

44
(defmacro log-message [& args]
55
`(log/info (str ~@args)))
66

77
(defmacro log-error [e & args]
8-
`(log/error (str ~@args) ~e))
8+
`(log/log :error ~e (str ~@args)))
99

1010
(defmacro log-debug [& args]
1111
`(log/debug (str ~@args)))

src/clj/backtype/storm/stats.clj

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44
ClusterSummary TopologyInfo TopologySummary TaskSummary TaskStats TaskSpecificStats
55
SpoutStats BoltStats ErrorInfo SupervisorSummary])
66
(:use [backtype.storm util])
7-
(:use [clojure.contrib.seq-utils :only [find-first]])
8-
(:use [clojure.contrib.math :only [ceil]]))
7+
(:use [clojure.math.numeric-tower :only [ceil]]))
98

109
;;TODO: consider replacing this with some sort of RRD
1110

1211
(defn curr-time-bucket [^Integer time-secs ^Integer bucket-size-secs]
13-
(* bucket-size-secs (unchecked-divide time-secs bucket-size-secs))
12+
(* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs))
1413
)
1514

1615
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

src/clj/backtype/storm/testing.clj

+1-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
(:require [backtype.storm [zookeeper :as zk]])
2626
(:require [backtype.storm.messaging.loader :as msg-loader])
2727
(:require [backtype.storm.daemon.acker :as acker])
28-
(:use [clojure.contrib.def :only [defnk]])
29-
(:use [clojure.contrib.seq :only [find-first]])
3028
(:use [backtype.storm cluster util thrift config log]))
3129

3230
(defn feeder-spout [fields]
@@ -542,6 +540,6 @@
542540
spout-spec (mk-spout-spec* (TestWordSpout.)
543541
{stream fields})
544542
topology (StormTopology. {component spout-spec} {} {})
545-
context (TopologyContext. topology (read-storm-config) {1 component} "test-storm-id" nil nil 1 nil [1])]
543+
context (TopologyContext. topology (read-storm-config) {(int 1) component} "test-storm-id" nil nil (int 1) nil [(int 1)])]
546544
(Tuple. context values 1 stream)
547545
))

src/clj/backtype/storm/thrift.clj

+2-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
(:import [org.apache.thrift7.protocol TBinaryProtocol TProtocol])
1212
(:import [org.apache.thrift7.transport TTransport TFramedTransport TSocket])
1313
(:use [backtype.storm util config])
14-
(:use [clojure.contrib.def :only [defnk]])
1514
)
1615

1716
(defn instantiate-java-object [^JavaObject obj]
@@ -195,9 +194,9 @@
195194
([spout-map bolt-map]
196195
(let [builder (TopologyBuilder.)]
197196
(doseq [[name {spout :obj p :p conf :conf}] spout-map]
198-
(-> builder (.setSpout name spout p) (.addConfigurations conf)))
197+
(-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf)))
199198
(doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
200-
(-> builder (.setBolt name bolt p) (.addConfigurations conf) (add-inputs inputs)))
199+
(-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs)))
201200
(.createTopology builder)
202201
))
203202
([spout-map bolt-map state-spout-map]

src/clj/backtype/storm/timer.clj

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
(:import [java.util PriorityQueue Comparator])
44
(:import [java.util.concurrent Semaphore])
55
(:use [backtype.storm util log])
6-
(:use [clojure.contrib.def :only [defnk]])
76
)
87

98
;; The timer defined in this file is very similar to java.util.Timer, except it integrates with
@@ -67,8 +66,7 @@
6766
delay-secs
6867
(fn this []
6968
(afn)
70-
(schedule timer recur-secs this))
71-
:check-active false ; this avoids a race condition with cancel-timer
69+
(schedule timer recur-secs this :check-active false)) ; this avoids a race condition with cancel-timer
7270
))
7371

7472
(defn cancel-timer [timer]

src/clj/backtype/storm/ui/core.clj

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
(:use [backtype.storm config util])
55
(:use [backtype.storm.ui helpers])
66
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID]]])
7-
(:use [clojure.contrib.def :only [defnk]])
8-
(:use [clojure.contrib.seq-utils :only [find-first]])
97
(:use [ring.adapter.jetty :only [run-jetty]])
108
(:use [clojure.string :only [trim]])
119
(:import [backtype.storm.generated TaskSpecificStats
@@ -18,7 +16,7 @@
1816
[backtype.storm [thrift :as thrift]])
1917
(:gen-class))
2018

21-
(def *STORM-CONF* (read-storm-config))
19+
(def ^:dynamic *STORM-CONF* (read-storm-config))
2220

2321
(defmacro with-nimbus [nimbus-sym & body]
2422
`(thrift/with-nimbus-connection [~nimbus-sym "localhost" (*STORM-CONF* NIMBUS-THRIFT-PORT)]
@@ -691,4 +689,4 @@
691689
(handler/site main-routes))
692690

693691
(defn -main []
694-
(run-jetty app {:port (int (*STORM-CONF* UI-PORT))}))
692+
(run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT))}))

src/clj/backtype/storm/ui/helpers.clj

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
(ns backtype.storm.ui.helpers
22
(:use compojure.core)
33
(:use [hiccup core page-helpers])
4-
(:use [clojure.contrib
5-
[str-utils2 :only [join]]
6-
[def :only [defnk]]])
7-
(:use [backtype.storm.util :only [uuid]])
4+
(:use [clojure [string :only [join]]])
5+
(:use [backtype.storm.util :only [uuid defnk]])
86
(:use [clj-time coerce format])
97
(:require [compojure.route :as route]
108
[compojure.handler :as handler]))
119

1210
(defn split-divide [val divider]
13-
[(int (/ val divider)) (mod val divider)]
11+
[(Integer. (int (/ val divider))) (mod val divider)]
1412
)
1513

1614
(def PRETTY-SEC-DIVIDERS

0 commit comments

Comments
 (0)