Skip to content

Commit c89270b

Browse files
author
Nathan Marz
committed
upgraded to thrift 7
1 parent fbe24e6 commit c89270b

38 files changed

+4308
-4640
lines changed

project.clj

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
[commons-io "1.4"]
1111
[org.apache.commons/commons-exec "1.1"]
1212
[jvyaml "1.0.0"]
13-
[backtype/thriftjava "1.0.0"]
13+
[org.apache.thrift/libthrift "0.7.0"]
1414
[clj-time "0.3.0"]
1515
[log4j/log4j "1.2.16"]
1616
[org.apache.zookeeper/zookeeper "3.3.2"]
@@ -19,6 +19,7 @@
1919
[compojure "0.6.4"]
2020
[hiccup "0.3.6"]
2121
[ring/ring-jetty-adapter "0.3.11"]
22+
[org.slf4j/slf4j-log4j12 "1.5.8"]
2223
]
2324
:uberjar-exclusions [#"META-INF.*"]
2425
:dev-dependencies [

src/clj/backtype/storm/daemon/drpc.clj

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
(ns backtype.storm.daemon.drpc
2-
(:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
2+
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
33
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
44
(:import [org.apache.thrift TException])
55
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
@@ -64,13 +64,12 @@
6464
([spout-adder]
6565
(launch-server! DEFAULT-PORT spout-adder))
6666
([port spout-adder]
67-
(let [options (THsHaServer$Options.)
68-
_ (set! (. options maxWorkerThreads) 64)
69-
service-handler (service-handler spout-adder port)
70-
server (THsHaServer.
71-
(DistributedRPC$Processor. service-handler)
72-
(TNonblockingServerSocket. port)
73-
(TBinaryProtocol$Factory.) options)]
67+
(let [service-handler (service-handler spout-adder port)
68+
options (THsHaServer$Args. (TNonblockingServerSocket. port))
69+
_ (set! (. options maxWorkerThreads) 64)
70+
_ (set! (. options processor) (DistributedRPC$Processor. service-handler))
71+
_ (set! (. options protocolFactory) (TBinaryProtocol$Factory.))
72+
server (THsHaServer. options)]
7473
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
7574
(log-message "Starting Distributed RPC server...")
7675
(.serve server))))

src/clj/backtype/storm/daemon/nimbus.clj

+23-19
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
(ns backtype.storm.daemon.nimbus
2-
(:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
2+
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
33
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
44
(:import [org.apache.thrift TException])
55
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
6+
(:import [java.nio ByteBuffer])
7+
(:import [java.nio.channels Channels WritableByteChannel])
68
(:use [backtype.storm bootstrap])
79
(:use [backtype.storm.daemon common])
810
(:gen-class))
@@ -469,26 +471,26 @@
469471

470472
(beginFileUpload [this]
471473
(let [fileloc (str inbox "/stormjar-" (uuid) ".jar")]
472-
(.put uploaders fileloc (FileOutputStream. fileloc))
474+
(.put uploaders fileloc (Channels/newChannel (FileOutputStream. fileloc)))
473475
(log-message "Uploading file from client to " fileloc)
474476
fileloc
475477
))
476478

477-
(^void uploadChunk [this ^String location ^bytes chunk]
478-
(let [^FileOutputStream os (.get uploaders location)]
479-
(when-not os
479+
(^void uploadChunk [this ^String location ^ByteBuffer chunk]
480+
(let [^WritableByteChannel channel (.get uploaders location)]
481+
(when-not channel
480482
(throw (RuntimeException.
481483
"File for that location does not exist (or timed out)")))
482-
(.write os chunk)
483-
(.put uploaders location os)
484+
(.write channel chunk)
485+
(.put uploaders location channel)
484486
))
485487

486488
(^void finishFileUpload [this ^String location]
487-
(let [^FileOutputStream os (.get uploaders location)]
488-
(when-not os
489+
(let [^WritableByteChannel channel (.get uploaders location)]
490+
(when-not channel
489491
(throw (RuntimeException.
490492
"File for that location does not exist (or timed out)")))
491-
(.close os)
493+
(.close channel)
492494
(log-message "Finished uploading file from client: " location)
493495
(.remove uploaders location)
494496
))
@@ -500,7 +502,7 @@
500502
id
501503
))
502504

503-
(^bytes downloadChunk [this ^String id]
505+
(^ByteBuffer downloadChunk [this ^String id]
504506
(let [^BufferFileInputStream is (.get downloaders id)]
505507
(when-not is
506508
(throw (RuntimeException.
@@ -509,7 +511,7 @@
509511
(.put downloaders id is)
510512
(when (empty? ret)
511513
(.remove downloaders id))
512-
ret
514+
(ByteBuffer/wrap ret)
513515
)))
514516

515517
(^String getTopologyConf [this ^String id]
@@ -604,13 +606,15 @@
604606

605607
(defn launch-server! [conf]
606608
(validate-distributed-mode! conf)
607-
(let [options (THsHaServer$Options.)
608-
_ (set! (. options maxWorkerThreads) 64)
609-
service-handler (service-handler conf)
610-
server (THsHaServer.
611-
(Nimbus$Processor. service-handler)
612-
(TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
613-
(TBinaryProtocol$Factory.) options)]
609+
(let [service-handler (service-handler conf)
610+
options (THsHaServer$Args.
611+
(TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))))
612+
_ (set! (. options workerThreads) 64)
613+
_ (set! (. options processor) (Nimbus$Processor. service-handler))
614+
_ (set! (. options protocolFactory) (TBinaryProtocol$Factory.))
615+
_ (set! (. options maxWorkerThreads) 64)
616+
617+
server (THsHaServer. options)]
614618
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
615619
(log-message "Starting Nimbus server...")
616620
(.serve server)))

src/genthrift.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
rm -rf gen-javabean gen-py py
22
rm -rf jvm/backtype/storm/generated
3-
thrift --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift
3+
thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift
44
mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated
55
mv gen-py py
66
rm -rf gen-javabean

src/jvm/backtype/storm/StormSubmitter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import backtype.storm.utils.BufferFileInputStream;
88
import backtype.storm.utils.NimbusClient;
99
import backtype.storm.utils.Utils;
10+
import java.nio.ByteBuffer;
1011
import java.util.Map;
1112
import org.apache.log4j.Logger;
1213
import org.apache.thrift.TException;
@@ -82,7 +83,7 @@ public static String submitJar(Map conf, String localJar) {
8283
while(true) {
8384
byte[] toSubmit = is.read();
8485
if(toSubmit.length==0) break;
85-
client.getClient().uploadChunk(uploadLocation, toSubmit);
86+
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
8687
}
8788
client.getClient().finishFileUpload(uploadLocation);
8889
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);

src/jvm/backtype/storm/generated/AlreadyAliveException.java

+52-44
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)