Skip to content

Commit 8c58624

Browse files
author
Amit Kapila
committed
Fix the logical replication timeout during large DDLs.
The DDLs like Refresh Materialized views that generate lots of temporary data due to rewrite rules may not be processed by output plugins (for example pgoutput). So, we won't send keep-alive messages for a long time while processing such commands and that can lead the subscriber side to timeout. We have previously fixed a similar case for large transactions in commit f95d53e where the output plugin filters all or most of the changes but missed to handle the DDLs. We decided not to backpatch this as this adds a new callback in the existing exposed structure and moreover, users can increase the wal_sender_timeout and wal_receiver_timeout to avoid this problem. Author: Wang wei, Hou Zhijie Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
1 parent fee7b77 commit 8c58624

File tree

5 files changed

+89
-48
lines changed

5 files changed

+89
-48
lines changed

src/backend/replication/logical/logical.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
9393
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
9494
int nrelations, Relation relations[], ReorderBufferChange *change);
9595

96+
/* callback to update txn's progress */
97+
static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
98+
ReorderBufferTXN *txn,
99+
XLogRecPtr lsn);
100+
96101
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
97102

98103
/*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
278283
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
279284
ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
280285

286+
/*
287+
* Callback to support updating progress during sending data of a
288+
* transaction (and its subtransactions) to the output plugin.
289+
*/
290+
ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
291+
281292
ctx->out = makeStringInfo();
282293
ctx->prepare_write = prepare_write;
283294
ctx->write = do_write;
@@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
15841595
error_context_stack = errcallback.previous;
15851596
}
15861597

1598+
static void
1599+
update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1600+
XLogRecPtr lsn)
1601+
{
1602+
LogicalDecodingContext *ctx = cache->private_data;
1603+
LogicalErrorCallbackState state;
1604+
ErrorContextCallback errcallback;
1605+
1606+
Assert(!ctx->fast_forward);
1607+
1608+
/* Push callback + info on the error context stack */
1609+
state.ctx = ctx;
1610+
state.callback_name = "update_progress_txn";
1611+
state.report_location = lsn;
1612+
errcallback.callback = output_plugin_error_callback;
1613+
errcallback.arg = (void *) &state;
1614+
errcallback.previous = error_context_stack;
1615+
error_context_stack = &errcallback;
1616+
1617+
/* set output state */
1618+
ctx->accept_writes = false;
1619+
ctx->write_xid = txn->xid;
1620+
1621+
/*
1622+
* Report this change's lsn so replies from clients can give an up-to-date
1623+
* answer. This won't ever be enough (and shouldn't be!) to confirm
1624+
* receipt of this transaction, but it might allow another transaction's
1625+
* commit to be confirmed with one message.
1626+
*/
1627+
ctx->write_location = lsn;
1628+
1629+
ctx->end_xact = false;
1630+
1631+
OutputPluginUpdateProgress(ctx, false);
1632+
1633+
/* Pop the error context stack */
1634+
error_context_stack = errcallback.previous;
1635+
}
1636+
15871637
/*
15881638
* Set the required catalog xmin horizon for historic snapshots in the current
15891639
* replication slot.

src/backend/replication/logical/reorderbuffer.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2100,6 +2100,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
21002100
PG_TRY();
21012101
{
21022102
ReorderBufferChange *change;
2103+
int changes_count = 0; /* used to accumulate the number of
2104+
* changes */
21032105

21042106
if (using_subtxn)
21052107
BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2440,6 +2442,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
24402442
elog(ERROR, "tuplecid value in changequeue");
24412443
break;
24422444
}
2445+
2446+
/*
2447+
* It is possible that the data is not sent to downstream for a
2448+
* long time either because the output plugin filtered it or there
2449+
* is a DDL that generates a lot of data that is not processed by
2450+
* the plugin. So, in such cases, the downstream can timeout. To
2451+
* avoid that we try to send a keepalive message if required.
2452+
* Trying to send a keepalive message after every change has some
2453+
* overhead, but testing showed there is no noticeable overhead if
2454+
* we do it after every ~100 changes.
2455+
*/
2456+
#define CHANGES_THRESHOLD 100
2457+
2458+
if (++changes_count >= CHANGES_THRESHOLD)
2459+
{
2460+
rb->update_progress_txn(rb, txn, change->lsn);
2461+
changes_count = 0;
2462+
}
24432463
}
24442464

24452465
/* speculative insertion record must be freed by now */

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
9292
static void send_repl_origin(LogicalDecodingContext *ctx,
9393
RepOriginId origin_id, XLogRecPtr origin_lsn,
9494
bool send_origin);
95-
static void update_replication_progress(LogicalDecodingContext *ctx,
96-
bool skipped_xact);
9795

9896
/*
9997
* Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
586584
* from this transaction has been sent to the downstream.
587585
*/
588586
sent_begin_txn = txndata->sent_begin_txn;
589-
update_replication_progress(ctx, !sent_begin_txn);
587+
OutputPluginUpdateProgress(ctx, !sent_begin_txn);
590588
pfree(txndata);
591589
txn->output_plugin_private = NULL;
592590

@@ -625,7 +623,7 @@ static void
625623
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
626624
XLogRecPtr prepare_lsn)
627625
{
628-
update_replication_progress(ctx, false);
626+
OutputPluginUpdateProgress(ctx, false);
629627

630628
OutputPluginPrepareWrite(ctx, true);
631629
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
639637
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
640638
XLogRecPtr commit_lsn)
641639
{
642-
update_replication_progress(ctx, false);
640+
OutputPluginUpdateProgress(ctx, false);
643641

644642
OutputPluginPrepareWrite(ctx, true);
645643
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
655653
XLogRecPtr prepare_end_lsn,
656654
TimestampTz prepare_time)
657655
{
658-
update_replication_progress(ctx, false);
656+
OutputPluginUpdateProgress(ctx, false);
659657

660658
OutputPluginPrepareWrite(ctx, true);
661659
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14011399
TupleTableSlot *old_slot = NULL;
14021400
TupleTableSlot *new_slot = NULL;
14031401

1404-
update_replication_progress(ctx, false);
1405-
14061402
if (!is_publishable_relation(relation))
14071403
return;
14081404

@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16371633
Oid *relids;
16381634
TransactionId xid = InvalidTransactionId;
16391635

1640-
update_replication_progress(ctx, false);
1641-
16421636
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
16431637
if (in_streaming)
16441638
xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
17021696
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
17031697
TransactionId xid = InvalidTransactionId;
17041698

1705-
update_replication_progress(ctx, false);
1706-
17071699
if (!data->messages)
17081700
return;
17091701

@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
19031895
Assert(!in_streaming);
19041896
Assert(rbtxn_is_streamed(txn));
19051897

1906-
update_replication_progress(ctx, false);
1898+
OutputPluginUpdateProgress(ctx, false);
19071899

19081900
OutputPluginPrepareWrite(ctx, true);
19091901
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
19241916
{
19251917
Assert(rbtxn_is_streamed(txn));
19261918

1927-
update_replication_progress(ctx, false);
1919+
OutputPluginUpdateProgress(ctx, false);
19281920
OutputPluginPrepareWrite(ctx, true);
19291921
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
19301922
OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
24242416
}
24252417
}
24262418
}
2427-
2428-
/*
2429-
* Try to update progress and send a keepalive message if too many changes were
2430-
* processed.
2431-
*
2432-
* For a large transaction, if we don't send any change to the downstream for a
2433-
* long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
2434-
* This can happen when all or most of the changes are either not published or
2435-
* got filtered out.
2436-
*/
2437-
static void
2438-
update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
2439-
{
2440-
static int changes_count = 0;
2441-
2442-
/*
2443-
* We don't want to try sending a keepalive message after processing each
2444-
* change as that can have overhead. Tests revealed that there is no
2445-
* noticeable overhead in doing it after continuously processing 100 or so
2446-
* changes.
2447-
*/
2448-
#define CHANGES_THRESHOLD 100
2449-
2450-
/*
2451-
* If we are at the end of transaction LSN, update progress tracking.
2452-
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
2453-
* try to send a keepalive message if required.
2454-
*/
2455-
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
2456-
{
2457-
OutputPluginUpdateProgress(ctx, skipped_xact);
2458-
changes_count = 0;
2459-
}
2460-
}

src/include/replication/reorderbuffer.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
526526
Relation relations[],
527527
ReorderBufferChange *change);
528528

529+
/* update progress txn callback signature */
530+
typedef void (*ReorderBufferUpdateProgressTxnCB) (
531+
ReorderBuffer *rb,
532+
ReorderBufferTXN *txn,
533+
XLogRecPtr lsn);
534+
529535
struct ReorderBuffer
530536
{
531537
/*
@@ -589,6 +595,12 @@ struct ReorderBuffer
589595
ReorderBufferStreamMessageCB stream_message;
590596
ReorderBufferStreamTruncateCB stream_truncate;
591597

598+
/*
599+
* Callback to be called when updating progress during sending data of a
600+
* transaction (and its subtransactions) to the output plugin.
601+
*/
602+
ReorderBufferUpdateProgressTxnCB update_progress_txn;
603+
592604
/*
593605
* Pointer that will be passed untouched to the callbacks.
594606
*/

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,6 +2311,7 @@ ReorderBufferToastEnt
23112311
ReorderBufferTupleBuf
23122312
ReorderBufferTupleCidEnt
23132313
ReorderBufferTupleCidKey
2314+
ReorderBufferUpdateProgressTxnCB
23142315
ReorderTuple
23152316
RepOriginId
23162317
ReparameterizeForeignPathByChild_function

0 commit comments

Comments
 (0)