File tree Expand file tree Collapse file tree 1 file changed +11
-13
lines changed Expand file tree Collapse file tree 1 file changed +11
-13
lines changed Original file line number Diff line number Diff line change 10
10
" Stateful transducer, counts processed items and updating last-received atom. Logs progress every 1000 items."
11
11
(fn [step]
12
12
(let [cnt (atom 0 )]
13
- (fn [r x]
14
- (swap! cnt inc)
15
- (when (zero? (mod @cnt 1000 )) (log/info " processed" @cnt " since startup" ))
16
- (reset! last-received (t/now ))
17
- (step r x)))))
13
+ (fn
14
+ ([r] (step r))
15
+ ([r x]
16
+ (swap! cnt inc)
17
+ (when (zero? (mod @cnt 1000 )) (log/info " processed" @cnt " since startup" ))
18
+ (reset! last-received (t/now ))
19
+ (step r x))))))
18
20
19
21
(defn- insert-newline [s]
20
22
" inserts missing line breaks after end of tweet"
26
28
(fn
27
29
([r] (step r))
28
30
([r x]
29
- (let [json-lines (-> (str @buff x) (insert-newline ) (str/split-lines ))
30
- to-process (butlast json-lines)]
31
- (reset! buff (last json-lines))
32
- (if to-process (reduce step r to-process) r)))))))
31
+ (let [json-lines (-> (str @buff x) (insert-newline ) (str/split-lines ))
32
+ to-process (butlast json-lines)]
33
+ (reset! buff (last json-lines))
34
+ (if to-process (reduce step r to-process) r)))))))
33
35
34
36
(defn- tweet? [data]
35
37
" Checks if data is a tweet. If so, pass on, otherwise log error."
52
54
53
55
(defn ex-handler [ex]
54
56
(log/error " Exception while processing chunk" ex))
55
-
56
-
57
-
58
-
You can’t perform that action at this time.
0 commit comments