@@ -268,21 +268,23 @@ typedef struct AggStatePerTransData
268
268
*/
269
269
int numInputs ;
270
270
271
- /*
272
- * At each input row, we evaluate all argument expressions needed for all
273
- * the aggregates in this Agg node in a single ExecProject call. inputoff
274
- * is the starting index of this aggregate's argument expressions in the
275
- * resulting tuple (in AggState->evalslot).
276
- */
277
- int inputoff ;
278
-
279
271
/*
280
272
* Number of aggregated input columns to pass to the transfn. This
281
273
* includes the ORDER BY columns for ordered-set aggs, but not for plain
282
274
* aggs. (This doesn't count the transition state value!)
283
275
*/
284
276
int numTransInputs ;
285
277
278
+ /*
279
+ * At each input row, we perform a single ExecProject call to evaluate all
280
+ * argument expressions that will certainly be needed at this row; that
281
+ * includes this aggregate's filter expression if it has one, or its
282
+ * regular argument expressions (including any ORDER BY columns) if it
283
+ * doesn't. inputoff is the starting index of this aggregate's required
284
+ * expressions in the resulting tuple.
285
+ */
286
+ int inputoff ;
287
+
286
288
/* Oid of the state transition or combine function */
287
289
Oid transfn_oid ;
288
290
@@ -295,9 +297,8 @@ typedef struct AggStatePerTransData
295
297
/* Oid of state value's datatype */
296
298
Oid aggtranstype ;
297
299
298
- /* ExprStates of the FILTER and argument expressions. */
299
- ExprState * aggfilter ; /* state of FILTER expression, if any */
300
- List * aggdirectargs ; /* states of direct-argument expressions */
300
+ /* ExprStates for any direct-argument expressions */
301
+ List * aggdirectargs ;
301
302
302
303
/*
303
304
* fmgr lookup data for transition function or combine function. Note in
@@ -353,20 +354,21 @@ typedef struct AggStatePerTransData
353
354
transtypeByVal ;
354
355
355
356
/*
356
- * Stuff for evaluation of aggregate inputs in cases where the aggregate
357
- * requires sorted input. The arguments themselves will be evaluated via
358
- * AggState->evalslot/evalproj for all aggregates at once, but we only
359
- * want to sort the relevant columns for individual aggregates .
357
+ * Stuff for evaluation of aggregate inputs, when they must be evaluated
358
+ * separately because there's a FILTER expression. In such cases we will
359
+ * create a sortslot and the result will be stored there, whether or not
360
+ * we're actually sorting .
360
361
*/
361
- TupleDesc sortdesc ; /* descriptor of input tuples */
362
+ ProjectionInfo * evalproj ; /* projection machinery */
362
363
363
364
/*
364
365
* Slots for holding the evaluated input arguments. These are set up
365
- * during ExecInitAgg() and then used for each input row requiring
366
- * processing besides what's done in AggState->evalproj .
366
+ * during ExecInitAgg() and then used for each input row requiring either
367
+ * FILTER or ORDER BY/DISTINCT processing .
367
368
*/
368
369
TupleTableSlot * sortslot ; /* current input tuple */
369
370
TupleTableSlot * uniqslot ; /* used for multi-column DISTINCT */
371
+ TupleDesc sortdesc ; /* descriptor of input tuples */
370
372
371
373
/*
372
374
* These values are working state that is initialized at the start of an
@@ -983,30 +985,36 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
983
985
int numGroupingSets = Max (aggstate -> phase -> numsets , 1 );
984
986
int numHashes = aggstate -> num_hashes ;
985
987
int numTrans = aggstate -> numtrans ;
986
- TupleTableSlot * slot = aggstate -> evalslot ;
988
+ TupleTableSlot * combinedslot ;
987
989
988
- /* compute input for all aggregates */
989
- if (aggstate -> evalproj )
990
- aggstate -> evalslot = ExecProject (aggstate -> evalproj );
990
+ /* compute required inputs for all aggregates */
991
+ combinedslot = ExecProject (aggstate -> combinedproj );
991
992
992
993
for (transno = 0 ; transno < numTrans ; transno ++ )
993
994
{
994
995
AggStatePerTrans pertrans = & aggstate -> pertrans [transno ];
995
- ExprState * filter = pertrans -> aggfilter ;
996
996
int numTransInputs = pertrans -> numTransInputs ;
997
- int i ;
998
997
int inputoff = pertrans -> inputoff ;
998
+ TupleTableSlot * slot ;
999
+ int i ;
999
1000
1000
1001
/* Skip anything FILTERed out */
1001
- if (filter )
1002
+ if (pertrans -> aggref -> aggfilter )
1002
1003
{
1003
- Datum res ;
1004
- bool isnull ;
1005
-
1006
- res = ExecEvalExprSwitchContext (filter , aggstate -> tmpcontext ,
1007
- & isnull );
1008
- if (isnull || !DatumGetBool (res ))
1004
+ /* Check the result of the filter expression */
1005
+ if (combinedslot -> tts_isnull [inputoff ] ||
1006
+ !DatumGetBool (combinedslot -> tts_values [inputoff ]))
1009
1007
continue ;
1008
+
1009
+ /* Now it's safe to evaluate this agg's arguments */
1010
+ slot = ExecProject (pertrans -> evalproj );
1011
+ /* There's no offset needed in this slot, of course */
1012
+ inputoff = 0 ;
1013
+ }
1014
+ else
1015
+ {
1016
+ /* arguments are already evaluated into combinedslot @ inputoff */
1017
+ slot = combinedslot ;
1010
1018
}
1011
1019
1012
1020
if (pertrans -> numSortCols > 0 )
@@ -1040,11 +1048,21 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
1040
1048
tuplesort_putdatum (pertrans -> sortstates [setno ],
1041
1049
slot -> tts_values [inputoff ],
1042
1050
slot -> tts_isnull [inputoff ]);
1051
+ else if (pertrans -> aggref -> aggfilter )
1052
+ {
1053
+ /*
1054
+ * When filtering and ordering, we already have a slot
1055
+ * containing just the argument columns.
1056
+ */
1057
+ Assert (slot == pertrans -> sortslot );
1058
+ tuplesort_puttupleslot (pertrans -> sortstates [setno ], slot );
1059
+ }
1043
1060
else
1044
1061
{
1045
1062
/*
1046
- * Copy slot contents, starting from inputoff, into sort
1047
- * slot.
1063
+ * Copy argument columns from combined slot, starting at
1064
+ * inputoff, into sortslot, so that we can store just the
1065
+ * columns we want.
1048
1066
*/
1049
1067
ExecClearTuple (pertrans -> sortslot );
1050
1068
memcpy (pertrans -> sortslot -> tts_values ,
@@ -1053,9 +1071,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
1053
1071
memcpy (pertrans -> sortslot -> tts_isnull ,
1054
1072
& slot -> tts_isnull [inputoff ],
1055
1073
pertrans -> numInputs * sizeof (bool ));
1056
- pertrans -> sortslot -> tts_nvalid = pertrans -> numInputs ;
1057
1074
ExecStoreVirtualTuple (pertrans -> sortslot );
1058
- tuplesort_puttupleslot (pertrans -> sortstates [setno ], pertrans -> sortslot );
1075
+ tuplesort_puttupleslot (pertrans -> sortstates [setno ],
1076
+ pertrans -> sortslot );
1059
1077
}
1060
1078
}
1061
1079
}
@@ -1127,7 +1145,7 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
1127
1145
Assert (aggstate -> phase -> numsets <= 1 );
1128
1146
1129
1147
/* compute input for all aggregates */
1130
- slot = ExecProject (aggstate -> evalproj );
1148
+ slot = ExecProject (aggstate -> combinedproj );
1131
1149
1132
1150
for (transno = 0 ; transno < numTrans ; transno ++ )
1133
1151
{
@@ -2691,6 +2709,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
2691
2709
int phase ;
2692
2710
int phaseidx ;
2693
2711
List * combined_inputeval ;
2712
+ TupleDesc combineddesc ;
2713
+ TupleTableSlot * combinedslot ;
2694
2714
ListCell * l ;
2695
2715
Bitmapset * all_grouped_cols = NULL ;
2696
2716
int numGroupingSets = 1 ;
@@ -3366,19 +3386,17 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
3366
3386
aggstate -> numtrans = transno + 1 ;
3367
3387
3368
3388
/*
3369
- * Build a single projection computing the aggregate arguments for all
3389
+ * Build a single projection computing the required arguments for all
3370
3390
* aggregates at once; if there's more than one, that's considerably
3371
3391
* faster than doing it separately for each.
3372
3392
*
3373
- * First create a targetlist combining the targetlists of all the
3374
- * per-trans states.
3393
+ * First create a targetlist representing the values to compute.
3375
3394
*/
3376
3395
combined_inputeval = NIL ;
3377
3396
column_offset = 0 ;
3378
3397
for (transno = 0 ; transno < aggstate -> numtrans ; transno ++ )
3379
3398
{
3380
3399
AggStatePerTrans pertrans = & pertransstates [transno ];
3381
- ListCell * arg ;
3382
3400
3383
3401
/*
3384
3402
* Mark this per-trans state with its starting column in the combined
@@ -3387,38 +3405,70 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
3387
3405
pertrans -> inputoff = column_offset ;
3388
3406
3389
3407
/*
3390
- * Adjust resnos in the copied target entries to match the combined
3391
- * slot.
3408
+ * If the aggregate has a FILTER, we can only evaluate the filter
3409
+ * expression, not the actual input expressions, during the combined
3410
+ * eval step --- unless we're ignoring the filter because this node is
3411
+ * running combinefns not transfns.
3392
3412
*/
3393
- foreach (arg , pertrans -> aggref -> args )
3413
+ if (pertrans -> aggref -> aggfilter &&
3414
+ !DO_AGGSPLIT_COMBINE (aggstate -> aggsplit ))
3394
3415
{
3395
- TargetEntry * source_tle = lfirst_node (TargetEntry , arg );
3396
3416
TargetEntry * tle ;
3397
3417
3398
- tle = flatCopyTargetEntry (source_tle );
3399
- tle -> resno += column_offset ;
3400
-
3418
+ tle = makeTargetEntry (pertrans -> aggref -> aggfilter ,
3419
+ column_offset + 1 , NULL , false);
3401
3420
combined_inputeval = lappend (combined_inputeval , tle );
3421
+ column_offset ++ ;
3422
+
3423
+ /*
3424
+ * We'll need separate projection machinery for the real args.
3425
+ * Arrange to evaluate them into the sortslot previously created.
3426
+ */
3427
+ Assert (pertrans -> sortslot );
3428
+ pertrans -> evalproj = ExecBuildProjectionInfo (pertrans -> aggref -> args ,
3429
+ aggstate -> tmpcontext ,
3430
+ pertrans -> sortslot ,
3431
+ & aggstate -> ss .ps ,
3432
+ NULL );
3402
3433
}
3434
+ else
3435
+ {
3436
+ /*
3437
+ * Add agg's input expressions to combined_inputeval, adjusting
3438
+ * resnos in the copied target entries to match the combined slot.
3439
+ */
3440
+ ListCell * arg ;
3441
+
3442
+ foreach (arg , pertrans -> aggref -> args )
3443
+ {
3444
+ TargetEntry * source_tle = lfirst_node (TargetEntry , arg );
3445
+ TargetEntry * tle ;
3446
+
3447
+ tle = flatCopyTargetEntry (source_tle );
3448
+ tle -> resno += column_offset ;
3403
3449
3404
- column_offset += list_length (pertrans -> aggref -> args );
3450
+ combined_inputeval = lappend (combined_inputeval , tle );
3451
+ }
3452
+
3453
+ column_offset += list_length (pertrans -> aggref -> args );
3454
+ }
3405
3455
}
3406
3456
3407
3457
/* Now create a projection for the combined targetlist */
3408
- aggstate -> evaldesc = ExecTypeFromTL (combined_inputeval , false);
3409
- aggstate -> evalslot = ExecInitExtraTupleSlot (estate );
3410
- aggstate -> evalproj = ExecBuildProjectionInfo ( combined_inputeval ,
3411
- aggstate -> tmpcontext ,
3412
- aggstate -> evalslot ,
3413
- & aggstate -> ss . ps ,
3414
- NULL );
3415
- ExecSetSlotDescriptor ( aggstate -> evalslot , aggstate -> evaldesc );
3458
+ combineddesc = ExecTypeFromTL (combined_inputeval , false);
3459
+ combinedslot = ExecInitExtraTupleSlot (estate );
3460
+ ExecSetSlotDescriptor ( combinedslot , combineddesc );
3461
+ aggstate -> combinedproj = ExecBuildProjectionInfo ( combined_inputeval ,
3462
+ aggstate -> tmpcontext ,
3463
+ combinedslot ,
3464
+ & aggstate -> ss . ps ,
3465
+ NULL );
3416
3466
3417
3467
/*
3418
3468
* Last, check whether any more aggregates got added onto the node while
3419
3469
* we processed the expressions for the aggregate arguments (including not
3420
- * only the regular arguments handled immediately above, but any FILTER
3421
- * expressions and direct arguments we might've handled earlier). If so,
3470
+ * only the regular arguments and FILTER expressions handled immediately
3471
+ * above, but any direct arguments we might've handled earlier). If so,
3422
3472
* we have nested aggregate functions, which is semantically nonsensical,
3423
3473
* so complain. (This should have been caught by the parser, so we don't
3424
3474
* need to work hard on a helpful error message; but we defend against it
@@ -3483,6 +3533,8 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3483
3533
else
3484
3534
pertrans -> numTransInputs = numArguments ;
3485
3535
3536
+ /* inputoff and evalproj will be set up later, in ExecInitAgg */
3537
+
3486
3538
/*
3487
3539
* When combining states, we have no use at all for the aggregate
3488
3540
* function's transfn. Instead we use the combinefn. In this case, the
@@ -3598,9 +3650,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3598
3650
3599
3651
}
3600
3652
3601
- /* Initialize the input and FILTER expressions */
3602
- pertrans -> aggfilter = ExecInitExpr (aggref -> aggfilter ,
3603
- (PlanState * ) aggstate );
3653
+ /* Initialize any direct-argument expressions */
3604
3654
pertrans -> aggdirectargs = ExecInitExprList (aggref -> aggdirectargs ,
3605
3655
(PlanState * ) aggstate );
3606
3656
@@ -3634,16 +3684,20 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
3634
3684
pertrans -> numSortCols = numSortCols ;
3635
3685
pertrans -> numDistinctCols = numDistinctCols ;
3636
3686
3637
- if (numSortCols > 0 )
3687
+ /*
3688
+ * If we have either sorting or filtering to do, create a tupledesc and
3689
+ * slot corresponding to the aggregated inputs (including sort
3690
+ * expressions) of the agg.
3691
+ */
3692
+ if (numSortCols > 0 || aggref -> aggfilter )
3638
3693
{
3639
- /*
3640
- * Get a tupledesc and slot corresponding to the aggregated inputs
3641
- * (including sort expressions) of the agg.
3642
- */
3643
3694
pertrans -> sortdesc = ExecTypeFromTL (aggref -> args , false);
3644
3695
pertrans -> sortslot = ExecInitExtraTupleSlot (estate );
3645
3696
ExecSetSlotDescriptor (pertrans -> sortslot , pertrans -> sortdesc );
3697
+ }
3646
3698
3699
+ if (numSortCols > 0 )
3700
+ {
3647
3701
/*
3648
3702
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3649
3703
* (yet)
0 commit comments