@@ -13,9 +13,12 @@ public class SerializationFactory {
13
13
public static final int SERIALIZATION_TOKEN_BOUNDARY = 32 ;
14
14
public static Logger LOG = Logger .getLogger (SerializationFactory .class );
15
15
private static byte [] EMPTY_BYTE_ARRAY = new byte [0 ];
16
+ public static final int JAVA_SERIALIZATION_TOKEN = 1 ;
16
17
18
+ private JavaSerialization _javaSerializer = new JavaSerialization ();
19
+ private boolean _useJavaSerialization ;
17
20
private Map <Integer , ISerialization > _serializations = new HashMap <Integer , ISerialization >() {{
18
- put (1 , new ISerialization <Integer >() {
21
+ put (2 , new ISerialization <Integer >() {
19
22
public boolean accept (Class c ) {
20
23
return Integer .class .equals (c );
21
24
}
@@ -30,7 +33,7 @@ public Integer deserialize(DataInputStream stream) throws IOException {
30
33
return stream .readInt ();
31
34
}
32
35
});
33
- put (2 , new ISerialization <Long >() {
36
+ put (3 , new ISerialization <Long >() {
34
37
public boolean accept (Class c ) {
35
38
return Long .class .equals (c );
36
39
}
@@ -45,7 +48,7 @@ public Long deserialize(DataInputStream stream) throws IOException {
45
48
return stream .readLong ();
46
49
}
47
50
});
48
- put (3 , new ISerialization <Float >() {
51
+ put (4 , new ISerialization <Float >() {
49
52
public boolean accept (Class c ) {
50
53
return Float .class .equals (c );
51
54
}
@@ -58,7 +61,7 @@ public Float deserialize(DataInputStream stream) throws IOException {
58
61
return stream .readFloat ();
59
62
}
60
63
});
61
- put (4 , new ISerialization <Double >() {
64
+ put (5 , new ISerialization <Double >() {
62
65
public boolean accept (Class c ) {
63
66
return Double .class .equals (c );
64
67
}
@@ -71,7 +74,7 @@ public Double deserialize(DataInputStream stream) throws IOException {
71
74
return stream .readDouble ();
72
75
}
73
76
});
74
- put (5 , new ISerialization <Byte >() {
77
+ put (6 , new ISerialization <Byte >() {
75
78
public boolean accept (Class c ) {
76
79
return Byte .class .equals (c );
77
80
}
@@ -84,7 +87,7 @@ public Byte deserialize(DataInputStream stream) throws IOException {
84
87
return stream .readByte ();
85
88
}
86
89
});
87
- put (6 , new ISerialization <Short >() {
90
+ put (7 , new ISerialization <Short >() {
88
91
public boolean accept (Class c ) {
89
92
return Short .class .equals (c );
90
93
}
@@ -97,7 +100,7 @@ public Short deserialize(DataInputStream stream) throws IOException {
97
100
return stream .readShort ();
98
101
}
99
102
});
100
- put (7 , new ISerialization <String >() {
103
+ put (8 , new ISerialization <String >() {
101
104
public boolean accept (Class c ) {
102
105
return String .class .equals (c );
103
106
}
@@ -110,7 +113,7 @@ public String deserialize(DataInputStream stream) throws IOException {
110
113
return stream .readUTF ();
111
114
}
112
115
});
113
- put (8 , new ISerialization <Boolean >() {
116
+ put (9 , new ISerialization <Boolean >() {
114
117
public boolean accept (Class c ) {
115
118
return Boolean .class .equals (c );
116
119
}
@@ -123,8 +126,7 @@ public Boolean deserialize(DataInputStream stream) throws IOException {
123
126
return stream .readBoolean ();
124
127
}
125
128
});
126
- put (9 , new ISerialization <byte []>() {
127
-
129
+ put (10 , new ISerialization <byte []>() {
128
130
public boolean accept (Class c ) {
129
131
return EMPTY_BYTE_ARRAY .getClass ().equals (c );
130
132
}
@@ -146,6 +148,7 @@ public byte[] deserialize(DataInputStream stream) throws IOException {
146
148
}};
147
149
148
150
public SerializationFactory (Map conf ) {
151
+ _useJavaSerialization = (Boolean ) conf .get (Config .TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION );
149
152
boolean skipMissing = (Boolean ) conf .get (Config .TOPOLOGY_SKIP_MISSING_SERIALIZATIONS );
150
153
Map <Object , String > customSerializations = (Map <Object , String >) conf .get (Config .TOPOLOGY_SERIALIZATIONS );
151
154
if (customSerializations ==null ) customSerializations = new HashMap <Object , String >();
@@ -171,6 +174,9 @@ public SerializationFactory(Map conf) {
171
174
throw new RuntimeException (e );
172
175
}
173
176
}
177
+ if (_serializations .containsKey (JAVA_SERIALIZATION_TOKEN )) {
178
+ throw new RuntimeException ("Should not have a serialization with the java serialization token" );
179
+ }
174
180
}
175
181
176
182
private int toToken (Object tokenObj ) {
@@ -191,7 +197,11 @@ private int toToken(Object tokenObj) {
191
197
public FieldSerialization getSerializationForToken (int token ) {
192
198
ISerialization ser = _serializations .get (token );
193
199
if (ser ==null ) {
194
- throw new RuntimeException ("Could not find serialization for token " + token );
200
+ if (token ==JAVA_SERIALIZATION_TOKEN ) {
201
+ ser = _javaSerializer ;
202
+ } else {
203
+ throw new RuntimeException ("Could not find serialization for token " + token );
204
+ }
195
205
}
196
206
return new FieldSerialization (token , ser );
197
207
}
@@ -203,6 +213,9 @@ public FieldSerialization getSerializationForClass(Class klass) {
203
213
return getSerializationForToken (token );
204
214
}
205
215
}
216
+ if (_useJavaSerialization && _javaSerializer .accept (klass )) {
217
+ return new FieldSerialization (JAVA_SERIALIZATION_TOKEN , _javaSerializer );
218
+ }
206
219
throw new RuntimeException ("Could not find serialization for class " + klass .toString ());
207
220
}
208
221
}
0 commit comments