|
1 | 1 | (ns backtype.storm.daemon.nimbus
|
2 |
| - (:import [org.apache.thrift.server THsHaServer THsHaServer$Options]) |
| 2 | + (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]) |
3 | 3 | (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
|
4 | 4 | (:import [org.apache.thrift TException])
|
5 | 5 | (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
|
| 6 | + (:import [java.nio ByteBuffer]) |
| 7 | + (:import [java.nio.channels Channels WritableByteChannel]) |
6 | 8 | (:use [backtype.storm bootstrap])
|
7 | 9 | (:use [backtype.storm.daemon common])
|
8 | 10 | (:gen-class))
|
|
469 | 471 |
|
470 | 472 | (beginFileUpload [this]
|
471 | 473 | (let [fileloc (str inbox "/stormjar-" (uuid) ".jar")]
|
472 |
| - (.put uploaders fileloc (FileOutputStream. fileloc)) |
| 474 | + (.put uploaders fileloc (Channels/newChannel (FileOutputStream. fileloc))) |
473 | 475 | (log-message "Uploading file from client to " fileloc)
|
474 | 476 | fileloc
|
475 | 477 | ))
|
476 | 478 |
|
477 |
| - (^void uploadChunk [this ^String location ^bytes chunk] |
478 |
| - (let [^FileOutputStream os (.get uploaders location)] |
479 |
| - (when-not os |
| 479 | + (^void uploadChunk [this ^String location ^ByteBuffer chunk] |
| 480 | + (let [^WritableByteChannel channel (.get uploaders location)] |
| 481 | + (when-not channel |
480 | 482 | (throw (RuntimeException.
|
481 | 483 | "File for that location does not exist (or timed out)")))
|
482 |
| - (.write os chunk) |
483 |
| - (.put uploaders location os) |
| 484 | + (.write channel chunk) |
| 485 | + (.put uploaders location channel) |
484 | 486 | ))
|
485 | 487 |
|
486 | 488 | (^void finishFileUpload [this ^String location]
|
487 |
| - (let [^FileOutputStream os (.get uploaders location)] |
488 |
| - (when-not os |
| 489 | + (let [^WritableByteChannel channel (.get uploaders location)] |
| 490 | + (when-not channel |
489 | 491 | (throw (RuntimeException.
|
490 | 492 | "File for that location does not exist (or timed out)")))
|
491 |
| - (.close os) |
| 493 | + (.close channel) |
492 | 494 | (log-message "Finished uploading file from client: " location)
|
493 | 495 | (.remove uploaders location)
|
494 | 496 | ))
|
|
500 | 502 | id
|
501 | 503 | ))
|
502 | 504 |
|
503 |
| - (^bytes downloadChunk [this ^String id] |
| 505 | + (^ByteBuffer downloadChunk [this ^String id] |
504 | 506 | (let [^BufferFileInputStream is (.get downloaders id)]
|
505 | 507 | (when-not is
|
506 | 508 | (throw (RuntimeException.
|
|
509 | 511 | (.put downloaders id is)
|
510 | 512 | (when (empty? ret)
|
511 | 513 | (.remove downloaders id))
|
512 |
| - ret |
| 514 | + (ByteBuffer/wrap ret) |
513 | 515 | )))
|
514 | 516 |
|
515 | 517 | (^String getTopologyConf [this ^String id]
|
|
604 | 606 |
|
605 | 607 | (defn launch-server! [conf]
|
606 | 608 | (validate-distributed-mode! conf)
|
607 |
| - (let [options (THsHaServer$Options.) |
608 |
| - _ (set! (. options maxWorkerThreads) 64) |
609 |
| - service-handler (service-handler conf) |
610 |
| - server (THsHaServer. |
611 |
| - (Nimbus$Processor. service-handler) |
612 |
| - (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) |
613 |
| - (TBinaryProtocol$Factory.) options)] |
| 609 | + (let [service-handler (service-handler conf) |
| 610 | + options (THsHaServer$Args. |
| 611 | + (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))) |
| 612 | + _ (set! (. options workerThreads) 64) |
| 613 | + _ (set! (. options processor) (Nimbus$Processor. service-handler)) |
| 614 | + _ (set! (. options protocolFactory) (TBinaryProtocol$Factory.)) |
| 615 | + _ (set! (. options maxWorkerThreads) 64) |
| 616 | + |
| 617 | + server (THsHaServer. options)] |
614 | 618 | (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
|
615 | 619 | (log-message "Starting Nimbus server...")
|
616 | 620 | (.serve server)))
|
|
0 commit comments