Skip to content

Commit f95d53e

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent 8bbf846 commit f95d53e

File tree

4 files changed

+98
-9
lines changed

4 files changed

+98
-9
lines changed

src/backend/replication/logical/logical.c

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
746746

747747
/* set output state */
748748
ctx->accept_writes = false;
749+
ctx->end_xact = false;
749750

750751
/* do the actual work: call callback */
751752
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
773774

774775
/* set output state */
775776
ctx->accept_writes = false;
777+
ctx->end_xact = false;
776778

777779
/* do the actual work: call callback */
778780
ctx->callbacks.shutdown_cb(ctx);
@@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
808810
ctx->accept_writes = true;
809811
ctx->write_xid = txn->xid;
810812
ctx->write_location = txn->first_lsn;
813+
ctx->end_xact = false;
811814

812815
/* do the actual work: call callback */
813816
ctx->callbacks.begin_cb(ctx, txn);
@@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
839842
ctx->accept_writes = true;
840843
ctx->write_xid = txn->xid;
841844
ctx->write_location = txn->end_lsn; /* points to the end of the record */
845+
ctx->end_xact = true;
842846

843847
/* do the actual work: call callback */
844848
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
879883
ctx->accept_writes = true;
880884
ctx->write_xid = txn->xid;
881885
ctx->write_location = txn->first_lsn;
886+
ctx->end_xact = false;
882887

883888
/*
884889
* If the plugin supports two-phase commits then begin prepare callback is
@@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
923928
ctx->accept_writes = true;
924929
ctx->write_xid = txn->xid;
925930
ctx->write_location = txn->end_lsn; /* points to the end of the record */
931+
ctx->end_xact = true;
926932

927933
/*
928934
* If the plugin supports two-phase commits then prepare callback is
@@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
967973
ctx->accept_writes = true;
968974
ctx->write_xid = txn->xid;
969975
ctx->write_location = txn->end_lsn; /* points to the end of the record */
976+
ctx->end_xact = true;
970977

971978
/*
972979
* If the plugin support two-phase commits then commit prepared callback
@@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10121019
ctx->accept_writes = true;
10131020
ctx->write_xid = txn->xid;
10141021
ctx->write_location = txn->end_lsn; /* points to the end of the record */
1022+
ctx->end_xact = true;
10151023

10161024
/*
10171025
* If the plugin support two-phase commits then rollback prepared callback
@@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
10621070
*/
10631071
ctx->write_location = change->lsn;
10641072

1073+
ctx->end_xact = false;
1074+
10651075
ctx->callbacks.change_cb(ctx, txn, relation, change);
10661076

10671077
/* Pop the error context stack */
@@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
11021112
*/
11031113
ctx->write_location = change->lsn;
11041114

1115+
ctx->end_xact = false;
1116+
11051117
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
11061118

11071119
/* Pop the error context stack */
@@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
11291141

11301142
/* set output state */
11311143
ctx->accept_writes = false;
1144+
ctx->end_xact = false;
11321145

11331146
/* do the actual work: call callback */
11341147
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
11591172

11601173
/* set output state */
11611174
ctx->accept_writes = false;
1175+
ctx->end_xact = false;
11621176

11631177
/* do the actual work: call callback */
11641178
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
11961210
ctx->accept_writes = true;
11971211
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
11981212
ctx->write_location = message_lsn;
1213+
ctx->end_xact = false;
11991214

12001215
/* do the actual work: call callback */
12011216
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12391254
*/
12401255
ctx->write_location = first_lsn;
12411256

1257+
ctx->end_xact = false;
1258+
12421259
/* in streaming mode, stream_start_cb is required */
12431260
if (ctx->callbacks.stream_start_cb == NULL)
12441261
ereport(ERROR,
@@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
12861303
*/
12871304
ctx->write_location = last_lsn;
12881305

1306+
ctx->end_xact = false;
1307+
12891308
/* in streaming mode, stream_stop_cb is required */
12901309
if (ctx->callbacks.stream_stop_cb == NULL)
12911310
ereport(ERROR,
@@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13251344
ctx->accept_writes = true;
13261345
ctx->write_xid = txn->xid;
13271346
ctx->write_location = abort_lsn;
1347+
ctx->end_xact = true;
13281348

13291349
/* in streaming mode, stream_abort_cb is required */
13301350
if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
13691389
ctx->accept_writes = true;
13701390
ctx->write_xid = txn->xid;
13711391
ctx->write_location = txn->end_lsn;
1392+
ctx->end_xact = true;
13721393

13731394
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
13741395
if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14091430
ctx->accept_writes = true;
14101431
ctx->write_xid = txn->xid;
14111432
ctx->write_location = txn->end_lsn;
1433+
ctx->end_xact = true;
14121434

14131435
/* in streaming mode, stream_commit_cb is required */
14141436
if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
14571479
*/
14581480
ctx->write_location = change->lsn;
14591481

1482+
ctx->end_xact = false;
1483+
14601484
/* in streaming mode, stream_change_cb is required */
14611485
if (ctx->callbacks.stream_change_cb == NULL)
14621486
ereport(ERROR,
@@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
15011525
ctx->accept_writes = true;
15021526
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
15031527
ctx->write_location = message_lsn;
1528+
ctx->end_xact = false;
15041529

15051530
/* do the actual work: call callback */
15061531
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
15491574
*/
15501575
ctx->write_location = change->lsn;
15511576

1577+
ctx->end_xact = false;
1578+
15521579
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
15531580

15541581
/* Pop the error context stack */

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
9191
static void send_repl_origin(LogicalDecodingContext *ctx,
9292
RepOriginId origin_id, XLogRecPtr origin_lsn,
9393
bool send_origin);
94+
static void update_replication_progress(LogicalDecodingContext *ctx,
95+
bool skipped_xact);
9496

9597
/*
9698
* Only 3 publication actions are used for row filtering ("insert", "update",
@@ -558,7 +560,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
558560
* from this transaction has been sent to the downstream.
559561
*/
560562
sent_begin_txn = txndata->sent_begin_txn;
561-
OutputPluginUpdateProgress(ctx, !sent_begin_txn);
563+
update_replication_progress(ctx, !sent_begin_txn);
562564
pfree(txndata);
563565
txn->output_plugin_private = NULL;
564566

@@ -597,7 +599,7 @@ static void
597599
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
598600
XLogRecPtr prepare_lsn)
599601
{
600-
OutputPluginUpdateProgress(ctx, false);
602+
update_replication_progress(ctx, false);
601603

602604
OutputPluginPrepareWrite(ctx, true);
603605
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -611,7 +613,7 @@ static void
611613
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
612614
XLogRecPtr commit_lsn)
613615
{
614-
OutputPluginUpdateProgress(ctx, false);
616+
update_replication_progress(ctx, false);
615617

616618
OutputPluginPrepareWrite(ctx, true);
617619
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -627,7 +629,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
627629
XLogRecPtr prepare_end_lsn,
628630
TimestampTz prepare_time)
629631
{
630-
OutputPluginUpdateProgress(ctx, false);
632+
update_replication_progress(ctx, false);
631633

632634
OutputPluginPrepareWrite(ctx, true);
633635
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1360,6 +1362,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
13601362
TupleTableSlot *old_slot = NULL;
13611363
TupleTableSlot *new_slot = NULL;
13621364

1365+
update_replication_progress(ctx, false);
1366+
13631367
if (!is_publishable_relation(relation))
13641368
return;
13651369

@@ -1592,6 +1596,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15921596
Oid *relids;
15931597
TransactionId xid = InvalidTransactionId;
15941598

1599+
update_replication_progress(ctx, false);
1600+
15951601
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
15961602
if (in_streaming)
15971603
xid = change->txn->xid;
@@ -1655,6 +1661,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16551661
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
16561662
TransactionId xid = InvalidTransactionId;
16571663

1664+
update_replication_progress(ctx, false);
1665+
16581666
if (!data->messages)
16591667
return;
16601668

@@ -1847,7 +1855,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
18471855
Assert(!in_streaming);
18481856
Assert(rbtxn_is_streamed(txn));
18491857

1850-
OutputPluginUpdateProgress(ctx, false);
1858+
update_replication_progress(ctx, false);
18511859

18521860
OutputPluginPrepareWrite(ctx, true);
18531861
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1868,7 +1876,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
18681876
{
18691877
Assert(rbtxn_is_streamed(txn));
18701878

1871-
OutputPluginUpdateProgress(ctx, false);
1879+
update_replication_progress(ctx, false);
18721880
OutputPluginPrepareWrite(ctx, true);
18731881
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
18741882
OutputPluginWrite(ctx, true);
@@ -2361,3 +2369,37 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
23612369
}
23622370
}
23632371
}
2372+
2373+
/*
2374+
* Try to update progress and send a keepalive message if too many changes were
2375+
* processed.
2376+
*
2377+
* For a large transaction, if we don't send any change to the downstream for a
2378+
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
2379+
* This can happen when all or most of the changes are either not published or
2380+
* got filtered out.
2381+
*/
2382+
static void
2383+
update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
2384+
{
2385+
static int changes_count = 0;
2386+
2387+
/*
2388+
* We don't want to try sending a keepalive message after processing each
2389+
* change as that can have overhead. Tests revealed that there is no
2390+
* noticeable overhead in doing it after continuously processing 100 or so
2391+
* changes.
2392+
*/
2393+
#define CHANGES_THRESHOLD 100
2394+
2395+
/*
2396+
* If we are at the end of transaction LSN, update progress tracking.
2397+
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
2398+
* try to send a keepalive message if required.
2399+
*/
2400+
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
2401+
{
2402+
OutputPluginUpdateProgress(ctx, skipped_xact);
2403+
changes_count = 0;
2404+
}
2405+
}

src/backend/replication/walsender.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,14 +1482,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
14821482
{
14831483
static TimestampTz sendTime = 0;
14841484
TimestampTz now = GetCurrentTimestamp();
1485+
bool pending_writes = false;
1486+
bool end_xact = ctx->end_xact;
14851487

14861488
/*
14871489
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
14881490
* avoid flooding the lag tracker when we commit frequently.
1491+
*
1492+
* We don't have a mechanism to get the ack for any LSN other than end
1493+
* xact LSN from the downstream. So, we track lag only for end of
1494+
* transaction LSN.
14891495
*/
14901496
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1491-
if (TimestampDifferenceExceeds(sendTime, now,
1492-
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1497+
if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1498+
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
14931499
{
14941500
LagTrackerWrite(lsn, now);
14951501
sendTime = now;
@@ -1515,8 +1521,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
15151521

15161522
/* If we have pending write here, make sure it's actually flushed */
15171523
if (pq_is_send_pending())
1518-
ProcessPendingWrites();
1524+
pending_writes = true;
15191525
}
1526+
1527+
/*
1528+
* Process pending writes if any or try to send a keepalive if required.
1529+
* We don't need to try sending keep alive messages at the transaction end
1530+
* as that will be done at a later point in time. This is required only
1531+
* for large transactions where we don't send any changes to the
1532+
* downstream and the receiver can timeout due to that.
1533+
*/
1534+
if (pending_writes || (!end_xact &&
1535+
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1536+
wal_sender_timeout / 2)))
1537+
ProcessPendingWrites();
15201538
}
15211539

15221540
/*

src/include/replication/logical.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ typedef struct LogicalDecodingContext
107107
bool prepared_write;
108108
XLogRecPtr write_location;
109109
TransactionId write_xid;
110+
/* Are we processing the end LSN of a transaction? */
111+
bool end_xact;
110112
} LogicalDecodingContext;
111113

112114

0 commit comments

Comments
 (0)