Skip to content

Commit 38172d1

Browse files
committed
Injection points for hash aggregation.
Requires adding a guard against shift-by-32. Previously, that was impossible because the number of partitions was always greater than 1, but a new injection point can force the number of partitions to 1. Discussion: https://postgr.es/m/ff4e59305e5d689e03cd256a736348d3e7958f8f.camel@j-davis.com
1 parent 052026c commit 38172d1

File tree

5 files changed

+141
-4
lines changed

5 files changed

+141
-4
lines changed

src/backend/executor/nodeAgg.c

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@
269269
#include "utils/datum.h"
270270
#include "utils/dynahash.h"
271271
#include "utils/expandeddatum.h"
272+
#include "utils/injection_point.h"
272273
#include "utils/logtape.h"
273274
#include "utils/lsyscache.h"
274275
#include "utils/memutils.h"
@@ -1489,6 +1490,14 @@ build_hash_tables(AggState *aggstate)
14891490
perhash->aggnode->numGroups,
14901491
memory);
14911492

1493+
#ifdef USE_INJECTION_POINTS
1494+
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
1495+
{
1496+
nbuckets = memory / sizeof(TupleHashEntryData);
1497+
INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
1498+
}
1499+
#endif
1500+
14921501
build_hash_table(aggstate, setno, nbuckets);
14931502
}
14941503

@@ -1860,6 +1869,18 @@ hash_agg_check_limits(AggState *aggstate)
18601869
true);
18611870
Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
18621871
true);
1872+
bool do_spill = false;
1873+
1874+
#ifdef USE_INJECTION_POINTS
1875+
if (ngroups >= 1000)
1876+
{
1877+
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
1878+
{
1879+
do_spill = true;
1880+
INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
1881+
}
1882+
}
1883+
#endif
18631884

18641885
/*
18651886
* Don't spill unless there's at least one group in the hash table so we
@@ -1869,8 +1890,11 @@ hash_agg_check_limits(AggState *aggstate)
18691890
(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
18701891
ngroups > aggstate->hash_ngroups_limit))
18711892
{
1872-
hash_agg_enter_spill_mode(aggstate);
1893+
do_spill = true;
18731894
}
1895+
1896+
if (do_spill)
1897+
hash_agg_enter_spill_mode(aggstate);
18741898
}
18751899

18761900
/*
@@ -1881,6 +1905,7 @@ hash_agg_check_limits(AggState *aggstate)
18811905
static void
18821906
hash_agg_enter_spill_mode(AggState *aggstate)
18831907
{
1908+
INJECTION_POINT("hash-aggregate-enter-spill-mode");
18841909
aggstate->hash_spill_mode = true;
18851910
hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
18861911

@@ -2652,6 +2677,7 @@ agg_refill_hash_table(AggState *aggstate)
26522677
*/
26532678
hashagg_recompile_expressions(aggstate, true, true);
26542679

2680+
INJECTION_POINT("hash-aggregate-process-batch");
26552681
for (;;)
26562682
{
26572683
TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
@@ -2900,6 +2926,15 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
29002926
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
29012927
used_bits, &partition_bits);
29022928

2929+
#ifdef USE_INJECTION_POINTS
2930+
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
2931+
{
2932+
npartitions = 1;
2933+
partition_bits = 0;
2934+
INJECTION_POINT_CACHED("hash-aggregate-single-partition");
2935+
}
2936+
#endif
2937+
29032938
spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
29042939
spill->ntuples = palloc0(sizeof(int64) * npartitions);
29052940
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
@@ -2908,7 +2943,10 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
29082943
spill->partitions[i] = LogicalTapeCreate(tapeset);
29092944

29102945
spill->shift = 32 - used_bits - partition_bits;
2911-
spill->mask = (npartitions - 1) << spill->shift;
2946+
if (spill->shift < 32)
2947+
spill->mask = (npartitions - 1) << spill->shift;
2948+
else
2949+
spill->mask = 0;
29122950
spill->npartitions = npartitions;
29132951

29142952
for (int i = 0; i < npartitions; i++)
@@ -2957,7 +2995,11 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
29572995

29582996
tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
29592997

2960-
partition = (hash & spill->mask) >> spill->shift;
2998+
if (spill->shift < 32)
2999+
partition = (hash & spill->mask) >> spill->shift;
3000+
else
3001+
partition = 0;
3002+
29613003
spill->ntuples[partition]++;
29623004

29633005
/*

src/test/modules/injection_points/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ EXTENSION = injection_points
1111
DATA = injection_points--1.0.sql
1212
PGFILEDESC = "injection_points - facility for injection points"
1313

14-
REGRESS = injection_points reindex_conc
14+
REGRESS = injection_points hashagg reindex_conc
1515
REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
1616

1717
ISOLATION = basic inplace syscache-update-pruned
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
-- Test for hash aggregation
2+
CREATE EXTENSION injection_points;
3+
SELECT injection_points_set_local();
4+
injection_points_set_local
5+
----------------------------
6+
7+
(1 row)
8+
9+
SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
10+
injection_points_attach
11+
-------------------------
12+
13+
(1 row)
14+
15+
SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
16+
injection_points_attach
17+
-------------------------
18+
19+
(1 row)
20+
21+
-- force partition fan-out to 1
22+
SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
23+
injection_points_attach
24+
-------------------------
25+
26+
(1 row)
27+
28+
-- force spilling after 1000 groups
29+
SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
30+
injection_points_attach
31+
-------------------------
32+
33+
(1 row)
34+
35+
CREATE TABLE hashagg_ij(x INTEGER);
36+
INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
37+
SET max_parallel_workers=0;
38+
SET max_parallel_workers_per_gather=0;
39+
SET enable_sort=FALSE;
40+
SET work_mem='4MB';
41+
SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
42+
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
43+
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
44+
NOTICE: notice triggered for injection point hash-aggregate-single-partition
45+
NOTICE: notice triggered for injection point hash-aggregate-process-batch
46+
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
47+
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
48+
NOTICE: notice triggered for injection point hash-aggregate-single-partition
49+
NOTICE: notice triggered for injection point hash-aggregate-process-batch
50+
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
51+
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
52+
NOTICE: notice triggered for injection point hash-aggregate-single-partition
53+
NOTICE: notice triggered for injection point hash-aggregate-process-batch
54+
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
55+
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
56+
NOTICE: notice triggered for injection point hash-aggregate-single-partition
57+
NOTICE: notice triggered for injection point hash-aggregate-process-batch
58+
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
59+
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
60+
NOTICE: notice triggered for injection point hash-aggregate-single-partition
61+
NOTICE: notice triggered for injection point hash-aggregate-process-batch
62+
count
63+
-------
64+
5100
65+
(1 row)
66+
67+
DROP TABLE hashagg_ij;
68+
DROP EXTENSION injection_points;

src/test/modules/injection_points/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tests += {
3535
'regress': {
3636
'sql': [
3737
'injection_points',
38+
'hashagg',
3839
'reindex_conc',
3940
],
4041
'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Test for hash aggregation
2+
CREATE EXTENSION injection_points;
3+
4+
SELECT injection_points_set_local();
5+
6+
SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
7+
SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
8+
9+
-- force partition fan-out to 1
10+
SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
11+
12+
-- force spilling after 1000 groups
13+
SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
14+
15+
CREATE TABLE hashagg_ij(x INTEGER);
16+
INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
17+
18+
SET max_parallel_workers=0;
19+
SET max_parallel_workers_per_gather=0;
20+
SET enable_sort=FALSE;
21+
SET work_mem='4MB';
22+
23+
SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
24+
25+
DROP TABLE hashagg_ij;
26+
DROP EXTENSION injection_points;

0 commit comments

Comments
 (0)