Skip to content

Commit 277f6e9

Browse files
author
Nathan Marz
committed
merged trident into main storm repo
1 parent 58ce226 commit 277f6e9

File tree

151 files changed

+8962
-2
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

151 files changed

+8962
-2
lines changed

project.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
[org.apache.httpcomponents/httpclient "4.1.1"]
2727
[storm/tools.cli "0.2.2"]
2828
[com.googlecode.disruptor/disruptor "2.10.1"]
29+
[storm/jgrapht "0.8.3"]
2930
]
3031
:dev-dependencies [
3132
[swank-clojure "1.4.0-SNAPSHOT" :exclusions [org.clojure/clojure]]

src/clj/backtype/storm/LocalDRPC.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
(ns backtype.storm.LocalDRPC
2-
(:use [backtype.storm.daemon drpc])
2+
(:require [backtype.storm.daemon [drpc :as drpc]])
33
(:use [backtype.storm util])
44
(:import [backtype.storm.utils InprocMessaging ServiceRegistry])
55
(:gen-class
@@ -9,7 +9,7 @@
99
:state state ))
1010

1111
(defn -init []
12-
(let [handler (service-handler)
12+
(let [handler (drpc/service-handler)
1313
id (ServiceRegistry/registerService handler)
1414
]
1515
[[] {:service-id id :handler handler}]

src/clj/storm/trident/testing.clj

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
(ns storm.trident.testing
2+
(:import [storm.trident.testing FeederBatchSpout MemoryMapState MemoryMapState$Factory])
3+
(:import [backtype.storm LocalDRPC])
4+
(:import [backtype.storm.tuple Fields])
5+
(:import [backtype.storm.generated KillOptions])
6+
(:require [backtype.storm [testing :as t]])
7+
(:use [backtype.storm util])
8+
)
9+
10+
(defn local-drpc []
11+
(LocalDRPC.))
12+
13+
(defn exec-drpc [^LocalDRPC drpc function-name args]
14+
(let [res (.execute drpc function-name args)]
15+
(from-json res)))
16+
17+
(defn feeder-spout [fields]
18+
(FeederBatchSpout. fields))
19+
20+
(defn feed [feeder tuples]
21+
(.feed feeder tuples))
22+
23+
(defn fields [& fields]
24+
(Fields. fields))
25+
26+
(defn memory-map-state []
27+
(MemoryMapState$Factory.))
28+
29+
(defmacro with-drpc [[drpc] & body]
30+
`(let [~drpc (backtype.storm.LocalDRPC.)]
31+
~@body
32+
(.shutdown ~drpc)
33+
))
34+
35+
(defn with-topology* [cluster topo body-fn]
36+
(t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
37+
(body-fn)
38+
(.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
39+
)
40+
41+
(defmacro with-topology [[cluster topo] & body]
42+
`(with-topology* ~cluster ~topo (fn [] ~@body)))
43+
44+
(defn bootstrap-imports []
45+
(import 'backtype.storm.LocalDRPC)
46+
(import 'storm.trident.TridentTopology)
47+
(import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN])
48+
)

src/jvm/storm/trident/JoinType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package storm.trident;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
6+
public enum JoinType {
7+
INNER,
8+
OUTER;
9+
10+
public static List<JoinType> mixed(JoinType... types) {
11+
return Arrays.asList(types);
12+
}
13+
}

0 commit comments

Comments
 (0)