Skip to content

Commit ae40b55

Browse files
committed
Percolator component
1 parent f3ca7b1 commit ae40b55

File tree

6 files changed

+68
-37
lines changed

6 files changed

+68
-37
lines changed

Clojure-Websockets/src/clj/birdwatch/channels.clj

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
rt-persistence (chan)
2020
percolation (chan)
2121
percolation-matches (chan 10000)
22+
register-percolation (chan)
2223
query (chan)
2324
query-results (chan)
2425
tweets-mult (mult tweets)]
@@ -36,6 +37,7 @@
3637
:rt-persistence rt-persistence
3738
:percolation percolation
3839
:percolation-matches percolation-matches
40+
:register-percolation register-percolation
3941
:query query
4042
:query-results query-results
4143
:tweets-mult tweets-mult)))

Clojure-Websockets/src/clj/birdwatch/communicator.clj

+7-11
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
(log/info "Connected:" (:remote-addr req) uid)
2323
uid))
2424

25-
(defn- make-event-msg-handler [query-chan tweet-missing-chan]
25+
(defn- make-event-msg-handler [query-chan tweet-missing-chan register-percolation-chan]
2626
(fn [{:as ev-msg :keys [event ?reply-fn]}]
2727
(match event
28-
[:cmd/percolate params] (p/start-percolator params)
28+
[:cmd/percolate params] (put! register-percolation-chan params)
2929
[:cmd/query params] (do (log/info "Received query:" params)
30-
(put! query-chan params))
30+
(put! query-chan params))
3131
[:cmd/missing params] (put! tweet-missing-chan params)
3232
[:chsk/ws-ping] () ; currently just do nothing with ping (no logging either)
3333
:else (log/info "Unmatched event:" (pp/pprint event)))))
@@ -73,25 +73,21 @@
7373
(recur)))
7474

7575
(defrecord Communicator [channels]
76-
;; Implement the Lifecycle protocol
7776
component/Lifecycle
78-
7977
(start [component]
8078
(log/info "Starting Communicator Component")
81-
8279
(let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]}
8380
(sente/make-channel-socket! {:packer packer :user-id-fn user-id-fn})
84-
chsk-router (sente/start-chsk-router! ch-recv
85-
(make-event-msg-handler
86-
(:query channels) (:tweet-missing channels)))]
81+
event-msg-handler (make-event-msg-handler (:query channels)
82+
(:tweet-missing channels)
83+
(:register-percolation channels))
84+
chsk-router (sente/start-chsk-router! ch-recv event-msg-handler)]
8785
(run-percolation-matches-loop (:percolation-matches channels) send-fn connected-uids)
8886
(run-users-count-loop send-fn connected-uids)
8987
(run-tweet-stats-loop send-fn connected-uids)
9088
(run-missing-tweet-loop (:missing-tweet-found channels) send-fn)
9189
(run-query-results-loop (:query-results channels) send-fn)
92-
9390
(assoc component :ajax-post-fn ajax-post-fn :ajax-get-or-ws-handshake-fn ajax-get-or-ws-handshake-fn)))
94-
9591
(stop [component]
9692
(log/info "Stopping Persistence Component"))) ;; TODO: teardown
9793

Clojure-Websockets/src/clj/birdwatch/main.clj

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
[birdwatch.channels :as c]
66
[birdwatch.communicator :as comm]
77
[birdwatch.persistence :as p]
8+
[birdwatch.percolator :as perc]
89
[birdwatch.http :as http]
910
[clojure.edn :as edn]
1011
[clojure.tools.logging :as log]
@@ -16,10 +17,11 @@
1617
(defn get-system [conf]
1718
(component/system-map
1819
:channels (c/new-channels)
19-
:communicator (component/using (comm/new-communicator) {:channels :channels})
20+
:communicator (component/using (comm/new-communicator) {:channels :channels})
2021
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :channels})
21-
:persistence (component/using (p/new-persistence conf) {:channels :channels})
22-
:http (component/using (http/new-http-server conf) {:communicator :communicator})))
22+
:persistence (component/using (p/new-persistence conf) {:channels :channels})
23+
:percolator (component/using (perc/new-percolator conf) {:channels :channels})
24+
:http (component/using (http/new-http-server conf) {:communicator :communicator})))
2325

2426
(def system (get-system conf))
2527

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
(ns birdwatch.percolator
2+
(:gen-class)
3+
(:require
4+
[birdwatch.atoms :as a] ;; TODO: should be local
5+
[birdwatch.data :as d]
6+
[clojure.tools.logging :as log]
7+
[pandect.core :refer [sha1]]
8+
[clojure.pprint :as pp]
9+
[clojurewerkz.elastisch.rest :as esr]
10+
[clojurewerkz.elastisch.rest.percolation :as perc]
11+
[clojurewerkz.elastisch.rest.response :as esrsp]
12+
[com.stuartsierra.component :as component]
13+
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop close!]]))
14+
15+
(defn start-percolator [{:keys [query uid]} conn]
16+
"register percolation search with ID based on hash of the query"
17+
(let [sha (sha1 (str query))]
18+
(swap! a/subscriptions assoc uid sha)
19+
(perc/register-query conn "percolator" sha :query query)
20+
(log/info "Percolation registered for query" query "with SHA1" sha)))
21+
22+
;; loop for registering percolation queries taken from the register-percolation channel
23+
(defn- run-percolation-register-loop [register-percolation-chan conn]
24+
(go-loop []
25+
(let [params (<! register-percolation-chan)]
26+
(start-percolator params conn)
27+
(recur))))
28+
29+
;; loop for percolating tweets from the percolation channel and putting [tweet matches] on the percolation-matches channel
30+
(defn- run-percolation-loop [percolation-chan percolation-matches-chan conn]
31+
(go-loop []
32+
(let [t (<! percolation-chan)
33+
response (perc/percolate conn "percolator" "tweet" :doc t)
34+
matches (into #{} (map #(:_id %1) (esrsp/matches-from response)))]
35+
(put! percolation-matches-chan [t matches])
36+
(recur))))
37+
38+
(defrecord Percolator [conf channels conn]
39+
component/Lifecycle
40+
(start [component]
41+
(log/info "Starting Percolator Component")
42+
(let [conn (esr/connect (:es-address conf))]
43+
(run-percolation-register-loop (:register-percolation channels) conn)
44+
(run-percolation-loop (:percolation channels) (:percolation-matches channels) conn)
45+
(assoc component :conn conn)))
46+
(stop [component] ;; TODO: proper teardown of resources
47+
(log/info "Stopping Percolator Component")
48+
(assoc component :conn nil)))
49+
50+
(defn new-percolator [conf]
51+
(map->Percolator {:conf conf}))

Clojure-Websockets/src/clj/birdwatch/persistence.clj

+1-23
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,22 @@
33
(:use [birdwatch.conf] ;; TODO: remove conf dependency
44
[clojure.data.priority-map])
55
(:require
6-
[birdwatch.atoms :as a]
76
[birdwatch.data :as d]
8-
[clojure.edn :as edn]
97
[clojure.tools.logging :as log]
10-
[pandect.core :refer [sha1]]
118
[clojure.pprint :as pp]
129
[clojurewerkz.elastisch.native :as esn]
1310
[clojurewerkz.elastisch.native.document :as esnd]
1411
[clojurewerkz.elastisch.native.response :as esnrsp]
1512
[clojurewerkz.elastisch.rest :as esr]
1613
[clojurewerkz.elastisch.rest.document :as esd]
17-
[clojurewerkz.elastisch.rest.percolation :as perc]
1814
[clojurewerkz.elastisch.query :as q]
1915
[clojurewerkz.elastisch.rest.response :as esrsp]
2016
[com.stuartsierra.component :as component]
2117
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop close!]]))
2218

2319
;; TODO: move connection objects into component
2420
(def conn (esr/connect (:es-address conf)))
25-
(def native-conn (esn/connect [["127.0.0.1" 9300]] {"cluster.name" "elasticsearch_mn"}))
21+
(def native-conn (esn/connect [(:es-native-address conf)] {"cluster.name" (:es-cluster-name conf)}))
2622

2723
(defn total-tweet-count []
2824
"get total count of indexed tweets from ElasticSearch"
@@ -76,13 +72,6 @@
7672
;(log/info "top retweets in chunk" (pp/pprint (take 10 (into (priority-map-by >) (d/retweets res {})))))
7773
res))
7874

79-
(defn start-percolator [{:keys [query uid]}]
80-
"register percolation search with ID based on hash of the query"
81-
(let [sha (sha1 (str query))]
82-
(swap! a/subscriptions assoc uid sha)
83-
(perc/register-query conn "percolator" sha :query query)
84-
(log/info "Percolation registered for query" query "with SHA1" sha)))
85-
8675
;; loop for persisting tweets
8776
(defn- run-persistence-loop [persistence-chan]
8877
(go-loop []
@@ -102,16 +91,6 @@
10291
(catch Exception ex (log/error ex "esd/put error"))))
10392
(recur))))
10493

105-
;; loop for finding percolation matches and delivering those on the appropriate socket
106-
(defn- run-percolation-loop [percolation-chan percolation-matches-chan]
107-
(go-loop []
108-
(let [t (<! percolation-chan)
109-
response (perc/percolate conn "percolator" "tweet" :doc t)
110-
matches (into #{} (map #(:_id %1) (esrsp/matches-from response)))
111-
]
112-
(put! percolation-matches-chan [t matches])
113-
(recur))))
114-
11594
(defn- run-find-missing-loop [tweet-missing-chan missing-tweet-found-chan]
11695
"starts loop for finding missing tweets, puts result on missing-tweet-found-chan "
11796
(go-loop []
@@ -139,7 +118,6 @@
139118

140119
(run-persistence-loop (:persistence channels))
141120
(run-rt-persistence-loop (:rt-persistence channels))
142-
(run-percolation-loop (:percolation channels) (:percolation-matches channels))
143121
(run-find-missing-loop (:tweet-missing channels) (:missing-tweet-found channels))
144122
(run-query-loop (:query channels) (:query-results channels))
145123

Clojure-Websockets/twitterconf-tpl.edn

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
:user-access-token "<YOUR ACCESS TOKEN HERE>"
1010
:user-access-token-secret "<YOUR ACCESS TOKEN SECRET HERE>"
1111
:es-address "http://127.0.0.1:9200"
12+
:es-native-address ["127.0.0.1" 9300]
13+
:es-cluster-name "elasticsearch_mn"
1214
:es-index "birdwatch"
1315
:track "clojure,love"
1416
:tw-check-interval-sec 120

0 commit comments

Comments
 (0)