Skip to content

Commit 48a6592

Browse files
committed
Improve speed of aggregates that use array_append as transition function.
In the previous coding, if an aggregate's transition function returned an expanded array, nodeAgg.c and nodeWindowAgg.c would always copy it and thus force it into the flat representation. This led to ping-ponging between flat and expanded formats, which costs a lot. For an aggregate using array_append as transition function, I measured about a 15X slowdown compared to the pre-9.5 code, when working on simple int[] arrays. Of course, the old code was already O(N^2) in this usage due to copying flat arrays all the time, but it wasn't quite this inefficient. To fix, teach nodeAgg.c and nodeWindowAgg.c to allow expanded transition values without copying, so long as the transition function takes care to return the transition value already properly parented under the aggcontext. That puts a bit of extra responsibility on the transition function, but doing it this way allows us to not need any extra logic in the fast path of advance_transition_function (ie, with a pass-by-value transition value, or with a modified-in-place pass-by-reference value). We already know that that's a hot spot so I'm loath to add any cycles at all there. Also, while only array_append currently knows how to follow this convention, this solution allows other transition functions to opt-in without needing to have a whitelist in the core aggregation code. (The reason we would need a whitelist is that currently, if you pass a R/W expanded-object pointer to an arbitrary function, it's allowed to do anything with it including deleting it; that breaks the core agg code's assumption that it should free discarded values. Returning a value under aggcontext is the transition function's signal that it knows it is an aggregate transition function and will play nice. Possibly the API rules for expanded objects should be refined, but that would not be a back-patchable change.) With this fix, an aggregate using array_append is no longer O(N^2), so it's much faster than pre-9.5 code rather than much slower. It's still a bit slower than the bespoke infrastructure for array_agg, but the differential seems to be only about 10%-20% rather than orders of magnitude. Discussion: <6315.1477677885@sss.pgh.pa.us>
1 parent 4a43a62 commit 48a6592

File tree

5 files changed

+142
-33
lines changed

5 files changed

+142
-33
lines changed

doc/src/sgml/xaggr.sgml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,14 +626,28 @@ if (AggCheckCallContext(fcinfo, NULL))
626626
function, the first input
627627
must be a temporary state value and can therefore safely be modified
628628
in-place rather than allocating a new copy.
629-
See <literal>int8inc()</> for an example.
629+
See <function>int8inc()</> for an example.
630630
(This is the <emphasis>only</>
631631
case where it is safe for a function to modify a pass-by-reference input.
632632
In particular, final functions for normal aggregates must not
633633
modify their inputs in any case, because in some cases they will be
634634
re-executed on the same final state value.)
635635
</para>
636636

637+
<para>
638+
The second argument of <function>AggCheckCallContext</> can be used to
639+
retrieve the memory context in which aggregate state values are being kept.
640+
This is useful for transition functions that wish to use <quote>expanded</>
641+
objects (see <xref linkend="xtypes-toast">) as their state values.
642+
On first call, the transition function should return an expanded object
643+
whose memory context is a child of the aggregate state context, and then
644+
keep returning the same expanded object on subsequent calls. See
645+
<function>array_append()</> for an example. (<function>array_append()</>
646+
is not the transition function of any built-in aggregate, but it is written
647+
to behave efficiently when used as transition function of a custom
648+
aggregate.)
649+
</para>
650+
637651
<para>
638652
Another support routine available to aggregate functions written in C
639653
is <function>AggGetAggref</>, which returns the <literal>Aggref</>

src/backend/executor/nodeAgg.c

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,13 @@
9191
* transition value or a previous function result, and in either case its
9292
* value need not be preserved. See int8inc() for an example. Notice that
9393
* advance_transition_function() is coded to avoid a data copy step when
94-
* the previous transition value pointer is returned. Also, some
95-
* transition functions want to store working state in addition to the
96-
* nominal transition value; they can use the memory context returned by
97-
* AggCheckCallContext() to do that.
94+
* the previous transition value pointer is returned. It is also possible
95+
* to avoid repeated data copying when the transition value is an expanded
96+
* object: to do that, the transition function must take care to return
97+
* an expanded object that is in a child context of the memory context
98+
* returned by AggCheckCallContext(). Also, some transition functions want
99+
* to store working state in addition to the nominal transition value; they
100+
* can use the memory context returned by AggCheckCallContext() to do that.
98101
*
99102
* Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
100103
* AggState is available as context in earlier releases (back to 8.1),
@@ -805,21 +808,36 @@ advance_transition_function(AggState *aggstate,
805808

806809
/*
807810
* If pass-by-ref datatype, must copy the new value into aggcontext and
808-
* pfree the prior transValue. But if transfn returned a pointer to its
809-
* first input, we don't need to do anything.
811+
* free the prior transValue. But if transfn returned a pointer to its
812+
* first input, we don't need to do anything. Also, if transfn returned a
813+
* pointer to a R/W expanded object that is already a child of the
814+
* aggcontext, assume we can adopt that value without copying it.
810815
*/
811816
if (!pertrans->transtypeByVal &&
812817
DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
813818
{
814819
if (!fcinfo->isnull)
815820
{
816821
MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
817-
newVal = datumCopy(newVal,
818-
pertrans->transtypeByVal,
819-
pertrans->transtypeLen);
822+
if (DatumIsReadWriteExpandedObject(newVal,
823+
false,
824+
pertrans->transtypeLen) &&
825+
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
826+
/* do nothing */ ;
827+
else
828+
newVal = datumCopy(newVal,
829+
pertrans->transtypeByVal,
830+
pertrans->transtypeLen);
820831
}
821832
if (!pergroupstate->transValueIsNull)
822-
pfree(DatumGetPointer(pergroupstate->transValue));
833+
{
834+
if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
835+
false,
836+
pertrans->transtypeLen))
837+
DeleteExpandedObject(pergroupstate->transValue);
838+
else
839+
pfree(DatumGetPointer(pergroupstate->transValue));
840+
}
823841
}
824842

825843
pergroupstate->transValue = newVal;
@@ -1067,21 +1085,37 @@ advance_combine_function(AggState *aggstate,
10671085

10681086
/*
10691087
* If pass-by-ref datatype, must copy the new value into aggcontext and
1070-
* pfree the prior transValue. But if the combine function returned a
1071-
* pointer to its first input, we don't need to do anything.
1088+
* free the prior transValue. But if the combine function returned a
1089+
* pointer to its first input, we don't need to do anything. Also, if the
1090+
* combine function returned a pointer to a R/W expanded object that is
1091+
* already a child of the aggcontext, assume we can adopt that value
1092+
* without copying it.
10721093
*/
10731094
if (!pertrans->transtypeByVal &&
10741095
DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
10751096
{
10761097
if (!fcinfo->isnull)
10771098
{
10781099
MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
1079-
newVal = datumCopy(newVal,
1080-
pertrans->transtypeByVal,
1081-
pertrans->transtypeLen);
1100+
if (DatumIsReadWriteExpandedObject(newVal,
1101+
false,
1102+
pertrans->transtypeLen) &&
1103+
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
1104+
/* do nothing */ ;
1105+
else
1106+
newVal = datumCopy(newVal,
1107+
pertrans->transtypeByVal,
1108+
pertrans->transtypeLen);
10821109
}
10831110
if (!pergroupstate->transValueIsNull)
1084-
pfree(DatumGetPointer(pergroupstate->transValue));
1111+
{
1112+
if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
1113+
false,
1114+
pertrans->transtypeLen))
1115+
DeleteExpandedObject(pergroupstate->transValue);
1116+
else
1117+
pfree(DatumGetPointer(pergroupstate->transValue));
1118+
}
10851119
}
10861120

10871121
pergroupstate->transValue = newVal;
@@ -1347,7 +1381,9 @@ finalize_aggregate(AggState *aggstate,
13471381
(void *) aggstate, NULL);
13481382

13491383
/* Fill in the transition state value */
1350-
fcinfo.arg[0] = pergroupstate->transValue;
1384+
fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1385+
pergroupstate->transValueIsNull,
1386+
pertrans->transtypeLen);
13511387
fcinfo.argnull[0] = pergroupstate->transValueIsNull;
13521388
anynull |= pergroupstate->transValueIsNull;
13531389

@@ -1374,6 +1410,7 @@ finalize_aggregate(AggState *aggstate,
13741410
}
13751411
else
13761412
{
1413+
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
13771414
*resultVal = pergroupstate->transValue;
13781415
*resultIsNull = pergroupstate->transValueIsNull;
13791416
}
@@ -1424,7 +1461,9 @@ finalize_partialaggregate(AggState *aggstate,
14241461
{
14251462
FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
14261463

1427-
fcinfo->arg[0] = pergroupstate->transValue;
1464+
fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1465+
pergroupstate->transValueIsNull,
1466+
pertrans->transtypeLen);
14281467
fcinfo->argnull[0] = pergroupstate->transValueIsNull;
14291468

14301469
*resultVal = FunctionCallInvoke(fcinfo);
@@ -1433,6 +1472,7 @@ finalize_partialaggregate(AggState *aggstate,
14331472
}
14341473
else
14351474
{
1475+
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
14361476
*resultVal = pergroupstate->transValue;
14371477
*resultIsNull = pergroupstate->transValueIsNull;
14381478
}

src/backend/executor/nodeWindowAgg.c

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -362,21 +362,36 @@ advance_windowaggregate(WindowAggState *winstate,
362362

363363
/*
364364
* If pass-by-ref datatype, must copy the new value into aggcontext and
365-
* pfree the prior transValue. But if transfn returned a pointer to its
366-
* first input, we don't need to do anything.
365+
* free the prior transValue. But if transfn returned a pointer to its
366+
* first input, we don't need to do anything. Also, if transfn returned a
367+
* pointer to a R/W expanded object that is already a child of the
368+
* aggcontext, assume we can adopt that value without copying it.
367369
*/
368370
if (!peraggstate->transtypeByVal &&
369371
DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
370372
{
371373
if (!fcinfo->isnull)
372374
{
373375
MemoryContextSwitchTo(peraggstate->aggcontext);
374-
newVal = datumCopy(newVal,
375-
peraggstate->transtypeByVal,
376-
peraggstate->transtypeLen);
376+
if (DatumIsReadWriteExpandedObject(newVal,
377+
false,
378+
peraggstate->transtypeLen) &&
379+
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
380+
/* do nothing */ ;
381+
else
382+
newVal = datumCopy(newVal,
383+
peraggstate->transtypeByVal,
384+
peraggstate->transtypeLen);
377385
}
378386
if (!peraggstate->transValueIsNull)
379-
pfree(DatumGetPointer(peraggstate->transValue));
387+
{
388+
if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
389+
false,
390+
peraggstate->transtypeLen))
391+
DeleteExpandedObject(peraggstate->transValue);
392+
else
393+
pfree(DatumGetPointer(peraggstate->transValue));
394+
}
380395
}
381396

382397
MemoryContextSwitchTo(oldContext);
@@ -513,8 +528,10 @@ advance_windowaggregate_base(WindowAggState *winstate,
513528

514529
/*
515530
* If pass-by-ref datatype, must copy the new value into aggcontext and
516-
* pfree the prior transValue. But if invtransfn returned a pointer to
517-
* its first input, we don't need to do anything.
531+
* free the prior transValue. But if invtransfn returned a pointer to its
532+
* first input, we don't need to do anything. Also, if invtransfn
533+
* returned a pointer to a R/W expanded object that is already a child of
534+
* the aggcontext, assume we can adopt that value without copying it.
518535
*
519536
* Note: the checks for null values here will never fire, but it seems
520537
* best to have this stanza look just like advance_windowaggregate.
@@ -525,12 +542,25 @@ advance_windowaggregate_base(WindowAggState *winstate,
525542
if (!fcinfo->isnull)
526543
{
527544
MemoryContextSwitchTo(peraggstate->aggcontext);
528-
newVal = datumCopy(newVal,
529-
peraggstate->transtypeByVal,
530-
peraggstate->transtypeLen);
545+
if (DatumIsReadWriteExpandedObject(newVal,
546+
false,
547+
peraggstate->transtypeLen) &&
548+
MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
549+
/* do nothing */ ;
550+
else
551+
newVal = datumCopy(newVal,
552+
peraggstate->transtypeByVal,
553+
peraggstate->transtypeLen);
531554
}
532555
if (!peraggstate->transValueIsNull)
533-
pfree(DatumGetPointer(peraggstate->transValue));
556+
{
557+
if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
558+
false,
559+
peraggstate->transtypeLen))
560+
DeleteExpandedObject(peraggstate->transValue);
561+
else
562+
pfree(DatumGetPointer(peraggstate->transValue));
563+
}
534564
}
535565

536566
MemoryContextSwitchTo(oldContext);
@@ -568,7 +598,9 @@ finalize_windowaggregate(WindowAggState *winstate,
568598
numFinalArgs,
569599
perfuncstate->winCollation,
570600
(void *) winstate, NULL);
571-
fcinfo.arg[0] = peraggstate->transValue;
601+
fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
602+
peraggstate->transValueIsNull,
603+
peraggstate->transtypeLen);
572604
fcinfo.argnull[0] = peraggstate->transValueIsNull;
573605
anynull = peraggstate->transValueIsNull;
574606

@@ -596,6 +628,7 @@ finalize_windowaggregate(WindowAggState *winstate,
596628
}
597629
else
598630
{
631+
/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
599632
*result = peraggstate->transValue;
600633
*isnull = peraggstate->transValueIsNull;
601634
}

src/backend/optimizer/util/clauses.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,16 @@ get_agg_clause_costs_walker(Node *node, get_agg_clause_costs_context *context)
646646
/* Use average width if aggregate definition gave one */
647647
if (aggtransspace > 0)
648648
avgwidth = aggtransspace;
649+
else if (aggtransfn == F_ARRAY_APPEND)
650+
{
651+
/*
652+
* If the transition function is array_append(), it'll use an
653+
* expanded array as transvalue, which will occupy at least
654+
* ALLOCSET_SMALL_INITSIZE and possibly more. Use that as the
655+
* estimate for lack of a better idea.
656+
*/
657+
avgwidth = ALLOCSET_SMALL_INITSIZE;
658+
}
649659
else
650660
{
651661
/*

src/backend/utils/adt/array_userfuncs.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,18 @@ static Datum array_position_common(FunctionCallInfo fcinfo);
3232
* Caution: if the input is a read/write pointer, this returns the input
3333
* argument; so callers must be sure that their changes are "safe", that is
3434
* they cannot leave the array in a corrupt state.
35+
*
36+
* If we're being called as an aggregate function, make sure any newly-made
37+
* expanded array is allocated in the aggregate state context, so as to save
38+
* copying operations.
3539
*/
3640
static ExpandedArrayHeader *
3741
fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
3842
{
3943
ExpandedArrayHeader *eah;
4044
Oid element_type;
4145
ArrayMetaState *my_extra;
46+
MemoryContext resultcxt;
4247

4348
/* If first time through, create datatype cache struct */
4449
my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
@@ -51,10 +56,17 @@ fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
5156
fcinfo->flinfo->fn_extra = my_extra;
5257
}
5358

59+
/* Figure out which context we want the result in */
60+
if (!AggCheckCallContext(fcinfo, &resultcxt))
61+
resultcxt = CurrentMemoryContext;
62+
5463
/* Now collect the array value */
5564
if (!PG_ARGISNULL(argno))
5665
{
66+
MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
67+
5768
eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
69+
MemoryContextSwitchTo(oldcxt);
5870
}
5971
else
6072
{
@@ -72,7 +84,7 @@ fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
7284
errmsg("input data type is not an array")));
7385

7486
eah = construct_empty_expanded_array(element_type,
75-
CurrentMemoryContext,
87+
resultcxt,
7688
my_extra);
7789
}
7890

0 commit comments

Comments
 (0)