Skip to content

Commit 5046c24

Browse files
Delete nimbus inbox jars with a simple scheduled delete jars job
This is meant to close issue 34 (Nimbus should clean inbox) Adds two new config variables to control how often the cleaner jobs runs and how old a jar needs to be before it will be deleted.
1 parent a3094ef commit 5046c24

File tree

4 files changed

+82
-0
lines changed

4 files changed

+82
-0
lines changed

conf/defaults.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ nimbus.childopts: "-Xmx1024m"
1818
nimbus.task.timeout.secs: 30
1919
nimbus.supervisor.timeout.secs: 60
2020
nimbus.monitor.freq.secs: 10
21+
nimbus.cleanup.freq.secs: 1800
22+
nimbus.inbox.jar.expiration.secs: 3600
2123
nimbus.task.launch.secs: 120
2224
nimbus.reassign: true
2325
nimbus.file.copy.expiration.secs: 600

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@
194194

195195

196196
(defmulti setup-jar cluster-mode)
197+
(defmulti clean-inbox cluster-mode)
197198

198199
;; status types
199200
;; -- killed (:kill-time-secs)
@@ -640,6 +641,13 @@
640641
(transition! nimbus storm-id :monitor))
641642
(do-cleanup nimbus)
642643
))
644+
;; Schedule Nimbus inbox cleaner
645+
(schedule-recurring (:timer nimbus)
646+
0
647+
(conf NIMBUS-CLEANUP-FREQ-SECS)
648+
(fn []
649+
(clean-inbox conf (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
650+
))
643651
(reify Nimbus$Iface
644652
(^void submitTopology
645653
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
@@ -857,12 +865,34 @@
857865
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
858866
))
859867

868+
(defn- file-older-than? [now seconds file]
869+
(<= (+ (.lastModified file) (* seconds 1000)) now))
870+
871+
(defn- is-jar? [f]
872+
(-> f .getName (.endsWith ".jar")))
873+
874+
(defmethod clean-inbox :distributed [conf dir-location seconds]
875+
"Deletes jar files in dir older than seconds."
876+
(let [now (Time/currentTimeMillis)
877+
pred #(and (is-jar? %) (file-older-than? now seconds %))
878+
files (filter pred (file-seq (File. dir-location)))]
879+
(doseq [f files]
880+
(if (.delete f)
881+
(log-message "Cleaning inbox ... deleted: " (.getName f))
882+
;; This should never happen
883+
(log-error "Cleaning inbox ... error deleting: " (.getName f))
884+
))))
885+
860886
;; local implementation
861887

862888
(defmethod setup-jar :local [conf & args]
863889
nil
864890
)
865891

892+
(defmethod clean-inbox :local [conf & args]
893+
nil
894+
)
895+
866896

867897
(defn -main []
868898
(launch-server! (read-storm-config)))

src/jvm/backtype/storm/Config.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,22 @@ public class Config extends HashMap<String, Object> {
105105
*/
106106
public static String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
107107

108+
/**
109+
* How often nimbus should wake the cleanup thread to clean the inbox.
110+
* @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
111+
*/
112+
public static String NIMBUS_CLEANUP_FREQ_SECS = "nimbus.cleanup.freq.secs";
113+
114+
/**
115+
* The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
116+
*
117+
* Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS.
118+
* Note that the time it takes to delete an inbox jar file is going to be somewhat more than
119+
* NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
120+
* is set to).
121+
* @see NIMBUS_CLEANUP_FREQ_SECS
122+
*/
123+
public static String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
108124

109125
/**
110126
* How long before a supervisor can go without heartbeating before nimbus considers it dead

test/clj/backtype/storm/nimbus_test.clj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,3 +430,37 @@
430430
;; test that nimbus can die and restart without any problems
431431
)
432432

433+
(deftest test-clean-inbox
434+
"Tests that the inbox correctly cleans jar files."
435+
(with-simulated-time
436+
(with-local-tmp [dir-location]
437+
(let [dir (File. dir-location)
438+
mk-file (fn [name seconds-ago]
439+
(let [f (File. (str dir-location "/" name))
440+
t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
441+
(FileUtils/touch f)
442+
(.setLastModified f t)))
443+
assert-files-in-dir (fn [compare-file-names]
444+
(let [file-names (map #(.getName %) (file-seq dir))]
445+
(is (= (sort compare-file-names)
446+
(sort (filter #(.endsWith % ".jar") file-names))
447+
))))]
448+
;; Make three files a.jar, b.jar, c.jar.
449+
;; a and b are older than c and should be deleted first.
450+
(advance-time-secs! 100)
451+
(doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
452+
(apply mk-file fs))
453+
(assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
454+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
455+
(assert-files-in-dir ["c.jar"])
456+
;; Cleanit again, c.jar should stay
457+
(advance-time-secs! 5)
458+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
459+
(assert-files-in-dir ["c.jar"])
460+
;; Advance time, clean again, c.jar should be deleted.
461+
(advance-time-secs! 5)
462+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
463+
(assert-files-in-dir [])
464+
)))
465+
466+
)

0 commit comments

Comments
 (0)