Skip to content

Commit d5f16d5

Browse files
author
Thomas Jack
committed
add synchronous ShellSpout, update storm.py
1 parent 8d8194d commit d5f16d5

File tree

9 files changed

+224
-31
lines changed

9 files changed

+224
-31
lines changed

src/clj/backtype/storm/clojure.clj

+1
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@
229229
(defalias bolt-spec thrift/mk-bolt-spec)
230230
(defalias spout-spec thrift/mk-spout-spec)
231231
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
232+
(defalias shell-spout-spec thrift/mk-shell-spout-spec)
232233

233234
(defn submit-remote-topology [name conf topology]
234235
(StormSubmitter/submitTopology name conf topology))

src/clj/backtype/storm/thrift.clj

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
(:import [backtype.storm Constants])
88
(:import [backtype.storm.grouping CustomStreamGrouping])
99
(:import [backtype.storm.topology TopologyBuilder])
10-
(:import [backtype.storm.clojure RichShellBolt])
10+
(:import [backtype.storm.clojure RichShellBolt RichShellSpout])
1111
(:import [org.apache.thrift7.protocol TBinaryProtocol TProtocol])
1212
(:import [org.apache.thrift7.transport TTransport TFramedTransport TSocket])
1313
(:use [backtype.storm util config])
@@ -176,6 +176,11 @@
176176
(shell-component-params command script-or-output-spec kwargs)]
177177
(apply mk-bolt-spec inputs (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
178178

179+
;; Builds a spout spec for a spout implemented by a shell subprocess.
;; The raw arguments are normalized by shell-component-params, the resulting
;; command and output spec are wrapped in a RichShellSpout, and any remaining
;; keyword arguments are forwarded to mk-spout-spec.
(defn mk-shell-spout-spec
  [command script-or-output-spec & kwargs]
  (let [params (shell-component-params command script-or-output-spec kwargs)
        [shell-command outputs spec-kwargs] params
        spout (RichShellSpout. shell-command (mk-output-spec outputs))]
    (apply mk-spout-spec spout spec-kwargs)))
183+
179184
(defn- add-inputs [declarer inputs]
180185
(doseq [[id grouping] (mk-inputs inputs)]
181186
(.grouping declarer id grouping)

src/dev/resources/tester.py

-8
This file was deleted.

src/dev/resources/tester_bolt.py

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import storm
2+
from random import random
3+
4+
class TesterBolt(storm.Bolt):
    """Test bolt that appends 'lalala' to most words and skips the rest."""

    def initialize(self, conf, context):
        """Emit a single marker tuple when the bolt is initialized."""
        storm.emit(['bolt initializing'])

    def process(self, tup):
        """Forward a tuple about 75% of the time; otherwise only log it.

        Forwarded tuples are anchored to the input tuple and then acked.
        Skipped tuples are logged and are not acked.
        """
        word = tup.values[0]
        if random() >= 0.75:
            storm.log(word + ' randomly skipped!')
            return
        storm.emit([word + 'lalala'], anchors=[tup])
        storm.ack(tup)
15+
16+
TesterBolt().run()

src/dev/resources/tester_spout.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from storm import Spout, emit, log
2+
from random import choice
3+
from time import sleep
4+
from uuid import uuid4
5+
6+
words = ["nathan", "mike", "jackson", "golda", "bertels"]
7+
8+
class TesterSpout(Spout):
    """Test spout that emits random words and re-emits any that fail."""

    def initialize(self, conf, context):
        """Emit a startup marker tuple and create the pending-word table."""
        emit(['spout initializing'])
        # message id -> word, for every emitted tuple that has not been acked
        self.pending = {}

    def nextTuple(self):
        """Sleep half a second, then emit one random word under a new id."""
        sleep(0.5)
        chosen = choice(words)
        msg_id = str(uuid4())
        self.pending[msg_id] = chosen
        emit([chosen], id=msg_id)

    def ack(self, id):
        """Forget the word stored for ``id``; raises KeyError if unknown."""
        self.pending.pop(id)

    def fail(self, id):
        """Re-emit the word stored for ``id`` under the same message id."""
        word = self.pending[id]
        emit([word], id=id)
25+
26+
TesterSpout().run()
src/jvm/backtype/storm/clojure/RichShellSpout.java

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package backtype.storm.clojure;
2+
3+
import backtype.storm.generated.StreamInfo;
4+
import backtype.storm.spout.ShellSpout;
5+
import backtype.storm.topology.IRichSpout;
6+
import backtype.storm.topology.OutputFieldsDeclarer;
7+
import backtype.storm.tuple.Fields;
8+
import java.util.Map;
9+
10+
public class RichShellSpout extends ShellSpout implements IRichSpout {
11+
private Map<String, StreamInfo> _outputs;
12+
13+
public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
14+
super(command);
15+
_outputs = outputs;
16+
}
17+
18+
@Override
19+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
20+
for(String stream: _outputs.keySet()) {
21+
StreamInfo def = _outputs.get(stream);
22+
if(def.is_direct()) {
23+
declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
24+
} else {
25+
declarer.declareStream(stream, new Fields(def.get_output_fields()));
26+
}
27+
}
28+
}
29+
30+
@Override
31+
public Map<String, Object> getComponentConfiguration() {
32+
return null;
33+
}
34+
}

src/jvm/backtype/storm/spout/ShellSpout.java

+79-8
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,107 @@
22

33
import backtype.storm.generated.ShellComponent;
44
import backtype.storm.task.TopologyContext;
5+
import backtype.storm.utils.ShellProcess;
6+
import backtype.storm.utils.Utils;
57
import java.util.Map;
8+
import java.util.List;
9+
import java.io.IOException;
10+
import org.apache.log4j.Logger;
11+
import org.json.simple.JSONObject;
612

13+
import static backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING;
714

815
public class ShellSpout implements ISpout {
16+
public static Logger LOG = Logger.getLogger(ShellSpout.class);
17+
18+
private SpoutOutputCollector _collector;
19+
private String[] _command;
20+
private ShellProcess _process;
21+
922
public ShellSpout(ShellComponent component) {
1023
this(component.get_execution_command(), component.get_script());
1124
}
1225

13-
public ShellSpout(String shellCommand, String codeResource) {
14-
26+
public ShellSpout(String... command) {
27+
_command = command;
1528
}
1629

17-
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
18-
throw new UnsupportedOperationException("Not supported yet.");
30+
public void open(Map stormConf, TopologyContext context,
31+
SpoutOutputCollector collector) {
32+
_process = new ShellProcess(_command);
33+
_collector = collector;
34+
35+
try {
36+
String subpid = _process.launch(stormConf, context);
37+
LOG.info("Launched subprocess with pid " + subpid);
38+
} catch (IOException e) {
39+
throw new RuntimeException("Error when launching multilang subprocess", e);
40+
}
1941
}
2042

2143
public void close() {
22-
throw new UnsupportedOperationException("Not supported yet.");
44+
_process.destroy();
2345
}
2446

47+
private JSONObject _next;
2548
public void nextTuple() {
26-
throw new UnsupportedOperationException("Not supported yet.");
49+
if (_next == null) {
50+
_next = new JSONObject();
51+
_next.put("command", "next");
52+
}
53+
54+
querySubprocess(_next);
2755
}
2856

57+
private JSONObject _ack;
2958
public void ack(Object msgId) {
30-
throw new UnsupportedOperationException("Not supported yet.");
59+
if (_ack == null) {
60+
_ack = new JSONObject();
61+
_ack.put("command", "ack");
62+
}
63+
64+
_ack.put("id", msgId);
65+
querySubprocess(_ack);
3166
}
3267

68+
private JSONObject _fail;
3369
public void fail(Object msgId) {
34-
throw new UnsupportedOperationException("Not supported yet.");
70+
if (_fail == null) {
71+
_fail = new JSONObject();
72+
_fail.put("command", "fail");
73+
}
74+
75+
_fail.put("id", msgId);
76+
querySubprocess(_fail);
3577
}
3678

79+
private void querySubprocess(Object query) {
80+
try {
81+
_process.writeObject(query);
82+
83+
while (true) {
84+
Map action = _process.readMap();
85+
if (action == null) return; // sync
86+
String command = (String) action.get("command");
87+
if (command.equals("log")) {
88+
String msg = (String) action.get("msg");
89+
LOG.info("Shell msg: " + msg);
90+
} else if (command.equals("emit")) {
91+
String stream = (String) action.get("stream");
92+
if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
93+
Long task = (Long) action.get("task");
94+
List<Object> tuple = (List) action.get("tuple");
95+
Object messageId = (Object) action.get("id");
96+
if (task == null) {
97+
List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
98+
_process.writeObject(outtasks);
99+
} else {
100+
_collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
101+
}
102+
}
103+
}
104+
} catch (IOException e) {
105+
throw new RuntimeException(e);
106+
}
107+
}
37108
}

src/multilang/py/storm.py

+60-12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def readStringMsg():
2121
msg = msg + line + "\n"
2222
return msg[0:-1]
2323

24+
MODE = None
2425
ANCHOR_TUPLE = None
2526

2627
#reads lines and reconstructs newlines appropriately
@@ -74,7 +75,22 @@ def sendpid(heartbeatdir):
7475
def sendMsgToParent(amap):
7576
sendToParent(json_encode(amap))
7677

77-
def emittuple(tup, stream=None, anchors = [], directTask=None):
78+
def emit(*args, **kwargs):
    """Emit a tuple via __emit, then return the result of readTaskIds()."""
    __emit(*args, **kwargs)
    task_ids = readTaskIds()
    return task_ids
81+
82+
def emitDirect(task, *args, **kwargs):
    """Emit a tuple directly to ``task``.

    The task is forwarded as the ``directTask`` keyword argument of the
    underlying emitter. No task ids are read back afterwards.
    """
    # The key must be the string "directTask"; the bare name is undefined
    # here and raised NameError on every call.
    kwargs["directTask"] = task
    __emit(*args, **kwargs)
85+
86+
def __emit(*args, **kwargs):
    """Forward the call to emitBolt or emitSpout according to MODE.

    Does nothing when MODE is neither Bolt nor Spout.
    """
    if MODE == Bolt:
        emitter = emitBolt
    elif MODE == Spout:
        emitter = emitSpout
    else:
        return
    emitter(*args, **kwargs)
93+
def emitBolt(tup, stream=None, anchors = [], directTask=None):
7894
global ANCHOR_TUPLE
7995
if ANCHOR_TUPLE is not None:
8096
anchors = [ANCHOR_TUPLE]
@@ -87,13 +103,16 @@ def emittuple(tup, stream=None, anchors = [], directTask=None):
87103
m["tuple"] = tup
88104
sendMsgToParent(m)
89105

90-
def emit(tup, stream=None, anchors = []):
91-
emittuple(tup, stream=stream, anchors=anchors)
92-
#read back task ids
93-
return readMsg()
94-
95-
def emitDirect(task, tup, stream=None, anchors = []):
96-
emittuple(tup, stream=stream, anchors=anchors, directTask=task)
106+
def emitSpout(tup, stream=None, id=None, directTask=None):
    """Send an "emit" command for ``tup`` to the parent process.

    ``id``, ``stream`` and ``directTask`` (sent as "task") are included in
    the message only when they are not None.
    """
    message = {"command": "emit"}
    optional_fields = (("id", id), ("stream", stream), ("task", directTask))
    for key, value in optional_fields:
        if value is not None:
            message[key] = value
    message["tuple"] = tup
    sendMsgToParent(message)
97116

98117
def ack(tup):
99118
sendMsgToParent({"command": "ack", "id": tup.id})
@@ -110,7 +129,7 @@ def readenv():
110129
context = readMsg()
111130
return [conf, context]
112131

113-
def initbolt():
132+
def initComponent():
    """Read the heartbeat dir, report the pid, and return readenv()'s result."""
    sendpid(readStringMsg())
    return readenv()
@@ -136,7 +155,9 @@ def process(self, tuple):
136155
pass
137156

138157
def run(self):
139-
conf, context = initbolt()
158+
global MODE
159+
MODE = Bolt
160+
conf, context = initComponent()
140161
self.initialize(conf, context)
141162
try:
142163
while True:
@@ -153,8 +174,10 @@ def process(self, tuple):
153174
pass
154175

155176
def run(self):
177+
global MODE
178+
MODE = Bolt
156179
global ANCHOR_TUPLE
157-
conf, context = initbolt()
180+
conf, context = initComponent()
158181
self.initialize(conf, context)
159182
try:
160183
while True:
@@ -166,7 +189,32 @@ def run(self):
166189
log(traceback.format_exc(e))
167190

168191
class Spout:
    """Base class for spouts driven by commands read from the parent.

    Subclasses override the hook methods below; every default is a no-op.
    """

    def initialize(self, conf, context):
        """Hook called once by run() with the values from initComponent()."""
        pass

    def ack(self, id):
        """Hook called when an "ack" command arrives for message ``id``."""
        pass

    def fail(self, id):
        """Hook called when a "fail" command arrives for message ``id``."""
        pass

    def nextTuple(self):
        """Hook called when a "next" command arrives."""
        pass

    def run(self):
        """Initialize the component, then serve commands until an error.

        Each command is dispatched to the matching hook and followed by
        sync(). Any exception ends the loop and its traceback is logged.
        """
        global MODE
        MODE = Spout
        conf, context = initComponent()
        self.initialize(conf, context)
        try:
            while True:
                msg = readCommand()
                command = msg["command"]
                if command == "next":
                    self.nextTuple()
                elif command == "ack":
                    self.ack(msg["id"])
                elif command == "fail":
                    self.fail(msg["id"])
                sync()
        except Exception:
            # format_exc() reads the active exception itself; its first
            # parameter is a traceback depth limit, not the exception.
            log(traceback.format_exc())

test/clj/backtype/storm/integration_test.clj

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@
6464
(with-local-cluster [cluster :supervisors 4]
6565
(let [nimbus (:nimbus cluster)
6666
topology (thrift/mk-topology
67-
{"1" (thrift/mk-spout-spec (TestWordSpout. false))}
68-
{"2" (thrift/mk-shell-bolt-spec {"1" :shuffle} ["python" "tester.py"] ["word"] :parallelism-hint 1)}
67+
{"1" (thrift/mk-shell-spout-spec ["python" "tester_spout.py"] ["word"])}
68+
{"2" (thrift/mk-shell-bolt-spec {"1" :shuffle} ["python" "tester_bolt.py"] ["word"] :parallelism-hint 1)}
6969
)]
7070
(submit-local-topology nimbus
7171
"test"

0 commit comments

Comments
 (0)