Skip to content

Commit f39c0bd

Browse files
author
Nathan Marz
committed
cache drpc clients to eliminate overhead
1 parent a2a004a commit f39c0bd

File tree

1 file changed

+19
-8
lines changed

1 file changed

+19
-8
lines changed

src/jvm/backtype/storm/drpc/ReturnResults.java

+19-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package backtype.storm.drpc;
22

33
import backtype.storm.Config;
4-
import backtype.storm.ILocalDRPC;
54
import backtype.storm.generated.DistributedRPC;
65
import backtype.storm.task.OutputCollector;
76
import backtype.storm.task.TopologyContext;
@@ -10,6 +9,9 @@
109
import backtype.storm.tuple.Tuple;
1110
import backtype.storm.utils.DRPCClient;
1211
import backtype.storm.utils.ServiceRegistry;
12+
import java.util.ArrayList;
13+
import java.util.HashMap;
14+
import java.util.List;
1315
import java.util.Map;
1416
import org.apache.thrift7.TException;
1517
import org.json.simple.JSONValue;
@@ -20,6 +22,8 @@ public class ReturnResults implements IRichBolt {
2022
OutputCollector _collector;
2123
boolean local;
2224

25+
Map<List, DRPCClient> _clients = new HashMap<List, DRPCClient>();
26+
2327
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
2428
_collector = collector;
2529
local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
@@ -30,21 +34,26 @@ public void execute(Tuple input) {
3034
String returnInfo = (String) input.getValue(1);
3135
if(returnInfo!=null) {
3236
Map retMap = (Map) JSONValue.parse(returnInfo);
33-
String host = (String) retMap.get("host");
34-
int port = (int) ((Long) retMap.get("port")).longValue();
37+
final String host = (String) retMap.get("host");
38+
final int port = (int) ((Long) retMap.get("port")).longValue();
3539
String id = (String) retMap.get("id");
3640
DistributedRPC.Iface client;
3741
if(local) {
3842
client = (DistributedRPC.Iface) ServiceRegistry.getService(host);
3943
} else {
40-
client = new DRPCClient(host, port);
44+
List server = new ArrayList() {{
45+
add(host);
46+
add(port);
47+
}};
48+
49+
if(!_clients.containsKey(server)) {
50+
_clients.put(server, new DRPCClient(host, port));
51+
}
52+
client = _clients.get(server);
4153
}
4254

4355
try {
4456
client.result(id, result);
45-
if(!local) {
46-
((DRPCClient)client).close();
47-
}
4857
_collector.ack(input);
4958
} catch(TException e) {
5059
_collector.fail(input);
@@ -53,7 +62,9 @@ public void execute(Tuple input) {
5362
}
5463

5564
public void cleanup() {
56-
65+
for(DRPCClient c: _clients.values()) {
66+
c.close();
67+
}
5768
}
5869

5970
public void declareOutputFields(OutputFieldsDeclarer declarer) {

0 commit comments

Comments
 (0)