|
1 |
| -(ns birdwatch.core |
| 1 | +(ns birdwatch.main |
| 2 | + (:gen-class) |
2 | 3 | (:use
|
3 | 4 | [twitter.callbacks]
|
4 | 5 | [twitter.callbacks.handlers]
|
5 | 6 | [twitter.api.streaming]
|
6 | 7 | [clojure.pprint])
|
7 | 8 | (:require
|
8 | 9 | [clojure.edn :as edn]
|
| 10 | + [clojure.string :as str] |
9 | 11 | [clojure.data.json :as json]
|
10 | 12 | [clojure.core.match :as match :refer (match)]
|
11 | 13 |
|
12 | 14 | [http.async.client :as ac]
|
13 | 15 | [twitter.oauth :as oauth]
|
14 | 16 | [twitter-streaming-client.core :as client]
|
15 | 17 |
|
| 18 | + [clj-time.core :as t] |
| 19 | + |
16 | 20 | [org.httpkit.server :as http-kit-server]
|
17 | 21 | [ring.middleware.defaults]
|
18 | 22 | [ring.util.response :refer [resource-response response content-type]]
|
|
123 | 127 |
|
124 | 128 | ;; channels
|
125 | 129 | (def chunk-chan (chan 10000))
|
126 |
| -(def buffer-chan (chan 1)) |
127 |
| -(>!! buffer-chan "") |
128 | 130 | (def tweets-chan (chan))
|
129 | 131 | (def msg-chan (chan))
|
130 | 132 |
|
131 |
| -(def counter-chan (chan 1)) |
132 |
| -(>!! counter-chan 0) |
| 133 | +(def counter (atom 0)) |
| 134 | + |
| 135 | +;; atoms for keeping track of incomplete chunk and last received timestamp |
| 136 | +(def last-received (atom (t/epoch))) |
| 137 | +(def chunk-buff (atom "")) |
133 | 138 |
|
134 | 139 | (defn parse [str]
|
135 | 140 | (try
|
136 |
| - (let [c (inc (<!! counter-chan)) |
| 141 | + (let [c @counter |
137 | 142 | json (json/read-json str)]
|
138 |
| - (when (== (mod c 1000) 0) (log/info "processed" c "since startup, index " (:es-index twitter-conf))) |
| 143 | + (when (== (mod c 1000) 0) (log/info "processed" c "since startup, index" (:es-index twitter-conf))) |
139 | 144 | (if (:text json)
|
140 | 145 | (>!! tweets-chan json)
|
141 | 146 | (>!! msg-chan json))
|
142 |
| - (>!! counter-chan c)) |
| 147 | + (swap! counter inc)) |
143 | 148 | (catch Exception ex (log/error ex "JSON parsing"))))
|
144 | 149 |
|
145 |
| -(go (while true |
146 |
| - (let [chunk (<! chunk-chan) |
147 |
| - buff (<! buffer-chan) |
148 |
| - combined (str buff chunk)] |
149 |
| - (if (and (.endsWith combined "\r\n") (.startsWith combined "{")) |
150 |
| - (do |
151 |
| - (parse combined) |
152 |
| - (>! buffer-chan "")) |
153 |
| - (>! buffer-chan combined))))) |
| 150 | +;; loop for processing chunks from Streaming API |
| 151 | +(go |
| 152 | + (while true |
| 153 | + (let [ch (<! chunk-chan) |
| 154 | + buff @chunk-buff |
| 155 | + combined (str buff ch) |
| 156 | + tweet-strings (str/split-lines combined) |
| 157 | + to-process (butlast tweet-strings) |
| 158 | + last-chunk (last tweet-strings)] |
| 159 | + (reset! last-received (t/now)) |
| 160 | + (if (> (count to-process) 0) |
| 161 | + (do |
| 162 | + (doseq [ts to-process] |
| 163 | + (when (not (str/blank? ts)) |
| 164 | + (parse ts))) |
| 165 | + (reset! chunk-buff last-chunk)) |
| 166 | + (reset! chunk-buff combined))))) |
154 | 167 |
|
155 | 168 | (def conn (esr/connect (:es-address twitter-conf)))
|
156 | 169 |
|
| 170 | +;; loop processing successfully parsed tweets, currently just fanning out to all connected clients |
157 | 171 | (go
|
158 | 172 | (while true
|
159 | 173 | (let [t (<! tweets-chan)]
|
|
165 | 179 | (esd/put conn (:es-index twitter-conf) "tweet" (:id_str t) t)
|
166 | 180 | (catch Exception ex (log/error ex "esd/put error"))))))
|
167 | 181 |
|
| 182 | +;; loop for logging messages from Streaming API other than tweets |
168 | 183 | (go
|
169 | 184 | (while true
|
170 | 185 | (let [m (<! msg-chan)]
|
171 | 186 | (log/info "msg-chan" m))))
|
172 | 187 |
|
173 | 188 | (def ^:dynamic *custom-streaming-callback*
|
174 |
| - (AsyncStreamingCallback. #(>!! chunk-chan (str %2)) (comp println response-return-everything) exception-print)) |
175 |
| - |
| 189 | + (AsyncStreamingCallback. #(>!! chunk-chan (str %2)) |
| 190 | + (comp println response-return-everything) |
| 191 | + exception-print)) |
176 | 192 |
|
177 | 193 | ;; streaming connection with Twitter stored in an Atom, can be started and stopped using
|
178 | 194 | ;; using the start-twitter-conn! and stop-twitter-conn! functions
|
|
181 | 197 | (defn stop-twitter-conn! []
|
182 | 198 | "stop connection to Twitter Streaming API"
|
183 | 199 | (let [m (meta @twitter-conn)]
|
184 |
| - (when m ((:cancel m))))) |
| 200 | + (when m |
| 201 | + (log/info "Stopping Twitter client.") |
| 202 | + ((:cancel m)) |
| 203 | + (reset! chunk-buff "")))) |
185 | 204 |
|
186 | 205 | (defn start-twitter-conn! []
|
187 | 206 | "start connection to Twitter Streaming API"
|
188 |
| - (stop-twitter-conn!) |
| 207 | + (log/info "Starting Twitter client.") |
189 | 208 | (reset! twitter-conn (statuses-filter :params {:track (:track twitter-conf)}
|
190 | 209 | :oauth-creds creds
|
191 | 210 | :callbacks *custom-streaming-callback* )))
|
192 | 211 |
|
| 212 | +;; loop watching the twitter client and restarting it if necessary |
| 213 | +(go |
| 214 | + (while true |
| 215 | + (<! (timeout (* (:tw-check-interval-sec twitter-conf) 1000))) |
| 216 | + (let [now (t/now) |
| 217 | + since-last-sec (t/in-seconds (t/interval @last-received now))] |
| 218 | + (when (> since-last-sec (:tw-check-interval-sec twitter-conf)) |
| 219 | + (do |
| 220 | + (log/error since-last-sec "seconds since last tweet received") |
| 221 | + (stop-twitter-conn!) |
| 222 | + (<! (timeout (* (:tw-check-interval-sec twitter-conf) 1000))) |
| 223 | + (start-twitter-conn!)))))) |
193 | 224 |
|
194 | 225 | (defn -main
|
195 | 226 | [& args]
|
196 |
| - (start-http-server!) |
197 |
| - (start-twitter-conn!)) |
| 227 | + (start-twitter-conn!) |
| 228 | + (start-http-server!)) |
0 commit comments