Skip to content

Commit d5a9d86

Browse files
author
Amit Kapila
committed
Skip empty transactions for logical replication.
The current logical replication behavior is to send every transaction to subscriber even if the transaction is empty. This can happen because transaction doesn't contain changes from the selected publications or all the changes got filtered. It is a waste of CPU cycles and network bandwidth to build/transmit these empty transactions. This patch addresses the above problem by postponing the BEGIN message until the first change is sent. While processing a COMMIT message, if there was no other change for that transaction, do not send the COMMIT message. This allows us to skip sending BEGIN/COMMIT messages for empty transactions. When skipping empty transactions in synchronous replication mode, we send a keepalive message to avoid delaying such transactions. Author: Ajin Cherian, Hou Zhijie, Euler Taveira Reviewed-by: Peter Smith, Takamichi Osumi, Shi Yu, Masahiko Sawada, Greg Nancarrow, Vignesh C, Amit Kapila Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
1 parent ad4f2c4 commit d5a9d86

File tree

8 files changed

+228
-30
lines changed

8 files changed

+228
-30
lines changed

src/backend/replication/logical/logical.c

+4-2
Original file line numberDiff line numberDiff line change
@@ -683,12 +683,14 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
683683
* Update progress tracking (if supported).
684684
*/
685685
void
686-
OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
686+
OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
687+
bool skipped_xact)
687688
{
688689
if (!ctx->update_progress)
689690
return;
690691

691-
ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
692+
ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
693+
skipped_xact);
692694
}
693695

694696
/*

src/backend/replication/pgoutput/pgoutput.c

+133-8
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,36 @@ typedef struct RelationSyncEntry
183183
MemoryContext entry_cxt;
184184
} RelationSyncEntry;
185185

186+
/*
187+
* Maintain a per-transaction level variable to track whether the transaction
188+
* has sent BEGIN. BEGIN is only sent when the first change in a transaction
189+
* is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
190+
* messages for empty transactions which saves network bandwidth.
191+
*
192+
* This optimization is not used for prepared transactions because if the
193+
* WALSender restarts after prepare of a transaction and before commit prepared
194+
* of the same transaction then we won't be able to figure out if we have
195+
* skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
196+
* because we would have lost the in-memory txndata information that was
197+
* present prior to the restart. This will result in sending a spurious
198+
* COMMIT PREPARED without a corresponding prepared transaction at the
199+
* downstream which would lead to an error when it tries to process it.
200+
*
201+
* XXX We could achieve this optimization by changing protocol to send
202+
* additional information so that downstream can detect that the corresponding
203+
* prepare has not been sent. However, adding such a check for every
204+
* transaction in the downstream could be costly so we might want to do it
205+
* optionally.
206+
*
207+
* We also don't have this optimization for streamed transactions because
208+
* they can contain prepared transactions.
209+
*/
210+
typedef struct PGOutputTxnData
211+
{
212+
bool sent_begin_txn; /* flag indicating whether BEGIN has
213+
* been sent */
214+
} PGOutputTxnData;
215+
186216
/* Map used to remember which relation schemas we sent. */
187217
static HTAB *RelationSyncCache = NULL;
188218

@@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
488518
}
489519

490520
/*
491-
* BEGIN callback
521+
* BEGIN callback.
522+
*
523+
* Don't send the BEGIN message here instead postpone it until the first
524+
* change. In logical replication, a common scenario is to replicate a set of
525+
* tables (instead of all tables) and transactions whose changes were on
526+
* the table(s) that are not published will produce empty transactions. These
527+
* empty transactions will send BEGIN and COMMIT messages to subscribers,
528+
* using bandwidth on something with little/no use for logical replication.
492529
*/
493530
static void
494-
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
531+
pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn)
532+
{
533+
PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
534+
sizeof(PGOutputTxnData));
535+
536+
txn->output_plugin_private = txndata;
537+
}
538+
539+
/*
540+
* Send BEGIN.
541+
*
542+
* This is called while processing the first change of the transaction.
543+
*/
544+
static void
545+
pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
495546
{
496547
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
548+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
549+
550+
Assert(txndata);
551+
Assert(!txndata->sent_begin_txn);
497552

498553
OutputPluginPrepareWrite(ctx, !send_replication_origin);
499554
logicalrep_write_begin(ctx->out, txn);
555+
txndata->sent_begin_txn = true;
500556

501557
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
502558
send_replication_origin);
@@ -511,7 +567,25 @@ static void
511567
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
512568
XLogRecPtr commit_lsn)
513569
{
514-
OutputPluginUpdateProgress(ctx);
570+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
571+
bool sent_begin_txn;
572+
573+
Assert(txndata);
574+
575+
/*
576+
* We don't need to send the commit message unless some relevant change
577+
* from this transaction has been sent to the downstream.
578+
*/
579+
sent_begin_txn = txndata->sent_begin_txn;
580+
OutputPluginUpdateProgress(ctx, !sent_begin_txn);
581+
pfree(txndata);
582+
txn->output_plugin_private = NULL;
583+
584+
if (!sent_begin_txn)
585+
{
586+
elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
587+
return;
588+
}
515589

516590
OutputPluginPrepareWrite(ctx, true);
517591
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -542,7 +616,7 @@ static void
542616
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
543617
XLogRecPtr prepare_lsn)
544618
{
545-
OutputPluginUpdateProgress(ctx);
619+
OutputPluginUpdateProgress(ctx, false);
546620

547621
OutputPluginPrepareWrite(ctx, true);
548622
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -556,7 +630,7 @@ static void
556630
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
557631
XLogRecPtr commit_lsn)
558632
{
559-
OutputPluginUpdateProgress(ctx);
633+
OutputPluginUpdateProgress(ctx, false);
560634

561635
OutputPluginPrepareWrite(ctx, true);
562636
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
572646
XLogRecPtr prepare_end_lsn,
573647
TimestampTz prepare_time)
574648
{
575-
OutputPluginUpdateProgress(ctx);
649+
OutputPluginUpdateProgress(ctx, false);
576650

577651
OutputPluginPrepareWrite(ctx, true);
578652
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
12951369
Relation relation, ReorderBufferChange *change)
12961370
{
12971371
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1372+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
12981373
MemoryContext old;
12991374
RelationSyncEntry *relentry;
13001375
TransactionId xid = InvalidTransactionId;
@@ -1370,6 +1445,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
13701445
&action))
13711446
break;
13721447

1448+
/*
1449+
* Send BEGIN if we haven't yet.
1450+
*
1451+
* We send the BEGIN message after ensuring that we will actually
1452+
* send the change. This avoids sending a pair of BEGIN/COMMIT
1453+
* messages for empty transactions.
1454+
*/
1455+
if (txndata && !txndata->sent_begin_txn)
1456+
pgoutput_send_begin(ctx, txn);
1457+
13731458
/*
13741459
* Schema should be sent using the original relation because it
13751460
* also sends the ancestor's relation.
@@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14201505
relentry, &action))
14211506
break;
14221507

1508+
/* Send BEGIN if we haven't yet */
1509+
if (txndata && !txndata->sent_begin_txn)
1510+
pgoutput_send_begin(ctx, txn);
1511+
14231512
maybe_send_schema(ctx, change, relation, relentry);
14241513

14251514
OutputPluginPrepareWrite(ctx, true);
@@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14801569
relentry, &action))
14811570
break;
14821571

1572+
/* Send BEGIN if we haven't yet */
1573+
if (txndata && !txndata->sent_begin_txn)
1574+
pgoutput_send_begin(ctx, txn);
1575+
14831576
maybe_send_schema(ctx, change, relation, relentry);
14841577

14851578
OutputPluginPrepareWrite(ctx, true);
@@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15101603
int nrelations, Relation relations[], ReorderBufferChange *change)
15111604
{
15121605
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1606+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
15131607
MemoryContext old;
15141608
RelationSyncEntry *relentry;
15151609
int i;
@@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15481642
continue;
15491643

15501644
relids[nrelids++] = relid;
1645+
1646+
/* Send BEGIN if we haven't yet */
1647+
if (txndata && !txndata->sent_begin_txn)
1648+
pgoutput_send_begin(ctx, txn);
1649+
15511650
maybe_send_schema(ctx, change, relation, relentry);
15521651
}
15531652

@@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15851684
if (in_streaming)
15861685
xid = txn->xid;
15871686

1687+
/*
1688+
* Output BEGIN if we haven't yet. Avoid for non-transactional
1689+
* messages.
1690+
*/
1691+
if (transactional)
1692+
{
1693+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1694+
1695+
/* Send BEGIN if we haven't yet */
1696+
if (txndata && !txndata->sent_begin_txn)
1697+
pgoutput_send_begin(ctx, txn);
1698+
}
1699+
15881700
OutputPluginPrepareWrite(ctx, true);
15891701
logicalrep_write_message(ctx->out,
15901702
xid,
@@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
16291741
if (!relentry->pubactions.pubsequence)
16301742
return;
16311743

1744+
/*
1745+
* Output BEGIN if we haven't yet. Avoid for non-transactional
1746+
* sequence changes.
1747+
*/
1748+
if (transactional)
1749+
{
1750+
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1751+
1752+
/* Send BEGIN if we haven't yet */
1753+
if (txndata && !txndata->sent_begin_txn)
1754+
pgoutput_send_begin(ctx, txn);
1755+
}
1756+
16321757
OutputPluginPrepareWrite(ctx, true);
16331758
logicalrep_write_sequence(ctx->out,
16341759
relation,
@@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
17991924
Assert(!in_streaming);
18001925
Assert(rbtxn_is_streamed(txn));
18011926

1802-
OutputPluginUpdateProgress(ctx);
1927+
OutputPluginUpdateProgress(ctx, false);
18031928

18041929
OutputPluginPrepareWrite(ctx, true);
18051930
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
18201945
{
18211946
Assert(rbtxn_is_streamed(txn));
18221947

1823-
OutputPluginUpdateProgress(ctx);
1948+
OutputPluginUpdateProgress(ctx, false);
18241949
OutputPluginPrepareWrite(ctx, true);
18251950
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
18261951
OutputPluginWrite(ctx, true);

0 commit comments

Comments
 (0)