@@ -95,14 +95,14 @@ object SamplePushDown extends Rule[LogicalPlan] {
95
95
* Intersect:
96
96
* It is not safe to pushdown Projections through it because we need to get the
97
97
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
98
- * because we will not have non- deterministic expressions .
98
+ * with deterministic condition .
99
99
*
100
100
* Except:
101
101
* It is not safe to pushdown Projections through it because we need to get the
102
102
* intersect of rows by comparing the entire rows. It is fine to pushdown Filters
103
- * because we will not have non- deterministic expressions .
103
+ * with deterministic condition .
104
104
*/
105
- object SetOperationPushDown extends Rule [LogicalPlan ] {
105
+ object SetOperationPushDown extends Rule [LogicalPlan ] with PredicateHelper {
106
106
107
107
/**
108
108
* Maps Attributes from the left side to the corresponding Attribute on the right side.
@@ -129,34 +129,65 @@ object SetOperationPushDown extends Rule[LogicalPlan] {
129
129
result.asInstanceOf [A ]
130
130
}
131
131
132
+ /**
133
+ * Splits the condition expression into small conditions by `And`, and partition them by
134
+ * deterministic, and finally recombine them by `And`. It returns an expression containing
135
+ * all deterministic expressions (the first field of the returned Tuple2) and an expression
136
+ * containing all non-deterministic expressions (the second field of the returned Tuple2).
137
+ */
138
+ private def partitionByDeterministic (condition : Expression ): (Expression , Expression ) = {
139
+ val andConditions = splitConjunctivePredicates(condition)
140
+ andConditions.partition(_.deterministic) match {
141
+ case (deterministic, nondeterministic) =>
142
+ deterministic.reduceOption(And ).getOrElse(Literal (true )) ->
143
+ nondeterministic.reduceOption(And ).getOrElse(Literal (true ))
144
+ }
145
+ }
146
+
132
147
def apply (plan : LogicalPlan ): LogicalPlan = plan transform {
133
148
// Push down filter into union
134
149
case Filter (condition, u @ Union (left, right)) =>
150
+ val (deterministic, nondeterministic) = partitionByDeterministic(condition)
135
151
val rewrites = buildRewrites(u)
136
- Union (
137
- Filter (condition, left),
138
- Filter (pushToRight(condition, rewrites), right))
139
-
140
- // Push down projection through UNION ALL
141
- case Project (projectList, u @ Union (left, right)) =>
142
- val rewrites = buildRewrites(u)
143
- Union (
144
- Project (projectList, left),
145
- Project (projectList.map(pushToRight(_, rewrites)), right))
152
+ Filter (nondeterministic,
153
+ Union (
154
+ Filter (deterministic, left),
155
+ Filter (pushToRight(deterministic, rewrites), right)
156
+ )
157
+ )
158
+
159
+ // Push down deterministic projection through UNION ALL
160
+ case p @ Project (projectList, u @ Union (left, right)) =>
161
+ if (projectList.forall(_.deterministic)) {
162
+ val rewrites = buildRewrites(u)
163
+ Union (
164
+ Project (projectList, left),
165
+ Project (projectList.map(pushToRight(_, rewrites)), right))
166
+ } else {
167
+ p
168
+ }
146
169
147
170
// Push down filter through INTERSECT
148
171
case Filter (condition, i @ Intersect (left, right)) =>
172
+ val (deterministic, nondeterministic) = partitionByDeterministic(condition)
149
173
val rewrites = buildRewrites(i)
150
- Intersect (
151
- Filter (condition, left),
152
- Filter (pushToRight(condition, rewrites), right))
174
+ Filter (nondeterministic,
175
+ Intersect (
176
+ Filter (deterministic, left),
177
+ Filter (pushToRight(deterministic, rewrites), right)
178
+ )
179
+ )
153
180
154
181
// Push down filter through EXCEPT
155
182
case Filter (condition, e @ Except (left, right)) =>
183
+ val (deterministic, nondeterministic) = partitionByDeterministic(condition)
156
184
val rewrites = buildRewrites(e)
157
- Except (
158
- Filter (condition, left),
159
- Filter (pushToRight(condition, rewrites), right))
185
+ Filter (nondeterministic,
186
+ Except (
187
+ Filter (deterministic, left),
188
+ Filter (pushToRight(deterministic, rewrites), right)
189
+ )
190
+ )
160
191
}
161
192
}
162
193
0 commit comments