|
3 | 3 | (:use [birdwatch.conf] ;; TODO: remove conf dependency
|
4 | 4 | [clojure.data.priority-map])
|
5 | 5 | (:require
|
6 |
| - [birdwatch.atoms :as a] |
7 | 6 | [birdwatch.data :as d]
|
8 |
| - [clojure.edn :as edn] |
9 | 7 | [clojure.tools.logging :as log]
|
10 |
| - [pandect.core :refer [sha1]] |
11 | 8 | [clojure.pprint :as pp]
|
12 | 9 | [clojurewerkz.elastisch.native :as esn]
|
13 | 10 | [clojurewerkz.elastisch.native.document :as esnd]
|
14 | 11 | [clojurewerkz.elastisch.native.response :as esnrsp]
|
15 | 12 | [clojurewerkz.elastisch.rest :as esr]
|
16 | 13 | [clojurewerkz.elastisch.rest.document :as esd]
|
17 |
| - [clojurewerkz.elastisch.rest.percolation :as perc] |
18 | 14 | [clojurewerkz.elastisch.query :as q]
|
19 | 15 | [clojurewerkz.elastisch.rest.response :as esrsp]
|
20 | 16 | [com.stuartsierra.component :as component]
|
21 | 17 | [clojure.core.async :as async :refer [<! <!! >! >!! chan put! alts! timeout go go-loop close!]]))
|
22 | 18 |
|
23 | 19 | ;; TODO: move connection objects into component
|
24 | 20 | (def conn (esr/connect (:es-address conf)))
|
25 |
| -(def native-conn (esn/connect [["127.0.0.1" 9300]] {"cluster.name" "elasticsearch_mn"})) |
| 21 | +(def native-conn (esn/connect [(:es-native-address conf)] {"cluster.name" (:es-cluster-name conf)})) |
26 | 22 |
|
27 | 23 | (defn total-tweet-count []
|
28 | 24 | "get total count of indexed tweets from ElasticSearch"
|
|
76 | 72 | ;(log/info "top retweets in chunk" (pp/pprint (take 10 (into (priority-map-by >) (d/retweets res {})))))
|
77 | 73 | res))
|
78 | 74 |
|
79 |
| -(defn start-percolator [{:keys [query uid]}] |
80 |
| - "register percolation search with ID based on hash of the query" |
81 |
| - (let [sha (sha1 (str query))] |
82 |
| - (swap! a/subscriptions assoc uid sha) |
83 |
| - (perc/register-query conn "percolator" sha :query query) |
84 |
| - (log/info "Percolation registered for query" query "with SHA1" sha))) |
85 |
| - |
86 | 75 | ;; loop for persisting tweets
|
87 | 76 | (defn- run-persistence-loop [persistence-chan]
|
88 | 77 | (go-loop []
|
|
102 | 91 | (catch Exception ex (log/error ex "esd/put error"))))
|
103 | 92 | (recur))))
|
104 | 93 |
|
105 |
| -;; loop for finding percolation matches and delivering those on the appropriate socket |
106 |
| -(defn- run-percolation-loop [percolation-chan percolation-matches-chan] |
107 |
| - (go-loop [] |
108 |
| - (let [t (<! percolation-chan) |
109 |
| - response (perc/percolate conn "percolator" "tweet" :doc t) |
110 |
| - matches (into #{} (map #(:_id %1) (esrsp/matches-from response))) |
111 |
| - ] |
112 |
| - (put! percolation-matches-chan [t matches]) |
113 |
| - (recur)))) |
114 |
| - |
115 | 94 | (defn- run-find-missing-loop [tweet-missing-chan missing-tweet-found-chan]
|
116 | 95 | "starts loop for finding missing tweets, puts result on missing-tweet-found-chan "
|
117 | 96 | (go-loop []
|
|
139 | 118 |
|
140 | 119 | (run-persistence-loop (:persistence channels))
|
141 | 120 | (run-rt-persistence-loop (:rt-persistence channels))
|
142 |
| - (run-percolation-loop (:percolation channels) (:percolation-matches channels)) |
143 | 121 | (run-find-missing-loop (:tweet-missing channels) (:missing-tweet-found channels))
|
144 | 122 | (run-query-loop (:query channels) (:query-results channels))
|
145 | 123 |
|
|
0 commit comments