@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
62
62
ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
63
63
bool transactional , const char * prefix ,
64
64
Size sz , const char * message );
65
+ static void pg_decode_stream_start (LogicalDecodingContext * ctx ,
66
+ ReorderBufferTXN * txn );
67
+ static void pg_decode_stream_stop (LogicalDecodingContext * ctx ,
68
+ ReorderBufferTXN * txn );
69
+ static void pg_decode_stream_abort (LogicalDecodingContext * ctx ,
70
+ ReorderBufferTXN * txn ,
71
+ XLogRecPtr abort_lsn );
72
+ static void pg_decode_stream_commit (LogicalDecodingContext * ctx ,
73
+ ReorderBufferTXN * txn ,
74
+ XLogRecPtr commit_lsn );
75
+ static void pg_decode_stream_change (LogicalDecodingContext * ctx ,
76
+ ReorderBufferTXN * txn ,
77
+ Relation relation ,
78
+ ReorderBufferChange * change );
79
+ static void pg_decode_stream_message (LogicalDecodingContext * ctx ,
80
+ ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
81
+ bool transactional , const char * prefix ,
82
+ Size sz , const char * message );
83
+ static void pg_decode_stream_truncate (LogicalDecodingContext * ctx ,
84
+ ReorderBufferTXN * txn ,
85
+ int nrelations , Relation relations [],
86
+ ReorderBufferChange * change );
65
87
66
88
void
67
89
_PG_init (void )
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
83
105
cb -> filter_by_origin_cb = pg_decode_filter ;
84
106
cb -> shutdown_cb = pg_decode_shutdown ;
85
107
cb -> message_cb = pg_decode_message ;
108
+ cb -> stream_start_cb = pg_decode_stream_start ;
109
+ cb -> stream_stop_cb = pg_decode_stream_stop ;
110
+ cb -> stream_abort_cb = pg_decode_stream_abort ;
111
+ cb -> stream_commit_cb = pg_decode_stream_commit ;
112
+ cb -> stream_change_cb = pg_decode_stream_change ;
113
+ cb -> stream_message_cb = pg_decode_stream_message ;
114
+ cb -> stream_truncate_cb = pg_decode_stream_truncate ;
86
115
}
87
116
88
117
@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
540
569
appendBinaryStringInfo (ctx -> out , message , sz );
541
570
OutputPluginWrite (ctx , true);
542
571
}
572
+
573
+ /*
574
+ * We never try to stream any empty xact so we don't need any special handling
575
+ * for skip_empty_xacts in streaming mode APIs.
576
+ */
577
+ static void
578
+ pg_decode_stream_start (LogicalDecodingContext * ctx ,
579
+ ReorderBufferTXN * txn )
580
+ {
581
+ TestDecodingData * data = ctx -> output_plugin_private ;
582
+
583
+ OutputPluginPrepareWrite (ctx , true);
584
+ if (data -> include_xids )
585
+ appendStringInfo (ctx -> out , "opening a streamed block for transaction TXN %u" , txn -> xid );
586
+ else
587
+ appendStringInfo (ctx -> out , "opening a streamed block for transaction" );
588
+ OutputPluginWrite (ctx , true);
589
+ }
590
+
591
+ /*
592
+ * We never try to stream any empty xact so we don't need any special handling
593
+ * for skip_empty_xacts in streaming mode APIs.
594
+ */
595
+ static void
596
+ pg_decode_stream_stop (LogicalDecodingContext * ctx ,
597
+ ReorderBufferTXN * txn )
598
+ {
599
+ TestDecodingData * data = ctx -> output_plugin_private ;
600
+
601
+ OutputPluginPrepareWrite (ctx , true);
602
+ if (data -> include_xids )
603
+ appendStringInfo (ctx -> out , "closing a streamed block for transaction TXN %u" , txn -> xid );
604
+ else
605
+ appendStringInfo (ctx -> out , "closing a streamed block for transaction" );
606
+ OutputPluginWrite (ctx , true);
607
+ }
608
+
609
+ /*
610
+ * We never try to stream any empty xact so we don't need any special handling
611
+ * for skip_empty_xacts in streaming mode APIs.
612
+ */
613
+ static void
614
+ pg_decode_stream_abort (LogicalDecodingContext * ctx ,
615
+ ReorderBufferTXN * txn ,
616
+ XLogRecPtr abort_lsn )
617
+ {
618
+ TestDecodingData * data = ctx -> output_plugin_private ;
619
+
620
+ OutputPluginPrepareWrite (ctx , true);
621
+ if (data -> include_xids )
622
+ appendStringInfo (ctx -> out , "aborting streamed (sub)transaction TXN %u" , txn -> xid );
623
+ else
624
+ appendStringInfo (ctx -> out , "aborting streamed (sub)transaction" );
625
+ OutputPluginWrite (ctx , true);
626
+ }
627
+
628
+ /*
629
+ * We never try to stream any empty xact so we don't need any special handling
630
+ * for skip_empty_xacts in streaming mode APIs.
631
+ */
632
+ static void
633
+ pg_decode_stream_commit (LogicalDecodingContext * ctx ,
634
+ ReorderBufferTXN * txn ,
635
+ XLogRecPtr commit_lsn )
636
+ {
637
+ TestDecodingData * data = ctx -> output_plugin_private ;
638
+
639
+ OutputPluginPrepareWrite (ctx , true);
640
+
641
+ if (data -> include_xids )
642
+ appendStringInfo (ctx -> out , "committing streamed transaction TXN %u" , txn -> xid );
643
+ else
644
+ appendStringInfo (ctx -> out , "committing streamed transaction" );
645
+
646
+ if (data -> include_timestamp )
647
+ appendStringInfo (ctx -> out , " (at %s)" ,
648
+ timestamptz_to_str (txn -> commit_time ));
649
+
650
+ OutputPluginWrite (ctx , true);
651
+ }
652
+
653
+ /*
654
+ * In streaming mode, we don't display the changes as the transaction can abort
655
+ * at a later point in time. We don't want users to see the changes until the
656
+ * transaction is committed.
657
+ */
658
+ static void
659
+ pg_decode_stream_change (LogicalDecodingContext * ctx ,
660
+ ReorderBufferTXN * txn ,
661
+ Relation relation ,
662
+ ReorderBufferChange * change )
663
+ {
664
+ TestDecodingData * data = ctx -> output_plugin_private ;
665
+
666
+ OutputPluginPrepareWrite (ctx , true);
667
+ if (data -> include_xids )
668
+ appendStringInfo (ctx -> out , "streaming change for TXN %u" , txn -> xid );
669
+ else
670
+ appendStringInfo (ctx -> out , "streaming change for transaction" );
671
+ OutputPluginWrite (ctx , true);
672
+ }
673
+
674
+ /*
675
+ * In streaming mode, we don't display the contents for transactional messages
676
+ * as the transaction can abort at a later point in time. We don't want users to
677
+ * see the message contents until the transaction is committed.
678
+ */
679
+ static void
680
+ pg_decode_stream_message (LogicalDecodingContext * ctx ,
681
+ ReorderBufferTXN * txn , XLogRecPtr lsn , bool transactional ,
682
+ const char * prefix , Size sz , const char * message )
683
+ {
684
+ OutputPluginPrepareWrite (ctx , true);
685
+
686
+ if (transactional )
687
+ {
688
+ appendStringInfo (ctx -> out , "streaming message: transactional: %d prefix: %s, sz: %zu" ,
689
+ transactional , prefix , sz );
690
+ }
691
+ else
692
+ {
693
+ appendStringInfo (ctx -> out , "streaming message: transactional: %d prefix: %s, sz: %zu content:" ,
694
+ transactional , prefix , sz );
695
+ appendBinaryStringInfo (ctx -> out , message , sz );
696
+ }
697
+
698
+ OutputPluginWrite (ctx , true);
699
+ }
700
+
701
+ /*
702
+ * In streaming mode, we don't display the detailed information of Truncate.
703
+ * See pg_decode_stream_change.
704
+ */
705
+ static void
706
+ pg_decode_stream_truncate (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
707
+ int nrelations , Relation relations [],
708
+ ReorderBufferChange * change )
709
+ {
710
+ TestDecodingData * data = ctx -> output_plugin_private ;
711
+
712
+ OutputPluginPrepareWrite (ctx , true);
713
+ if (data -> include_xids )
714
+ appendStringInfo (ctx -> out , "streaming truncate for TXN %u" , txn -> xid );
715
+ else
716
+ appendStringInfo (ctx -> out , "streaming truncate for transaction" );
717
+ OutputPluginWrite (ctx , true);
718
+ }
0 commit comments