@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
317
317
}
318
318
319
319
/*
320
- * Helper function for advancing physical replication slot forward.
321
- * The LSN position to move to is compared simply to the slot's
322
- * restart_lsn, knowing that any position older than that would be
323
- * removed by successive checkpoints.
320
+ * Helper function for advancing our physical replication slot forward.
321
+ *
322
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
323
+ * knowing that any position older than that would be removed by successive
324
+ * checkpoints.
324
325
*/
325
326
static XLogRecPtr
326
327
pg_physical_replication_slot_advance (XLogRecPtr moveto )
@@ -340,59 +341,78 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
340
341
}
341
342
342
343
/*
343
- * Helper function for advancing logical replication slot forward.
344
+ * Helper function for advancing our logical replication slot forward.
345
+ *
344
346
* The slot's restart_lsn is used as start point for reading records,
345
347
* while confirmed_lsn is used as base point for the decoding context.
346
- * The LSN position to move to is checked by doing a per-record scan and
347
- * logical decoding which makes sure that confirmed_lsn is updated to a
348
- * LSN which allows the future slot consumer to get consistent logical
349
- * changes.
348
+ *
349
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
350
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
351
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
352
+ * mode, no changes are generated anyway.
350
353
*/
351
354
static XLogRecPtr
352
355
pg_logical_replication_slot_advance (XLogRecPtr moveto )
353
356
{
354
357
LogicalDecodingContext * ctx ;
355
358
ResourceOwner old_resowner = CurrentResourceOwner ;
356
- XLogRecPtr startlsn = MyReplicationSlot -> data . restart_lsn ;
357
- XLogRecPtr retlsn = MyReplicationSlot -> data . confirmed_flush ;
359
+ XLogRecPtr startlsn ;
360
+ XLogRecPtr retlsn ;
358
361
359
362
PG_TRY ();
360
363
{
361
- /* restart at slot's confirmed_flush */
364
+ /*
365
+ * Create our decoding context in fast_forward mode, passing start_lsn
366
+ * as InvalidXLogRecPtr, so that we start processing from my slot's
367
+ * confirmed_flush.
368
+ */
362
369
ctx = CreateDecodingContext (InvalidXLogRecPtr ,
363
370
NIL ,
364
- true,
371
+ true, /* fast_forward */
365
372
logical_read_local_xlog_page ,
366
373
NULL , NULL , NULL );
367
374
375
+ /*
376
+ * Start reading at the slot's restart_lsn, which we know to point to
377
+ * a valid record.
378
+ */
379
+ startlsn = MyReplicationSlot -> data .restart_lsn ;
380
+
381
+ /* Initialize our return value in case we don't do anything */
382
+ retlsn = MyReplicationSlot -> data .confirmed_flush ;
383
+
368
384
/* invalidate non-timetravel entries */
369
385
InvalidateSystemCaches ();
370
386
371
- /* Decode until we run out of records */
372
- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto ) ||
373
- (ctx -> reader -> EndRecPtr != InvalidXLogRecPtr && ctx -> reader -> EndRecPtr < moveto ))
387
+ /* Decode at least one record, until we run out of records */
388
+ while ((!XLogRecPtrIsInvalid (startlsn ) &&
389
+ startlsn < moveto ) ||
390
+ (!XLogRecPtrIsInvalid (ctx -> reader -> EndRecPtr ) &&
391
+ ctx -> reader -> EndRecPtr < moveto ))
374
392
{
375
- XLogRecord * record ;
376
393
char * errm = NULL ;
394
+ XLogRecord * record ;
377
395
396
+ /*
397
+ * Read records. No changes are generated in fast_forward mode,
398
+ * but snapbuilder/slot statuses are updated properly.
399
+ */
378
400
record = XLogReadRecord (ctx -> reader , startlsn , & errm );
379
401
if (errm )
380
402
elog (ERROR , "%s" , errm );
381
403
382
- /*
383
- * Now that we've set up the xlog reader state, subsequent calls
384
- * pass InvalidXLogRecPtr to say "continue from last record"
385
- */
404
+ /* Read sequentially from now on */
386
405
startlsn = InvalidXLogRecPtr ;
387
406
388
407
/*
389
- * The {begin_txn,change,commit_txn}_wrapper callbacks above will
390
- * store the description into our tuplestore.
408
+ * Process the record. Storage-level changes are ignored in
409
+ * fast_forward mode, but other modules (such as snapbuilder)
410
+ * might still have critical updates to do.
391
411
*/
392
- if (record != NULL )
412
+ if (record )
393
413
LogicalDecodingProcessRecord (ctx , ctx -> reader );
394
414
395
- /* Stop once the moving point wanted by caller has been reached */
415
+ /* Stop once the requested target has been reached */
396
416
if (moveto <= ctx -> reader -> EndRecPtr )
397
417
break ;
398
418
@@ -411,7 +431,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
411
431
LogicalConfirmReceivedLocation (moveto );
412
432
413
433
/*
414
- * If only the confirmed_flush_lsn has changed the slot won't get
434
+ * If only the confirmed_flush LSN has changed the slot won't get
415
435
* marked as dirty by the above. Callers on the walsender
416
436
* interface are expected to keep track of their own progress and
417
437
* don't need it written out. But SQL-interface users cannot
0 commit comments