Skip to content

Commit 05b6876

Browse files
committed
Do not drop slot on end of recovery and try to explicitly specify restart position for it
1 parent 597d46a commit 05b6876

File tree

5 files changed

+25
-5
lines changed

5 files changed

+25
-5
lines changed

contrib/mmts/multimaster.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,8 @@ static void MtmInitialize()
17911791
Mtm->nodes[i].con = MtmConnections[i];
17921792
Mtm->nodes[i].flushPos = 0;
17931793
Mtm->nodes[i].lastHeartbeat = 0;
1794+
Mtm->nodes[i].restartLsn = 0;
1795+
Mtm->nodes[i].originId = InvalidRepOriginId;
17941796
}
17951797
PGSemaphoreCreate(&Mtm->votingSemaphore);
17961798
PGSemaphoreReset(&Mtm->votingSemaphore);

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ typedef struct
146146
XLogRecPtr flushPos;
147147
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
148148
XLogRecPtr restartLsn;
149+
RepOriginId originId;
149150
} MtmNodeInfo;
150151

151152
typedef struct MtmTransState

contrib/mmts/pglogical_apply.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,13 @@ MtmEndSession(bool unlock)
507507
static void
508508
process_remote_commit(StringInfo in)
509509
{
510+
int i;
510511
uint8 flags;
511512
csn_t csn;
512513
const char *gid = NULL;
513514
XLogRecPtr end_lsn;
514515
XLogRecPtr origin_lsn;
516+
RepOriginId originId;
515517
int n_records;
516518
/* read flags */
517519
flags = pq_getmsgbyte(in);
@@ -526,9 +528,21 @@ process_remote_commit(StringInfo in)
526528
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
527529
end_lsn = pq_getmsgint64(in); /* end_lsn */
528530
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
529-
origin_lsn = pq_getmsgint64(in);
530-
Mtm->nodes[MtmReplicationNodeId-1].restartLsn = origin_lsn;
531-
531+
532+
originId = (RepOriginId)pq_getmsgint(in, 2);
533+
origin_lsn = pq_getmsgint64(in);
534+
535+
if (originId != InvalidRepOriginId) {
536+
for (i = 0; i < Mtm->nAllNodes; i++) {
537+
if (Mtm->nodes[i].originId == originId) {
538+
Mtm->nodes[i].restartLsn = origin_lsn;
539+
break;
540+
}
541+
}
542+
if (i == Mtm->nAllNodes) {
543+
elog(WARNING, "Failed to map origin %d", originId);
544+
}
545+
}
532546
Assert(replorigin_session_origin == InvalidRepOriginId);
533547

534548
switch(PGLOGICAL_XACT_EVENT(flags))

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
183183
pq_sendint64(out, commit_lsn);
184184
pq_sendint64(out, txn->end_lsn);
185185
pq_sendint64(out, txn->commit_time);
186+
187+
pq_sendint(out, txn->origin_id, 2);
186188
pq_sendint64(out, txn->origin_lsn);
187189

188190
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
272272
}
273273

274274
query = createPQExpBuffer();
275-
#if 1 /* Do we need to recreate slot ? */
275+
#if 0 /* Do we need to recreate slot ? */
276276
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
277277
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
278278
res = PQexec(conn, query->data);
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305

306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308-
originStartPos = Mtm->nodes[nodeId].restartLsn;
308+
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
309309
}
310310
if (originStartPos == 0) {
311311
StartTransactionCommand();
@@ -325,6 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
originStartPos = replorigin_get_progress(originId, false);
326326
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
327327
}
328+
Mtm->nodes[nodeId-1].originId = originId;
328329
CommitTransactionCommand();
329330
}
330331

0 commit comments

Comments
 (0)