Skip to content

Commit 9c14f8f

Browse files
author
Nathan Marz
committed
merge master, reset coordinatedbolt to use timecachemap
2 parents 3dfa3e7 + 4b38429 commit 9c14f8f

File tree

4 files changed

+93
-58
lines changed

4 files changed

+93
-58
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ NOTE: The change from 0.7.0 in which OutputCollector no longer assumes immutable
4646
* Throw helpful error message if StormSubmitter used without using storm client script
4747
* Add Values class as a default serialization
4848
* Bug fix: give absolute piddir to subprocesses (so that relative paths can be used for storm local dir)
49+
* Bug fix: Fixed critical bug in transactional topologies where a batch would be considered successful even if the batch didn't finish
4950
* Bug fix: Fixed critical bug in opaque transactional topologies that would lead to duplicate messages when using pipelining
5051
* Bug fix: Workers will now die properly if a ShellBolt subprocess dies (thanks tomo)
5152
* Bug fix: Hide the BasicOutputCollector#getOutputter method, since it shouldn't be a publicly available method
5253
* Bug fix: Zookeeper in local mode now always gets an unused port. This will eliminate conflicts with other local mode processes or other Zookeeper instances on a local machine. (thanks xumingming)
5354
* Bug fix: Fixed NPE in CoordinatedBolt it tuples emitted, acked, or failed for a request id that has already timed out. (thanks xumingming)
5455
* Bug fix: UI no longer errors for topologies with no assigned tasks (thanks xumingming)
5556
* Bug fix: emitDirect on SpoutOutputCollector now works
57+
* Bug fix: Fixed NPE when giving null parallelism hint for spout in TransactionalTopologyBuilder (thanks xumingming)
5658

5759
## 0.7.1
5860

README.markdown

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, and is a lot of fun to use!
1+
Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, [is used by many companies](https://github.com/nathanmarz/storm/wiki/Powered-By), and is a lot of fun to use!
22

33
The [Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) on the wiki explains what Storm is and why it was built. The [video](http://www.infoq.com/presentations/Storm) and [slides](http://www.slideshare.net/nathanmarz/storm-distributed-and-faulttolerant-realtime-computation) of Storm's launch presentation are also good introductions to the project.
44

@@ -51,3 +51,7 @@ You must not remove this notice, or any other, from this software.
5151
* Nicolas Yzet ([@nicoo](https://github.com/nicoo))
5252
* Fabian Neumann ([@hellp](https://github.com/hellp))
5353
* Soren Macbeth ([@sorenmacbeth](https://github.com/sorenmacbeth))
54+
55+
## Acknowledgements
56+
57+
YourKit is kindly supporting open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: [YourKit Java Profiler](http://www.yourkit.com/java/profiler/index.jsp) and [YourKit .NET Profiler](http://www.yourkit.com/.net/profiler/index.jsp).

src/jvm/backtype/storm/coordination/CoordinatedBolt.java

Lines changed: 85 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.Map.Entry;
55
import backtype.storm.tuple.Values;
66
import backtype.storm.generated.GlobalStreamId;
7-
import backtype.storm.Config;
87
import java.util.Collection;
98
import backtype.storm.Constants;
109
import backtype.storm.generated.Grouping;
@@ -15,7 +14,7 @@
1514
import backtype.storm.topology.OutputFieldsDeclarer;
1615
import backtype.storm.tuple.Fields;
1716
import backtype.storm.tuple.Tuple;
18-
import backtype.storm.utils.RotatingMap;
17+
import backtype.storm.utils.TimeCacheMap;
1918
import backtype.storm.utils.Utils;
2019
import java.io.Serializable;
2120
import java.util.ArrayList;
@@ -89,9 +88,9 @@ public void ack(Tuple tuple) {
8988
if (track != null)
9089
track.receivedTuples++;
9190
}
92-
boolean failed = checkFinishId(tuple);
91+
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
9392
if(failed) {
94-
_delegate.fail(tuple);
93+
_delegate.fail(tuple);
9594
} else {
9695
_delegate.ack(tuple);
9796
}
@@ -104,6 +103,7 @@ public void fail(Tuple tuple) {
104103
if (track != null)
105104
track.failed = true;
106105
}
106+
checkFinishId(tuple, TupleType.REGULAR);
107107
_delegate.fail(tuple);
108108
}
109109

@@ -132,7 +132,7 @@ private void updateTaskCounts(Object id, List<Integer> tasks) {
132132
private Integer _numSourceReports;
133133
private List<Integer> _countOutTasks = new ArrayList<Integer>();;
134134
private OutputCollector _collector;
135-
private RotatingMap<Object, TrackingInfo> _tracked;
135+
private TimeCacheMap<Object, TrackingInfo> _tracked;
136136

137137
public static class TrackingInfo {
138138
int reportCount = 0;
@@ -142,6 +142,7 @@ public static class TrackingInfo {
142142
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
143143
boolean receivedId = false;
144144
boolean finished = false;
145+
List<Tuple> ackTuples = new ArrayList<Tuple>();
145146

146147
@Override
147148
public String toString() {
@@ -186,11 +187,11 @@ public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, I
186187
}
187188

188189
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
189-
RotatingMap.ExpiredCallback<Object, TrackingInfo> callback = null;
190+
TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
190191
if(_delegate instanceof TimeoutCallback) {
191192
callback = new TimeoutItems();
192193
}
193-
_tracked = new RotatingMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback);
194+
_tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback);
194195
_collector = collector;
195196
_delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
196197
for(String component: Utils.get(context.getThisTargets(),
@@ -213,51 +214,73 @@ public void prepare(Map config, TopologyContext context, OutputCollector collect
213214
}
214215
}
215216

216-
private boolean checkFinishId(Tuple tup) {
217-
boolean ret = false;
217+
private boolean checkFinishId(Tuple tup, TupleType type) {
218218
Object id = tup.getValue(0);
219+
boolean failed = false;
220+
219221
synchronized(_tracked) {
220222
TrackingInfo track = _tracked.get(id);
221223
try {
222-
// if it timed out, then obviously it failed (hence the null check)
223-
if(track==null || track.failed) ret = true;
224-
if(track!=null
225-
&& !track.failed
226-
&& track.receivedId
227-
&& (_sourceArgs.isEmpty()
228-
||
229-
track.reportCount==_numSourceReports &&
230-
track.expectedTupleCount == track.receivedTuples)) {
231-
if(_delegate instanceof FinishedCallback) {
232-
((FinishedCallback)_delegate).finishedId(id);
224+
if(track!=null) {
225+
boolean delayed = false;
226+
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
227+
track.ackTuples.add(tup);
228+
delayed = true;
233229
}
234-
if(!(_sourceArgs.isEmpty() ||
235-
tup.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID) ||
236-
(_idStreamSpec!=null && tup.getSourceGlobalStreamid().equals(_idStreamSpec._id))
237-
)) {
238-
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
230+
if(track.failed) {
231+
failed = true;
232+
for(Tuple t: track.ackTuples) {
233+
_collector.fail(t);
234+
}
235+
_tracked.remove(id);
236+
} else if(track.receivedId
237+
&& (_sourceArgs.isEmpty() ||
238+
track.reportCount==_numSourceReports &&
239+
track.expectedTupleCount == track.receivedTuples)){
240+
if(_delegate instanceof FinishedCallback) {
241+
((FinishedCallback)_delegate).finishedId(id);
242+
}
243+
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
244+
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
245+
}
246+
Iterator<Integer> outTasks = _countOutTasks.iterator();
247+
while(outTasks.hasNext()) {
248+
int task = outTasks.next();
249+
int numTuples = get(track.taskEmittedTuples, task, 0);
250+
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
251+
}
252+
for(Tuple t: track.ackTuples) {
253+
_collector.ack(t);
254+
}
255+
track.finished = true;
256+
_tracked.remove(id);
239257
}
240-
Iterator<Integer> outTasks = _countOutTasks.iterator();
241-
while(outTasks.hasNext()) {
242-
int task = outTasks.next();
243-
int numTuples = get(track.taskEmittedTuples, task, 0);
244-
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
258+
if(!delayed && type!=TupleType.REGULAR) {
259+
if(track.failed) {
260+
_collector.fail(tup);
261+
} else {
262+
_collector.ack(tup);
263+
}
245264
}
246-
track.finished = true;
247-
_tracked.remove(id);
265+
} else {
266+
if(type!=TupleType.REGULAR) _collector.fail(tup);
248267
}
249268
} catch(FailedException e) {
250269
LOG.error("Failed to finish batch", e);
251-
track.failed = true;
252-
ret = true;
270+
for(Tuple t: track.ackTuples) {
271+
_collector.fail(t);
272+
}
273+
_tracked.remove(id);
274+
failed = true;
253275
}
254276
}
255-
return ret;
277+
return failed;
256278
}
257279

258280
public void execute(Tuple tuple) {
259281
Object id = tuple.getValue(0);
260282
TrackingInfo track;
283+
TupleType type = getTupleType(tuple);
261284
synchronized(_tracked) {
262285
track = _tracked.get(id);
263286
if(track==null) {
@@ -267,38 +290,25 @@ public void execute(Tuple tuple) {
267290
}
268291
}
269292

270-
if(_idStreamSpec!=null
271-
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
293+
if(type==TupleType.ID) {
272294
synchronized(_tracked) {
273295
track.receivedId = true;
274296
}
275-
boolean failed = checkFinishId(tuple);
276-
if(failed) {
277-
_collector.fail(tuple);
278-
} else {
279-
_collector.ack(tuple);
280-
}
281-
} else if(!_sourceArgs.isEmpty()
282-
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
297+
checkFinishId(tuple, type);
298+
} else if(type==TupleType.COORD) {
283299
int count = (Integer) tuple.getValue(1);
284300
synchronized(_tracked) {
285301
track.reportCount++;
286302
track.expectedTupleCount+=count;
287303
}
288-
boolean failed = checkFinishId(tuple);
289-
if(failed) {
290-
_collector.fail(tuple);
291-
} else {
292-
_collector.ack(tuple);
293-
}
304+
checkFinishId(tuple, type);
294305
} else {
295306
synchronized(_tracked) {
296307
_delegate.execute(tuple);
297308
}
298309
}
299310
}
300311

301-
@Override
302312
public void cleanup() {
303313
_delegate.cleanup();
304314
}
@@ -319,16 +329,35 @@ private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent,
319329
return ret;
320330
}
321331

322-
private class TimeoutItems implements RotatingMap.ExpiredCallback<Object, TrackingInfo> {
332+
private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
323333
@Override
324334
public void expire(Object id, TrackingInfo val) {
325335
synchronized(_tracked) {
326-
// make sure we don't time out something that has been finished. the combination of
327-
// the flag and the lock ensure this
336+
// the combination of the lock and the finished flag ensure that
337+
// an id is never timed out if it has been finished
338+
val.failed = true;
328339
if(!val.finished) {
329340
((TimeoutCallback) _delegate).timeoutId(id);
330341
}
331342
}
332343
}
333-
}
344+
}
345+
346+
private TupleType getTupleType(Tuple tuple) {
347+
if(_idStreamSpec!=null
348+
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
349+
return TupleType.ID;
350+
} else if(!_sourceArgs.isEmpty()
351+
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
352+
return TupleType.COORD;
353+
} else {
354+
return TupleType.REGULAR;
355+
}
356+
}
357+
358+
static enum TupleType {
359+
REGULAR,
360+
ID,
361+
COORD
362+
}
334363
}

src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpo
5252
_id = id;
5353
_spoutId = spoutId;
5454
_spout = spout;
55-
_spoutParallelism = spoutParallelism.intValue();
55+
_spoutParallelism = (spoutParallelism == null) ? null : spoutParallelism.intValue();
5656
}
5757

5858
public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout) {

0 commit comments

Comments
 (0)