Skip to content

Commit 167c8f5

Browse files
committed
WIP communicator componentization
1 parent 2547200 commit 167c8f5

File tree

3 files changed

+106
-59
lines changed

3 files changed

+106
-59
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
(ns birdwatch.communicator
22
(:gen-class)
3-
(:use [birdwatch.conf])
43
(:require
54
[birdwatch.channels :as c]
65
[birdwatch.atoms :as a]
@@ -20,18 +19,22 @@
2019
[compojure.route :as route]
2120
[taoensso.sente :as sente]
2221
[taoensso.sente.packers.transit :as sente-transit]
23-
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]]))
22+
[com.stuartsierra.component :as component]
23+
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop]]))
2424

2525
(def packer
2626
"Defines our packing (serialization) format for client<->server comms."
2727
(sente-transit/get-flexi-packer :json))
2828

29+
(defn- user-id-fn [req]
30+
"generates unique ID for request"
31+
(let [uid (str (java.util.UUID/randomUUID))]
32+
(log/info "Connected:" (:remote-addr req) uid)
33+
uid))
34+
2935
(let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]}
3036
(sente/make-channel-socket! {:packer packer
31-
:user-id-fn (fn [req]
32-
(let [uid (str (java.util.UUID/randomUUID))]
33-
(log/info "Connected:" (:remote-addr req) uid)
34-
uid))})]
37+
:user-id-fn user-id-fn})]
3538
(def ring-ajax-post ajax-post-fn)
3639
(def ring-ajax-get-or-ws-handshake ajax-get-or-ws-handshake-fn)
3740
(def ch-chsk ch-recv) ; ChannelSocket's receive channel
@@ -52,38 +55,81 @@
5255
(defonce chsk-router (sente/start-chsk-router! ch-chsk event-msg-handler))
5356

5457
;; loop for matching connected clients with percolation matches and delivering those on the appropriate socket
55-
(go
56-
(while true
57-
(let [[t matches] (<! c/percolation-matches-chan)]
58-
(doseq [uid (:any @connected-uids)]
59-
(when (contains? matches (get @a/subscriptions uid))
60-
(chsk-send! uid [:tweet/new t]))))))
58+
(defn- run-percolation-loop []
59+
(go-loop []
60+
(let [[t matches] (<! c/percolation-matches-chan)]
61+
(doseq [uid (:any @connected-uids)]
62+
(when (contains? matches (get @a/subscriptions uid))
63+
(chsk-send! uid [:tweet/new t]))))
64+
(recur)))
6165

6266
;; loop sending stats about number of connected users to all connected clients
63-
(go
64-
(while true
65-
(<! (timeout 2000))
66-
(let [uids (:any @connected-uids)]
67-
(doseq [uid uids]
68-
(chsk-send! uid [:stats/users-count (count uids)])))))
67+
(defn- run-users-count-loop []
68+
(go-loop []
69+
(<! (timeout 2000))
70+
(let [uids (:any @connected-uids)]
71+
(doseq [uid uids] (chsk-send! uid [:stats/users-count (count uids)])))
72+
(recur)))
6973

7074
;; loop sending stats about number of indexed tweets to all connected clients
71-
(go
72-
(while true
73-
(<! (timeout 3000))
74-
(let [uids (:any @connected-uids)
75-
total-tweet-count (format "%,15d" (:count (p/total-tweet-count)))]
76-
(doseq [uid uids]
77-
(chsk-send! uid [:stats/total-tweet-count total-tweet-count])))))
75+
(defn- run-tweet-stats-loop []
76+
(go-loop []
77+
(<! (timeout 3000))
78+
(let [uids (:any @connected-uids)
79+
total-tweet-count (format "%,15d" (:count (p/total-tweet-count)))]
80+
(doseq [uid uids]
81+
(chsk-send! uid [:stats/total-tweet-count total-tweet-count])))
82+
(recur)))
7883

7984
;; loop for sending missing tweet back to client
80-
(go
81-
(while true
82-
(let [msg (<! c/missing-tweet-found-chan)]
83-
(chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)]))))
85+
(defn- run-missing-tweet-loop []
86+
(go-loop [] (let [msg (<! c/missing-tweet-found-chan)]
87+
(chsk-send! (:uid msg) [:tweet/missing-tweet (:tweet msg)]))
88+
(recur)))
8489

8590
;; loop for sending query result chunks back to client
86-
(go
87-
(while true
88-
(let [res (<! c/query-results-chan)]
89-
(chsk-send! (:uid res) [:tweet/prev-chunk (:result res)]))))
91+
(defn- run-query-results-loop []
92+
(go-loop []
93+
(let [res (<! c/query-results-chan)]
94+
(chsk-send! (:uid res) [:tweet/prev-chunk (:result res)]))
95+
(recur)))
96+
97+
(run-percolation-loop)
98+
(run-users-count-loop)
99+
(run-tweet-stats-loop)
100+
(run-missing-tweet-loop)
101+
(run-query-results-loop)
102+
103+
(defrecord Communicator [channels]
104+
;; Implement the Lifecycle protocol
105+
component/Lifecycle
106+
107+
(start [component]
108+
(log/info "Starting Communicator Component")
109+
110+
(let [{:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]}
111+
(sente/make-channel-socket! {:packer packer :user-id-fn user-id-fn})]
112+
(def ring-ajax-post ajax-post-fn)
113+
(def ring-ajax-get-or-ws-handshake ajax-get-or-ws-handshake-fn)
114+
(def ch-chsk ch-recv) ; ChannelSocket's receive channel
115+
(def chsk-send! send-fn) ; ChannelSocket's send API fn
116+
(def connected-uids connected-uids) ; Watchable, read-only atom
117+
118+
;; Return an updated version of the component with
119+
;; the run-time state assoc'd in.
120+
(assoc component :ajax-post-fn ajax-post-fn :ajax-get-or-ws-handshake-fn ajax-get-or-ws-handshake-fn)
121+
))
122+
123+
(stop [component]
124+
(log/info "Stopping Persistence Component")
125+
;; In the 'stop' method, shut down the running
126+
;; component and release any external resources it has
127+
;; acquired.
128+
129+
;; Return the component, optionally modified. Remember that if you
130+
;; dissoc one of a record's base fields, you get a plain map.
131+
132+
;(assoc component :loops nil)
133+
))
134+
135+
(defn new-communicator [] (map->Communicator {}))

Clojure-Websockets/src/clj/birdwatch/main.clj

+5-4
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
[compojure.core :as comp :refer (defroutes GET POST)]
2121
[compojure.route :as route]
2222
[taoensso.sente :as sente]
23-
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go]]
23+
[clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop close!]]
2424
[clj-pid.core :as pid]
2525
[com.stuartsierra.component :as component]))
2626

@@ -57,10 +57,10 @@
5757

5858
(def pid-file "birdwatch.pid")
5959

60-
6160
(defn get-system [conf]
6261
(component/system-map
6362
:channels (c/new-channels)
63+
; :communicator (component/using (comm/new-communicator) {:channels :channels})
6464
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :channels})
6565
:persistence (component/using (p/new-persistence conf) {:channels :channels})))
6666

@@ -71,6 +71,7 @@
7171
(pid/save pid-file)
7272
(pid/delete-on-shutdown! pid-file)
7373
(log/info "Application started, PID" (pid/current))
74-
(tc/start-twitter-conn!)
75-
(tc/watch-twitter-conn!)
74+
; (tc/start-twitter-conn!)
75+
; (tc/watch-twitter-conn!)
76+
(alter-var-root #'system component/start)
7677
(start-http-server!))

Clojure-Websockets/src/clj/birdwatch/persistence.clj

+22-22
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@
129129
(put! query-results-chan {:uid (:uid q) :result result}))
130130
(recur)))
131131

132-
(defrecord Persistence [conf channels loops]
132+
(defrecord Persistence [conf channels conn native-conn]
133133
;; Implement the Lifecycle protocol
134134
component/Lifecycle
135135

@@ -139,27 +139,27 @@
139139
;; and start it running. For example, connect to a
140140
;; database, create thread pools, or initialize shared
141141
;; state.
142-
(let [loops {:persistence (run-persistence-loop (:persistence channels))
143-
:rt-persistence (run-rt-persistence-loop (:rt-persistence channels))
144-
:percolation (run-percolation-loop (:percolation channels) (:percolation-matches channels))
145-
:find-missing (run-find-missing-loop (:tweet-missing channels) (:missing-tweet-found channels))
146-
:query (run-query-loop (:query channels) (:query-results channels))}]
147-
;; Return an updated version of the component with
148-
;; the run-time state assoc'd in.
149-
(assoc component :loops loops)))
150-
151-
(stop [component]
152-
(log/info "Stopping Persistence Component")
153-
;; In the 'stop' method, shut down the running
154-
;; component and release any external resources it has
155-
;; acquired.
156-
157-
;; Return the component, optionally modified. Remember that if you
158-
;; dissoc one of a record's base fields, you get a plain map.
159-
(doseq [p-loop (vals loops)]
160-
(close! p-loop))
161-
162-
(assoc component :loops nil)))
142+
(run-persistence-loop (:persistence channels))
143+
(run-rt-persistence-loop (:rt-persistence channels))
144+
(run-percolation-loop (:percolation channels) (:percolation-matches channels))
145+
(run-find-missing-loop (:tweet-missing channels) (:missing-tweet-found channels))
146+
(run-query-loop (:query channels) (:query-results channels))
147+
;; Return an updated version of the component with
148+
;; the run-time state assoc'd in.
149+
;(assoc component :c loops)
150+
)
151+
152+
(stop [component]
153+
(log/info "Stopping Persistence Component")
154+
;; In the 'stop' method, shut down the running
155+
;; component and release any external resources it has
156+
;; acquired.
157+
158+
;; Return the component, optionally modified. Remember that if you
159+
;; dissoc one of a record's base fields, you get a plain map.
160+
161+
;(assoc component :loops nil)
162+
))
163163

164164
(defn new-persistence [conf]
165165
(map->Persistence {:conf conf}))

0 commit comments

Comments
 (0)