Skip to content

Commit 2df3150

Browse files
committed
keeping the Streaming API connection alive / restart on disconnect
1 parent c2b4a38 commit 2df3150

File tree

4 files changed

+60
-27
lines changed

4 files changed

+60
-27
lines changed

Clojure-Websockets/project.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
[compojure "1.1.8"]
1515
[ring "1.3.0"]
1616
[ring/ring-defaults "0.1.1"]
17+
[clj-time "0.8.0"]
1718

1819
[org.clojure/clojurescript "0.0-2268"]
1920
[tailrecursion/cljs-priority-map "1.1.0"]
@@ -22,7 +23,7 @@
2223

2324
:source-paths ["src/clj/"]
2425

25-
:main ^:skip-aot birdwatch.core
26+
:main ^:skip-aot birdwatch.main
2627
:target-path "target/%s"
2728
:profiles {:uberjar {:aot :all}}
2829

Clojure-Websockets/src/clj/birdwatch/core.clj renamed to Clojure-Websockets/src/clj/birdwatch/main.clj

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
1-
(ns birdwatch.core
1+
(ns birdwatch.main
2+
(:gen-class)
23
(:use
34
[twitter.callbacks]
45
[twitter.callbacks.handlers]
56
[twitter.api.streaming]
67
[clojure.pprint])
78
(:require
89
[clojure.edn :as edn]
10+
[clojure.string :as str]
911
[clojure.data.json :as json]
1012
[clojure.core.match :as match :refer (match)]
1113

1214
[http.async.client :as ac]
1315
[twitter.oauth :as oauth]
1416
[twitter-streaming-client.core :as client]
1517

18+
[clj-time.core :as t]
19+
1620
[org.httpkit.server :as http-kit-server]
1721
[ring.middleware.defaults]
1822
[ring.util.response :refer [resource-response response content-type]]
@@ -123,37 +127,47 @@
123127

124128
;; channels
125129
(def chunk-chan (chan 10000))
126-
(def buffer-chan (chan 1))
127-
(>!! buffer-chan "")
128130
(def tweets-chan (chan))
129131
(def msg-chan (chan))
130132

131-
(def counter-chan (chan 1))
132-
(>!! counter-chan 0)
133+
(def counter (atom 0))
134+
135+
;; atoms for keeping track of incomplete chunk and last received timestamp
136+
(def last-received (atom (t/epoch)))
137+
(def chunk-buff (atom ""))
133138

134139
(defn parse [str]
135140
(try
136-
(let [c (inc (<!! counter-chan))
141+
(let [c @counter
137142
json (json/read-json str)]
138-
(when (== (mod c 1000) 0) (log/info "processed" c "since startup, index " (:es-index twitter-conf)))
143+
(when (== (mod c 1000) 0) (log/info "processed" c "since startup, index" (:es-index twitter-conf)))
139144
(if (:text json)
140145
(>!! tweets-chan json)
141146
(>!! msg-chan json))
142-
(>!! counter-chan c))
147+
(swap! counter inc))
143148
(catch Exception ex (log/error ex "JSON parsing"))))
144149

145-
(go (while true
146-
(let [chunk (<! chunk-chan)
147-
buff (<! buffer-chan)
148-
combined (str buff chunk)]
149-
(if (and (.endsWith combined "\r\n") (.startsWith combined "{"))
150-
(do
151-
(parse combined)
152-
(>! buffer-chan ""))
153-
(>! buffer-chan combined)))))
150+
;; loop for processing chunks from Streaming API
151+
(go
152+
(while true
153+
(let [ch (<! chunk-chan)
154+
buff @chunk-buff
155+
combined (str buff ch)
156+
tweet-strings (str/split-lines combined)
157+
to-process (butlast tweet-strings)
158+
last-chunk (last tweet-strings)]
159+
(reset! last-received (t/now))
160+
(if (> (count to-process) 0)
161+
(do
162+
(doseq [ts to-process]
163+
(when (not (str/blank? ts))
164+
(parse ts)))
165+
(reset! chunk-buff last-chunk))
166+
(reset! chunk-buff combined)))))
154167

155168
(def conn (esr/connect (:es-address twitter-conf)))
156169

170+
;; loop processing successfully parsed tweets, currently just fanning out to all connected clients
157171
(go
158172
(while true
159173
(let [t (<! tweets-chan)]
@@ -165,14 +179,16 @@
165179
(esd/put conn (:es-index twitter-conf) "tweet" (:id_str t) t)
166180
(catch Exception ex (log/error ex "esd/put error"))))))
167181

182+
;; loop for logging messages from Streaming API other than tweets
168183
(go
169184
(while true
170185
(let [m (<! msg-chan)]
171186
(log/info "msg-chan" m))))
172187

173188
(def ^:dynamic *custom-streaming-callback*
174-
(AsyncStreamingCallback. #(>!! chunk-chan (str %2)) (comp println response-return-everything) exception-print))
175-
189+
(AsyncStreamingCallback. #(>!! chunk-chan (str %2))
190+
(comp println response-return-everything)
191+
exception-print))
176192

177193
;; streaming connection with Twitter stored in an Atom, can be started and stopped using
178194
;; using the start-twitter-conn! and stop-twitter-conn! functions
@@ -181,17 +197,32 @@
181197
(defn stop-twitter-conn! []
182198
"stop connection to Twitter Streaming API"
183199
(let [m (meta @twitter-conn)]
184-
(when m ((:cancel m)))))
200+
(when m
201+
(log/info "Stopping Twitter client.")
202+
((:cancel m))
203+
(reset! chunk-buff ""))))
185204

186205
(defn start-twitter-conn! []
187206
"start connection to Twitter Streaming API"
188-
(stop-twitter-conn!)
207+
(log/info "Starting Twitter client.")
189208
(reset! twitter-conn (statuses-filter :params {:track (:track twitter-conf)}
190209
:oauth-creds creds
191210
:callbacks *custom-streaming-callback* )))
192211

212+
;; loop watching the twitter client and restarting it if necessary
213+
(go
214+
(while true
215+
(<! (timeout (* (:tw-check-interval-sec twitter-conf) 1000)))
216+
(let [now (t/now)
217+
since-last-sec (t/in-seconds (t/interval @last-received now))]
218+
(when (> since-last-sec (:tw-check-interval-sec twitter-conf))
219+
(do
220+
(log/error since-last-sec "seconds since last tweet received")
221+
(stop-twitter-conn!)
222+
(<! (timeout (* (:tw-check-interval-sec twitter-conf) 1000)))
223+
(start-twitter-conn!))))))
193224

194225
(defn -main
195226
[& args]
196-
(start-http-server!)
197-
(start-twitter-conn!))
227+
(start-twitter-conn!)
228+
(start-http-server!))

Clojure-Websockets/twitterconf-tpl.edn

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@
66
:user-access-token-secret "<YOUR ACCESS TOKEN SECRET HERE>"
77
:es-address "http://127.0.0.1:9200"
88
:es-index "birdwatch"
9-
:track "clojure,love" }
9+
:track "clojure,love"
10+
:tw-check-interval-sec 120
11+
:port 8888 }

Scala-Play-SSE/public/stylesheets/main.css

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ a {
6969
font-size: 11px;
7070
font-weight: normal;
7171
color: #999;
72+
cursor: pointer;
7273
}
7374

7475
.pagination-mini > li > a:hover,
@@ -83,7 +84,6 @@ a {
8384
.pagination-mini > .active > span {
8485
background: #428bca;
8586
color: #fff;
86-
cursor: default;
8787
}
8888

8989
#tweet-frame {
@@ -103,7 +103,6 @@ a {
103103
.toggleActive {
104104
background: #428bca;
105105
color: #fff;
106-
cursor: default;
107106
}
108107

109108
.container {

0 commit comments

Comments
 (0)