Skip to content

Commit c48ee88

Browse files
michaelpqpull[bot]
authored andcommitted
Move tracking of in_streaming to PGOutputData
"in_streaming" is a flag used to track if an instance of pgoutput is streaming changes. When pgoutput is started, the flag was always reset, switched it back and forth in the stream start/stop callbacks. Before this commit, it was a global variable, which is confusing as it is actually attached to a state of PGOutputData. Per my analysis, using a global variable did not lead to an active bug like in 54ccfd6, but it makes the code more consistent. Note that we cannot backpatch this change anyway as it requires the addition of a new field to PGOutputData, exposed in pgoutput.h. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier, Peter Smith Discussion: https://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 99cf29c commit c48ee88

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8181
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
8282

8383
static bool publications_valid;
84-
static bool in_streaming;
8584

8685
static List *LoadPublications(List *pubnames);
8786
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
480479
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
481480
errmsg("streaming requested, but not supported by output plugin")));
482481

483-
/* Also remember we're currently not streaming any transaction. */
484-
in_streaming = false;
485-
486482
/*
487483
* Here, we just check whether the two-phase option is passed by
488484
* plugin and decide whether to enable it at later point of time. It
@@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
680676
ReorderBufferChange *change,
681677
Relation relation, RelationSyncEntry *relentry)
682678
{
679+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
683680
bool schema_sent;
684681
TransactionId xid = InvalidTransactionId;
685682
TransactionId topxid = InvalidTransactionId;
@@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
692689
* If we're not in a streaming block, just use InvalidTransactionId and
693690
* the write methods will not include it.
694691
*/
695-
if (in_streaming)
692+
if (data->in_streaming)
696693
xid = change->txn->xid;
697694

698695
if (rbtxn_is_subtxn(change->txn))
@@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
712709
* doing that we need to study its impact on the case where we have a mix
713710
* of streaming and non-streaming transactions.
714711
*/
715-
if (in_streaming)
712+
if (data->in_streaming)
716713
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
717714
else
718715
schema_sent = relentry->schema_sent;
@@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
736733

737734
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
738735

739-
if (in_streaming)
736+
if (data->in_streaming)
740737
set_schema_sent_in_streamed_txn(relentry, topxid);
741738
else
742739
relentry->schema_sent = true;
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14221419
* their association and on aborts, it can discard the corresponding
14231420
* changes.
14241421
*/
1425-
if (in_streaming)
1422+
if (data->in_streaming)
14261423
xid = change->txn->xid;
14271424

14281425
relentry = get_rel_sync_entry(data, relation);
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
15711568
TransactionId xid = InvalidTransactionId;
15721569

15731570
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
1574-
if (in_streaming)
1571+
if (data->in_streaming)
15751572
xid = change->txn->xid;
15761573

15771574
old = MemoryContextSwitchTo(data->context);
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16401637
* Remember the xid for the message in streaming mode. See
16411638
* pgoutput_change.
16421639
*/
1643-
if (in_streaming)
1640+
if (data->in_streaming)
16441641
xid = txn->xid;
16451642

16461643
/*
@@ -1743,10 +1740,11 @@ static void
17431740
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
17441741
ReorderBufferTXN *txn)
17451742
{
1743+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
17461744
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
17471745

17481746
/* we can't nest streaming of transactions */
1749-
Assert(!in_streaming);
1747+
Assert(!data->in_streaming);
17501748

17511749
/*
17521750
* If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
17641762
OutputPluginWrite(ctx, true);
17651763

17661764
/* we're streaming a chunk of transaction now */
1767-
in_streaming = true;
1765+
data->in_streaming = true;
17681766
}
17691767

17701768
/*
@@ -1774,15 +1772,17 @@ static void
17741772
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
17751773
ReorderBufferTXN *txn)
17761774
{
1775+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1776+
17771777
/* we should be streaming a transaction */
1778-
Assert(in_streaming);
1778+
Assert(data->in_streaming);
17791779

17801780
OutputPluginPrepareWrite(ctx, true);
17811781
logicalrep_write_stream_stop(ctx->out);
17821782
OutputPluginWrite(ctx, true);
17831783

17841784
/* we've stopped streaming a transaction */
1785-
in_streaming = false;
1785+
data->in_streaming = false;
17861786
}
17871787

17881788
/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
18021802
* The abort should happen outside streaming block, even for streamed
18031803
* transactions. The transaction has to be marked as streamed, though.
18041804
*/
1805-
Assert(!in_streaming);
1805+
Assert(!data->in_streaming);
18061806

18071807
/* determine the toplevel transaction */
18081808
toptxn = rbtxn_get_toptxn(txn);
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
18271827
ReorderBufferTXN *txn,
18281828
XLogRecPtr commit_lsn)
18291829
{
1830+
PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1831+
18301832
/*
18311833
* The commit should happen outside streaming block, even for streamed
18321834
* transactions. The transaction has to be marked as streamed, though.
18331835
*/
1834-
Assert(!in_streaming);
1836+
Assert(!data->in_streaming);
18351837
Assert(rbtxn_is_streamed(txn));
18361838

18371839
OutputPluginUpdateProgress(ctx, false);

src/include/replication/pgoutput.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ typedef struct PGOutputData
2121
* allocations */
2222
MemoryContext cachectx; /* private memory context for cache data */
2323

24+
bool in_streaming; /* true if we are streaming a chunk of
25+
* transaction */
26+
2427
/* client-supplied info: */
2528
uint32 protocol_version;
2629
List *publication_names;

0 commit comments

Comments
 (0)