Skip to content

Commit 1a648c1

Browse files
committed
introduction of interop component for Redis Pub/Sub
The interop component allows sending and receiving messages via Redis Pub/Sub. It has both a :send and a :receive channel and can be used on both sides of the Pub/Sub. In this commit, the application is still complete but all messages are sent over Pub/Sub so the next step will be to separate this into two applications.
1 parent be16363 commit 1a648c1

File tree

6 files changed

+80
-5
lines changed

6 files changed

+80
-5
lines changed

Clojure-Websockets/TwitterClient/project.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
[com.cognitect/transit-clj "0.8.259"]
2525
[com.cognitect/transit-cljs "0.8.188"]
2626
[clj-pid "0.1.1"]
27-
[com.stuartsierra/component "0.2.2"]]
27+
[com.stuartsierra/component "0.2.2"]
28+
[com.taoensso/carmine "2.7.1"]]
2829

2930
:source-paths ["src/clj/"]
3031

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
(ns birdwatch.interop.component
2+
(:gen-class)
3+
(:require
4+
[birdwatch.interop.redis :as red]
5+
[clojure.tools.logging :as log]
6+
[clojure.pprint :as pp]
7+
[com.stuartsierra.component :as component]
8+
[clojure.core.async :as async :refer [chan tap]]))
9+
10+
;;; The interop component allows sending and receiving messages via Redis Pub/Sub.
11+
;;; It has both a :send and a :receive channel and can be used on both sides of the Pub/Sub.
12+
(defrecord Interop [conf channels conn]
13+
component/Lifecycle
14+
(start [component] (log/info "Starting Interop Component")
15+
(let [conn {:pool {} :spec {:host (:redis-host conf) :port (:redis-port conf)}}]
16+
(red/subscribe-topic (:receive channels) conn "matches")
17+
(red/run-send-loop (:send channels) conn "matches")
18+
(assoc component :conn conn)))
19+
(stop [component] (log/info "Stopping Interop Component") ;; TODO: proper teardown of resources
20+
(assoc component :conn nil)))
21+
22+
(defn new-interop [conf] (map->Interop {:conf conf}))
23+
24+
(defrecord Interop-Channels []
25+
component/Lifecycle
26+
(start [component]
27+
(log/info "Starting Interop Channels Component")
28+
(assoc component :send (chan) :receive (chan)))
29+
(stop [component]
30+
(log/info "Stop Interop Channels Component")
31+
(assoc component :send nil :receive nil)))
32+
33+
(defn new-interop-channels [] (map->Interop-Channels {}))
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
(ns birdwatch.interop.redis
2+
(:gen-class)
3+
(:require
4+
[clojure.tools.logging :as log]
5+
[clojure.pprint :as pp]
6+
[clojure.core.match :as match :refer (match)]
7+
[taoensso.carmine :as car :refer (wcar)]
8+
[clojure.core.async :as async :refer [<! put! go-loop]]))
9+
10+
(defn run-send-loop
11+
"loop for sending items by publishing them on a REDIS pub topic"
12+
[send-chan conn topic]
13+
(go-loop [] (let [msg (<! send-chan)]
14+
(car/wcar conn (car/publish topic msg))
15+
(recur))))
16+
17+
(defn- msg-handler-fn
18+
"create handler function for messages from Redis Pub/Sub"
19+
[receive-chan]
20+
(fn [[msg-type topic payload]]
21+
(when (= msg-type "message")
22+
(put! receive-chan payload))))
23+
24+
(defn subscribe-topic
25+
"subscribe to topic, put items on specified channel"
26+
[receive-chan conn topic]
27+
(car/with-new-pubsub-listener
28+
(:spec conn)
29+
{"matches" (msg-handler-fn receive-chan)}
30+
(car/subscribe topic))
31+
(log/info "subscribe-topic"))

Clojure-Websockets/TwitterClient/src/clj/birdwatch/main.clj

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
[birdwatch.communicator.component :as comm]
66
[birdwatch.persistence.component :as p]
77
[birdwatch.percolator.component :as perc]
8+
[birdwatch.interop.component :as iop]
89
[birdwatch.http.component :as http]
910
[birdwatch.switchboard :as sw]
1011
[clojure.edn :as edn]
@@ -23,15 +24,18 @@
2324
:twitterclient-channels (tc/new-twitterclient-channels)
2425
:persistence-channels (p/new-persistence-channels)
2526
:percolation-channels (perc/new-percolation-channels)
27+
:interop-channels (iop/new-interop-channels)
2628
:comm (component/using (comm/new-communicator) {:channels :comm-channels})
29+
:interop (component/using (iop/new-interop conf) {:channels :interop-channels})
2730
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :twitterclient-channels})
2831
:persistence (component/using (p/new-persistence conf) {:channels :persistence-channels})
2932
:percolator (component/using (perc/new-percolator conf) {:channels :percolation-channels})
3033
:http (component/using (http/new-http-server conf) {:comm :comm})
3134
:switchboard (component/using (sw/new-switchboard) {:comm-chans :comm-channels
3235
:tc-chans :twitterclient-channels
3336
:pers-chans :persistence-channels
34-
:perc-chans :percolation-channels})))
37+
:perc-chans :percolation-channels
38+
:iop-chans :interop-channels})))
3539
(def system (get-system conf))
3640

3741
(defn -main [& args]

Clojure-Websockets/TwitterClient/src/clj/birdwatch/switchboard.clj

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
;;;; The individual channel components come together like wiring harnesses in a car. One for the engine,
1010
;;;; one for the AC, one for the soundsystem and so on.
1111

12-
(defrecord Switchboard [comm-chans tc-chans pers-chans perc-chans]
12+
(defrecord Switchboard [comm-chans tc-chans pers-chans perc-chans iop-chans]
1313
component/Lifecycle
1414
(start [component] (log/info "Starting Switchboard Component")
1515
(let [tweets-mult (mult (:tweets tc-chans))]
@@ -19,7 +19,11 @@
1919
;; Connect channels 1 on 1. Here, it would be easy to add message logging.
2020
(pipe (:tweet-count pers-chans) (:tweet-count comm-chans))
2121
(pipe (:register-perc comm-chans) (:register-percolation perc-chans))
22-
(pipe (:percolation-matches perc-chans) (:perc-matches comm-chans))
22+
23+
;(pipe (:percolation-matches perc-chans) (:perc-matches comm-chans))
24+
(pipe (:percolation-matches perc-chans) (:send iop-chans))
25+
(pipe (:receive iop-chans) (:perc-matches comm-chans))
26+
2327
(pipe (:tweet-missing comm-chans) (:tweet-missing pers-chans))
2428
(pipe (:missing-tweet-found pers-chans) (:missing-tweet-found comm-chans))
2529
(pipe (:query comm-chans) (:query pers-chans))

Clojure-Websockets/TwitterClient/twitterconf-tpl.edn

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@
1616
:tw-check-interval-sec 10
1717
:tw-restart-wait 60
1818
:port 8888
19-
:pidfile-name "birdwatch.pid"}
19+
:pidfile-name "birdwatch.pid"
20+
:redis-host "127.0.0.1"
21+
:redis-port 6379}

0 commit comments

Comments
 (0)