Skip to content

Commit 53ce477

Browse files
author
Nathan Marz
committed
simplify inbox cleaning code
1 parent f226695 commit 53ce477

File tree

2 files changed

+19
-26
lines changed

2 files changed

+19
-26
lines changed

src/clj/backtype/storm/daemon/nimbus.clj

+16-23
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,21 @@
617617
(swap! (:task-heartbeats-cache nimbus) dissoc id))
618618
))))
619619

620+
(defn- file-older-than? [now seconds file]
621+
(<= (+ (.lastModified file) (to-millis seconds)) (to-millis now)))
622+
623+
(defn clean-inbox [dir-location seconds]
624+
"Deletes jar files in dir older than seconds."
625+
(let [now (current-time-secs)
626+
pred #(and (.isFile %) (file-older-than? now seconds %))
627+
files (filter pred (file-seq (File. dir-location)))]
628+
(doseq [f files]
629+
(if (.delete f)
630+
(log-message "Cleaning inbox ... deleted: " (.getName f))
631+
;; This should never happen
632+
(log-error "Cleaning inbox ... error deleting: " (.getName f))
633+
))))
634+
620635
(defn cleanup-corrupt-topologies! [nimbus]
621636
(let [storm-cluster-state (:storm-cluster-state nimbus)
622637
code-ids (set (code-ids (:conf nimbus)))
@@ -646,7 +661,7 @@
646661
0
647662
(conf NIMBUS-CLEANUP-FREQ-SECS)
648663
(fn []
649-
(clean-inbox conf (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
664+
(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
650665
))
651666
(reify Nimbus$Iface
652667
(^void submitTopology
@@ -865,34 +880,12 @@
865880
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
866881
))
867882

868-
(defn- file-older-than? [now seconds file]
869-
(<= (+ (.lastModified file) (* seconds 1000)) now))
870-
871-
(defn- is-jar? [f]
872-
(-> f .getName (.endsWith ".jar")))
873-
874-
(defmethod clean-inbox :distributed [conf dir-location seconds]
875-
"Deletes jar files in dir older than seconds."
876-
(let [now (Time/currentTimeMillis)
877-
pred #(and (is-jar? %) (file-older-than? now seconds %))
878-
files (filter pred (file-seq (File. dir-location)))]
879-
(doseq [f files]
880-
(if (.delete f)
881-
(log-message "Cleaning inbox ... deleted: " (.getName f))
882-
;; This should never happen
883-
(log-error "Cleaning inbox ... error deleting: " (.getName f))
884-
))))
885-
886883
;; local implementation
887884

888885
(defmethod setup-jar :local [conf & args]
889886
nil
890887
)
891888

892-
(defmethod clean-inbox :local [conf & args]
893-
nil
894-
)
895-
896889

897890
(defn -main []
898891
(launch-server! (read-storm-config)))

test/clj/backtype/storm/nimbus_test.clj

+3-3
Original file line numberDiff line numberDiff line change
@@ -451,15 +451,15 @@
451451
(doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
452452
(apply mk-file fs))
453453
(assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
454-
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
454+
(nimbus/clean-inbox dir-location 10)
455455
(assert-files-in-dir ["c.jar"])
456456
;; Cleanit again, c.jar should stay
457457
(advance-time-secs! 5)
458-
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
458+
(nimbus/clean-inbox dir-location 10)
459459
(assert-files-in-dir ["c.jar"])
460460
;; Advance time, clean again, c.jar should be deleted.
461461
(advance-time-secs! 5)
462-
(nimbus/clean-inbox {STORM-CLUSTER-MODE "distributed"} dir-location 10)
462+
(nimbus/clean-inbox dir-location 10)
463463
(assert-files-in-dir [])
464464
)))
465465

0 commit comments

Comments
 (0)