|
617 | 617 | (swap! (:task-heartbeats-cache nimbus) dissoc id))
|
618 | 618 | ))))
|
619 | 619 |
|
| 620 | +(defn- file-older-than? [now seconds file] |
| 621 | + (<= (+ (.lastModified file) (to-millis seconds)) (to-millis now))) |
| 622 | + |
| 623 | +(defn clean-inbox [dir-location seconds] |
| 624 | + "Deletes jar files in dir older than seconds." |
| 625 | + (let [now (current-time-secs) |
| 626 | + pred #(and (.isFile %) (file-older-than? now seconds %)) |
| 627 | + files (filter pred (file-seq (File. dir-location)))] |
| 628 | + (doseq [f files] |
| 629 | + (if (.delete f) |
| 630 | + (log-message "Cleaning inbox ... deleted: " (.getName f)) |
| 631 | + ;; This should never happen |
| 632 | + (log-error "Cleaning inbox ... error deleting: " (.getName f)) |
| 633 | + )))) |
| 634 | + |
620 | 635 | (defn cleanup-corrupt-topologies! [nimbus]
|
621 | 636 | (let [storm-cluster-state (:storm-cluster-state nimbus)
|
622 | 637 | code-ids (set (code-ids (:conf nimbus)))
|
|
646 | 661 | 0
|
647 | 662 | (conf NIMBUS-CLEANUP-FREQ-SECS)
|
648 | 663 | (fn []
|
649 |
| - (clean-inbox conf (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)) |
| 664 | + (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)) |
650 | 665 | ))
|
651 | 666 | (reify Nimbus$Iface
|
652 | 667 | (^void submitTopology
|
|
865 | 880 | (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
|
866 | 881 | ))
|
867 | 882 |
|
868 |
| -(defn- file-older-than? [now seconds file] |
869 |
| - (<= (+ (.lastModified file) (* seconds 1000)) now)) |
870 |
| - |
871 |
| -(defn- is-jar? [f] |
872 |
| - (-> f .getName (.endsWith ".jar"))) |
873 |
| - |
874 |
| -(defmethod clean-inbox :distributed [conf dir-location seconds] |
875 |
| - "Deletes jar files in dir older than seconds." |
876 |
| - (let [now (Time/currentTimeMillis) |
877 |
| - pred #(and (is-jar? %) (file-older-than? now seconds %)) |
878 |
| - files (filter pred (file-seq (File. dir-location)))] |
879 |
| - (doseq [f files] |
880 |
| - (if (.delete f) |
881 |
| - (log-message "Cleaning inbox ... deleted: " (.getName f)) |
882 |
| - ;; This should never happen |
883 |
| - (log-error "Cleaning inbox ... error deleting: " (.getName f)) |
884 |
| - )))) |
885 |
| - |
886 | 883 | ;; local implementation
|
887 | 884 |
|
888 | 885 | (defmethod setup-jar :local [conf & args]
|
889 | 886 | nil
|
890 | 887 | )
|
891 | 888 |
|
892 |
| -(defmethod clean-inbox :local [conf & args] |
893 |
| - nil |
894 |
| - ) |
895 |
| - |
896 | 889 |
|
897 | 890 | (defn -main []
|
898 | 891 | (launch-server! (read-storm-config)))
|
0 commit comments