Skip to content

Commit d30ce26

Browse files
committed
Replace XactLastRecEnd with gxact->prepare_end_lsn
1 parent b4a5fe8 commit d30ce26

File tree

7 files changed

+256
-278
lines changed

7 files changed

+256
-278
lines changed

contrib/mmts/arbiter.c

Lines changed: 81 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,19 @@ static void MtmMonitor(Datum arg);
9191
static void MtmSendHeartbeat(void);
9292
static bool MtmSendToNode(int node, void const* buf, int size);
9393

94-
/*
95-
static char const* const messageText[] =
94+
static char const* const messageKindText[] =
9695
{
9796
"INVALID",
9897
"HANDSHAKE",
99-
"READY",
100-
"PREPARE",
10198
"PREPARED",
99+
"PRECOMMIT",
100+
"PRECOMMITTED",
102101
"ABORTED",
103102
"STATUS",
104103
"HEARTBEAT",
105104
"POLL_REQUEST",
106105
"POLL_STATUS"
107106
};
108-
*/
109107

110108
static BackgroundWorker MtmSenderWorker = {
111109
"mtm-sender",
@@ -364,7 +362,7 @@ static void MtmSendHeartbeat()
364362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
365363
}
366364
} else {
367-
MTM_LOG1("Do not send heartbeat to node %d, busy mask %lld, status %d", i+1, (long long) busy_mask, Mtm->status);
365+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d", i+1, (long long) busy_mask, Mtm->status);
368366
}
369367
}
370368
}
@@ -902,9 +900,14 @@ static void MtmReceiver(Datum arg)
902900
msg->status = TRANSACTION_STATUS_ABORTED;
903901
} else {
904902
msg->status = tm->state->status;
903+
msg->csn = tm->state->csn;
905904
MTM_LOG1("Send response %d for transaction %s to node %d", msg->status, msg->gid, msg->node);
906905
}
907-
msg->code = MSG_POLL_STATUS;
906+
msg->disabledNodeMask = Mtm->disabledNodeMask;
907+
msg->connectivityMask = Mtm->connectivityMask;
908+
msg->oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
909+
msg->code = MSG_POLL_STATUS;
910+
msg->csn = ts->csn;
908911
MtmSendMessage(msg);
909912
continue;
910913
case MSG_POLL_STATUS:
@@ -915,41 +918,34 @@ static void MtmReceiver(Datum arg)
915918
} else {
916919
ts = tm->state;
917920
BIT_SET(ts->votedMask, node-1);
918-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
919-
if (msg->status == TRANSACTION_STATUS_UNKNOWN || msg->status == TRANSACTION_STATUS_COMMITTED) {
920-
elog(LOG, "Commit transaction %s because it is in state %d at node %d",
921+
if (ts->status == TRANSACTION_STATUS_UNKNOWN) {
922+
if (msg->status == TRANSACTION_STATUS_IN_PROGRESS || msg->status == TRANSACTION_STATUS_ABORTED) {
923+
elog(LOG, "Abort transaction %s because it is in state %d at node %d",
921924
msg->gid, ts->status, node);
922-
Assert(!IsTransactionState());
923-
StartTransactionCommand();
924-
MtmSetCurrentTransactionGID(ts->gid);
925-
ts->status = TRANSACTION_STATUS_UNKNOWN;
926-
FinishPreparedTransaction(ts->gid, true);
927-
CommitTransactionCommand();
928-
Assert(ts->status == TRANSACTION_STATUS_COMMITTED);
929-
} else if (msg->status == TRANSACTION_STATUS_ABORTED
930-
|| ((ts->participantsMask & ~Mtm->disabledNodeMask) & ~ts->votedMask) == 0)
925+
MtmFinishPreparedTransaction(node, ts, false);
926+
}
927+
else if (msg->status == TRANSACTION_STATUS_COMMITTED || msg->status == TRANSACTION_STATUS_UNKNOWN)
931928
{
932-
if (msg->status == TRANSACTION_STATUS_ABORTED) {
933-
elog(LOG, "Abort transaction %s because it is aborted at node %d", msg->gid, node);
934-
} else {
935-
elog(LOG, "Abort transaction %s because it is not prepared at any online node", msg->gid);
929+
if (msg->csn > ts->csn) {
930+
ts->csn = msg->csn;
931+
MtmSyncClock(ts->csn);
932+
}
933+
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
934+
elog(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
935+
MtmFinishPreparedTransaction(node, ts, true);
936936
}
937-
Assert(!IsTransactionState());
938-
StartTransactionCommand();
939-
MtmSetCurrentTransactionGID(ts->gid);
940-
FinishPreparedTransaction(ts->gid, false);
941-
CommitTransactionCommand();
942-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
943937
} else {
944938
elog(LOG, "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
945-
msg->status, msg->gid, node, (long long) ts->votedMask,
946-
(long long) (ts->participantsMask & ~Mtm->disabledNodeMask) );
939+
msg->status, msg->gid, node, (long long) ts->votedMask, (long long) (ts->participantsMask & ~Mtm->disabledNodeMask));
947940
continue;
948941
}
949942
} else if (ts->status == TRANSACTION_STATUS_ABORTED && msg->status == TRANSACTION_STATUS_COMMITTED) {
950943
elog(WARNING, "Transaction %s is aborted at node %d but committed at node %d", msg->gid, MtmNodeId, node);
951944
} else if (msg->status == TRANSACTION_STATUS_ABORTED && ts->status == TRANSACTION_STATUS_COMMITTED) {
952945
elog(WARNING, "Transaction %s is committed at node %d but aborted at node %d", msg->gid, MtmNodeId, node);
946+
} else {
947+
elog(LOG, "Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
948+
msg->status, msg->gid, ts->status, node, (long long) ts->votedMask, (long long) (ts->participantsMask & ~Mtm->disabledNodeMask) );
953949
}
954950
}
955951
continue;
@@ -965,50 +961,49 @@ static void MtmReceiver(Datum arg)
965961
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, node);
966962
continue;
967963
}
964+
if (BIT_CHECK(ts->votedMask, node-1)) {
965+
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
966+
messageKindText[msg->code], ts->xid, ts->gid, node);
967+
continue;
968+
}
968969
MtmCheckResponse(msg);
969-
970+
BIT_SET(ts->votedMask, node-1);
971+
970972
if (MtmIsCoordinator(ts)) {
971973
switch (msg->code) {
972-
case MSG_READY:
973-
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_READY");
974+
case MSG_PREPARED:
975+
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PREPARED");
974976
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
975-
elog(WARNING, "Receive READY response for already committed transaction %d from node %d",
977+
elog(WARNING, "Receive PREPARED response for already committed transaction %d from node %d",
976978
ts->xid, node);
977979
continue;
978980
}
979-
if (ts->nVotes >= Mtm->nLiveNodes) {
980-
elog(WARNING, "Receive deteriorated READY response for transaction %d (%s) from node %d",
981-
ts->xid, ts->gid, node);
981+
Mtm->nodes[node-1].transDelay += MtmGetCurrentTime() - ts->csn;
982+
ts->xids[node-1] = msg->sxid;
983+
984+
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
985+
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
986+
commit on smaller subset of nodes */
987+
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
988+
node, (long) Mtm->disabledNodeMask, (long) msg->disabledNodeMask);
982989
MtmAbortTransaction(ts);
983-
MtmWakeUpBackend(ts);
984-
} else {
985-
Mtm->nodes[node-1].transDelay += MtmGetCurrentTime() - ts->csn;
986-
ts->xids[node-1] = msg->sxid;
987-
988-
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
989-
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
990-
commit on smaller subset of nodes */
991-
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
992-
node, (long) Mtm->disabledNodeMask, (long) msg->disabledNodeMask);
993-
MtmAbortTransaction(ts);
994-
}
995-
996-
if (++ts->nVotes == Mtm->nLiveNodes) {
997-
/* All nodes are finished their transactions */
998-
if (ts->status == TRANSACTION_STATUS_ABORTED) {
999-
MtmWakeUpBackend(ts);
990+
}
991+
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
992+
/* All nodes are finished their transactions */
993+
if (ts->status == TRANSACTION_STATUS_ABORTED) {
994+
MtmWakeUpBackend(ts);
995+
} else {
996+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
997+
ts->isPrepared = true;
998+
if (ts->isTwoPhase) {
999+
MtmWakeUpBackend(ts);
1000+
} else if (MtmUseDtm) {
1001+
ts->votedMask = 0;
1002+
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
1003+
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10001004
} else {
1001-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1002-
if (ts->isTwoPhase) {
1003-
MtmWakeUpBackend(ts);
1004-
} else if (MtmUseDtm) {
1005-
ts->nVotes = 1; /* I voted myself */
1006-
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PREPARE");
1007-
MtmSend2PCMessage(ts, MSG_PREPARE);
1008-
} else {
1009-
ts->status = TRANSACTION_STATUS_UNKNOWN;
1010-
MtmWakeUpBackend(ts);
1011-
}
1005+
ts->status = TRANSACTION_STATUS_UNKNOWN;
1006+
MtmWakeUpBackend(ts);
10121007
}
10131008
}
10141009
}
@@ -1023,47 +1018,40 @@ static void MtmReceiver(Datum arg)
10231018
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
10241019
MtmAbortTransaction(ts);
10251020
}
1026-
if (++ts->nVotes >= Mtm->nLiveNodes) {
1021+
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
10271022
MtmWakeUpBackend(ts);
10281023
}
10291024
break;
1030-
case MSG_PREPARED:
1031-
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PREPARED");
1032-
if (ts->nVotes >= Mtm->nLiveNodes) {
1033-
elog(WARNING, "Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
1034-
ts->xid, ts->gid, node);
1035-
MtmAbortTransaction(ts);
1036-
MtmWakeUpBackend(ts);
1025+
case MSG_PRECOMMITTED:
1026+
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PRECOMMITTED");
1027+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1028+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1029+
if (msg->csn > ts->csn) {
1030+
ts->csn = msg->csn;
1031+
MtmSyncClock(ts->csn);
1032+
}
1033+
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
1034+
ts->csn = MtmAssignCSN();
1035+
ts->status = TRANSACTION_STATUS_UNKNOWN;
1036+
MtmWakeUpBackend(ts);
1037+
}
10371038
} else {
1038-
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1039-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1040-
if (msg->csn > ts->csn) {
1041-
ts->csn = msg->csn;
1042-
MtmSyncClock(ts->csn);
1043-
}
1044-
if (++ts->nVotes == Mtm->nLiveNodes) {
1045-
ts->csn = MtmAssignCSN();
1046-
ts->status = TRANSACTION_STATUS_UNKNOWN;
1047-
MtmWakeUpBackend(ts);
1048-
}
1049-
} else {
1050-
if (++ts->nVotes == Mtm->nLiveNodes) {
1051-
MtmWakeUpBackend(ts);
1052-
}
1053-
}
1054-
}
1039+
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
1040+
MtmWakeUpBackend(ts);
1041+
}
1042+
}
10551043
break;
10561044
default:
10571045
Assert(false);
10581046
}
10591047
} else {
10601048
switch (msg->code) {
1061-
case MSG_PREPARE:
1049+
case MSG_PRECOMMIT:
10621050
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
10631051
ts->status = TRANSACTION_STATUS_UNKNOWN;
10641052
ts->csn = MtmAssignCSN();
10651053
MtmAdjustSubtransactions(ts);
1066-
MtmSend2PCMessage(ts, MSG_PREPARED);
1054+
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
10671055
} else {
10681056
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
10691057
MtmSend2PCMessage(ts, MSG_ABORTED);

0 commit comments

Comments
 (0)