1
1
package backtype .storm .drpc ;
2
2
3
3
import backtype .storm .Config ;
4
- import backtype .storm .ILocalDRPC ;
5
4
import backtype .storm .generated .DistributedRPC ;
6
5
import backtype .storm .task .OutputCollector ;
7
6
import backtype .storm .task .TopologyContext ;
10
9
import backtype .storm .tuple .Tuple ;
11
10
import backtype .storm .utils .DRPCClient ;
12
11
import backtype .storm .utils .ServiceRegistry ;
12
+ import java .util .ArrayList ;
13
+ import java .util .HashMap ;
14
+ import java .util .List ;
13
15
import java .util .Map ;
14
16
import org .apache .thrift7 .TException ;
15
17
import org .json .simple .JSONValue ;
@@ -20,6 +22,8 @@ public class ReturnResults implements IRichBolt {
20
22
OutputCollector _collector ;
21
23
boolean local ;
22
24
25
+ Map <List , DRPCClient > _clients = new HashMap <List , DRPCClient >();
26
+
23
27
public void prepare (Map stormConf , TopologyContext context , OutputCollector collector ) {
24
28
_collector = collector ;
25
29
local = stormConf .get (Config .STORM_CLUSTER_MODE ).equals ("local" );
@@ -30,21 +34,26 @@ public void execute(Tuple input) {
30
34
String returnInfo = (String ) input .getValue (1 );
31
35
if (returnInfo !=null ) {
32
36
Map retMap = (Map ) JSONValue .parse (returnInfo );
33
- String host = (String ) retMap .get ("host" );
34
- int port = (int ) ((Long ) retMap .get ("port" )).longValue ();
37
+ final String host = (String ) retMap .get ("host" );
38
+ final int port = (int ) ((Long ) retMap .get ("port" )).longValue ();
35
39
String id = (String ) retMap .get ("id" );
36
40
DistributedRPC .Iface client ;
37
41
if (local ) {
38
42
client = (DistributedRPC .Iface ) ServiceRegistry .getService (host );
39
43
} else {
40
- client = new DRPCClient (host , port );
44
+ List server = new ArrayList () {{
45
+ add (host );
46
+ add (port );
47
+ }};
48
+
49
+ if (!_clients .containsKey (server )) {
50
+ _clients .put (server , new DRPCClient (host , port ));
51
+ }
52
+ client = _clients .get (server );
41
53
}
42
54
43
55
try {
44
56
client .result (id , result );
45
- if (!local ) {
46
- ((DRPCClient )client ).close ();
47
- }
48
57
_collector .ack (input );
49
58
} catch (TException e ) {
50
59
_collector .fail (input );
@@ -53,7 +62,9 @@ public void execute(Tuple input) {
53
62
}
54
63
55
64
public void cleanup () {
56
-
65
+ for (DRPCClient c : _clients .values ()) {
66
+ c .close ();
67
+ }
57
68
}
58
69
59
70
public void declareOutputFields (OutputFieldsDeclarer declarer ) {
0 commit comments