Commit 8dba94a
Author: Nathan Marz
Commit message: merge master and fix conflicts
Parents: 6b5eb55 + 1ae5ce4

24 files changed: +690 −29 lines

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
@@ -1,5 +1,6 @@
-## Unreleased
+## 0.8.2

+* Added backtype.storm.scheduler.IsolationScheduler. This lets you run topologies that are completely isolated at the machine level. Configure Nimbus to isolate certain topologies, and how many machines to give to each of those topologies, with the isolation.scheduler.machines config in Nimbus's storm.yaml. Topologies that run on the cluster but are not listed there share whatever machines remain after machines are allocated to the listed topologies.
 * Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
 * Added report-error! to Clojure DSL
 * Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged). Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS.
@@ -28,11 +29,18 @@
 * Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
 * Added MultiScheme interface (thanks sritchie)
 * Added MockTridentTuple for testing (thanks emblem)
+* Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
 * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
+* Number of DRPC server worker threads now customizable (thanks xiaokang)
+* DRPC server now uses a bounded queue for requests to prevent being overloaded with requests (thanks xiaokang)
+* Add __hash__ method to all generated Python Thrift objects so that Python code can read Nimbus stats which use Thrift objects as dict keys
 * Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
 * Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that they can consume the incoming queue of acks/fails.
 * Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
 * Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
+* Bug fix: fixed NPE when emitting during emit method of Aggregator
+* Bug fix: URLs with periods in them in Storm UI now route correctly
+* Bug fix: Fix occasional cascading worker crashes when a worker dies, caused by not removing connections from the connection cache appropriately

 ## 0.8.1

conf/defaults.yaml

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,8 @@ ui.port: 8080
 ui.childopts: "-Xmx768m"

 drpc.port: 3772
+drpc.worker.threads: 64
+drpc.queue.size: 128
 drpc.invocations.port: 3773
 drpc.request.timeout.secs: 600
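Taken together, these defaults bound the DRPC thrift server at 64 handler threads with at most 128 requests queued behind them; once the queue fills, further submissions are rejected rather than piling up in memory. Either key can be overridden in storm.yaml; the values below are purely illustrative, not recommendations:

```yaml
# storm.yaml overrides (illustrative values, not recommendations)
drpc.worker.threads: 128
drpc.queue.size: 256
```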

project.clj

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 [com.netflix.curator/curator-framework "1.0.1"]
 [backtype/jzmq "2.1.0"]
 [com.googlecode.json-simple/json-simple "1.1"]
-[compojure "0.6.4"]
+[compojure "1.1.3"]
 [hiccup "0.3.6"]
 [ring/ring-jetty-adapter "0.3.11"]
 [org.clojure/tools.logging "0.2.3"]

src/clj/backtype/storm/daemon/drpc.clj

Lines changed: 5 additions & 1 deletion
@@ -6,7 +6,7 @@
 (:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
           DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
           DistributedRPCInvocations$Processor])
-(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
+(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
 (:import [backtype.storm.daemon Shutdownable])
 (:import [java.net InetAddress])
 (:use [backtype.storm bootstrap config log])
@@ -100,6 +100,8 @@
 (defn launch-server!
   ([]
    (let [conf (read-storm-config)
+         worker-threads (int (conf DRPC-WORKER-THREADS))
+         queue-size (int (conf DRPC-QUEUE-SIZE))
          service-handler (service-handler)
          ;; requests and returns need to be on separate thread pools, since calls to
          ;; "execute" don't unblock until other thrift methods are called. So if
@@ -108,6 +110,8 @@
          handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
                                           (THsHaServer$Args.)
                                           (.workerThreads 64)
+                                          (.executorService (ThreadPoolExecutor. worker-threads worker-threads
+                                                              60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
                                           (.protocolFactory (TBinaryProtocol$Factory.))
                                           (.processor (DistributedRPC$Processor. service-handler))
                                           ))
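The hunk above moves the server onto a fixed-size pool backed by a bounded ArrayBlockingQueue. Below is a minimal standalone sketch of that pattern (sizes are illustrative and this is not Storm's actual server wiring): once all threads are busy and the queue is full, the executor rejects further work instead of queueing unboundedly, which is what protects the DRPC server from overload.

```clojure
(import '[java.util.concurrent ThreadPoolExecutor ArrayBlockingQueue
          TimeUnit RejectedExecutionException])

;; Hypothetical sizes for illustration; Storm reads these from
;; drpc.worker.threads and drpc.queue.size.
(let [worker-threads 4
      queue-size 8
      exec (ThreadPoolExecutor. worker-threads worker-threads
                                60 TimeUnit/SECONDS
                                (ArrayBlockingQueue. queue-size))]
  (try
    ;; Submit more tasks than threads + queue slots can hold: the
    ;; overflow is rejected instead of growing memory without bound.
    (dotimes [_ 20]
      (.execute exec (fn [] (Thread/sleep 1000))))
    (catch RejectedExecutionException _
      (println "queue full; request rejected")))
  (.shutdown exec))
```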

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 1 addition & 1 deletion
@@ -566,7 +566,7 @@
         (apply merge-with set/union))

        supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
-       cluster (Cluster. supervisors topology->scheduler-assignment)
+       cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

        ;; call scheduler.schedule to schedule all the topologies
        ;; the new assignments for all the topologies are in the cluster object.

src/clj/backtype/storm/daemon/worker.clj

Lines changed: 1 addition & 1 deletion
@@ -252,7 +252,7 @@
       (.close (get @(:cached-node+port->socket worker) endpoint)))
     (apply swap!
            (:cached-node+port->socket worker)
-           #(HashMap. (dissoc (into {} %1) %&))
+           #(HashMap. (apply dissoc (into {} %1) %&))
            remove-connections)

     (let [missing-tasks (->> needed-tasks
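The added apply is the entire fix: without it, dissoc receives the sequence of endpoints as a single (nonexistent) key and removes nothing, which is how dead connections lingered in the cache and produced the cascading worker crashes noted in the CHANGELOG. A quick REPL illustration with made-up endpoint data:

```clojure
(def conns {"node1:6700" :sock-a, "node2:6701" :sock-b})
(def dead ["node1:6700" "node2:6701"])

;; Buggy form: the collection `dead` is treated as one key, so nothing is removed.
(dissoc conns dead)
;;=> {"node1:6700" :sock-a, "node2:6701" :sock-b}

;; Fixed form: each endpoint is removed individually.
(apply dissoc conns dead)
;;=> {}
```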

src/clj/backtype/storm/scheduler/DefaultScheduler.clj

Lines changed: 8 additions & 4 deletions
@@ -28,14 +28,15 @@
 (->> slots
      (filter
        (fn [[node port]]
-         (if-let [supervisor (.getSupervisorById cluster node)]
-           (.contains (.getAllPorts supervisor) (int port))
-           )))))
+         (if-not (.isBlackListed cluster node)
+           (if-let [supervisor (.getSupervisorById cluster node)]
+             (.contains (.getAllPorts supervisor) (int port))
+             ))))))

 (defn -prepare [this conf]
   )

-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
   (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
     (doseq [^TopologyDetails topology needs-scheduling-topologies
             :let [topology-id (.getId topology)
@@ -54,3 +55,6 @@
                            [])]]
       (.freeSlots cluster bad-slots)
       (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (default-schedule topologies cluster))
src/clj/backtype/storm/scheduler/IsolationScheduler.clj

Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
+(ns backtype.storm.scheduler.IsolationScheduler
+  (:use [backtype.storm util config log])
+  (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
+  (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
+  (:import [backtype.storm.scheduler IScheduler Topologies
+            Cluster TopologyDetails WorkerSlot SchedulerAssignment
+            EvenScheduler ExecutorDetails])
+  (:gen-class
+    :init init
+    :constructors {[] []}
+    :state state
+    :implements [backtype.storm.scheduler.IScheduler]))
+
+(defn -init []
+  [[] (container)])
+
+(defn -prepare [this conf]
+  (container-set! (.state this) conf))
+
+
+(defn- compute-worker-specs "Returns list of sets of executors"
+  [^TopologyDetails details]
+  (->> (.getExecutorToComponent details)
+       reverse-map
+       (map second)
+       (apply interleave-all)
+       (partition-fixed (.getNumWorkers details))
+       (map set)))
+
+(defn- compute-worker-specs "Returns mutable set of sets of executors"
+  [^TopologyDetails details]
+  (->> (.getExecutorToComponent details)
+       reverse-map
+       (map second)
+       (apply concat)
+       (map vector (repeat-seq (range (.getNumWorkers details))))
+       (group-by first)
+       (map-val #(map second %))
+       vals
+       (map set)
+       (HashSet.)
+       ))
+
+(defn isolated-topologies [conf topologies]
+  (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
+    (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
+    ))
+
+;; map from topology id -> set of sets of executors
+(defn topology-worker-specs [iso-topologies]
+  (->> iso-topologies
+       (map (fn [t] {(.getId t) (compute-worker-specs t)}))
+       (apply merge)))
+
+(defn machine-distribution [conf ^TopologyDetails topology]
+  (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
+        machines (get name->machines (.getName topology))
+        workers (.getNumWorkers topology)]
+    (-> (integer-divided workers machines)
+        (dissoc 0)
+        (HashMap.)
+        )))
+
+(defn topology-machine-distribution [conf iso-topologies]
+  (->> iso-topologies
+       (map (fn [t] {(.getId t) (machine-distribution conf t)}))
+       (apply merge)))
+
+(defn host-assignments [^Cluster cluster]
+  (letfn [(to-slot-specs [^SchedulerAssignment ass]
+            (->> ass
+                 .getExecutorToSlot
+                 reverse-map
+                 (map (fn [[slot executors]]
+                        [slot (.getTopologyId ass) (set executors)]))))]
+    (->> cluster
+         .getAssignments
+         vals
+         (mapcat to-slot-specs)
+         (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
+         )))
+
+(defn- decrement-distribution! [^Map distribution value]
+  (let [v (-> distribution (get value) dec)]
+    (if (zero? v)
+      (.remove distribution value)
+      (.put distribution value v))))
+
+;; returns list of list of slots, reverse sorted by number of slots
+(defn- host-assignable-slots [^Cluster cluster]
+  (-<> cluster
+       .getAssignableSlots
+       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
+       (dissoc <> nil)
+       (sort-by #(-> % second count -) <>)
+       shuffle
+       (LinkedList. <>)
+       ))
+
+(defn- host->used-slots [^Cluster cluster]
+  (->> cluster
+       .getUsedSlots
+       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
+       ))
+
+(defn- distribution->sorted-amts [distribution]
+  (->> distribution
+       (mapcat (fn [[val amt]] (repeat amt val)))
+       (sort-by -)
+       ))
+
+(defn- allocated-topologies [topology-worker-specs]
+  (->> topology-worker-specs
+       (filter (fn [[_ worker-specs]] (empty? worker-specs)))
+       (map first)
+       set
+       ))
+
+(defn- leftover-topologies [^Topologies topologies filter-ids-set]
+  (->> topologies
+       .getTopologies
+       (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
+       (map (fn [^TopologyDetails t] {(.getId t) t}))
+       (apply merge)
+       (Topologies.)
+       ))
+
+;; for each isolated topology:
+;;   compute even distribution of executors -> workers on the number of workers specified for the topology
+;;   compute distribution of workers to machines
+;; determine host -> list of [slot, topology id, executors]
+;; iterate through hosts and: a machine is good if:
+;;   1. only running workers from one isolated topology
+;;   2. all workers running on it match one of the distributions of executors for that topology
+;;   3. matches one of the # of workers
+;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
+;; otherwise unassign all other workers for isolated topologies if assigned
+
+(defn remove-elem-from-set! [^Set aset]
+  (let [elem (-> aset .iterator .next)]
+    (.remove aset elem)
+    elem
+    ))
+
+;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
+;; blacklist all machines who had production slots defined
+;; log isolated topologies who weren't able to get enough slots / machines
+;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
+;; set blacklist to what it was initially
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (let [conf (container-get (.state this))
+        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
+        iso-topologies (isolated-topologies conf (.getTopologies topologies))
+        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
+        topology-worker-specs (topology-worker-specs iso-topologies)
+        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
+        host-assignments (host-assignments cluster)]
+    (doseq [[host assignments] host-assignments]
+      (let [top-id (-> assignments first second)
+            distribution (get topology-machine-distribution top-id)
+            ^Set worker-specs (get topology-worker-specs top-id)
+            num-workers (count assignments)
+            ]
+        (if (and (contains? iso-ids-set top-id)
+                 (every? #(= (second %) top-id) assignments)
+                 (contains? distribution num-workers)
+                 (every? #(contains? worker-specs (nth % 2)) assignments))
+          (do (decrement-distribution! distribution num-workers)
+              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
+              (.blacklistHost cluster host))
+          (doseq [[slot top-id _] assignments]
+            (when (contains? iso-ids-set top-id)
+              (.freeSlot cluster slot)
+              ))
+          )))
+
+    (let [host->used-slots (host->used-slots cluster)
+          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
+      ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
+      (doseq [[top-id worker-specs] topology-worker-specs
+              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
+        (doseq [amt amts
+                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
+          (when (and host-slots (>= (count host-slots) amt))
+            (.poll sorted-assignable-hosts)
+            (.freeSlots cluster (get host->used-slots host))
+            (doseq [slot (take amt host-slots)
+                    :let [executors-set (remove-elem-from-set! worker-specs)]]
+              (.assign cluster slot top-id executors-set))
+            (.blacklistHost cluster host))
+          )))
+
+    (let [failed-iso-topologies (->> topology-worker-specs
+                                     (mapcat (fn [[top-id worker-specs]]
+                                               (if-not (empty? worker-specs) [top-id])
+                                               )))]
+      (if (empty? failed-iso-topologies)
+        ;; run default scheduler on non-isolated topologies
+        (-<> topology-worker-specs
+             allocated-topologies
+             (leftover-topologies topologies <>)
+             (DefaultScheduler/default-schedule <> cluster))
+        (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". Will wait for enough resources for isolated topologies before allocating any other resources.")
+        ))
+    (.setBlacklistedHosts cluster orig-blacklist)
+    ))
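To actually engage the new scheduler, Nimbus's storm.yaml needs storm.scheduler pointed at this class (as the Config.java hunk later in this commit describes) plus a machine count per topology name. A minimal sketch; the topology names and counts here are made up:

```yaml
# Nimbus's storm.yaml -- topology names and machine counts are illustrative
storm.scheduler: "backtype.storm.scheduler.IsolationScheduler"
isolation.scheduler.machines:
    "search-topology": 8
    "ingest-topology": 3
```

Topologies absent from the map fall through to DefaultScheduler/default-schedule on whatever machines remain, matching the final block of -schedule above.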

src/clj/backtype/storm/testing.clj

Lines changed: 2 additions & 2 deletions
@@ -96,7 +96,7 @@
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
         nimbus-tmp (local-temp-path)
@@ -114,7 +114,7 @@
         port-counter (mk-counter)
         nimbus (nimbus/service-handler
                  (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
-                 (nimbus/standalone-nimbus))
+                 (if inimbus inimbus (nimbus/standalone-nimbus)))
         context (mk-shared-context daemon-conf)
         cluster-map {:nimbus nimbus
                      :port-counter port-counter
src/clj/backtype/storm/util.clj

Lines changed: 9 additions & 0 deletions
@@ -824,3 +824,12 @@
       (str left "/" right)
       (str left right)))))

+(defmacro -<>
+  ([x] x)
+  ([x form] (if (seq? form)
+              (with-meta
+                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+                  (concat begin [x] end))
+                (meta form))
+              (list form x)))
+  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
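This new macro is a "diamond wand" threading form: each step receives the threaded value at the position of the <> placeholder, a form with no <> gets it appended as the last argument, and a bare symbol is simply applied. A few worked expansions:

```clojure
(-<> 5 (- 10 <>))        ;; expands to (- 10 5)        => 5
(-<> 5 (vector 1 <> 3))  ;; expands to (vector 1 5 3)  => [1 5 3]
(-<> 5 (vector 1 2))     ;; no <>: value goes last     => [1 2 5]
(-<> 5 inc (* 2 <>))     ;; (inc 5), then (* 2 6)      => 12
```

This is what lets host-assignable-slots in IsolationScheduler.clj thread a cluster through group-by, dissoc, and sort-by, each of which wants the value in a different argument position.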

src/jvm/backtype/storm/Config.java

Lines changed: 16 additions & 0 deletions
@@ -226,6 +226,16 @@ public class Config extends HashMap<String, Object> {
      */
     public static String DRPC_PORT = "drpc.port";

+    /**
+     * DRPC thrift server worker threads
+     */
+    public static String DRPC_WORKER_THREADS = "drpc.worker.threads";
+
+    /**
+     * DRPC thrift server queue size
+     */
+    public static String DRPC_QUEUE_SIZE = "drpc.queue.size";
+
     /**
      * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
@@ -594,6 +604,12 @@ public class Config extends HashMap<String, Object> {
      * it is not a production grade zookeeper setup.
      */
     public static String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
+
+    /**
+     * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
+     * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
+     */
+    public static String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";

     public static void setDebug(Map conf, boolean isOn) {
         conf.put(Config.TOPOLOGY_DEBUG, isOn);
