Skip to content

Commit 127dc15

Browse files
committed
Fix race condition in twophase.c
1 parent 6275e9c commit 127dc15

File tree

4 files changed

+28
-9
lines changed

4 files changed

+28
-9
lines changed

contrib/mmts/arbiter.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,9 @@ static void MtmSendHeartbeat()
371371
} else {
372372
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
373373
if (BIT_CHECK(Mtm->connectivityMask, i)) {
374-
MtmDisconnect(i);
374+
close(sockets[i]);
375+
sockets[i] = -1;
376+
MtmReconnectNode(i+1);
375377
//MtmOnNodeConnect(i+1);
376378
}
377379
MTM_LOG4("Send heartbeat to node %d with timestamp %ld", i+1, now);

contrib/mmts/multimaster.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2021,6 +2021,13 @@ void MtmOnNodeConnect(int nodeId)
20212021
MtmUnlock();
20222022
}
20232023

2024+
void MtmReconnectNode(int nodeId)
2025+
{
2026+
MtmLock(LW_EXCLUSIVE);
2027+
BIT_SET(Mtm->reconnectMask, nodeId-1);
2028+
MtmUnlock();
2029+
}
2030+
20242031

20252032

20262033
/*
@@ -3301,7 +3308,7 @@ bool MtmFilterTransaction(char* record, int size)
33013308
}
33023309

33033310
if (duplicate) {
3304-
MTM_LOG2("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
3311+
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
33053312
gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, MtmReplicationNodeId, end_lsn, origin_lsn);
33063313
} else {
33073314
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ extern void MtmUnlock(void);
358358
extern void MtmLockNode(int nodeId, LWLockMode mode);
359359
extern void MtmUnlockNode(int nodeId);
360360
extern void MtmDropNode(int nodeId, bool dropSlot);
361+
extern void MtmReconnectNode(int nodeId);
361362
extern void MtmRecoverNode(int nodeId);
362363
extern void MtmOnNodeDisconnect(int nodeId);
363364
extern void MtmOnNodeConnect(int nodeId);

src/backend/access/transam/twophase.c

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -627,9 +627,10 @@ LockGXact(const char *gid, Oid user)
627627
before_shmem_exit(AtProcExit_Twophase, 0);
628628
twophaseExitRegistered = true;
629629
}
630-
631-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
630+
MyLockedGxact = NULL;
632631
i = string_hash(gid, 0) % max_prepared_xacts;
632+
Retry:
633+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
633634
for (gxact = TwoPhaseState->hashTable[i]; gxact != NULL; gxact = gxact->next)
634635
{
635636
if (strcmp(gxact->gid, gid) == 0)
@@ -639,19 +640,25 @@ LockGXact(const char *gid, Oid user)
639640
/* Lock gxact. We have to release TwoPhaseStateLock LW-Lock to avoid deadlock */
640641

641642
LWLockRelease(TwoPhaseStateLock);
642-
SpinLockAcquire(&gxact->spinlock);
643+
644+
if (MyLockedGxact != gxact) {
645+
if (MyLockedGxact != NULL) {
646+
SpinLockRelease(&MyLockedGxact->spinlock);
647+
}
648+
MyLockedGxact = gxact;
649+
SpinLockAcquire(&gxact->spinlock);
650+
goto Retry;
651+
}
643652

644653
/* Ignore not-yet-valid GIDs */
645654
if (!gxact->valid) {
646-
SpinLockRelease(&gxact->spinlock);
647655
ereport(ERROR,
648656
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
649657
errmsg("prepared transaction with identifier \"%s\" is not valid",
650658
gid)));
651659
}
652660

653661
if (user != gxact->owner && !superuser_arg(user)) {
654-
SpinLockRelease(&gxact->spinlock);
655662
ereport(ERROR,
656663
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
657664
errmsg("permission denied to finish prepared transaction"),
@@ -665,7 +672,6 @@ LockGXact(const char *gid, Oid user)
665672
* someone gets motivated to make it work.
666673
*/
667674
if (MyDatabaseId != proc->databaseId) {
668-
SpinLockRelease(&gxact->spinlock);
669675
ereport(ERROR,
670676
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
671677
errmsg("prepared transaction belongs to another database"),
@@ -676,13 +682,16 @@ LockGXact(const char *gid, Oid user)
676682
/* OK for me to lock it */
677683
Assert(gxact->locking_pid < 0);
678684
gxact->locking_pid = MyProcPid;
679-
MyLockedGxact = gxact;
680685

681686
return gxact;
682687
}
683688
}
684689

685690
LWLockRelease(TwoPhaseStateLock);
691+
if (MyLockedGxact != NULL) {
692+
SpinLockRelease(&MyLockedGxact->spinlock);
693+
MyLockedGxact = NULL;
694+
}
686695

687696
ereport(ERROR,
688697
(errcode(ERRCODE_UNDEFINED_OBJECT),

0 commit comments

Comments
 (0)