Skip to content

Commit df51185

Browse files
committed
Abort voting backends on node disconnect
1 parent 9a0a869 commit df51185

File tree

3 files changed

+36
-18
lines changed

3 files changed

+36
-18
lines changed

contrib/mmts/arbiter.c

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -584,12 +584,6 @@ static void MtmTransSender(Datum arg)
584584
}
585585
}
586586

587-
static void MtmWakeUpBackend(MtmTransState* ts)
588-
{
589-
MTM_TRACE("Wakeup backed procno=%d, pid=%d\n", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
590-
ts->votingCompleted = true;
591-
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
592-
}
593587

594588
#if !USE_EPOLL
595589
static bool MtmRecovery()
@@ -715,9 +709,7 @@ static void MtmTransReceiver(Datum arg)
715709
commit on smaller subset of nodes */
716710
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
717711
msg->node, Mtm->disabledNodeMask, msg->disabledNodeMask);
718-
ts->status = TRANSACTION_STATUS_ABORTED;
719-
MtmAdjustSubtransactions(ts);
720-
Mtm->nActiveTransactions -= 1;
712+
MtmAbortTransaction(ts);
721713
}
722714

723715
if (++ts->nVotes == Mtm->nNodes) {
@@ -735,9 +727,7 @@ static void MtmTransReceiver(Datum arg)
735727
Assert(ts->nVotes < Mtm->nNodes);
736728
if (ts->status != TRANSACTION_STATUS_ABORTED) {
737729
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
738-
ts->status = TRANSACTION_STATUS_ABORTED;
739-
MtmAdjustSubtransactions(ts);
740-
Mtm->nActiveTransactions -= 1;
730+
MtmAbortTransaction(ts);
741731
}
742732
if (++ts->nVotes == Mtm->nNodes) {
743733
MtmWakeUpBackend(ts);

contrib/mmts/multimaster.c

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
553553
Assert(!found);
554554
sts->status = ts->status;
555555
sts->csn = ts->csn;
556+
sts->votingCompleted = true;
556557
MtmTransactionListInsertAfter(ts, sts);
557558
}
558559
}
@@ -745,7 +746,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
745746
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
746747
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
747748
Assert(x->gid[0]);
748-
tm->state = ts;
749+
tm->state = ts;
750+
ts->votingCompleted = true;
749751
if (Mtm->status != MTM_RECOVERY) {
750752
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
751753
} else {
@@ -777,9 +779,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
777779
MtmLock(LW_EXCLUSIVE);
778780
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
779781
Assert(tm != NULL);
780-
tm->state->status = TRANSACTION_STATUS_ABORTED;
781-
MtmAdjustSubtransactions(tm->state);
782-
Mtm->nActiveTransactions -= 1;
782+
MtmAbortTransaction(tm->state);
783783
MtmUnlock();
784784
x->status = TRANSACTION_STATUS_ABORTED;
785785
}
@@ -835,6 +835,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
835835
ts->gtid = x->gtid;
836836
ts->nSubxids = 0;
837837
ts->cmd = MSG_INVALID;
838+
ts->votingCompleted = true;
838839
MtmTransactionListAppend(ts);
839840
}
840841
MtmSendNotificationMessage(ts, MSG_ABORTED); /* send notification to coordinator */
@@ -937,6 +938,20 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
937938
return csn;
938939
}
939940

941+
/*
 * Mark voting for a transaction as completed and set the latch of the
 * process recorded in the transaction state.
 *
 * ts: transaction state; ts->procno indexes ProcGlobal->allProcs and selects
 *     the process whose procLatch is set.
 *
 * votingCompleted is written before SetLatch() is called.
 * NOTE(review): presumably the woken backend re-checks votingCompleted after
 * its latch wait returns -- confirm against the waiting side.
 */
void MtmWakeUpBackend(MtmTransState* ts)
{
    MTM_TRACE("Wakeup backend procno=%d, pid=%d\n", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
    ts->votingCompleted = true;
    SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
}
947+
948+
/*
 * Mark a transaction as aborted.
 *
 * Sets ts->status to TRANSACTION_STATUS_ABORTED, calls
 * MtmAdjustSubtransactions(ts) and decrements Mtm->nActiveTransactions.
 *
 * The call is a no-op when the transaction is already aborted, so callers
 * that may report the same failure more than once cannot decrement
 * Mtm->nActiveTransactions twice for one transaction.
 *
 * NOTE(review): this updates shared Mtm state without taking a lock itself;
 * presumably every caller already holds the Mtm lock -- confirm.
 */
void MtmAbortTransaction(MtmTransState* ts)
{
    if (ts->status == TRANSACTION_STATUS_ABORTED) {
        /* Already accounted for: do not touch the active-transaction counter again. */
        return;
    }
    ts->status = TRANSACTION_STATUS_ABORTED;
    MtmAdjustSubtransactions(ts);
    Mtm->nActiveTransactions -= 1;
}
954+
940955
/*
941956
* -------------------------------------------
942957
* HA functions
@@ -1213,9 +1228,10 @@ void MtmCheckQuorum(void)
12131228
}
12141229
}
12151230

1216-
12171231
void MtmOnNodeDisconnect(int nodeId)
1218-
{
1232+
{
1233+
MtmTransState *ts;
1234+
12191235
BIT_SET(Mtm->connectivityMask, nodeId-1);
12201236
BIT_SET(Mtm->reconnectMask, nodeId-1);
12211237
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
@@ -1229,6 +1245,16 @@ void MtmOnNodeDisconnect(int nodeId)
12291245
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
12301246
Mtm->nNodes -= 1;
12311247
MtmCheckQuorum();
1248+
/* Interrupt voting for active transactions and abort them */
1249+
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1250+
if (!ts->votingCompleted) {
1251+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1252+
elog(WARNING, "Rollback active transaction %d:%d", ts->gtid.node, ts->gtid.xid);
1253+
MtmAbortTransaction(ts);
1254+
}
1255+
MtmWakeUpBackend(ts);
1256+
}
1257+
}
12321258
}
12331259
MtmUnlock();
12341260
}

contrib/mmts/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ extern void MtmDropNode(int nodeId, bool dropSlot);
199199
extern void MtmRecoverNode(int nodeId);
200200
extern void MtmOnNodeDisconnect(int nodeId);
201201
extern void MtmOnNodeConnect(int nodeId);
202+
extern void MtmWakeUpBackend(MtmTransState* ts);
202203
extern timestamp_t MtmGetCurrentTime(void);
203204
extern void MtmSleep(timestamp_t interval);
205+
extern void MtmAbortTransaction(MtmTransState* ts);
204206
extern void MtmSetCurrentTransactionGID(char const* gid);
205207
extern csn_t MtmGetTransactionCSN(TransactionId xid);
206208
extern void MtmSetCurrentTransactionCSN(csn_t csn);

0 commit comments

Comments
 (0)