@@ -46,6 +46,7 @@ typedef struct
46
46
bool skip_empty_xacts ;
47
47
bool xact_wrote_changes ;
48
48
bool only_local ;
49
+ bool twophase_decoding ;
49
50
} TestDecodingData ;
50
51
51
52
static void pg_decode_startup (LogicalDecodingContext * ctx , OutputPluginOptions * opt ,
@@ -68,6 +69,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
68
69
ReorderBufferTXN * txn , XLogRecPtr message_lsn ,
69
70
bool transactional , const char * prefix ,
70
71
Size sz , const char * message );
72
+ static bool pg_filter_prepare (LogicalDecodingContext * ctx ,
73
+ ReorderBufferTXN * txn ,
74
+ char * gid );
75
+ static void pg_decode_prepare_txn (LogicalDecodingContext * ctx ,
76
+ ReorderBufferTXN * txn ,
77
+ XLogRecPtr prepare_lsn );
78
+ static void pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx ,
79
+ ReorderBufferTXN * txn ,
80
+ XLogRecPtr commit_lsn );
81
+ static void pg_decode_abort_prepared_txn (LogicalDecodingContext * ctx ,
82
+ ReorderBufferTXN * txn ,
83
+ XLogRecPtr abort_lsn );
84
+
71
85
72
86
void
73
87
_PG_init (void )
@@ -85,9 +99,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
85
99
cb -> begin_cb = pg_decode_begin_txn ;
86
100
cb -> change_cb = pg_decode_change ;
87
101
cb -> commit_cb = pg_decode_commit_txn ;
102
+
88
103
cb -> filter_by_origin_cb = pg_decode_filter ;
89
104
cb -> shutdown_cb = pg_decode_shutdown ;
90
105
cb -> message_cb = pg_decode_message ;
106
+
107
+ cb -> filter_prepare_cb = pg_filter_prepare ;
108
+ cb -> prepare_cb = pg_decode_prepare_txn ;
109
+ cb -> commit_prepared_cb = pg_decode_commit_prepared_txn ;
110
+ cb -> abort_prepared_cb = pg_decode_abort_prepared_txn ;
91
111
}
92
112
93
113
@@ -107,6 +127,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
107
127
data -> include_timestamp = false;
108
128
data -> skip_empty_xacts = false;
109
129
data -> only_local = false;
130
+ data -> twophase_decoding = false;
110
131
111
132
ctx -> output_plugin_private = data ;
112
133
@@ -176,6 +197,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
176
197
errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
177
198
strVal (elem -> arg ), elem -> defname )));
178
199
}
200
+ else if (strcmp (elem -> defname , "twophase-decoding" ) == 0 )
201
+ {
202
+
203
+ if (elem -> arg == NULL )
204
+ data -> twophase_decoding = true;
205
+ else if (!parse_bool (strVal (elem -> arg ), & data -> twophase_decoding ))
206
+ ereport (ERROR ,
207
+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
208
+ errmsg ("could not parse value \"%s\" for parameter \"%s\"" ,
209
+ strVal (elem -> arg ), elem -> defname )));
210
+ }
179
211
else
180
212
{
181
213
ereport (ERROR ,
@@ -233,21 +265,97 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
233
265
234
266
OutputPluginPrepareWrite (ctx , true);
235
267
236
- switch (txn -> xact_action )
237
- {
238
- case XLOG_XACT_COMMIT :
239
- appendStringInfoString (ctx -> out , "COMMIT" );
240
- break ;
241
- case XLOG_XACT_PREPARE :
242
- appendStringInfo (ctx -> out , "PREPARE '%s'" , txn -> gid );
243
- break ;
244
- case XLOG_XACT_COMMIT_PREPARED :
245
- appendStringInfo (ctx -> out , "COMMIT PREPARED '%s'" , txn -> gid );
246
- break ;
247
- case XLOG_XACT_ABORT_PREPARED :
248
- appendStringInfo (ctx -> out , "ABORT PREPARED '%s'" , txn -> gid );
249
- break ;
250
- }
268
+ appendStringInfoString (ctx -> out , "COMMIT" );
269
+
270
+ if (data -> include_xids )
271
+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
272
+
273
+ if (data -> include_timestamp )
274
+ appendStringInfo (ctx -> out , " (at %s)" ,
275
+ timestamptz_to_str (txn -> commit_time ));
276
+
277
+ OutputPluginWrite (ctx , true);
278
+ }
279
+
280
+ static bool
281
+ pg_filter_prepare (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
282
+ char * gid )
283
+ {
284
+ TestDecodingData * data = ctx -> output_plugin_private ;
285
+
286
+ // has_catalog_changes?
287
+ // LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288
+
289
+ // OutputPluginPrepareWrite(ctx, true);
290
+
291
+ // appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
292
+
293
+ // OutputPluginWrite(ctx, true);
294
+ return true;
295
+ }
296
+
297
+
298
+ /* PREPARE callback */
299
+ static void
300
+ pg_decode_prepare_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
301
+ XLogRecPtr prepare_lsn )
302
+ {
303
+ TestDecodingData * data = ctx -> output_plugin_private ;
304
+
305
+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
306
+ return ;
307
+
308
+ OutputPluginPrepareWrite (ctx , true);
309
+
310
+ appendStringInfo (ctx -> out , "PREPARE! '%s'" , txn -> gid );
311
+
312
+ if (data -> include_xids )
313
+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
314
+
315
+ if (data -> include_timestamp )
316
+ appendStringInfo (ctx -> out , " (at %s)" ,
317
+ timestamptz_to_str (txn -> commit_time ));
318
+
319
+ OutputPluginWrite (ctx , true);
320
+ }
321
+
322
+ /* COMMIT PREPARED callback */
323
+ static void
324
+ pg_decode_commit_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
325
+ XLogRecPtr commit_lsn )
326
+ {
327
+ TestDecodingData * data = ctx -> output_plugin_private ;
328
+
329
+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
330
+ return ;
331
+
332
+ OutputPluginPrepareWrite (ctx , true);
333
+
334
+ appendStringInfo (ctx -> out , "COMMIT PREPARED '%s'" , txn -> gid );
335
+
336
+ if (data -> include_xids )
337
+ appendStringInfo (ctx -> out , " %u" , txn -> xid );
338
+
339
+ if (data -> include_timestamp )
340
+ appendStringInfo (ctx -> out , " (at %s)" ,
341
+ timestamptz_to_str (txn -> commit_time ));
342
+
343
+ OutputPluginWrite (ctx , true);
344
+ }
345
+
346
+ /* ABORT PREPARED callback */
347
+ static void
348
+ pg_decode_abort_prepared_txn (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
349
+ XLogRecPtr abort_lsn )
350
+ {
351
+ TestDecodingData * data = ctx -> output_plugin_private ;
352
+
353
+ if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
354
+ return ;
355
+
356
+ OutputPluginPrepareWrite (ctx , true);
357
+
358
+ appendStringInfo (ctx -> out , "ABORT PREPARED '%s'" , txn -> gid );
251
359
252
360
if (data -> include_xids )
253
361
appendStringInfo (ctx -> out , " %u" , txn -> xid );
0 commit comments