@@ -183,6 +183,36 @@ typedef struct RelationSyncEntry
183
183
MemoryContext entry_cxt ;
184
184
} RelationSyncEntry ;
185
185
186
+ /*
187
+ * Maintain a per-transaction variable to track whether the transaction
188
+ * has sent BEGIN. BEGIN is only sent when the first change in a transaction
189
+ * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
190
+ * messages for empty transactions which saves network bandwidth.
191
+ *
192
+ * This optimization is not used for prepared transactions because if the
193
+ * WALSender restarts after prepare of a transaction and before commit prepared
194
+ * of the same transaction then we won't be able to figure out if we have
195
+ * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
196
+ * because we would have lost the in-memory txndata information that was
197
+ * present prior to the restart. This will result in sending a spurious
198
+ * COMMIT PREPARED without a corresponding prepared transaction at the
199
+ * downstream which would lead to an error when it tries to process it.
200
+ *
201
+ * XXX We could achieve this optimization by changing protocol to send
202
+ * additional information so that downstream can detect that the corresponding
203
+ * prepare has not been sent. However, adding such a check for every
204
+ * transaction in the downstream could be costly so we might want to do it
205
+ * optionally.
206
+ *
207
+ * We also don't have this optimization for streamed transactions because
208
+ * they can contain prepared transactions.
209
+ */
210
+ typedef struct PGOutputTxnData
211
+ {
212
+ bool sent_begin_txn ; /* false until BEGIN has been written for
213
+ * this transaction, then set to true */
214
+ } PGOutputTxnData ;
215
+
186
216
/* Map used to remember which relation schemas we sent. */
187
217
static HTAB * RelationSyncCache = NULL ;
188
218
@@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
488
518
}
489
519
490
520
/*
491
- * BEGIN callback
521
+ * BEGIN callback.
522
+ *
523
+ * Don't send the BEGIN message here; instead, postpone it until the first
524
+ * change. In logical replication, a common scenario is to replicate a set of
525
+ * tables (instead of all tables) and transactions whose changes were on
526
+ * the table(s) that are not published will produce empty transactions. These
527
+ * empty transactions would otherwise send BEGIN and COMMIT messages to subscribers,
528
+ * using bandwidth on something with little/no use for logical replication.
492
529
*/
493
530
static void
494
- pgoutput_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
531
+ pgoutput_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
532
+ {
/*
 * Allocate the per-transaction state in the decoding context's memory
 * context. The allocation is zero-filled, so sent_begin_txn starts out
 * false. No BEGIN message is written here.
 */
533
+ PGOutputTxnData * txndata = MemoryContextAllocZero (ctx -> context ,
534
+ sizeof (PGOutputTxnData ));
535
+
/*
 * Attach the state to the transaction. The change, truncate, message and
 * sequence callbacks read it back from txn->output_plugin_private, and
 * pgoutput_commit_txn frees it.
 */
536
+ txn -> output_plugin_private = txndata ;
537
+ }
538
+
539
+ /*
540
+ * Send BEGIN.
541
+ *
542
+ * This is called while processing the first change of the transaction.
543
+ */
544
+ static void
545
+ pgoutput_send_begin (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
495
546
{
496
547
bool send_replication_origin = txn -> origin_id != InvalidRepOriginId ;
548
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
549
+
550
+ Assert (txndata );
551
+ Assert (!txndata -> sent_begin_txn );
497
552
498
553
OutputPluginPrepareWrite (ctx , !send_replication_origin );
499
554
logicalrep_write_begin (ctx -> out , txn );
555
+ txndata -> sent_begin_txn = true;
500
556
501
557
send_repl_origin (ctx , txn -> origin_id , txn -> origin_lsn ,
502
558
send_replication_origin );
@@ -511,7 +567,25 @@ static void
511
567
pgoutput_commit_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
512
568
XLogRecPtr commit_lsn )
513
569
{
514
- OutputPluginUpdateProgress (ctx );
570
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
571
+ bool sent_begin_txn ;
572
+
573
+ Assert (txndata );
574
+
575
+ /*
576
+ * We don't need to send the commit message unless some relevant change
577
+ * from this transaction has been sent to the downstream.
578
+ */
579
+ sent_begin_txn = txndata -> sent_begin_txn ;
580
+ OutputPluginUpdateProgress (ctx , !sent_begin_txn );
581
+ pfree (txndata );
582
+ txn -> output_plugin_private = NULL ;
583
+
584
+ if (!sent_begin_txn )
585
+ {
586
+ elog (DEBUG1 , "skipped replication of an empty transaction with XID: %u" , txn -> xid );
587
+ return ;
588
+ }
515
589
516
590
OutputPluginPrepareWrite (ctx , true);
517
591
logicalrep_write_commit (ctx -> out , txn , commit_lsn );
@@ -542,7 +616,7 @@ static void
542
616
pgoutput_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
543
617
XLogRecPtr prepare_lsn )
544
618
{
545
- OutputPluginUpdateProgress (ctx );
619
+ OutputPluginUpdateProgress (ctx , false );
546
620
547
621
OutputPluginPrepareWrite (ctx , true);
548
622
logicalrep_write_prepare (ctx -> out , txn , prepare_lsn );
@@ -556,7 +630,7 @@ static void
556
630
pgoutput_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
557
631
XLogRecPtr commit_lsn )
558
632
{
559
- OutputPluginUpdateProgress (ctx );
633
+ OutputPluginUpdateProgress (ctx , false );
560
634
561
635
OutputPluginPrepareWrite (ctx , true);
562
636
logicalrep_write_commit_prepared (ctx -> out , txn , commit_lsn );
@@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
572
646
XLogRecPtr prepare_end_lsn ,
573
647
TimestampTz prepare_time )
574
648
{
575
- OutputPluginUpdateProgress (ctx );
649
+ OutputPluginUpdateProgress (ctx , false );
576
650
577
651
OutputPluginPrepareWrite (ctx , true);
578
652
logicalrep_write_rollback_prepared (ctx -> out , txn , prepare_end_lsn ,
@@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1295
1369
Relation relation , ReorderBufferChange * change )
1296
1370
{
1297
1371
PGOutputData * data = (PGOutputData * ) ctx -> output_plugin_private ;
1372
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1298
1373
MemoryContext old ;
1299
1374
RelationSyncEntry * relentry ;
1300
1375
TransactionId xid = InvalidTransactionId ;
@@ -1370,6 +1445,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1370
1445
& action ))
1371
1446
break ;
1372
1447
1448
+ /*
1449
+ * Send BEGIN if we haven't yet.
1450
+ *
1451
+ * We send the BEGIN message after ensuring that we will actually
1452
+ * send the change. This avoids sending a pair of BEGIN/COMMIT
1453
+ * messages for empty transactions.
1454
+ */
1455
+ if (txndata && !txndata -> sent_begin_txn )
1456
+ pgoutput_send_begin (ctx , txn );
1457
+
1373
1458
/*
1374
1459
* Schema should be sent using the original relation because it
1375
1460
* also sends the ancestor's relation.
@@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1420
1505
relentry , & action ))
1421
1506
break ;
1422
1507
1508
+ /* Send BEGIN if we haven't yet */
1509
+ if (txndata && !txndata -> sent_begin_txn )
1510
+ pgoutput_send_begin (ctx , txn );
1511
+
1423
1512
maybe_send_schema (ctx , change , relation , relentry );
1424
1513
1425
1514
OutputPluginPrepareWrite (ctx , true);
@@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1480
1569
relentry , & action ))
1481
1570
break ;
1482
1571
1572
+ /* Send BEGIN if we haven't yet */
1573
+ if (txndata && !txndata -> sent_begin_txn )
1574
+ pgoutput_send_begin (ctx , txn );
1575
+
1483
1576
maybe_send_schema (ctx , change , relation , relentry );
1484
1577
1485
1578
OutputPluginPrepareWrite (ctx , true);
@@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1510
1603
int nrelations , Relation relations [], ReorderBufferChange * change )
1511
1604
{
1512
1605
PGOutputData * data = (PGOutputData * ) ctx -> output_plugin_private ;
1606
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1513
1607
MemoryContext old ;
1514
1608
RelationSyncEntry * relentry ;
1515
1609
int i ;
@@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1548
1642
continue ;
1549
1643
1550
1644
relids [nrelids ++ ] = relid ;
1645
+
1646
+ /* Send BEGIN if we haven't yet */
1647
+ if (txndata && !txndata -> sent_begin_txn )
1648
+ pgoutput_send_begin (ctx , txn );
1649
+
1551
1650
maybe_send_schema (ctx , change , relation , relentry );
1552
1651
}
1553
1652
@@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1585
1684
if (in_streaming )
1586
1685
xid = txn -> xid ;
1587
1686
1687
+ /*
1688
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
1689
+ * messages.
1690
+ */
1691
+ if (transactional )
1692
+ {
1693
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1694
+
1695
+ /* Send BEGIN if we haven't yet */
1696
+ if (txndata && !txndata -> sent_begin_txn )
1697
+ pgoutput_send_begin (ctx , txn );
1698
+ }
1699
+
1588
1700
OutputPluginPrepareWrite (ctx , true);
1589
1701
logicalrep_write_message (ctx -> out ,
1590
1702
xid ,
@@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
1629
1741
if (!relentry -> pubactions .pubsequence )
1630
1742
return ;
1631
1743
1744
+ /*
1745
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
1746
+ * sequence changes.
1747
+ */
1748
+ if (transactional )
1749
+ {
1750
+ PGOutputTxnData * txndata = (PGOutputTxnData * ) txn -> output_plugin_private ;
1751
+
1752
+ /* Send BEGIN if we haven't yet */
1753
+ if (txndata && !txndata -> sent_begin_txn )
1754
+ pgoutput_send_begin (ctx , txn );
1755
+ }
1756
+
1632
1757
OutputPluginPrepareWrite (ctx , true);
1633
1758
logicalrep_write_sequence (ctx -> out ,
1634
1759
relation ,
@@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1799
1924
Assert (!in_streaming );
1800
1925
Assert (rbtxn_is_streamed (txn ));
1801
1926
1802
- OutputPluginUpdateProgress (ctx );
1927
+ OutputPluginUpdateProgress (ctx , false );
1803
1928
1804
1929
OutputPluginPrepareWrite (ctx , true);
1805
1930
logicalrep_write_stream_commit (ctx -> out , txn , commit_lsn );
@@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1820
1945
{
1821
1946
Assert (rbtxn_is_streamed (txn ));
1822
1947
1823
- OutputPluginUpdateProgress (ctx );
1948
+ OutputPluginUpdateProgress (ctx , false );
1824
1949
OutputPluginPrepareWrite (ctx , true);
1825
1950
logicalrep_write_stream_prepare (ctx -> out , txn , prepare_lsn );
1826
1951
OutputPluginWrite (ctx , true);
0 commit comments