Skip to content

Commit f226695

Browse files
committed
Merge pull request nathanmarz#93 from trevorsummerssmith/inbox-clean
Delete nimbus inbox jars with a simple scheduled delete job
2 parents 20e1ef9 + 5046c24 commit f226695

File tree

4 files changed

+82
-0
lines changed

4 files changed

+82
-0
lines changed

conf/defaults.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ nimbus.childopts: "-Xmx1024m"
1818
nimbus.task.timeout.secs: 30
1919
nimbus.supervisor.timeout.secs: 60
2020
nimbus.monitor.freq.secs: 10
21+
nimbus.cleanup.freq.secs: 1800
22+
nimbus.inbox.jar.expiration.secs: 3600
2123
nimbus.task.launch.secs: 120
2224
nimbus.reassign: true
2325
nimbus.file.copy.expiration.secs: 600

src/clj/backtype/storm/daemon/nimbus.clj

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@
194194

195195

196196
(defmulti setup-jar cluster-mode)
197+
(defmulti clean-inbox cluster-mode)
197198

198199
;; status types
199200
;; -- killed (:kill-time-secs)
@@ -640,6 +641,13 @@
640641
(transition! nimbus storm-id :monitor))
641642
(do-cleanup nimbus)
642643
))
644+
;; Schedule Nimbus inbox cleaner
645+
(schedule-recurring (:timer nimbus)
646+
0
647+
(conf NIMBUS-CLEANUP-FREQ-SECS)
648+
(fn []
649+
(clean-inbox conf (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
650+
))
643651
(reify Nimbus$Iface
644652
(^void submitTopology
645653
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
@@ -857,12 +865,34 @@
857865
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
858866
))
859867

868+
(defn- file-older-than? [now seconds file]
869+
(<= (+ (.lastModified file) (* seconds 1000)) now))
870+
871+
(defn- is-jar? [f]
872+
(-> f .getName (.endsWith ".jar")))
873+
874+
(defmethod clean-inbox :distributed [conf dir-location seconds]
875+
"Deletes jar files in dir older than seconds."
876+
(let [now (Time/currentTimeMillis)
877+
pred #(and (is-jar? %) (file-older-than? now seconds %))
878+
files (filter pred (file-seq (File. dir-location)))]
879+
(doseq [f files]
880+
(if (.delete f)
881+
(log-message "Cleaning inbox ... deleted: " (.getName f))
882+
;; This should never happen
883+
(log-error "Cleaning inbox ... error deleting: " (.getName f))
884+
))))
885+
860886
;; local implementation
861887

862888
(defmethod setup-jar :local [conf & args]
863889
nil
864890
)
865891

892+
(defmethod clean-inbox :local [conf & args]
893+
nil
894+
)
895+
866896

867897
(defn -main []
868898
(launch-server! (read-storm-config)))

src/jvm/backtype/storm/Config.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,22 @@ public class Config extends HashMap<String, Object> {
105105
*/
106106
public static String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
107107

108+
/**
109+
* How often nimbus should wake the cleanup thread to clean the inbox.
110+
* @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
111+
*/
112+
public static String NIMBUS_CLEANUP_FREQ_SECS = "nimbus.cleanup.freq.secs";
113+
114+
/**
115+
* The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
116+
*
117+
* Probably keep this value greater than or equal to NIMBUS_CLEANUP_FREQ_SECS.
118+
* Note that the time it takes to delete an inbox jar file is going to be somewhat more than
119+
* NIMBUS_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
120+
* is set to).
121+
* @see NIMBUS_CLEANUP_FREQ_SECS
122+
*/
123+
public static String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
108124

109125
/**
110126
* How long a supervisor can go without heartbeating before nimbus considers it dead

test/clj/backtype/storm/nimbus_test.clj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,3 +430,37 @@
430430
;; test that nimbus can die and restart without any problems
431431
)
432432

433+
(deftest test-clean-inbox
434+
"Tests that the inbox correctly cleans jar files."
435+
(with-simulated-time
436+
(with-local-tmp [dir-location]
437+
(let [dir (File. dir-location)
438+
mk-file (fn [name seconds-ago]
439+
(let [f (File. (str dir-location "/" name))
440+
t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
441+
(FileUtils/touch f)
442+
(.setLastModified f t)))
443+
assert-files-in-dir (fn [compare-file-names]
444+
(let [file-names (map #(.getName %) (file-seq dir))]
445+
(is (= (sort compare-file-names)
446+
(sort (filter #(.endsWith % ".jar") file-names))
447+
))))]
448+
;; Make three files a.jar, b.jar, c.jar.
449+
;; a and b are older than c and should be deleted first.
450+
(advance-time-secs! 100)
451+
(doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
452+
(apply mk-file fs))
453+
(assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
454+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
455+
(assert-files-in-dir ["c.jar"])
456+
;; Clean it again; c.jar should stay
457+
(advance-time-secs! 5)
458+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
459+
(assert-files-in-dir ["c.jar"])
460+
;; Advance time, clean again, c.jar should be deleted.
461+
(advance-time-secs! 5)
462+
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
463+
(assert-files-in-dir [])
464+
)))
465+
466+
)

0 commit comments

Comments
 (0)