
Commit 34de24a

nongli authored and yhuai committed
[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set the row length.
The reader was previously not setting the row length, so the length was wrong whenever the schema contained variable-length columns. The problem does not usually manifest, because the values in the columns are correct and projecting the row fixes the issue.

Author: Nong Li <nong@databricks.com>

Closes apache#10576 from nongli/spark-12589.
1 parent d084a2d commit 34de24a
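
To see why a stale row length matters, here is a minimal, Spark-free sketch. ToyRow and StaleLengthDemo are hypothetical stand-ins; they only mimic the relevant behavior of UnsafeRow, whose copy() and hashCode() both consume exactly sizeInBytes bytes of the backing buffer. If a variable-length column grew the buffer but the length was never updated, copies and hashes are computed over truncated data, which is what the regression test in this commit checks via hashCode().

import java.util.Arrays;

// Hypothetical stand-in for UnsafeRow: like the real class, copy() and
// hashCode() only look at the first sizeInBytes bytes of the buffer.
class ToyRow {
  byte[] buf;
  int sizeInBytes;

  void pointTo(byte[] buf, int sizeInBytes) {
    this.buf = buf;
    this.sizeInBytes = sizeInBytes;
  }

  // Mirrors the setter added by this commit.
  void setTotalSize(int sizeInBytes) {
    this.sizeInBytes = sizeInBytes;
  }

  ToyRow copy() {
    ToyRow r = new ToyRow();
    r.pointTo(Arrays.copyOf(buf, sizeInBytes), sizeInBytes);
    return r;
  }

  @Override
  public int hashCode() {
    int h = 1;
    for (int i = 0; i < sizeInBytes; i++) h = 31 * h + buf[i];
    return h;
  }
}

public class StaleLengthDemo {
  public static void main(String[] args) {
    // One flat buffer: a fixed-width part followed by a variable-length string.
    byte[] data = "8bytes..helloabcde".getBytes();

    ToyRow row = new ToyRow();
    row.pointTo(data, 8);                       // stale length: the string is ignored
    int staleHash = row.copy().hashCode();

    row.setTotalSize(data.length);              // the fix: record the true length
    int fixedHash = row.copy().hashCode();

    System.out.println(staleHash == fixedHash); // false: the first copy was truncated
  }
}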

3 files changed: 37 additions & 0 deletions


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 4 additions & 0 deletions
@@ -177,6 +177,10 @@ public void pointTo(byte[] buf, int sizeInBytes) {
     pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
   }
 
+  public void setTotalSize(int sizeInBytes) {
+    this.sizeInBytes = sizeInBytes;
+  }
+
   public void setNotNullAt(int i) {
     assertIndexIsValid(i);
     BitSetMethods.unset(baseObject, baseOffset, i);

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java

Lines changed: 9 additions & 0 deletions
@@ -256,6 +256,15 @@ private boolean loadBatch() throws IOException {
       numBatched = num;
       batchIdx = 0;
     }
+
+    // Update the total row lengths if the schema contained variable length. We did not maintain
+    // this as we populated the columns.
+    if (containsVarLenFields) {
+      for (int i = 0; i < numBatched; ++i) {
+        rows[i].setTotalSize(rowWriters[i].holder().totalSize());
+      }
+    }
+
     return true;
   }
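For context on the hunk above: the holder() and totalSize() calls come straight from the diff, but Spark's actual writer plumbing is more involved. Below is a simplified, hypothetical model (ToyBufferHolder is not Spark's real BufferHolder) of the one property the fix relies on: the holder's cursor advances as variable-length values are appended, so the total size is only final once every column of a row has been written. That is why the row lengths are patched in one pass after the batch is populated, and why the loop is guarded by containsVarLenFields: with only fixed-width columns the row length never changes.

import java.util.Arrays;

// Hypothetical simplification of the buffer a row writer appends into.
// totalSize() reflects all bytes written so far, so it is only the true row
// length after the last column of the row has been populated.
class ToyBufferHolder {
  private byte[] buffer = new byte[64];
  private int cursor = 0;

  void append(byte[] bytes) {
    if (cursor + bytes.length > buffer.length) {
      buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, cursor + bytes.length));
    }
    System.arraycopy(bytes, 0, buffer, cursor, bytes.length);
    cursor += bytes.length;
  }

  // The value each rows[i].setTotalSize(...) call receives in the fix above.
  int totalSize() {
    return cursor;
  }
}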

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -618,6 +619,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       readResourceParquetFile("dec-in-fixed-len.parquet"),
       sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
   }
+
+  test("SPARK-12589 copy() on rows returned from reader works for strings") {
+    withTempPath { dir =>
+      val data = (1, "abc") :: (2, "helloabcde") :: Nil
+      data.toDF().write.parquet(dir.getCanonicalPath)
+      var hash1: Int = 0
+      var hash2: Int = 0
+      (false :: true :: Nil).foreach { v =>
+        withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
+          val df = sqlContext.read.parquet(dir.getCanonicalPath)
+          val rows = df.queryExecution.toRdd.map(_.copy()).collect()
+          val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
+          if (!v) {
+            hash1 = unsafeRows(0).hashCode()
+            hash2 = unsafeRows(1).hashCode()
+          } else {
+            assert(hash1 == unsafeRows(0).hashCode())
+            assert(hash2 == unsafeRows(1).hashCode())
+          }
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
