Skip to content

Commit 26cc00a

Browse files
author
Nathan Marz
committed
testing progress
1 parent ff2e570 commit 26cc00a

File tree

4 files changed

+136
-3
lines changed

4 files changed

+136
-3
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.task.TopologyContext;
4+
import backtype.storm.topology.OutputFieldsDeclarer;
5+
import backtype.storm.topology.base.BaseBatchBolt;
6+
import backtype.storm.coordination.BatchOutputCollector;
7+
import backtype.storm.tuple.Fields;
8+
import backtype.storm.tuple.Tuple;
9+
import backtype.storm.tuple.Values;
10+
import backtype.storm.utils.Utils;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
public class KeyedCountingBatchBolt extends BaseBatchBolt {
15+
BatchOutputCollector _collector;
16+
Object _id;
17+
Map<Object, Integer> _counts = new HashMap<Object, Integer>();
18+
19+
@Override
20+
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
21+
_collector = collector;
22+
_id = id;
23+
}
24+
25+
@Override
26+
public void execute(Tuple tuple) {
27+
Object key = tuple.getValue(1);
28+
int curr = Utils.get(_counts, key, 0);
29+
_counts.put(key, curr + 1);
30+
}
31+
32+
@Override
33+
public void finishBatch() {
34+
for(Object key: _counts.keySet()) {
35+
_collector.emit(new Values(_id, key, _counts.get(key)));
36+
}
37+
}
38+
39+
@Override
40+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
41+
declarer.declare(new Fields("tx", "key", "count"));
42+
}
43+
44+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.task.TopologyContext;
4+
import backtype.storm.topology.OutputFieldsDeclarer;
5+
import backtype.storm.coordination.BatchOutputCollector;
6+
import backtype.storm.topology.base.BaseCommitterBolt;
7+
import backtype.storm.transactional.TransactionAttempt;
8+
import backtype.storm.tuple.Fields;
9+
import backtype.storm.tuple.Tuple;
10+
import backtype.storm.tuple.Values;
11+
import backtype.storm.utils.Utils;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
public class KeyedCountingCommitterBolt extends BaseCommitterBolt {
16+
TransactionAttempt _id;
17+
Map<Object, Integer> _counts = new HashMap<Object, Integer>();
18+
19+
@Override
20+
public void prepare(Map conf, TopologyContext context, TransactionAttempt id) {
21+
_id = id;
22+
}
23+
24+
@Override
25+
public void execute(Tuple tuple) {
26+
Object key = tuple.getValue(1);
27+
int curr = Utils.get(_counts, key, 0);
28+
_counts.put(key, curr + 1);
29+
}
30+
31+
@Override
32+
public void commit(BatchOutputCollector collector) {
33+
for(Object key: _counts.keySet()) {
34+
collector.emit(new Values(_id, key, _counts.get(key)));
35+
}
36+
}
37+
38+
@Override
39+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
40+
declarer.declare(new Fields("tx", "key", "count"));
41+
}
42+
43+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.coordination.BatchOutputCollector;
4+
import backtype.storm.task.TopologyContext;
5+
import backtype.storm.topology.OutputFieldsDeclarer;
6+
import backtype.storm.topology.base.BaseBatchBolt;
7+
import backtype.storm.tuple.Fields;
8+
import backtype.storm.tuple.Tuple;
9+
import backtype.storm.tuple.Values;
10+
import backtype.storm.utils.Utils;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
public class KeyedSummingBatchBolt extends BaseBatchBolt {
15+
BatchOutputCollector _collector;
16+
Object _id;
17+
Map<Object, Integer> _sums = new HashMap<Object, Integer>();
18+
19+
@Override
20+
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
21+
_collector = collector;
22+
_id = id;
23+
}
24+
25+
@Override
26+
public void execute(Tuple tuple) {
27+
Object key = tuple.getValue(1);
28+
int curr = Utils.get(_sums, key, 0);
29+
_sums.put(key, curr + tuple.getInteger(2));
30+
}
31+
32+
@Override
33+
public void finishBatch() {
34+
for(Object key: _sums.keySet()) {
35+
_collector.emit(new Values(_id, key, _sums.get(key)));
36+
}
37+
}
38+
39+
@Override
40+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
41+
declarer.declare(new Fields("tx", "key", "sum"));
42+
}
43+
}

test/clj/backtype/storm/transactional_test.clj

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
(:import [backtype.storm.transactional TransactionalSpoutCoordinator ITransactionalSpout ITransactionalSpout$Coordinator TransactionAttempt
55
TransactionalTopologyBuilder])
66
(:import [backtype.storm.transactional.state TransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
7-
(:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout])
7+
(:import [backtype.storm.testing CountingBatchBolt MemoryTransactionalSpout
8+
KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt])
89
(:use [backtype.storm bootstrap testing])
910
(:use [backtype.storm.daemon common])
1011
)
@@ -312,7 +313,9 @@
312313
(Fields. ["word" "amt"])
313314
3)
314315
2))
315-
316+
(-> builder
317+
(.setBolt "count" (KeyedSummingBatchBolt.) 2)
318+
(.fieldsGrouping "spout" (Fields. ["word"])))
316319

317320
(add-transactional-data data
318321
{0 [["dog" 3]
@@ -329,6 +332,6 @@
329332
(complete-topology cluster
330333
(.buildTopology builder)
331334
:storm-conf {TOPOLOGY-DEBUG true}))
332-
(println results)
335+
(println (read-tuples results "count"))
333336

334337
)))

0 commit comments

Comments
 (0)