Skip to content

Commit a7de3dc

Browse files
committed
Support multi-stage aggregation.
Aggregate nodes now have two new modes: a "partial" mode where they output the unfinalized transition state, and a "finalize" mode where they accept unfinalized transition states rather than individual values as input. These new modes are not used anywhere yet, but they will be necessary for parallel aggregation. The infrastructure also figures to be useful for cases where we want to aggregate local data and remote data via the FDW interface, and want to bring back partial aggregates from the remote side that can then be combined with locally generated partial aggregates to produce the final value. It may also be useful even when neither FDWs nor parallelism are in play, as explained in the comments in nodeAgg.c. David Rowley and Simon Riggs, reviewed by KaiGai Kohei, Heikki Linnakangas, Haribabu Kommi, and me.
1 parent c8642d9 commit a7de3dc

File tree

21 files changed

+652
-234
lines changed

21 files changed

+652
-234
lines changed

doc/src/sgml/ref/create_aggregate.sgml

+14-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
2727
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
2828
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
2929
[ , FINALFUNC_EXTRA ]
30+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
3031
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
3132
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
3233
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
@@ -45,6 +46,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replac
4546
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
4647
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
4748
[ , FINALFUNC_EXTRA ]
49+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
4850
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
4951
[ , HYPOTHETICAL ]
5052
)
@@ -58,6 +60,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
5860
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
5961
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
6062
[ , FINALFUNC_EXTRA ]
63+
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
6164
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
6265
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
6366
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
@@ -105,12 +108,15 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
105108
functions:
106109
a state transition function
107110
<replaceable class="PARAMETER">sfunc</replaceable>,
108-
and an optional final calculation function
109-
<replaceable class="PARAMETER">ffunc</replaceable>.
111+
an optional final calculation function
112+
<replaceable class="PARAMETER">ffunc</replaceable>,
113+
and an optional combine function
114+
<replaceable class="PARAMETER">combinefunc</replaceable>.
110115
These are used as follows:
111116
<programlisting>
112117
<replaceable class="PARAMETER">sfunc</replaceable>( internal-state, next-data-values ) ---> next-internal-state
113118
<replaceable class="PARAMETER">ffunc</replaceable>( internal-state ) ---> aggregate-value
119+
<replaceable class="PARAMETER">combinefunc</replaceable>( internal-state, internal-state ) ---> next-internal-state
114120
</programlisting>
115121
</para>
116122

@@ -127,6 +133,12 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
127133
is returned as-is.
128134
</para>
129135

136+
<para>
137+
An aggregate function may also supply a combining function, which allows
138+
the aggregation process to be broken down into multiple steps. This
139+
facilitates query optimization techniques such as parallel query.
140+
</para>
141+
130142
<para>
131143
An aggregate function can provide an initial condition,
132144
that is, an initial value for the internal state value.

src/backend/catalog/pg_aggregate.c

+37-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ AggregateCreate(const char *aggName,
5757
Oid variadicArgType,
5858
List *aggtransfnName,
5959
List *aggfinalfnName,
60+
List *aggcombinefnName,
6061
List *aggmtransfnName,
6162
List *aggminvtransfnName,
6263
List *aggmfinalfnName,
@@ -77,6 +78,7 @@ AggregateCreate(const char *aggName,
7778
Form_pg_proc proc;
7879
Oid transfn;
7980
Oid finalfn = InvalidOid; /* can be omitted */
81+
Oid combinefn = InvalidOid; /* can be omitted */
8082
Oid mtransfn = InvalidOid; /* can be omitted */
8183
Oid minvtransfn = InvalidOid; /* can be omitted */
8284
Oid mfinalfn = InvalidOid; /* can be omitted */
@@ -396,6 +398,30 @@ AggregateCreate(const char *aggName,
396398
}
397399
Assert(OidIsValid(finaltype));
398400

401+
/* handle the combinefn, if supplied */
402+
if (aggcombinefnName)
403+
{
404+
Oid combineType;
405+
406+
/*
407+
* Combine function must have 2 argument, each of which is the
408+
* trans type
409+
*/
410+
fnArgs[0] = aggTransType;
411+
fnArgs[1] = aggTransType;
412+
413+
combinefn = lookup_agg_function(aggcombinefnName, 2, fnArgs,
414+
variadicArgType, &combineType);
415+
416+
/* Ensure the return type matches the aggregates trans type */
417+
if (combineType != aggTransType)
418+
ereport(ERROR,
419+
(errcode(ERRCODE_DATATYPE_MISMATCH),
420+
errmsg("return type of combine function %s is not %s",
421+
NameListToString(aggcombinefnName),
422+
format_type_be(aggTransType))));
423+
}
424+
399425
/*
400426
* If finaltype (i.e. aggregate return type) is polymorphic, inputs must
401427
* be polymorphic also, else parser will fail to deduce result type.
@@ -567,6 +593,7 @@ AggregateCreate(const char *aggName,
567593
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
568594
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
569595
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
596+
values[Anum_pg_aggregate_aggcombinefn - 1] = ObjectIdGetDatum(combinefn);
570597
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
571598
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
572599
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
@@ -618,6 +645,15 @@ AggregateCreate(const char *aggName,
618645
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
619646
}
620647

648+
/* Depends on combine function, if any */
649+
if (OidIsValid(combinefn))
650+
{
651+
referenced.classId = ProcedureRelationId;
652+
referenced.objectId = combinefn;
653+
referenced.objectSubId = 0;
654+
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
655+
}
656+
621657
/* Depends on forward transition function, if any */
622658
if (OidIsValid(mtransfn))
623659
{
@@ -659,7 +695,7 @@ AggregateCreate(const char *aggName,
659695

660696
/*
661697
* lookup_agg_function
662-
* common code for finding transfn, invtransfn and finalfn
698+
* common code for finding transfn, invtransfn, finalfn, and combinefn
663699
*
664700
* Returns OID of function, and stores its return type into *rettype
665701
*

src/backend/commands/aggregatecmds.c

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
6161
char aggKind = AGGKIND_NORMAL;
6262
List *transfuncName = NIL;
6363
List *finalfuncName = NIL;
64+
List *combinefuncName = NIL;
6465
List *mtransfuncName = NIL;
6566
List *minvtransfuncName = NIL;
6667
List *mfinalfuncName = NIL;
@@ -124,6 +125,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
124125
transfuncName = defGetQualifiedName(defel);
125126
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
126127
finalfuncName = defGetQualifiedName(defel);
128+
else if (pg_strcasecmp(defel->defname, "combinefunc") == 0)
129+
combinefuncName = defGetQualifiedName(defel);
127130
else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
128131
mtransfuncName = defGetQualifiedName(defel);
129132
else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
@@ -383,6 +386,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
383386
variadicArgType,
384387
transfuncName, /* step function name */
385388
finalfuncName, /* final function name */
389+
combinefuncName, /* combine function name */
386390
mtransfuncName, /* fwd trans function name */
387391
minvtransfuncName, /* inv trans function name */
388392
mfinalfuncName, /* final function name */

src/backend/commands/explain.c

+29-17
Original file line numberDiff line numberDiff line change
@@ -909,24 +909,36 @@ ExplainNode(PlanState *planstate, List *ancestors,
909909
break;
910910
case T_Agg:
911911
sname = "Aggregate";
912-
switch (((Agg *) plan)->aggstrategy)
913912
{
914-
case AGG_PLAIN:
915-
pname = "Aggregate";
916-
strategy = "Plain";
917-
break;
918-
case AGG_SORTED:
919-
pname = "GroupAggregate";
920-
strategy = "Sorted";
921-
break;
922-
case AGG_HASHED:
923-
pname = "HashAggregate";
924-
strategy = "Hashed";
925-
break;
926-
default:
927-
pname = "Aggregate ???";
928-
strategy = "???";
929-
break;
913+
Agg *agg = (Agg *) plan;
914+
915+
if (agg->finalizeAggs == false)
916+
operation = "Partial";
917+
else if (agg->combineStates == true)
918+
operation = "Finalize";
919+
920+
switch (agg->aggstrategy)
921+
{
922+
case AGG_PLAIN:
923+
pname = "Aggregate";
924+
strategy = "Plain";
925+
break;
926+
case AGG_SORTED:
927+
pname = "GroupAggregate";
928+
strategy = "Sorted";
929+
break;
930+
case AGG_HASHED:
931+
pname = "HashAggregate";
932+
strategy = "Hashed";
933+
break;
934+
default:
935+
pname = "Aggregate ???";
936+
strategy = "???";
937+
break;
938+
}
939+
940+
if (operation != NULL)
941+
pname = psprintf("%s %s", operation, pname);
930942
}
931943
break;
932944
case T_WindowAgg:

0 commit comments

Comments
 (0)