Skip to content

Commit 07321e6

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents bf09fde + a649b7d commit 07321e6

File tree

15 files changed

+226
-105
lines changed

15 files changed

+226
-105
lines changed

contrib/mmts/arbiter.c

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,17 @@ static void MtmSendHeartbeat()
359359
for (i = 0; i < Mtm->nAllNodes; i++)
360360
{
361361
if (i+1 != MtmNodeId) {
362-
if (!BIT_CHECK(busy_mask, i)
363-
&& (Mtm->status != MTM_ONLINE
364-
|| sockets[i] >= 0
365-
|| !BIT_CHECK(Mtm->disabledNodeMask, i)
366-
|| BIT_CHECK(Mtm->reconnectMask, i)))
362+
if (!BIT_CHECK(busy_mask, i))
363+
/*
364+
* Old behaviour here can cause subtle bugs, for example
365+
* it can happened that none of mentioned conditiotions is
366+
* true when disabled node connects to a major node which
367+
* is online. So just send it allways. --sk
368+
*/
369+
// && (Mtm->status != MTM_ONLINE
370+
// || sockets[i] >= 0
371+
// || !BIT_CHECK(Mtm->disabledNodeMask, i)
372+
// || BIT_CHECK(Mtm->reconnectMask, i)))
367373
{
368374
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
369375
MTM_ELOG(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
@@ -981,6 +987,7 @@ static void MtmReceiver(Datum arg)
981987
msg->gid, MtmTxnStatusMnem[msg->status], node);
982988

983989
replorigin_session_origin = DoNotReplicateId;
990+
TXFINISH("%s ABORT, MSG_POLL_STATUS", msg->gid);
984991
MtmFinishPreparedTransaction(ts, false);
985992
replorigin_session_origin = InvalidRepOriginId;
986993
}
@@ -994,6 +1001,7 @@ static void MtmReceiver(Datum arg)
9941001
MTM_ELOG(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
9951002

9961003
replorigin_session_origin = DoNotReplicateId;
1004+
TXFINISH("%s COMMIT, MSG_POLL_STATUS", msg->gid);
9971005
MtmFinishPreparedTransaction(ts, true);
9981006
replorigin_session_origin = InvalidRepOriginId;
9991007
} else {
@@ -1069,17 +1077,10 @@ static void MtmReceiver(Datum arg)
10691077
if (ts->isTwoPhase) {
10701078
MtmWakeUpBackend(ts);
10711079
} else if (MtmUseDtm) {
1072-
ts->votedMask = 0;
10731080
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
10741081
Assert(replorigin_session_origin == InvalidRepOriginId);
1075-
MTM_LOG2("SetPreparedTransactionState for %s", ts->gid);
1076-
MtmUnlock();
1077-
MtmResetTransaction();
1078-
StartTransactionCommand();
1079-
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
1080-
CommitTransactionCommand();
1081-
Assert(!MtmTransIsActive());
1082-
MtmLock(LW_EXCLUSIVE);
1082+
ts->isPrepared = false;
1083+
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
10831084
} else {
10841085
ts->status = TRANSACTION_STATUS_UNKNOWN;
10851086
MtmWakeUpBackend(ts);

contrib/mmts/multimaster.c

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ MtmVotingCompleted(MtmTransState* ts)
12551255
ts->status = TRANSACTION_STATUS_UNKNOWN;
12561256
return true;
12571257
} else {
1258-
MTM_LOG1("Transaction %s is considered as prepared (status=%s participants=%llx disabled=%llx, voted=%llx)",
1258+
MTM_LOG2("Transaction %s is considered as prepared (status=%s participants=%llx disabled=%llx, voted=%llx)",
12591259
ts->gid, MtmTxnStatusMnem[ts->status], ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
12601260
ts->isPrepared = true;
12611261
if (ts->isTwoPhase) {
@@ -1530,7 +1530,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15301530
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, MtmTxnStatusMnem[ts->status]);
15311531
if (commit) {
15321532
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
1533-
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
1533+
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status <= MTM_RECOVERY)))
15341534
{
15351535
MtmUnlock();
15361536
MTM_ELOG(ERROR, "Attempt to commit %s transaction %s (%llu)",
@@ -1690,7 +1690,7 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
16901690

16911691
for (i = 0; i < Mtm->nAllNodes; i++)
16921692
{
1693-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
1693+
if (BIT_CHECK(ts->participantsMask, i))
16941694
{
16951695
msg.node = i+1;
16961696
MTM_LOG3("Send request for transaction %s to node %d", msg.gid, msg.node);
@@ -1734,7 +1734,7 @@ static void MtmLoadPreparedTransactions(void)
17341734
ts->gtid.xid = xid;
17351735
ts->nSubxids = 0;
17361736
ts->votingCompleted = true;
1737-
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
1737+
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~((nodemask_t)1 << (MtmNodeId-1));
17381738
ts->nConfigChanges = Mtm->nConfigChanges;
17391739
ts->votedMask = 0;
17401740
strcpy(ts->gid, gid);
@@ -1973,10 +1973,12 @@ void MtmPollStatusOfPreparedTransactionsForDisabledNode(int disabledNodeId, bool
19731973
Assert(ts->gid[0]);
19741974
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
19751975
MTM_ELOG(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
1976+
TXFINISH("%s ABORT, PollStatusOfPrepared", ts->gid);
19761977
MtmFinishPreparedTransaction(ts, false);
19771978
} else {
19781979
if (commitPrecommited)
19791980
{
1981+
TXFINISH("%s COMMIT, PollStatusOfPrepared", ts->gid);
19801982
MtmFinishPreparedTransaction(ts, true);
19811983
}
19821984
else
@@ -2033,10 +2035,10 @@ MtmCheckSlots()
20332035
if (slot->in_use
20342036
&& sscanf(slot->data.name.data, MULTIMASTER_SLOT_PATTERN, &nodeId) == 1
20352037
&& BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)
2036-
&& slot->data.confirmed_flush + MtmMaxRecoveryLag < GetXLogInsertRecPtr()
2038+
&& slot->data.confirmed_flush + MtmMaxRecoveryLag * 1024 < GetXLogInsertRecPtr()
20372039
&& slot->data.confirmed_flush != 0)
20382040
{
2039-
MTM_ELOG(WARNING, "Drop slot for node %d which lag %lld is larger than threshold %d",
2041+
MTM_ELOG(WARNING, "Drop slot for node %d which lag %lld B is larger than threshold %d kB",
20402042
nodeId,
20412043
(long64)(GetXLogInsertRecPtr() - slot->data.restart_lsn),
20422044
MtmMaxRecoveryLag);
@@ -2102,7 +2104,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
21022104
if (MtmIsRecoveredNode(nodeId)) {
21032105
lsn_t walLSN = GetXLogInsertRecPtr();
21042106
if (!BIT_CHECK(Mtm->originLockNodeMask, nodeId-1)
2105-
&& slotLSN + MtmMinRecoveryLag > walLSN)
2107+
&& slotLSN + MtmMinRecoveryLag * 1024 > walLSN)
21062108
{
21072109
/*
21082110
* Wal sender almost caught up.
@@ -2860,14 +2862,14 @@ _PG_init(void)
28602862
);
28612863
DefineCustomIntVariable(
28622864
"multimaster.trans_spill_threshold",
2863-
"Maximal size (Mb) of transaction after which transaction is written to the disk",
2865+
"Maximal size of transaction after which transaction is written to the disk",
28642866
NULL,
28652867
&MtmTransSpillThreshold,
2866-
100, /* 100Mb */
2867-
0,
2868-
MaxAllocSize/MB,
2869-
PGC_BACKEND,
2868+
100 * 1024, /* 100Mb */
28702869
0,
2870+
MaxAllocSize/GUC_UNIT_KB,
2871+
PGC_SIGHUP,
2872+
GUC_UNIT_KB,
28712873
NULL,
28722874
NULL,
28732875
NULL
@@ -2894,11 +2896,11 @@ _PG_init(void)
28942896
"When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile competition' and "
28952897
"temporary stop commit of new transactions until node will be completely repared",
28962898
&MtmMinRecoveryLag,
2897-
100000,
2898-
1,
2899-
INT_MAX,
2900-
PGC_BACKEND,
2899+
10 * 1024, /* 10 MB */
29012900
0,
2901+
INT_MAX,
2902+
PGC_SIGHUP,
2903+
GUC_UNIT_KB,
29022904
NULL,
29032905
NULL,
29042906
NULL
@@ -2910,11 +2912,11 @@ _PG_init(void)
29102912
"Dropping slot makes it not possible to recover node using logical replication mechanism, it will be ncessary to completely copy content of some other nodes "
29112913
"using basebackup or similar tool. Zero value of parameter disable dropping slot.",
29122914
&MtmMaxRecoveryLag,
2913-
100000000,
2915+
1 * 1024 * 1024, /* 1 GB */
29142916
0,
29152917
INT_MAX,
2916-
PGC_BACKEND,
2917-
0,
2918+
PGC_SIGHUP,
2919+
GUC_UNIT_KB,
29182920
NULL,
29192921
NULL,
29202922
NULL
@@ -3309,6 +3311,7 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
33093311
StartTransactionCommand();
33103312
MtmBeginSession(nodeId);
33113313
MtmSetCurrentTransactionGID(gid);
3314+
TXFINISH("%s ABORT, MtmRollbackPrepared", gid);
33123315
FinishPreparedTransaction(gid, false);
33133316
MtmTx.isActive = true;
33143317
CommitTransactionCommand();
@@ -3814,7 +3817,7 @@ void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
38143817
*/
38153818
void MtmBeginSession(int nodeId)
38163819
{
3817-
MtmLockNode(nodeId, LW_EXCLUSIVE);
3820+
// MtmLockNode(nodeId, LW_EXCLUSIVE);
38183821
Assert(replorigin_session_origin == InvalidRepOriginId);
38193822
replorigin_session_origin = Mtm->nodes[nodeId-1].originId;
38203823
Assert(replorigin_session_origin != InvalidRepOriginId);
@@ -3834,9 +3837,9 @@ void MtmEndSession(int nodeId, bool unlock)
38343837
replorigin_session_origin_lsn = INVALID_LSN;
38353838
replorigin_session_origin_timestamp = 0;
38363839
replorigin_session_reset();
3837-
if (unlock) {
3838-
MtmUnlockNode(nodeId);
3839-
}
3840+
// if (unlock) {
3841+
// MtmUnlockNode(nodeId);
3842+
// }
38403843
MTM_LOG3("%d: End reset replorigin session: %d", MyProcPid, replorigin_session_origin);
38413844
}
38423845
}
@@ -4592,9 +4595,11 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
45924595
ts = (MtmTransState*) hash_search(MtmXid2State, &(x->xid), HASH_FIND, NULL);
45934596
Assert(ts);
45944597

4598+
TXFINISH("%s ABORT, MtmTwoPhase", x->gid);
45954599
FinishPreparedTransaction(x->gid, false);
45964600
MTM_ELOG(ERROR, "Transaction %s (%llu) is aborted on node %d. Check its log to see error details.", x->gid, (long64)x->xid, ts->abortedByNode);
45974601
} else {
4602+
TXFINISH("%s COMMIT, MtmTwoPhase", x->gid);
45984603
FinishPreparedTransaction(x->gid, true);
45994604
MTM_TXTRACE(x, "MtmTwoPhaseCommit Committed");
46004605
MTM_LOG2("Distributed transaction %s (%lld) is committed at %lld with LSN=%lld", x->gid, (long64)x->xid, MtmGetCurrentTime(), (long64)GetXLogInsertRecPtr());

contrib/mmts/multimaster.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@
4141
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4242
#endif
4343

44+
// #define MTM_TXFINISH 1
45+
46+
#ifndef MTM_TXFINISH
47+
#define TXFINISH(fmt, ...)
48+
#else
49+
#define TXFINISH(fmt, ...) elog(LOG, MTM_TAG "[TXFINISH] " fmt, ## __VA_ARGS__)
50+
#endif
51+
4452
// #define MTM_TRACE 1
4553

4654
#ifndef MTM_TRACE

contrib/mmts/pglogical_apply.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,7 @@ process_remote_commit(StringInfo in)
726726
MTM_LOG1("Perform delayed rollback of prepared global transaction %s", gid);
727727
StartTransactionCommand();
728728
MtmSetCurrentTransactionGID(gid);
729+
TXFINISH("%s ABORT, PGLOGICAL_PREPARE", gid);
729730
FinishPreparedTransaction(gid, false);
730731
CommitTransactionCommand();
731732
Assert(!MtmTransIsActive());
@@ -739,7 +740,7 @@ process_remote_commit(StringInfo in)
739740
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
740741
csn = pq_getmsgint64(in);
741742
/*
742-
* Since our recovery method allows undershoot of csn, we can receive
743+
* Since our recovery method allows undershoot of lsn, we can receive
743744
* some already committed transactions. And in case of donor node reboot
744745
* xid<->csn mapping for them will be lost. However we must filter such
745746
* transactions in walreceiver before this code. --sk
@@ -750,8 +751,12 @@ process_remote_commit(StringInfo in)
750751
MtmResetTransaction();
751752
StartTransactionCommand();
752753
MtmBeginSession(origin_node);
753-
MtmSetCurrentTransactionCSN(csn);
754+
if (csn == INVALID_CSN && Mtm->status == MTM_RECOVERY)
755+
MtmSetCurrentTransactionCSN(MtmAssignCSN());
756+
else
757+
MtmSetCurrentTransactionCSN(csn);
754758
MtmSetCurrentTransactionGID(gid);
759+
TXFINISH("%s COMMIT, PGLOGICAL_COMMIT_PREPARED csn=%lld", gid, csn);
755760
FinishPreparedTransaction(gid, true);
756761
MTM_LOG2("Distributed transaction %s is committed", gid);
757762
CommitTransactionCommand();

contrib/mmts/pglogical_receiver.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -366,20 +366,21 @@ pglogical_receiver_main(Datum main_arg)
366366
res = PQexec(conn, query->data);
367367
if (PQresultStatus(res) != PGRES_COPY_BOTH)
368368
{
369-
int i, n_deleted_slots = 0;
369+
// int i, n_deleted_slots = 0;
370370

371-
elog(WARNING, "Can't find slot on node%d. Shutting down receiver.", nodeId);
372-
Mtm->nodes[nodeId-1].slotDeleted = true;
373-
for (i = 0; i < Mtm->nAllNodes; i++)
374-
{
375-
if (Mtm->nodes[i].slotDeleted)
376-
n_deleted_slots++;
377-
}
378-
if (n_deleted_slots == Mtm->nAllNodes - 1)
379-
{
380-
elog(FATAL, "All neighbour nopes have no replication slot for us. Exiting.");
381-
}
382-
proc_exit(1);
371+
elog(WARNING, "Can't find slot on node%d. Shutting down receiver. %s", nodeId, PQresultErrorMessage(res));
372+
goto OnError;
373+
// Mtm->nodes[nodeId-1].slotDeleted = true;
374+
// for (i = 0; i < Mtm->nAllNodes; i++)
375+
// {
376+
// if (Mtm->nodes[i].slotDeleted)
377+
// n_deleted_slots++;
378+
// }
379+
// if (n_deleted_slots == Mtm->nAllNodes - 1)
380+
// {
381+
// elog(FATAL, "All neighbour nopes have no replication slot for us. Exiting.");
382+
// }
383+
// proc_exit(1);
383384
}
384385
PQclear(res);
385386
resetPQExpBuffer(query);
@@ -576,7 +577,8 @@ pglogical_receiver_main(Datum main_arg)
576577
elog(WARNING, "Commit of prepared transaction takes %lld usec, flags=%x", stop - start, stmt[1]);
577578
}
578579
} else {
579-
Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT); /* all other commits should be applied in place */
580+
/* all other commits should be applied in place */
581+
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
580582
MtmExecute(buf.data, buf.used);
581583
}
582584
}

contrib/mmts/state.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,15 @@ MtmCheckState(void)
102102
// XXXX: should we restrict major with two nodes setup?
103103
|| (nConnected == Mtm->nAllNodes/2 && MtmMajorNode) /* or half + major node */
104104
|| (nConnected == Mtm->nAllNodes/2 && Mtm->refereeGrant) ) /* or half + referee */
105-
&& BIT_CHECK(Mtm->clique, MtmNodeId-1) /* in clique */
105+
&& (BIT_CHECK(Mtm->clique, MtmNodeId-1) || Mtm->refereeGrant) /* in clique when non-major */
106106
&& !BIT_CHECK(Mtm->stoppedNodeMask, MtmNodeId-1); /* is not stopped */
107107

108108
/* ANY -> MTM_DISABLED */
109109
if (!isEnabledState)
110110
{
111-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
111+
// BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
112112
MtmSetClusterStatus(MTM_DISABLED);
113+
MtmDisableNode(MtmNodeId);
113114
return;
114115
}
115116

@@ -311,9 +312,9 @@ void MtmOnNodeDisconnect(int nodeId)
311312
* We should disable it, as clique detector will not necessarily
312313
* do that. For example it will anyway find clique with one node.
313314
*/
314-
MtmDisableNode(nodeId);
315315

316316
MtmLock(LW_EXCLUSIVE);
317+
MtmDisableNode(nodeId);
317318
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
318319
BIT_SET(Mtm->reconnectMask, nodeId-1);
319320
Mtm->nConfigChanges += 1;
@@ -323,6 +324,7 @@ void MtmOnNodeDisconnect(int nodeId)
323324
// MtmRefreshClusterStatus();
324325
}
325326

327+
// XXXX: make that event too
326328
void MtmOnNodeConnect(int nodeId)
327329
{
328330
// if (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
@@ -450,12 +452,6 @@ MtmRefreshClusterStatus()
450452
}
451453
}
452454

453-
/*
454-
* Do not check clique with referee grant, because we can disable ourself.
455-
*/
456-
if (Mtm->refereeGrant)
457-
return;
458-
459455
/*
460456
* Check for clique.
461457
*/
@@ -499,6 +495,18 @@ MtmRefreshClusterStatus()
499495

500496
Mtm->clique = newClique;
501497

498+
/*
499+
* Do not perform any action based on clique with referee grant,
500+
* because we can disable ourself.
501+
* But we also need to maintain actual clique not disable ourselves
502+
* when neighbour node will come back and we erase refereeGrant.
503+
*/
504+
if (Mtm->refereeGrant)
505+
{
506+
MtmUnlock();
507+
return;
508+
}
509+
502510
for (i = 0; i < Mtm->nAllNodes; i++)
503511
{
504512
bool old_status = BIT_CHECK(Mtm->disabledNodeMask, i);

0 commit comments

Comments
 (0)