|
10 | 10 | [twitter.api.streaming :as tas]
|
11 | 11 | [twitter.oauth :as oauth]
|
12 | 12 | [twitter.callbacks.handlers :as tch]
|
13 |
| - [clojure.core.async :as async :refer [<! <!! >! >!! chan put! take! alts! timeout go go-loop pipe]] |
| 13 | + [clojure.core.async :as async :refer [<! <!! >! >!! close! chan put! take! alts! timeout go go-loop pipe]] |
14 | 14 | [com.stuartsierra.component :as component])
|
15 | 15 | (:import (twitter.callbacks.protocols AsyncStreamingCallback)))
|
16 | 16 |
|
17 | 17 | (defn- creds [config] (oauth/make-oauth-creds (:consumer-key config) (:consumer-secret config)
|
18 | 18 | (:user-access-token config) (:user-access-token-secret config)))
|
19 | 19 |
|
20 |
| -(def cat (fn [step] (fn [r x] (reduce step r x)))) |
21 |
| - |
22 |
| -(defn- streaming-buffer [] |
| 20 | +(defn flattening [] |
23 | 21 | (fn [step]
|
24 |
| - (let [buff (volatile! "")] |
25 |
| - (fn [r x] |
26 |
| - (let [json-lines (str/split-lines (str/replace (str @buff x) #"\}\{" "}\r\n{")) |
27 |
| - to-process (butlast json-lines)] |
28 |
| - (vreset! buff (last json-lines)) |
29 |
| - (if to-process (step r to-process) r)))))) |
| 22 | + (fn [r x] (reduce step r x)))) |
30 | 23 |
|
31 | 24 | (defn mapping [f]
|
32 | 25 | (fn [step]
|
|
36 | 29 | (fn [step]
|
37 | 30 | (fn [r x] (if (pred x) (step r x) r))))
|
38 | 31 |
|
39 |
| -(defn tweet? [json] |
40 |
| - (let [text (:text json)] |
41 |
| - (when-not text (log/error "error-msg" json)) |
| 32 | +(defn- log-count [last-received] |
| 33 | + (fn [step] |
| 34 | + (let [cnt (volatile! 0)] |
| 35 | + (fn [r x] |
| 36 | + (when (== (mod @cnt 1000) 0) (log/info "processed" @cnt "since startup")) |
| 37 | + (vswap! cnt inc) |
| 38 | + (reset! last-received (t/now)) |
| 39 | + (step r x))))) |
| 40 | + |
| 41 | +(defn- streaming-buffer [] |
| 42 | + (fn [step] |
| 43 | + (let [buff (volatile! "")] |
| 44 | + (fn [r x] |
| 45 | + (let [json-lines (str/split-lines (str/replace (str @buff x) #"\}\{" "}\r\n{")) |
| 46 | + to-process (butlast json-lines)] |
| 47 | + (vreset! buff (last json-lines)) |
| 48 | + (if to-process (step r to-process) r)))))) |
| 49 | + |
| 50 | +(defn- tweet? [data] |
| 51 | + (let [text (:text data)] |
| 52 | + (when-not text (log/error "error-msg" data)) |
42 | 53 | text))
|
43 | 54 |
|
44 | 55 | (defn- parse-json [jstr]
|
45 | 56 | "parse JSON string"
|
46 |
| - (try (json/read-json jstr) |
47 |
| - (catch Exception ex (log/error ex "JSON parsing" jstr) {:ex ex}))) |
| 57 | + (try (json/read-str jstr :key-fn clojure.core/keyword) |
| 58 | + (catch Exception e {:exception e :jstr jstr}))) |
48 | 59 |
|
49 |
| -(defn- process-chunk [] |
50 |
| - (comp (streaming-buffer) cat (mapping parse-json) (filtering tweet?))) |
| 60 | +(defn- process-chunk [last-received] |
| 61 | + (comp |
| 62 | + (streaming-buffer) |
| 63 | + (flattening) |
| 64 | + (mapping parse-json) |
| 65 | + (filtering tweet?) |
| 66 | + (log-count last-received))) |
51 | 67 |
|
52 | 68 | (defn- tweet-chunk-callback [chunk-chan]
|
53 | 69 | (tas/AsyncStreamingCallback. #(>!! chunk-chan (str %2))
|
|
61 | 77 | :oauth-creds (creds conf)
|
62 | 78 | :callbacks (tweet-chunk-callback chunk-chan))))
|
63 | 79 |
|
64 |
| -(defn- stop-twitter-conn! [conn] |
| 80 | +(defn stop-twitter-conn! [conn] |
65 | 81 | (let [m (meta @conn)]
|
66 | 82 | (when m (log/info "Stopping Twitter client.")
|
67 | 83 | ((:cancel m)))))
|
68 | 84 |
|
69 |
| -(defn run-watch-loop [conf conn chunk-chan last-received watch-active] |
| 85 | +(defn- run-watch-loop [conf conn chunk-chan last-received watch-active] |
70 | 86 | "run loop watching the twitter client and restarting it if necessary"
|
71 | 87 | (reset! watch-active true)
|
72 |
| - (go-loop [] (<! (timeout 10000)) ;; check connection every 10 seconds |
73 |
| - (let [since-last-sec (t/in-seconds (t/interval @last-received (t/now)))] |
74 |
| - (when @watch-active |
| 88 | + (go-loop [] (<! (timeout (* (:tw-check-interval-sec conf) 1000))) |
| 89 | + (let [since-last-sec (t/in-seconds (t/interval @last-received (t/now))) |
| 90 | + active @watch-active] |
| 91 | + (when active |
75 | 92 | (when (> since-last-sec (:tw-check-interval-sec conf))
|
76 | 93 | (log/error since-last-sec "seconds since last tweet received")
|
77 | 94 | (stop-twitter-conn! conn)
|
78 |
| - (<! (timeout (* (:tw-check-interval-sec conf) 1000))) |
| 95 | + (<! (timeout (* (:tw-restart-wait conf) 1000))) |
79 | 96 | (start-twitter-conn! conf conn chunk-chan))
|
80 | 97 | (recur)))))
|
81 | 98 |
|
82 |
| -(defrecord Twitterclient [conf channels conn watch-active] |
| 99 | +(defrecord Twitterclient [conf channels conn chunk-chan watch-active] |
83 | 100 | component/Lifecycle
|
84 | 101 | (start [component] (log/info "Starting Twitterclient Component")
|
85 |
| - (let [chunk-chan (chan 1 (process-chunk)) |
86 |
| - counter (atom 0) |
| 102 | + (let [last-received (atom (t/epoch)) |
| 103 | + chunk-chan (chan 1 (process-chunk last-received)) |
87 | 104 | conn (atom {})
|
88 |
| - last-received (atom (t/epoch)) |
89 | 105 | watch-active (atom false)]
|
| 106 | + |
| 107 | + (def connection conn) |
| 108 | + (def config conf) |
| 109 | + (def channel chunk-chan) |
| 110 | + |
90 | 111 | (start-twitter-conn! conf conn chunk-chan)
|
91 | 112 | (pipe chunk-chan (:tweets channels) false)
|
92 |
| - ; (run-watch-loop conf conn chunk-chan last-received watch-active) |
| 113 | + (run-watch-loop conf conn chunk-chan last-received watch-active) |
93 | 114 | (assoc component :conn conn :chunk-chan chunk-chan :watch-active watch-active)))
|
94 | 115 | (stop [component] (log/info "Stopping Twitterclient Component")
|
95 | 116 | (reset! watch-active false)
|
|
0 commit comments