@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
118
118
* Helper function for creating a new logical replication slot with
119
119
* given arguments. Note that this function doesn't release the created
120
120
* slot.
121
+ *
122
+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123
+ * caller's responsibility to ensure it's set to something sensible.
121
124
*/
122
125
static void
123
126
create_logical_replication_slot (char * name , char * plugin ,
124
- bool temporary , XLogRecPtr restart_lsn )
127
+ bool temporary , XLogRecPtr restart_lsn ,
128
+ bool find_startpoint )
125
129
{
126
130
LogicalDecodingContext * ctx = NULL ;
127
131
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
139
143
temporary ? RS_TEMPORARY : RS_EPHEMERAL );
140
144
141
145
/*
142
- * Create logical decoding context, to build the initial snapshot.
146
+ * Create logical decoding context to find start point or, if we don't
147
+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148
+ *
149
+ * Note: when !find_startpoint this is still important, because it's at
150
+ * this point that the output plugin is validated.
143
151
*/
144
152
ctx = CreateInitDecodingContext (plugin , NIL ,
145
- false, /* do not build snapshot */
153
+ false, /* just catalogs is OK */
146
154
restart_lsn ,
147
155
logical_read_local_xlog_page , NULL , NULL ,
148
156
NULL );
149
157
150
- /* build initial snapshot, might take a while */
151
- DecodingContextFindStartpoint (ctx );
158
+ /*
159
+ * If caller needs us to determine the decoding start point, do so now.
160
+ * This might take a while.
161
+ */
162
+ if (find_startpoint )
163
+ DecodingContextFindStartpoint (ctx );
152
164
153
165
/* don't need the decoding context anymore */
154
166
FreeDecodingContext (ctx );
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
179
191
create_logical_replication_slot (NameStr (* name ),
180
192
NameStr (* plugin ),
181
193
temporary ,
182
- InvalidXLogRecPtr );
194
+ InvalidXLogRecPtr ,
195
+ true);
183
196
184
197
values [0 ] = NameGetDatum (& MyReplicationSlot -> data .name );
185
198
values [1 ] = LSNGetDatum (MyReplicationSlot -> data .confirmed_flush );
@@ -691,10 +704,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
691
704
692
705
/* Create new slot and acquire it */
693
706
if (logical_slot )
707
+ {
708
+ /*
709
+ * We must not try to read WAL, since we haven't reserved it yet --
710
+ * hence pass find_startpoint false. confirmed_flush will be set
711
+ * below, by copying from the source slot.
712
+ */
694
713
create_logical_replication_slot (NameStr (* dst_name ),
695
714
plugin ,
696
715
temporary ,
697
- src_restart_lsn );
716
+ src_restart_lsn ,
717
+ false);
718
+ }
698
719
else
699
720
create_physical_replication_slot (NameStr (* dst_name ),
700
721
true,
@@ -711,6 +732,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
711
732
TransactionId copy_xmin ;
712
733
TransactionId copy_catalog_xmin ;
713
734
XLogRecPtr copy_restart_lsn ;
735
+ XLogRecPtr copy_confirmed_flush ;
714
736
bool copy_islogical ;
715
737
char * copy_name ;
716
738
@@ -722,6 +744,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
722
744
copy_xmin = src -> data .xmin ;
723
745
copy_catalog_xmin = src -> data .catalog_xmin ;
724
746
copy_restart_lsn = src -> data .restart_lsn ;
747
+ copy_confirmed_flush = src -> data .confirmed_flush ;
725
748
726
749
/* for existence check */
727
750
copy_name = pstrdup (NameStr (src -> data .name ));
@@ -746,6 +769,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
746
769
NameStr (* src_name )),
747
770
errdetail ("The source replication slot was modified incompatibly during the copy operation." )));
748
771
772
+ /* The source slot must have a consistent snapshot */
773
+ if (src_islogical && XLogRecPtrIsInvalid (copy_confirmed_flush ))
774
+ ereport (ERROR ,
775
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
776
+ errmsg ("cannot copy unfinished logical replication slot \"%s\"" ,
777
+ NameStr (* src_name )),
778
+ errhint ("Retry when the source replication slot's confirmed_flush_lsn is valid." )));
779
+
749
780
/* Install copied values again */
750
781
SpinLockAcquire (& MyReplicationSlot -> mutex );
751
782
MyReplicationSlot -> effective_xmin = copy_effective_xmin ;
@@ -754,6 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
754
785
MyReplicationSlot -> data .xmin = copy_xmin ;
755
786
MyReplicationSlot -> data .catalog_xmin = copy_catalog_xmin ;
756
787
MyReplicationSlot -> data .restart_lsn = copy_restart_lsn ;
788
+ MyReplicationSlot -> data .confirmed_flush = copy_confirmed_flush ;
757
789
SpinLockRelease (& MyReplicationSlot -> mutex );
758
790
759
791
ReplicationSlotMarkDirty ();
0 commit comments