1
1
package backtype .storm .drpc ;
2
2
3
+ import backtype .storm .Config ;
3
4
import backtype .storm .task .OutputCollector ;
4
5
import backtype .storm .task .TopologyContext ;
5
6
import backtype .storm .topology .IRichBolt ;
6
7
import backtype .storm .topology .OutputFieldsDeclarer ;
7
8
import backtype .storm .tuple .Tuple ;
8
9
import backtype .storm .utils .DRPCClient ;
10
+ import backtype .storm .utils .InprocMessaging ;
9
11
import java .util .Map ;
10
12
import org .apache .thrift7 .TException ;
11
13
import org .json .simple .JSONValue ;
14
16
public class ReturnResults implements IRichBolt {
15
17
16
18
OutputCollector _collector ;
19
+ boolean local ;
17
20
18
21
public void prepare (Map stormConf , TopologyContext context , OutputCollector collector ) {
19
22
_collector = collector ;
23
+ local = stormConf .get (Config .STORM_CLUSTER_MODE ).equals ("local" );
20
24
}
21
25
22
26
public void execute (Tuple input ) {
@@ -25,15 +29,19 @@ public void execute(Tuple input) {
25
29
if (returnInfo !=null ) {
26
30
Map retMap = (Map ) JSONValue .parse (returnInfo );
27
31
String ip = (String ) retMap .get ("ip" );
28
- Long port = (Long ) retMap .get ("port" );
32
+ int port = (int ) (( Long ) retMap .get ("port" )). longValue ( );
29
33
String id = (String ) retMap .get ("id" );
30
- try {
31
- DRPCClient client = new DRPCClient (ip , (int ) port .longValue ());
32
- client .result (id , result );
33
- client .close ();
34
- _collector .ack (input );
35
- } catch (TException e ) {
36
- _collector .fail (input );
34
+ if (local ) {
35
+ InprocMessaging .sendMessage (port , new Object [] {id , result });
36
+ } else {
37
+ try {
38
+ DRPCClient client = new DRPCClient (ip , port );
39
+ client .result (id , result );
40
+ client .close ();
41
+ _collector .ack (input );
42
+ } catch (TException e ) {
43
+ _collector .fail (input );
44
+ }
37
45
}
38
46
}
39
47
}
0 commit comments