Skip to content

Commit 2a94674

Browse files
committed
transducers for processing stream and logging processed count
1 parent 55efedc commit 2a94674

File tree

2 files changed

+51
-29
lines changed

2 files changed

+51
-29
lines changed

Clojure-Websockets/src/clj/birdwatch/twitter_client.clj

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,16 @@
1010
[twitter.api.streaming :as tas]
1111
[twitter.oauth :as oauth]
1212
[twitter.callbacks.handlers :as tch]
13-
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! take! alts! timeout go go-loop pipe]]
13+
[clojure.core.async :as async :refer [<! <!! >! >!! close! chan put! take! alts! timeout go go-loop pipe]]
1414
[com.stuartsierra.component :as component])
1515
(:import (twitter.callbacks.protocols AsyncStreamingCallback)))
1616

1717
(defn- creds [config] (oauth/make-oauth-creds (:consumer-key config) (:consumer-secret config)
1818
(:user-access-token config) (:user-access-token-secret config)))
1919

20-
(def cat (fn [step] (fn [r x] (reduce step r x))))
21-
22-
(defn- streaming-buffer []
20+
(defn flattening []
2321
(fn [step]
24-
(let [buff (volatile! "")]
25-
(fn [r x]
26-
(let [json-lines (str/split-lines (str/replace (str @buff x) #"\}\{" "}\r\n{"))
27-
to-process (butlast json-lines)]
28-
(vreset! buff (last json-lines))
29-
(if to-process (step r to-process) r))))))
22+
(fn [r x] (reduce step r x))))
3023

3124
(defn mapping [f]
3225
(fn [step]
@@ -36,18 +29,41 @@
3629
(fn [step]
3730
(fn [r x] (if (pred x) (step r x) r))))
3831

39-
(defn tweet? [json]
40-
(let [text (:text json)]
41-
(when-not text (log/error "error-msg" json))
32+
(defn- log-count [last-received]
33+
(fn [step]
34+
(let [cnt (volatile! 0)]
35+
(fn [r x]
36+
(when (== (mod @cnt 1000) 0) (log/info "processed" @cnt "since startup"))
37+
(vswap! cnt inc)
38+
(reset! last-received (t/now))
39+
(step r x)))))
40+
41+
(defn- streaming-buffer []
42+
(fn [step]
43+
(let [buff (volatile! "")]
44+
(fn [r x]
45+
(let [json-lines (str/split-lines (str/replace (str @buff x) #"\}\{" "}\r\n{"))
46+
to-process (butlast json-lines)]
47+
(vreset! buff (last json-lines))
48+
(if to-process (step r to-process) r))))))
49+
50+
(defn- tweet? [data]
51+
(let [text (:text data)]
52+
(when-not text (log/error "error-msg" data))
4253
text))
4354

4455
(defn- parse-json [jstr]
4556
"parse JSON string"
46-
(try (json/read-json jstr)
47-
(catch Exception ex (log/error ex "JSON parsing" jstr) {:ex ex})))
57+
(try (json/read-str jstr :key-fn clojure.core/keyword)
58+
(catch Exception e {:exception e :jstr jstr})))
4859

49-
(defn- process-chunk []
50-
(comp (streaming-buffer) cat (mapping parse-json) (filtering tweet?)))
60+
(defn- process-chunk [last-received]
61+
(comp
62+
(streaming-buffer)
63+
(flattening)
64+
(mapping parse-json)
65+
(filtering tweet?)
66+
(log-count last-received)))
5167

5268
(defn- tweet-chunk-callback [chunk-chan]
5369
(tas/AsyncStreamingCallback. #(>!! chunk-chan (str %2))
@@ -61,35 +77,40 @@
6177
:oauth-creds (creds conf)
6278
:callbacks (tweet-chunk-callback chunk-chan))))
6379

64-
(defn- stop-twitter-conn! [conn]
80+
(defn stop-twitter-conn! [conn]
6581
(let [m (meta @conn)]
6682
(when m (log/info "Stopping Twitter client.")
6783
((:cancel m)))))
6884

69-
(defn run-watch-loop [conf conn chunk-chan last-received watch-active]
85+
(defn- run-watch-loop [conf conn chunk-chan last-received watch-active]
7086
"run loop watching the twitter client and restarting it if necessary"
7187
(reset! watch-active true)
72-
(go-loop [] (<! (timeout 10000)) ;; check connection every 10 seconds
73-
(let [since-last-sec (t/in-seconds (t/interval @last-received (t/now)))]
74-
(when @watch-active
88+
(go-loop [] (<! (timeout (* (:tw-check-interval-sec conf) 1000)))
89+
(let [since-last-sec (t/in-seconds (t/interval @last-received (t/now)))
90+
active @watch-active]
91+
(when active
7592
(when (> since-last-sec (:tw-check-interval-sec conf))
7693
(log/error since-last-sec "seconds since last tweet received")
7794
(stop-twitter-conn! conn)
78-
(<! (timeout (* (:tw-check-interval-sec conf) 1000)))
95+
(<! (timeout (* (:tw-restart-wait conf) 1000)))
7996
(start-twitter-conn! conf conn chunk-chan))
8097
(recur)))))
8198

82-
(defrecord Twitterclient [conf channels conn watch-active]
99+
(defrecord Twitterclient [conf channels conn chunk-chan watch-active]
83100
component/Lifecycle
84101
(start [component] (log/info "Starting Twitterclient Component")
85-
(let [chunk-chan (chan 1 (process-chunk))
86-
counter (atom 0)
102+
(let [last-received (atom (t/epoch))
103+
chunk-chan (chan 1 (process-chunk last-received))
87104
conn (atom {})
88-
last-received (atom (t/epoch))
89105
watch-active (atom false)]
106+
107+
(def connection conn)
108+
(def config conf)
109+
(def channel chunk-chan)
110+
90111
(start-twitter-conn! conf conn chunk-chan)
91112
(pipe chunk-chan (:tweets channels) false)
92-
; (run-watch-loop conf conn chunk-chan last-received watch-active)
113+
(run-watch-loop conf conn chunk-chan last-received watch-active)
93114
(assoc component :conn conn :chunk-chan chunk-chan :watch-active watch-active)))
94115
(stop [component] (log/info "Stopping Twitterclient Component")
95116
(reset! watch-active false)

Clojure-Websockets/twitterconf-tpl.edn

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
:es-cluster-name "<YOUR CLUSTER NAME HERE>"
1414
:es-index "birdwatch"
1515
:track "clojure,love"
16-
:tw-check-interval-sec 120
16+
:tw-check-interval-sec 10
17+
:tw-restart-wait 60
1718
:port 8888
1819
:pidfile-name "birdwatch.pid"}

0 commit comments

Comments
 (0)