Commit 3dc29e0

merge in nathanmarz/scheduler branch
2 parents d84ac47 + 436ae85 commit 3dc29e0

55 files changed: 1616 additions and 1033 deletions

CHANGELOG.md

Lines changed: 9 additions & 2 deletions
@@ -9,11 +9,16 @@
 * Added button to show/hide system stats (e.g., acker component and stream stats) from the Storm UI (thanks xumingming)
 * Stats are tracked on a per-executor basis instead of per-task basis
 * Major optimization for unreliable spouts and unanchored tuples (will use far less CPU)
+* Revamped internals of Storm to use LMAX disruptor for internal queuing. Dramatic reductions in contention and CPU usage.
+* Numerous micro-optimizations all throughout the codebase to reduce CPU usage.
+* Optimized internals of Storm to use much fewer threads - two fewer threads per spout and one fewer thread per acker.
 * Removed error method from task hooks (to be re-added at a later time)
-* Switched in memory queues to use Disruptor (major performance improvement)
 * Validate that subscriptions come from valid components and streams, and if it's a field grouping that the schema is correct (thanks xumingming)
-* MemoryTransactionalSpout now works on a cluster
+* MemoryTransactionalSpout now works in cluster mode
 * Only track errors on a component by component basis to reduce the amount stored in zookeeper (to speed up UI). A side effect of this change is the removal of the task page in the UI.
+* Add TOPOLOGY-TICK-TUPLE-FREQ-SECS config to have Storm automatically send "tick" tuples to a bolt's execute method coming from the __system component and __tick stream at the configured frequency. Meant to be used as a component-specific configuration.
+* Upgrade Kryo to v2.04
+* Tuple is now an interface and is much cleaner. The Clojure DSL helpers have been moved to TupleImpl

 ## 0.7.2 (unreleased but release candidate available)

@@ -42,13 +47,15 @@ NOTE: The change from 0.7.0 in which OutputCollector no longer assumes immutable
 * Throw helpful error message if StormSubmitter used without using storm client script
 * Add Values class as a default serialization
 * Bug fix: give absolute piddir to subprocesses (so that relative paths can be used for storm local dir)
+* Bug fix: Fixed critical bug in transactional topologies where a batch would be considered successful even if the batch didn't finish
 * Bug fix: Fixed critical bug in opaque transactional topologies that would lead to duplicate messages when using pipelining
 * Bug fix: Workers will now die properly if a ShellBolt subprocess dies (thanks tomo)
 * Bug fix: Hide the BasicOutputCollector#getOutputter method, since it shouldn't be a publicly available method
 * Bug fix: Zookeeper in local mode now always gets an unused port. This will eliminate conflicts with other local mode processes or other Zookeeper instances on a local machine. (thanks xumingming)
 * Bug fix: Fixed NPE in CoordinatedBolt it tuples emitted, acked, or failed for a request id that has already timed out. (thanks xumingming)
 * Bug fix: UI no longer errors for topologies with no assigned tasks (thanks xumingming)
 * Bug fix: emitDirect on SpoutOutputCollector now works
+* Bug fix: Fixed NPE when giving null parallelism hint for spout in TransactionalTopologyBuilder (thanks xumingming)

 ## 0.7.1
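
Reviewer note on the tick-tuple changelog entry: the entry says tick tuples arrive in a bolt's execute method from the `__system` component on the `__tick` stream. A bolt would therefore need to tell them apart from ordinary input. The sketch below shows one way that check might look. It does not use Storm's classes: `TupleLike`, `SimpleTuple`, `TickTuples`, and `FlushOnTickSketch` are hypothetical stand-ins, and only the two id strings are taken from the changelog text.

```java
/** Hypothetical stand-in for the two tuple accessors this sketch needs. */
interface TupleLike {
    String getSourceComponent();
    String getSourceStreamId();
}

/** Hypothetical immutable tuple used only to drive the sketch. */
final class SimpleTuple implements TupleLike {
    private final String component;
    private final String stream;

    SimpleTuple(String component, String stream) {
        this.component = component;
        this.stream = stream;
    }

    public String getSourceComponent() { return component; }
    public String getSourceStreamId() { return stream; }
}

final class TickTuples {
    // Ids as named in the changelog entry.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    private TickTuples() {}

    /** True only when both the source component and the stream match the tick ids. */
    static boolean isTick(TupleLike t) {
        return SYSTEM_COMPONENT_ID.equals(t.getSourceComponent())
            && SYSTEM_TICK_STREAM_ID.equals(t.getSourceStreamId());
    }
}

/** Hypothetical bolt-like class: counts data tuples and flushes the count on each tick. */
final class FlushOnTickSketch {
    private int buffered = 0;
    private int lastFlushed = -1; // -1 means no tick has been seen yet

    void execute(TupleLike t) {
        if (TickTuples.isTick(t)) {
            lastFlushed = buffered;
            buffered = 0;
        } else {
            buffered++;
        }
    }

    int lastFlushed() { return lastFlushed; }
}
```

Checking both the component and the stream keeps a user-defined stream that happens to be called `__tick` on another component from being mistaken for a system tick.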

README.markdown

Lines changed: 5 additions & 1 deletion
@@ -1,4 +1,4 @@
-Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, and is a lot of fun to use!
+Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, [is used by many companies](https://github.com/nathanmarz/storm/wiki/Powered-By), and is a lot of fun to use!

 The [Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) on the wiki explains what Storm is and why it was built. The [video](http://www.infoq.com/presentations/Storm) and [slides](http://www.slideshare.net/nathanmarz/storm-distributed-and-faulttolerant-realtime-computation) of Storm's launch presentation are also good introductions to the project.

@@ -51,3 +51,7 @@ You must not remove this notice, or any other, from this software.
 * Nicolas Yzet ([@nicoo](https://github.com/nicoo))
 * Fabian Neumann ([@hellp](https://github.com/hellp))
 * Soren Macbeth ([@sorenmacbeth](https://github.com/sorenmacbeth))
+
+## Acknowledgements
+
+YourKit is kindly supporting open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: [YourKit Java Profiler](http://www.yourkit.com/java/profiler/index.jsp) and [YourKit .NET Profiler](http://www.yourkit.com/.net/profiler/index.jsp).

conf/defaults.yaml

Lines changed: 3 additions & 0 deletions
@@ -70,6 +70,7 @@ zmq.threads: 1
 zmq.linger.millis: 5000

 ### topology.* configs are for specific executing storms
+topology.enable.message.timeouts: true
 topology.debug: false
 topology.optimize: true
 topology.workers: 1
@@ -89,5 +90,7 @@ topology.executor.receive.buffer.size: 8192 #batched
 topology.executor.send.buffer.size: 16384 #individual messages
 topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
 topology.transfer.buffer.size: 32 # batched
+topology.tick.tuple.freq.secs: null
+topology.worker.shared.thread.pool.size: 4

 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

project.clj

Lines changed: 1 addition & 2 deletions
@@ -15,14 +15,13 @@
 [com.netflix.curator/curator-framework "1.0.1"]
 [backtype/jzmq "2.1.0"]
 [com.googlecode.json-simple/json-simple "1.1"]
-[com.googlecode/kryo "1.04"]
 [compojure "0.6.4"]
 [hiccup "0.3.6"]
 [ring/ring-jetty-adapter "0.3.11"]
 [org.clojure/tools.logging "0.2.3"]
 [org.clojure/math.numeric-tower "0.0.1"]
 [org.slf4j/slf4j-log4j12 "1.5.8"]
-[storm/carbonite "1.0.1"]
+[storm/carbonite "1.2.1"]
 [org.yaml/snakeyaml "1.9"]
 [org.apache.httpcomponents/httpclient "4.1.1"]
 [storm/tools.cli "0.2.2"]

src/clj/backtype/storm/bootstrap.clj

Lines changed: 3 additions & 1 deletion
@@ -6,7 +6,9 @@
 (import (quote [backtype.storm.testing FeederSpout TestPlannerBolt TestPlannerSpout
                 AckFailDelegate AckTracker]))
 (import (quote [backtype.storm.utils Utils LocalState Time TimeCacheMap
-                TimeCacheMap$ExpiredCallback BufferFileInputStream
+                TimeCacheMap$ExpiredCallback
+                RotatingMap RotatingMap$ExpiredCallback
+                BufferFileInputStream
                 RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
                 MutableObject]))
 (import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))

Lines changed: 44 additions & 40 deletions
@@ -1,8 +1,9 @@
 (ns backtype.storm.daemon.acker
   (:import [backtype.storm.task OutputCollector TopologyContext IBolt])
   (:import [backtype.storm.tuple Tuple Fields])
-  (:import [backtype.storm.utils TimeCacheMap])
+  (:import [backtype.storm.utils RotatingMap MutableObject])
   (:import [java.util List Map])
+  (:import [backtype.storm Constants])
   (:use [backtype.storm config util log])
   (:gen-class
    :init init
@@ -25,64 +26,67 @@
   )

 (defn mk-acker-bolt []
-  (let [output-collector (atom nil)
-        pending (atom nil)]
+  (let [output-collector (MutableObject.)
+        pending (MutableObject.)]
     (reify IBolt
       (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
-        (reset! output-collector collector)
-        (reset! pending (TimeCacheMap. (.maxTopologyMessageTimeout context)))
+        (.setObject output-collector collector)
+        (.setObject pending (RotatingMap. 2))
         )
       (^void execute [this ^Tuple tuple]
-        (let [id (.getValue tuple 0)
-              ^TimeCacheMap pending @pending
-              curr (.get pending id)
-              curr (condp = (.getSourceStreamId tuple)
-                       ACKER-INIT-STREAM-ID (-> curr
-                                                (update-ack (.getValue tuple 1))
-                                                (assoc :spout-task (.getValue tuple 2)))
-                       ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
-                       ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
-          (.put pending id curr)
-          (when (and curr
-                     (:spout-task curr))
-            (cond (= 0 (:val curr))
-                  (do
-                    (.remove pending id)
-                    (acker-emit-direct @output-collector
-                                       (:spout-task curr)
-                                       ACKER-ACK-STREAM-ID
-                                       [id]
-                                       ))
-                  (:failed curr)
-                  (do
-                    (.remove pending id)
-                    (acker-emit-direct @output-collector
-                                       (:spout-task curr)
-                                       ACKER-FAIL-STREAM-ID
-                                       [id]
-                                       ))
-                  ))
-          (.ack ^OutputCollector @output-collector tuple)
-          ))
+        (let [^RotatingMap pending (.getObject pending)
+              stream-id (.getSourceStreamId tuple)]
+          (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
+            (.rotate pending)
+            (let [id (.getValue tuple 0)
+                  ^OutputCollector output-collector (.getObject output-collector)
+                  curr (.get pending id)
+                  curr (condp = stream-id
+                           ACKER-INIT-STREAM-ID (-> curr
+                                                    (update-ack (.getValue tuple 1))
+                                                    (assoc :spout-task (.getValue tuple 2)))
+                           ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
+                           ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
+              (.put pending id curr)
+              (when (and curr (:spout-task curr))
+                (cond (= 0 (:val curr))
+                      (do
+                        (.remove pending id)
+                        (acker-emit-direct output-collector
+                                           (:spout-task curr)
+                                           ACKER-ACK-STREAM-ID
+                                           [id]
+                                           ))
+                      (:failed curr)
+                      (do
+                        (.remove pending id)
+                        (acker-emit-direct output-collector
+                                           (:spout-task curr)
+                                           ACKER-FAIL-STREAM-ID
+                                           [id]
+                                           ))
+                      ))
+              (.ack output-collector tuple)
+              ))))
       (^void cleanup [this]
-        (.cleanup @pending))
+        )
       )))

 (defn -init []
   [[] (container)])

 (defn -prepare [this conf context collector]
   (let [^IBolt ret (mk-acker-bolt)]
-    (container-set! (.state this) ret)
+    (container-set! (.state ^backtype.storm.daemon.acker this) ret)
     (.prepare ret conf context collector)
     ))

 (defn -execute [this tuple]
-  (let [^IBolt delegate (container-get (.state this))]
+  (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
     (.execute delegate tuple)
     ))

 (defn -cleanup [this]
-  (let [^IBolt delegate (container-get (.state this))]
+  (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
     (.cleanup delegate)
     ))
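
Reviewer note on the acker change: the diff swaps the time-based `TimeCacheMap` for `(RotatingMap. 2)` and calls `.rotate` whenever a tuple arrives on the system tick stream, so expiry is driven by ticks rather than by a separate timer. The sketch below illustrates the general bucket-rotation idea only. `RotatingMapSketch` is a hypothetical class written for this note and is not Storm's `RotatingMap`; its method names and exact semantics are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical sketch of a bucket-rotating map (not Storm's RotatingMap).
 * Buckets are ordered newest first. Each rotate() discards the oldest bucket,
 * so an entry expires after numBuckets rotations without being re-put.
 */
class RotatingMapSketch<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    RotatingMapSketch(int numBuckets) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
    }

    /** Drops the oldest bucket, opens a fresh newest one, and returns the expired entries. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    /** Looks the key up in every bucket, newest first. */
    V get(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    /** Writes to the newest bucket and removes stale copies from older buckets. */
    void put(K key, V value) {
        Iterator<Map<K, V>> it = buckets.iterator();
        it.next().put(key, value);
        while (it.hasNext()) {
            it.next().remove(key);
        }
    }

    /** Removes the key from whichever bucket holds it. */
    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }
}
```

In this sketch, with two buckets and one rotation per tick, an entry that is never touched again is dropped on the second tick after its last `put`, so it lives for between one and two tick intervals. No background thread or lock is needed as long as `rotate` is called from the same thread that calls `put` and `get`, which is the situation when rotation is triggered from inside `execute`.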

src/clj/backtype/storm/daemon/common.clj

Lines changed: 48 additions & 23 deletions
@@ -5,6 +5,7 @@
   (:import [backtype.storm.utils Utils])
   (:import [backtype.storm.task WorkerTopologyContext])
   (:import [backtype.storm Constants])
+  (:import [backtype.storm.spout NoOpSpout])
   (:require [clojure.set :as set])
   (:require [backtype.storm.daemon.acker :as acker])
   (:require [backtype.storm.thrift :as thrift])
@@ -20,6 +21,9 @@

 (def SYSTEM-STREAM-ID "__system")

+(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
+(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
+
 ;; the task id is the virtual port
 ;; node->host is here so that tasks know who to talk to just from assignment
 ;; this avoid situation where node goes down and task doesn't know what to do information-wise
@@ -140,6 +144,12 @@
                   (when-not (empty? diff-fields)
                     (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))

+(defn component-conf [component]
+  (->> component
+       .get_common
+       .get_json_conf
+       from-json))
+
 (defn acker-inputs [^StormTopology topology]
   (let [bolt-ids (.. topology get_bolts keySet)
         spout-ids (.. topology get_spouts keySet)
@@ -154,23 +164,31 @@
                            ))]
     (merge spout-inputs bolt-inputs)))

-(defn add-acker! [num-executors num-tasks ^StormTopology ret]
-  (let [acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
+(defn add-acker! [storm-conf ^StormTopology ret]
+  (let [num-executors (storm-conf TOPOLOGY-ACKER-EXECUTORS)
+        num-tasks (storm-conf TOPOLOGY-ACKER-TASKS)
+        acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                          (new backtype.storm.daemon.acker)
                                          {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                           ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
                                           }
                                          :p num-executors
-                                         :conf {TOPOLOGY-TASKS num-tasks})]
+                                         :conf {TOPOLOGY-TASKS num-tasks
+                                                TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
     (dofor [[_ bolt] (.get_bolts ret)
             :let [common (.get_common bolt)]]
            (do
              (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
              (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
              ))
     (dofor [[_ spout] (.get_spouts ret)
-            :let [common (.get_common spout)]]
+            :let [common (.get_common spout)
+                  spout-conf (merge
+                               (component-conf spout)
+                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
            (do
+             ;; this set up tick tuples to cause timeouts to be triggered
+             (.set_json_conf common (to-json spout-conf))
              (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
              (.put_to_inputs common
                              (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
@@ -189,11 +207,22 @@
   ;; TODO: consider adding a stats stream for stats aggregation
   ))

+(defn add-system-components! [^StormTopology topology]
+  (let [system-spout (thrift/mk-spout-spec*
+                       (NoOpSpout.)
+                       {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+                        }
+                       :p 0
+                       :conf {TOPOLOGY-TASKS 0})]
+    (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)
+    ))
+
 (defn system-topology! [storm-conf ^StormTopology topology]
   (validate-basic! topology)
   (let [ret (.deepCopy topology)]
-    (add-acker! (storm-conf TOPOLOGY-ACKER-EXECUTORS) (storm-conf TOPOLOGY-ACKER-TASKS) ret)
+    (add-acker! storm-conf ret)
     (add-system-streams! ret)
+    (add-system-components! ret)
     (validate-structure! ret)
     ret
     ))
@@ -203,12 +232,6 @@
     (and (or (nil? tasks) (> tasks 0))
          (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))))

-(defn component-conf [component]
-  (->> component
-       .get_common
-       .get_json_conf
-       from-json))
-
 (defn num-start-executors [component]
   (thrift/parallelism-hint (.get_common component)))

@@ -229,18 +252,20 @@
        (map int)))

 (defn worker-context [worker]
-  (WorkerTopologyContext. (:system-topology worker)
-                          (:storm-conf worker)
-                          (:task->component worker)
-                          (:component->sorted-tasks worker)
-                          (:component->stream->fields worker)
-                          (:storm-id worker)
-                          (supervisor-storm-resources-path
-                            (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
-                          (worker-pids-root (:conf worker) (:worker-id worker))
-                          (:port worker)
-                          (:task-ids worker)
-                          ))
+  (WorkerTopologyContext. (:system-topology worker)
+                          (:storm-conf worker)
+                          (:task->component worker)
+                          (:component->sorted-tasks worker)
+                          (:component->stream->fields worker)
+                          (:storm-id worker)
+                          (supervisor-storm-resources-path
+                            (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
+                          (worker-pids-root (:conf worker) (:worker-id worker))
+                          (:port worker)
+                          (:task-ids worker)
+                          (:default-shared-resources worker)
+                          (:user-shared-resources worker)
+                          ))


 (defn to-task->node+port [executor->node+port]
