Skip to content

Commit 138a2f9

Browse files
author
Nathan Marz
committed
Merge branch 'master' into 0.8.2
2 parents 7769afb + f28c17d commit 138a2f9

File tree

9 files changed

+25
-8
lines changed

9 files changed

+25
-8
lines changed

src/clj/backtype/storm/messaging/zmq.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
(defprotocol ZMQContextQuery
2727
(zmq-context [this]))
2828

29+
(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))
30+
2931
(deftype ZMQConnection [socket ^ByteBuffer bb]
3032
Connection
3133
(recv-with-flags [this flags]
@@ -37,8 +39,8 @@
3739
(send [this task message]
3840
(.clear bb)
3941
(.putShort bb (short task))
40-
(mq/send socket (.array bb) ZMQ/SNDMORE)
41-
(mq/send socket message)) ;; TODO: temporarily remove the noblock flag
42+
(mq/send socket (.array bb) NOBLOCK-SNDMORE)
43+
(mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
4244
(close [this]
4345
(.close socket)
4446
))
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package backtype.storm.task;
2+
3+
4+
public interface IMetricsContext {
5+
6+
}

src/jvm/backtype/storm/task/TopologyContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* <p>The TopologyContext is also used to declare ISubscribedState objects to
2525
* synchronize state with StateSpouts this object is subscribed to.</p>
2626
*/
27-
public class TopologyContext extends WorkerTopologyContext {
27+
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
2828
private Integer _taskId;
2929
private Map<String, Object> _taskData = new HashMap<String, Object>();
3030
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();

src/jvm/storm/trident/operation/TridentOperationContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package storm.trident.operation;
22

3+
import backtype.storm.task.IMetricsContext;
34
import backtype.storm.task.TopologyContext;
45
import backtype.storm.tuple.Fields;
56
import storm.trident.tuple.TridentTuple;
67
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
78

8-
public class TridentOperationContext {
9+
public class TridentOperationContext implements IMetricsContext{
910
TridentTuple.Factory _factory;
1011
TopologyContext _topoContext;
1112

src/jvm/storm/trident/planner/SubtopologyBolt.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void prepare(Map conf, TopologyContext context, BatchOutputCollector batc
4949
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
5050
for(Node n: _nodes) {
5151
if(n.stateInfo!=null) {
52-
State s = n.stateInfo.spec.stateFactory.makeState(conf, context.getThisTaskIndex(), thisComponentNumTasks);
52+
State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
5353
context.setTaskData(n.stateInfo.id, s);
5454
}
5555
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package storm.trident.state;
22

3+
import backtype.storm.task.IMetricsContext;
34
import java.io.Serializable;
45
import java.util.Map;
56

67
public interface StateFactory extends Serializable {
7-
State makeState(Map conf, int partitionIndex, int numPartitions);
8+
State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);
89
}

src/jvm/storm/trident/testing/LRUMemoryMapState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package storm.trident.testing;
22

3+
import backtype.storm.task.IMetricsContext;
34
import storm.trident.state.ITupleCollection;
45
import backtype.storm.tuple.Values;
56
import java.util.*;
@@ -70,7 +71,7 @@ public Factory(int maxSize) {
7071
}
7172

7273
@Override
73-
public State makeState(Map conf, int partitionIndex, int numPartitions) {
74+
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
7475
return new LRUMemoryMapState(_maxSize, _id);
7576
}
7677
}

src/jvm/storm/trident/testing/MemoryMapState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package storm.trident.testing;
22

3+
import backtype.storm.task.IMetricsContext;
34
import storm.trident.state.ITupleCollection;
45
import backtype.storm.tuple.Values;
56
import java.util.*;
@@ -67,7 +68,7 @@ public Factory() {
6768
}
6869

6970
@Override
70-
public State makeState(Map conf, int partitionIndex, int numPartitions) {
71+
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
7172
return new MemoryMapState(_id);
7273
}
7374
}

src/jvm/storm/trident/tuple/ComboList.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
public class ComboList extends AbstractList<Object> {
99
public static class Factory implements Serializable {
1010
Pointer[] index;
11+
int numLists;
1112

1213
public Factory(int... sizes) {
14+
numLists = sizes.length;
1315
int total = 0;
1416
for(int size: sizes) {
1517
total+=size;
@@ -27,6 +29,9 @@ public Factory(int... sizes) {
2729
}
2830

2931
public ComboList create(List[] delegates) {
32+
if(delegates.length!=numLists) {
33+
throw new RuntimeException("Expected " + numLists + " lists, but instead got " + delegates.length + " lists");
34+
}
3035
return new ComboList(delegates, index);
3136
}
3237
}

0 commit comments

Comments
 (0)