@@ -200,6 +200,7 @@ static int MtmMinRecoveryLag;
200
200
static int MtmMaxRecoveryLag ;
201
201
static int Mtm2PCPrepareRatio ;
202
202
static int Mtm2PCMinTimeout ;
203
+ static int MtmGcPeriod ;
203
204
static bool MtmIgnoreTablesWithoutPk ;
204
205
205
206
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -342,16 +343,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342
343
Snapshot MtmGetSnapshot (Snapshot snapshot )
343
344
{
344
345
snapshot = PgGetSnapshotData (snapshot );
345
- RecentGlobalDataXmin = RecentGlobalXmin = Mtm -> oldestXid ;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346
+ RecentGlobalDataXmin = RecentGlobalXmin = Mtm -> oldestXid ;
346
347
return snapshot ;
347
348
}
348
349
349
350
350
351
TransactionId MtmGetOldestXmin (Relation rel , bool ignoreVacuum )
351
352
{
352
353
TransactionId xmin = PgGetOldestXmin (NULL , false); /* consider all backends */
353
- xmin = MtmAdjustOldestXid (xmin );
354
- return xmin ;
354
+ if (TransactionIdIsValid (xmin )) {
355
+ MtmLock (LW_EXCLUSIVE );
356
+ xmin = MtmAdjustOldestXid (xmin );
357
+ MtmUnlock ();
358
+ }
359
+ return xmin ;
355
360
}
356
361
357
362
bool MtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot )
@@ -446,53 +451,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
446
451
static TransactionId
447
452
MtmAdjustOldestXid (TransactionId xid )
448
453
{
449
- if (TransactionIdIsValid (xid )) {
450
- MtmTransState * ts , * prev = NULL ;
451
- int i ;
452
-
453
- MtmLock (LW_EXCLUSIVE );
454
- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
455
- if (ts != NULL ) {
456
- csn_t oldestSnapshot = ts -> snapshot ;
457
- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
458
- for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
459
- if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
460
- && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
461
- {
462
- oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
463
- }
464
- }
465
- oldestSnapshot -= MtmVacuumDelay * USECS_PER_SEC ;
466
-
467
- for (ts = Mtm -> transListHead ;
468
- ts != NULL
469
- && ts -> csn < oldestSnapshot
470
- && TransactionIdPrecedes (ts -> xid , xid )
471
- && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
472
- ts -> status == TRANSACTION_STATUS_ABORTED );
473
- prev = ts , ts = ts -> next )
454
+ int i ;
455
+ MtmTransState * prev = NULL ;
456
+ MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
457
+ MTM_LOG1 ("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d" , MyProcPid , xid , ts != NULL ? ts -> snapshot : 0 , ts != NULL ? ts -> csn : 0 , ts != NULL ? ts -> status : -1 );
458
+ Mtm -> gcCount = 0 ;
459
+ if (ts != NULL ) {
460
+ csn_t oldestSnapshot = ts -> snapshot ;
461
+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
462
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
463
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
464
+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
474
465
{
475
- if (prev != NULL ) {
476
- /* Remove information about too old transactions */
477
- hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
478
- }
466
+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
479
467
}
480
- }
481
- if (MtmUseDtm )
468
+ }
469
+ oldestSnapshot -= MtmVacuumDelay * USECS_PER_SEC ;
470
+
471
+ for (ts = Mtm -> transListHead ;
472
+ ts != NULL
473
+ && ts -> csn < oldestSnapshot
474
+ && TransactionIdPrecedes (ts -> xid , xid )
475
+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
476
+ ts -> status == TRANSACTION_STATUS_ABORTED );
477
+ prev = ts , ts = ts -> next )
482
478
{
483
479
if (prev != NULL ) {
484
- Mtm -> transListHead = prev ;
485
- Mtm -> oldestXid = xid = prev -> xid ;
486
- } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
487
- xid = Mtm -> oldestXid ;
488
- }
489
- } else {
490
- if (prev != NULL ) {
491
- Mtm -> transListHead = prev ;
480
+ /* Remove information about too old transactions */
481
+ hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
492
482
}
493
483
}
494
- MtmUnlock ();
495
- }
484
+ }
485
+ if (MtmUseDtm )
486
+ {
487
+ if (prev != NULL ) {
488
+ Mtm -> transListHead = prev ;
489
+ Mtm -> oldestXid = xid = prev -> xid ;
490
+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
491
+ xid = Mtm -> oldestXid ;
492
+ }
493
+ } else {
494
+ if (prev != NULL ) {
495
+ Mtm -> transListHead = prev ;
496
+ }
497
+ }
496
498
return xid ;
497
499
}
498
500
/*
@@ -614,7 +616,12 @@ static void
614
616
MtmBeginTransaction (MtmCurrentTrans * x )
615
617
{
616
618
if (x -> snapshot == INVALID_CSN ) {
617
- MtmLock (LW_EXCLUSIVE );
619
+ TransactionId xmin = (Mtm -> gcCount >= MtmGcPeriod ) ? PgGetOldestXmin (NULL , false) : InvalidTransactionId ; /* Get oldest xmin outside critical section */
620
+
621
+ MtmLock (LW_EXCLUSIVE );
622
+ if (TransactionIdIsValid (xmin ) && Mtm -> gcCount >= MtmGcPeriod ) {
623
+ MtmAdjustOldestXid (xmin );
624
+ }
618
625
x -> xid = GetCurrentTransactionIdIfAny ();
619
626
x -> isReplicated = false;
620
627
x -> isDistributed = MtmIsUserTransaction ();
@@ -690,7 +697,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
690
697
}
691
698
692
699
MtmLock (LW_EXCLUSIVE );
693
-
694
700
/*
695
701
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
696
702
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -716,8 +722,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716
722
717
723
x -> isPrepared = true;
718
724
x -> csn = ts -> csn ;
719
-
725
+
720
726
Mtm -> transCount += 1 ;
727
+ Mtm -> gcCount += 1 ;
728
+
721
729
MtmTransactionListAppend (ts );
722
730
MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
723
731
MTM_LOG3 ("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)" ,
@@ -1466,8 +1474,9 @@ static void MtmInitialize()
1466
1474
Mtm -> transListHead = NULL ;
1467
1475
Mtm -> transListTail = & Mtm -> transListHead ;
1468
1476
Mtm -> nReceivers = 0 ;
1469
- Mtm -> timeShift = 0 ;
1477
+ Mtm -> timeShift = 0 ;
1470
1478
Mtm -> transCount = 0 ;
1479
+ Mtm -> gcCount = 0 ;
1471
1480
Mtm -> nConfigChanges = 0 ;
1472
1481
Mtm -> localTablesHashLoaded = false;
1473
1482
for (i = 0 ; i < MtmNodes ; i ++ ) {
@@ -1600,6 +1609,21 @@ _PG_init(void)
1600
1609
if (!process_shared_preload_libraries_in_progress )
1601
1610
return ;
1602
1611
1612
+ DefineCustomIntVariable (
1613
+ "multimaster.gc_period" ,
1614
+ "Number of distributed transactions after which garbage collection is started" ,
1615
+ "Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map" ,
1616
+ & MtmGcPeriod ,
1617
+ MTM_HASH_SIZE /10 ,
1618
+ 1 ,
1619
+ INT_MAX ,
1620
+ PGC_BACKEND ,
1621
+ 0 ,
1622
+ NULL ,
1623
+ NULL ,
1624
+ NULL
1625
+ );
1626
+
1603
1627
DefineCustomIntVariable (
1604
1628
"multimaster.max_nodes" ,
1605
1629
"Maximal number of cluster nodes" ,
@@ -2339,7 +2363,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
2339
2363
values [11 ] = Int32GetDatum (Mtm -> recoverySlot );
2340
2364
values [12 ] = Int64GetDatum (hash_get_num_entries (MtmXid2State ));
2341
2365
values [13 ] = Int64GetDatum (hash_get_num_entries (MtmGid2State ));
2342
- values [14 ] = Int64GetDatum (Mtm -> oldestSnapshot );
2366
+ values [14 ] = Int32GetDatum (Mtm -> oldestXid );
2343
2367
values [15 ] = Int32GetDatum (Mtm -> nConfigChanges );
2344
2368
2345
2369
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
0 commit comments