|
13 | 13 | (log/info "Connected:" (:remote-addr req) uid)
|
14 | 14 | uid))
|
15 | 15 |
|
16 |
| -(defn make-event-handler [query-chan tweet-missing-chan register-percolation-chan] |
17 |
| - "creates event handler function for the websocket connection" |
| 16 | +(defn make-handler [query-chan tweet-missing-chan register-percolation-chan] |
| 17 | + "create event handler function for the websocket connection" |
18 | 18 | (fn [{:as ev-msg :keys [event ?reply-fn]}]
|
19 | 19 | (match event
|
20 | 20 | [:cmd/percolate params] (put! register-percolation-chan params)
|
|
23 | 23 | [:chsk/ws-ping] () ; currently just do nothing with ping (no logging either)
|
24 | 24 | :else (log/debug "Unmatched event:" (pp/pprint event)))))
|
25 | 25 |
|
26 |
| -(defn run-percolation-matches-loop [percolation-matches-chan chsk-send! connected-uids] |
27 |
| - "runs loop for delivering percolation matches to interested clients" |
28 |
| - (go-loop [] |
29 |
| - (let [[t matches subscriptions] (<! percolation-matches-chan)] |
30 |
| - (doseq [uid (:any @connected-uids)] |
31 |
| - (when (contains? matches (get subscriptions uid)) |
32 |
| - (chsk-send! uid [:tweet/new t])))) |
33 |
| - (recur))) |
| 26 | +(defn send-loop [channel f] |
| 27 | + "run loop, call f with message on channel" |
| 28 | + (go-loop [] (let [msg (<! channel)] (f msg)) (recur))) |
| 29 | + |
| 30 | +(defn tweet-stats [uids chsk-send!] |
| 31 | + "send stats about number of indexed tweets to all connected clients" |
| 32 | + (fn [msg] (doseq [uid (:any @uids)] (chsk-send! uid [:stats/total-tweet-count msg])))) |
| 33 | + |
| 34 | +(defn perc-matches [uids chsk-send!] |
| 35 | + "deliver percolation matches to interested clients" |
| 36 | + (fn [msg] (let [[t matches subscriptions] msg] |
| 37 | + (doseq [uid (:any @uids)] |
| 38 | + (when (contains? matches (get subscriptions uid)) |
| 39 | + (chsk-send! uid [:tweet/new t])))))) |
| 40 | + |
| 41 | +(defn relay-msg [msg-type msg-key chsk-send!] |
| 42 | + "send query result chunks back to client" |
| 43 | + (fn [msg] (chsk-send! (:uid msg) [msg-type (msg-key msg)]))) |
34 | 44 |
|
35 | 45 | (defn run-users-count-loop [chsk-send! connected-uids]
|
36 | 46 | "runs loop for sending stats about number of connected users to all connected clients"
|
37 |
| - (go-loop [] |
38 |
| - (<! (timeout 2000)) |
| 47 | + (go-loop [] (<! (timeout 2000)) |
39 | 48 | (let [uids (:any @connected-uids)]
|
40 | 49 | (doseq [uid uids] (chsk-send! uid [:stats/users-count (count uids)])))
|
41 | 50 | (recur)))
|
42 |
| - |
43 |
| -(defn run-tweet-stats-loop [chsk-send! uids tweet-count-chan] |
44 |
| - "runs loop for sending stats about number of indexed tweets to all connected clients" |
45 |
| - (go-loop [] |
46 |
| - (let [tweet-count (<! tweet-count-chan)] |
47 |
| - (doseq [uid (:any @uids)] (chsk-send! uid [:stats/total-tweet-count tweet-count]))) |
48 |
| - (recur))) |
49 |
| - |
50 |
| -(defn run-missing-tweet-loop [missing-tweet-found-chan chsk-send!] |
51 |
| - "runs loop for sending missing tweet back to client" |
52 |
| - (go-loop [] (let [msg (<! missing-tweet-found-chan)] |
53 |
| - (chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)])) |
54 |
| - (recur))) |
55 |
| - |
56 |
| -(defn run-query-results-loop [query-results-chan chsk-send!] |
57 |
| - "runs loop for sending query result chunks back to client" |
58 |
| - (go-loop [] |
59 |
| - (let [res (<! query-results-chan)] |
60 |
| - (chsk-send! (:uid res) [:tweet/prev-chunk (:result res)])) |
61 |
| - (recur))) |
0 commit comments