Skip to content

Commit 98efd03

Browse files
committed
finding and returning missing tweets
1 parent a828746 commit 98efd03

File tree

3 files changed

+46
-11
lines changed

3 files changed

+46
-11
lines changed

Clojure-Websockets/src/clj/birdwatch/channels.clj

+4
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66

77
;; channels
88
(def tweets-chan (chan))
9+
(def tweet-missing-chan (chan)) ; channel for requesting missing tweet
10+
(def missing-tweet-found-chan (chan)) ; channel for responding to missing request
911
(def persistence-chan (chan))
12+
(def rt-persistence-chan (chan))
1013
(def percolation-chan (chan))
1114
(def percolation-matches-chan (chan))
1215

1316
;; fanning tweets out to separate channels per task
1417
(def tweets-chan-mult (mult tweets-chan))
1518
(tap tweets-chan-mult percolation-chan)
1619
(tap tweets-chan-mult persistence-chan)
20+
(tap tweets-chan-mult rt-persistence-chan)

Clojure-Websockets/src/clj/birdwatch/communicator.clj

+9-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
;(doseq [t res] (chsk-send! (:uid params) [:some/tweet (:_source t)]))
5151
(chsk-send! (:uid params) [:tweet/prev-chunk res])))
5252

53-
[:cmd/missing tid]
54-
(log/info "Missing Tweet requested:" tid)
53+
[:cmd/missing params]
54+
(put! c/tweet-missing-chan params)
5555

5656
[:chsk/ws-ping params]
5757
() ; currently just do nothing with ping (no logging either)
@@ -87,3 +87,10 @@
8787
total-tweet-count (format "%,15d" (:count (p/total-tweet-count)))]
8888
(doseq [uid uids]
8989
(chsk-send! uid [:stats/total-tweet-count total-tweet-count])))))
90+
91+
92+
;; loop for sending missing tweet back to client
93+
(go
94+
(while true
95+
(let [msg (<! c/missing-tweet-found-chan)]
96+
(chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)]))))

Clojure-Websockets/src/clj/birdwatch/persistence.clj

+33-9
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,8 @@
2626
"get total count of indexed tweets from ElasticSearch"
2727
(esd/count conn (:es-index conf) "tweet"))
2828

29-
;; loop for persisting tweets
30-
(go
31-
(while true
32-
(let [t (<! c/persistence-chan)]
33-
(try
34-
(esd/put conn (:es-index conf) "tweet" (:id_str t) t)
35-
(catch Exception ex (log/error ex "esd/put error"))))))
36-
3729
(defn strip-tweet [t]
30+
"take only actually needed fields from tweet"
3831
(let [u (:user t)]
3932
{:id_str (:id_str t)
4033
:id (:id t)
@@ -57,6 +50,10 @@
5750
(assoc t :retweeted_status (strip-tweet rt))
5851
t)))
5952

53+
(defn get-tweet [id]
54+
"get Tweet for specified ID"
55+
(esd/get conn (:es-index conf) "tweet" id))
56+
6057
(defn get-source [coll]
6158
"get vector with :_source of each ElasticSearch result"
6259
(map strip-source coll))
@@ -80,7 +77,7 @@
8077
n (esrsp/total-hits res)
8178
hits (esrsp/hits-from res)
8279
res (get-source hits)]
83-
(log/info "Total hits:" n "Retrieved:" (count hits))
80+
;(log/info "Total hits:" n "Retrieved:" (count hits))
8481
;(log/info "top retweets in chunk" (pp/pprint (take 10 (into (priority-map-by >) (d/retweets res {})))))
8582
res))
8683

@@ -93,10 +90,37 @@
9390
(perc/register-query conn "percolator" sha :query q)
9491
(log/info "Percolation registered for query" q "with SHA1" sha)))
9592

93+
;; loop for persisting tweets
94+
(go
95+
(while true
96+
(let [t (<! c/persistence-chan)]
97+
(try
98+
(esd/put conn (:es-index conf) "tweet" (:id_str t) t)
99+
(catch Exception ex (log/error ex "esd/put error"))))))
100+
101+
;; loop for persisting retweets
102+
(go
103+
(while true
104+
(let [t (<! c/rt-persistence-chan)]
105+
(when (:retweeted_status t)
106+
(try
107+
(esd/put conn (:es-index conf) "tweet" (:id_str t) t)
108+
(catch Exception ex (log/error ex "esd/put error")))))))
109+
96110
;; loop for finding percolation matches and delivering those on the appropriate socket
97111
(go
98112
(while true
99113
(let [t (<! c/percolation-chan)
100114
response (perc/percolate conn "percolator" "tweet" :doc t)
101115
matches (into #{} (map #(:_id %1) (esrsp/matches-from response)))]
102116
(put! c/percolation-matches-chan [t matches]))))
117+
118+
;; loop for finding missing tweet
119+
(go
120+
(while true
121+
(let [req (<! c/tweet-missing-chan)
122+
res (get-tweet (:id_str req))
123+
uid (:uid req)]
124+
(if res
125+
(put! c/missing-tweet-found-chan {:tweet (strip-source res) :uid uid})
126+
(log/info "birdwatch.persistence missing" (:id_str req) res)))))

0 commit comments

Comments
 (0)