@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
58
58
bool is_init );
59
59
static void shutdown_cb_wrapper (LogicalDecodingContext * ctx );
60
60
static void begin_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn );
61
+ static bool filter_prepare_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
62
+ char * gid );
63
+ static void prepare_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
64
+ XLogRecPtr prepare_lsn );
65
+ static void commit_prepared_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
66
+ XLogRecPtr commit_lsn );
67
+ static void abort_prepared_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
68
+ XLogRecPtr abort_lsn );
61
69
static void commit_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
62
70
XLogRecPtr commit_lsn );
63
71
static void change_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
@@ -122,6 +130,7 @@ StartupDecodingContext(List *output_plugin_options,
122
130
MemoryContext context ,
123
131
old_context ;
124
132
LogicalDecodingContext * ctx ;
133
+ int twophase_callbacks ;
125
134
126
135
/* shorter lines... */
127
136
slot = MyReplicationSlot ;
@@ -179,8 +188,25 @@ StartupDecodingContext(List *output_plugin_options,
179
188
ctx -> reorder -> begin = begin_cb_wrapper ;
180
189
ctx -> reorder -> apply_change = change_cb_wrapper ;
181
190
ctx -> reorder -> commit = commit_cb_wrapper ;
191
+ ctx -> reorder -> filter_prepare = filter_prepare_cb_wrapper ;
192
+ ctx -> reorder -> prepare = prepare_cb_wrapper ;
193
+ ctx -> reorder -> commit_prepared = commit_prepared_cb_wrapper ;
194
+ ctx -> reorder -> abort_prepared = abort_prepared_cb_wrapper ;
182
195
ctx -> reorder -> message = message_cb_wrapper ;
183
196
197
+ /* check that plugin implements all necessary callbacks to perform 2PC */
198
+ twophase_callbacks = (ctx -> callbacks .prepare_cb != NULL ) +
199
+ (ctx -> callbacks .commit_prepared_cb != NULL ) +
200
+ (ctx -> callbacks .abort_prepared_cb != NULL );
201
+
202
+ ctx -> twophase_hadling = (twophase_callbacks == 3 );
203
+
204
+ if (twophase_callbacks != 3 && twophase_callbacks != 0 )
205
+ ereport (WARNING ,
206
+ (errmsg ("Output plugin registered only %d twophase callbacks out of 3. "
207
+ "Twophase transactions will be decoded as ordinary ones." ,
208
+ twophase_callbacks )));
209
+
184
210
ctx -> out = makeStringInfo ();
185
211
ctx -> prepare_write = prepare_write ;
186
212
ctx -> write = do_write ;
@@ -649,6 +675,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
649
675
error_context_stack = errcallback .previous ;
650
676
}
651
677
678
+ static void
679
+ prepare_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
680
+ XLogRecPtr prepare_lsn )
681
+ {
682
+ LogicalDecodingContext * ctx = cache -> private_data ;
683
+ LogicalErrorCallbackState state ;
684
+ ErrorContextCallback errcallback ;
685
+
686
+ /* Push callback + info on the error context stack */
687
+ state .ctx = ctx ;
688
+ state .callback_name = "prepare" ;
689
+ state .report_location = txn -> final_lsn ; /* beginning of commit record */
690
+ errcallback .callback = output_plugin_error_callback ;
691
+ errcallback .arg = (void * ) & state ;
692
+ errcallback .previous = error_context_stack ;
693
+ error_context_stack = & errcallback ;
694
+
695
+ /* set output state */
696
+ ctx -> accept_writes = true;
697
+ ctx -> write_xid = txn -> xid ;
698
+ ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
699
+
700
+ /* do the actual work: call callback */
701
+ ctx -> callbacks .prepare_cb (ctx , txn , prepare_lsn );
702
+
703
+ /* Pop the error context stack */
704
+ error_context_stack = errcallback .previous ;
705
+ }
706
+
707
+ static void
708
+ commit_prepared_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
709
+ XLogRecPtr commit_lsn )
710
+ {
711
+ LogicalDecodingContext * ctx = cache -> private_data ;
712
+ LogicalErrorCallbackState state ;
713
+ ErrorContextCallback errcallback ;
714
+
715
+ /* Push callback + info on the error context stack */
716
+ state .ctx = ctx ;
717
+ state .callback_name = "commit_prepared" ;
718
+ state .report_location = txn -> final_lsn ; /* beginning of commit record */
719
+ errcallback .callback = output_plugin_error_callback ;
720
+ errcallback .arg = (void * ) & state ;
721
+ errcallback .previous = error_context_stack ;
722
+ error_context_stack = & errcallback ;
723
+
724
+ /* set output state */
725
+ ctx -> accept_writes = true;
726
+ ctx -> write_xid = txn -> xid ;
727
+ ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
728
+
729
+ /* do the actual work: call callback */
730
+ ctx -> callbacks .commit_prepared_cb (ctx , txn , commit_lsn );
731
+
732
+ /* Pop the error context stack */
733
+ error_context_stack = errcallback .previous ;
734
+ }
735
+
736
+ static void
737
+ abort_prepared_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
738
+ XLogRecPtr abort_lsn )
739
+ {
740
+ LogicalDecodingContext * ctx = cache -> private_data ;
741
+ LogicalErrorCallbackState state ;
742
+ ErrorContextCallback errcallback ;
743
+
744
+ /* Push callback + info on the error context stack */
745
+ state .ctx = ctx ;
746
+ state .callback_name = "abort_prepared" ;
747
+ state .report_location = txn -> final_lsn ; /* beginning of commit record */
748
+ errcallback .callback = output_plugin_error_callback ;
749
+ errcallback .arg = (void * ) & state ;
750
+ errcallback .previous = error_context_stack ;
751
+ error_context_stack = & errcallback ;
752
+
753
+ /* set output state */
754
+ ctx -> accept_writes = true;
755
+ ctx -> write_xid = txn -> xid ;
756
+ ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
757
+
758
+ /* do the actual work: call callback */
759
+ ctx -> callbacks .abort_prepared_cb (ctx , txn , abort_lsn );
760
+
761
+ /* Pop the error context stack */
762
+ error_context_stack = errcallback .previous ;
763
+ }
764
+
652
765
static void
653
766
change_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn ,
654
767
Relation relation , ReorderBufferChange * change )
@@ -684,6 +797,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
684
797
error_context_stack = errcallback .previous ;
685
798
}
686
799
800
+ static bool
801
+ filter_prepare_cb_wrapper (ReorderBuffer * cache , ReorderBufferTXN * txn , char * gid )
802
+ {
803
+ LogicalDecodingContext * ctx = cache -> private_data ;
804
+ LogicalErrorCallbackState state ;
805
+ ErrorContextCallback errcallback ;
806
+ bool ret ;
807
+
808
+ /* Push callback + info on the error context stack */
809
+ state .ctx = ctx ;
810
+ state .callback_name = "filter_prepare" ;
811
+ state .report_location = InvalidXLogRecPtr ;
812
+ errcallback .callback = output_plugin_error_callback ;
813
+ errcallback .arg = (void * ) & state ;
814
+ errcallback .previous = error_context_stack ;
815
+ error_context_stack = & errcallback ;
816
+
817
+ // /* set output state */
818
+ // ctx->accept_writes = false;
819
+
820
+ /* do the actual work: call callback */
821
+ ret = ctx -> callbacks .filter_prepare_cb (ctx , txn , gid );
822
+
823
+ /* Pop the error context stack */
824
+ error_context_stack = errcallback .previous ;
825
+ return ret ;
826
+ }
827
+
687
828
bool
688
829
filter_by_origin_cb_wrapper (LogicalDecodingContext * ctx , RepOriginId origin_id )
689
830
{
0 commit comments