Skip to content

Commit 90642e0

Browse files
committed
twitterclient namespace structure
1 parent 2a94674 commit 90642e0

File tree

5 files changed

+140
-130
lines changed

5 files changed

+140
-130
lines changed

Clojure-Websockets/src/clj/birdwatch/main.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(ns birdwatch.main
22
(:gen-class)
33
(:require
4-
[birdwatch.twitter-client :as tc]
4+
[birdwatch.twitterclient.core :as tc]
55
[birdwatch.communicator :as comm]
66
[birdwatch.persistence :as p]
77
[birdwatch.percolator :as perc]

Clojure-Websockets/src/clj/birdwatch/twitter_client.clj

Lines changed: 0 additions & 129 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
(ns birdwatch.twitterclient.core
2+
(:gen-class)
3+
(:require
4+
[clojure.string :as str]
5+
[birdwatch.twitterclient.http :as http-client]
6+
[birdwatch.twitterclient.processing :as processing]
7+
[clojure.data.json :as json]
8+
[clj-time.core :as t]
9+
[clojure.pprint :as pp]
10+
[clojure.tools.logging :as log]
11+
[clojure.core.async :as async :refer [chan go-loop pipe]]
12+
[com.stuartsierra.component :as component]))
13+
14+
(defrecord Twitterclient [conf channels conn chunk-chan watch-active]
15+
component/Lifecycle
16+
(start [component] (log/info "Starting Twitterclient Component")
17+
(let [last-received (atom (t/epoch))
18+
chunk-chan (chan 1 (processing/process-chunk last-received))
19+
conn (atom {})
20+
watch-active (atom false)]
21+
(http-client/start-twitter-conn! conf conn chunk-chan)
22+
(pipe chunk-chan (:tweets channels) false)
23+
(http-client/run-watch-loop conf conn chunk-chan last-received watch-active)
24+
(assoc component :conn conn :chunk-chan chunk-chan :watch-active watch-active)))
25+
(stop [component] (log/info "Stopping Twitterclient Component")
26+
(reset! watch-active false)
27+
(http-client/stop-twitter-conn! conn)
28+
(assoc component :conn nil :chunk-chan nil :watch-active nil)))
29+
30+
(defn new-twitterclient [conf] (map->Twitterclient {:conf conf}))
31+
32+
(defrecord Twitterclient-Channels []
33+
component/Lifecycle
34+
(start [component] (log/info "Starting Twitterclient Channels Component")
35+
(assoc component :tweets (chan))) ; channel for new tweets received from streaming API
36+
(stop [component] (log/info "Stop Twitterclient Channels Component")
37+
(assoc component :tweets nil)))
38+
39+
(defn new-twitterclient-channels [] (map->Twitterclient-Channels {}))
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
(ns birdwatch.twitterclient.http
2+
(:gen-class)
3+
(:require
4+
[clojure.string :as str]
5+
[clojure.tools.logging :as log]
6+
[twitter.api.streaming :as tas]
7+
[twitter.oauth :as oauth]
8+
[clj-time.core :as t]
9+
[twitter.callbacks.handlers :as tch]
10+
[clojure.core.async :as async :refer [>!! <! timeout go-loop]])
11+
(:import (twitter.callbacks.protocols AsyncStreamingCallback)))
12+
13+
(defn- creds [config] (oauth/make-oauth-creds (:consumer-key config) (:consumer-secret config)
14+
(:user-access-token config) (:user-access-token-secret config)))
15+
16+
(defn- tweet-chunk-callback [chunk-chan]
17+
(tas/AsyncStreamingCallback. #(>!! chunk-chan (str %2))
18+
(comp println tch/response-return-everything)
19+
tch/exception-print))
20+
21+
(defn start-twitter-conn! [conf conn chunk-chan]
22+
(log/info "Starting Twitter client.")
23+
(reset! conn (tas/statuses-filter
24+
:params {:track (:track conf)}
25+
:oauth-creds (creds conf)
26+
:callbacks (tweet-chunk-callback chunk-chan))))
27+
28+
(defn stop-twitter-conn! [conn]
29+
(let [m (meta @conn)]
30+
(when m (log/info "Stopping Twitter client.")
31+
((:cancel m)))))
32+
33+
(defn run-watch-loop [conf conn chunk-chan last-received watch-active]
34+
"run loop watching the twitter client and restarting it if necessary"
35+
(reset! watch-active true)
36+
(go-loop [] (<! (timeout (* (:tw-check-interval-sec conf) 1000)))
37+
(let [since-last-sec (t/in-seconds (t/interval @last-received (t/now)))
38+
active @watch-active]
39+
(when active
40+
(when (> since-last-sec (:tw-check-interval-sec conf))
41+
(log/error since-last-sec "seconds since last tweet received")
42+
(stop-twitter-conn! conn)
43+
(<! (timeout (* (:tw-restart-wait conf) 1000)))
44+
(start-twitter-conn! conf conn chunk-chan))
45+
(recur)))))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
(ns birdwatch.twitterclient.processing
2+
(:gen-class)
3+
(:require
4+
[clojure.string :as str]
5+
[clojure.data.json :as json]
6+
[clj-time.core :as t]
7+
[clojure.tools.logging :as log]))
8+
9+
(defn flattening []
10+
(fn [step]
11+
(fn [r x] (reduce step r x))))
12+
13+
(defn mapping [f]
14+
(fn [step]
15+
(fn [r x] (step r (f x)))))
16+
17+
(defn filtering [pred]
18+
(fn [step]
19+
(fn [r x] (if (pred x) (step r x) r))))
20+
21+
(defn- log-count [last-received]
22+
(fn [step]
23+
(let [cnt (volatile! 0)]
24+
(fn [r x]
25+
(when (== (mod @cnt 1000) 0) (log/info "processed" @cnt "since startup"))
26+
(vswap! cnt inc)
27+
(reset! last-received (t/now))
28+
(step r x)))))
29+
30+
(defn- streaming-buffer []
31+
(fn [step]
32+
(let [buff (volatile! "")]
33+
(fn [r x]
34+
(let [json-lines (str/split-lines (str/replace (str @buff x) #"\}\{" "}\r\n{"))
35+
to-process (butlast json-lines)]
36+
(vreset! buff (last json-lines))
37+
(if to-process (step r to-process) r))))))
38+
39+
(defn- tweet? [data]
40+
(let [text (:text data)]
41+
(when-not text (log/error "error-msg" data))
42+
text))
43+
44+
(defn- parse-json [jstr]
45+
"parse JSON string"
46+
(try (json/read-str jstr :key-fn clojure.core/keyword)
47+
(catch Exception e {:exception e :jstr jstr})))
48+
49+
(defn process-chunk [last-received]
50+
(comp
51+
(streaming-buffer)
52+
(flattening)
53+
(mapping parse-json)
54+
(filtering tweet?)
55+
(log-count last-received)))

0 commit comments

Comments
 (0)