68
68
* memory gets actually freed.
69
69
*
70
70
* We use a max-heap with transaction size as the key to efficiently find
71
- * the largest transaction. While the max-heap is empty, we don't update
72
- * the max-heap when updating the memory counter. Therefore, we can get
73
- * the largest transaction in O(N) time, where N is the number of
74
- * transactions including top-level transactions and subtransactions.
75
- *
76
- * We build the max-heap just before selecting the largest transactions
77
- * if the number of transactions being decoded is higher than the threshold,
78
- * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
79
- * update the max-heap when updating the memory counter. The intention is
80
- * to efficiently find the largest transaction in O(1) time instead of
81
- * incurring the cost of memory counter updates (O(log N)). Once the number
82
- * of transactions got lower than the threshold, we reset the max-heap
83
- * (refer to ReorderBufferMaybeResetMaxHeap() for details).
71
+ * the largest transaction. We update the max-heap whenever the memory
72
+ * counter is updated; however transactions with size 0 are not stored in
73
+ * the heap, because they have no changes to evict.
84
74
*
85
75
* We still rely on max_changes_in_memory when loading serialized changes
86
76
* back into memory. At that point we can't use the memory limit directly
122
112
#include "utils/rel.h"
123
113
#include "utils/relfilenumbermap.h"
124
114
125
- /*
126
- * Threshold of the total number of top-level and sub transactions that
127
- * controls whether we use the max-heap for tracking their sizes. Although
128
- * using the max-heap to select the largest transaction is effective when
129
- * there are many transactions being decoded, maintaining the max-heap while
130
- * updating the memory statistics can be costly. Therefore, we use
131
- * MaxConnections as the threshold so that we use the max-heap only when
132
- * using subtransactions.
133
- */
134
- #define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
135
-
136
- /*
137
- * A macro to check if the max-heap is ready to use and needs to be updated
138
- * accordingly.
139
- */
140
- #define ReorderBufferMaxHeapIsReady (rb ) !binaryheap_empty((rb)->txn_heap)
141
-
142
115
/* entry for a hash table we use to map from xid to our transaction state */
143
116
typedef struct ReorderBufferTXNByIdEnt
144
117
{
@@ -290,9 +263,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
290
263
static void ReorderBufferCleanupSerializedTXNs (const char * slotname );
291
264
static void ReorderBufferSerializedPath (char * path , ReplicationSlot * slot ,
292
265
TransactionId xid , XLogSegNo segno );
293
- static void ReorderBufferBuildMaxHeap (ReorderBuffer * rb );
294
- static void ReorderBufferMaybeResetMaxHeap (ReorderBuffer * rb );
295
- static int ReorderBufferTXNSizeCompare (Datum a , Datum b , void * arg );
266
+ static int ReorderBufferTXNSizeCompare (const pairingheap_node * a , const pairingheap_node * b , void * arg );
296
267
297
268
static void ReorderBufferFreeSnap (ReorderBuffer * rb , Snapshot snap );
298
269
static Snapshot ReorderBufferCopySnap (ReorderBuffer * rb , Snapshot orig_snap ,
@@ -390,16 +361,8 @@ ReorderBufferAllocate(void)
390
361
buffer -> outbufsize = 0 ;
391
362
buffer -> size = 0 ;
392
363
393
- /*
394
- * The binaryheap is indexed for faster manipulations.
395
- *
396
- * We allocate the initial heap size greater than
397
- * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
398
- * until the threshold is exceeded.
399
- */
400
- buffer -> txn_heap = binaryheap_allocate (MAX_HEAP_TXN_COUNT_THRESHOLD * 2 ,
401
- ReorderBufferTXNSizeCompare ,
402
- true, NULL );
364
+ /* txn_heap is ordered by transaction size */
365
+ buffer -> txn_heap = pairingheap_allocate (ReorderBufferTXNSizeCompare , NULL );
403
366
404
367
buffer -> spillTxns = 0 ;
405
368
buffer -> spillCount = 0 ;
@@ -1637,12 +1600,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1637
1600
1638
1601
/* deallocate */
1639
1602
ReorderBufferReturnTXN (rb , txn );
1640
-
1641
- /*
1642
- * After cleaning up one transaction, the number of transactions might get
1643
- * lower than the threshold for the max-heap.
1644
- */
1645
- ReorderBufferMaybeResetMaxHeap (rb );
1646
1603
}
1647
1604
1648
1605
/*
@@ -3265,20 +3222,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3265
3222
3266
3223
if (addition )
3267
3224
{
3225
+ Size oldsize = txn -> size ;
3226
+
3268
3227
txn -> size += sz ;
3269
3228
rb -> size += sz ;
3270
3229
3271
3230
/* Update the total size in the top transaction. */
3272
3231
toptxn -> total_size += sz ;
3273
3232
3274
- /* Update the max-heap as well if necessary */
3275
- if (ReorderBufferMaxHeapIsReady (rb ))
3276
- {
3277
- if ((txn -> size - sz ) == 0 )
3278
- binaryheap_add (rb -> txn_heap , PointerGetDatum (txn ));
3279
- else
3280
- binaryheap_update_up (rb -> txn_heap , PointerGetDatum (txn ));
3281
- }
3233
+ /* Update the max-heap */
3234
+ if (oldsize != 0 )
3235
+ pairingheap_remove (rb -> txn_heap , & txn -> txn_node );
3236
+ pairingheap_add (rb -> txn_heap , & txn -> txn_node );
3282
3237
}
3283
3238
else
3284
3239
{
@@ -3289,14 +3244,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3289
3244
/* Update the total size in the top transaction. */
3290
3245
toptxn -> total_size -= sz ;
3291
3246
3292
- /* Update the max-heap as well if necessary */
3293
- if (ReorderBufferMaxHeapIsReady (rb ))
3294
- {
3295
- if (txn -> size == 0 )
3296
- binaryheap_remove_node_ptr (rb -> txn_heap , PointerGetDatum (txn ));
3297
- else
3298
- binaryheap_update_down (rb -> txn_heap , PointerGetDatum (txn ));
3299
- }
3247
+ /* Update the max-heap */
3248
+ pairingheap_remove (rb -> txn_heap , & txn -> txn_node );
3249
+ if (txn -> size != 0 )
3250
+ pairingheap_add (rb -> txn_heap , & txn -> txn_node );
3300
3251
}
3301
3252
3302
3253
Assert (txn -> size <= rb -> size );
@@ -3555,10 +3506,10 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3555
3506
3556
3507
/* Compare two transactions by size */
3557
3508
static int
3558
- ReorderBufferTXNSizeCompare (Datum a , Datum b , void * arg )
3509
+ ReorderBufferTXNSizeCompare (const pairingheap_node * a , const pairingheap_node * b , void * arg )
3559
3510
{
3560
- ReorderBufferTXN * ta = (ReorderBufferTXN * ) DatumGetPointer ( a );
3561
- ReorderBufferTXN * tb = (ReorderBufferTXN * ) DatumGetPointer ( b );
3511
+ const ReorderBufferTXN * ta = pairingheap_const_container (ReorderBufferTXN , txn_node , a );
3512
+ const ReorderBufferTXN * tb = pairingheap_const_container (ReorderBufferTXN , txn_node , b );
3562
3513
3563
3514
if (ta -> size < tb -> size )
3564
3515
return -1 ;
@@ -3568,106 +3519,16 @@ ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
3568
3519
}
3569
3520
3570
3521
/*
3571
- * Build the max-heap. The heap assembly step is deferred until the end, for
3572
- * efficiency.
3573
- */
3574
- static void
3575
- ReorderBufferBuildMaxHeap (ReorderBuffer * rb )
3576
- {
3577
- HASH_SEQ_STATUS hash_seq ;
3578
- ReorderBufferTXNByIdEnt * ent ;
3579
-
3580
- Assert (binaryheap_empty (rb -> txn_heap ));
3581
-
3582
- hash_seq_init (& hash_seq , rb -> by_txn );
3583
- while ((ent = hash_seq_search (& hash_seq )) != NULL )
3584
- {
3585
- ReorderBufferTXN * txn = ent -> txn ;
3586
-
3587
- if (txn -> size == 0 )
3588
- continue ;
3589
-
3590
- binaryheap_add_unordered (rb -> txn_heap , PointerGetDatum (txn ));
3591
- }
3592
-
3593
- binaryheap_build (rb -> txn_heap );
3594
- }
3595
-
3596
- /*
3597
- * Reset the max-heap if the number of transactions got lower than the
3598
- * threshold.
3599
- */
3600
- static void
3601
- ReorderBufferMaybeResetMaxHeap (ReorderBuffer * rb )
3602
- {
3603
- /*
3604
- * If we add and remove transactions right around the threshold, we could
3605
- * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
3606
- * reset the max-heap.
3607
- */
3608
- if (ReorderBufferMaxHeapIsReady (rb ) &&
3609
- binaryheap_size (rb -> txn_heap ) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9 )
3610
- binaryheap_reset (rb -> txn_heap );
3611
- }
3612
-
3613
- /*
3614
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk)
3615
- * by doing a linear search or using the max-heap depending on the number of
3616
- * transactions in ReorderBuffer. Refer to the comments atop this file for the
3617
- * algorithm details.
3522
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3618
3523
*/
3619
3524
static ReorderBufferTXN *
3620
3525
ReorderBufferLargestTXN (ReorderBuffer * rb )
3621
3526
{
3622
- ReorderBufferTXN * largest = NULL ;
3623
-
3624
- if (!ReorderBufferMaxHeapIsReady (rb ))
3625
- {
3626
- /*
3627
- * If the number of transactions are small, we scan all transactions
3628
- * being decoded to get the largest transaction. This saves the cost
3629
- * of building a max-heap with a small number of transactions.
3630
- */
3631
- if (hash_get_num_entries (rb -> by_txn ) < MAX_HEAP_TXN_COUNT_THRESHOLD )
3632
- {
3633
- HASH_SEQ_STATUS hash_seq ;
3634
- ReorderBufferTXNByIdEnt * ent ;
3635
-
3636
- hash_seq_init (& hash_seq , rb -> by_txn );
3637
- while ((ent = hash_seq_search (& hash_seq )) != NULL )
3638
- {
3639
- ReorderBufferTXN * txn = ent -> txn ;
3640
-
3641
- /* if the current transaction is larger, remember it */
3642
- if ((!largest ) || (txn -> size > largest -> size ))
3643
- largest = txn ;
3644
- }
3645
-
3646
- Assert (largest );
3647
- }
3648
- else
3649
- {
3650
- /*
3651
- * There are a large number of transactions in ReorderBuffer. We
3652
- * build the max-heap for efficiently selecting the largest
3653
- * transactions.
3654
- */
3655
- ReorderBufferBuildMaxHeap (rb );
3656
-
3657
- /*
3658
- * The max-heap is ready now. We remain the max-heap at least
3659
- * until we free up enough transactions to bring the total memory
3660
- * usage below the limit. The largest transaction is selected
3661
- * below.
3662
- */
3663
- Assert (ReorderBufferMaxHeapIsReady (rb ));
3664
- }
3665
- }
3527
+ ReorderBufferTXN * largest ;
3666
3528
3667
3529
/* Get the largest transaction from the max-heap */
3668
- if (ReorderBufferMaxHeapIsReady (rb ))
3669
- largest = (ReorderBufferTXN * )
3670
- DatumGetPointer (binaryheap_first (rb -> txn_heap ));
3530
+ largest = pairingheap_container (ReorderBufferTXN , txn_node ,
3531
+ pairingheap_first (rb -> txn_heap ));
3671
3532
3672
3533
Assert (largest );
3673
3534
Assert (largest -> size > 0 );
@@ -3812,12 +3673,6 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3812
3673
/* We must be under the memory limit now. */
3813
3674
Assert (rb -> size < logical_decoding_work_mem * 1024L );
3814
3675
3815
- /*
3816
- * After evicting some transactions, the number of transactions might get
3817
- * lower than the threshold for the max-heap.
3818
- */
3819
- ReorderBufferMaybeResetMaxHeap (rb );
3820
-
3821
3676
}
3822
3677
3823
3678
/*
0 commit comments