@@ -274,8 +274,8 @@ void MtmSleep(timestamp_t interval)
274
274
{
275
275
struct timespec ts ;
276
276
struct timespec rem ;
277
- ts .tv_sec = interval /1000000 ;
278
- ts .tv_nsec = interval %1000000 * 1000 ;
277
+ ts .tv_sec = interval /USECS_PER_SEC ;
278
+ ts .tv_nsec = interval %USECS_PER_SEC * 1000 ;
279
279
280
280
while (nanosleep (& ts , & rem ) < 0 ) {
281
281
Assert (errno == EINTR );
@@ -330,7 +330,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
330
330
331
331
MtmLock (LW_SHARED );
332
332
ts = hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
333
- if (ts != NULL ) {
333
+ if (ts != NULL && ! ts -> isLocal ) {
334
334
snapshot = ts -> snapshot ;
335
335
}
336
336
MtmUnlock ();
@@ -452,8 +452,8 @@ MtmAdjustOldestXid(TransactionId xid)
452
452
453
453
MtmLock (LW_EXCLUSIVE );
454
454
ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
455
- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) {
456
- csn_t oldestSnapshot = ts -> csn ;
455
+ if (ts != NULL ) {
456
+ csn_t oldestSnapshot = ts -> snapshot ;
457
457
Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
458
458
for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
459
459
if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
@@ -483,8 +483,7 @@ MtmAdjustOldestXid(TransactionId xid)
483
483
if (prev != NULL ) {
484
484
Mtm -> transListHead = prev ;
485
485
Mtm -> oldestXid = xid = prev -> xid ;
486
- } else {
487
- Assert (TransactionIdPrecedesOrEquals (Mtm -> oldestXid , xid ));
486
+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
488
487
xid = Mtm -> oldestXid ;
489
488
}
490
489
} else {
@@ -650,6 +649,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
650
649
if (!found ) {
651
650
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
652
651
ts -> snapshot = x -> snapshot ;
652
+ ts -> isLocal = true;
653
653
if (TransactionIdIsValid (x -> gtid .xid )) {
654
654
Assert (x -> gtid .node != MtmNodeId );
655
655
ts -> gtid = x -> gtid ;
@@ -704,7 +704,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704
704
/*
705
705
* Invalid CSN prevent replication of transaction by logical replication
706
706
*/
707
- ts -> snapshot = x -> isReplicated || !x -> containsDML ? INVALID_CSN : x -> snapshot ;
707
+ ts -> isLocal = x -> isReplicated || !x -> containsDML ;
708
+ ts -> snapshot = x -> snapshot ;
708
709
ts -> csn = MtmAssignCSN ();
709
710
ts -> procno = MyProc -> pgprocno ;
710
711
ts -> nVotes = 1 ; /* I am voted myself */
@@ -752,16 +753,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
752
753
} else {
753
754
time_t timeout = Max (Mtm2PCMinTimeout , (ts -> csn - ts -> snapshot )* Mtm2PCPrepareRatio /100000 ); /* usec->msec and percents */
754
755
int result = 0 ;
756
+ int nConfigChanges = Mtm -> nConfigChanges ;
755
757
/* wait votes from all nodes */
756
758
while (!ts -> votingCompleted && !(result & WL_TIMEOUT )) {
757
759
MtmUnlock ();
758
760
result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET |WL_TIMEOUT , timeout );
759
761
ResetLatch (& MyProc -> procLatch );
760
762
MtmLock (LW_SHARED );
761
763
}
762
- if (!ts -> votingCompleted ) {
764
+ if (!ts -> votingCompleted ) {
763
765
ts -> status = TRANSACTION_STATUS_ABORTED ;
764
- elog (WARNING , "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec" , (int )timeout , (int )((ts -> csn - x -> snapshot )/1000 ));
766
+ elog (WARNING , "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec" , (int )timeout , (int )USEC_TO_MSEC (ts -> csn - x -> snapshot ));
767
+ } else if (nConfigChanges != Mtm -> nConfigChanges ) {
768
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
769
+ elog (WARNING , "Transaction is aborted because cluster configuration is changed during commit" );
765
770
}
766
771
x -> status = ts -> status ;
767
772
MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
@@ -830,7 +835,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
830
835
Assert (TransactionIdIsValid (x -> xid ));
831
836
ts = hash_search (MtmXid2State , & x -> xid , HASH_ENTER , NULL );
832
837
ts -> status = TRANSACTION_STATUS_ABORTED ;
833
- ts -> snapshot = INVALID_CSN ;
838
+ ts -> isLocal = true;
839
+ ts -> snapshot = x -> snapshot ;
834
840
ts -> csn = MtmAssignCSN ();
835
841
ts -> gtid = x -> gtid ;
836
842
ts -> nSubxids = 0 ;
@@ -1089,6 +1095,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1089
1095
BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1090
1096
Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ();
1091
1097
Mtm -> nLiveNodes += 1 ;
1098
+ Mtm -> nConfigChanges += 1 ;
1092
1099
caughtUp = true;
1093
1100
} else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
1094
1101
&& slotLSN + MtmMinRecoveryLag > walLSN )
@@ -1263,6 +1270,7 @@ bool MtmRefreshClusterStatus(bool nowait)
1263
1270
1264
1271
void MtmCheckQuorum (void )
1265
1272
{
1273
+ Mtm -> nConfigChanges += 1 ;
1266
1274
if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) {
1267
1275
if (Mtm -> status == MTM_ONLINE ) { /* out of quorum */
1268
1276
elog (WARNING , "Node is in minority: disabled mask %lx" , (long ) Mtm -> disabledNodeMask );
@@ -1460,6 +1468,7 @@ static void MtmInitialize()
1460
1468
Mtm -> nReceivers = 0 ;
1461
1469
Mtm -> timeShift = 0 ;
1462
1470
Mtm -> transCount = 0 ;
1471
+ Mtm -> nConfigChanges = 0 ;
1463
1472
Mtm -> localTablesHashLoaded = false;
1464
1473
for (i = 0 ; i < MtmNodes ; i ++ ) {
1465
1474
Mtm -> nodes [i ].oldestSnapshot = 0 ;
0 commit comments