245
245
#include "catalog/pg_aggregate.h"
246
246
#include "catalog/pg_proc.h"
247
247
#include "catalog/pg_type.h"
248
+ #include "common/hashfn.h"
248
249
#include "executor/execExpr.h"
249
250
#include "executor/executor.h"
250
251
#include "executor/nodeAgg.h"
252
+ #include "lib/hyperloglog.h"
251
253
#include "miscadmin.h"
252
254
#include "nodes/makefuncs.h"
253
255
#include "nodes/nodeFuncs.h"
295
297
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
296
298
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
297
299
300
+ /*
301
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
302
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
303
+ * standard error of around 18%. That's effective enough to choose a
304
+ * reasonable number of partitions when recursing.
305
+ */
306
+ #define HASHAGG_HLL_BIT_WIDTH 5
307
+
298
308
/*
299
309
* Estimate chunk overhead as a constant 16 bytes. XXX: should this be
300
310
* improved?
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
339
349
int64 * ntuples ; /* number of tuples in each partition */
340
350
uint32 mask ; /* mask to find partition from hash value */
341
351
int shift ; /* after masking, shift by this amount */
352
+ hyperLogLogState * hll_card ; /* cardinality estimate for contents */
342
353
} HashAggSpill ;
343
354
344
355
/*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
357
368
LogicalTapeSet * tapeset ; /* borrowed reference to tape set */
358
369
int input_tapenum ; /* input partition tape */
359
370
int64 input_tuples ; /* number of tuples in this batch */
371
+ double input_card ; /* estimated group cardinality */
360
372
} HashAggBatch ;
361
373
362
374
/* used to find referenced colnos */
@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
411
423
static long hash_choose_num_buckets (double hashentrysize ,
412
424
long estimated_nbuckets ,
413
425
Size memory );
414
- static int hash_choose_num_partitions (uint64 input_groups ,
426
+ static int hash_choose_num_partitions (double input_groups ,
415
427
double hashentrysize ,
416
428
int used_bits ,
417
429
int * log2_npartitions );
@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
432
444
static void hashagg_reset_spill_state (AggState * aggstate );
433
445
static HashAggBatch * hashagg_batch_new (LogicalTapeSet * tapeset ,
434
446
int input_tapenum , int setno ,
435
- int64 input_tuples , int used_bits );
447
+ int64 input_tuples , double input_card ,
448
+ int used_bits );
436
449
static MinimalTuple hashagg_batch_read (HashAggBatch * batch , uint32 * hashp );
437
450
static void hashagg_spill_init (HashAggSpill * spill , HashTapeInfo * tapeinfo ,
438
- int used_bits , uint64 input_tuples ,
451
+ int used_bits , double input_groups ,
439
452
double hashentrysize );
440
453
static Size hashagg_spill_tuple (AggState * aggstate , HashAggSpill * spill ,
441
454
TupleTableSlot * slot , uint32 hash );
@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1777
1790
* substantially larger than the initial value.
1778
1791
*/
1779
1792
void
1780
- hash_agg_set_limits (double hashentrysize , uint64 input_groups , int used_bits ,
1793
+ hash_agg_set_limits (double hashentrysize , double input_groups , int used_bits ,
1781
1794
Size * mem_limit , uint64 * ngroups_limit ,
1782
1795
int * num_partitions )
1783
1796
{
@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1969
1982
* *log2_npartitions to the log2() of the number of partitions.
1970
1983
*/
1971
1984
static int
1972
- hash_choose_num_partitions (uint64 input_groups , double hashentrysize ,
1985
+ hash_choose_num_partitions (double input_groups , double hashentrysize ,
1973
1986
int used_bits , int * log2_npartitions )
1974
1987
{
1975
1988
Size mem_wanted ;
@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate)
2574
2587
AggStatePerHash perhash ;
2575
2588
HashAggSpill spill ;
2576
2589
HashTapeInfo * tapeinfo = aggstate -> hash_tapeinfo ;
2577
- uint64 ngroups_estimate ;
2578
2590
bool spill_initialized = false;
2579
2591
2580
2592
if (aggstate -> hash_batches == NIL )
@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate)
2583
2595
batch = linitial (aggstate -> hash_batches );
2584
2596
aggstate -> hash_batches = list_delete_first (aggstate -> hash_batches );
2585
2597
2586
- /*
2587
- * Estimate the number of groups for this batch as the total number of
2588
- * tuples in its input file. Although that's a worst case, it's not bad
2589
- * here for two reasons: (1) overestimating is better than
2590
- * underestimating; and (2) we've already scanned the relation once, so
2591
- * it's likely that we've already finalized many of the common values.
2592
- */
2593
- ngroups_estimate = batch -> input_tuples ;
2594
-
2595
- hash_agg_set_limits (aggstate -> hashentrysize , ngroups_estimate ,
2598
+ hash_agg_set_limits (aggstate -> hashentrysize , batch -> input_card ,
2596
2599
batch -> used_bits , & aggstate -> hash_mem_limit ,
2597
2600
& aggstate -> hash_ngroups_limit , NULL );
2598
2601
@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
2678
2681
*/
2679
2682
spill_initialized = true;
2680
2683
hashagg_spill_init (& spill , tapeinfo , batch -> used_bits ,
2681
- ngroups_estimate , aggstate -> hashentrysize );
2684
+ batch -> input_card , aggstate -> hashentrysize );
2682
2685
}
2683
2686
/* no memory for a new group, spill */
2684
2687
hashagg_spill_tuple (aggstate , & spill , spillslot , hash );
@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
2936
2939
*/
2937
2940
static void
2938
2941
hashagg_spill_init (HashAggSpill * spill , HashTapeInfo * tapeinfo , int used_bits ,
2939
- uint64 input_groups , double hashentrysize )
2942
+ double input_groups , double hashentrysize )
2940
2943
{
2941
2944
int npartitions ;
2942
2945
int partition_bits ;
@@ -2946,13 +2949,17 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2946
2949
2947
2950
spill -> partitions = palloc0 (sizeof (int ) * npartitions );
2948
2951
spill -> ntuples = palloc0 (sizeof (int64 ) * npartitions );
2952
+ spill -> hll_card = palloc0 (sizeof (hyperLogLogState ) * npartitions );
2949
2953
2950
2954
hashagg_tapeinfo_assign (tapeinfo , spill -> partitions , npartitions );
2951
2955
2952
2956
spill -> tapeset = tapeinfo -> tapeset ;
2953
2957
spill -> shift = 32 - used_bits - partition_bits ;
2954
2958
spill -> mask = (npartitions - 1 ) << spill -> shift ;
2955
2959
spill -> npartitions = npartitions ;
2960
+
2961
+ for (int i = 0 ; i < npartitions ; i ++ )
2962
+ initHyperLogLog (& spill -> hll_card [i ], HASHAGG_HLL_BIT_WIDTH );
2956
2963
}
2957
2964
2958
2965
/*
@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
3001
3008
partition = (hash & spill -> mask ) >> spill -> shift ;
3002
3009
spill -> ntuples [partition ]++ ;
3003
3010
3011
+ /*
3012
+ * All hash values destined for a given partition have some bits in
3013
+ * common, which causes bad HLL cardinality estimates. Hash the hash to
3014
+ * get a more uniform distribution.
3015
+ */
3016
+ addHyperLogLog (& spill -> hll_card [partition ], hash_bytes_uint32 (hash ));
3017
+
3004
3018
tapenum = spill -> partitions [partition ];
3005
3019
3006
3020
LogicalTapeWrite (tapeset , tapenum , (void * ) & hash , sizeof (uint32 ));
@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
3023
3037
*/
3024
3038
static HashAggBatch *
3025
3039
hashagg_batch_new (LogicalTapeSet * tapeset , int tapenum , int setno ,
3026
- int64 input_tuples , int used_bits )
3040
+ int64 input_tuples , double input_card , int used_bits )
3027
3041
{
3028
3042
HashAggBatch * batch = palloc0 (sizeof (HashAggBatch ));
3029
3043
@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3032
3046
batch -> tapeset = tapeset ;
3033
3047
batch -> input_tapenum = tapenum ;
3034
3048
batch -> input_tuples = input_tuples ;
3049
+ batch -> input_card = input_card ;
3035
3050
3036
3051
return batch ;
3037
3052
}
@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3135
3150
3136
3151
for (i = 0 ; i < spill -> npartitions ; i ++ )
3137
3152
{
3138
- int tapenum = spill -> partitions [i ];
3139
- HashAggBatch * new_batch ;
3153
+ int tapenum = spill -> partitions [i ];
3154
+ HashAggBatch * new_batch ;
3155
+ double cardinality ;
3140
3156
3141
3157
/* if the partition is empty, don't create a new batch of work */
3142
3158
if (spill -> ntuples [i ] == 0 )
3143
3159
continue ;
3144
3160
3161
+ cardinality = estimateHyperLogLog (& spill -> hll_card [i ]);
3162
+ freeHyperLogLog (& spill -> hll_card [i ]);
3163
+
3145
3164
new_batch = hashagg_batch_new (aggstate -> hash_tapeinfo -> tapeset ,
3146
3165
tapenum , setno , spill -> ntuples [i ],
3147
- used_bits );
3166
+ cardinality , used_bits );
3148
3167
aggstate -> hash_batches = lcons (new_batch , aggstate -> hash_batches );
3149
3168
aggstate -> hash_batches_used ++ ;
3150
3169
}
3151
3170
3152
3171
pfree (spill -> ntuples );
3172
+ pfree (spill -> hll_card );
3153
3173
pfree (spill -> partitions );
3154
3174
}
3155
3175
0 commit comments