Skip to content

Commit 7326a7d

Browse files
committed
Fix logical replication slot initialization
This was broken in commit 9c7d06d, which inadvertently gave the wrong value to fast_forward in one StartupDecodingContext call. Fix by flipping the value. Add a test for the obvious error, namely trying to initialize a replication slot with an nonexistent output plugin. While at it, move the CreateDecodingContext call earlier, so that any errors are reported before sending the CopyBoth message. Author: Dave Cramer <davecramer@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Discussion: https://postgr.es/m/CADK3HHLVkeRe1v4P02-5hj55H3_yJg3AEtpXyEY5T3wuzO2jSg@mail.gmail.com
1 parent 34295b8 commit 7326a7d

File tree

4 files changed

+18
-11
lines changed

4 files changed

+18
-11
lines changed

contrib/test_decoding/expected/slot.out

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
3030
init
3131
(1 row)
3232

33+
SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
34+
ERROR: could not access file "nonexistent": No such file or directory
3335
-- here we want to start a new session and wait till old one is gone
3436
select pg_backend_pid() as oldpid \gset
3537
\c -

contrib/test_decoding/sql/slot.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
99

1010
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
1111

12+
SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
13+
1214
-- here we want to start a new session and wait till old one is gone
1315
select pg_backend_pid() as oldpid \gset
1416
\c -

src/backend/replication/logical/logical.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin,
312312
ReplicationSlotSave();
313313

314314
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
315-
need_full_snapshot, true,
315+
need_full_snapshot, false,
316316
read_page, prepare_write, do_write,
317317
update_progress);
318318

src/backend/replication/walsender.c

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,19 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10611061
got_STOPPING = true;
10621062
}
10631063

1064+
/*
1065+
* Create our decoding context, making it start at the previously ack'ed
1066+
* position.
1067+
*
1068+
* Do this before sending CopyBoth, so that any errors are reported early.
1069+
*/
1070+
logical_decoding_ctx =
1071+
CreateDecodingContext(cmd->startpoint, cmd->options, false,
1072+
logical_read_xlog_page,
1073+
WalSndPrepareWrite, WalSndWriteData,
1074+
WalSndUpdateProgress);
1075+
1076+
10641077
WalSndSetState(WALSNDSTATE_CATCHUP);
10651078

10661079
/* Send a CopyBothResponse message, and start streaming */
@@ -1070,16 +1083,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10701083
pq_endmessage(&buf);
10711084
pq_flush();
10721085

1073-
/*
1074-
* Initialize position to the last ack'ed one, then the xlog records begin
1075-
* to be shipped from that position.
1076-
*/
1077-
logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
1078-
false,
1079-
logical_read_xlog_page,
1080-
WalSndPrepareWrite,
1081-
WalSndWriteData,
1082-
WalSndUpdateProgress);
10831086

10841087
/* Start reading WAL from the oldest required WAL. */
10851088
logical_startptr = MyReplicationSlot->data.restart_lsn;

0 commit comments

Comments
 (0)