Skip to content

Commit fc05f1e

Browse files
author
Nathan Marz
committed
minor cleanup
1 parent e7b00ef commit fc05f1e

File tree

3 files changed

+13
-14
lines changed

3 files changed

+13
-14
lines changed

src/dev/resources/tester_spout.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def ack(self, id):
2121
del self.pending[id]
2222

2323
def fail(self, id):
24+
log("emitting " + self.pending[id] + " on fail")
2425
emit([self.pending[id]], id=id)
2526

2627
TesterSpout().run()

src/jvm/backtype/storm/task/ShellBolt.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class ShellBolt implements IBolt {
5353
private ShellProcess _process;
5454
private volatile boolean _running = true;
5555
private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
56+
57+
private Thread _readerThread;
58+
private Thread _writerThread;
5659

5760
public ShellBolt(ShellComponent component) {
5861
this(component.get_execution_command(), component.get_script());
@@ -76,7 +79,7 @@ public void prepare(Map stormConf, TopologyContext context,
7679
}
7780

7881
// reader
79-
new Thread(new Runnable() {
82+
_readerThread = new Thread(new Runnable() {
8083
public void run() {
8184
while (_running) {
8285
try {
@@ -99,14 +102,14 @@ public void run() {
99102
} catch (IOException e) {
100103
throw new RuntimeException(e);
101104
} catch (InterruptedException e) {
102-
// ignore
103105
}
104106
}
105107
}
106-
}).start();
108+
});
109+
110+
_readerThread.start();
107111

108-
// writer
109-
new Thread(new Runnable() {
112+
_writerThread = new Thread(new Runnable() {
110113
public void run() {
111114
while (_running) {
112115
try {
@@ -117,11 +120,12 @@ public void run() {
117120
} catch (IOException e) {
118121
throw new RuntimeException(e);
119122
} catch (InterruptedException e) {
120-
// ignore
121123
}
122124
}
123125
}
124-
}).start();
126+
});
127+
128+
_writerThread.start();
125129
}
126130

127131
public void execute(Tuple input) {
@@ -142,9 +146,9 @@ public void execute(Tuple input) {
142146
}
143147

144148
public void cleanup() {
149+
_running = false;
145150
_process.destroy();
146151
_inputs.clear();
147-
_running = false;
148152
}
149153

150154
private void handleAck(Map action) {

src/jvm/backtype/storm/utils/ShellProcess.java

-6
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,6 @@ public String launch(Map conf, TopologyContext context) throws IOException {
2929
processIn = new DataOutputStream(_subprocess.getOutputStream());
3030
processOut = new BufferedReader(new InputStreamReader(_subprocess.getInputStream()));
3131

32-
// this doesn't seem to work when the jvm dies due to at least
33-
// some kinds of errors... does it help at all?
34-
Runtime.getRuntime().addShutdownHook(new Thread() {
35-
public void run() { _subprocess.destroy(); }
36-
});
37-
3832
writeString(context.getPIDDir());
3933
String pid = readString();
4034
writeObject(conf);

0 commit comments

Comments
 (0)