Skip to content

Commit 51e784c

Browse files
author
Nathan Marz
committed
added fallback to java serialization feature
1 parent eb3601f commit 51e784c

File tree

8 files changed

+137
-11
lines changed

8 files changed

+137
-11
lines changed

conf/defaults.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,5 @@ topology.max.task.parallelism: null
6464
topology.max.spout.pending: null
6565
topology.state.synchronization.timeout.secs: 60
6666
topology.stats.sample.rate: 0.05
67+
topology.fall.back.on.java.serialization: true
6768

src/jvm/backtype/storm/Config.java

+9
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,11 @@ public class Config extends HashMap<String, Object> {
311311
*/
312312
public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
313313

314+
/**
315+
* Whether or not to use Java serialization in a topology.
316+
*/
317+
public static String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
318+
314319
/**
315320
* The number of threads that should be used by the zeromq context in each worker process.
316321
*/
@@ -381,5 +386,9 @@ public void setMaxSpoutPending(int max) {
381386
public void setStatsSampleRate(double rate) {
382387
put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
383388
}
389+
390+
public void setFallBackOnJavaSerialization(boolean fallback) {
391+
put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
392+
}
384393

385394
}

src/jvm/backtype/storm/serialization/FieldSerialization.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ public int getToken() {
1818
return _token;
1919
}
2020

21+
public ISerialization getSerialization() {
22+
return _serialization;
23+
}
24+
2125
public void serialize(Object obj, DataOutputStream out) throws IOException {
2226
_serialization.serialize(obj, out);
2327
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* To change this template, choose Tools | Templates
3+
* and open the template in the editor.
4+
*/
5+
package backtype.storm.serialization;
6+
7+
import java.io.DataInputStream;
8+
import java.io.DataOutputStream;
9+
import java.io.IOException;
10+
import java.io.ObjectInputStream;
11+
import java.io.ObjectOutputStream;
12+
import java.io.Serializable;
13+
14+
public class JavaSerialization implements ISerialization<Serializable> {
15+
public boolean accept(Class c) {
16+
return Serializable.class.isAssignableFrom(c);
17+
}
18+
19+
public void serialize(Serializable object, DataOutputStream stream) throws IOException {
20+
new ObjectOutputStream(stream).writeObject(object);
21+
}
22+
23+
public Serializable deserialize(DataInputStream stream) throws IOException {
24+
try {
25+
return (Serializable) new ObjectInputStream(stream).readObject();
26+
} catch (ClassNotFoundException e) {
27+
throw new IOException(e);
28+
}
29+
}
30+
}

src/jvm/backtype/storm/serialization/SerializationFactory.java

+24-11
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ public class SerializationFactory {
1313
public static final int SERIALIZATION_TOKEN_BOUNDARY = 32;
1414
public static Logger LOG = Logger.getLogger(SerializationFactory.class);
1515
private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
16+
public static final int JAVA_SERIALIZATION_TOKEN = 1;
1617

18+
private JavaSerialization _javaSerializer = new JavaSerialization();
19+
private boolean _useJavaSerialization;
1720
private Map<Integer, ISerialization> _serializations = new HashMap<Integer, ISerialization>() {{
18-
put(1, new ISerialization<Integer>() {
21+
put(2, new ISerialization<Integer>() {
1922
public boolean accept(Class c) {
2023
return Integer.class.equals(c);
2124
}
@@ -30,7 +33,7 @@ public Integer deserialize(DataInputStream stream) throws IOException {
3033
return stream.readInt();
3134
}
3235
});
33-
put(2, new ISerialization<Long>() {
36+
put(3, new ISerialization<Long>() {
3437
public boolean accept(Class c) {
3538
return Long.class.equals(c);
3639
}
@@ -45,7 +48,7 @@ public Long deserialize(DataInputStream stream) throws IOException {
4548
return stream.readLong();
4649
}
4750
});
48-
put(3, new ISerialization<Float>() {
51+
put(4, new ISerialization<Float>() {
4952
public boolean accept(Class c) {
5053
return Float.class.equals(c);
5154
}
@@ -58,7 +61,7 @@ public Float deserialize(DataInputStream stream) throws IOException {
5861
return stream.readFloat();
5962
}
6063
});
61-
put(4, new ISerialization<Double>() {
64+
put(5, new ISerialization<Double>() {
6265
public boolean accept(Class c) {
6366
return Double.class.equals(c);
6467
}
@@ -71,7 +74,7 @@ public Double deserialize(DataInputStream stream) throws IOException {
7174
return stream.readDouble();
7275
}
7376
});
74-
put(5, new ISerialization<Byte>() {
77+
put(6, new ISerialization<Byte>() {
7578
public boolean accept(Class c) {
7679
return Byte.class.equals(c);
7780
}
@@ -84,7 +87,7 @@ public Byte deserialize(DataInputStream stream) throws IOException {
8487
return stream.readByte();
8588
}
8689
});
87-
put(6, new ISerialization<Short>() {
90+
put(7, new ISerialization<Short>() {
8891
public boolean accept(Class c) {
8992
return Short.class.equals(c);
9093
}
@@ -97,7 +100,7 @@ public Short deserialize(DataInputStream stream) throws IOException {
97100
return stream.readShort();
98101
}
99102
});
100-
put(7, new ISerialization<String>() {
103+
put(8, new ISerialization<String>() {
101104
public boolean accept(Class c) {
102105
return String.class.equals(c);
103106
}
@@ -110,7 +113,7 @@ public String deserialize(DataInputStream stream) throws IOException {
110113
return stream.readUTF();
111114
}
112115
});
113-
put(8, new ISerialization<Boolean>() {
116+
put(9, new ISerialization<Boolean>() {
114117
public boolean accept(Class c) {
115118
return Boolean.class.equals(c);
116119
}
@@ -123,8 +126,7 @@ public Boolean deserialize(DataInputStream stream) throws IOException {
123126
return stream.readBoolean();
124127
}
125128
});
126-
put(9, new ISerialization<byte[]>() {
127-
129+
put(10, new ISerialization<byte[]>() {
128130
public boolean accept(Class c) {
129131
return EMPTY_BYTE_ARRAY.getClass().equals(c);
130132
}
@@ -146,6 +148,7 @@ public byte[] deserialize(DataInputStream stream) throws IOException {
146148
}};
147149

148150
public SerializationFactory(Map conf) {
151+
_useJavaSerialization = (Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION);
149152
boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_SERIALIZATIONS);
150153
Map<Object, String> customSerializations = (Map<Object, String>) conf.get(Config.TOPOLOGY_SERIALIZATIONS);
151154
if(customSerializations==null) customSerializations = new HashMap<Object, String>();
@@ -171,6 +174,9 @@ public SerializationFactory(Map conf) {
171174
throw new RuntimeException(e);
172175
}
173176
}
177+
if(_serializations.containsKey(JAVA_SERIALIZATION_TOKEN)) {
178+
throw new RuntimeException("Should not have a serialization with the java serialization token");
179+
}
174180
}
175181

176182
private int toToken(Object tokenObj) {
@@ -191,7 +197,11 @@ private int toToken(Object tokenObj) {
191197
public FieldSerialization getSerializationForToken(int token) {
192198
ISerialization ser = _serializations.get(token);
193199
if(ser==null) {
194-
throw new RuntimeException("Could not find serialization for token " + token);
200+
if(token==JAVA_SERIALIZATION_TOKEN) {
201+
ser = _javaSerializer;
202+
} else {
203+
throw new RuntimeException("Could not find serialization for token " + token);
204+
}
195205
}
196206
return new FieldSerialization(token, ser);
197207
}
@@ -203,6 +213,9 @@ public FieldSerialization getSerializationForClass(Class klass) {
203213
return getSerializationForToken(token);
204214
}
205215
}
216+
if(_useJavaSerialization && _javaSerializer.accept(klass)) {
217+
return new FieldSerialization(JAVA_SERIALIZATION_TOKEN, _javaSerializer);
218+
}
206219
throw new RuntimeException("Could not find serialization for class " + klass.toString());
207220
}
208221
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package backtype.storm.testing;
2+
3+
import java.io.Serializable;
4+
5+
public class TestSerObject implements Serializable {
6+
public int f1;
7+
public int f2;
8+
9+
public TestSerObject(int f1, int f2) {
10+
this.f1 = f1;
11+
this.f2 = f2;
12+
}
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
package backtype.storm.testing;
3+
4+
import backtype.storm.serialization.ISerialization;
5+
import java.io.DataInputStream;
6+
import java.io.DataOutputStream;
7+
import java.io.IOException;
8+
9+
public class TestSerObjectSerialization implements ISerialization<TestSerObject> {
10+
11+
@Override
12+
public boolean accept(Class c) {
13+
return c.equals(TestSerObject.class);
14+
}
15+
16+
@Override
17+
public void serialize(TestSerObject object, DataOutputStream stream) throws IOException {
18+
stream.writeInt(object.f1);
19+
stream.writeInt(object.f2);
20+
}
21+
22+
@Override
23+
public TestSerObject deserialize(DataInputStream stream) throws IOException {
24+
int f1 = stream.readInt();
25+
int f2 = stream.readInt();
26+
return new TestSerObject(f1, f2);
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
(ns backtype.storm.serialization-test
2+
(:use [clojure test])
3+
(:import [backtype.storm.serialization SerializationFactory JavaSerialization])
4+
(:import [backtype.storm.testing TestSerObject TestSerObjectSerialization])
5+
(:use [backtype.storm util config])
6+
)
7+
8+
9+
(defn mk-conf [extra]
10+
(merge (read-default-config) extra))
11+
12+
(deftest test-java-serialization
13+
(letlocals
14+
(bind ser1 (SerializationFactory. (mk-conf {TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION true})))
15+
(bind ser2 (SerializationFactory. (mk-conf {TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION true
16+
TOPOLOGY-SERIALIZATIONS {33 (.getName TestSerObjectSerialization)}})))
17+
(bind ser3 (SerializationFactory. (mk-conf {TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false})))
18+
19+
(bind fser (.getSerializationForClass ser1 TestSerObject))
20+
(is (= SerializationFactory/JAVA_SERIALIZATION_TOKEN (.getToken fser)))
21+
(is (= JavaSerialization (class (.getSerialization fser))))
22+
(is (= 33 (-> ser2 (.getSerializationForClass TestSerObject) .getToken)))
23+
(is (thrown? Exception (.getSerializationForClass ser3 TestSerObject)))
24+
25+
(is (not-nil? (.getSerializationForToken ser1 SerializationFactory/JAVA_SERIALIZATION_TOKEN)))
26+
27+
28+
))

0 commit comments

Comments
 (0)