Skip to content

Commit dd38ff2

Browse files
committed
Fix recovery_prefetch with low maintenance_io_concurrency.
We should process completed IOs *before* trying to start more, so that it is always possible to decode one more record when the decoded record queue is empty, even if maintenance_io_concurrency is set so low that a single earlier WAL record might have saturated the IO queue. That bug was hidden because the effect of maintenance_io_concurrency was arbitrarily clamped to be at least 2. Fix the ordering, and also remove that clamp. We need a special case for 0, which is now treated the same as recovery_prefetch=off, but otherwise the number is used directly. This allows for testing with 1, which would have made the problem obvious in simple test scenarios. Also add an explicit error message for missing contrecords. It was a bit strange that we didn't report an error already, and became a latent bug with prefetching, since the internal state that tracks aborted contrecords would not survive retrying, as revealed by 026_overwrite_contrecord.pl with this adjustment. Reporting an error prevents that. Back-patch to 15. Reported-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
1 parent 144cefa commit dd38ff2

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

src/backend/access/transam/xlogprefetcher.c

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
int recovery_prefetch = RECOVERY_PREFETCH_TRY;
7373

7474
#ifdef USE_PREFETCH
75-
#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
75+
#define RecoveryPrefetchEnabled() \
76+
(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77+
maintenance_io_concurrency > 0)
7678
#else
7779
#define RecoveryPrefetchEnabled() false
7880
#endif
@@ -983,6 +985,7 @@ XLogRecord *
983985
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
984986
{
985987
DecodedXLogRecord *record;
988+
XLogRecPtr replayed_up_to;
986989

987990
/*
988991
* See if it's time to reset the prefetching machinery, because a relevant
@@ -998,7 +1001,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
9981001

9991002
if (RecoveryPrefetchEnabled())
10001003
{
1001-
max_inflight = Max(maintenance_io_concurrency, 2);
1004+
Assert(maintenance_io_concurrency > 0);
1005+
max_inflight = maintenance_io_concurrency;
10021006
max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
10031007
}
10041008
else
@@ -1016,14 +1020,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10161020
}
10171021

10181022
/*
1019-
* Release last returned record, if there is one. We need to do this so
1020-
* that we can check for empty decode queue accurately.
1023+
* Release last returned record, if there is one, as it's now been
1024+
* replayed.
10211025
*/
1022-
XLogReleasePreviousRecord(prefetcher->reader);
1026+
replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
10231027

1024-
/* If there's nothing queued yet, then start prefetching. */
1028+
/*
1029+
* Can we drop any filters yet? If we were waiting for a relation to be
1030+
* created or extended, it is now OK to access blocks in the covered
1031+
* range.
1032+
*/
1033+
XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1034+
1035+
/*
1036+
* All IO initiated by earlier WAL is now completed. This might trigger
1037+
* further prefetching.
1038+
*/
1039+
lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1040+
1041+
/*
1042+
* If there's nothing queued yet, then start prefetching to cause at least
1043+
* one record to be queued.
1044+
*/
10251045
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1046+
{
1047+
Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1048+
Assert(lrq_completed(prefetcher->streaming_read) == 0);
10261049
lrq_prefetch(prefetcher->streaming_read);
1050+
}
10271051

10281052
/* Read the next record. */
10291053
record = XLogNextRecord(prefetcher->reader, errmsg);
@@ -1037,12 +1061,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10371061
Assert(record == prefetcher->reader->record);
10381062

10391063
/*
1040-
* Can we drop any prefetch filters yet, given the record we're about to
1041-
* return? This assumes that any records with earlier LSNs have been
1042-
* replayed, so if we were waiting for a relation to be created or
1043-
* extended, it is now OK to access blocks in the covered range.
1064+
* If maintenance_io_concurrency is set very low, we might have started
1065+
* prefetching some but not all of the blocks referenced in the record
1066+
* we're about to return. Forget about the rest of the blocks in this
1067+
* record by dropping the prefetcher's reference to it.
10441068
*/
1045-
XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
1069+
if (record == prefetcher->record)
1070+
prefetcher->record = NULL;
10461071

10471072
/*
10481073
* See if it's time to compute some statistics, because enough WAL has
@@ -1051,13 +1076,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10511076
if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
10521077
XLogPrefetcherComputeStats(prefetcher);
10531078

1054-
/*
1055-
* The caller is about to replay this record, so we can now report that
1056-
* all IO initiated because of early WAL must be finished. This may
1057-
* trigger more readahead.
1058-
*/
1059-
lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
1060-
10611079
Assert(record == prefetcher->reader->record);
10621080

10631081
return &record->header;

src/backend/access/transam/xlogreader.c

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
275275
}
276276

277277
/*
278-
* See if we can release the last record that was returned by
279-
* XLogNextRecord(), if any, to free up space.
278+
* Release the last record that was returned by XLogNextRecord(), if any, to
279+
* free up space. Returns the LSN past the end of the record.
280280
*/
281-
void
281+
XLogRecPtr
282282
XLogReleasePreviousRecord(XLogReaderState *state)
283283
{
284284
DecodedXLogRecord *record;
285+
XLogRecPtr next_lsn;
285286

286287
if (!state->record)
287-
return;
288+
return InvalidXLogRecPtr;
288289

289290
/*
290291
* Remove it from the decoded record queue. It must be the oldest item
291292
* decoded, decode_queue_head.
292293
*/
293294
record = state->record;
295+
next_lsn = record->next_lsn;
294296
Assert(record == state->decode_queue_head);
295297
state->record = NULL;
296298
state->decode_queue_head = record->next;
@@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
336338
state->decode_buffer_tail = state->decode_buffer;
337339
}
338340
}
341+
342+
return next_lsn;
339343
}
340344

341345
/*
@@ -906,6 +910,17 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
906910
*/
907911
state->abortedRecPtr = RecPtr;
908912
state->missingContrecPtr = targetPagePtr;
913+
914+
/*
915+
* If we got here without reporting an error, report one now so that
916+
* XLogPrefetcherReadRecord() doesn't bring us back a second time and
917+
* clobber the above state. Otherwise, the existing error takes
918+
* precedence.
919+
*/
920+
if (!state->errormsg_buf[0])
921+
report_invalid_record(state,
922+
"missing contrecord at %X/%X",
923+
LSN_FORMAT_ARGS(RecPtr));
909924
}
910925

911926
if (decoded && decoded->oversized)

src/include/access/xlogreader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
363363
char **errormsg);
364364

365365
/* Release the previously returned record, if necessary. */
366-
extern void XLogReleasePreviousRecord(XLogReaderState *state);
366+
extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
367367

368368
/* Try to read ahead, if there is data and space. */
369369
extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,

0 commit comments

Comments
 (0)