Skip to content

Commit 54ccfd6

Browse files
author
Amit Kapila
committed
Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origin) to cache the action for the origin filter, but we didn't reset the flag when shutting down the output plugin, so subsequent retries may access the previous publish_no_origin value. We fix this by storing the flag in the output plugin's private data. Additionally, the patch removes the currently unused origin string from the structure. For the back branch, to avoid changing the exposed structure, we eliminated the global variable and instead directly used the origin string for change filtering. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier Backpatch-through: 16 Discussion: http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 6fc3a13 commit 54ccfd6

File tree

4 files changed

+90
-9
lines changed

4 files changed

+90
-9
lines changed

contrib/test_decoding/expected/replorigin.out

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
267267

268268
(1 row)
269269

270+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
271+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
272+
?column?
273+
----------
274+
init
275+
(1 row)
276+
277+
CREATE PUBLICATION pub FOR TABLE target_tbl;
278+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
279+
pg_replication_origin_create
280+
------------------------------
281+
1
282+
(1 row)
283+
284+
-- mark session as replaying
285+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
286+
pg_replication_origin_session_setup
287+
-------------------------------------
288+
289+
(1 row)
290+
291+
INSERT INTO target_tbl(data) VALUES ('test data');
292+
-- The replayed change will be filtered.
293+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
294+
?column?
295+
----------
296+
t
297+
(1 row)
298+
299+
-- The replayed change will be output if the origin value is not specified.
300+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
301+
?column?
302+
----------
303+
t
304+
(1 row)
305+
306+
-- Clean up
307+
SELECT pg_replication_origin_session_reset();
308+
pg_replication_origin_session_reset
309+
-------------------------------------
310+
311+
(1 row)
312+
313+
SELECT pg_drop_replication_slot('regression_slot');
314+
pg_drop_replication_slot
315+
--------------------------
316+
317+
(1 row)
318+
319+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
320+
pg_replication_origin_drop
321+
----------------------------
322+
323+
(1 row)
324+
325+
DROP PUBLICATION pub;

contrib/test_decoding/sql/replorigin.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
124124
SELECT pg_replication_origin_session_reset();
125125
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
126126
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
127+
128+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
129+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
130+
CREATE PUBLICATION pub FOR TABLE target_tbl;
131+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
132+
133+
-- mark session as replaying
134+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
135+
136+
INSERT INTO target_tbl(data) VALUES ('test data');
137+
138+
-- The replayed change will be filtered.
139+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
140+
141+
-- The replayed change will be output if the origin value is not specified.
142+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
143+
144+
-- Clean up
145+
SELECT pg_replication_origin_session_reset();
146+
SELECT pg_drop_replication_slot('regression_slot');
147+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
148+
DROP PUBLICATION pub;

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8282

8383
static bool publications_valid;
8484
static bool in_streaming;
85-
static bool publish_no_origin;
8685

8786
static List *LoadPublications(List *pubnames);
8887
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
381380
}
382381
else if (strcmp(defel->defname, "origin") == 0)
383382
{
383+
char *origin;
384+
384385
if (origin_option_given)
385386
ereport(ERROR,
386387
errcode(ERRCODE_SYNTAX_ERROR),
387388
errmsg("conflicting or redundant options"));
388389
origin_option_given = true;
389390

390-
data->origin = defGetString(defel);
391-
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
392-
publish_no_origin = true;
393-
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
394-
publish_no_origin = false;
391+
origin = defGetString(defel);
392+
if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
393+
data->publish_no_origin = true;
394+
else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
395+
data->publish_no_origin = false;
395396
else
396397
ereport(ERROR,
397398
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398-
errmsg("unrecognized origin value: \"%s\"", data->origin));
399+
errmsg("unrecognized origin value: \"%s\"", origin));
399400
}
400401
else
401402
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -1673,7 +1674,9 @@ static bool
16731674
pgoutput_origin_filter(LogicalDecodingContext *ctx,
16741675
RepOriginId origin_id)
16751676
{
1676-
if (publish_no_origin && origin_id != InvalidRepOriginId)
1677+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1678+
1679+
if (data->publish_no_origin && origin_id != InvalidRepOriginId)
16771680
return true;
16781681

16791682
return false;

src/include/replication/pgoutput.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ typedef struct PGOutputData
2929
char streaming;
3030
bool messages;
3131
bool two_phase;
32-
char *origin;
32+
bool publish_no_origin;
3333
} PGOutputData;
3434

3535
#endif /* PGOUTPUT_H */

0 commit comments

Comments
 (0)