Skip to content

Commit a26c201

Browse files
committed
Switchboard component plus individual component-related channel bundles
These individual channel bundle components are somewhat like the interface for individual components, in the way that an engine's wiring harness represents the interface of an engine. The switchboard then only needs to know the channel bundles, without needing any knowledge about a specific implementation. Despite being a little longer, this is cleaner and less tightly coupled than the previous version were there was one centralized channels component, of which the other components needed to know the names of individual channels.
1 parent 364d7f8 commit a26c201

File tree

8 files changed

+107
-55
lines changed

8 files changed

+107
-55
lines changed

Clojure-Websockets/src/clj/birdwatch/channels.clj

Lines changed: 0 additions & 27 deletions
This file was deleted.

Clojure-Websockets/src/clj/birdwatch/communicator.clj

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
[taoensso.sente :as sente]
88
[taoensso.sente.packers.transit :as sente-transit]
99
[com.stuartsierra.component :as component]
10-
[clojure.core.async :as async :refer [<! >! put! timeout go-loop]]))
10+
[clojure.core.async :as async :refer [<! >! put! timeout go-loop chan]]))
1111

1212
(def packer (sente-transit/get-flexi-packer :json)) ;; serialization format for client<->server comm
1313

@@ -25,7 +25,7 @@
2525
[:cmd/query params] (put! query-chan params)
2626
[:cmd/missing params] (put! tweet-missing-chan params)
2727
[:chsk/ws-ping] () ; currently just do nothing with ping (no logging either)
28-
:else (log/info "Unmatched event:" (pp/pprint event)))))
28+
:else (log/debug "Unmatched event:" (pp/pprint event)))))
2929

3030
(defn- run-percolation-matches-loop [percolation-matches-chan chsk-send! connected-uids]
3131
"runs loop for delivering percolation matches to interested clients"
@@ -84,3 +84,22 @@
8484
(assoc component :chsk-router nil)))
8585

8686
(defn new-communicator [] (map->Communicator {}))
87+
88+
(defrecord Communicator-Channels []
89+
component/Lifecycle
90+
(start [component] (log/info "Starting Communicator Channels Component")
91+
(assoc component
92+
:query (chan)
93+
:query-results (chan)
94+
:tweet-missing (chan)
95+
:missing-tweet-found (chan)
96+
:persistence (chan)
97+
:rt-persistence (chan)
98+
:tweet-count (chan)
99+
:register-percolation (chan)
100+
:percolation-matches (chan)))
101+
(stop [component] (log/info "Stop Communicator Channels Component")
102+
(assoc component :query nil :query-results nil :tweet-missing nil :missing-tweet-found nil
103+
:persistence nil :rt-persistence nil :tweet-count nil)))
104+
105+
(defn new-communicator-channels [] (map->Communicator-Channels {}))

Clojure-Websockets/src/clj/birdwatch/main.clj

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
(:gen-class)
33
(:require
44
[birdwatch.twitter-client :as tc]
5-
[birdwatch.channels :as c]
65
[birdwatch.communicator :as comm]
76
[birdwatch.persistence :as p]
87
[birdwatch.percolator :as perc]
98
[birdwatch.http :as http]
9+
[birdwatch.switchboard :as sw]
1010
[clojure.edn :as edn]
1111
[clojure.tools.logging :as log]
1212
[clj-pid.core :as pid]
@@ -16,15 +16,21 @@
1616

1717
(defn get-system [conf]
1818
"Create system by wiring individual components so that component/start
19-
will bring up the individual components in the correct order."
19+
will bring up the individual components in the correct order."
2020
(component/system-map
21-
:channels (c/new-channels)
22-
:communicator (component/using (comm/new-communicator) {:channels :channels})
23-
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :channels})
24-
:persistence (component/using (p/new-persistence conf) {:channels :channels})
25-
:percolator (component/using (perc/new-percolator conf) {:channels :channels})
26-
:http (component/using (http/new-http-server conf) {:communicator :communicator})))
27-
21+
:communicator-channels (comm/new-communicator-channels)
22+
:communicator (component/using (comm/new-communicator) {:channels :communicator-channels})
23+
:twitterclient-channels (tc/new-twitterclient-channels)
24+
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :twitterclient-channels})
25+
:persistence-channels (p/new-persistence-channels)
26+
:persistence (component/using (p/new-persistence conf) {:channels :persistence-channels})
27+
:percolation-channels (perc/new-percolation-channels)
28+
:percolator (component/using (perc/new-percolator conf) {:channels :percolation-channels})
29+
:http (component/using (http/new-http-server conf) {:communicator :communicator})
30+
:switchboard (component/using (sw/new-switchboard) {:comm-chans :communicator-channels
31+
:tc-chans :twitterclient-channels
32+
:pers-chans :persistence-channels
33+
:perc-chans :percolation-channels})))
2834
(def system (get-system conf))
2935

3036
(defn -main [& args]

Clojure-Websockets/src/clj/birdwatch/percolator.clj

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
(let [sha (sha1 (str query))]
1717
(swap! subscriptions assoc uid sha)
1818
(perc/register-query conn "percolator" sha :query query)
19-
(log/info "Percolation registered for query" query "with SHA1" sha)))
19+
(log/debug "Percolation registered for query" query "with SHA1" sha)))
2020

2121
(defn- run-percolation-register-loop [register-percolation-chan conn subscriptions]
2222
"loop for finding percolation matches and delivering those on the appropriate socket"
@@ -32,17 +32,24 @@
3232
(put! percolation-matches-chan [t matches @subscriptions]) ;; send deref'd subscriptions as val
3333
(recur))))
3434

35-
(defrecord Percolator [conf channels lala conn subscriptions]
35+
(defrecord Percolator [conf channels conn subscriptions]
3636
component/Lifecycle
3737
(start [component] (log/info "Starting Percolator Component")
3838
(let [conn (esr/connect (:es-address conf))
39-
subscriptions (atom {})
40-
percolation-chan (chan)]
41-
(tap (:tweets-mult channels) percolation-chan)
39+
subscriptions (atom {})]
4240
(run-percolation-register-loop (:register-percolation channels) conn subscriptions)
43-
(run-percolation-loop percolation-chan (:percolation-matches channels) conn subscriptions)
41+
(run-percolation-loop (:percolation channels) (:percolation-matches channels) conn subscriptions)
4442
(assoc component :conn conn :subscriptions subscriptions)))
4543
(stop [component] (log/info "Stopping Percolator Component") ;; TODO: proper teardown of resources
4644
(assoc component :conn nil :subscriptions nil)))
4745

4846
(defn new-percolator [conf] (map->Percolator {:conf conf}))
47+
48+
(defrecord Percolation-Channels []
49+
component/Lifecycle
50+
(start [component] (log/info "Starting Percolation Channels Component")
51+
(assoc component :percolation (chan) :register-percolation (chan) :percolation-matches (chan)))
52+
(stop [component] (log/info "Stop Percolation Channels Component")
53+
(assoc component :percolation nil :register-percolation nil :percolation-matches nil)))
54+
55+
(defn new-percolation-channels [] (map->Percolation-Channels {}))

Clojure-Websockets/src/clj/birdwatch/persistence.clj

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,9 @@
6363
(start [component]
6464
(log/info "Starting Persistence Component")
6565
(let [conn (esr/connect (:es-address conf))
66-
native-conn (esn/connect [(:es-native-address conf)] {"cluster.name" (:es-cluster-name conf)})
67-
persistence-chan (chan)
68-
rt-persistence-chan (chan)]
69-
(tap (:tweets-mult channels) persistence-chan)
70-
(tap (:tweets-mult channels) rt-persistence-chan)
71-
(run-persistence-loop persistence-chan conf conn)
72-
(run-rt-persistence-loop rt-persistence-chan persistence-chan)
66+
native-conn (esn/connect [(:es-native-address conf)] {"cluster.name" (:es-cluster-name conf)})]
67+
(run-persistence-loop (:persistence channels) conf conn)
68+
(run-rt-persistence-loop (:rt-persistence channels) (:persistence channels))
7369
(run-find-missing-loop (:tweet-missing channels) (:missing-tweet-found channels) conf conn)
7470
(run-query-loop (:query channels) (:query-results channels) conf native-conn)
7571
(run-tweet-count-loop (:tweet-count channels) conf conn)
@@ -79,3 +75,20 @@
7975
(assoc component :conn nil :native-conn nil)))
8076

8177
(defn new-persistence [conf] (map->Persistence {:conf conf}))
78+
79+
(defrecord Persistence-Channels []
80+
component/Lifecycle
81+
(start [component] (log/info "Starting Persistence Channels Component")
82+
(assoc component
83+
:query (chan)
84+
:query-results (chan)
85+
:tweet-missing (chan)
86+
:missing-tweet-found (chan)
87+
:persistence (chan)
88+
:rt-persistence (chan)
89+
:tweet-count (chan)))
90+
(stop [component] (log/info "Stop Persistence Channels Component")
91+
(assoc component :query nil :query-results nil :tweet-missing nil :missing-tweet-found nil
92+
:persistence nil :rt-persistence nil :tweet-count nil)))
93+
94+
(defn new-persistence-channels [] (map->Persistence-Channels {}))
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
(ns birdwatch.switchboard
2+
(:gen-class)
3+
(:require
4+
[clojure.tools.logging :as log]
5+
[com.stuartsierra.component :as component]
6+
[clojure.core.async :as async :refer [chan mult tap pipe]]))
7+
8+
;; This component is the central switchboard for information flow in this application.
9+
;; The individual channel components come together like wiring harnesses in a car. One for the engine,
10+
;; one for the AC, one for the soundsystem and so on.
11+
12+
(defrecord Switchboard [comm-chans tc-chans pers-chans perc-chans]
13+
component/Lifecycle
14+
(start [component] (log/info "Starting Switchboard Component")
15+
(let [tweets-mult (mult (:tweets tc-chans))]
16+
(tap tweets-mult (:percolation perc-chans)) ; Tweets are distributed to multiple channels
17+
(tap tweets-mult (:persistence pers-chans)) ; through tapping the mult created above
18+
(tap tweets-mult (:rt-persistence pers-chans))
19+
;; connect channels 1 on 1. here, it would be easy to add message logging
20+
(pipe (:tweet-count pers-chans) (:tweet-count comm-chans))
21+
(pipe (:register-percolation comm-chans) (:register-percolation perc-chans))
22+
(pipe (:percolation-matches perc-chans) (:percolation-matches comm-chans))
23+
(pipe (:tweet-missing comm-chans) (:tweet-missing pers-chans))
24+
(pipe (:missing-tweet-found pers-chans) (:missing-tweet-found comm-chans))
25+
(pipe (:query comm-chans) (:query pers-chans))
26+
(pipe (:query-results pers-chans) (:query-results comm-chans))))
27+
(stop [component] (log/info "Stop Switchboard Component")))
28+
29+
(defn new-switchboard [] (map->Switchboard {}))

Clojure-Websockets/src/clj/birdwatch/twitter_client.clj

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,12 @@
106106
(assoc component :connection nil :msg-chan nil :chunk-chan nil :chunk-buff nil :watch-active nil)))
107107

108108
(defn new-twitterclient [conf] (map->Twitterclient {:conf conf}))
109+
110+
(defrecord Twitterclient-Channels []
111+
component/Lifecycle
112+
(start [component] (log/info "Starting Twitterclient Channels Component")
113+
(assoc component :tweets (chan))) ; channel for new tweets received from streaming API
114+
(stop [component] (log/info "Stop Twitterclient Channels Component")
115+
(assoc component :tweets nil)))
116+
117+
(defn new-twitterclient-channels [] (map->Twitterclient-Channels {}))

Clojure-Websockets/src/clj/logback.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,4 @@
1919
<root level="info">
2020
<appender-ref ref="STDOUT" />
2121
</root>
22-
23-
<root level="debug">
24-
<appender-ref ref="FILE" />
25-
</root>
2622
</configuration>

0 commit comments

Comments
 (0)