Skip to content

Commit 043799f

Browse files
Use streaming read I/O in heap amcheck
Instead of directly invoking ReadBuffer() for each unskippable block in the heap relation, verify_heapam() now uses the read stream API to acquire the next buffer to check for corruption. Author: Matheus Alcantara <matheusssilv97@gmail.com> Co-authored-by: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com> Reviewed-by: Kirill Reshke <reshkekirill@gmail.com> Reviewed-by: jian he <jian.universality@gmail.com> Discussion: https://postgr.es/m/flat/CAFY6G8eLyz7%2BsccegZYFj%3D5tAUR-GZ9uEq4Ch5gvwKqUwb_hCA%40mail.gmail.com
1 parent 4623d71 commit 043799f

File tree

2 files changed

+109
-25
lines changed

2 files changed

+109
-25
lines changed

contrib/amcheck/verify_heapam.c

Lines changed: 108 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "miscadmin.h"
2626
#include "storage/bufmgr.h"
2727
#include "storage/procarray.h"
28+
#include "storage/read_stream.h"
2829
#include "utils/builtins.h"
2930
#include "utils/fmgroids.h"
3031
#include "utils/rel.h"
@@ -118,7 +119,10 @@ typedef struct HeapCheckContext
118119
Relation valid_toast_index;
119120
int num_toast_indexes;
120121

121-
/* Values for iterating over pages in the relation */
122+
/*
123+
* Values for iterating over pages in the relation. `blkno` is the most
124+
* recent block in the buffer yielded by the read stream API.
125+
*/
122126
BlockNumber blkno;
123127
BufferAccessStrategy bstrategy;
124128
Buffer buffer;
@@ -153,7 +157,32 @@ typedef struct HeapCheckContext
153157
Tuplestorestate *tupstore;
154158
} HeapCheckContext;
155159

160+
/*
161+
* The per-relation data provided to the read stream API for heap amcheck to
162+
* use in its callback for the SKIP_PAGES_ALL_FROZEN and
163+
* SKIP_PAGES_ALL_VISIBLE options.
164+
*/
165+
typedef struct HeapCheckReadStreamData
166+
{
167+
/*
168+
* `range` is used by all SkipPages options. SKIP_PAGES_NONE uses the
169+
* default read stream callback, block_range_read_stream_cb(), which takes
170+
* a BlockRangeReadStreamPrivate as its callback_private_data. `range`
171+
* keeps track of the current block number across
172+
* read_stream_next_buffer() invocations.
173+
*/
174+
BlockRangeReadStreamPrivate range;
175+
SkipPages skip_option;
176+
Relation rel;
177+
Buffer *vmbuffer;
178+
} HeapCheckReadStreamData;
179+
180+
156181
/* Internal implementation */
182+
static BlockNumber heapcheck_read_stream_next_unskippable(ReadStream *stream,
183+
void *callback_private_data,
184+
void *per_buffer_data);
185+
157186
static void check_tuple(HeapCheckContext *ctx,
158187
bool *xmin_commit_status_ok,
159188
XidCommitStatus *xmin_commit_status);
@@ -231,6 +260,11 @@ verify_heapam(PG_FUNCTION_ARGS)
231260
BlockNumber last_block;
232261
BlockNumber nblocks;
233262
const char *skip;
263+
ReadStream *stream;
264+
int stream_flags;
265+
ReadStreamBlockNumberCB stream_cb;
266+
void *stream_data;
267+
HeapCheckReadStreamData stream_skip_data;
234268

235269
/* Check supplied arguments */
236270
if (PG_ARGISNULL(0))
@@ -404,7 +438,35 @@ verify_heapam(PG_FUNCTION_ARGS)
404438
if (TransactionIdIsNormal(ctx.relfrozenxid))
405439
ctx.oldest_xid = ctx.relfrozenxid;
406440

407-
for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
441+
/* Now that `ctx` is set up, set up the read stream */
442+
stream_skip_data.range.current_blocknum = first_block;
443+
stream_skip_data.range.last_exclusive = last_block + 1;
444+
stream_skip_data.skip_option = skip_option;
445+
stream_skip_data.rel = ctx.rel;
446+
stream_skip_data.vmbuffer = &vmbuffer;
447+
448+
if (skip_option == SKIP_PAGES_NONE)
449+
{
450+
stream_cb = block_range_read_stream_cb;
451+
stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
452+
stream_data = &stream_skip_data.range;
453+
}
454+
else
455+
{
456+
stream_cb = heapcheck_read_stream_next_unskippable;
457+
stream_flags = READ_STREAM_DEFAULT;
458+
stream_data = &stream_skip_data;
459+
}
460+
461+
stream = read_stream_begin_relation(stream_flags,
462+
ctx.bstrategy,
463+
ctx.rel,
464+
MAIN_FORKNUM,
465+
stream_cb,
466+
stream_data,
467+
0);
468+
469+
while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
408470
{
409471
OffsetNumber maxoff;
410472
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +479,11 @@ verify_heapam(PG_FUNCTION_ARGS)
417479

418480
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
419481

420-
/* Optionally skip over all-frozen or all-visible blocks */
421-
if (skip_option != SKIP_PAGES_NONE)
422-
{
423-
int32 mapbits;
424-
425-
mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
426-
&vmbuffer);
427-
if (skip_option == SKIP_PAGES_ALL_FROZEN)
428-
{
429-
if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
430-
continue;
431-
}
432-
433-
if (skip_option == SKIP_PAGES_ALL_VISIBLE)
434-
{
435-
if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
436-
continue;
437-
}
438-
}
439-
440-
/* Read and lock the next page. */
441-
ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
442-
RBM_NORMAL, ctx.bstrategy);
482+
/* Lock the next page. */
483+
Assert(BufferIsValid(ctx.buffer));
443484
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
485+
486+
ctx.blkno = BufferGetBlockNumber(ctx.buffer);
444487
ctx.page = BufferGetPage(ctx.buffer);
445488

446489
/* Perform tuple checks */
@@ -799,6 +842,10 @@ verify_heapam(PG_FUNCTION_ARGS)
799842
break;
800843
}
801844

845+
/* Ensure that the stream is completely read */
846+
Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
847+
read_stream_end(stream);
848+
802849
if (vmbuffer != InvalidBuffer)
803850
ReleaseBuffer(vmbuffer);
804851

@@ -815,6 +862,42 @@ verify_heapam(PG_FUNCTION_ARGS)
815862
PG_RETURN_NULL();
816863
}
817864

865+
/*
866+
* Heap amcheck's read stream callback for getting the next unskippable block.
867+
* This callback is only used when 'all-visible' or 'all-frozen' is provided
868+
* as the skip option to verify_heapam(). With the default 'none',
869+
* block_range_read_stream_cb() is used instead.
870+
*/
871+
static BlockNumber
872+
heapcheck_read_stream_next_unskippable(ReadStream *stream,
873+
void *callback_private_data,
874+
void *per_buffer_data)
875+
{
876+
HeapCheckReadStreamData *p = callback_private_data;
877+
878+
/* Loops over [current_blocknum, last_exclusive) blocks */
879+
for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
880+
{
881+
uint8 mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
882+
883+
if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
884+
{
885+
if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
886+
continue;
887+
}
888+
889+
if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
890+
{
891+
if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
892+
continue;
893+
}
894+
895+
return i;
896+
}
897+
898+
return InvalidBlockNumber;
899+
}
900+
818901
/*
819902
* Shared internal implementation for report_corruption and
820903
* report_toast_corruption.

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,7 @@ HeadlineJsonState
11691169
HeadlineParsedText
11701170
HeadlineWordEntry
11711171
HeapCheckContext
1172+
HeapCheckReadStreamData
11721173
HeapPageFreeze
11731174
HeapScanDesc
11741175
HeapTuple

0 commit comments

Comments
 (0)