24
24
#include "replication/message.h"
25
25
#include "replication/origin.h"
26
26
27
+ #include "storage/procarray.h"
28
+
27
29
#include "utils/builtins.h"
28
30
#include "utils/lsyscache.h"
29
31
#include "utils/memutils.h"
@@ -283,15 +285,35 @@ pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
283
285
{
284
286
TestDecodingData * data = ctx -> output_plugin_private ;
285
287
286
- // has_catalog_changes?
287
- // LWLockAcquire(TwoPhaseStateLock, LW_SHARED) ;
288
+ if (! data -> twophase_decoding )
289
+ return true ;
288
290
289
- // OutputPluginPrepareWrite(ctx, true);
291
+ if (txn -> has_catalog_changes )
292
+ {
293
+ LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
294
+
295
+ if (TransactionIdIsInProgress (txn -> xid ))
296
+ {
297
+ /*
298
+ * XXX
299
+ */
300
+ LWLockRelease (TwoPhaseStateLock );
301
+ return true;
302
+ }
303
+ else if (TransactionIdDidAbort (txn -> xid ))
304
+ {
305
+ /*
306
+ * Here we know that it is already aborted and should humble
307
+ * ourselves.
308
+ */
309
+ LWLockRelease (TwoPhaseStateLock );
310
+ return true;
311
+ }
290
312
291
- // appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
313
+ LWLockRelease (TwoPhaseStateLock );
314
+ }
292
315
293
- // OutputPluginWrite(ctx, true);
294
- return true;
316
+ return false;
295
317
}
296
318
297
319
@@ -307,7 +329,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
307
329
308
330
OutputPluginPrepareWrite (ctx , true);
309
331
310
- appendStringInfo (ctx -> out , "PREPARE! '%s'" , txn -> gid );
332
+ appendStringInfo (ctx -> out , "PREPARE '%s'" , txn -> gid );
311
333
312
334
if (data -> include_xids )
313
335
appendStringInfo (ctx -> out , " %u" , txn -> xid );
0 commit comments