
Commit 1dc71ec
[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API
JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND expressions partially.

Author: Yin Huai <yhuai@databricks.com>

Closes apache#10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c5)
Signed-off-by: Yin Huai <yhuai@databricks.com>
1 parent df02319 commit 1dc71ec
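To see why partial conversion of a nested AND is unsound, it helps to evaluate the predicates by hand, independent of Spark's APIs. A minimal Scala sketch (the Row class and predicate functions below are illustrative, not part of the patch):

    // Minimal sketch: evaluate the full predicate NOT(a = 2 AND b = '1') and the
    // partially converted predicate NOT(a = 2) against the same row. The row
    // (a = 2, b = "0") belongs in the result, but the partial filter drops it.
    object NestedAndExample {
      final case class Row(a: Int, b: String)

      val full: Row => Boolean    = r => !(r.a == 2 && r.b == "1") // original filter
      val partial: Row => Boolean = r => !(r.a == 2)               // only `a = 2` converted

      def main(args: Array[String]): Unit = {
        val row = Row(2, "0")
        println(full(row))    // true: the row satisfies the original filter
        println(partial(row)) // false: the partial filter wrongly discards it
      }
    }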

4 files changed, +60 -13 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 11 additions & 1 deletion
@@ -257,7 +257,17 @@ private[sql] object ParquetFilters {
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))

       case sources.And(lhs, rhs) =>
-        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+        // At here, it is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+        // NOT(a = 2), which will generate wrong results.
+        // Pushing one side of AND down is only safe to do at the top level.
+        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
+        for {
+          lhsFilter <- createFilter(schema, lhs)
+          rhsFilter <- createFilter(schema, rhs)
+        } yield FilterApi.and(lhsFilter, rhsFilter)

       case sources.Or(lhs, rhs) =>
         for {
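The change above swaps `reduceOption` for a for-comprehension over `Option`, and the difference is the whole fix: `reduceOption` keeps a lone converted side, while the for-comprehension yields `None` unless both sides convert. A REPL-style sketch of the two behaviors, with plain strings standing in for `FilterApi` predicates:

    // Strings stand in for Parquet FilterApi predicates; this shows only the
    // Option-combination semantics, not the real conversion.
    def oldCombine(lhs: Option[String], rhs: Option[String]): Option[String] =
      (lhs ++ rhs).reduceOption((a, b) => s"and($a, $b)") // keeps a lone side

    def newCombine(lhs: Option[String], rhs: Option[String]): Option[String] =
      for { l <- lhs; r <- rhs } yield s"and($l, $r)"     // all-or-nothing

    oldCombine(Some("a = 2"), None) // Some("a = 2") -- unsafe under an enclosing NOT
    newCombine(Some("a = 2"), None) // None          -- the whole AND is left to Spark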

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -364,4 +364,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

Lines changed: 10 additions & 12 deletions
@@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging {

     expression match {
       case And(left, right) =>
-        val tryLeft = buildSearchArgument(left, newBuilder)
-        val tryRight = buildSearchArgument(right, newBuilder)
-
-        val conjunction = for {
-          _ <- tryLeft
-          _ <- tryRight
+        // At here, it is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+        // NOT(a = 2), which will generate wrong results.
+        // Pushing one side of AND down is only safe to do at the top level.
+        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
+        for {
+          _ <- buildSearchArgument(left, newBuilder)
+          _ <- buildSearchArgument(right, newBuilder)
           lhs <- buildSearchArgument(left, builder.startAnd())
           rhs <- buildSearchArgument(right, lhs)
         } yield rhs.end()

-        // For filter `left AND right`, we can still push down `left` even if `right` is not
-        // convertible, and vice versa.
-        conjunction
-          .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder)))
-          .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder)))
-
       case Or(left, right) =>
         for {
           _ <- buildSearchArgument(left, newBuilder)
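The comment's caveat about the top level (for which it cites ParquetRelation's initializeLocalJobFunc) is worth unpacking: the filters handed to a data source are implicitly ANDed, and the engine re-applies the full predicate to whatever rows the source returns, so dropping a top-level conjunct from the pushed filter can only make the scan less selective, never change the answer. A hedged sketch of that contract (the names below are illustrative, not Spark's):

    // Illustrative sketch, not Spark code: a source may evaluate only the filters
    // it understands, because the engine re-applies every filter to the rows the
    // source returns. Dropping a top-level conjunct never changes the result.
    def scanAndFilter[A](rows: Seq[A],
                         understood: Seq[A => Boolean], // pushed to the source
                         all: Seq[A => Boolean]): Seq[A] = {
      val coarse = rows.filter(r => understood.forall(_(r))) // superset of the result
      coarse.filter(r => all.forall(_(r)))                   // engine re-checks everything
    }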

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala

Lines changed: 20 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc
 import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{Row, SQLConf}
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._

@@ -60,4 +61,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load())
     }
   }
+
+  test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }
