Skip to content

Commit 7ffb9cc

Browse files
author
Nathan Marz
committed
implemented LocalDRPC and FeederSpoutAdder
1 parent 0cd5707 commit 7ffb9cc

File tree

5 files changed

+71
-19
lines changed

5 files changed

+71
-19
lines changed

src/clj/backtype/storm/LocalDRPC.clj

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
(ns backtype.storm.LocalDRPC
2+
(:use [backtype.storm.daemon drpc])
3+
(:use [backtype.storm util])
4+
(:import [backtype.storm.utils InprocMessaging])
5+
(:gen-class
6+
:init init
7+
:implements [backtype.storm.generated.DistributedRPC$Iface backtype.storm.daemon.Shutdownable]
8+
:constructors {[backtype.storm.drpc.SpoutAdder] []}
9+
:state state ))
10+
11+
(defn -init [adder]
12+
(let [port (InprocMessaging/acquireNewPort)
13+
handler (service-handler adder port)
14+
port-thread (async-loop (fn []
15+
(let [[id result] (InprocMessaging/takeMessage port)]
16+
(.result handler id result))
17+
0 )
18+
:daemon true)
19+
]
20+
[[] {:thread port-thread :handler handler}]
21+
))
22+
23+
(defn -execute [this func funcArgs]
24+
(.execute (:handler (. this state)) func funcArgs)
25+
)
26+
27+
(defn -result [this id result]
28+
(.result (:handler (. this state)) id result)
29+
)
30+
31+
(defn -shutdown [this]
32+
(.interrupt {:thread (. this state)})
33+
)

src/jvm/backtype/storm/drpc/DRPCScheme.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22

33
import backtype.storm.spout.Scheme;
44
import backtype.storm.tuple.Fields;
5+
import backtype.storm.tuple.Values;
56
import java.io.UnsupportedEncodingException;
67
import java.util.List;
78
import java.util.Map;
89
import org.json.simple.JSONValue;
9-
import static backtype.storm.utils.Utils.tuple;
1010

1111
public class DRPCScheme implements Scheme {
1212
public List<Object> deserialize(byte[] bytes) {
1313
try {
1414
Map obj = (Map) JSONValue.parse(new String(bytes, "UTF-8"));
15-
return tuple(obj.get("args"), obj.get("return"));
15+
return new Values(obj.get("args"), obj.get("return"));
1616
} catch (UnsupportedEncodingException e) {
1717
throw new RuntimeException(e);
1818
}

src/jvm/backtype/storm/drpc/ReturnResults.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package backtype.storm.drpc;
22

3+
import backtype.storm.Config;
34
import backtype.storm.task.OutputCollector;
45
import backtype.storm.task.TopologyContext;
56
import backtype.storm.topology.IRichBolt;
67
import backtype.storm.topology.OutputFieldsDeclarer;
78
import backtype.storm.tuple.Tuple;
89
import backtype.storm.utils.DRPCClient;
10+
import backtype.storm.utils.InprocMessaging;
911
import java.util.Map;
1012
import org.apache.thrift7.TException;
1113
import org.json.simple.JSONValue;
@@ -14,9 +16,11 @@
1416
public class ReturnResults implements IRichBolt {
1517

1618
OutputCollector _collector;
19+
boolean local;
1720

1821
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
1922
_collector = collector;
23+
local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
2024
}
2125

2226
public void execute(Tuple input) {
@@ -25,15 +29,19 @@ public void execute(Tuple input) {
2529
if(returnInfo!=null) {
2630
Map retMap = (Map) JSONValue.parse(returnInfo);
2731
String ip = (String) retMap.get("ip");
28-
Long port = (Long) retMap.get("port");
32+
int port = (int) ((Long) retMap.get("port")).longValue();
2933
String id = (String) retMap.get("id");
30-
try {
31-
DRPCClient client = new DRPCClient(ip, (int) port.longValue());
32-
client.result(id, result);
33-
client.close();
34-
_collector.ack(input);
35-
} catch(TException e) {
36-
_collector.fail(input);
34+
if(local) {
35+
InprocMessaging.sendMessage(port, new Object[] {id, result});
36+
} else {
37+
try {
38+
DRPCClient client = new DRPCClient(ip, port);
39+
client.result(id, result);
40+
client.close();
41+
_collector.ack(input);
42+
} catch(TException e) {
43+
_collector.fail(input);
44+
}
3745
}
3846
}
3947
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package backtype.storm.testing;
2+
3+
import backtype.storm.drpc.SpoutAdder;
4+
import backtype.storm.tuple.Values;
5+
6+
public class FeederSpoutAdder implements SpoutAdder {
7+
private FeederSpout _spout;
8+
9+
public FeederSpoutAdder(FeederSpout spout) {
10+
_spout = spout;
11+
}
12+
13+
@Override
14+
public void add(String function, String args, String returnInfo) {
15+
_spout.feed(new Values(args, returnInfo));
16+
}
17+
}

src/jvm/backtype/storm/utils/InprocMessaging.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,9 @@ public static void sendMessage(int port, Object msg) {
2525
}
2626
}
2727

28-
public static Object takeMessage(int port) {
28+
public static Object takeMessage(int port) throws InterruptedException {
2929
LinkedBlockingQueue<Object> queue = getQueue(port);
30-
Object ret;
31-
try {
32-
ret = queue.take();
33-
} catch (InterruptedException e) {
34-
throw new RuntimeException(e);
35-
}
30+
Object ret = queue.take();
3631
synchronized(_lock) {
3732
if(queue.size()==0) {
3833
_queues.remove(port);
@@ -43,8 +38,7 @@ public static Object takeMessage(int port) {
4338

4439
public static Object pollMessage(int port) {
4540
LinkedBlockingQueue<Object> queue = getQueue(port);
46-
Object ret;
47-
ret = queue.poll();
41+
Object ret = queue.poll();
4842
synchronized(_lock) {
4943
if(queue.size()==0) {
5044
_queues.remove(port);

0 commit comments

Comments
 (0)