Skip to content

Commit 992147c

Browse files
committed
Introduce COMMITTED state
1 parent 6ca55e7 commit 992147c

File tree

5 files changed

+42
-38
lines changed

5 files changed

+42
-38
lines changed

contrib/mmts/arbiter.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
243243
if (rc == 1) {
244244
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
245245
if (rc < 0) {
246+
if (errno == EINPROGRESS) {
247+
continue;
248+
}
246249
return false;
247250
}
248251
size -= rc;
@@ -258,7 +261,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
258261
{
259262
int rc;
260263
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
261-
if (rc <= 0 && errno == EAGAIN) {
264+
if (rc <= 0 && (errno == EAGAIN || errno == EINPROGRESS)) {
262265
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
263266
if (rc == 1) {
264267
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
@@ -328,6 +331,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
328331
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
329332
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
330333
&& Mtm->status != MTM_RECOVERY
334+
&& Mtm->status != MTM_RECOVERED
331335
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
332336
{
333337
elog(WARNING, "Node %d thinks that I am dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);

contrib/mmts/multimaster.c

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ char const* const MtmNodeStatusMnem[] =
201201
"Connected",
202202
"Online",
203203
"Recovery",
204+
"Recovered",
204205
"InMinor",
205206
"OutOfService"
206207
};
@@ -831,7 +832,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
831832
* Allow applying of replicated transactions to avoid deadlock (to catch up we need the active transaction counter to become zero).
832833
* Also allow user to complete explicit 2PC transactions.
833834
*/
834-
if (x->isDistributed && !x->isReplicated && x->isTwoPhase && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
835+
if (x->isDistributed && !x->isReplicated && !x->isTwoPhase && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
835836
MtmCheckClusterLock();
836837
}
837838

@@ -1872,7 +1873,7 @@ void MtmRecoveryCompleted(void)
18721873
BIT_SET(Mtm->nodeLockerMask, MtmNodeId-1); /* it is a trick: this mask was originally used by WAL senders performing recovery, but here we are on the opposite (recovered) side:
18731874
* if this mask is not zero loadReq will be broadcasted to all other nodes by heartbeat, suspending their activity
18741875
*/
1875-
MtmSwitchClusterMode(MTM_CONNECTED);
1876+
MtmSwitchClusterMode(MTM_RECOVERED);
18761877
}
18771878
MtmUnlock();
18781879
}
@@ -2029,35 +2030,17 @@ MtmCheckClusterLock()
20292030
timestamp_t delay = MIN_WAIT_TIMEOUT;
20302031
while (true)
20312032
{
2032-
nodemask_t mask = Mtm->walSenderLockerMask;
2033-
if (Mtm->globalLockerMask | mask) {
2034-
if (Mtm->nActiveTransactions == 0) {
2035-
lsn_t currLogPos = GetXLogInsertRecPtr();
2036-
int i;
2037-
for (i = 0; mask != 0; i++, mask >>= 1) {
2038-
if (mask & 1) {
2039-
if (WalSndCtl->walsnds[i].sentPtr != currLogPos) {
2040-
/* recovery is in progress */
2041-
break;
2042-
} else {
2043-
/* recovered replica caught up with master */
2044-
MTM_LOG1("WAL-sender %d complete recovery", i);
2045-
BIT_CLEAR(Mtm->walSenderLockerMask, i);
2046-
}
2047-
}
2048-
}
2033+
if (Mtm->globalLockerMask | Mtm->walSenderLockerMask) {
2034+
/* Some "almost caught-up" WAL senders are still working. */
2035+
/* Do not start new transactions until they have completed. */
2036+
MtmUnlock();
2037+
MtmSleep(delay);
2038+
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2039+
delay *= 2;
20492040
}
2050-
if (Mtm->globalLockerMask | mask) {
2051-
/* some "almost cautch-up" wal-senders are still working. */
2052-
/* Do not start new transactions until them are completed. */
2053-
MtmUnlock();
2054-
MtmSleep(delay);
2055-
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2056-
delay *= 2;
2057-
}
2058-
MtmLock(LW_EXCLUSIVE);
2059-
continue;
2060-
} else {
2041+
MtmLock(LW_EXCLUSIVE);
2042+
} else {
2043+
if (Mtm->nodeLockerMask != 0) {
20612044
/* All lockers have synchronized their logs */
20622045
/* Remove lock and mark them as recovered */
20632046
MTM_LOG1("Complete recovery of %d nodes (node mask %llx)", Mtm->nLockers, Mtm->nodeLockerMask);
@@ -2070,8 +2053,8 @@ MtmCheckClusterLock()
20702053
Mtm->nodeLockerMask = 0;
20712054
MtmCheckQuorum();
20722055
}
2056+
break;
20732057
}
2074-
break;
20752058
}
20762059
}
20772060

@@ -3229,7 +3212,9 @@ void MtmReceiverStarted(int nodeId)
32293212
MtmCheckQuorum();
32303213
}
32313214
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3232-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3215+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
3216+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
3217+
{
32333218
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
32343219
MtmSwitchClusterMode(MTM_ONLINE);
32353220
}
@@ -3331,7 +3316,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33313316
Mtm->preparedTransactionsLoaded = true;
33323317
}
33333318

3334-
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3319+
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3320+
|| BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
33353321
{
33363322
if (*shutdown)
33373323
{
@@ -3540,7 +3526,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35403526
if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
35413527
elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
35423528
BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
3543-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3529+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
3530+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
3531+
{
35443532
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
35453533
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
35463534
MtmSwitchClusterMode(MTM_ONLINE);

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ typedef enum
133133
MTM_CONNECTED, /* Arbiter has established connections with other nodes */
134134
MTM_ONLINE, /* Ready to receive client's queries */
135135
MTM_RECOVERY, /* Node is in recovery process */
136+
MTM_RECOVERED, /* Node is recovered but is not yet switched to ONLINE because not all senders/receivers are restarted */
136137
MTM_IN_MINORITY, /* Node is out of quorum */
137138
MTM_OUT_OF_SERVICE /* Node is not available due to critical, non-recoverable error */
138139
} MtmNodeStatus;

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305
timestamp_t start = MtmGetSystemTime();
306306
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
307307
res = PQexec(conn, query->data);
308-
elog(LOG, "Recreate replication slot %s: %ld millisconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
308+
elog(LOG, "Drop replication slot %s: %ld milliseconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
309309
PQclear(res);
310310
resetPQExpBuffer(query);
311311
timeline = Mtm->nodes[nodeId-1].timeline;

src/backend/replication/logical/logical.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,19 @@ CreateInitDecodingContext(char *plugin,
269269
* protecting against vacuum.
270270
* ----
271271
*/
272+
elog(LOG, "CreateInitDecodingContext: try to obtain proc array lock");
272273
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
274+
elog(LOG, "CreateInitDecodingContext: grant proc array lock");
273275

274276
slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
275277
slot->data.catalog_xmin = slot->effective_catalog_xmin;
276278

279+
elog(LOG, "CreateInitDecodingContext: GetOldestSafeDecodingTransactionId");
277280
ReplicationSlotsComputeRequiredXmin(true);
281+
elog(LOG, "CreateInitDecodingContext: ReplicationSlotsComputeRequiredXmin");
278282

279283
LWLockRelease(ProcArrayLock);
284+
elog(LOG, "CreateInitDecodingContext: release proc array lock");
280285

281286
/*
282287
* tell the snapshot builder to only assemble snapshot once reaching the
@@ -414,11 +419,12 @@ void
414419
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
415420
{
416421
XLogRecPtr startptr;
422+
int n_records = 0;
417423

418424
/* Initialize from where to start reading WAL. */
419425
startptr = ctx->slot->data.restart_lsn;
420426

421-
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
427+
elog(LOG, "searching for logical decoding starting point, starting at %X/%X",
422428
(uint32) (ctx->slot->data.restart_lsn >> 32),
423429
(uint32) ctx->slot->data.restart_lsn);
424430

@@ -436,7 +442,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
436442
elog(ERROR, "no record found"); /* shouldn't happen */
437443

438444
startptr = InvalidXLogRecPtr;
439-
445+
n_records += 1;
440446
LogicalDecodingProcessRecord(ctx, ctx->reader);
441447

442448
/* only continue till we found a consistent spot */
@@ -446,6 +452,11 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
446452
CHECK_FOR_INTERRUPTS();
447453
}
448454

455+
elog(LOG, "Locate starting point at %X/%X after proceeding %d records",
456+
(uint32) (ctx->reader->EndRecPtr >> 32),
457+
(uint32) ctx->reader->EndRecPtr,
458+
n_records);
459+
449460
ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
450461
}
451462

0 commit comments

Comments
 (0)