@@ -34,10 +34,24 @@ typedef struct
34
34
bool include_xids ;
35
35
bool include_timestamp ;
36
36
bool skip_empty_xacts ;
37
- bool xact_wrote_changes ;
38
37
bool only_local ;
39
38
} TestDecodingData ;
40
39
40
+ /*
41
+ * Maintain the per-transaction level variables to track whether the
42
+ * transaction and or streams have written any changes. In streaming mode the
43
+ * transaction can be decoded in streams so along with maintaining whether the
44
+ * transaction has written any changes, we also need to track whether the
45
+ * current stream has written any changes. This is required so that if user
46
+ * has requested to skip the empty transactions we can skip the empty streams
47
+ * even though the transaction has written some changes.
48
+ */
49
+ typedef struct
50
+ {
51
+ bool xact_wrote_changes ;
52
+ bool stream_wrote_changes ;
53
+ } TestDecodingTxnData ;
54
+
41
55
static void pg_decode_startup (LogicalDecodingContext * ctx , OutputPluginOptions * opt ,
42
56
bool is_init );
43
57
static void pg_decode_shutdown (LogicalDecodingContext * ctx );
@@ -255,8 +269,12 @@ static void
255
269
pg_decode_begin_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
256
270
{
257
271
TestDecodingData * data = ctx -> output_plugin_private ;
272
+ TestDecodingTxnData * txndata =
273
+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
274
+
275
+ txndata -> xact_wrote_changes = false;
276
+ txn -> output_plugin_private = txndata ;
258
277
259
- data -> xact_wrote_changes = false;
260
278
if (data -> skip_empty_xacts )
261
279
return ;
262
280
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
280
298
XLogRecPtr commit_lsn )
281
299
{
282
300
TestDecodingData * data = ctx -> output_plugin_private ;
301
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
302
+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
303
+
304
+ pfree (txndata );
305
+ txn -> output_plugin_private = NULL ;
283
306
284
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
307
+ if (data -> skip_empty_xacts && !xact_wrote_changes )
285
308
return ;
286
309
287
310
OutputPluginPrepareWrite (ctx , true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
442
465
Relation relation , ReorderBufferChange * change )
443
466
{
444
467
TestDecodingData * data ;
468
+ TestDecodingTxnData * txndata ;
445
469
Form_pg_class class_form ;
446
470
TupleDesc tupdesc ;
447
471
MemoryContext old ;
448
472
449
473
data = ctx -> output_plugin_private ;
474
+ txndata = txn -> output_plugin_private ;
450
475
451
476
/* output BEGIN if we haven't yet */
452
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
477
+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
453
478
{
454
479
pg_output_begin (ctx , data , txn , false);
455
480
}
456
- data -> xact_wrote_changes = true;
481
+ txndata -> xact_wrote_changes = true;
457
482
458
483
class_form = RelationGetForm (relation );
459
484
tupdesc = RelationGetDescr (relation );
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
527
552
int nrelations , Relation relations [], ReorderBufferChange * change )
528
553
{
529
554
TestDecodingData * data ;
555
+ TestDecodingTxnData * txndata ;
530
556
MemoryContext old ;
531
557
int i ;
532
558
533
559
data = ctx -> output_plugin_private ;
560
+ txndata = txn -> output_plugin_private ;
534
561
535
562
/* output BEGIN if we haven't yet */
536
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
563
+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
537
564
{
538
565
pg_output_begin (ctx , data , txn , false);
539
566
}
540
- data -> xact_wrote_changes = true;
567
+ txndata -> xact_wrote_changes = true;
541
568
542
569
/* Avoid leaking memory by using and resetting our own context */
543
570
old = MemoryContextSwitchTo (data -> context );
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
592
619
ReorderBufferTXN * txn )
593
620
{
594
621
TestDecodingData * data = ctx -> output_plugin_private ;
622
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
595
623
596
- data -> xact_wrote_changes = false;
624
+ /*
625
+ * Allocate the txn plugin data for the first stream in the transaction.
626
+ */
627
+ if (txndata == NULL )
628
+ {
629
+ txndata =
630
+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
631
+ txndata -> xact_wrote_changes = false;
632
+ txn -> output_plugin_private = txndata ;
633
+ }
634
+
635
+ txndata -> stream_wrote_changes = false;
597
636
if (data -> skip_empty_xacts )
598
637
return ;
599
638
pg_output_stream_start (ctx , data , txn , true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
615
654
ReorderBufferTXN * txn )
616
655
{
617
656
TestDecodingData * data = ctx -> output_plugin_private ;
657
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
618
658
619
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
659
+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
620
660
return ;
621
661
622
662
OutputPluginPrepareWrite (ctx , true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
634
674
{
635
675
TestDecodingData * data = ctx -> output_plugin_private ;
636
676
637
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
677
+ /*
678
+ * stream abort can be sent for an individual subtransaction but we
679
+ * maintain the output_plugin_private only under the toptxn so if this is
680
+ * not the toptxn then fetch the toptxn.
681
+ */
682
+ ReorderBufferTXN * toptxn = txn -> toptxn ? txn -> toptxn : txn ;
683
+ TestDecodingTxnData * txndata = toptxn -> output_plugin_private ;
684
+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
685
+
686
+ if (txn -> toptxn == NULL )
687
+ {
688
+ Assert (txn -> output_plugin_private != NULL );
689
+ pfree (txndata );
690
+ txn -> output_plugin_private = NULL ;
691
+ }
692
+
693
+ if (data -> skip_empty_xacts && !xact_wrote_changes )
638
694
return ;
639
695
640
696
OutputPluginPrepareWrite (ctx , true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
651
707
XLogRecPtr commit_lsn )
652
708
{
653
709
TestDecodingData * data = ctx -> output_plugin_private ;
710
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
711
+ bool xact_wrote_changes = txndata -> xact_wrote_changes ;
712
+
713
+ pfree (txndata );
714
+ txn -> output_plugin_private = NULL ;
654
715
655
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
716
+ if (data -> skip_empty_xacts && !xact_wrote_changes )
656
717
return ;
657
718
658
719
OutputPluginPrepareWrite (ctx , true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
681
742
ReorderBufferChange * change )
682
743
{
683
744
TestDecodingData * data = ctx -> output_plugin_private ;
745
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
684
746
685
747
/* output stream start if we haven't yet */
686
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
748
+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
687
749
{
688
750
pg_output_stream_start (ctx , data , txn , false);
689
751
}
690
- data -> xact_wrote_changes = true;
752
+ txndata -> xact_wrote_changes = txndata -> stream_wrote_changes = true;
691
753
692
754
OutputPluginPrepareWrite (ctx , true);
693
755
if (data -> include_xids )
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
734
796
ReorderBufferChange * change )
735
797
{
736
798
TestDecodingData * data = ctx -> output_plugin_private ;
799
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
737
800
738
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
801
+ if (data -> skip_empty_xacts && !txndata -> stream_wrote_changes )
739
802
{
740
803
pg_output_stream_start (ctx , data , txn , false);
741
804
}
742
- data -> xact_wrote_changes = true;
805
+ txndata -> xact_wrote_changes = txndata -> stream_wrote_changes = true;
743
806
744
807
OutputPluginPrepareWrite (ctx , true);
745
808
if (data -> include_xids )
0 commit comments