Skip to content

Commit da5308c

Browse files
committed
Implement internal heartbeat for multimaster
1 parent 781c855 commit da5308c

File tree

3 files changed

+147
-52
lines changed

3 files changed

+147
-52
lines changed

contrib/mmts/arbiter.c

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "utils/array.h"
4545
#include "utils/builtins.h"
4646
#include "utils/memutils.h"
47+
#include "utils/timeout.h"
4748
#include "commands/dbcommands.h"
4849
#include "miscadmin.h"
4950
#include "postmaster/autovacuum.h"
@@ -101,22 +102,23 @@ typedef struct
101102

102103
static int* sockets;
103104
static int gateway;
105+
static bool send_heartbeat;
104106

105107
static void MtmTransSender(Datum arg);
106108
static void MtmTransReceiver(Datum arg);
107109

108-
/*
109-
* static char const* const messageText[] =
110-
* {
111-
* "INVALID",
112-
* "HANDSHAKE",
113-
* "READY",
114-
* "PREPARE",
115-
* "PREPARED",
116-
* "ABORTED",
117-
* "STATUS"
118-
*};
119-
*/
110+
111+
static char const* const messageText[] =
112+
{
113+
"INVALID",
114+
"HANDSHAKE",
115+
"READY",
116+
"PREPARE",
117+
"PREPARED",
118+
"ABORTED",
119+
"STATUS",
120+
"HEARTBEAT"
121+
};
120122

121123
static BackgroundWorker MtmSender = {
122124
"mtm-sender",
@@ -513,14 +515,19 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
513515
}
514516
buf->used = 0;
515517
}
516-
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
517-
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
518-
519-
Assert(ts->cmd != MSG_INVALID);
520-
buf->data[buf->used].code = ts->cmd;
521518
buf->data[buf->used].dxid = xid;
522-
buf->data[buf->used].sxid = ts->xid;
523-
buf->data[buf->used].csn = ts->csn;
519+
520+
if (ts != NULL) {
521+
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
522+
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
523+
Assert(ts->cmd != MSG_INVALID);
524+
buf->data[buf->used].code = ts->cmd;
525+
buf->data[buf->used].sxid = ts->xid;
526+
buf->data[buf->used].csn = ts->csn;
527+
} else {
528+
buf->data[buf->used].code = MSG_HEARTBEAT;
529+
MTM_LOG3("Send HEARTBEAT message to node %d from node %d\n", node+1, MtmNodeId);
530+
}
524531
buf->data[buf->used].node = MtmNodeId;
525532
buf->data[buf->used].disabledNodeMask = Mtm->disabledNodeMask;
526533
buf->data[buf->used].oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
@@ -533,15 +540,21 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
533540
int n = 1;
534541
for (i = 0; i < Mtm->nAllNodes; i++)
535542
{
536-
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i])) {
543+
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && (ts == NULL || TransactionIdIsValid(ts->xids[i]))) {
537544
Assert(i+1 != MtmNodeId);
538-
MtmAppendBuffer(txBuffer, ts->xids[i], i, ts);
545+
MtmAppendBuffer(txBuffer, ts ? ts->xids[i] : InvalidTransactionId, i, ts);
539546
n += 1;
540547
}
541548
}
542549
Assert(n == Mtm->nLiveNodes);
543550
}
544551

552+
static void MtmSendHeartbeat()
553+
{
554+
send_heartbeat = true;
555+
PGSemaphoreUnlock(&Mtm->votingSemaphore);
556+
}
557+
545558

546559
static void MtmTransSender(Datum arg)
547560
{
@@ -556,6 +569,8 @@ static void MtmTransSender(Datum arg)
556569
sigfillset(&sset);
557570
sigprocmask(SIG_UNBLOCK, &sset, NULL);
558571

572+
RegisterTimeout(USER_TIMEOUT, MtmSendHeartbeat);
573+
559574
MtmOpenConnections();
560575

561576
for (i = 0; i < nNodes; i++) {
@@ -567,6 +582,10 @@ static void MtmTransSender(Datum arg)
567582
PGSemaphoreLock(&Mtm->votingSemaphore);
568583
CHECK_FOR_INTERRUPTS();
569584

585+
if (send_heartbeat) {
586+
send_heartbeat = false;
587+
MtmBroadcastMessage(txBuffer, NULL);
588+
}
570589
/*
571590
* Use shared lock to improve locality,
572591
* because all other process modifying this list are using exclusive lock
@@ -700,15 +719,22 @@ static void MtmTransReceiver(Datum arg)
700719

701720
for (j = 0; j < nResponses; j++) {
702721
MtmArbiterMessage* msg = &rxBuffer[i].data[j];
703-
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
704-
Assert(ts != NULL);
722+
MtmTransState* ts;
723+
705724
Assert(msg->node > 0 && msg->node <= nNodes && msg->node != MtmNodeId);
725+
Mtm->nodes[msg->node-1].oldestSnapshot = msg->oldestSnapshot;
726+
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
727+
728+
if (msg->code == MSG_HEARTBEAT) {
729+
continue;
730+
}
731+
ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
732+
Assert(ts != NULL);
706733

707734
if (BIT_CHECK(msg->disabledNodeMask, MtmNodeId-1) && Mtm->status != MTM_RECOVERY) {
708735
elog(PANIC, "Node %d thinks that I was dead: perform hara-kiri not to be a zombie", msg->node);
709736
}
710-
Mtm->nodes[msg->node-1].oldestSnapshot = msg->oldestSnapshot;
711-
737+
712738
if (MtmIsCoordinator(ts)) {
713739
switch (msg->code) {
714740
case MSG_READY:
@@ -768,7 +794,7 @@ static void MtmTransReceiver(Datum arg)
768794
} else {
769795
switch (msg->code) {
770796
case MSG_PREPARE:
771-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
797+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
772798
ts->status = TRANSACTION_STATUS_UNKNOWN;
773799
ts->csn = MtmAssignCSN();
774800
MtmAdjustSubtransactions(ts);

contrib/mmts/multimaster.c

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ int MtmReconnectAttempts;
192192
int MtmNodeDisableDelay;
193193
int MtmTransSpillThreshold;
194194
int MtmMaxNodes;
195+
int MtmHeartbeatSendTimeout;
196+
int MtmHeartbeatRecvTimeout;
195197
bool MtmUseRaftable;
196198
bool MtmUseDtm;
197199

@@ -742,6 +744,27 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
742744

743745
}
744746

747+
/*
748+
* Check heartbeats
749+
*/
750+
static void MtmWatchdog()
751+
{
752+
int i, n = Mtm->nAllNodes;
753+
timestamp_t now = MtmGetSystemTime();
754+
for (i = 0; i < n; i++) {
755+
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
756+
if (Mtm->nodes[i].lastHeartbeat != 0
757+
&& now > Mtm->nodes[i].lastHeartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
758+
{
759+
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago",
760+
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
761+
MtmOnNodeDisconnect(i+1);
762+
}
763+
}
764+
}
765+
}
766+
767+
745768
static void
746769
MtmPostPrepareTransaction(MtmCurrentTrans* x)
747770
{
@@ -771,14 +794,24 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
771794
MtmUnlock();
772795
MtmResetTransaction(x);
773796
} else {
774-
time_t timeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
797+
time_t transTimeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
798+
time_t timeout = transTimeout < MtmHeartbeatRecvTimeout ? transTimeout : MtmHeartbeatRecvTimeout;
799+
timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(transTimeout);
775800
int result = 0;
776801
int nConfigChanges = Mtm->nConfigChanges;
777802
/* wait votes from all nodes */
778-
while (!ts->votingCompleted && !(result & WL_TIMEOUT)) {
803+
while (!ts->votingCompleted) {
779804
MtmUnlock();
805+
MtmWatchdog();
780806
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
781-
ResetLatch(&MyProc->procLatch);
807+
if (result & WL_TIMEOUT) {
808+
if (MtmGetSystemTime() > deadline) {
809+
MtmLock(LW_SHARED);
810+
break;
811+
}
812+
} else {
813+
ResetLatch(&MyProc->procLatch);
814+
}
782815
MtmLock(LW_SHARED);
783816
}
784817
if (!ts->votingCompleted) {
@@ -1023,6 +1056,22 @@ void MtmHandleApplyError(void)
10231056
}
10241057

10251058

1059+
static void MtmDisableNode(int nodeId)
1060+
{
1061+
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1062+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1063+
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1064+
Mtm->nLiveNodes -= 1;
1065+
}
1066+
1067+
static void MtmEnableNode(int nodeId)
1068+
{
1069+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1070+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1071+
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1072+
Mtm->nLiveNodes += 1;
1073+
}
1074+
10261075
void MtmRecoveryCompleted(void)
10271076
{
10281077
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
@@ -1117,9 +1166,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11171166
MTM_LOG1("%d: node %d is caugth-up without locking cluster", MyProcPid, nodeId);
11181167
/* We are lucky: caugth-up without locking cluster! */
11191168
}
1120-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1121-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1122-
Mtm->nLiveNodes += 1;
1169+
MtmEnableNode(nodeId);
11231170
Mtm->nConfigChanges += 1;
11241171
caughtUp = true;
11251172
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1262,17 +1309,13 @@ bool MtmRefreshClusterStatus(bool nowait)
12621309
mask = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
12631310
for (i = 0; mask != 0; i++, mask >>= 1) {
12641311
if (mask & 1) {
1265-
Mtm->nLiveNodes -= 1;
1266-
BIT_SET(Mtm->disabledNodeMask, i);
1267-
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
1312+
MtmDisableNode(i+1);
12681313
}
12691314
}
12701315
mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
12711316
for (i = 0; mask != 0; i++, mask >>= 1) {
12721317
if (mask & 1) {
1273-
Mtm->nLiveNodes += 1;
1274-
BIT_CLEAR(Mtm->disabledNodeMask, i);
1275-
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
1318+
MtmEnableNode(i+1);
12761319
}
12771320
}
12781321
MtmCheckQuorum();
@@ -1317,7 +1360,6 @@ void MtmOnNodeDisconnect(int nodeId)
13171360
/* Avoid false detection of node failure and prevent node status blinking */
13181361
return;
13191362
}
1320-
13211363
BIT_SET(Mtm->connectivityMask, nodeId-1);
13221364
BIT_SET(Mtm->reconnectMask, nodeId-1);
13231365
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
@@ -1328,9 +1370,7 @@ void MtmOnNodeDisconnect(int nodeId)
13281370
if (!MtmRefreshClusterStatus(false)) {
13291371
MtmLock(LW_EXCLUSIVE);
13301372
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1331-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1332-
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1333-
Mtm->nLiveNodes -= 1;
1373+
MtmDisableNode(nodeId);
13341374
MtmCheckQuorum();
13351375
/* Interrupt voting for active transaction and abort them */
13361376
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
@@ -1504,6 +1544,7 @@ static void MtmInitialize()
15041544
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
15051545
Mtm->nodes[i].con = MtmConnections[i];
15061546
Mtm->nodes[i].flushPos = 0;
1547+
Mtm->nodes[i].lastHeartbeat = 0;
15071548
}
15081549
PGSemaphoreCreate(&Mtm->votingSemaphore);
15091550
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1628,6 +1669,36 @@ _PG_init(void)
16281669
if (!process_shared_preload_libraries_in_progress)
16291670
return;
16301671

1672+
DefineCustomIntVariable(
1673+
"multimaster.heartbeat_send_timeout",
1674+
"Timeout in milliseconds of sending heartbeat messages",
1675+
"Period of broadcasting heartbeat messages by abiter to all nodes",
1676+
&MtmHeartbeatSendTimeout,
1677+
1000,
1678+
1,
1679+
INT_MAX,
1680+
PGC_BACKEND,
1681+
0,
1682+
NULL,
1683+
NULL,
1684+
NULL
1685+
);
1686+
1687+
DefineCustomIntVariable(
1688+
"multimaster.heartbeat_recv_timeout",
1689+
"Timeout in milliseconds of receiving heartbeat messages",
1690+
"If no heartbeat message is received from node within this period, it assumed to be dead",
1691+
&MtmHeartbeatRecvTimeout,
1692+
2000,
1693+
1,
1694+
INT_MAX,
1695+
PGC_BACKEND,
1696+
0,
1697+
NULL,
1698+
NULL,
1699+
NULL
1700+
);
1701+
16311702
DefineCustomIntVariable(
16321703
"multimaster.gc_period",
16331704
"Number of distributed transactions after which garbage collection is started",
@@ -2057,9 +2128,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20572128
{
20582129
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nLiveNodes);
20592130
}
2060-
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2061-
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
2062-
Mtm->nLiveNodes -= 1;
2131+
MtmDisableNode(nodeId);
20632132
MtmCheckQuorum();
20642133
if (!MtmIsBroadcast())
20652134
{
@@ -2111,17 +2180,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
21112180
if (MtmIsRecoverySession) {
21122181
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
21132182
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
2114-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2115-
BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
2116-
Mtm->nLiveNodes -= 1;
2183+
MtmDisableNode(MtmReplicationNodeId);
21172184
MtmCheckQuorum();
21182185
}
21192186
} else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
21202187
if (recoveryCompleted) {
21212188
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
2122-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
2123-
BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
2124-
Mtm->nLiveNodes += 1;
2189+
MtmEnableNode(MtmReplicationNodeId);
21252190
MtmCheckQuorum();
21262191
} else {
21272192
elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);

contrib/mmts/multimaster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ typedef enum
9292
MSG_PREPARE,
9393
MSG_PREPARED,
9494
MSG_ABORTED,
95-
MSG_STATUS
95+
MSG_STATUS,
96+
MSG_HEARTBEAT
9697
} MtmMessageCode;
9798

9899
typedef enum
@@ -127,6 +128,7 @@ typedef struct
127128
timestamp_t lastStatusChangeTime;
128129
timestamp_t receiverStartTime;
129130
timestamp_t senderStartTime;
131+
timestamp_t lastHeartbeat;
130132
int senderPid;
131133
int receiverPid;
132134
XLogRecPtr flushPos;
@@ -218,6 +220,8 @@ extern int MtmReconnectAttempts;
218220
extern int MtmKeepaliveTimeout;
219221
extern int MtmNodeDisableDelay;
220222
extern int MtmTransSpillThreshold;
223+
extern int MtmHeartbeatSendTimeout;
224+
extern int MtmHeartbeatRecvTimeout;
221225
extern bool MtmUseDtm;
222226
extern HTAB* MtmXid2State;
223227

0 commit comments

Comments
 (0)