@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
76
76
ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
77
77
bool transactional , const char * prefix ,
78
78
Size sz , const char * message );
79
+ static bool pg_decode_filter_prepare (LogicalDecodingContext * ctx ,
80
+ const char * gid );
81
+ static void pg_decode_begin_prepare_txn (LogicalDecodingContext * ctx ,
82
+ ReorderBufferTXN * txn );
83
+ static void pg_decode_prepare_txn (LogicalDecodingContext * ctx ,
84
+ ReorderBufferTXN * txn ,
85
+ XLogRecPtr prepare_lsn );
86
+ static void pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx ,
87
+ ReorderBufferTXN * txn ,
88
+ XLogRecPtr commit_lsn );
89
+ static void pg_decode_rollback_prepared_txn (LogicalDecodingContext * ctx ,
90
+ ReorderBufferTXN * txn ,
91
+ XLogRecPtr prepare_end_lsn ,
92
+ TimestampTz prepare_time );
79
93
static void pg_decode_stream_start (LogicalDecodingContext * ctx ,
80
94
ReorderBufferTXN * txn );
81
95
static void pg_output_stream_start (LogicalDecodingContext * ctx ,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
87
101
static void pg_decode_stream_abort (LogicalDecodingContext * ctx ,
88
102
ReorderBufferTXN * txn ,
89
103
XLogRecPtr abort_lsn );
104
+ static void pg_decode_stream_prepare (LogicalDecodingContext * ctx ,
105
+ ReorderBufferTXN * txn ,
106
+ XLogRecPtr prepare_lsn );
90
107
static void pg_decode_stream_commit (LogicalDecodingContext * ctx ,
91
108
ReorderBufferTXN * txn ,
92
109
XLogRecPtr commit_lsn );
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
123
140
cb -> filter_by_origin_cb = pg_decode_filter ;
124
141
cb -> shutdown_cb = pg_decode_shutdown ;
125
142
cb -> message_cb = pg_decode_message ;
143
+ cb -> filter_prepare_cb = pg_decode_filter_prepare ;
144
+ cb -> begin_prepare_cb = pg_decode_begin_prepare_txn ;
145
+ cb -> prepare_cb = pg_decode_prepare_txn ;
146
+ cb -> commit_prepared_cb = pg_decode_commit_prepared_txn ;
147
+ cb -> rollback_prepared_cb = pg_decode_rollback_prepared_txn ;
126
148
cb -> stream_start_cb = pg_decode_stream_start ;
127
149
cb -> stream_stop_cb = pg_decode_stream_stop ;
128
150
cb -> stream_abort_cb = pg_decode_stream_abort ;
151
+ cb -> stream_prepare_cb = pg_decode_stream_prepare ;
129
152
cb -> stream_commit_cb = pg_decode_stream_commit ;
130
153
cb -> stream_change_cb = pg_decode_stream_change ;
131
154
cb -> stream_message_cb = pg_decode_stream_message ;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
141
164
ListCell * option ;
142
165
TestDecodingData * data ;
143
166
bool enable_streaming = false;
167
+ bool enable_twophase = false;
144
168
145
169
data = palloc0 (sizeof (TestDecodingData ));
146
170
data -> context = AllocSetContextCreate (ctx -> context ,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
241
265
errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
242
266
strVal (elem -> arg ), elem -> defname )));
243
267
}
268
+ else if (strcmp (elem -> defname , "two-phase-commit" ) == 0 )
269
+ {
270
+ if (elem -> arg == NULL )
271
+ continue ;
272
+ else if (!parse_bool (strVal (elem -> arg ), & enable_twophase ))
273
+ ereport (ERROR ,
274
+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
275
+ errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
276
+ strVal (elem -> arg ), elem -> defname )));
277
+ }
244
278
else
245
279
{
246
280
ereport (ERROR ,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
252
286
}
253
287
254
288
ctx -> streaming &= enable_streaming ;
289
+ ctx -> twophase &= enable_twophase ;
255
290
}
256
291
257
292
/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320
355
OutputPluginWrite (ctx , true);
321
356
}
322
357
358
+ /* BEGIN PREPARE callback */
359
+ static void
360
+ pg_decode_begin_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn )
361
+ {
362
+ TestDecodingData * data = ctx -> output_plugin_private ;
363
+ TestDecodingTxnData * txndata =
364
+ MemoryContextAllocZero (ctx -> context , sizeof (TestDecodingTxnData ));
365
+
366
+ txndata -> xact_wrote_changes = false;
367
+ txn -> output_plugin_private = txndata ;
368
+
369
+ if (data -> skip_empty_xacts )
370
+ return ;
371
+
372
+ pg_output_begin (ctx , data , txn , true);
373
+ }
374
+
375
+ /* PREPARE callback */
376
+ static void
377
+ pg_decode_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
378
+ XLogRecPtr prepare_lsn )
379
+ {
380
+ TestDecodingData * data = ctx -> output_plugin_private ;
381
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
382
+
383
+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
384
+ return ;
385
+
386
+ OutputPluginPrepareWrite (ctx , true);
387
+
388
+ appendStringInfo (ctx -> out , "PREPARE TRANSACTION %s" ,
389
+ quote_literal_cstr (txn -> gid ));
390
+
391
+ if (data -> include_xids )
392
+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
393
+
394
+ if (data -> include_timestamp )
395
+ appendStringInfo (ctx -> out , " (at %s)" ,
396
+ timestamptz_to_str (txn -> commit_time ));
397
+
398
+ OutputPluginWrite (ctx , true);
399
+ }
400
+
401
+ /* COMMIT PREPARED callback */
402
+ static void
403
+ pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
404
+ XLogRecPtr commit_lsn )
405
+ {
406
+ TestDecodingData * data = ctx -> output_plugin_private ;
407
+
408
+ OutputPluginPrepareWrite (ctx , true);
409
+
410
+ appendStringInfo (ctx -> out , "COMMIT PREPARED %s" ,
411
+ quote_literal_cstr (txn -> gid ));
412
+
413
+ if (data -> include_xids )
414
+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
415
+
416
+ if (data -> include_timestamp )
417
+ appendStringInfo (ctx -> out , " (at %s)" ,
418
+ timestamptz_to_str (txn -> commit_time ));
419
+
420
+ OutputPluginWrite (ctx , true);
421
+ }
422
+
423
+ /* ROLLBACK PREPARED callback */
424
+ static void
425
+ pg_decode_rollback_prepared_txn (LogicalDecodingContext * ctx ,
426
+ ReorderBufferTXN * txn ,
427
+ XLogRecPtr prepare_end_lsn ,
428
+ TimestampTz prepare_time )
429
+ {
430
+ TestDecodingData * data = ctx -> output_plugin_private ;
431
+
432
+ OutputPluginPrepareWrite (ctx , true);
433
+
434
+ appendStringInfo (ctx -> out , "ROLLBACK PREPARED %s" ,
435
+ quote_literal_cstr (txn -> gid ));
436
+
437
+ if (data -> include_xids )
438
+ appendStringInfo (ctx -> out , ", txid %u" , txn -> xid );
439
+
440
+ if (data -> include_timestamp )
441
+ appendStringInfo (ctx -> out , " (at %s)" ,
442
+ timestamptz_to_str (txn -> commit_time ));
443
+
444
+ OutputPluginWrite (ctx , true);
445
+ }
446
+
447
+ /*
448
+ * Filter out two-phase transactions.
449
+ *
450
+ * Each plugin can implement its own filtering logic. Here we demonstrate a
451
+ * simple logic by checking the GID. If the GID contains the "_nodecode"
452
+ * substring, then we filter it out.
453
+ */
454
+ static bool
455
+ pg_decode_filter_prepare (LogicalDecodingContext * ctx , const char * gid )
456
+ {
457
+ if (strstr (gid , "_nodecode" ) != NULL )
458
+ return true;
459
+
460
+ return false;
461
+ }
462
+
323
463
static bool
324
464
pg_decode_filter (LogicalDecodingContext * ctx ,
325
465
RepOriginId origin_id )
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
701
841
OutputPluginWrite (ctx , true);
702
842
}
703
843
844
+ static void
845
+ pg_decode_stream_prepare (LogicalDecodingContext * ctx ,
846
+ ReorderBufferTXN * txn ,
847
+ XLogRecPtr prepare_lsn )
848
+ {
849
+ TestDecodingData * data = ctx -> output_plugin_private ;
850
+ TestDecodingTxnData * txndata = txn -> output_plugin_private ;
851
+
852
+ if (data -> skip_empty_xacts && !txndata -> xact_wrote_changes )
853
+ return ;
854
+
855
+ OutputPluginPrepareWrite (ctx , true);
856
+
857
+ if (data -> include_xids )
858
+ appendStringInfo (ctx -> out , "preparing streamed transaction TXN %s, txid %u" ,
859
+ quote_literal_cstr (txn -> gid ), txn -> xid );
860
+ else
861
+ appendStringInfo (ctx -> out , "preparing streamed transaction %s" ,
862
+ quote_literal_cstr (txn -> gid ));
863
+
864
+ if (data -> include_timestamp )
865
+ appendStringInfo (ctx -> out , " (at %s)" ,
866
+ timestamptz_to_str (txn -> commit_time ));
867
+
868
+ OutputPluginWrite (ctx , true);
869
+ }
870
+
704
871
static void
705
872
pg_decode_stream_commit (LogicalDecodingContext * ctx ,
706
873
ReorderBufferTXN * txn ,
0 commit comments