|
1 | 1 | (ns birdwatch.communicator
|
2 | 2 | (:gen-class)
|
3 |
| - (:use [birdwatch.conf]) |
4 | 3 | (:require
|
5 | 4 | [birdwatch.channels :as c]
|
6 | 5 | [birdwatch.atoms :as a]
|
|
20 | 19 | [compojure.route :as route]
|
21 | 20 | [taoensso.sente :as sente]
|
22 | 21 | [taoensso.sente.packers.transit :as sente-transit]
|
23 |
| - [clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]])) |
| 22 | + [com.stuartsierra.component :as component] |
| 23 | + [clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop]])) |
24 | 24 |
|
25 | 25 | (def packer
|
26 | 26 | "Defines our packing (serialization) format for client<->server comms."
|
27 | 27 | (sente-transit/get-flexi-packer :json))
|
28 | 28 |
|
| 29 | +(defn- user-id-fn [req] |
| 30 | + "generates unique ID for request" |
| 31 | + (let [uid (str (java.util.UUID/randomUUID))] |
| 32 | + (log/info "Connected:" (:remote-addr req) uid) |
| 33 | + uid)) |
| 34 | + |
29 | 35 | (let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]}
|
30 | 36 | (sente/make-channel-socket! {:packer packer
|
31 |
| - :user-id-fn (fn [req] |
32 |
| - (let [uid (str (java.util.UUID/randomUUID))] |
33 |
| - (log/info "Connected:" (:remote-addr req) uid) |
34 |
| - uid))})] |
| 37 | + :user-id-fn user-id-fn})] |
35 | 38 | (def ring-ajax-post ajax-post-fn)
|
36 | 39 | (def ring-ajax-get-or-ws-handshake ajax-get-or-ws-handshake-fn)
|
37 | 40 | (def ch-chsk ch-recv) ; ChannelSocket's receive channel
|
|
52 | 55 | (defonce chsk-router (sente/start-chsk-router! ch-chsk event-msg-handler))
|
53 | 56 |
|
54 | 57 | ;; loop for matching connected clients with percolation matches and delivering those on the appropriate socket
|
55 |
| -(go |
56 |
| - (while true |
57 |
| - (let [[t matches] (<! c/percolation-matches-chan)] |
58 |
| - (doseq [uid (:any @connected-uids)] |
59 |
| - (when (contains? matches (get @a/subscriptions uid)) |
60 |
| - (chsk-send! uid [:tweet/new t])))))) |
| 58 | +(defn- run-percolation-loop [] |
| 59 | + (go-loop [] |
| 60 | + (let [[t matches] (<! c/percolation-matches-chan)] |
| 61 | + (doseq [uid (:any @connected-uids)] |
| 62 | + (when (contains? matches (get @a/subscriptions uid)) |
| 63 | + (chsk-send! uid [:tweet/new t])))) |
| 64 | + (recur))) |
61 | 65 |
|
62 | 66 | ;; loop sending stats about number of connected users to all connected clients
|
63 |
| -(go |
64 |
| - (while true |
65 |
| - (<! (timeout 2000)) |
66 |
| - (let [uids (:any @connected-uids)] |
67 |
| - (doseq [uid uids] |
68 |
| - (chsk-send! uid [:stats/users-count (count uids)]))))) |
| 67 | +(defn- run-users-count-loop [] |
| 68 | + (go-loop [] |
| 69 | + (<! (timeout 2000)) |
| 70 | + (let [uids (:any @connected-uids)] |
| 71 | + (doseq [uid uids] (chsk-send! uid [:stats/users-count (count uids)]))) |
| 72 | + (recur))) |
69 | 73 |
|
70 | 74 | ;; loop sending stats about number of indexed tweets to all connected clients
|
71 |
| -(go |
72 |
| - (while true |
73 |
| - (<! (timeout 3000)) |
74 |
| - (let [uids (:any @connected-uids) |
75 |
| - total-tweet-count (format "%,15d" (:count (p/total-tweet-count)))] |
76 |
| - (doseq [uid uids] |
77 |
| - (chsk-send! uid [:stats/total-tweet-count total-tweet-count]))))) |
| 75 | +(defn- run-tweet-stats-loop [] |
| 76 | + (go-loop [] |
| 77 | + (<! (timeout 3000)) |
| 78 | + (let [uids (:any @connected-uids) |
| 79 | + total-tweet-count (format "%,15d" (:count (p/total-tweet-count)))] |
| 80 | + (doseq [uid uids] |
| 81 | + (chsk-send! uid [:stats/total-tweet-count total-tweet-count]))) |
| 82 | + (recur))) |
78 | 83 |
|
79 | 84 | ;; loop for sending missing tweet back to client
|
80 |
| -(go |
81 |
| - (while true |
82 |
| - (let [msg (<! c/missing-tweet-found-chan)] |
83 |
| - (chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)])))) |
| 85 | +(defn- run-missing-tweet-loop [] |
| 86 | + (go-loop [] (let [msg (<! c/missing-tweet-found-chan)] |
| 87 | + (chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)])) |
| 88 | + (recur))) |
84 | 89 |
|
85 | 90 | ;; loop for sending query result chunks back to client
|
86 |
| -(go |
87 |
| - (while true |
88 |
| - (let [res (<! c/query-results-chan)] |
89 |
| - (chsk-send! (:uid res) [:tweet/prev-chunk (:result res)])))) |
| 91 | +(defn- run-query-results-loop [] |
| 92 | + (go-loop [] |
| 93 | + (let [res (<! c/query-results-chan)] |
| 94 | + (chsk-send! (:uid res) [:tweet/prev-chunk (:result res)])) |
| 95 | + (recur))) |
| 96 | + |
| 97 | +(run-percolation-loop) |
| 98 | +(run-users-count-loop) |
| 99 | +(run-tweet-stats-loop) |
| 100 | +(run-missing-tweet-loop) |
| 101 | +(run-query-results-loop) |
| 102 | + |
| 103 | +(defrecord Communicator [channels] |
| 104 | + ;; Implement the Lifecycle protocol |
| 105 | + component/Lifecycle |
| 106 | + |
| 107 | + (start [component] |
| 108 | + (log/info "Starting Communicator Component") |
| 109 | + |
| 110 | + (let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]} |
| 111 | + (sente/make-channel-socket! {:packer packer :user-id-fn user-id-fn})] |
| 112 | + (def ring-ajax-post ajax-post-fn) |
| 113 | + (def ring-ajax-get-or-ws-handshake ajax-get-or-ws-handshake-fn) |
| 114 | + (def ch-chsk ch-recv) ; ChannelSocket's receive channel |
| 115 | + (def chsk-send! send-fn) ; ChannelSocket's send API fn |
| 116 | + (def connected-uids connected-uids) ; Watchable, read-only atom |
| 117 | + |
| 118 | + ;; Return an updated version of the component with |
| 119 | + ;; the run-time state assoc'd in. |
| 120 | + (assoc component :ajax-post-fn ajax-post-fn :ajax-get-or-ws-handshake-fn ajax-get-or-ws-handshake-fn) |
| 121 | + )) |
| 122 | + |
| 123 | + (stop [component] |
| 124 | + (log/info "Stopping Persistence Component") |
| 125 | + ;; In the 'stop' method, shut down the running |
| 126 | + ;; component and release any external resources it has |
| 127 | + ;; acquired. |
| 128 | + |
| 129 | + ;; Return the component, optionally modified. Remember that if you |
| 130 | + ;; dissoc one of a record's base fields, you get a plain map. |
| 131 | + |
| 132 | + ;(assoc component :loops nil) |
| 133 | + )) |
| 134 | + |
| 135 | +(defn new-communicator [] (map->Communicator {})) |
0 commit comments