Skip to content

Commit 96bd494

Browse files
alvherre, Arseny Sher, and michaelpq
committed
Rewrite comments in replication slot advance implementation
The code added by 9c7d06d was a bit obscure; clarify that by rewriting the comments. Lack of clarity has already caused bugs, so it's a worthy goal.

Co-authored-by: Arseny Sher <a.sher@postgrespro.ru>
Co-authored-by: Michaël Paquier <michael@paquier.xyz>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Petr Jelínek <petr.jelinek@2ndquadrant.com>
Discussion: https://postgr.es/m/87y3fgoyrn.fsf@ars-thinkpad
1 parent dc961e5 commit 96bd494

File tree

2 files changed

+54
-29
lines changed

2 files changed

+54
-29
lines changed

src/backend/replication/logical/logical.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
338338
* that, see below).
339339
*
340340
* output_plugin_options
341-
* contains options passed to the output plugin.
341+
* options passed to the output plugin.
342+
*
343+
* fast_forward
344+
* bypass the generation of logical changes.
342345
*
343346
* read_page, prepare_write, do_write, update_progress
344347
* callbacks that have to be filled to perform the use-case dependent,

src/backend/replication/slotfuncs.c

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
317317
}
318318

319319
/*
320-
* Helper function for advancing physical replication slot forward.
321-
* The LSN position to move to is compared simply to the slot's
322-
* restart_lsn, knowing that any position older than that would be
323-
* removed by successive checkpoints.
320+
* Helper function for advancing our physical replication slot forward.
321+
*
322+
* The LSN position to move to is compared simply to the slot's restart_lsn,
323+
* knowing that any position older than that would be removed by successive
324+
* checkpoints.
324325
*/
325326
static XLogRecPtr
326327
pg_physical_replication_slot_advance(XLogRecPtr moveto)
@@ -340,76 +341,97 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
340341
}
341342

342343
/*
343-
* Helper function for advancing logical replication slot forward.
344+
* Helper function for advancing our logical replication slot forward.
345+
*
344346
* The slot's restart_lsn is used as start point for reading records,
345347
* while confirmed_lsn is used as base point for the decoding context.
346-
* The LSN position to move to is checked by doing a per-record scan and
347-
* logical decoding which makes sure that confirmed_lsn is updated to a
348-
* LSN which allows the future slot consumer to get consistent logical
349-
* changes.
348+
*
349+
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
350+
* because we need to digest WAL to advance restart_lsn, which allows WAL to
351+
* be recycled and old catalog tuples to be removed. As decoding is done in
352+
* fast_forward mode, no changes are generated anyway.
350353
*/
351354
static XLogRecPtr
352355
pg_logical_replication_slot_advance(XLogRecPtr moveto)
353356
{
354357
LogicalDecodingContext *ctx;
355358
ResourceOwner old_resowner = CurrentResourceOwner;
356-
XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
357-
XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush;
359+
XLogRecPtr startlsn;
360+
XLogRecPtr retlsn;
358361

359362
PG_TRY();
360363
{
361-
/* restart at slot's confirmed_flush */
364+
/*
365+
* Create our decoding context in fast_forward mode, passing start_lsn
366+
* as InvalidXLogRecPtr, so that we start processing from our slot's
367+
* confirmed_flush.
368+
*/
362369
ctx = CreateDecodingContext(InvalidXLogRecPtr,
363370
NIL,
364-
true,
371+
true, /* fast_forward */
365372
logical_read_local_xlog_page,
366373
NULL, NULL, NULL);
367374

368-
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
369-
"logical decoding");
375+
/*
376+
* Start reading at the slot's restart_lsn, which we know to point to
377+
* a valid record.
378+
*/
379+
startlsn = MyReplicationSlot->data.restart_lsn;
380+
381+
/* Initialize our return value in case we don't do anything */
382+
retlsn = MyReplicationSlot->data.confirmed_flush;
370383

371384
/* invalidate non-timetravel entries */
372385
InvalidateSystemCaches();
373386

374-
/* Decode until we run out of records */
375-
while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
376-
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
387+
/* Decode at least one record, until we run out of records */
388+
while ((!XLogRecPtrIsInvalid(startlsn) &&
389+
startlsn < moveto) ||
390+
(!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
391+
ctx->reader->EndRecPtr < moveto))
377392
{
378-
XLogRecord *record;
379393
char *errm = NULL;
394+
XLogRecord *record;
380395

396+
/*
397+
* Read records. No changes are generated in fast_forward mode,
398+
* but snapbuilder/slot statuses are updated properly.
399+
*/
381400
record = XLogReadRecord(ctx->reader, startlsn, &errm);
382401
if (errm)
383402
elog(ERROR, "%s", errm);
384403

385-
/*
386-
* Now that we've set up the xlog reader state, subsequent calls
387-
* pass InvalidXLogRecPtr to say "continue from last record"
388-
*/
404+
/* Read sequentially from now on */
389405
startlsn = InvalidXLogRecPtr;
390406

391407
/*
392-
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
393-
* store the description into our tuplestore.
408+
* Process the record. Storage-level changes are ignored in
409+
* fast_forward mode, but other modules (such as snapbuilder)
410+
* might still have critical updates to do.
394411
*/
395-
if (record != NULL)
412+
if (record)
396413
LogicalDecodingProcessRecord(ctx, ctx->reader);
397414

398-
/* Stop once the moving point wanted by caller has been reached */
415+
/* Stop once the requested target has been reached */
399416
if (moveto <= ctx->reader->EndRecPtr)
400417
break;
401418

402419
CHECK_FOR_INTERRUPTS();
403420
}
404421

422+
/*
423+
* Logical decoding could have clobbered CurrentResourceOwner during
424+
* transaction management, so restore the executor's value. (This is
425+
* a kluge, but it's not worth cleaning up right now.)
426+
*/
405427
CurrentResourceOwner = old_resowner;
406428

407429
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
408430
{
409431
LogicalConfirmReceivedLocation(moveto);
410432

411433
/*
412-
* If only the confirmed_flush_lsn has changed the slot won't get
434+
* If only the confirmed_flush LSN has changed, the slot won't get
413435
* marked as dirty by the above. Callers on the walsender
414436
* interface are expected to keep track of their own progress and
415437
* don't need it written out. But SQL-interface users cannot

0 commit comments

Comments
 (0)