Skip to content

Commit 8d05be9

Browse files
author
Amit Kapila
committed
Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origin) to cache the action for the origin filter, but we didn't reset the flag when shutting down the output plugin, so subsequent retries may access the previous publish_no_origin value. We fix this by storing the flag in the output plugin's private data. Additionally, the patch removes the currently unused origin string from the structure. For the back branch, to avoid changing the exposed structure, we eliminated the global variable and instead directly used the origin string for change filtering. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier Backpatch-through: 16 Discussion: http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 641db60 commit 8d05be9

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

contrib/test_decoding/expected/replorigin.out

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
267267

268268
(1 row)
269269

270+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
271+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
272+
?column?
273+
----------
274+
init
275+
(1 row)
276+
277+
CREATE PUBLICATION pub FOR TABLE target_tbl;
278+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
279+
pg_replication_origin_create
280+
------------------------------
281+
1
282+
(1 row)
283+
284+
-- mark session as replaying
285+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
286+
pg_replication_origin_session_setup
287+
-------------------------------------
288+
289+
(1 row)
290+
291+
INSERT INTO target_tbl(data) VALUES ('test data');
292+
-- The replayed change will be filtered.
293+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
294+
?column?
295+
----------
296+
t
297+
(1 row)
298+
299+
-- The replayed change will be output if the origin value is not specified.
300+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
301+
?column?
302+
----------
303+
t
304+
(1 row)
305+
306+
-- Clean up
307+
SELECT pg_replication_origin_session_reset();
308+
pg_replication_origin_session_reset
309+
-------------------------------------
310+
311+
(1 row)
312+
313+
SELECT pg_drop_replication_slot('regression_slot');
314+
pg_drop_replication_slot
315+
--------------------------
316+
317+
(1 row)
318+
319+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
320+
pg_replication_origin_drop
321+
----------------------------
322+
323+
(1 row)
324+
325+
DROP PUBLICATION pub;

contrib/test_decoding/sql/replorigin.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
124124
SELECT pg_replication_origin_session_reset();
125125
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
126126
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
127+
128+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
129+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
130+
CREATE PUBLICATION pub FOR TABLE target_tbl;
131+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
132+
133+
-- mark session as replaying
134+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
135+
136+
INSERT INTO target_tbl(data) VALUES ('test data');
137+
138+
-- The replayed change will be filtered.
139+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
140+
141+
-- The replayed change will be output if the origin value is not specified.
142+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
143+
144+
-- Clean up
145+
SELECT pg_replication_origin_session_reset();
146+
SELECT pg_drop_replication_slot('regression_slot');
147+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
148+
DROP PUBLICATION pub;

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8282

8383
static bool publications_valid;
8484
static bool in_streaming;
85-
static bool publish_no_origin;
8685

8786
static List *LoadPublications(List *pubnames);
8887
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -388,11 +387,9 @@ parse_output_parameters(List *options, PGOutputData *data)
388387
origin_option_given = true;
389388

390389
data->origin = defGetString(defel);
391-
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
392-
publish_no_origin = true;
393-
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
394-
publish_no_origin = false;
395-
else
390+
391+
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) != 0 &&
392+
pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) != 0)
396393
ereport(ERROR,
397394
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398395
errmsg("unrecognized origin value: \"%s\"", data->origin));
@@ -1673,7 +1670,10 @@ static bool
16731670
pgoutput_origin_filter(LogicalDecodingContext *ctx,
16741671
RepOriginId origin_id)
16751672
{
1676-
if (publish_no_origin && origin_id != InvalidRepOriginId)
1673+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1674+
1675+
if (data->origin && (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0) &&
1676+
origin_id != InvalidRepOriginId)
16771677
return true;
16781678

16791679
return false;

0 commit comments

Comments
 (0)