Skip to content

Commit f832b50

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent de6eec1 commit f832b50

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

src/backend/replication/logical/logical.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
624624

625625
/* set output state */
626626
ctx->accept_writes = false;
627+
ctx->end_xact = false;
627628

628629
/* do the actual work: call callback */
629630
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -651,6 +652,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
651652

652653
/* set output state */
653654
ctx->accept_writes = false;
655+
ctx->end_xact = false;
654656

655657
/* do the actual work: call callback */
656658
ctx->callbacks.shutdown_cb(ctx);
@@ -686,6 +688,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
686688
ctx->accept_writes = true;
687689
ctx->write_xid = txn->xid;
688690
ctx->write_location = txn->first_lsn;
691+
ctx->end_xact = false;
689692

690693
/* do the actual work: call callback */
691694
ctx->callbacks.begin_cb(ctx, txn);
@@ -717,6 +720,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
717720
ctx->accept_writes = true;
718721
ctx->write_xid = txn->xid;
719722
ctx->write_location = txn->end_lsn; /* points to the end of the record */
723+
ctx->end_xact = true;
720724

721725
/* do the actual work: call callback */
722726
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -756,6 +760,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
756760
*/
757761
ctx->write_location = change->lsn;
758762

763+
ctx->end_xact = false;
764+
759765
ctx->callbacks.change_cb(ctx, txn, relation, change);
760766

761767
/* Pop the error context stack */
@@ -796,6 +802,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
796802
*/
797803
ctx->write_location = change->lsn;
798804

805+
ctx->end_xact = false;
806+
799807
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
800808

801809
/* Pop the error context stack */
@@ -822,6 +830,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
822830

823831
/* set output state */
824832
ctx->accept_writes = false;
833+
ctx->end_xact = false;
825834

826835
/* do the actual work: call callback */
827836
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -859,6 +868,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
859868
ctx->accept_writes = true;
860869
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
861870
ctx->write_location = message_lsn;
871+
ctx->end_xact = false;
862872

863873
/* do the actual work: call callback */
864874
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ static bool publications_valid;
5050
static List *LoadPublications(List *pubnames);
5151
static void publication_invalidation_cb(Datum arg, int cacheid,
5252
uint32 hashvalue);
53+
static void update_replication_progress(LogicalDecodingContext *ctx);
5354

5455
/* Entry in the map used to remember which relation schemas we sent. */
5556
typedef struct RelationSyncEntry
@@ -247,7 +248,7 @@ static void
247248
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
248249
XLogRecPtr commit_lsn)
249250
{
250-
OutputPluginUpdateProgress(ctx);
251+
update_replication_progress(ctx);
251252

252253
OutputPluginPrepareWrite(ctx, true);
253254
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -309,6 +310,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
309310
MemoryContext old;
310311
RelationSyncEntry *relentry;
311312

313+
update_replication_progress(ctx);
314+
312315
if (!is_publishable_relation(relation))
313316
return;
314317

@@ -389,6 +392,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
389392
int nrelids;
390393
Oid *relids;
391394

395+
update_replication_progress(ctx);
396+
392397
old = MemoryContextSwitchTo(data->context);
393398

394399
relids = palloc0(nrelations * sizeof(Oid));
@@ -660,3 +665,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
660665
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
661666
entry->replicate_valid = false;
662667
}
668+
669+
/*
670+
* Try to update progress and send a keepalive message if too many changes were
671+
* processed.
672+
*
673+
* For a large transaction, if we don't send any change to the downstream for a
674+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
675+
* This can happen when all or most of the changes are not published.
676+
*/
677+
static void
678+
update_replication_progress(LogicalDecodingContext *ctx)
679+
{
680+
static int changes_count = 0;
681+
682+
/*
683+
* We don't want to try sending a keepalive message after processing each
684+
* change as that can have overhead. Tests revealed that there is no
685+
* noticeable overhead in doing it after continuously processing 100 or so
686+
* changes.
687+
*/
688+
#define CHANGES_THRESHOLD 100
689+
690+
/*
691+
* If we are at the end of transaction LSN, update progress tracking.
692+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
693+
* try to send a keepalive message if required.
694+
*/
695+
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
696+
{
697+
OutputPluginUpdateProgress(ctx);
698+
changes_count = 0;
699+
}
700+
}

src/backend/replication/walsender.c

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ static void ProcessStandbyMessage(void);
244244
static void ProcessStandbyReplyMessage(void);
245245
static void ProcessStandbyHSFeedbackMessage(void);
246246
static void ProcessRepliesIfAny(void);
247+
static void ProcessPendingWrites(void);
247248
static void WalSndKeepalive(bool requestReply);
248249
static void WalSndKeepaliveIfNecessary(void);
249250
static void WalSndCheckTimeOut(void);
@@ -1214,6 +1215,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12141215
}
12151216

12161217
/* If we have pending write here, go to slow path */
1218+
ProcessPendingWrites();
1219+
}
1220+
1221+
/*
1222+
* Wait until there is no pending write. Also process replies from the other
1223+
* side and check timeouts during that.
1224+
*/
1225+
static void
1226+
ProcessPendingWrites(void)
1227+
{
12171228
for (;;)
12181229
{
12191230
int wakeEvents;
@@ -1273,18 +1284,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
12731284
{
12741285
static TimestampTz sendTime = 0;
12751286
TimestampTz now = GetCurrentTimestamp();
1287+
bool end_xact = ctx->end_xact;
12761288

12771289
/*
12781290
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
12791291
* avoid flooding the lag tracker when we commit frequently.
1292+
*
1293+
* We don't have a mechanism to get the ack for any LSN other than end
1294+
* xact LSN from the downstream. So, we track lag only for end of
1295+
* transaction LSN.
12801296
*/
12811297
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1282-
if (!TimestampDifferenceExceeds(sendTime, now,
1283-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1284-
return;
1298+
if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1299+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1300+
{
1301+
LagTrackerWrite(lsn, now);
1302+
sendTime = now;
1303+
}
12851304

1286-
LagTrackerWrite(lsn, now);
1287-
sendTime = now;
1305+
/*
1306+
* Try to send a keepalive if required. We don't need to try sending keep
1307+
* alive messages at the transaction end as that will be done at a later
1308+
* point in time. This is required only for large transactions where we
1309+
* don't send any changes to the downstream and the receiver can timeout
1310+
* due to that.
1311+
*/
1312+
if (!end_xact &&
1313+
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1314+
wal_sender_timeout / 2))
1315+
ProcessPendingWrites();
12881316
}
12891317

12901318
/*

src/include/replication/logical.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ typedef struct LogicalDecodingContext
5050
*/
5151
bool fast_forward;
5252

53+
/* Are we processing the end LSN of a transaction? */
54+
bool end_xact;
55+
5356
OutputPluginCallbacks callbacks;
5457
OutputPluginOptions options;
5558

0 commit comments

Comments
 (0)