Skip to content

Commit 552e5f0

Browse files
committed
[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan
## What changes were proposed in this pull request? Sort in a streaming plan should be allowed only after a aggregation in complete mode. Currently it is incorrectly allowed when present anywhere in the plan. It gives unpredictable potentially incorrect results. ## How was this patch tested? New test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#16662 from tdas/SPARK-19314.
1 parent e20d9b1 commit 552e5f0

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
8787
* data.
8888
*/
8989
def containsCompleteData(subplan: LogicalPlan): Boolean = {
90-
val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
90+
val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
9191
// Either the subplan has no streaming source, or it has aggregation with Complete mode
9292
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
9393
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
199199
_.intersect(_),
200200
streamStreamSupported = false)
201201

202-
// Sort: supported only on batch subplans and on aggregation + complete output mode
202+
// Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
203203
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
204204
assertSupportedInStreamingPlan(
205-
"sort - sort over aggregated data in Complete output mode",
205+
"sort - sort after aggregation in Complete output mode",
206206
streamRelation.groupBy()(Count("*")).sortBy(),
207207
Complete)
208+
assertNotSupportedInStreamingPlan(
209+
"sort - sort before aggregation in Complete output mode",
210+
streamRelation.sortBy().groupBy()(Count("*")),
211+
Complete,
212+
Seq("sort", "aggregat", "complete"))
208213
assertNotSupportedInStreamingPlan(
209214
"sort - sort over aggregated data in Update output mode",
210215
streamRelation.groupBy()(Count("*")).sortBy(),

0 commit comments

Comments
 (0)