21
21
#include "storage/bufmgr.h"
22
22
#include "storage/proc.h"
23
23
#include "storage/procarray.h"
24
+ #include "storage/read_stream.h"
24
25
#include "storage/smgr.h"
25
26
#include "utils/rel.h"
26
27
#include "utils/snapmgr.h"
@@ -41,6 +42,17 @@ typedef struct corrupt_items
41
42
ItemPointer tids ;
42
43
} corrupt_items ;
43
44
45
+ /* for collect_corrupt_items_read_stream_next_block */
46
+ struct collect_corrupt_items_read_stream_private
47
+ {
48
+ bool all_frozen ;
49
+ bool all_visible ;
50
+ BlockNumber current_blocknum ;
51
+ BlockNumber last_exclusive ;
52
+ Relation rel ;
53
+ Buffer vmbuffer ;
54
+ };
55
+
44
56
PG_FUNCTION_INFO_V1 (pg_visibility_map );
45
57
PG_FUNCTION_INFO_V1 (pg_visibility_map_rel );
46
58
PG_FUNCTION_INFO_V1 (pg_visibility );
@@ -478,6 +490,8 @@ collect_visibility_data(Oid relid, bool include_pd)
478
490
BlockNumber blkno ;
479
491
Buffer vmbuffer = InvalidBuffer ;
480
492
BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
493
+ BlockRangeReadStreamPrivate p ;
494
+ ReadStream * stream = NULL ;
481
495
482
496
rel = relation_open (relid , AccessShareLock );
483
497
@@ -489,6 +503,20 @@ collect_visibility_data(Oid relid, bool include_pd)
489
503
info -> next = 0 ;
490
504
info -> count = nblocks ;
491
505
506
+ /* Create a stream if reading main fork. */
507
+ if (include_pd )
508
+ {
509
+ p .current_blocknum = 0 ;
510
+ p .last_exclusive = nblocks ;
511
+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
512
+ bstrategy ,
513
+ rel ,
514
+ MAIN_FORKNUM ,
515
+ block_range_read_stream_cb ,
516
+ & p ,
517
+ 0 );
518
+ }
519
+
492
520
for (blkno = 0 ; blkno < nblocks ; ++ blkno )
493
521
{
494
522
int32 mapbits ;
@@ -513,8 +541,7 @@ collect_visibility_data(Oid relid, bool include_pd)
513
541
Buffer buffer ;
514
542
Page page ;
515
543
516
- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
517
- bstrategy );
544
+ buffer = read_stream_next_buffer (stream , NULL );
518
545
LockBuffer (buffer , BUFFER_LOCK_SHARE );
519
546
520
547
page = BufferGetPage (buffer );
@@ -525,6 +552,12 @@ collect_visibility_data(Oid relid, bool include_pd)
525
552
}
526
553
}
527
554
555
+ if (include_pd )
556
+ {
557
+ Assert (read_stream_next_buffer (stream , NULL ) == InvalidBuffer );
558
+ read_stream_end (stream );
559
+ }
560
+
528
561
/* Clean up. */
529
562
if (vmbuffer != InvalidBuffer )
530
563
ReleaseBuffer (vmbuffer );
@@ -610,6 +643,38 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
610
643
}
611
644
}
612
645
646
+ /*
647
+ * Callback function to get next block for read stream object used in
648
+ * collect_corrupt_items() function.
649
+ */
650
+ static BlockNumber
651
+ collect_corrupt_items_read_stream_next_block (ReadStream * stream ,
652
+ void * callback_private_data ,
653
+ void * per_buffer_data )
654
+ {
655
+ struct collect_corrupt_items_read_stream_private * p = callback_private_data ;
656
+
657
+ for (; p -> current_blocknum < p -> last_exclusive ; p -> current_blocknum ++ )
658
+ {
659
+ bool check_frozen = false;
660
+ bool check_visible = false;
661
+
662
+ /* Make sure we are interruptible. */
663
+ CHECK_FOR_INTERRUPTS ();
664
+
665
+ if (p -> all_frozen && VM_ALL_FROZEN (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
666
+ check_frozen = true;
667
+ if (p -> all_visible && VM_ALL_VISIBLE (p -> rel , p -> current_blocknum , & p -> vmbuffer ))
668
+ check_visible = true;
669
+ if (!check_visible && !check_frozen )
670
+ continue ;
671
+
672
+ return p -> current_blocknum ++ ;
673
+ }
674
+
675
+ return InvalidBlockNumber ;
676
+ }
677
+
613
678
/*
614
679
* Returns a list of items whose visibility map information does not match
615
680
* the status of the tuples on the page.
@@ -628,12 +693,13 @@ static corrupt_items *
628
693
collect_corrupt_items (Oid relid , bool all_visible , bool all_frozen )
629
694
{
630
695
Relation rel ;
631
- BlockNumber nblocks ;
632
696
corrupt_items * items ;
633
- BlockNumber blkno ;
634
697
Buffer vmbuffer = InvalidBuffer ;
635
698
BufferAccessStrategy bstrategy = GetAccessStrategy (BAS_BULKREAD );
636
699
TransactionId OldestXmin = InvalidTransactionId ;
700
+ struct collect_corrupt_items_read_stream_private p ;
701
+ ReadStream * stream ;
702
+ Buffer buffer ;
637
703
638
704
rel = relation_open (relid , AccessShareLock );
639
705
@@ -643,8 +709,6 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
643
709
if (all_visible )
644
710
OldestXmin = GetStrictOldestNonRemovableTransactionId (rel );
645
711
646
- nblocks = RelationGetNumberOfBlocks (rel );
647
-
648
712
/*
649
713
* Guess an initial array size. We don't expect many corrupted tuples, so
650
714
* start with a small array. This function uses the "next" field to track
@@ -658,34 +722,38 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
658
722
items -> count = 64 ;
659
723
items -> tids = palloc (items -> count * sizeof (ItemPointerData ));
660
724
725
+ p .current_blocknum = 0 ;
726
+ p .last_exclusive = RelationGetNumberOfBlocks (rel );
727
+ p .rel = rel ;
728
+ p .vmbuffer = InvalidBuffer ;
729
+ p .all_frozen = all_frozen ;
730
+ p .all_visible = all_visible ;
731
+ stream = read_stream_begin_relation (READ_STREAM_FULL ,
732
+ bstrategy ,
733
+ rel ,
734
+ MAIN_FORKNUM ,
735
+ collect_corrupt_items_read_stream_next_block ,
736
+ & p ,
737
+ 0 );
738
+
661
739
/* Loop over every block in the relation. */
662
- for ( blkno = 0 ; blkno < nblocks ; ++ blkno )
740
+ while (( buffer = read_stream_next_buffer ( stream , NULL )) != InvalidBuffer )
663
741
{
664
- bool check_frozen = false;
665
- bool check_visible = false;
666
- Buffer buffer ;
742
+ bool check_frozen = all_frozen ;
743
+ bool check_visible = all_visible ;
667
744
Page page ;
668
745
OffsetNumber offnum ,
669
746
maxoff ;
747
+ BlockNumber blkno ;
670
748
671
749
/* Make sure we are interruptible. */
672
750
CHECK_FOR_INTERRUPTS ();
673
751
674
- /* Use the visibility map to decide whether to check this page. */
675
- if (all_frozen && VM_ALL_FROZEN (rel , blkno , & vmbuffer ))
676
- check_frozen = true;
677
- if (all_visible && VM_ALL_VISIBLE (rel , blkno , & vmbuffer ))
678
- check_visible = true;
679
- if (!check_visible && !check_frozen )
680
- continue ;
681
-
682
- /* Read and lock the page. */
683
- buffer = ReadBufferExtended (rel , MAIN_FORKNUM , blkno , RBM_NORMAL ,
684
- bstrategy );
685
752
LockBuffer (buffer , BUFFER_LOCK_SHARE );
686
753
687
754
page = BufferGetPage (buffer );
688
755
maxoff = PageGetMaxOffsetNumber (page );
756
+ blkno = BufferGetBlockNumber (buffer );
689
757
690
758
/*
691
759
* The visibility map bits might have changed while we were acquiring
@@ -778,10 +846,13 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
778
846
779
847
UnlockReleaseBuffer (buffer );
780
848
}
849
+ read_stream_end (stream );
781
850
782
851
/* Clean up. */
783
852
if (vmbuffer != InvalidBuffer )
784
853
ReleaseBuffer (vmbuffer );
854
+ if (p .vmbuffer != InvalidBuffer )
855
+ ReleaseBuffer (p .vmbuffer );
785
856
relation_close (rel , AccessShareLock );
786
857
787
858
/*
0 commit comments