Skip to content

Commit 43a8df1

Browse files
author
Nathan Marz
committed
added cleanup method to task hook, added getThisWorkerPort to TopologyContext
1 parent 2de2a8e commit 43a8df1

File tree

7 files changed

+21
-4
lines changed

7 files changed

+21
-4
lines changed

src/clj/backtype/storm/daemon/task.clj

+2
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@
239239
(doseq [t all-threads]
240240
(.interrupt t)
241241
(.join t))
242+
(doseq [hook (.getHooks user-context)]
243+
(.cleanup hook))
242244
(.remove-task-heartbeat! storm-cluster-state storm-id task-id)
243245
(.disconnect storm-cluster-state)
244246
(.close puller)

src/clj/backtype/storm/daemon/worker.clj

+4-2
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,17 @@
9494
(supervisor-storm-resources-path
9595
(supervisor-stormdist-root conf storm-id))
9696
(worker-pids-root conf worker-id)
97-
%)
97+
%
98+
port)
9899
mk-user-context #(TopologyContext. topology
99100
storm-conf
100101
task->component
101102
storm-id
102103
(supervisor-storm-resources-path
103104
(supervisor-stormdist-root conf storm-id))
104105
(worker-pids-root conf worker-id)
105-
%)
106+
%
107+
port)
106108
mq-context (if mq-context
107109
mq-context
108110
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)

src/clj/backtype/storm/testing.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,6 @@
542542
spout-spec (mk-spout-spec* (TestWordSpout.)
543543
{stream fields})
544544
topology (StormTopology. {component spout-spec} {} {})
545-
context (TopologyContext. topology (read-storm-config) {1 component} "test-storm-id" nil nil 1)]
545+
context (TopologyContext. topology (read-storm-config) {1 component} "test-storm-id" nil nil 1 nil)]
546546
(Tuple. context values 1 stream)
547547
))

src/jvm/backtype/storm/hooks/BaseTaskHook.java

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ public class BaseTaskHook implements ITaskHook {
1313
public void prepare(Map conf, TopologyContext context) {
1414
}
1515

16+
@Override
17+
public void cleanup() {
18+
}
19+
1620
@Override
1721
public void emit(EmitInfo info) {
1822
}

src/jvm/backtype/storm/hooks/ITaskHook.java

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
public interface ITaskHook {
1212
void prepare(Map conf, TopologyContext context);
13+
void cleanup();
1314
void emit(EmitInfo info);
1415
void spoutAck(SpoutAckInfo info);
1516
void spoutFail(SpoutFailInfo info);

src/jvm/backtype/storm/task/TopologyContext.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ public class TopologyContext {
4040
private Object _taskData = null;
4141
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
4242
private Map _stormConf;
43+
private Integer _workerPort;
4344

44-
public TopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, String stormId, String codeDir, String pidDir, Integer taskId) {
45+
public TopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort) {
4546
_topology = topology;
4647
_stormConf = stormConf;
48+
_workerPort = workerPort;
4749
_taskToComponent = taskToComponent;
4850
_stormId = stormId;
4951
_taskId = taskId;
@@ -349,6 +351,10 @@ public int maxTopologyMessageTimeout() {
349351
return max;
350352
}
351353

354+
public Integer getThisWorkerPort() {
355+
return _workerPort;
356+
}
357+
352358
public void addTaskHook(ITaskHook hook) {
353359
hook.prepare(_stormConf, this);
354360
_hooks.add(hook);

test/clj/backtype/storm/integration_test.clj

+2
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,8 @@
542542
(reify backtype.storm.hooks.ITaskHook
543543
(prepare [this conf context]
544544
)
545+
(cleanup [this]
546+
)
545547
(emit [this info]
546548
(swap! emitted inc))
547549
(boltAck [this info]

0 commit comments

Comments
 (0)