@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
 * ----------------------------------------------------------------
 */

+/*
+ * Streaming read API callback for parallel sequential scans. Returns the next
+ * block the caller wants from the read stream or InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_parallel(ReadStream *stream,
+                                    void *callback_private_data,
+                                    void *per_buffer_data)
+{
+    HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+    Assert(ScanDirectionIsForward(scan->rs_dir));
+    Assert(scan->rs_base.rs_parallel);
+
+    if (unlikely(!scan->rs_inited))
+    {
+        /* parallel scan */
+        table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+                                                 scan->rs_parallelworkerdata,
+                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+        /* may return InvalidBlockNumber if there are no more blocks */
+        scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                    scan->rs_parallelworkerdata,
+                                                                    (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+        scan->rs_inited = true;
+    }
+    else
+    {
+        scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                    scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
+                                                                    scan->rs_base.rs_parallel);
+    }
+
+    return scan->rs_prefetch_block;
+}
+
+/*
+ * Streaming read API callback for serial sequential and TID range scans.
+ * Returns the next block the caller wants from the read stream or
+ * InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_serial(ReadStream *stream,
+                                  void *callback_private_data,
+                                  void *per_buffer_data)
+{
+    HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+    if (unlikely(!scan->rs_inited))
+    {
+        scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+        scan->rs_inited = true;
+    }
+    else
+        scan->rs_prefetch_block = heapgettup_advance_block(scan,
+                                                           scan->rs_prefetch_block,
+                                                           scan->rs_dir);
+
+    return scan->rs_prefetch_block;
+}
+
/* ----------------
 * initscan - scan code common to heap_beginscan and heap_rescan
 * ----------------
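Both callbacks above follow the read stream's block-number callback contract: each call returns the next block number the stream should read, and returning InvalidBlockNumber ends the stream. As an illustration only (not part of this commit), a minimal callback with the same shape, handing out the first N blocks of a relation, could look like this, assuming the ReadStream and ReadStreamBlockNumberCB definitions from storage/read_stream.h:

#include "postgres.h"
#include "storage/read_stream.h"

/* Hypothetical callback state: hand out blocks 0 .. nblocks-1 in order. */
typedef struct DemoStreamState
{
    BlockNumber next_block;
    BlockNumber nblocks;
} DemoStreamState;

static BlockNumber
demo_stream_read_next(ReadStream *stream,
                      void *callback_private_data,
                      void *per_buffer_data)
{
    DemoStreamState *state = (DemoStreamState *) callback_private_data;

    /* Returning InvalidBlockNumber tells the stream there is nothing left. */
    if (state->next_block >= state->nblocks)
        return InvalidBlockNumber;

    return state->next_block++;
}

The heap callbacks do the same thing, except that the "state" is the scan descriptor itself and the next block comes from heapgettup_initial_block()/heapgettup_advance_block() or from the parallel scan machinery.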
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;

+    /*
+     * Initialize to ForwardScanDirection because it is most common and
+     * because heap scans go forward before going backward (e.g. CURSORs).
+     */
+    scan->rs_dir = ForwardScanDirection;
+    scan->rs_prefetch_block = InvalidBlockNumber;
+
    /* page-at-a-time fields are always invalid when not rs_inited */

    /*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
/*
 * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
 *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and save it
+ * in the scan descriptor. It is already pinned.
 */
static inline void
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
{
+    Assert(scan->rs_read_stream);
+
    /* release previous scan buffer, if any */
    if (BufferIsValid(scan->rs_cbuf))
    {
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
     */
    CHECK_FOR_INTERRUPTS();

-    if (unlikely(!scan->rs_inited))
+    /*
+     * If the scan direction is changing, reset the prefetch block to the
+     * current block. Otherwise, we will incorrectly prefetch the blocks
+     * between the prefetch block and the current block again before
+     * prefetching blocks in the new, correct scan direction.
+     */
+    if (unlikely(scan->rs_dir != dir))
    {
-        scan->rs_cblock = heapgettup_initial_block(scan, dir);
+        scan->rs_prefetch_block = scan->rs_cblock;
+        read_stream_reset(scan->rs_read_stream);
+    }

-        /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-        Assert(scan->rs_cblock != InvalidBlockNumber ||
-               !BufferIsValid(scan->rs_cbuf));
+    scan->rs_dir = dir;

-        scan->rs_inited = true;
-    }
-    else
-        scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
-                                                   dir);
-
-    /* read block if valid */
-    if (BlockNumberIsValid(scan->rs_cblock))
-        scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-                                           scan->rs_cblock, RBM_NORMAL,
-                                           scan->rs_strategy);
+    scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+    if (BufferIsValid(scan->rs_cbuf))
+        scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
}

/*
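With the stream in place, heap_fetch_next_buffer() no longer computes block numbers or calls ReadBufferExtended() itself; it only pulls the next already-pinned buffer with read_stream_next_buffer(). The direction-change branch exists because the callback may already have queued blocks ahead of the current position; rewinding rs_prefetch_block and calling read_stream_reset() discards that look-ahead so prefetching restarts from the current block in the new direction. A minimal sketch of the consumption pattern, as an illustration only (read_stream_next_buffer() returns InvalidBuffer once the callback has returned InvalidBlockNumber, and each returned buffer is pinned and must be released by the caller; assumes bufmgr.h and read_stream.h):

#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/read_stream.h"

/* Sketch: drain a read stream, one pinned buffer at a time. */
static void
demo_drain_stream(ReadStream *stream)
{
    Buffer      buf;

    while (BufferIsValid(buf = read_stream_next_buffer(stream, NULL)))
    {
        /* ... inspect the page, e.g. via BufferGetPage(buf) ... */
        ReleaseBuffer(buf);     /* drop the pin the stream handed us */
    }
}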
@@ -560,34 +629,18 @@ static pg_noinline BlockNumber
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
{
    Assert(!scan->rs_inited);
+    Assert(scan->rs_base.rs_parallel == NULL);

    /* When there are no pages to scan, return InvalidBlockNumber */
    if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
        return InvalidBlockNumber;

    if (ScanDirectionIsForward(dir))
    {
-        /* serial scan */
-        if (scan->rs_base.rs_parallel == NULL)
-            return scan->rs_startblock;
-        else
-        {
-            /* parallel scan */
-            table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-                                                     scan->rs_parallelworkerdata,
-                                                     (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
-            /* may return InvalidBlockNumber if there are no more blocks */
-            return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                     scan->rs_parallelworkerdata,
-                                                     (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-        }
+        return scan->rs_startblock;
    }
    else
    {
-        /* backward parallel scan not supported */
-        Assert(scan->rs_base.rs_parallel == NULL);
-
        /*
         * Disable reporting to syncscan logic in a backwards scan; it's not
         * very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
static inline BlockNumber
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
{
-    if (ScanDirectionIsForward(dir))
+    Assert(scan->rs_base.rs_parallel == NULL);
+
+    if (likely(ScanDirectionIsForward(dir)))
    {
-        if (scan->rs_base.rs_parallel == NULL)
-        {
-            block++;
+        block++;

-            /* wrap back to the start of the heap */
-            if (block >= scan->rs_nblocks)
-                block = 0;
+        /* wrap back to the start of the heap */
+        if (block >= scan->rs_nblocks)
+            block = 0;

-            /*
-             * Report our new scan position for synchronization purposes. We
-             * don't do that when moving backwards, however. That would just
-             * mess up any other forward-moving scanners.
-             *
-             * Note: we do this before checking for end of scan so that the
-             * final state of the position hint is back at the start of the
-             * rel. That's not strictly necessary, but otherwise when you run
-             * the same query multiple times the starting position would shift
-             * a little bit backwards on every invocation, which is confusing.
-             * We don't guarantee any specific ordering in general, though.
-             */
-            if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
-                ss_report_location(scan->rs_base.rs_rd, block);
-
-            /* we're done if we're back at where we started */
-            if (block == scan->rs_startblock)
-                return InvalidBlockNumber;
+        /*
+         * Report our new scan position for synchronization purposes. We don't
+         * do that when moving backwards, however. That would just mess up any
+         * other forward-moving scanners.
+         *
+         * Note: we do this before checking for end of scan so that the final
+         * state of the position hint is back at the start of the rel. That's
+         * not strictly necessary, but otherwise when you run the same query
+         * multiple times the starting position would shift a little bit
+         * backwards on every invocation, which is confusing. We don't
+         * guarantee any specific ordering in general, though.
+         */
+        if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+            ss_report_location(scan->rs_base.rs_rd, block);

-            /* check if the limit imposed by heap_setscanlimits() is met */
-            if (scan->rs_numblocks != InvalidBlockNumber)
-            {
-                if (--scan->rs_numblocks == 0)
-                    return InvalidBlockNumber;
-            }
+        /* we're done if we're back at where we started */
+        if (block == scan->rs_startblock)
+            return InvalidBlockNumber;

-            return block;
-        }
-        else
+        /* check if the limit imposed by heap_setscanlimits() is met */
+        if (scan->rs_numblocks != InvalidBlockNumber)
        {
-            return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                     scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
-                                                     scan->rs_base.rs_parallel);
+            if (--scan->rs_numblocks == 0)
+                return InvalidBlockNumber;
        }
+
+        return block;
    }
    else
    {
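The forward branch above is the same circular walk as before, just with the parallel case lifted out into the parallel callback: step to the next block, wrap to block 0 at the end of the relation, and stop once the scan arrives back at rs_startblock, which is what lets synchronized scans start mid-relation. A stand-alone sketch of that arithmetic, with hypothetical parameter names (assumes BlockNumber and InvalidBlockNumber from storage/block.h):

#include "postgres.h"
#include "storage/block.h"

/*
 * Sketch of the circular advance: block++ with wraparound to 0 is the same
 * as modular arithmetic, and arriving back at startblock ends the scan.
 */
static BlockNumber
demo_advance_with_wrap(BlockNumber block, BlockNumber nblocks,
                       BlockNumber startblock)
{
    block = (block + 1) % nblocks;

    if (block == startblock)
        return InvalidBlockNumber;  /* completed one full pass */

    return block;
}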
@@ -879,6 +925,7 @@ heapgettup(HeapScanDesc scan,

    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;
+    scan->rs_prefetch_block = InvalidBlockNumber;
    tuple->t_data = NULL;
    scan->rs_inited = false;
}
@@ -974,6 +1021,7 @@ heapgettup_pagemode(HeapScanDesc scan,
        ReleaseBuffer(scan->rs_cbuf);
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_cblock = InvalidBlockNumber;
+    scan->rs_prefetch_block = InvalidBlockNumber;
    tuple->t_data = NULL;
    scan->rs_inited = false;
}
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,

    initscan(scan, key, false);

+    scan->rs_read_stream = NULL;
+
+    /*
+     * Set up a read stream for sequential scans and TID range scans. This
+     * should be done after initscan() because initscan() allocates the
+     * BufferAccessStrategy object passed to the streaming read API.
+     */
+    if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+        scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+    {
+        ReadStreamBlockNumberCB cb;
+
+        if (scan->rs_base.rs_parallel)
+            cb = heap_scan_stream_read_next_parallel;
+        else
+            cb = heap_scan_stream_read_next_serial;
+
+        scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+                                                          scan->rs_strategy,
+                                                          scan->rs_base.rs_rd,
+                                                          MAIN_FORKNUM,
+                                                          cb,
+                                                          scan,
+                                                          0);
+    }
+
+

    return (TableScanDesc) scan;
}
@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,

    Assert(scan->rs_empty_tuples_pending == 0);

+    /*
+     * The read stream is reset on rescan. This must be done before
+     * initscan(), as some state referred to by read_stream_reset() is reset
+     * in initscan().
+     */
+    if (scan->rs_read_stream)
+        read_stream_reset(scan->rs_read_stream);
+
    /*
     * reinitialize scan descriptor
     */
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)

    Assert(scan->rs_empty_tuples_pending == 0);

+    /*
+     * Must free the read stream before freeing the BufferAccessStrategy.
+     */
+    if (scan->rs_read_stream)
+        read_stream_end(scan->rs_read_stream);
+
    /*
     * decrement relation reference count and free scan descriptor storage
     */
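The comment spells out an ordering constraint: the stream was created on top of the scan's BufferAccessStrategy, so it has to be shut down before that strategy is freed later in heap_endscan(). A sketch of the same teardown order in isolation (hypothetical cleanup helper, not the function's actual tail, which this hunk does not show; assumes read_stream_end() and FreeAccessStrategy() as used in this file):

/* Sketch: teardown order for a stream created with a BufferAccessStrategy. */
static void
demo_stream_teardown(ReadStream *stream, BufferAccessStrategy strategy)
{
    if (stream != NULL)
        read_stream_end(stream);        /* stop using the strategy first */

    if (strategy != NULL)
        FreeAccessStrategy(strategy);   /* now safe to free it */
}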