Skip to content

Commit 13eb740

Browse files
committed
Avoid loops in transaction list
1 parent b978ab1 commit 13eb740

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

contrib/mmts/multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ void MtmLock(LWLockMode mode)
243243
#else
244244
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
245245
#endif
246+
Mtm->lastLockHolder = MyProcPid;
246247
}
247248

248249
void MtmUnlock(void)
@@ -252,6 +253,7 @@ void MtmUnlock(void)
252253
#else
253254
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
254255
#endif
256+
Mtm->lastLockHolder = 0;
255257
}
256258

257259
void MtmLockNode(int nodeId)
@@ -550,16 +552,20 @@ MtmAdjustOldestXid(TransactionId xid)
550552

551553
static void MtmTransactionListAppend(MtmTransState* ts)
552554
{
553-
ts->next = NULL;
554-
ts->nSubxids = 0;
555-
*Mtm->transListTail = ts;
556-
Mtm->transListTail = &ts->next;
555+
if (!ts->isEnqueued) {
556+
ts->isEnqueued = true;
557+
ts->next = NULL;
558+
ts->nSubxids = 0;
559+
*Mtm->transListTail = ts;
560+
Mtm->transListTail = &ts->next;
561+
}
557562
}
558563

559564
static void MtmTransactionListInsertAfter(MtmTransState* after, MtmTransState* ts)
560565
{
561566
ts->next = after->next;
562567
after->next = ts;
568+
ts->isEnqueued = true;
563569
if (Mtm->transListTail == &after->next) {
564570
Mtm->transListTail = &ts->next;
565571
}
@@ -700,6 +706,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
700706
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
701707
ts->snapshot = x->snapshot;
702708
ts->isLocal = true;
709+
if (!found) {
710+
ts->isEnqueued = false;
711+
}
703712
if (TransactionIdIsValid(x->gtid.xid)) {
704713
Assert(x->gtid.node != MtmNodeId);
705714
ts->gtid = x->gtid;
@@ -829,6 +838,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
829838
Assert(x->gid[0]);
830839
tm->state = ts;
831840
ts->votingCompleted = true;
841+
if (!found) {
842+
ts->isEnqueued = false;
843+
}
832844
if (Mtm->status != MTM_RECOVERY) {
833845
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
834846
if (!MtmUseDtm) {
@@ -937,8 +949,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
937949
*/
938950
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
939951
if (ts == NULL) {
952+
bool found;
940953
Assert(TransactionIdIsValid(x->xid));
941-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
954+
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
955+
if (!found) {
956+
ts->isEnqueued = false;
957+
}
942958
ts->status = TRANSACTION_STATUS_ABORTED;
943959
ts->isLocal = true;
944960
ts->snapshot = x->snapshot;
@@ -1355,7 +1371,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13551371
*/
13561372
bool MtmRefreshClusterStatus(bool nowait)
13571373
{
1358-
nodemask_t mask, clique, disabled, enabled;
1374+
nodemask_t mask, clique, disabled;
13591375
nodemask_t matrix[MAX_NODES];
13601376
MtmTransState *ts;
13611377
int clique_size;
@@ -1382,28 +1398,29 @@ bool MtmRefreshClusterStatus(bool nowait)
13821398
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long) clique, (long) Mtm->disabledNodeMask);
13831399
MtmLock(LW_EXCLUSIVE);
13841400
disabled = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1385-
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
13861401

13871402
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
13881403
if (mask & 1) {
13891404
MtmDisableNode(i+1);
13901405
}
1391-
}
1392-
1406+
}
1407+
#if 0 /* Do not enable nodes here: they will be enabled after completion of recovery */
1408+
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
13931409
for (i = 0, mask = enabled; mask != 0; i++, mask >>= 1) {
13941410
if (mask & 1) {
13951411
MtmEnableNode(i+1);
13961412
}
13971413
}
1398-
if (disabled|enabled) {
1414+
#endif
1415+
if (disabled) {
13991416
MtmCheckQuorum();
14001417
}
14011418
/* Interrupt voting for active transaction and abort them */
14021419
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
14031420
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
14041421
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
14051422
if (MtmIsCoordinator(ts)) {
1406-
if (!ts->votingCompleted && (disabled|enabled) != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1423+
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
14071424
MtmAbortTransaction(ts);
14081425
MtmWakeUpBackend(ts);
14091426
}
@@ -2213,6 +2230,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
22132230
{
22142231
if (nodeId <= 0 || nodeId > Mtm->nLiveNodes)
22152232
{
2233+
MtmUnlock();
22162234
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nLiveNodes);
22172235
}
22182236
MtmDisableNode(nodeId);
@@ -2278,6 +2296,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
22782296
MtmEnableNode(MtmReplicationNodeId);
22792297
MtmCheckQuorum();
22802298
} else {
2299+
MtmUnlock();
22812300
elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
22822301
}
22832302
} else {

contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ typedef struct MtmTransState
156156
struct MtmTransState* next; /* Next element in L1 list of all finished transactions present in xid2state hash */
157157
bool votingCompleted; /* 2PC voting is completed */
158158
bool isLocal; /* Transaction is either replicated or doesn't contain DML statements, so it should be ignored by pglogical replication */
159-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
159+
bool isEnqueued; /* Transaction is inserted in queue */
160+
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
160161
} MtmTransState;
161162

162163
typedef struct
@@ -173,7 +174,7 @@ typedef struct
173174
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
174175
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
175176
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
176-
177+
int lastLockHolder; /* PID of the process that last obtained the lock via MtmLock (reset to 0 by MtmUnlock) */
177178
bool localTablesHashLoaded; /* Whether data from local_tables table is loaded in shared memory hash table */
178179
int inject2PCError; /* Simulate error during 2PC commit at this node */
179180
int nLiveNodes; /* Number of active nodes */

contrib/mmts/tests2/lib/bank_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ def exec_tx(self, name, tx_block):
141141
self.history.register_finish(event_id, 'Commit')
142142
except psycopg2.InterfaceError:
143143
self.history.register_finish(event_id, 'InterfaceError')
144-
except psycopg2.Error:
144+
except psycopg2.Error as x:
145+
print(x.pgerror)
145146
self.history.register_finish(event_id, 'PsycopgError')
146147

147148
cur.close()

0 commit comments

Comments
 (0)