Commit 3fe6e8f

Author: Nathan Marz
Merge branch 'master' into 0.8.2
2 parents: cfdb01b + 2ec670b

13 files changed (+421 −20 lines)

CHANGELOG.md (+3)

@@ -1,5 +1,6 @@
 ## Unreleased

+* Added backtype.storm.scheduler.IsolationScheduler. This lets you run topologies that are completely isolated at the machine level. Configure Nimbus to isolate certain topologies, and how many machines to give to each of them, with the isolation.scheduler.machines config in Nimbus's storm.yaml. Topologies not listed there share whatever machines remain after machines are allocated to the listed topologies.
 * Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
 * Added report-error! to Clojure DSL
 * Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged). Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
@@ -28,12 +29,14 @@
 * Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
 * Added MultiScheme interface (thanks sritchie)
 * Added MockTridentTuple for testing (thanks emblem)
+* Added whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
 * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
 * Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
 * Bug fix: Fix deadlock bug due to a variant of the dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that they can consume the incoming queue of acks/fails.
 * Bug fix: Fix race condition in supervisor that would lead to the supervisor continuously crashing due to not finding the "stormconf.ser" file for an already killed topology
 * Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
 * Bug fix: Fixed NPE when emitting during the emit method of Aggregator
+* Bug fix: URLs with periods in them in Storm UI now route correctly

 ## 0.8.1

project.clj (+1 −1)

@@ -20,7 +20,7 @@
 [com.netflix.curator/curator-framework "1.0.1"]
 [backtype/jzmq "2.1.0"]
 [com.googlecode.json-simple/json-simple "1.1"]
-[compojure "0.6.4"]
+[compojure "1.1.3"]
 [hiccup "0.3.6"]
 [ring/ring-jetty-adapter "0.3.11"]
 [org.clojure/tools.logging "0.2.3"]

src/clj/backtype/storm/daemon/nimbus.clj (+1 −1)

@@ -569,7 +569,7 @@
              (apply merge-with set/union))

        supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
-       cluster (Cluster. supervisors topology->scheduler-assignment)
+       cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

        ;; call scheduler.schedule to schedule all the topologies
        ;; the new assignments for all the topologies are in the cluster object.

src/clj/backtype/storm/scheduler/DefaultScheduler.clj (+8 −4)

@@ -28,14 +28,15 @@
   (->> slots
        (filter
          (fn [[node port]]
-           (if-let [supervisor (.getSupervisorById cluster node)]
-             (.contains (.getAllPorts supervisor) (int port))
-             )))))
+           (if-not (.isBlackListed cluster node)
+             (if-let [supervisor (.getSupervisorById cluster node)]
+               (.contains (.getAllPorts supervisor) (int port))
+               ))))))

 (defn -prepare [this conf]
   )

-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
   (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
     (doseq [^TopologyDetails topology needs-scheduling-topologies
             :let [topology-id (.getId topology)
@@ -54,3 +55,6 @@
                               [])]]
       (.freeSlots cluster bad-slots)
       (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (default-schedule topologies cluster))
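
This refactoring exposes the stock scheduling logic as default-schedule so other schedulers can delegate to it after making their own placement decisions (the new IsolationScheduler below does exactly that). A minimal sketch of a custom scheduler reusing it, following the gen-class pattern used in this commit; the namespace name is hypothetical:

    (ns com.example.my-scheduler
      (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
      (:import [backtype.storm.scheduler IScheduler Topologies Cluster])
      (:gen-class
        :implements [backtype.storm.scheduler.IScheduler]))

    (defn -prepare [this conf]
      ;; no scheduler-level state needed for this sketch
      )

    (defn -schedule [this ^Topologies topologies ^Cluster cluster]
      ;; custom placement decisions would go here; whatever is left
      ;; unscheduled falls through to the stock behavior:
      (DefaultScheduler/default-schedule topologies cluster))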
src/clj/backtype/storm/scheduler/IsolationScheduler.clj (new file, +207)

@@ -0,0 +1,207 @@
(ns backtype.storm.scheduler.IsolationScheduler
  (:use [backtype.storm util config log])
  (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
  (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
  (:import [backtype.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot SchedulerAssignment
            EvenScheduler ExecutorDetails])
  (:gen-class
    :init init
    :constructors {[] []}
    :state state
    :implements [backtype.storm.scheduler.IScheduler]))

(defn -init []
  [[] (container)])

(defn -prepare [this conf]
  (container-set! (.state this) conf))


(defn- compute-worker-specs "Returns list of sets of executors"
  [^TopologyDetails details]
  (->> (.getExecutorToComponent details)
       reverse-map
       (map second)
       (apply interleave-all)
       (partition-fixed (.getNumWorkers details))
       (map set)))

(defn- compute-worker-specs "Returns mutable set of sets of executors"
  [^TopologyDetails details]
  (->> (.getExecutorToComponent details)
       reverse-map
       (map second)
       (apply concat)
       (map vector (repeat-seq (range (.getNumWorkers details))))
       (group-by first)
       (map-val #(map second %))
       vals
       (map set)
       (HashSet.)
       ))

(defn isolated-topologies [conf topologies]
  (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
    (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
    ))

;; map from topology id -> set of sets of executors
(defn topology-worker-specs [iso-topologies]
  (->> iso-topologies
       (map (fn [t] {(.getId t) (compute-worker-specs t)}))
       (apply merge)))

(defn machine-distribution [conf ^TopologyDetails topology]
  (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
        machines (get name->machines (.getName topology))
        workers (.getNumWorkers topology)]
    (-> (integer-divided workers machines)
        (dissoc 0)
        (HashMap.)
        )))

(defn topology-machine-distribution [conf iso-topologies]
  (->> iso-topologies
       (map (fn [t] {(.getId t) (machine-distribution conf t)}))
       (apply merge)))

(defn host-assignments [^Cluster cluster]
  (letfn [(to-slot-specs [^SchedulerAssignment ass]
            (->> ass
                 .getExecutorToSlot
                 reverse-map
                 (map (fn [[slot executors]]
                        [slot (.getTopologyId ass) (set executors)]))))]
    (->> cluster
         .getAssignments
         vals
         (mapcat to-slot-specs)
         (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
         )))

(defn- decrement-distribution! [^Map distribution value]
  (let [v (-> distribution (get value) dec)]
    (if (zero? v)
      (.remove distribution value)
      (.put distribution value v))))

;; returns list of lists of slots, reverse sorted by number of slots
(defn- host-assignable-slots [^Cluster cluster]
  (-<> cluster
       .getAssignableSlots
       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
       (dissoc <> nil)
       (sort-by #(-> % second count -) <>)
       shuffle
       (LinkedList. <>)
       ))

(defn- host->used-slots [^Cluster cluster]
  (->> cluster
       .getUsedSlots
       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
       ))

(defn- distribution->sorted-amts [distribution]
  (->> distribution
       (mapcat (fn [[val amt]] (repeat amt val)))
       (sort-by -)
       ))

(defn- allocated-topologies [topology-worker-specs]
  (->> topology-worker-specs
       (filter (fn [[_ worker-specs]] (empty? worker-specs)))
       (map first)
       set
       ))

(defn- leftover-topologies [^Topologies topologies filter-ids-set]
  (->> topologies
       .getTopologies
       (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
       (map (fn [^TopologyDetails t] {(.getId t) t}))
       (apply merge)
       (Topologies.)
       ))

;; for each isolated topology:
;;   compute even distribution of executors -> workers on the number of workers specified for the topology
;;   compute distribution of workers to machines
;; determine host -> list of [slot, topology id, executors]
;; iterate through hosts; a machine is good if:
;;   1. it is only running workers from one isolated topology
;;   2. all workers running on it match one of the distributions of executors for that topology
;;   3. it matches one of the required # of workers
;; blacklist the good hosts and remove those workers from the list of workers that need to be assigned
;; otherwise, unassign all other workers for isolated topologies if assigned

(defn remove-elem-from-set! [^Set aset]
  (let [elem (-> aset .iterator .next)]
    (.remove aset elem)
    elem
    ))

;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
;; match each spec to a machine (one that has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
;; blacklist all machines that had production slots defined
;; log isolated topologies that weren't able to get enough slots / machines
;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
;; set blacklist to what it was initially
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
  (let [conf (container-get (.state this))
        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
        iso-topologies (isolated-topologies conf (.getTopologies topologies))
        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
        topology-worker-specs (topology-worker-specs iso-topologies)
        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
        host-assignments (host-assignments cluster)]
    (doseq [[host assignments] host-assignments]
      (let [top-id (-> assignments first second)
            distribution (get topology-machine-distribution top-id)
            ^Set worker-specs (get topology-worker-specs top-id)
            num-workers (count assignments)
            ]
        (if (and (contains? iso-ids-set top-id)
                 (every? #(= (second %) top-id) assignments)
                 (contains? distribution num-workers)
                 (every? #(contains? worker-specs (nth % 2)) assignments))
          (do (decrement-distribution! distribution num-workers)
              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
              (.blacklistHost cluster host))
          (doseq [[slot top-id _] assignments]
            (when (contains? iso-ids-set top-id)
              (.freeSlot cluster slot)
              ))
          )))

    (let [host->used-slots (host->used-slots cluster)
          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
      ;; TODO: can improve things further by ordering topologies in terms of which needs the fewest workers
      (doseq [[top-id worker-specs] topology-worker-specs
              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
        (doseq [amt amts
                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
          (when (and host-slots (>= (count host-slots) amt))
            (.poll sorted-assignable-hosts)
            (.freeSlots cluster (get host->used-slots host))
            (doseq [slot (take amt host-slots)
                    :let [executors-set (remove-elem-from-set! worker-specs)]]
              (.assign cluster slot top-id executors-set))
            (.blacklistHost cluster host))
          )))

    (doseq [[top-id worker-specs] topology-worker-specs]
      (if-not (empty? worker-specs)
        (log-warn "Unable to isolate topology " top-id)
        ))


    ;; run default scheduler on iso topologies that didn't have enough slots + non-isolated topologies
    (-<> topology-worker-specs
         allocated-topologies
         (leftover-topologies topologies <>)
         (DefaultScheduler/default-schedule <> cluster))
    (.setBlacklistedHosts cluster orig-blacklist)
    ))
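
Note that compute-worker-specs is defined twice above; the second definition, which returns a mutable HashSet, shadows the first. The per-topology machine math leans on integer-divided from backtype.storm.util. A small hedged sketch of that step, assuming integer-divided splits a total into n near-equal pieces and returns a map of piece size to count; the numbers are made up:

    (use 'backtype.storm.util)

    ;; A topology given 3 dedicated machines and configured for 10 workers:
    (integer-divided 10 3)
    ;; => {3 2, 4 1}   ; two machines should run 3 workers, one should run 4

    ;; machine-distribution wraps this result in a mutable HashMap (dropping any
    ;; zero-worker entry); -schedule then decrements it as it finds hosts whose
    ;; current assignment already matches one of the required worker counts.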

src/clj/backtype/storm/testing.clj (+2 −2)

@@ -96,7 +96,7 @@
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
         daemon-conf (merge (read-storm-config)
@@ -113,7 +113,7 @@
         port-counter (mk-counter)
         nimbus (nimbus/service-handler
                  (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
-                 (nimbus/standalone-nimbus))
+                 (if inimbus inimbus (nimbus/standalone-nimbus)))
         context (mk-shared-context daemon-conf)
         cluster-map {:nimbus nimbus
                      :port-counter port-counter
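
The new :inimbus option lets a test supply its own INimbus implementation instead of the standalone one (handy for exercising custom schedulers). A hedged sketch of how a test might call it; passing standalone-nimbus explicitly just reproduces the default behavior, and kill-local-storm-cluster is the existing teardown helper in this namespace:

    (use 'backtype.storm.testing)
    (require '[backtype.storm.daemon.nimbus :as nimbus])

    ;; Build a local cluster, explicitly supplying the INimbus instance.
    (let [cluster (mk-local-storm-cluster :supervisors 2
                                          :ports-per-supervisor 3
                                          :inimbus (nimbus/standalone-nimbus))]
      (try
        ;; ... submit and observe test topologies against cluster ...
        (finally
          (kill-local-storm-cluster cluster))))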

src/clj/backtype/storm/util.clj (+10)

@@ -813,3 +813,13 @@
   (let [klass (if (string? klass) (Class/forName klass) klass)]
     (.newInstance klass)
     ))
+
+(defmacro -<>
+  ([x] x)
+  ([x form] (if (seq? form)
+              (with-meta
+                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+                  (concat begin [x] end))
+                (meta form))
+              (list form x)))
+  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
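
The new -<> macro is a "diamond wand" threading macro: it threads the value into the position marked by <> in each form, falls back to thread-last when a form contains no <>, and simply applies bare symbols. A quick hedged REPL sketch with made-up values:

    (-<> 4
         (range 1 <>)     ; value goes where <> sits   => (1 2 3)
         (conj <> 10)     ; works in any argument slot => (10 1 2 3)
         (filter odd?)    ; no <> present: thread-last => (1 3)
         reverse)         ; bare symbol is applied     => (3 1)

    ;; IsolationScheduler uses it above to keep pipelines readable where the
    ;; threaded value is not the last argument, e.g. (dissoc <> nil).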

src/jvm/backtype/storm/Config.java (+6)

@@ -590,6 +590,12 @@ public class Config extends HashMap<String, Object> {
      * it is not a production grade zookeeper setup.
      */
     public static String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
+
+    /**
+     * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
+     * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
+     */
+    public static String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";

     public static void setDebug(Map conf, boolean isOn) {
         conf.put(Config.TOPOLOGY_DEBUG, isOn);
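
As the javadoc says, the isolation scheduler is activated on Nimbus via storm.scheduler and driven by this map. A hedged sketch of what the relevant storm.yaml entries on the Nimbus machine might look like; the topology names and machine counts are made up:

    storm.scheduler: "backtype.storm.scheduler.IsolationScheduler"
    isolation.scheduler.machines:
        "purchases-topology": 4
        "analytics-topology": 2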
