@@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);
162
162
* *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
163
163
* modified, and decoding from the corresponding LSN's can reach a
164
164
* consistent snapshot.
165
+ *
166
+ * *remote_slot_precedes will be true if the remote slot's LSN or xmin
167
+ * precedes locally reserved position.
165
168
*/
166
169
static bool
167
170
update_local_synced_slot (RemoteSlot * remote_slot , Oid remote_dbid ,
168
- bool * found_consistent_snapshot )
171
+ bool * found_consistent_snapshot ,
172
+ bool * remote_slot_precedes )
169
173
{
170
174
ReplicationSlot * slot = MyReplicationSlot ;
171
- bool slot_updated = false;
175
+ bool updated_xmin_or_lsn = false;
176
+ bool updated_config = false;
172
177
173
178
Assert (slot -> data .invalidated == RS_INVAL_NONE );
174
179
175
180
if (found_consistent_snapshot )
176
181
* found_consistent_snapshot = false;
177
182
178
- if (remote_slot -> confirmed_lsn != slot -> data .confirmed_flush ||
179
- remote_slot -> restart_lsn != slot -> data .restart_lsn ||
180
- remote_slot -> catalog_xmin != slot -> data .catalog_xmin )
183
+ if (remote_slot_precedes )
184
+ * remote_slot_precedes = false;
185
+
186
+ /*
187
+ * Don't overwrite if we already have a newer catalog_xmin and
188
+ * restart_lsn.
189
+ */
190
+ if (remote_slot -> restart_lsn < slot -> data .restart_lsn ||
191
+ TransactionIdPrecedes (remote_slot -> catalog_xmin ,
192
+ slot -> data .catalog_xmin ))
193
+ {
194
+ /*
195
+ * This can happen in following situations:
196
+ *
197
+ * If the slot is temporary, it means either the initial WAL location
198
+ * reserved for the local slot is ahead of the remote slot's
199
+ * restart_lsn or the initial xmin_horizon computed for the local slot
200
+ * is ahead of the remote slot.
201
+ *
202
+ * If the slot is persistent, restart_lsn of the synced slot could
203
+ * still be ahead of the remote slot. Since we use slot advance
204
+ * functionality to keep snapbuild/slot updated, it is possible that
205
+ * the restart_lsn is advanced to a later position than it has on the
206
+ * primary. This can happen when slot advancing machinery finds
207
+ * running xacts record after reaching the consistent state at a later
208
+ * point than the primary where it serializes the snapshot and updates
209
+ * the restart_lsn.
210
+ *
211
+ * We LOG the message if the slot is temporary as it can help the user
212
+ * to understand why the slot is not sync-ready. In the case of a
213
+ * persistent slot, it would be a more common case and won't directly
214
+ * impact the users, so we used DEBUG1 level to log the message.
215
+ */
216
+ ereport (slot -> data .persistency == RS_TEMPORARY ? LOG : DEBUG1 ,
217
+ errmsg ("could not sync slot \"%s\" as remote slot precedes local slot" ,
218
+ remote_slot -> name ),
219
+ errdetail ("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u." ,
220
+ LSN_FORMAT_ARGS (remote_slot -> restart_lsn ),
221
+ remote_slot -> catalog_xmin ,
222
+ LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
223
+ slot -> data .catalog_xmin ));
224
+
225
+ if (remote_slot_precedes )
226
+ * remote_slot_precedes = true;
227
+ }
228
+
229
+ /*
230
+ * Attempt to sync LSNs and xmins only if remote slot is ahead of local
231
+ * slot.
232
+ */
233
+ else if (remote_slot -> confirmed_lsn > slot -> data .confirmed_flush ||
234
+ remote_slot -> restart_lsn > slot -> data .restart_lsn ||
235
+ TransactionIdFollows (remote_slot -> catalog_xmin ,
236
+ slot -> data .catalog_xmin ))
181
237
{
182
238
/*
183
239
* We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
198
254
slot -> data .restart_lsn = remote_slot -> restart_lsn ;
199
255
slot -> data .confirmed_flush = remote_slot -> confirmed_lsn ;
200
256
slot -> data .catalog_xmin = remote_slot -> catalog_xmin ;
201
- slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
202
257
SpinLockRelease (& slot -> mutex );
203
258
204
259
if (found_consistent_snapshot )
@@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
208
263
{
209
264
LogicalSlotAdvanceAndCheckSnapState (remote_slot -> confirmed_lsn ,
210
265
found_consistent_snapshot );
211
- }
212
266
213
- ReplicationSlotsComputeRequiredXmin (false);
214
- ReplicationSlotsComputeRequiredLSN ();
267
+ /* Sanity check */
268
+ if (slot -> data .confirmed_flush != remote_slot -> confirmed_lsn )
269
+ ereport (ERROR ,
270
+ errmsg_internal ("synchronized confirmed_flush for slot \"%s\" differs from remote slot" ,
271
+ remote_slot -> name ),
272
+ errdetail_internal ("Remote slot has LSN %X/%X but local slot has LSN %X/%X." ,
273
+ LSN_FORMAT_ARGS (remote_slot -> confirmed_lsn ),
274
+ LSN_FORMAT_ARGS (slot -> data .confirmed_flush )));
275
+ }
215
276
216
- slot_updated = true;
277
+ updated_xmin_or_lsn = true;
217
278
}
218
279
219
280
if (remote_dbid != slot -> data .database ||
@@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
233
294
slot -> data .failover = remote_slot -> failover ;
234
295
SpinLockRelease (& slot -> mutex );
235
296
236
- slot_updated = true;
297
+ updated_config = true;
237
298
}
238
299
239
- return slot_updated ;
300
+ /*
301
+ * We have to write the changed xmin to disk *before* we change the
302
+ * in-memory value, otherwise after a crash we wouldn't know that some
303
+ * catalog tuples might have been removed already.
304
+ */
305
+ if (updated_config || updated_xmin_or_lsn )
306
+ {
307
+ ReplicationSlotMarkDirty ();
308
+ ReplicationSlotSave ();
309
+ }
310
+
311
+ /*
312
+ * Now the new xmin is safely on disk, we can let the global value
313
+ * advance. We do not take ProcArrayLock or similar since we only advance
314
+ * xmin here and there's not much harm done by a concurrent computation
315
+ * missing that.
316
+ */
317
+ if (updated_xmin_or_lsn )
318
+ {
319
+ SpinLockAcquire (& slot -> mutex );
320
+ slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
321
+ SpinLockRelease (& slot -> mutex );
322
+
323
+ ReplicationSlotsComputeRequiredXmin (false);
324
+ ReplicationSlotsComputeRequiredLSN ();
325
+ }
326
+
327
+ return updated_config || updated_xmin_or_lsn ;
240
328
}
241
329
242
330
/*
@@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
460
548
{
461
549
ReplicationSlot * slot = MyReplicationSlot ;
462
550
bool found_consistent_snapshot = false;
551
+ bool remote_slot_precedes = false;
552
+
553
+ (void ) update_local_synced_slot (remote_slot , remote_dbid ,
554
+ & found_consistent_snapshot ,
555
+ & remote_slot_precedes );
463
556
464
557
/*
465
558
* Check if the primary server has caught up. Refer to the comment atop
466
559
* the file for details on this check.
467
560
*/
468
- if (remote_slot -> restart_lsn < slot -> data .restart_lsn ||
469
- TransactionIdPrecedes (remote_slot -> catalog_xmin ,
470
- slot -> data .catalog_xmin ))
561
+ if (remote_slot_precedes )
471
562
{
472
563
/*
473
564
* The remote slot didn't catch up to locally reserved position.
@@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
476
567
* current location when recreating the slot in the next cycle. It may
477
568
* take more time to create such a slot. Therefore, we keep this slot
478
569
* and attempt the synchronization in the next cycle.
479
- *
480
- * XXX should this be changed to elog(DEBUG1) perhaps?
481
570
*/
482
- ereport (LOG ,
483
- errmsg ("could not sync slot \"%s\" as remote slot precedes local slot" ,
484
- remote_slot -> name ),
485
- errdetail ("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u." ,
486
- LSN_FORMAT_ARGS (remote_slot -> restart_lsn ),
487
- remote_slot -> catalog_xmin ,
488
- LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
489
- slot -> data .catalog_xmin ));
490
571
return false;
491
572
}
492
573
493
- (void ) update_local_synced_slot (remote_slot , remote_dbid ,
494
- & found_consistent_snapshot );
495
-
496
574
/*
497
575
* Don't persist the slot if it cannot reach the consistent point from the
498
576
* restart_lsn. See comments atop this file.
@@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
633
711
/*
634
712
* Sanity check: As long as the invalidations are handled
635
713
* appropriately as above, this should never happen.
714
+ *
715
+ * We don't need to check restart_lsn here. See the comments in
716
+ * update_local_synced_slot() for details.
636
717
*/
637
- if (remote_slot -> restart_lsn < slot -> data .restart_lsn )
638
- elog (ERROR ,
639
- "cannot synchronize local slot \"%s\" LSN(%X/%X)"
640
- " to remote slot's LSN(%X/%X) as synchronization"
641
- " would move it backwards" , remote_slot -> name ,
642
- LSN_FORMAT_ARGS (slot -> data .restart_lsn ),
643
- LSN_FORMAT_ARGS (remote_slot -> restart_lsn ));
644
-
645
- /* Make sure the slot changes persist across server restart */
646
- if (update_local_synced_slot (remote_slot , remote_dbid , NULL ))
647
- {
648
- ReplicationSlotMarkDirty ();
649
- ReplicationSlotSave ();
650
-
651
- slot_updated = true;
652
- }
718
+ if (remote_slot -> confirmed_lsn < slot -> data .confirmed_flush )
719
+ ereport (ERROR ,
720
+ errmsg_internal ("cannot synchronize local slot \"%s\"" ,
721
+ remote_slot -> name ),
722
+ errdetail_internal ("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X)." ,
723
+ LSN_FORMAT_ARGS (slot -> data .confirmed_flush ),
724
+ LSN_FORMAT_ARGS (remote_slot -> confirmed_lsn )));
725
+
726
+ slot_updated = update_local_synced_slot (remote_slot , remote_dbid ,
727
+ NULL , NULL );
653
728
}
654
729
}
655
730
/* Otherwise create the slot first. */
0 commit comments