Skip to content

Commit b7b0f3f

Browse files
committed
Use streaming I/O in sequential scans.
Instead of calling ReadBuffer() for each block, heap sequential scans and TID range scans now use the streaming API introduced in b5a9b18.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com
1 parent 6ed83d5 commit b7b0f3f

File tree

2 files changed

+177
-73
lines changed

2 files changed

+177
-73
lines changed

src/backend/access/heap/heapam.c

Lines changed: 162 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
223223
* ----------------------------------------------------------------
224224
*/
225225

226+
/*
227+
* Streaming read API callback for parallel sequential scans. Returns the next
228+
* block the caller wants from the read stream or InvalidBlockNumber when done.
229+
*/
230+
static BlockNumber
231+
heap_scan_stream_read_next_parallel(ReadStream *stream,
232+
void *callback_private_data,
233+
void *per_buffer_data)
234+
{
235+
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
236+
237+
Assert(ScanDirectionIsForward(scan->rs_dir));
238+
Assert(scan->rs_base.rs_parallel);
239+
240+
if (unlikely(!scan->rs_inited))
241+
{
242+
/* parallel scan */
243+
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
244+
scan->rs_parallelworkerdata,
245+
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
246+
247+
/* may return InvalidBlockNumber if there are no more blocks */
248+
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
249+
scan->rs_parallelworkerdata,
250+
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
251+
scan->rs_inited = true;
252+
}
253+
else
254+
{
255+
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
256+
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
257+
scan->rs_base.rs_parallel);
258+
}
259+
260+
return scan->rs_prefetch_block;
261+
}
262+
263+
/*
264+
* Streaming read API callback for serial sequential and TID range scans.
265+
* Returns the next block the caller wants from the read stream or
266+
* InvalidBlockNumber when done.
267+
*/
268+
static BlockNumber
269+
heap_scan_stream_read_next_serial(ReadStream *stream,
270+
void *callback_private_data,
271+
void *per_buffer_data)
272+
{
273+
HeapScanDesc scan = (HeapScanDesc) callback_private_data;
274+
275+
if (unlikely(!scan->rs_inited))
276+
{
277+
scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
278+
scan->rs_inited = true;
279+
}
280+
else
281+
scan->rs_prefetch_block = heapgettup_advance_block(scan,
282+
scan->rs_prefetch_block,
283+
scan->rs_dir);
284+
285+
return scan->rs_prefetch_block;
286+
}
287+
226288
/* ----------------
227289
* initscan - scan code common to heap_beginscan and heap_rescan
228290
* ----------------
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
325387
scan->rs_cbuf = InvalidBuffer;
326388
scan->rs_cblock = InvalidBlockNumber;
327389

390+
/*
391+
* Initialize to ForwardScanDirection because it is most common and
392+
* because heap scans go forward before going backward (e.g. CURSORs).
393+
*/
394+
scan->rs_dir = ForwardScanDirection;
395+
scan->rs_prefetch_block = InvalidBlockNumber;
396+
328397
/* page-at-a-time fields are always invalid when not rs_inited */
329398

330399
/*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
508577
/*
509578
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
510579
*
511-
* Read the next block of the scan relation into a buffer and pin that buffer
512-
* before saving it in the scan descriptor.
580+
* Read the next block of the scan relation from the read stream and save it
581+
* in the scan descriptor. It is already pinned.
513582
*/
514583
static inline void
515584
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
516585
{
586+
Assert(scan->rs_read_stream);
587+
517588
/* release previous scan buffer, if any */
518589
if (BufferIsValid(scan->rs_cbuf))
519590
{
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
528599
*/
529600
CHECK_FOR_INTERRUPTS();
530601

531-
if (unlikely(!scan->rs_inited))
602+
/*
603+
* If the scan direction is changing, reset the prefetch block to the
604+
* current block. Otherwise, we will incorrectly prefetch the blocks
605+
* between the prefetch block and the current block again before
606+
* prefetching blocks in the new, correct scan direction.
607+
*/
608+
if (unlikely(scan->rs_dir != dir))
532609
{
533-
scan->rs_cblock = heapgettup_initial_block(scan, dir);
610+
scan->rs_prefetch_block = scan->rs_cblock;
611+
read_stream_reset(scan->rs_read_stream);
612+
}
534613

535-
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
536-
Assert(scan->rs_cblock != InvalidBlockNumber ||
537-
!BufferIsValid(scan->rs_cbuf));
614+
scan->rs_dir = dir;
538615

539-
scan->rs_inited = true;
540-
}
541-
else
542-
scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
543-
dir);
544-
545-
/* read block if valid */
546-
if (BlockNumberIsValid(scan->rs_cblock))
547-
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
548-
scan->rs_cblock, RBM_NORMAL,
549-
scan->rs_strategy);
616+
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
617+
if (BufferIsValid(scan->rs_cbuf))
618+
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
550619
}
551620

552621
/*
@@ -560,34 +629,18 @@ static pg_noinline BlockNumber
560629
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
561630
{
562631
Assert(!scan->rs_inited);
632+
Assert(scan->rs_base.rs_parallel == NULL);
563633

564634
/* When there are no pages to scan, return InvalidBlockNumber */
565635
if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
566636
return InvalidBlockNumber;
567637

568638
if (ScanDirectionIsForward(dir))
569639
{
570-
/* serial scan */
571-
if (scan->rs_base.rs_parallel == NULL)
572-
return scan->rs_startblock;
573-
else
574-
{
575-
/* parallel scan */
576-
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
577-
scan->rs_parallelworkerdata,
578-
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
579-
580-
/* may return InvalidBlockNumber if there are no more blocks */
581-
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
582-
scan->rs_parallelworkerdata,
583-
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
584-
}
640+
return scan->rs_startblock;
585641
}
586642
else
587643
{
588-
/* backward parallel scan not supported */
589-
Assert(scan->rs_base.rs_parallel == NULL);
590-
591644
/*
592645
* Disable reporting to syncscan logic in a backwards scan; it's not
593646
* very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
699752
static inline BlockNumber
700753
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
701754
{
702-
if (ScanDirectionIsForward(dir))
755+
Assert(scan->rs_base.rs_parallel == NULL);
756+
757+
if (likely(ScanDirectionIsForward(dir)))
703758
{
704-
if (scan->rs_base.rs_parallel == NULL)
705-
{
706-
block++;
759+
block++;
707760

708-
/* wrap back to the start of the heap */
709-
if (block >= scan->rs_nblocks)
710-
block = 0;
761+
/* wrap back to the start of the heap */
762+
if (block >= scan->rs_nblocks)
763+
block = 0;
711764

712-
/*
713-
* Report our new scan position for synchronization purposes. We
714-
* don't do that when moving backwards, however. That would just
715-
* mess up any other forward-moving scanners.
716-
*
717-
* Note: we do this before checking for end of scan so that the
718-
* final state of the position hint is back at the start of the
719-
* rel. That's not strictly necessary, but otherwise when you run
720-
* the same query multiple times the starting position would shift
721-
* a little bit backwards on every invocation, which is confusing.
722-
* We don't guarantee any specific ordering in general, though.
723-
*/
724-
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
725-
ss_report_location(scan->rs_base.rs_rd, block);
726-
727-
/* we're done if we're back at where we started */
728-
if (block == scan->rs_startblock)
729-
return InvalidBlockNumber;
765+
/*
766+
* Report our new scan position for synchronization purposes. We don't
767+
* do that when moving backwards, however. That would just mess up any
768+
* other forward-moving scanners.
769+
*
770+
* Note: we do this before checking for end of scan so that the final
771+
* state of the position hint is back at the start of the rel. That's
772+
* not strictly necessary, but otherwise when you run the same query
773+
* multiple times the starting position would shift a little bit
774+
* backwards on every invocation, which is confusing. We don't
775+
* guarantee any specific ordering in general, though.
776+
*/
777+
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
778+
ss_report_location(scan->rs_base.rs_rd, block);
730779

731-
/* check if the limit imposed by heap_setscanlimits() is met */
732-
if (scan->rs_numblocks != InvalidBlockNumber)
733-
{
734-
if (--scan->rs_numblocks == 0)
735-
return InvalidBlockNumber;
736-
}
780+
/* we're done if we're back at where we started */
781+
if (block == scan->rs_startblock)
782+
return InvalidBlockNumber;
737783

738-
return block;
739-
}
740-
else
784+
/* check if the limit imposed by heap_setscanlimits() is met */
785+
if (scan->rs_numblocks != InvalidBlockNumber)
741786
{
742-
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
743-
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
744-
scan->rs_base.rs_parallel);
787+
if (--scan->rs_numblocks == 0)
788+
return InvalidBlockNumber;
745789
}
790+
791+
return block;
746792
}
747793
else
748794
{
@@ -879,6 +925,7 @@ heapgettup(HeapScanDesc scan,
879925

880926
scan->rs_cbuf = InvalidBuffer;
881927
scan->rs_cblock = InvalidBlockNumber;
928+
scan->rs_prefetch_block = InvalidBlockNumber;
882929
tuple->t_data = NULL;
883930
scan->rs_inited = false;
884931
}
@@ -974,6 +1021,7 @@ heapgettup_pagemode(HeapScanDesc scan,
9741021
ReleaseBuffer(scan->rs_cbuf);
9751022
scan->rs_cbuf = InvalidBuffer;
9761023
scan->rs_cblock = InvalidBlockNumber;
1024+
scan->rs_prefetch_block = InvalidBlockNumber;
9771025
tuple->t_data = NULL;
9781026
scan->rs_inited = false;
9791027
}
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
10691117

10701118
initscan(scan, key, false);
10711119

1120+
scan->rs_read_stream = NULL;
1121+
1122+
/*
1123+
* Set up a read stream for sequential scans and TID range scans. This
1124+
* should be done after initscan() because initscan() allocates the
1125+
* BufferAccessStrategy object passed to the streaming read API.
1126+
*/
1127+
if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
1128+
scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
1129+
{
1130+
ReadStreamBlockNumberCB cb;
1131+
1132+
if (scan->rs_base.rs_parallel)
1133+
cb = heap_scan_stream_read_next_parallel;
1134+
else
1135+
cb = heap_scan_stream_read_next_serial;
1136+
1137+
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
1138+
scan->rs_strategy,
1139+
scan->rs_base.rs_rd,
1140+
MAIN_FORKNUM,
1141+
cb,
1142+
scan,
1143+
0);
1144+
}
1145+
1146+
10721147
return (TableScanDesc) scan;
10731148
}
10741149

@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
11111186

11121187
Assert(scan->rs_empty_tuples_pending == 0);
11131188

1189+
/*
1190+
* The read stream is reset on rescan. This must be done before
1191+
* initscan(), as some state referred to by read_stream_reset() is reset
1192+
* in initscan().
1193+
*/
1194+
if (scan->rs_read_stream)
1195+
read_stream_reset(scan->rs_read_stream);
1196+
11141197
/*
11151198
* reinitialize scan descriptor
11161199
*/
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
11351218

11361219
Assert(scan->rs_empty_tuples_pending == 0);
11371220

1221+
/*
1222+
* Must free the read stream before freeing the BufferAccessStrategy.
1223+
*/
1224+
if (scan->rs_read_stream)
1225+
read_stream_end(scan->rs_read_stream);
1226+
11381227
/*
11391228
* decrement relation reference count and free scan descriptor storage
11401229
*/

src/include/access/heapam.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "storage/bufpage.h"
2626
#include "storage/dsm.h"
2727
#include "storage/lockdefs.h"
28+
#include "storage/read_stream.h"
2829
#include "storage/shm_toc.h"
2930
#include "utils/relcache.h"
3031
#include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
7071

7172
HeapTupleData rs_ctup; /* current tuple in scan, if any */
7273

74+
/* For scans that stream reads */
75+
ReadStream *rs_read_stream;
76+
77+
/*
78+
* For sequential scans and TID range scans to stream reads. The read
79+
* stream is allocated at the beginning of the scan and reset on rescan or
80+
* when the scan direction changes. The scan direction is saved each time
81+
* a new page is requested. If the scan direction changes from one page to
82+
* the next, the read stream releases all previously pinned buffers and
83+
* resets the prefetch block.
84+
*/
85+
ScanDirection rs_dir;
86+
BlockNumber rs_prefetch_block;
87+
7388
/*
7489
* For parallel scans to store page allocation data. NULL when not
7590
* performing a parallel scan.

0 commit comments

Comments
 (0)