File tree Expand file tree Collapse file tree 6 files changed +80
-5
lines changed Expand file tree Collapse file tree 6 files changed +80
-5
lines changed Original file line number Diff line number Diff line change 24
24
[com.cognitect/transit-clj " 0.8.259" ]
25
25
[com.cognitect/transit-cljs " 0.8.188" ]
26
26
[clj-pid " 0.1.1" ]
27
- [com.stuartsierra/component " 0.2.2" ]]
27
+ [com.stuartsierra/component " 0.2.2" ]
28
+ [com.taoensso/carmine " 2.7.1" ]]
28
29
29
30
:source-paths [" src/clj/" ]
30
31
Original file line number Diff line number Diff line change
1
+ (ns birdwatch.interop.component
2
+ (:gen-class )
3
+ (:require
4
+ [birdwatch.interop.redis :as red]
5
+ [clojure.tools.logging :as log]
6
+ [clojure.pprint :as pp]
7
+ [com.stuartsierra.component :as component]
8
+ [clojure.core.async :as async :refer [chan tap]]))
9
+
10
+ ; ;; The interop component allows sending and receiving messages via Redis Pub/Sub.
11
+ ; ;; It has both a :send and a :receive channel and can be used on both sides of the Pub/Sub.
12
+ (defrecord Interop [conf channels conn]
13
+ component /Lifecycle
14
+ (start [component] (log/info " Starting Interop Component" )
15
+ (let [conn {:pool {} :spec {:host (:redis-host conf) :port (:redis-port conf)}}]
16
+ (red/subscribe-topic (:receive channels) conn " matches" )
17
+ (red/run-send-loop (:send channels) conn " matches" )
18
+ (assoc component :conn conn)))
19
+ (stop [component] (log/info " Stopping Interop Component" ) ; ; TODO: proper teardown of resources
20
+ (assoc component :conn nil )))
21
+
22
+ (defn new-interop [conf] (map->Interop {:conf conf}))
23
+
24
+ (defrecord Interop-Channels []
25
+ component /Lifecycle
26
+ (start [component]
27
+ (log/info " Starting Interop Channels Component" )
28
+ (assoc component :send (chan ) :receive (chan )))
29
+ (stop [component]
30
+ (log/info " Stop Interop Channels Component" )
31
+ (assoc component :send nil :receive nil )))
32
+
33
+ (defn new-interop-channels [] (map->Interop-Channels {}))
Original file line number Diff line number Diff line change
1
+ (ns birdwatch.interop.redis
2
+ (:gen-class )
3
+ (:require
4
+ [clojure.tools.logging :as log]
5
+ [clojure.pprint :as pp]
6
+ [clojure.core.match :as match :refer (match )]
7
+ [taoensso.carmine :as car :refer (wcar )]
8
+ [clojure.core.async :as async :refer [<! put! go-loop]]))
9
+
10
+ (defn run-send-loop
11
+ " loop for sending items by publishing them on a REDIS pub topic"
12
+ [send-chan conn topic]
13
+ (go-loop [] (let [msg (<! send-chan)]
14
+ (car/wcar conn (car/publish topic msg))
15
+ (recur ))))
16
+
17
+ (defn- msg-handler-fn
18
+ " create handler function for messages from Redis Pub/Sub"
19
+ [receive-chan]
20
+ (fn [[msg-type topic payload]]
21
+ (when (= msg-type " message" )
22
+ (put! receive-chan payload))))
23
+
24
+ (defn subscribe-topic
25
+ " subscribe to topic, put items on specified channel"
26
+ [receive-chan conn topic]
27
+ (car/with-new-pubsub-listener
28
+ (:spec conn)
29
+ {" matches" (msg-handler-fn receive-chan)}
30
+ (car/subscribe topic))
31
+ (log/info " subscribe-topic" ))
Original file line number Diff line number Diff line change 5
5
[birdwatch.communicator.component :as comm]
6
6
[birdwatch.persistence.component :as p]
7
7
[birdwatch.percolator.component :as perc]
8
+ [birdwatch.interop.component :as iop]
8
9
[birdwatch.http.component :as http]
9
10
[birdwatch.switchboard :as sw]
10
11
[clojure.edn :as edn]
23
24
:twitterclient-channels (tc/new-twitterclient-channels )
24
25
:persistence-channels (p/new-persistence-channels )
25
26
:percolation-channels (perc/new-percolation-channels )
27
+ :interop-channels (iop/new-interop-channels )
26
28
:comm (component/using (comm/new-communicator ) {:channels :comm-channels })
29
+ :interop (component/using (iop/new-interop conf) {:channels :interop-channels })
27
30
:twitterclient (component/using (tc/new-twitterclient conf) {:channels :twitterclient-channels })
28
31
:persistence (component/using (p/new-persistence conf) {:channels :persistence-channels })
29
32
:percolator (component/using (perc/new-percolator conf) {:channels :percolation-channels })
30
33
:http (component/using (http/new-http-server conf) {:comm :comm })
31
34
:switchboard (component/using (sw/new-switchboard ) {:comm-chans :comm-channels
32
35
:tc-chans :twitterclient-channels
33
36
:pers-chans :persistence-channels
34
- :perc-chans :percolation-channels })))
37
+ :perc-chans :percolation-channels
38
+ :iop-chans :interop-channels })))
35
39
(def system (get-system conf))
36
40
37
41
(defn -main [& args]
Original file line number Diff line number Diff line change 9
9
; ;;; The individual channel components come together like wiring harnesses in a car. One for the engine,
10
10
; ;;; one for the AC, one for the soundsystem and so on.
11
11
12
- (defrecord Switchboard [comm-chans tc-chans pers-chans perc-chans]
12
+ (defrecord Switchboard [comm-chans tc-chans pers-chans perc-chans iop-chans ]
13
13
component /Lifecycle
14
14
(start [component] (log/info " Starting Switchboard Component" )
15
15
(let [tweets-mult (mult (:tweets tc-chans))]
19
19
; ; Connect channels 1 on 1. Here, it would be easy to add message logging.
20
20
(pipe (:tweet-count pers-chans) (:tweet-count comm-chans))
21
21
(pipe (:register-perc comm-chans) (:register-percolation perc-chans))
22
- (pipe (:percolation-matches perc-chans) (:perc-matches comm-chans))
22
+
23
+ ; (pipe (:percolation-matches perc-chans) (:perc-matches comm-chans))
24
+ (pipe (:percolation-matches perc-chans) (:send iop-chans))
25
+ (pipe (:receive iop-chans) (:perc-matches comm-chans))
26
+
23
27
(pipe (:tweet-missing comm-chans) (:tweet-missing pers-chans))
24
28
(pipe (:missing-tweet-found pers-chans) (:missing-tweet-found comm-chans))
25
29
(pipe (:query comm-chans) (:query pers-chans))
Original file line number Diff line number Diff line change 16
16
:tw-check-interval-sec 10
17
17
:tw-restart-wait 60
18
18
:port 8888
19
- :pidfile-name " birdwatch.pid" }
19
+ :pidfile-name " birdwatch.pid"
20
+ :redis-host " 127.0.0.1"
21
+ :redis-port 6379 }
You can’t perform that action at this time.
0 commit comments