|
26 | 26 | "get total count of indexed tweets from ElasticSearch"
|
27 | 27 | (esd/count conn (:es-index conf) "tweet"))
|
28 | 28 |
|
29 |
| -;; loop for persisting tweets |
30 |
| -(go |
31 |
| - (while true |
32 |
| - (let [t (<! c/persistence-chan)] |
33 |
| - (try |
34 |
| - (esd/put conn (:es-index conf) "tweet" (:id_str t) t) |
35 |
| - (catch Exception ex (log/error ex "esd/put error")))))) |
36 |
| - |
37 | 29 | (defn strip-tweet [t]
|
| 30 | + "take only actually needed fields from tweet" |
38 | 31 | (let [u (:user t)]
|
39 | 32 | {:id_str (:id_str t)
|
40 | 33 | :id (:id t)
|
|
57 | 50 | (assoc t :retweeted_status (strip-tweet rt))
|
58 | 51 | t)))
|
59 | 52 |
|
(defn get-tweet
  "Look up the document of type \"tweet\" with the given id in the configured
  ElasticSearch index (:es-index of conf) and return the result of esd/get."
  [id]
  (esd/get conn (:es-index conf) "tweet" id))
| 56 | + |
(defn get-source
  "Apply strip-source to each ElasticSearch result in coll.
  Returns a lazy sequence (the result of map), not a vector."
  [coll]
  (map strip-source coll))
|
|
80 | 77 | n (esrsp/total-hits res)
|
81 | 78 | hits (esrsp/hits-from res)
|
82 | 79 | res (get-source hits)]
|
83 |
| - (log/info "Total hits:" n "Retrieved:" (count hits)) |
| 80 | + ;(log/info "Total hits:" n "Retrieved:" (count hits)) |
84 | 81 | ;(log/info "top retweets in chunk" (pp/pprint (take 10 (into (priority-map-by >) (d/retweets res {})))))
|
85 | 82 | res))
|
86 | 83 |
|
|
93 | 90 | (perc/register-query conn "percolator" sha :query q)
|
94 | 91 | (log/info "Percolation registered for query" q "with SHA1" sha)))
|
95 | 92 |
|
;; Loop for persisting tweets: each tweet taken from c/persistence-chan is
;; stored in the configured ElasticSearch index under its :id_str.
;; A failing put is logged and the loop continues with the next tweet.
;; The loop ends when the channel is closed (the take yields nil) instead of
;; spinning forever and trying to persist nil.
(go
  (loop []
    (when-let [t (<! c/persistence-chan)]
      (try
        (esd/put conn (:es-index conf) "tweet" (:id_str t) t)
        (catch Exception ex (log/error ex "esd/put error")))
      ;; recur sits outside the try form; recur is not allowed inside try
      (recur))))
| 100 | + |
;; Loop for persisting retweets: each item taken from c/rt-persistence-chan is
;; stored in the configured ElasticSearch index under its :id_str, but only
;; when it carries a :retweeted_status; other items are dropped.
;; A failing put is logged and the loop continues with the next item.
;; The loop ends when the channel is closed (the take yields nil) instead of
;; spinning forever on nil.
(go
  (loop []
    (when-let [t (<! c/rt-persistence-chan)]
      (when (:retweeted_status t)
        (try
          (esd/put conn (:es-index conf) "tweet" (:id_str t) t)
          (catch Exception ex (log/error ex "esd/put error"))))
      ;; recur sits outside the try form; recur is not allowed inside try
      (recur))))
| 109 | + |
;; Loop for finding percolation matches and delivering those on the
;; appropriate socket: each tweet taken from c/percolation-chan is percolated
;; against the "percolator" index, and the tweet together with the set of
;; matching query ids is put on c/percolation-matches-chan.
;; A failing percolation is logged and the loop continues, matching the error
;; handling of the persistence loops; without the catch a single exception
;; would end this go block for good.
;; The loop ends when the channel is closed (the take yields nil).
(go
  (loop []
    (when-let [t (<! c/percolation-chan)]
      (try
        (let [response (perc/percolate conn "percolator" "tweet" :doc t)
              matches  (into #{} (map :_id (esrsp/matches-from response)))]
          (put! c/percolation-matches-chan [t matches]))
        (catch Exception ex (log/error ex "percolation error")))
      ;; recur sits outside the try form; recur is not allowed inside try
      (recur))))
|
| 117 | + |
;; Loop for finding missing tweets: each request taken from
;; c/tweet-missing-chan carries an :id_str and a :uid. The tweet is looked up
;; with get-tweet; when found, its stripped source is put on
;; c/missing-tweet-found-chan together with the requesting :uid, otherwise the
;; miss is logged.
;; A failing lookup is logged and the loop continues; without the catch a
;; single exception would end this go block for good.
;; The loop ends when the channel is closed (the take yields nil).
(go
  (loop []
    (when-let [req (<! c/tweet-missing-chan)]
      (try
        (let [res (get-tweet (:id_str req))
              uid (:uid req)]
          (if res
            (put! c/missing-tweet-found-chan {:tweet (strip-source res) :uid uid})
            (log/info "birdwatch.persistence missing" (:id_str req) res)))
        (catch Exception ex (log/error ex "get-tweet error")))
      ;; recur sits outside the try form; recur is not allowed inside try
      (recur))))
0 commit comments