Commit d99f13d

Nathan Marz committed: merge master

2 parents b7d7962 + a942da6

File tree

13 files changed, +235 -243 lines


CHANGELOG.md

+5-1
@@ -1,5 +1,8 @@
 ## Unreleased
 
+NOTE: The change from 0.7.0 in which OutputCollector no longer assumes immutable inputs has been reverted to support optimized sending of tuples to colocated tasks
+
+* Messages sent to colocated tasks are sent in-memory, skipping serialization (useful in conjunction with localOrShuffle grouping) (thanks xumingming)
 * Upgrade to Clojure 1.4 (thanks sorenmacbeth)
 * Can override the hostname that supervisors report using "storm.local.hostname" config.
 * Make request timeout within DRPC server configurable via "drpc.request.timeout.secs"
@@ -10,6 +13,7 @@
 * Added close method to OpaqueTransactionalSpout coordinator
 * Added "storm dev-zookeeper" command for launching a local zookeeper server. Useful for testing a one node Storm cluster locally. Zookeeper dir configured with "dev.zookeeper.path"
 * Use new style classes for Python multilang adapter (thanks hellp)
+* Added "storm version" command
 * Bug fix: Fixed criticial bug in opaque transactional topologies that would lead to duplicate messages when using pipelining
 * Bug fix: Workers will now die properly if a ShellBolt subprocess dies (thanks tomo)
 * Bug fix: Hide the BasicOutputCollector#getOutputter method, since it shouldn't be a publicly available method.
@@ -69,7 +73,7 @@
 * Logging now always goes to logs/ in the Storm directory, regardless of where you launched the daemon (thanks haitaoyao)
 * Improved Clojure DSL: can emit maps and Tuples implement the appropriate interfaces to integrate with Clojure's seq functions (thanks schleyfox)
 * Added "ui.childopts" config (thanks ddillinger)
-* Bug fix: OutputCollector no longer assumes immutable inputs
+* Bug fix: OutputCollector no longer assumes immutable inputs [NOTE: this was reverted in 0.7.2 because it conflicts with sending tuples to colocated tasks without serialization]
 * Bug fix: DRPC topologies now throw a proper error when no DRPC servers are configured instead of NPE (thanks danharvey)
 * Bug fix: Fix local mode so multiple topologies can be run on one LocalCluster
 * Bug fix: "storm supervisor" now uses supervisor.childopts instead of nimbus.childopts (thanks ddillinger)
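The revert noted in this changelog follows from how the colocated fast path works: in-memory delivery hands the consumer the very object the producer emitted, while serialized delivery implicitly copies it. A minimal sketch of the hazard in Python, with `pickle` standing in for Storm's tuple serialization (names and queues are illustrative, not Storm's API):

```python
import pickle
from queue import Queue

# Illustrative sketch, not Storm code: in-memory delivery to a colocated
# task shares the emitted object; serialized delivery implicitly copies it.
def emit_local(q: Queue, values: list) -> None:
    q.put(values)                # no serialization: the same list object is shared

def emit_remote(q: Queue, values: list) -> None:
    q.put(pickle.dumps(values))  # serialization takes an implicit snapshot

q_local, q_remote = Queue(), Queue()
values = [1, 2, 3]
emit_local(q_local, values)
emit_remote(q_remote, values)
values.append(4)                 # producer mutates the values after emitting

print(q_local.get())                 # [1, 2, 3, 4]: the mutation leaked through
print(pickle.loads(q_remote.get()))  # [1, 2, 3]: the serialized copy is unaffected
```

This is why inputs must again be treated as immutable: on the colocated path there is no serialization boundary to isolate the consumer from later mutation.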

README.markdown

+2
@@ -2,6 +2,8 @@ Storm is a distributed realtime computation system. Similar to how Hadoop provid
 
 The [Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) on the wiki explains what Storm is and why it was built. The [video](http://www.infoq.com/presentations/Storm) and [slides](http://www.slideshare.net/nathanmarz/storm-distributed-and-faulttolerant-realtime-computation) of Storm's launch presentation are also good introductions to the project.
 
+Follow [@stormprocessor](https://twitter.com/stormprocessor) on Twitter for updates on the project.
+
 ## Documentation
 
 Documentation and tutorials can be found on the [Storm wiki](http://github.com/nathanmarz/storm/wiki).

bin/storm

+13-2
@@ -132,7 +132,7 @@ def activate(*args):
         jvmtype="-client",
         extrajars=[CONF_DIR, STORM_DIR + "/bin"])
 
-def list(*args):
+def listtopos(*args):
     """Syntax: [storm list]
 
     List the running topologies and their statuses.
@@ -273,6 +273,17 @@ def dev_zookeeper():
         jvmtype="-server",
         extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"])
 
+def version():
+    """Syntax: [storm version]
+
+    Prints the version number of this Storm release.
+    """
+    releasefile = STORM_DIR + "/RELEASE"
+    if os.path.exists(releasefile):
+        print open(releasefile).readline().strip()
+    else:
+        print "Unknown"
+
 def print_classpath():
     """Syntax: [storm classpath]
 
@@ -305,7 +316,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui
             "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
             "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
             "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
-            "list": list, "dev-zookeeper": dev_zookeeper}
+            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version}
 
 def parse_config(config_list):
     global CONFIG_OPTS
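Two things happen in this diff: `list` is renamed to `listtopos` (the old name shadowed Python's built-in `list`), and a new `version` command reads the first line of the RELEASE file, falling back to "Unknown" when the file is absent. The script itself is Python 2; below is a rough Python 3 sketch of the same read-with-fallback pattern (the function name and argument are ours, not bin/storm's):

```python
import os

def read_release_version(storm_dir):
    """Return the first line of <storm_dir>/RELEASE, or "Unknown" if absent.

    Python 3 sketch of the pattern the diff adds; the real bin/storm is
    Python 2 and prints the result directly instead of returning it.
    """
    releasefile = os.path.join(storm_dir, "RELEASE")
    if os.path.exists(releasefile):
        with open(releasefile) as f:
            return f.readline().strip()
    return "Unknown"
```

Presumably the RELEASE file is written at packaging time, so running from a source checkout degrades gracefully to "Unknown" instead of crashing.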

src/clj/backtype/storm/daemon/task.clj

+60-53
@@ -1,8 +1,9 @@
 (ns backtype.storm.daemon.task
   (:use [backtype.storm.daemon common])
   (:use [backtype.storm bootstrap])
-  (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
+  (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap LinkedBlockingQueue])
   (:import [backtype.storm.hooks ITaskHook])
+  (:import [backtype.storm.tuple Tuple])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo])
   (:require [backtype.storm [tuple :as tuple]]))
@@ -156,8 +157,10 @@
       (.getThisTaskId topology-context)
       stream))))
 
-(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn]
+(defn mk-task [conf storm-conf topology-context user-context storm-id cluster-state storm-active-atom transfer-fn suicide-fn
+               receive-queue]
   (let [task-id (.getThisTaskId topology-context)
+        worker-port (.getThisWorkerPort topology-context)
         component-id (.getThisComponentId topology-context)
         storm-conf (component-conf storm-conf topology-context component-id)
         _ (log-message "Loading task " component-id ":" task-id)
@@ -196,9 +199,7 @@
 
         stream->component->grouper (outbound-components topology-context user-context)
         component->tasks (reverse-map task-info)
-        ;; important it binds to virtual port before function returns
-        puller (msg/bind mq-context storm-id task-id)
-
+
         ;; TODO: consider DRYing things up and moving stats
         task-readable-name (get-readable-name topology-context)

@@ -239,7 +240,7 @@
         _ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"])
         executor-threads (dofor
                           [exec (with-error-reaction report-error-and-die
-                                  (mk-executors task-object storm-conf puller tasks-fn
+                                  (mk-executors task-object storm-conf receive-queue tasks-fn
                                                 transfer-fn
                                                 storm-active-atom topology-context
                                                 user-context task-stats report-error))]
@@ -254,16 +255,16 @@
       [this]
       (log-message "Shutting down task " storm-id ":" task-id)
       (reset! active false)
-      ;; empty messages are skip messages (this unblocks the socket)
-      (msg/send-local-task-empty mq-context storm-id task-id)
+      ;; put an empty message into receive-queue
+      ;; empty messages are skip messages (this unblocks the receive-queue.take thread)
+      (.put receive-queue (byte-array []))
       (doseq [t all-threads]
         (.interrupt t)
         (.join t))
       (doseq [hook (.getHooks user-context)]
         (.cleanup hook))
       (.remove-task-heartbeat! storm-cluster-state storm-id task-id)
       (.disconnect storm-cluster-state)
-      (.close puller)
       (close-component task-object)
       (log-message "Shut down task " storm-id ":" task-id))
     DaemonCommon
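Shutdown changes accordingly: instead of sending an empty message through the messaging layer, it puts an empty byte array directly on the receive queue. The receiver thread is blocked in a queue take, so a sentinel message must wake it before the interrupt/join can complete. A minimal Python sketch of that unblocking pattern (not Storm code; `queue.Queue` stands in for `LinkedBlockingQueue`):

```python
import threading
from queue import Queue

def run_receiver(q, out):
    # Blocks on q.get(), like LinkedBlockingQueue.take; an empty-bytes
    # sentinel makes the loop return so the thread can be joined.
    while True:
        msg = q.get()
        if isinstance(msg, bytes) and len(msg) == 0:
            return               # shutdown sentinel: stop instead of processing
        out.append(msg)

q = Queue()
seen = []
t = threading.Thread(target=run_receiver, args=(q, seen))
t.start()
q.put("tuple-1")
q.put(b"")                       # shutdown: unblocks the blocked take
t.join()
print(seen)  # ['tuple-1']
```

The real task code additionally interrupts and joins each thread after the sentinel; the sentinel only guarantees the blocked take returns.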
@@ -290,7 +291,18 @@
         (stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
         ))
 
-(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defn mk-task-receiver [^LinkedBlockingQueue receive-queue ^KryoTupleDeserializer deserializer tuple-action-fn]
+  (fn []
+    (let [msg (.take receive-queue)
+          is-tuple? (instance? Tuple msg)]
+      (when (or is-tuple? (not (empty? msg))) ; skip empty messages (used during shutdown)
+        (log-debug "Processing message " msg)
+        (let [^Tuple tuple (if is-tuple? msg (.deserialize deserializer msg))]
+          (tuple-action-fn tuple)
+          ))
+      )))
+
+(defmethod mk-executors ISpout [^ISpout spout storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
                                 ^TopologyContext topology-context ^TopologyContext user-context
                                 task-stats report-error-fn]
   (let [wait-fn (fn [] @storm-active-atom)
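The new `mk-task-receiver` is the heart of the change: the receive queue can now hold either a ready-made `Tuple` (delivered in-memory by a colocated task) or a serialized byte array (delivered over the wire), and deserialization happens only in the second case; an empty byte array is skipped as the shutdown sentinel. A Python sketch of the same dispatch (names are translations of the Clojure, `pickle` stands in for Kryo):

```python
import pickle
from queue import Queue

class Tuple:
    """Stand-in for Storm's Tuple: messages are either Tuple objects
    (in-memory delivery from colocated tasks) or serialized bytes (remote)."""
    def __init__(self, values):
        self.values = values

def make_task_receiver(receive_queue, deserialize, tuple_action):
    """Sketch of the mk-task-receiver logic: take one message, skip the
    empty-bytes shutdown sentinel, deserialize only if the message is not
    already a Tuple, then hand the tuple to the action function."""
    def receive_once():
        msg = receive_queue.get()      # blocking take, like LinkedBlockingQueue.take
        is_tuple = isinstance(msg, Tuple)
        if is_tuple or len(msg) > 0:   # empty bytes = skip message (shutdown)
            tup = msg if is_tuple else deserialize(msg)
            tuple_action(tup)
    return receive_once

# Usage: mix a local Tuple, a serialized remote message, and the sentinel.
seen = []
q = Queue()
q.put(Tuple([1]))                      # colocated: no serialization round-trip
q.put(pickle.dumps([2]))               # remote: arrives as bytes
q.put(b"")                             # shutdown sentinel: ignored
receiver = make_task_receiver(q, lambda b: Tuple(pickle.loads(b)),
                              lambda t: seen.append(t.values))
for _ in range(3):
    receiver()
print(seen)  # [[1], [2]]
```

Factoring this into one function lets the spout and bolt executors below share the take/skip/deserialize logic and differ only in their `tuple-action-fn`.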
@@ -349,10 +361,24 @@
               )
             (reportError [this error]
               (report-error-fn error)
-              ))]
+              ))
+        tuple-action-fn (fn [^Tuple tuple]
+                          (let [id (.getValue tuple 0)
+                                [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
+                            (when spout-id
+                              (let [time-delta (time-delta-ms start-time-ms)]
+                                (condp = (.getSourceStreamId tuple)
+                                  ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
+                                                                                        tuple-finished-info time-delta task-stats sampler))
+                                  ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
+                                                                                          tuple-finished-info time-delta task-stats sampler))
+                                  )))
+                            ;; TODO: on failure, emit tuple to failure stream
+                            ))]
     (log-message "Opening spout " component-id ":" task-id)
     (.open spout storm-conf user-context (SpoutOutputCollector. output-collector))
     (log-message "Opened spout " component-id ":" task-id)
+    ;; TODO: should redesign this to only use one thread
     [(fn []
        ;; This design requires that spouts be non-blocking
        (loop []
@@ -377,23 +403,7 @@
              ;; TODO: log that it's getting throttled
              (Time/sleep 100)))
          ))
-     (fn []
-       (let [^bytes ser-msg (msg/recv puller)]
-         ;; skip empty messages (used during shutdown)
-         (when-not (empty? ser-msg)
-           (let [tuple (.deserialize deserializer ser-msg)
-                 id (.getValue tuple 0)
-                 [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
-             (when spout-id
-               (let [time-delta (time-delta-ms start-time-ms)]
-                 (condp = (.getSourceStreamId tuple)
-                   ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
-                                                                         tuple-finished-info time-delta task-stats sampler))
-                   ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
-                                                                           tuple-finished-info time-delta task-stats sampler))
-                   ))))
-           ;; TODO: on failure, emit tuple to failure stream
-           )))
+     (mk-task-receiver receive-queue deserializer tuple-action-fn)
     ]
    ))

@@ -405,7 +415,7 @@
     ;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
     (.put pending key (bit-xor curr id))))
 
-(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defmethod mk-executors IBolt [^IBolt bolt storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
                                ^TopologyContext topology-context ^TopologyContext user-context
                                task-stats report-error-fn]
   (let [deserializer (KryoTupleDeserializer. storm-conf topology-context)
@@ -466,37 +476,34 @@
               )))
             (reportError [this error]
               (report-error-fn error)
-              ))]
+              ))
+        tuple-action-fn (fn [^Tuple tuple]
+                          ;; synchronization needs to be done with a key provided by this bolt, otherwise:
+                          ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
+                          ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
+                          ;; buffer other tuples until fully synchronized, then process all of those tuples
+                          ;; then go into normal loop
+                          ;; spill to disk?
+                          ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
+                          ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
+                          ;; or just timeout the sync messages that are coming in until full sync is hit from that task
+                          ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
+                          ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
+                          ;; TODO: how to handle incremental updates as well as synchronizations at same time
+                          ;; TODO: need to version tuples somehow
+
+                          (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
+                          (.put tuple-start-times tuple (System/currentTimeMillis))
+
+                          (.execute bolt tuple))]
     (log-message "Preparing bolt " component-id ":" task-id)
     (.prepare bolt
               storm-conf
               user-context
               (OutputCollector. output-collector))
     (log-message "Prepared bolt " component-id ":" task-id)
     ;; TODO: can get any SubscribedState objects out of the context now
-    [(fn []
-       ;; synchronization needs to be done with a key provided by this bolt, otherwise:
-       ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
-       ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
-       ;; buffer other tuples until fully synchronized, then process all of those tuples
-       ;; then go into normal loop
-       ;; spill to disk?
-       ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
-       ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
-       ;; or just timeout the sync messages that are coming in until full sync is hit from that task
-       ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
-       (let [^bytes ser (msg/recv puller)]
-         (when-not (empty? ser) ; skip empty messages (used during shutdown)
-           (log-debug "Processing message")
-           (let [tuple (.deserialize deserializer ser)]
-             ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
-             ;; TODO: how to handle incremental updates as well as synchronizations at same time
-             ;; TODO: need to version tuples somehow
-             (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
-             (.put tuple-start-times tuple (System/currentTimeMillis))
-
-             (.execute bolt tuple)
-             ))))]
+    [(mk-task-receiver receive-queue deserializer tuple-action-fn)]
    ))
 
 (defmethod close-component ISpout [spout]
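For the spout, the logic that used to live in the receive loop becomes a `tuple-action-fn`: look up the pending record by the tuple's id and, depending on whether the message arrived on the acker's ack or fail stream, enqueue a zero-argument closure on the event queue so the single spout thread runs the ack/fail callback. A Python sketch of that shape (stream ids, callbacks, and the pending dict are illustrative stand-ins, not Storm's API):

```python
from queue import Queue

# Illustrative stand-ins for the acker stream ids; names are ours.
ACKER_ACK_STREAM, ACKER_FAIL_STREAM = "__ack", "__fail"

def make_tuple_action(pending, event_queue, on_ack, on_fail):
    """Sketch of the spout tuple-action logic: pop pending state for the
    tuple's id, then enqueue a zero-arg closure (ack or fail) for the
    single spout thread, keeping spout callbacks single-threaded."""
    def tuple_action(stream_id, tuple_id):
        info = pending.pop(tuple_id, None)
        if info is None:
            return  # already completed or timed out
        if stream_id == ACKER_ACK_STREAM:
            event_queue.put(lambda: on_ack(info))
        elif stream_id == ACKER_FAIL_STREAM:
            event_queue.put(lambda: on_fail(info))
    return tuple_action

# Usage: an ack for pending tuple 7 is turned into a queued callback.
results = []
pending = {7: "msg-7"}
events = Queue()
action = make_tuple_action(pending, events, results.append,
                           lambda i: results.append(("fail", i)))
action(ACKER_ACK_STREAM, 7)
events.get()()           # the spout thread drains the queue and runs the closure
print(results)           # ['msg-7']
```

Deferring the callback through a queue of closures is what lets the receiver thread stay generic (`mk-task-receiver`) while ack/fail side effects still execute on the spout's own thread.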
