Skip to content

Commit 9653f24

Browse files
author
Amit Kapila
committed
Fix 'skip-empty-xacts' option in test_decoding for streaming mode.
In streaming mode, the transaction can be decoded in multiple streams and those streams can be interleaved with streams of other transactions. So, we can't remember the transaction's write status in the logical decoding context because that might get changed due to some other transactions and lead to wrong answers for 'skip-empty-xacts' option. We decided to keep each transaction's write status in the ReorderBufferTxn to avoid interleaved streams changing the status of some unrelated transactions. Diagnosed-by: Amit Kapila Author: Dilip Kumar Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/CAA4eK1LR7=XNM_TLmpZMFuV8ZQpoxkem--NZJYf8YXmesbvwLA@mail.gmail.com
1 parent 2bd49b4 commit 9653f24

File tree

6 files changed

+96
-19
lines changed

6 files changed

+96
-19
lines changed

contrib/test_decoding/expected/concurrent_stream.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
Parsed test spec with 2 sessions
1+
Parsed test spec with 3 sessions
22

3-
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
3+
starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
44
step s0_begin: BEGIN;
55
step s0_ddl: CREATE TABLE stream_test1(data text);
66
step s1_ddl: CREATE TABLE stream_test(data text);
77
step s1_begin: BEGIN;
88
step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
9+
step s2_ddl: CREATE TABLE stream_test2(data text);
910
step s1_commit: COMMIT;
1011
step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
1112
data

contrib/test_decoding/specs/concurrent_stream.spec

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ setup { SET synchronous_commit=on; }
2323
step "s0_begin" { BEGIN; }
2424
step "s0_ddl" {CREATE TABLE stream_test1(data text);}
2525

26+
session "s2"
27+
setup { SET synchronous_commit=on; }
28+
step "s2_ddl" {CREATE TABLE stream_test2(data text);}
29+
2630
# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
2731
# the currently running s0_ddl and we want to test that s0_ddl should not get
28-
# streamed when user asked to skip-empty-xacts.
32+
# streamed when user asked to skip-empty-xacts. Similarly, the
33+
# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for
34+
# what gets streamed.
2935
session "s1"
3036
setup { SET synchronous_commit=on; }
3137
step "s1_ddl" { CREATE TABLE stream_test(data text); }
@@ -34,4 +40,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
3440
step "s1_commit" { COMMIT; }
3541
step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
3642

37-
permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
43+
permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"

contrib/test_decoding/test_decoding.c

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,24 @@ typedef struct
3434
bool include_xids;
3535
bool include_timestamp;
3636
bool skip_empty_xacts;
37-
bool xact_wrote_changes;
3837
bool only_local;
3938
} TestDecodingData;
4039

40+
/*
41+
* Maintain the per-transaction level variables to track whether the
42+
* transaction and or streams have written any changes. In streaming mode the
43+
* transaction can be decoded in streams so along with maintaining whether the
44+
* transaction has written any changes, we also need to track whether the
45+
* current stream has written any changes. This is required so that if user
46+
* has requested to skip the empty transactions we can skip the empty streams
47+
* even though the transaction has written some changes.
48+
*/
49+
typedef struct
50+
{
51+
bool xact_wrote_changes;
52+
bool stream_wrote_changes;
53+
} TestDecodingTxnData;
54+
4155
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
4256
bool is_init);
4357
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
255269
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
256270
{
257271
TestDecodingData *data = ctx->output_plugin_private;
272+
TestDecodingTxnData *txndata =
273+
MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
274+
275+
txndata->xact_wrote_changes = false;
276+
txn->output_plugin_private = txndata;
258277

259-
data->xact_wrote_changes = false;
260278
if (data->skip_empty_xacts)
261279
return;
262280

@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
280298
XLogRecPtr commit_lsn)
281299
{
282300
TestDecodingData *data = ctx->output_plugin_private;
301+
TestDecodingTxnData *txndata = txn->output_plugin_private;
302+
bool xact_wrote_changes = txndata->xact_wrote_changes;
303+
304+
pfree(txndata);
305+
txn->output_plugin_private = NULL;
283306

284-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
307+
if (data->skip_empty_xacts && !xact_wrote_changes)
285308
return;
286309

287310
OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
442465
Relation relation, ReorderBufferChange *change)
443466
{
444467
TestDecodingData *data;
468+
TestDecodingTxnData *txndata;
445469
Form_pg_class class_form;
446470
TupleDesc tupdesc;
447471
MemoryContext old;
448472

449473
data = ctx->output_plugin_private;
474+
txndata = txn->output_plugin_private;
450475

451476
/* output BEGIN if we haven't yet */
452-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
477+
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
453478
{
454479
pg_output_begin(ctx, data, txn, false);
455480
}
456-
data->xact_wrote_changes = true;
481+
txndata->xact_wrote_changes = true;
457482

458483
class_form = RelationGetForm(relation);
459484
tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
527552
int nrelations, Relation relations[], ReorderBufferChange *change)
528553
{
529554
TestDecodingData *data;
555+
TestDecodingTxnData *txndata;
530556
MemoryContext old;
531557
int i;
532558

533559
data = ctx->output_plugin_private;
560+
txndata = txn->output_plugin_private;
534561

535562
/* output BEGIN if we haven't yet */
536-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
563+
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
537564
{
538565
pg_output_begin(ctx, data, txn, false);
539566
}
540-
data->xact_wrote_changes = true;
567+
txndata->xact_wrote_changes = true;
541568

542569
/* Avoid leaking memory by using and resetting our own context */
543570
old = MemoryContextSwitchTo(data->context);
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
592619
ReorderBufferTXN *txn)
593620
{
594621
TestDecodingData *data = ctx->output_plugin_private;
622+
TestDecodingTxnData *txndata = txn->output_plugin_private;
595623

596-
data->xact_wrote_changes = false;
624+
/*
625+
* Allocate the txn plugin data for the first stream in the transaction.
626+
*/
627+
if (txndata == NULL)
628+
{
629+
txndata =
630+
MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
631+
txndata->xact_wrote_changes = false;
632+
txn->output_plugin_private = txndata;
633+
}
634+
635+
txndata->stream_wrote_changes = false;
597636
if (data->skip_empty_xacts)
598637
return;
599638
pg_output_stream_start(ctx, data, txn, true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
615654
ReorderBufferTXN *txn)
616655
{
617656
TestDecodingData *data = ctx->output_plugin_private;
657+
TestDecodingTxnData *txndata = txn->output_plugin_private;
618658

619-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
659+
if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
620660
return;
621661

622662
OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
634674
{
635675
TestDecodingData *data = ctx->output_plugin_private;
636676

637-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
677+
/*
678+
* stream abort can be sent for an individual subtransaction but we
679+
* maintain the output_plugin_private only under the toptxn so if this is
680+
* not the toptxn then fetch the toptxn.
681+
*/
682+
ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
683+
TestDecodingTxnData *txndata = toptxn->output_plugin_private;
684+
bool xact_wrote_changes = txndata->xact_wrote_changes;
685+
686+
if (txn->toptxn == NULL)
687+
{
688+
Assert(txn->output_plugin_private != NULL);
689+
pfree(txndata);
690+
txn->output_plugin_private = NULL;
691+
}
692+
693+
if (data->skip_empty_xacts && !xact_wrote_changes)
638694
return;
639695

640696
OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
651707
XLogRecPtr commit_lsn)
652708
{
653709
TestDecodingData *data = ctx->output_plugin_private;
710+
TestDecodingTxnData *txndata = txn->output_plugin_private;
711+
bool xact_wrote_changes = txndata->xact_wrote_changes;
712+
713+
pfree(txndata);
714+
txn->output_plugin_private = NULL;
654715

655-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
716+
if (data->skip_empty_xacts && !xact_wrote_changes)
656717
return;
657718

658719
OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
681742
ReorderBufferChange *change)
682743
{
683744
TestDecodingData *data = ctx->output_plugin_private;
745+
TestDecodingTxnData *txndata = txn->output_plugin_private;
684746

685747
/* output stream start if we haven't yet */
686-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
748+
if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
687749
{
688750
pg_output_stream_start(ctx, data, txn, false);
689751
}
690-
data->xact_wrote_changes = true;
752+
txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
691753

692754
OutputPluginPrepareWrite(ctx, true);
693755
if (data->include_xids)
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
734796
ReorderBufferChange *change)
735797
{
736798
TestDecodingData *data = ctx->output_plugin_private;
799+
TestDecodingTxnData *txndata = txn->output_plugin_private;
737800

738-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
801+
if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
739802
{
740803
pg_output_stream_start(ctx, data, txn, false);
741804
}
742-
data->xact_wrote_changes = true;
805+
txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
743806

744807
OutputPluginPrepareWrite(ctx, true);
745808
if (data->include_xids)

src/backend/replication/logical/reorderbuffer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
402402

403403
/* InvalidCommandId is not zero, so set it explicitly */
404404
txn->command_id = InvalidCommandId;
405+
txn->output_plugin_private = NULL;
405406

406407
return txn;
407408
}

src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
378378

379379
/* If we have detected concurrent abort then ignore future changes. */
380380
bool concurrent_abort;
381+
382+
/*
383+
* Private data pointer of the output plugin.
384+
*/
385+
void *output_plugin_private;
381386
} ReorderBufferTXN;
382387

383388
/* so we can define the callbacks used inside struct ReorderBuffer itself */

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2505,6 +2505,7 @@ Tcl_Obj
25052505
Tcl_Time
25062506
TempNamespaceStatus
25072507
TestDecodingData
2508+
TestDecodingTxnData
25082509
TestSpec
25092510
TextFreq
25102511
TextPositionState

0 commit comments

Comments
 (0)