Skip to content

Commit 4865737

Browse files
committed
initial attempt at using Stuart Sierra's component library
1 parent 3c6b884 commit 4865737

File tree

4 files changed

+65
-41
lines changed

4 files changed

+65
-41
lines changed

Clojure-Websockets/project.clj

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
[om "0.7.3"]
2424
[com.cognitect/transit-clj "0.8.247"]
2525
[com.cognitect/transit-cljs "0.8.188"]
26-
[clj-pid "0.1.1"]]
26+
[clj-pid "0.1.1"]
27+
[com.stuartsierra/component "0.2.2"]]
2728

2829
:source-paths ["src/clj/"]
2930

Clojure-Websockets/src/clj/birdwatch/communicator.clj

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
(:gen-class)
33
(:use [birdwatch.conf])
44
(:require
5-
[birdwatch.twitterclient :as tc]
65
[birdwatch.channels :as c]
76
[birdwatch.atoms :as a]
87
[birdwatch.persistence :as p]

Clojure-Websockets/src/clj/birdwatch/main.clj

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
(:gen-class)
33
(:use [birdwatch.conf])
44
(:require
5-
[birdwatch.twitterclient :as tc]
5+
[birdwatch.twitter-client :as tc]
66
[birdwatch.channels :as c]
77
[birdwatch.communicator :as comm]
88
[birdwatch.atoms :as a]
@@ -21,7 +21,8 @@
2121
[compojure.route :as route]
2222
[taoensso.sente :as sente]
2323
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]]
24-
[clj-pid.core :as pid]))
24+
[clj-pid.core :as pid]
25+
[com.stuartsierra.component :as component]))
2526

2627
(defroutes my-routes
2728
(GET "/" [] (content-type
@@ -55,12 +56,12 @@
5556
(reset! http-server_ s)))
5657

5758
(def pid-file "birdwatch.pid")
58-
(pid/save pid-file)
59-
(pid/delete-on-shutdown! pid-file)
60-
(log/info "Application started, PID" (pid/current))
6159

6260
(defn -main
6361
[& args]
62+
(pid/save pid-file)
63+
(pid/delete-on-shutdown! pid-file)
64+
(log/info "Application started, PID" (pid/current))
6465
(tc/start-twitter-conn!)
6566
(tc/watch-twitter-conn!)
6667
(start-http-server!))
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(ns birdwatch.twitterclient
1+
(ns birdwatch.twitter-client
22
(:gen-class)
33
(:use
44
[twitter.callbacks]
@@ -16,32 +16,44 @@
1616
[http.async.client :as ac]
1717
[twitter.oauth :as oauth]
1818
[twitter-streaming-client.core :as client]
19-
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]])
19+
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go close!]]
20+
[com.stuartsierra.component :as component])
2021
(:import
2122
(twitter.callbacks.protocols AsyncStreamingCallback)))
2223

23-
(def creds (oauth/make-oauth-creds (:consumer-key conf) (:consumer-secret conf)
24-
(:user-access-token conf) (:user-access-token-secret conf)))
24+
(defn- creds [config] (oauth/make-oauth-creds (:consumer-key config) (:consumer-secret config)
25+
(:user-access-token config) (:user-access-token-secret config)))
2526

2627
;; channels
27-
(def ^:private chunk-chan (chan 10000))
28+
(def ^:private chunk-chan (chan))
2829
(def ^:private msg-chan (chan))
2930

3031
;; atoms for keeping track of counts, incomplete chunk and last received timestamp
3132
(def ^:private last-received (atom (t/epoch)))
3233
(def ^:private chunk-buff (atom ""))
3334
(def ^:private counter (atom 0))
3435

35-
(defn- parse [str]
36+
(defn- parse [jstr]
3637
(try
3738
(let [c @counter
38-
json (json/read-json str)]
39+
json (json/read-json jstr)]
3940
(when (== (mod c 1000) 0) (log/info "processed" c "since startup, index" (:es-index conf)))
4041
(if (:text json)
4142
(>!! c/tweets-chan json)
4243
(>!! msg-chan json))
4344
(swap! counter inc))
44-
(catch Exception ex (log/error ex "JSON parsing"))))
45+
(catch Exception ex (log/error ex "JSON parsing" jstr))))
46+
47+
;; loop for logging messages from Streaming API other than tweets
48+
(go
49+
(while true
50+
(let [m (<! msg-chan)]
51+
(log/info "msg-chan" m))))
52+
53+
(defn- tweet-chunk-callback []
54+
(AsyncStreamingCallback. #(>!! chunk-chan (str %2))
55+
(comp println response-return-everything)
56+
exception-print))
4557

4658
;; loop for processing chunks from Streaming API
4759
(go
@@ -61,38 +73,49 @@
6173
(reset! chunk-buff last-chunk))
6274
(reset! chunk-buff combined)))))
6375

64-
;; loop for logging messages from Streaming API other than tweets
65-
(go
66-
(while true
67-
(let [m (<! msg-chan)]
68-
(log/info "msg-chan" m))))
76+
(defrecord Twitterclient [conf connection]
77+
;; Implement the Lifecycle protocol
78+
component/Lifecycle
6979

70-
(def ^:dynamic *custom-streaming-callback*
71-
(AsyncStreamingCallback. #(>!! chunk-chan (str %2))
72-
(comp println response-return-everything)
73-
exception-print))
80+
(start [component]
81+
(log/info "Starting Twitterclient")
82+
;; In the 'start' method, initialize this component
83+
;; and start it running. For example, connect to a
84+
;; database, create thread pools, or initialize shared
85+
;; state.
86+
(let [chunk-chan (chan 10000)
87+
conn (statuses-filter :params {:track (:track conf)}
88+
:oauth-creds (creds conf)
89+
:callbacks (tweet-chunk-callback))]
90+
;; Return an updated version of the component with
91+
;; the run-time state assoc'd in.
92+
(assoc component :connection conn)))
93+
94+
(stop [component]
95+
(log/info "stop connection to Twitter Streaming API")
96+
;; In the 'stop' method, shut down the running
97+
;; component and release any external resources it has
98+
;; acquired.
99+
(let [m (meta connection)]
100+
(when m
101+
((:cancel m))
102+
(reset! chunk-buff "")))
103+
;; Return the component, optionally modified. Remember that if you
104+
;; dissoc one of a record's base fields, you get a plain map.
105+
(assoc component :connection nil)))
74106

75-
;; streaming connection with Twitter stored in an Atom, can be started and stopped using
76-
;; using the start-twitter-conn! and stop-twitter-conn! functions
77-
(def ^:private twitter-conn (atom {}))
107+
(defn new-twitterclient [conf]
108+
(map->Twitterclient {:conf conf}))
78109

79-
(defn- stop-twitter-conn! []
80-
"stop connection to Twitter Streaming API"
81-
(let [m (meta @twitter-conn)]
82-
(when m
83-
(log/info "Stopping Twitter client.")
84-
((:cancel m))
85-
(reset! chunk-buff ""))))
110+
(def twitter-client (new-twitterclient conf))
86111

87112
(defn start-twitter-conn! []
88-
"start connection to Twitter Streaming API"
89-
(log/info "Starting Twitter client.")
90-
(reset! twitter-conn (statuses-filter :params {:track (:track conf)}
91-
:oauth-creds creds
92-
:callbacks *custom-streaming-callback* )))
113+
"start twitter component"
114+
(alter-var-root #'twitter-client component/start))
93115

94116
;; loop watching the twitter client and restarting it if necessary
95117
(defn watch-twitter-conn! []
118+
"monitor twitter component, restart when necessary"
96119
(go
97120
(while true
98121
(<! (timeout (* (:tw-check-interval-sec conf) 1000)))
@@ -101,6 +124,6 @@
101124
(when (> since-last-sec (:tw-check-interval-sec conf))
102125
(do
103126
(log/error since-last-sec "seconds since last tweet received")
104-
(stop-twitter-conn!)
127+
(alter-var-root #'twitter-client component/stop)
105128
(<! (timeout (* (:tw-check-interval-sec conf) 1000)))
106-
(start-twitter-conn!)))))))
129+
(alter-var-root #'twitter-client component/start)))))))

0 commit comments

Comments
 (0)