
Commit 7ab4d17

yhuai authored and liancheng committed
[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0.
https://issues.apache.org/jira/browse/SPARK-10681

Author: Yin Huai <yhuai@databricks.com>

Closes apache#8806 from yhuai/SPARK-10495.

(cherry picked from commit 0494c80)
Signed-off-by: Cheng Lian <lian@databricks.com>
1 parent 6152099 commit 7ab4d17
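
As an illustration of the user-visible effect (a hypothetical sketch, not part of this diff; the column name, the path, and the availability of a 1.5-era sqlContext are assumptions): a DateType column that Spark 1.5.0 wrote to JSON is stored as a day-count string such as "16436" rather than "2015-01-01", and with this patch it can still be read back under an explicit schema.

    import org.apache.spark.sql.types.{DateType, StructField, StructType}

    // JSON written by Spark 1.5.0 can contain {"col12":"16436"} for 2015-01-01.
    // With this change, reading it back with a DateType schema yields the date again.
    val schema = StructType(StructField("col12", DateType, nullable = true) :: Nil)
    val df = sqlContext.read.format("json").schema(schema).load("/path/to/json-written-by-1.5.0")
    df.show()  // col12 shows 2015-01-01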

3 files changed (+152, -2 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala

Lines changed: 36 additions & 0 deletions
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
             valWriter(field.dataType, v)
         }
         gen.writeEndObject()
+
+      // For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
+      case (ArrayType(ty, _), v: ArrayData) =>
+        gen.writeStartArray()
+        v.foreach(ty, (_, value) => valWriter(ty, value))
+        gen.writeEndArray()
+
+      case (MapType(kt, vt, _), v: MapData) =>
+        gen.writeStartObject()
+        v.foreach(kt, vt, { (k, v) =>
+          gen.writeFieldName(k.toString)
+          valWriter(vt, v)
+        })
+        gen.writeEndObject()
+
+      case (StructType(ty), v: InternalRow) =>
+        gen.writeStartObject()
+        var i = 0
+        while (i < ty.length) {
+          val field = ty(i)
+          val value = v.get(i, field.dataType)
+          if (value != null) {
+            gen.writeFieldName(field.name)
+            valWriter(field.dataType, value)
+          }
+          i += 1
+        }
+        gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
     }

     valWriter(rowSchema, row)
@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
           i += 1
         }
         gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
     }

     valWriter(rowSchema, row)
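
Why these extra cases are needed: udt.serialize returns values in Catalyst's internal representation (ArrayData, MapData, InternalRow) rather than Scala collections or external Rows, so valWriter must be able to recurse into those types as well. Below is a minimal sketch of such a UDT, assuming the Spark 1.5-era UserDefinedType API; Point and PointUDT are illustrative names, not part of this patch.

    import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    import org.apache.spark.sql.types._

    case class Point(x: Double, y: Double)

    // A toy UDT whose serialize() produces an internal ArrayData; this is the kind of
    // value JacksonGenerator receives, hence the (ArrayType, ArrayData) case above.
    class PointUDT extends UserDefinedType[Point] {
      override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

      override def serialize(obj: Any): ArrayData = obj match {
        case Point(x, y) => new GenericArrayData(Array[Any](x, y))
      }

      override def deserialize(datum: Any): Point = datum match {
        case a: ArrayData => Point(a.getDouble(0), a.getDouble(1))
      }

      override def userClass: Class[Point] = classOf[Point]
    }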

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala

Lines changed: 14 additions & 1 deletion
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
         // guard the non string type
         null

+      case (VALUE_STRING, BinaryType) =>
+        parser.getBinaryValue
+
       case (VALUE_STRING, DateType) =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        val stringValue = parser.getText
+        if (stringValue.contains("-")) {
+          // The format of this string will probably be "yyyy-mm-dd".
+          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        } else {
+          // In Spark 1.5.0, we store the data as number of days since epoch in string.
+          // So, we just convert it to Int.
+          stringValue.toInt
+        }

       case (VALUE_STRING, TimestampType) =>
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
         DateTimeUtils.stringToTime(parser.getText).getTime * 1000L

       case (VALUE_NUMBER_INT, TimestampType) =>
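
For reference, the two DateType encodings the parser now accepts can be reproduced outside Spark with a small standalone sketch (java.time is used only for illustration, and parseJsonDateString is a made-up helper, not part of the patch):

    import java.time.LocalDate

    // A "yyyy-mm-dd" string is converted to days since the Unix epoch, while a bare
    // integer string (how Spark 1.5.0 stored DateType values in JSON) is already the
    // day count and is used as-is.
    def parseJsonDateString(s: String): Int =
      if (s.contains("-")) LocalDate.parse(s).toEpochDay.toInt
      else s.toInt

    // Both encodings of the same date map to the same internal value:
    assert(parseJsonDateString("2015-01-01") == parseJsonDateString("16436"))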

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 102 additions & 1 deletion
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.apache.spark.rdd.RDD
 import org.scalactic.Tolerance._

-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1163,4 +1163,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
     })
   }
+
+  test("backward compatibility") {
+    // This test we make sure our JSON support can read JSON data generated by previous version
+    // of Spark generated through toJSON method and JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
+    //      in the JSON object.
+    //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
+    //      JSON objects generated by those Spark versions (col17).
+    //  - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes("UTF-8"),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal(s"1234.23456"),
+        new java.math.BigDecimal(s"1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+    // scalastyle:on
+
+    // Generate data for the current version.
+    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON will convert internal rows to external rows first and then generate
+      // JSON objects. While, df.write.format("json") will write internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + sqlContext.sparkContext.version,
+          "Spark " + sqlContext.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
 }
