Skip to content

Commit c3dfe0f

Browse files
committed
Repair breakage of aggregate FILTER option.
An aggregate's input expression(s) are not supposed to be evaluated at all for a row where its FILTER test fails ... but commit 8ed3f11 overlooked that requirement. Reshuffle so that aggregates having a filter clause evaluate their arguments separately from those without. This still gets the benefit of doing only one ExecProject in the common case of multiple Aggrefs, none of which have filters. While at it, arrange for filter clauses to be included in the common ExecProject evaluation, thus perhaps buying a little bit even when there are filters. Back-patch to v10 where the bug was introduced. Discussion: https://postgr.es/m/30065.1508161354@sss.pgh.pa.us
1 parent 60a1d96 commit c3dfe0f

File tree

4 files changed

+130
-70
lines changed

4 files changed

+130
-70
lines changed

src/backend/executor/nodeAgg.c

Lines changed: 120 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -268,21 +268,23 @@ typedef struct AggStatePerTransData
268268
*/
269269
int numInputs;
270270

271-
/*
272-
* At each input row, we evaluate all argument expressions needed for all
273-
* the aggregates in this Agg node in a single ExecProject call. inputoff
274-
* is the starting index of this aggregate's argument expressions in the
275-
* resulting tuple (in AggState->evalslot).
276-
*/
277-
int inputoff;
278-
279271
/*
280272
* Number of aggregated input columns to pass to the transfn. This
281273
* includes the ORDER BY columns for ordered-set aggs, but not for plain
282274
* aggs. (This doesn't count the transition state value!)
283275
*/
284276
int numTransInputs;
285277

278+
/*
279+
* At each input row, we perform a single ExecProject call to evaluate all
280+
* argument expressions that will certainly be needed at this row; that
281+
* includes this aggregate's filter expression if it has one, or its
282+
* regular argument expressions (including any ORDER BY columns) if it
283+
* doesn't. inputoff is the starting index of this aggregate's required
284+
* expressions in the resulting tuple.
285+
*/
286+
int inputoff;
287+
286288
/* Oid of the state transition or combine function */
287289
Oid transfn_oid;
288290

@@ -295,9 +297,8 @@ typedef struct AggStatePerTransData
295297
/* Oid of state value's datatype */
296298
Oid aggtranstype;
297299

298-
/* ExprStates of the FILTER and argument expressions. */
299-
ExprState *aggfilter; /* state of FILTER expression, if any */
300-
List *aggdirectargs; /* states of direct-argument expressions */
300+
/* ExprStates for any direct-argument expressions */
301+
List *aggdirectargs;
301302

302303
/*
303304
* fmgr lookup data for transition function or combine function. Note in
@@ -353,20 +354,21 @@ typedef struct AggStatePerTransData
353354
transtypeByVal;
354355

355356
/*
356-
* Stuff for evaluation of aggregate inputs in cases where the aggregate
357-
* requires sorted input. The arguments themselves will be evaluated via
358-
* AggState->evalslot/evalproj for all aggregates at once, but we only
359-
* want to sort the relevant columns for individual aggregates.
357+
* Stuff for evaluation of aggregate inputs, when they must be evaluated
358+
* separately because there's a FILTER expression. In such cases we will
359+
* create a sortslot and the result will be stored there, whether or not
360+
* we're actually sorting.
360361
*/
361-
TupleDesc sortdesc; /* descriptor of input tuples */
362+
ProjectionInfo *evalproj; /* projection machinery */
362363

363364
/*
364365
* Slots for holding the evaluated input arguments. These are set up
365-
* during ExecInitAgg() and then used for each input row requiring
366-
* processing besides what's done in AggState->evalproj.
366+
* during ExecInitAgg() and then used for each input row requiring either
367+
* FILTER or ORDER BY/DISTINCT processing.
367368
*/
368369
TupleTableSlot *sortslot; /* current input tuple */
369370
TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */
371+
TupleDesc sortdesc; /* descriptor of input tuples */
370372

371373
/*
372374
* These values are working state that is initialized at the start of an
@@ -983,30 +985,36 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
983985
int numGroupingSets = Max(aggstate->phase->numsets, 1);
984986
int numHashes = aggstate->num_hashes;
985987
int numTrans = aggstate->numtrans;
986-
TupleTableSlot *slot = aggstate->evalslot;
988+
TupleTableSlot *combinedslot;
987989

988-
/* compute input for all aggregates */
989-
if (aggstate->evalproj)
990-
aggstate->evalslot = ExecProject(aggstate->evalproj);
990+
/* compute required inputs for all aggregates */
991+
combinedslot = ExecProject(aggstate->combinedproj);
991992

992993
for (transno = 0; transno < numTrans; transno++)
993994
{
994995
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
995-
ExprState *filter = pertrans->aggfilter;
996996
int numTransInputs = pertrans->numTransInputs;
997-
int i;
998997
int inputoff = pertrans->inputoff;
998+
TupleTableSlot *slot;
999+
int i;
9991000

10001001
/* Skip anything FILTERed out */
1001-
if (filter)
1002+
if (pertrans->aggref->aggfilter)
10021003
{
1003-
Datum res;
1004-
bool isnull;
1005-
1006-
res = ExecEvalExprSwitchContext(filter, aggstate->tmpcontext,
1007-
&isnull);
1008-
if (isnull || !DatumGetBool(res))
1004+
/* Check the result of the filter expression */
1005+
if (combinedslot->tts_isnull[inputoff] ||
1006+
!DatumGetBool(combinedslot->tts_values[inputoff]))
10091007
continue;
1008+
1009+
/* Now it's safe to evaluate this agg's arguments */
1010+
slot = ExecProject(pertrans->evalproj);
1011+
/* There's no offset needed in this slot, of course */
1012+
inputoff = 0;
1013+
}
1014+
else
1015+
{
1016+
/* arguments are already evaluated into combinedslot @ inputoff */
1017+
slot = combinedslot;
10101018
}
10111019

10121020
if (pertrans->numSortCols > 0)
@@ -1040,11 +1048,21 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
10401048
tuplesort_putdatum(pertrans->sortstates[setno],
10411049
slot->tts_values[inputoff],
10421050
slot->tts_isnull[inputoff]);
1051+
else if (pertrans->aggref->aggfilter)
1052+
{
1053+
/*
1054+
* When filtering and ordering, we already have a slot
1055+
* containing just the argument columns.
1056+
*/
1057+
Assert(slot == pertrans->sortslot);
1058+
tuplesort_puttupleslot(pertrans->sortstates[setno], slot);
1059+
}
10431060
else
10441061
{
10451062
/*
1046-
* Copy slot contents, starting from inputoff, into sort
1047-
* slot.
1063+
* Copy argument columns from combined slot, starting at
1064+
* inputoff, into sortslot, so that we can store just the
1065+
* columns we want.
10481066
*/
10491067
ExecClearTuple(pertrans->sortslot);
10501068
memcpy(pertrans->sortslot->tts_values,
@@ -1053,9 +1071,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
10531071
memcpy(pertrans->sortslot->tts_isnull,
10541072
&slot->tts_isnull[inputoff],
10551073
pertrans->numInputs * sizeof(bool));
1056-
pertrans->sortslot->tts_nvalid = pertrans->numInputs;
10571074
ExecStoreVirtualTuple(pertrans->sortslot);
1058-
tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot);
1075+
tuplesort_puttupleslot(pertrans->sortstates[setno],
1076+
pertrans->sortslot);
10591077
}
10601078
}
10611079
}
@@ -1127,7 +1145,7 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
11271145
Assert(aggstate->phase->numsets <= 1);
11281146

11291147
/* compute input for all aggregates */
1130-
slot = ExecProject(aggstate->evalproj);
1148+
slot = ExecProject(aggstate->combinedproj);
11311149

11321150
for (transno = 0; transno < numTrans; transno++)
11331151
{
@@ -2691,6 +2709,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
26912709
int phase;
26922710
int phaseidx;
26932711
List *combined_inputeval;
2712+
TupleDesc combineddesc;
2713+
TupleTableSlot *combinedslot;
26942714
ListCell *l;
26952715
Bitmapset *all_grouped_cols = NULL;
26962716
int numGroupingSets = 1;
@@ -3366,19 +3386,17 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
33663386
aggstate->numtrans = transno + 1;
33673387

33683388
/*
3369-
* Build a single projection computing the aggregate arguments for all
3389+
* Build a single projection computing the required arguments for all
33703390
* aggregates at once; if there's more than one, that's considerably
33713391
* faster than doing it separately for each.
33723392
*
3373-
* First create a targetlist combining the targetlists of all the
3374-
* per-trans states.
3393+
* First create a targetlist representing the values to compute.
33753394
*/
33763395
combined_inputeval = NIL;
33773396
column_offset = 0;
33783397
for (transno = 0; transno < aggstate->numtrans; transno++)
33793398
{
33803399
AggStatePerTrans pertrans = &pertransstates[transno];
3381-
ListCell *arg;
33823400

33833401
/*
33843402
* Mark this per-trans state with its starting column in the combined
@@ -3387,38 +3405,70 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
33873405
pertrans->inputoff = column_offset;
33883406

33893407
/*
3390-
* Adjust resnos in the copied target entries to match the combined
3391-
* slot.
3408+
* If the aggregate has a FILTER, we can only evaluate the filter
3409+
* expression, not the actual input expressions, during the combined
3410+
* eval step --- unless we're ignoring the filter because this node is
3411+
* running combinefns not transfns.
33923412
*/
3393-
foreach(arg, pertrans->aggref->args)
3413+
if (pertrans->aggref->aggfilter &&
3414+
!DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
33943415
{
3395-
TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
33963416
TargetEntry *tle;
33973417

3398-
tle = flatCopyTargetEntry(source_tle);
3399-
tle->resno += column_offset;
3400-
3418+
tle = makeTargetEntry(pertrans->aggref->aggfilter,
3419+
column_offset + 1, NULL, false);
34013420
combined_inputeval = lappend(combined_inputeval, tle);
3421+
column_offset++;
3422+
3423+
/*
3424+
* We'll need separate projection machinery for the real args.
3425+
* Arrange to evaluate them into the sortslot previously created.
3426+
*/
3427+
Assert(pertrans->sortslot);
3428+
pertrans->evalproj = ExecBuildProjectionInfo(pertrans->aggref->args,
3429+
aggstate->tmpcontext,
3430+
pertrans->sortslot,
3431+
&aggstate->ss.ps,
3432+
NULL);
34023433
}
3434+
else
3435+
{
3436+
/*
3437+
* Add agg's input expressions to combined_inputeval, adjusting
3438+
* resnos in the copied target entries to match the combined slot.
3439+
*/
3440+
ListCell *arg;
3441+
3442+
foreach(arg, pertrans->aggref->args)
3443+
{
3444+
TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
3445+
TargetEntry *tle;
3446+
3447+
tle = flatCopyTargetEntry(source_tle);
3448+
tle->resno += column_offset;
34033449

3404-
column_offset += list_length(pertrans->aggref->args);
3450+
combined_inputeval = lappend(combined_inputeval, tle);
3451+
}
3452+
3453+
column_offset += list_length(pertrans->aggref->args);
3454+
}
34053455
}
34063456

34073457
/* Now create a projection for the combined targetlist */
3408-
aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
3409-
aggstate->evalslot = ExecInitExtraTupleSlot(estate);
3410-
aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval,
3411-
aggstate->tmpcontext,
3412-
aggstate->evalslot,
3413-
&aggstate->ss.ps,
3414-
NULL);
3415-
ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
3458+
combineddesc = ExecTypeFromTL(combined_inputeval, false);
3459+
combinedslot = ExecInitExtraTupleSlot(estate);
3460+
ExecSetSlotDescriptor(combinedslot, combineddesc);
3461+
aggstate->combinedproj = ExecBuildProjectionInfo(combined_inputeval,
3462+
aggstate->tmpcontext,
3463+
combinedslot,
3464+
&aggstate->ss.ps,
3465+
NULL);
34163466

34173467
/*
34183468
* Last, check whether any more aggregates got added onto the node while
34193469
* we processed the expressions for the aggregate arguments (including not
3420-
* only the regular arguments handled immediately above, but any FILTER
3421-
* expressions and direct arguments we might've handled earlier). If so,
3470+
* only the regular arguments and FILTER expressions handled immediately
3471+
* above, but any direct arguments we might've handled earlier). If so,
34223472
* we have nested aggregate functions, which is semantically nonsensical,
34233473
* so complain. (This should have been caught by the parser, so we don't
34243474
* need to work hard on a helpful error message; but we defend against it
@@ -3483,6 +3533,8 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
34833533
else
34843534
pertrans->numTransInputs = numArguments;
34853535

3536+
/* inputoff and evalproj will be set up later, in ExecInitAgg */
3537+
34863538
/*
34873539
* When combining states, we have no use at all for the aggregate
34883540
* function's transfn. Instead we use the combinefn. In this case, the
@@ -3598,9 +3650,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
35983650

35993651
}
36003652

3601-
/* Initialize the input and FILTER expressions */
3602-
pertrans->aggfilter = ExecInitExpr(aggref->aggfilter,
3603-
(PlanState *) aggstate);
3653+
/* Initialize any direct-argument expressions */
36043654
pertrans->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
36053655
(PlanState *) aggstate);
36063656

@@ -3634,16 +3684,20 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
36343684
pertrans->numSortCols = numSortCols;
36353685
pertrans->numDistinctCols = numDistinctCols;
36363686

3637-
if (numSortCols > 0)
3687+
/*
3688+
* If we have either sorting or filtering to do, create a tupledesc and
3689+
* slot corresponding to the aggregated inputs (including sort
3690+
* expressions) of the agg.
3691+
*/
3692+
if (numSortCols > 0 || aggref->aggfilter)
36383693
{
3639-
/*
3640-
* Get a tupledesc and slot corresponding to the aggregated inputs
3641-
* (including sort expressions) of the agg.
3642-
*/
36433694
pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
36443695
pertrans->sortslot = ExecInitExtraTupleSlot(estate);
36453696
ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
3697+
}
36463698

3699+
if (numSortCols > 0)
3700+
{
36473701
/*
36483702
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
36493703
* (yet)

src/include/nodes/execnodes.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1830,10 +1830,8 @@ typedef struct AggState
18301830
int num_hashes;
18311831
AggStatePerHash perhash;
18321832
AggStatePerGroup *hash_pergroup; /* array of per-group pointers */
1833-
/* support for evaluation of agg inputs */
1834-
TupleTableSlot *evalslot; /* slot for agg inputs */
1835-
ProjectionInfo *evalproj; /* projection machinery */
1836-
TupleDesc evaldesc; /* descriptor of input tuples */
1833+
/* support for evaluation of agg input expressions: */
1834+
ProjectionInfo *combinedproj; /* projection machinery */
18371835
} AggState;
18381836

18391837
/* ----------------

src/test/regress/expected/aggregates.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,6 +1388,12 @@ select min(unique1) filter (where unique1 > 100) from tenk1;
13881388
101
13891389
(1 row)
13901390

1391+
select sum(1/ten) filter (where ten > 0) from tenk1;
1392+
sum
1393+
------
1394+
1000
1395+
(1 row)
1396+
13911397
select ten, sum(distinct four) filter (where four::text ~ '123') from onek a
13921398
group by ten;
13931399
ten | sum

src/test/regress/sql/aggregates.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,8 @@ drop table bytea_test_table;
524524

525525
select min(unique1) filter (where unique1 > 100) from tenk1;
526526

527+
select sum(1/ten) filter (where ten > 0) from tenk1;
528+
527529
select ten, sum(distinct four) filter (where four::text ~ '123') from onek a
528530
group by ten;
529531

0 commit comments

Comments
 (0)