@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.apache.spark.rdd.RDD
 import org.scalactic.Tolerance._
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1163,4 +1163,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
     })
   }
+
+  test("backward compatibility") {
+    // This test makes sure that our JSON support can read JSON data generated by previous
+    // versions of Spark through the toJSON method and the JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    //   - Spark 1.5.0 cannot save timestamp data. So, we manually added a timestamp field (col13)
+    //     in the JSON object.
+    //   - Spark versions before 1.5.1 do not generate UDT values. So, we manually added the UDT
+    //     value to JSON objects generated by those Spark versions (col17).
+    //   - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+        StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes("UTF-8"),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal(s"1234.23456"),
+        new java.math.BigDecimal(s"1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+    // scalastyle:on
+
+    // Generate data for the current version.
+    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON will convert internal rows to external rows first and then generate
+      // JSON objects, while df.write.format("json") writes internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + sqlContext.sparkContext.version,
+          "Spark " + sqlContext.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
 }
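
As a side note on the comment in the test about the two serialization paths: below is a minimal sketch, not part of the patch, showing how the same DataFrame can be turned into JSON text both through toJSON and through the JSON data source, which is what the test combines with the hand-written legacy records. It assumes a Spark 1.x SQLContext named sqlContext; the one-column schema and the output directory /tmp/json-compat are hypothetical.

// Minimal sketch (hypothetical names): comparing the two JSON serialization paths.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sketchSchema = StructType(StructField("id", IntegerType, nullable = true) :: Nil)
val sketchDF = sqlContext.createDataFrame(
  sqlContext.sparkContext.parallelize(Row(1) :: Row(2) :: Nil, 1), sketchSchema)

// Path 1: toJSON converts internal rows to external rows, then serializes them.
val viaToJSON: Array[String] = sketchDF.toJSON.collect()

// Path 2: the JSON data source writes internal rows directly to files.
sketchDF.write.format("json").mode("overwrite").save("/tmp/json-compat")
val viaDataSource: Array[String] =
  sqlContext.sparkContext.textFile("/tmp/json-compat").collect()

// Both collections should contain equivalent JSON objects for the same rows.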
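Likewise, a minimal sketch of the read-back step with an explicit schema, as the checkAnswer call above exercises, here reading from an RDD of strings rather than a saved path. The two-column schema and the single sample record are hypothetical; supplying the schema up front means no schema-inference pass over the JSON text is needed.

// Minimal sketch (hypothetical schema and record): reading legacy JSON with an explicit schema.
import org.apache.spark.sql.types._

val legacySchema = StructType(
  StructField("col0", StringType, nullable = true) ::
  StructField("col7", LongType, nullable = true) :: Nil)

val legacyRecords = sqlContext.sparkContext.parallelize(
  """{"col0":"Spark 1.3.1","col7":9223372036854775807}""" :: Nil, 1)

// With a user-supplied schema, the reader skips schema inference over the JSON text.
val legacyDF = sqlContext.read.schema(legacySchema).json(legacyRecords)
legacyDF.show()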