Skip to content

Commit f04783a

Browse files
committed
kinda lock in filter
1 parent 7d2f872 commit f04783a

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

contrib/test_decoding/sql/prepared.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ DROP TABLE test_prepared1;
4545
DROP TABLE test_prepared2;
4646

4747
-- show results
48-
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
48+
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
49+
50+
-- same but with twophase decoding
51+
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1') ;
4952

5053
SELECT pg_drop_replication_slot('regression_slot');

contrib/test_decoding/test_decoding.c

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include "replication/message.h"
2525
#include "replication/origin.h"
2626

27+
#include "storage/procarray.h"
28+
2729
#include "utils/builtins.h"
2830
#include "utils/lsyscache.h"
2931
#include "utils/memutils.h"
@@ -283,15 +285,35 @@ pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
283285
{
284286
TestDecodingData *data = ctx->output_plugin_private;
285287

286-
// has_catalog_changes?
287-
// LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288+
if (!data->twophase_decoding)
289+
return true;
288290

289-
// OutputPluginPrepareWrite(ctx, true);
291+
if (txn->has_catalog_changes)
292+
{
293+
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
294+
295+
if (TransactionIdIsInProgress(txn->xid))
296+
{
297+
/*
298+
* XXX
299+
*/
300+
LWLockRelease(TwoPhaseStateLock);
301+
return true;
302+
}
303+
else if (TransactionIdDidAbort(txn->xid))
304+
{
305+
/*
306+
* Here we know that it is already aborted and should humble
307+
* ourselves.
308+
*/
309+
LWLockRelease(TwoPhaseStateLock);
310+
return true;
311+
}
290312

291-
// appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
313+
LWLockRelease(TwoPhaseStateLock);
314+
}
292315

293-
// OutputPluginWrite(ctx, true);
294-
return true;
316+
return false;
295317
}
296318

297319

@@ -307,7 +329,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
307329

308330
OutputPluginPrepareWrite(ctx, true);
309331

310-
appendStringInfo(ctx->out, "PREPARE! '%s'", txn->gid);
332+
appendStringInfo(ctx->out, "PREPARE '%s'", txn->gid);
311333

312334
if (data->include_xids)
313335
appendStringInfo(ctx->out, " %u", txn->xid);

0 commit comments

Comments
 (0)