Skip to content

Commit 463c1bc

Browse files
committed
handle exit of wal-sender
1 parent 56aaa09 commit 463c1bc

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

contrib/mmts/multimaster.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865865

866866
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
867867
{
868-
MtmLock(LW_EXCLUSIVE);
869-
MtmSyncClock(globalSnapshot);
870-
MtmUnlock();
871-
868+
if (globalSnapshot != INVALID_CSN) {
869+
MtmLock(LW_EXCLUSIVE);
870+
MtmSyncClock(globalSnapshot);
871+
MtmUnlock();
872+
} else {
873+
globalSnapshot = MtmTx.snapshot;
874+
}
872875
if (!TransactionIdIsValid(gtid->xid)) {
873876
/* In case of recovery InvalidTransactionId is passed */
874877
Assert(Mtm->status == MTM_RECOVERY);
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
18771880
}
18781881
}
18791882
}
1883+
static void
1884+
MtmOnProcExit(int code, Datum arg)
1885+
{
1886+
if (MtmReplicationNodeId >= 0) {
1887+
elog(WARNING, "WAL-sender to %d is terminated", MtmReplicationNodeId);
1888+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1889+
}
1890+
}
18801891

18811892
static void
18821893
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19231934
elog(NOTICE, "Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
19241935
}
19251936
MtmUnlock();
1937+
on_proc_exit(MtmOnProcExit, 0);
19261938
}
19271939

19281940
static void
19291941
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
19301942
{
1931-
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
1932-
MtmOnNodeDisconnect(MtmReplicationNodeId);
1943+
if (MtmReplicationNodeId >= 0) {
1944+
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
1945+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1946+
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
1947+
}
19331948
}
19341949

19351950
static bool
@@ -2167,7 +2182,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
21672182
return ret;
21682183
}
21692184

2170-
void MtmNoticeReceiver(void *i, const PGresult *res)
2185+
static void
2186+
MtmNoticeReceiver(void *i, const PGresult *res)
21712187
{
21722188
char *notice = PQresultErrorMessage(res);
21732189
char *stripped_notice;

src/backend/replication/logical/decode.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
542542
FilterByOrigin(ctx, origin_id))
543543
{
544-
elog(WARNING, "%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545-
getpid(), buf->origptr, origin_id,
546-
SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr), FilterByOrigin(ctx, origin_id));
547544
for (i = 0; i < parsed->nsubxacts; i++)
548545
{
549546
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);

0 commit comments

Comments
 (0)