|
1 |
| -(ns birdwatch.twitterclient |
| 1 | +(ns birdwatch.twitter-client |
2 | 2 | (:gen-class)
|
3 | 3 | (:use
|
4 | 4 | [twitter.callbacks]
|
|
16 | 16 | [http.async.client :as ac]
|
17 | 17 | [twitter.oauth :as oauth]
|
18 | 18 | [twitter-streaming-client.core :as client]
|
19 |
| - [clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]]) |
| 19 | + [clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go close!]] |
| 20 | + [com.stuartsierra.component :as component]) |
20 | 21 | (:import
|
21 | 22 | (twitter.callbacks.protocols AsyncStreamingCallback)))
|
22 | 23 |
|
23 |
| -(def creds (oauth/make-oauth-creds (:consumer-key conf) (:consumer-secret conf) |
24 |
| - (:user-access-token conf) (:user-access-token-secret conf))) |
| 24 | +(defn- creds [config] (oauth/make-oauth-creds (:consumer-key config) (:consumer-secret config) |
| 25 | + (:user-access-token config) (:user-access-token-secret config))) |
25 | 26 |
|
26 | 27 | ;; channels
|
27 |
| -(def ^:private chunk-chan (chan 10000)) |
| 28 | +(def ^:private chunk-chan (chan)) |
28 | 29 | (def ^:private msg-chan (chan))
|
29 | 30 |
|
30 | 31 | ;; atoms for keeping track of counts, incomplete chunk and last received timestamp
|
31 | 32 | (def ^:private last-received (atom (t/epoch)))
|
32 | 33 | (def ^:private chunk-buff (atom ""))
|
33 | 34 | (def ^:private counter (atom 0))
|
34 | 35 |
|
35 |
| -(defn- parse [str] |
| 36 | +(defn- parse [jstr] |
36 | 37 | (try
|
37 | 38 | (let [c @counter
|
38 |
| - json (json/read-json str)] |
| 39 | + json (json/read-json jstr)] |
39 | 40 | (when (== (mod c 1000) 0) (log/info "processed" c "since startup, index" (:es-index conf)))
|
40 | 41 | (if (:text json)
|
41 | 42 | (>!! c/tweets-chan json)
|
42 | 43 | (>!! msg-chan json))
|
43 | 44 | (swap! counter inc))
|
44 |
| - (catch Exception ex (log/error ex "JSON parsing")))) |
| 45 | + (catch Exception ex (log/error ex "JSON parsing" jstr)))) |
| 46 | + |
| 47 | +;; loop for logging messages from Streaming API other than tweets |
| 48 | +(go |
| 49 | + (while true |
| 50 | + (let [m (<! msg-chan)] |
| 51 | + (log/info "msg-chan" m)))) |
| 52 | + |
| 53 | +(defn- tweet-chunk-callback [] |
| 54 | + (AsyncStreamingCallback. #(>!! chunk-chan (str %2)) |
| 55 | + (comp println response-return-everything) |
| 56 | + exception-print)) |
45 | 57 |
|
46 | 58 | ;; loop for processing chunks from Streaming API
|
47 | 59 | (go
|
|
61 | 73 | (reset! chunk-buff last-chunk))
|
62 | 74 | (reset! chunk-buff combined)))))
|
63 | 75 |
|
64 |
| -;; loop for logging messages from Streaming API other than tweets |
65 |
| -(go |
66 |
| - (while true |
67 |
| - (let [m (<! msg-chan)] |
68 |
| - (log/info "msg-chan" m)))) |
| 76 | +(defrecord Twitterclient [conf connection] |
| 77 | + ;; Implement the Lifecycle protocol |
| 78 | + component/Lifecycle |
69 | 79 |
|
70 |
| -(def ^:dynamic *custom-streaming-callback* |
71 |
| - (AsyncStreamingCallback. #(>!! chunk-chan (str %2)) |
72 |
| - (comp println response-return-everything) |
73 |
| - exception-print)) |
| 80 | + (start [component] |
| 81 | + (log/info "Starting Twitterclient") |
| 82 | + ;; In the 'start' method, initialize this component |
| 83 | + ;; and start it running. For example, connect to a |
| 84 | + ;; database, create thread pools, or initialize shared |
| 85 | + ;; state. |
| 86 | + (let [chunk-chan (chan 10000) |
| 87 | + conn (statuses-filter :params {:track (:track conf)} |
| 88 | + :oauth-creds (creds conf) |
| 89 | + :callbacks (tweet-chunk-callback))] |
| 90 | + ;; Return an updated version of the component with |
| 91 | + ;; the run-time state assoc'd in. |
| 92 | + (assoc component :connection conn))) |
| 93 | + |
| 94 | + (stop [component] |
| 95 | + (log/info "stop connection to Twitter Streaming API") |
| 96 | + ;; In the 'stop' method, shut down the running |
| 97 | + ;; component and release any external resources it has |
| 98 | + ;; acquired. |
| 99 | + (let [m (meta connection)] |
| 100 | + (when m |
| 101 | + ((:cancel m)) |
| 102 | + (reset! chunk-buff ""))) |
| 103 | + ;; Return the component, optionally modified. Remember that if you |
| 104 | + ;; dissoc one of a record's base fields, you get a plain map. |
| 105 | + (assoc component :connection nil))) |
74 | 106 |
|
75 |
| -;; streaming connection with Twitter stored in an Atom, can be started and stopped using |
76 |
| -;; using the start-twitter-conn! and stop-twitter-conn! functions |
77 |
| -(def ^:private twitter-conn (atom {})) |
| 107 | +(defn new-twitterclient [conf] |
| 108 | + (map->Twitterclient {:conf conf})) |
78 | 109 |
|
79 |
| -(defn- stop-twitter-conn! [] |
80 |
| - "stop connection to Twitter Streaming API" |
81 |
| - (let [m (meta @twitter-conn)] |
82 |
| - (when m |
83 |
| - (log/info "Stopping Twitter client.") |
84 |
| - ((:cancel m)) |
85 |
| - (reset! chunk-buff "")))) |
| 110 | +(def twitter-client (new-twitterclient conf)) |
86 | 111 |
|
87 | 112 | (defn start-twitter-conn! []
|
88 |
| - "start connection to Twitter Streaming API" |
89 |
| - (log/info "Starting Twitter client.") |
90 |
| - (reset! twitter-conn (statuses-filter :params {:track (:track conf)} |
91 |
| - :oauth-creds creds |
92 |
| - :callbacks *custom-streaming-callback* ))) |
| 113 | + "start twitter component" |
| 114 | + (alter-var-root #'twitter-client component/start)) |
93 | 115 |
|
94 | 116 | ;; loop watching the twitter client and restarting it if necessary
|
95 | 117 | (defn watch-twitter-conn! []
|
| 118 | + "monitor twitter component, restart when necessary" |
96 | 119 | (go
|
97 | 120 | (while true
|
98 | 121 | (<! (timeout (* (:tw-check-interval-sec conf) 1000)))
|
|
101 | 124 | (when (> since-last-sec (:tw-check-interval-sec conf))
|
102 | 125 | (do
|
103 | 126 | (log/error since-last-sec "seconds since last tweet received")
|
104 |
| - (stop-twitter-conn!) |
| 127 | + (alter-var-root #'twitter-client component/stop) |
105 | 128 | (<! (timeout (* (:tw-check-interval-sec conf) 1000)))
|
106 |
| - (start-twitter-conn!))))))) |
| 129 | + (alter-var-root #'twitter-client component/start))))))) |
0 commit comments