File tree 1 file changed +24
-0
lines changed
1 file changed +24
-0
lines changed Original file line number Diff line number Diff line change 34
34
(read-tuples results " 4" )))
35
35
))))
36
36
37
+ (defbolt emit-task-id [" tid" ] {:prepare true }
38
+ [conf context collector]
39
+ (let [tid (.getThisTaskIndex context)]
40
+ (bolt
41
+ (execute [tuple]
42
+ (emit-bolt! collector [tid] :anchor tuple)
43
+ (ack! collector tuple)
44
+ ))))
45
+
46
+ (deftest test-multi-tasks-per-executor
47
+ (with-simulated-time-local-cluster [cluster :supervisors 4 ]
48
+ (let [topology (thrift/mk-topology
49
+ {" 1" (thrift/mk-spout-spec (TestWordSpout. true ))}
50
+ {" 2" (thrift/mk-bolt-spec {" 1" :shuffle } emit-task-id
51
+ :parallelism-hint 3
52
+ :conf {TOPOLOGY-TASKS 6 })
53
+ })
54
+ results (complete-topology cluster
55
+ topology
56
+ :mock-sources {" 1" [[" a" ] [" a" ] [" a" ] [" a" ] [" a" ] [" a" ]]})]
57
+ (is (ms= [[0 ] [1 ] [2 ] [3 ] [4 ] [5 ]]
58
+ (read-tuples results " 2" )))
59
+ )))
60
+
37
61
(defbolt ack-every-other {} {:prepare true }
38
62
[conf context collector]
39
63
(let [state (atom -1 )]
You can’t perform that action at this time.
0 commit comments