@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
317
317
}
318
318
319
319
/*
320
- * Helper function for advancing physical replication slot forward.
321
- * The LSN position to move to is compared simply to the slot's
322
- * restart_lsn, knowing that any position older than that would be
323
- * removed by successive checkpoints.
320
+ * Helper function for advancing our physical replication slot forward.
321
+ *
322
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
323
+ * knowing that any position older than that would be removed by successive
324
+ * checkpoints.
324
325
*/
325
326
static XLogRecPtr
326
327
pg_physical_replication_slot_advance (XLogRecPtr moveto )
@@ -340,76 +341,97 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
340
341
}
341
342
342
343
/*
343
- * Helper function for advancing logical replication slot forward.
344
+ * Helper function for advancing our logical replication slot forward.
345
+ *
344
346
* The slot's restart_lsn is used as start point for reading records,
345
347
* while confirmed_lsn is used as base point for the decoding context.
346
- * The LSN position to move to is checked by doing a per-record scan and
347
- * logical decoding which makes sure that confirmed_lsn is updated to a
348
- * LSN which allows the future slot consumer to get consistent logical
349
- * changes.
348
+ *
349
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
350
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
351
+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
352
+ * mode, no changes are generated anyway.
350
353
*/
351
354
static XLogRecPtr
352
355
pg_logical_replication_slot_advance (XLogRecPtr moveto )
353
356
{
354
357
LogicalDecodingContext * ctx ;
355
358
ResourceOwner old_resowner = CurrentResourceOwner ;
356
- XLogRecPtr startlsn = MyReplicationSlot -> data . restart_lsn ;
357
- XLogRecPtr retlsn = MyReplicationSlot -> data . confirmed_flush ;
359
+ XLogRecPtr startlsn ;
360
+ XLogRecPtr retlsn ;
358
361
359
362
PG_TRY ();
360
363
{
361
- /* restart at slot's confirmed_flush */
364
+ /*
365
+ * Create our decoding context in fast_forward mode, passing start_lsn
366
+ * as InvalidXLogRecPtr, so that we start processing from my slot's
367
+ * confirmed_flush.
368
+ */
362
369
ctx = CreateDecodingContext (InvalidXLogRecPtr ,
363
370
NIL ,
364
- true,
371
+ true, /* fast_forward */
365
372
logical_read_local_xlog_page ,
366
373
NULL , NULL , NULL );
367
374
368
- CurrentResourceOwner = ResourceOwnerCreate (CurrentResourceOwner ,
369
- "logical decoding" );
375
+ /*
376
+ * Start reading at the slot's restart_lsn, which we know to point to
377
+ * a valid record.
378
+ */
379
+ startlsn = MyReplicationSlot -> data .restart_lsn ;
380
+
381
+ /* Initialize our return value in case we don't do anything */
382
+ retlsn = MyReplicationSlot -> data .confirmed_flush ;
370
383
371
384
/* invalidate non-timetravel entries */
372
385
InvalidateSystemCaches ();
373
386
374
- /* Decode until we run out of records */
375
- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto ) ||
376
- (ctx -> reader -> EndRecPtr != InvalidXLogRecPtr && ctx -> reader -> EndRecPtr < moveto ))
387
+ /* Decode at least one record, until we run out of records */
388
+ while ((!XLogRecPtrIsInvalid (startlsn ) &&
389
+ startlsn < moveto ) ||
390
+ (!XLogRecPtrIsInvalid (ctx -> reader -> EndRecPtr ) &&
391
+ ctx -> reader -> EndRecPtr < moveto ))
377
392
{
378
- XLogRecord * record ;
379
393
char * errm = NULL ;
394
+ XLogRecord * record ;
380
395
396
+ /*
397
+ * Read records. No changes are generated in fast_forward mode,
398
+ * but snapbuilder/slot statuses are updated properly.
399
+ */
381
400
record = XLogReadRecord (ctx -> reader , startlsn , & errm );
382
401
if (errm )
383
402
elog (ERROR , "%s" , errm );
384
403
385
- /*
386
- * Now that we've set up the xlog reader state, subsequent calls
387
- * pass InvalidXLogRecPtr to say "continue from last record"
388
- */
404
+ /* Read sequentially from now on */
389
405
startlsn = InvalidXLogRecPtr ;
390
406
391
407
/*
392
- * The {begin_txn,change,commit_txn}_wrapper callbacks above will
393
- * store the description into our tuplestore.
408
+ * Process the record. Storage-level changes are ignored in
409
+ * fast_forward mode, but other modules (such as snapbuilder)
410
+ * might still have critical updates to do.
394
411
*/
395
- if (record != NULL )
412
+ if (record )
396
413
LogicalDecodingProcessRecord (ctx , ctx -> reader );
397
414
398
- /* Stop once the moving point wanted by caller has been reached */
415
+ /* Stop once the requested target has been reached */
399
416
if (moveto <= ctx -> reader -> EndRecPtr )
400
417
break ;
401
418
402
419
CHECK_FOR_INTERRUPTS ();
403
420
}
404
421
422
+ /*
423
+ * Logical decoding could have clobbered CurrentResourceOwner during
424
+ * transaction management, so restore the executor's value. (This is
425
+ * a kluge, but it's not worth cleaning up right now.)
426
+ */
405
427
CurrentResourceOwner = old_resowner ;
406
428
407
429
if (ctx -> reader -> EndRecPtr != InvalidXLogRecPtr )
408
430
{
409
431
LogicalConfirmReceivedLocation (moveto );
410
432
411
433
/*
412
- * If only the confirmed_flush_lsn has changed the slot won't get
434
+ * If only the confirmed_flush LSN has changed the slot won't get
413
435
* marked as dirty by the above. Callers on the walsender
414
436
* interface are expected to keep track of their own progress and
415
437
* don't need it written out. But SQL-interface users cannot
0 commit comments