Skip to content

Commit 8b1a313

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents fe7790f + 8ea606c commit 8b1a313

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

contrib/mmts/multimaster.c

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ void MtmSleep(timestamp_t interval)
274274
{
275275
struct timespec ts;
276276
struct timespec rem;
277-
ts.tv_sec = interval/1000000;
278-
ts.tv_nsec = interval%1000000*1000;
277+
ts.tv_sec = interval/USECS_PER_SEC;
278+
ts.tv_nsec = interval%USECS_PER_SEC*1000;
279279

280280
while (nanosleep(&ts, &rem) < 0) {
281281
Assert(errno == EINTR);
@@ -330,7 +330,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
330330

331331
MtmLock(LW_SHARED);
332332
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
333-
if (ts != NULL) {
333+
if (ts != NULL && !ts->isLocal) {
334334
snapshot = ts->snapshot;
335335
}
336336
MtmUnlock();
@@ -452,8 +452,8 @@ MtmAdjustOldestXid(TransactionId xid)
452452

453453
MtmLock(LW_EXCLUSIVE);
454454
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
455-
if (ts != NULL && ts->status == TRANSACTION_STATUS_COMMITTED) {
456-
csn_t oldestSnapshot = ts->csn;
455+
if (ts != NULL) {
456+
csn_t oldestSnapshot = ts->snapshot;
457457
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
458458
for (i = 0; i < Mtm->nAllNodes; i++) {
459459
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
@@ -483,8 +483,7 @@ MtmAdjustOldestXid(TransactionId xid)
483483
if (prev != NULL) {
484484
Mtm->transListHead = prev;
485485
Mtm->oldestXid = xid = prev->xid;
486-
} else {
487-
Assert(TransactionIdPrecedesOrEquals(Mtm->oldestXid, xid));
486+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
488487
xid = Mtm->oldestXid;
489488
}
490489
} else {
@@ -650,6 +649,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
650649
if (!found) {
651650
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
652651
ts->snapshot = x->snapshot;
652+
ts->isLocal = true;
653653
if (TransactionIdIsValid(x->gtid.xid)) {
654654
Assert(x->gtid.node != MtmNodeId);
655655
ts->gtid = x->gtid;
@@ -704,7 +704,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704704
/*
705705
* Invalid CSN prevent replication of transaction by logical replication
706706
*/
707-
ts->snapshot = x->isReplicated || !x->containsDML ? INVALID_CSN : x->snapshot;
707+
ts->isLocal = x->isReplicated || !x->containsDML;
708+
ts->snapshot = x->snapshot;
708709
ts->csn = MtmAssignCSN();
709710
ts->procno = MyProc->pgprocno;
710711
ts->nVotes = 1; /* I am voted myself */
@@ -752,16 +753,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
752753
} else {
753754
time_t timeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
754755
int result = 0;
756+
int nConfigChanges = Mtm->nConfigChanges;
755757
/* wait votes from all nodes */
756758
while (!ts->votingCompleted && !(result & WL_TIMEOUT)) {
757759
MtmUnlock();
758760
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
759761
ResetLatch(&MyProc->procLatch);
760762
MtmLock(LW_SHARED);
761763
}
762-
if (!ts->votingCompleted) {
764+
if (!ts->votingCompleted) {
763765
ts->status = TRANSACTION_STATUS_ABORTED;
764-
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)((ts->csn - x->snapshot)/1000));
766+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)timeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
767+
} else if (nConfigChanges != Mtm->nConfigChanges) {
768+
ts->status = TRANSACTION_STATUS_ABORTED;
769+
elog(WARNING, "Transaction is aborted because cluster configuration is changed during commit");
765770
}
766771
x->status = ts->status;
767772
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -830,7 +835,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
830835
Assert(TransactionIdIsValid(x->xid));
831836
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
832837
ts->status = TRANSACTION_STATUS_ABORTED;
833-
ts->snapshot = INVALID_CSN;
838+
ts->isLocal = true;
839+
ts->snapshot = x->snapshot;
834840
ts->csn = MtmAssignCSN();
835841
ts->gtid = x->gtid;
836842
ts->nSubxids = 0;
@@ -1089,6 +1095,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10891095
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10901096
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
10911097
Mtm->nLiveNodes += 1;
1098+
Mtm->nConfigChanges += 1;
10921099
caughtUp = true;
10931100
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
10941101
&& slotLSN + MtmMinRecoveryLag > walLSN)
@@ -1263,6 +1270,7 @@ bool MtmRefreshClusterStatus(bool nowait)
12631270

12641271
void MtmCheckQuorum(void)
12651272
{
1273+
Mtm->nConfigChanges += 1;
12661274
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1) {
12671275
if (Mtm->status == MTM_ONLINE) { /* out of quorum */
12681276
elog(WARNING, "Node is in minority: disabled mask %lx", (long) Mtm->disabledNodeMask);
@@ -1460,6 +1468,7 @@ static void MtmInitialize()
14601468
Mtm->nReceivers = 0;
14611469
Mtm->timeShift = 0;
14621470
Mtm->transCount = 0;
1471+
Mtm->nConfigChanges = 0;
14631472
Mtm->localTablesHashLoaded = false;
14641473
for (i = 0; i < MtmNodes; i++) {
14651474
Mtm->nodes[i].oldestSnapshot = 0;

contrib/mmts/multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ typedef struct MtmTransState
145145
struct MtmTransState* nextVoting; /* Next element in L1-list of voting transactions. */
146146
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
147147
bool votingCompleted; /* 2PC voting is completed */
148+
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it shoudl be ignored by pglogical replication */
148149
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
149150
} MtmTransState;
150151

@@ -169,7 +170,8 @@ typedef struct
169170
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
170171
int nLockers; /* Number of lockers */
171172
int nActiveTransactions; /* Nunmber of active 2PC transactions */
172-
long timeShift; /* Local time correction */
173+
int nConfigChanges; /* Number of cluster configuration changes */
174+
int64 timeShift; /* Local time correction */
173175
csn_t csn; /* Last obtained CSN: used to provide unique acending CSNs based on system time */
174176
MtmTransState* votingTransactions; /* L1-list of replicated transactions sendings notifications to coordinator.
175177
This list is used to pass information to mtm-sender BGW */

contrib/mmts/tests/dtmacid.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ void* reader(void* arg)
144144
}
145145
}
146146
}
147-
t.transactions += 2;
148147
t.selects += 2;
149148
txn1.commit();
150149
txn2.commit();

0 commit comments

Comments
 (0)