Skip to content

Commit 576577b

Browse files
author
Nathan Marz
committed
finished simplified drpc...
1 parent acc414c commit 576577b

File tree

4 files changed

+12
-8
lines changed

4 files changed

+12
-8
lines changed

src/clj/backtype/storm/LocalDRPC.clj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
(.failRequest (:handler (. this state)) id)
3232
)
3333

34+
(defn -getServiceId [this]
35+
(:service-id (. this state)))
36+
3437
(defn -shutdown [this]
35-
(ServiceRegistry/unregisterService {:service-id (. this state)})
36-
(.shutdown {:handler (. this state)})
38+
(ServiceRegistry/unregisterService (:service-id (. this state)))
39+
(.shutdown (:handler (. this state)))
3740
)

src/clj/backtype/storm/daemon/drpc.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
88
(:import [backtype.storm.daemon Shutdownable])
99
(:import [java.net InetAddress])
10-
(:use [backtype.storm bootstrap config])
10+
(:use [backtype.storm bootstrap config log])
1111
(:gen-class))
1212

1313
(bootstrap)
@@ -19,8 +19,9 @@
1919
(defn acquire-queue [queues-atom function]
2020
(swap! queues-atom
2121
(fn [amap]
22-
(when-not (amap function))
22+
(if-not (amap function)
2323
(assoc amap function (ConcurrentLinkedQueue.))
24+
amap)
2425
))
2526
(@queues-atom function))
2627

@@ -54,6 +55,7 @@
5455
req (DRPCRequest. args id)
5556
^ConcurrentLinkedQueue queue (acquire-queue request-queues function)
5657
]
58+
(log-message "Received DRPC request for " function " " args)
5759
(swap! id->start assoc id (current-time-secs))
5860
(swap! id->sem assoc id sem)
5961
(.add queue req)

src/jvm/backtype/storm/drpc/DRPCSpout.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class DRPCSpout implements IRichSpout {
2727
public static Logger LOG = Logger.getLogger(DRPCSpout.class);
2828

2929
SpoutOutputCollector _collector;
30-
List<DRPCClient> _clients;
30+
List<DRPCClient> _clients = new ArrayList<DRPCClient>();
3131
String _function;
3232
String _local_drpc_id = null;
3333

@@ -69,7 +69,6 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
6969
if(servers.isEmpty()) {
7070
throw new RuntimeException("No DRPC servers configured for topology");
7171
}
72-
_clients = new ArrayList<DRPCClient>();
7372
if(numTasks < servers.size()) {
7473
for(String s: servers) {
7574
_clients.add(new DRPCClient(s, port));
@@ -111,7 +110,7 @@ public void nextTuple() {
111110
}
112111
}
113112
} else {
114-
ILocalDRPC drpc = (ILocalDRPC) ServiceRegistry.getService(_local_drpc_id);
113+
DistributedRPC.Iface drpc = (DistributedRPC.Iface) ServiceRegistry.getService(_local_drpc_id);
115114
try {
116115
DRPCRequest req = drpc.fetchRequest(_function);
117116
if(req.get_request_id().length() > 0) {

src/jvm/backtype/storm/drpc/ReturnResults.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void execute(Tuple input) {
3535
String id = (String) retMap.get("id");
3636
DistributedRPC.Iface client;
3737
if(local) {
38-
client = (ILocalDRPC) ServiceRegistry.getService(host);
38+
client = (DistributedRPC.Iface) ServiceRegistry.getService(host);
3939
} else {
4040
client = new DRPCClient(host, port);
4141
}

0 commit comments

Comments
 (0)