@@ -169,7 +169,7 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
169
169
/*
170
170
* Ask the callback which block it would like us to read next, with a small
171
171
* buffer in front to allow read_stream_unget_block() to work and to allow the
172
- * fast path to work in batches .
172
+ * fast path to skip this function and work directly from the array .
173
173
*/
174
174
static inline BlockNumber
175
175
read_stream_get_block (ReadStream * stream , void * per_buffer_data )
@@ -578,13 +578,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
578
578
if (likely (stream -> fast_path ))
579
579
{
580
580
BlockNumber next_blocknum ;
581
- bool need_wait ;
582
581
583
582
/* Fast path assumptions. */
584
583
Assert (stream -> ios_in_progress == 0 );
585
584
Assert (stream -> pinned_buffers == 1 );
586
585
Assert (stream -> distance == 1 );
587
- Assert (stream -> pending_read_nblocks == 1 );
586
+ Assert (stream -> pending_read_nblocks == 0 );
588
587
Assert (stream -> per_buffer_data_size == 0 );
589
588
590
589
/* We're going to return the buffer we pinned last time. */
@@ -594,58 +593,45 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
594
593
buffer = stream -> buffers [oldest_buffer_index ];
595
594
Assert (buffer != InvalidBuffer );
596
595
597
- /*
598
- * Pin a buffer for the next call. Same buffer entry, and arbitrary
599
- * I/O entry (they're all free).
600
- */
601
- need_wait = StartReadBuffer (& stream -> ios [0 ].op ,
602
- & stream -> buffers [oldest_buffer_index ],
603
- stream -> pending_read_blocknum ,
604
- stream -> advice_enabled ?
605
- READ_BUFFERS_ISSUE_ADVICE : 0 );
606
-
607
- /* Choose the block the next call will pin. */
596
+ /* Choose the next block to pin. */
608
597
if (unlikely (stream -> blocknums_next == stream -> blocknums_count ))
609
598
read_stream_fill_blocknums (stream );
610
599
next_blocknum = stream -> blocknums [stream -> blocknums_next ++ ];
611
600
612
- /*
613
- * Fast return if the next call doesn't require I/O for the buffer we
614
- * just pinned, and we have a block number to give it as a pending
615
- * read.
616
- */
617
- if (likely (!need_wait && next_blocknum != InvalidBlockNumber ))
601
+ if (likely (next_blocknum != InvalidBlockNumber ))
618
602
{
619
- stream -> pending_read_blocknum = next_blocknum ;
620
- return buffer ;
621
- }
622
-
623
- /*
624
- * For anything more complex, set up some more state and take the slow
625
- * path next time.
626
- */
627
- stream -> fast_path = false;
603
+ /*
604
+ * Pin a buffer for the next call. Same buffer entry, and
605
+ * arbitrary I/O entry (they're all free). We don't have to
606
+ * adjust pinned_buffers because we're transferring one to caller
607
+ * but pinning one more.
608
+ */
609
+ if (likely (!StartReadBuffer (& stream -> ios [0 ].op ,
610
+ & stream -> buffers [oldest_buffer_index ],
611
+ next_blocknum ,
612
+ stream -> advice_enabled ?
613
+ READ_BUFFERS_ISSUE_ADVICE : 0 )))
614
+ {
615
+ /* Fast return. */
616
+ return buffer ;
617
+ }
628
618
629
- if (need_wait )
630
- {
631
619
/* Next call must wait for I/O for the newly pinned buffer. */
632
620
stream -> oldest_io_index = 0 ;
633
621
stream -> next_io_index = stream -> max_ios > 1 ? 1 : 0 ;
634
622
stream -> ios_in_progress = 1 ;
635
623
stream -> ios [0 ].buffer_index = oldest_buffer_index ;
636
624
stream -> seq_blocknum = next_blocknum + 1 ;
637
625
}
638
- if (next_blocknum == InvalidBlockNumber )
639
- {
640
- /* Next call hits end of stream and can't pin anything more. */
641
- stream -> distance = 0 ;
642
- stream -> pending_read_nblocks = 0 ;
643
- }
644
626
else
645
627
{
646
- /* Set up the pending read. */
647
- stream -> pending_read_blocknum = next_blocknum ;
628
+ /* No more blocks, end of stream. */
629
+ stream -> distance = 0 ;
630
+ stream -> oldest_buffer_index = stream -> next_buffer_index ;
631
+ stream -> pinned_buffers = 0 ;
648
632
}
633
+
634
+ stream -> fast_path = false;
649
635
return buffer ;
650
636
}
651
637
#endif
@@ -762,15 +748,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
762
748
if (stream -> ios_in_progress == 0 &&
763
749
stream -> pinned_buffers == 1 &&
764
750
stream -> distance == 1 &&
765
- stream -> pending_read_nblocks == 1 &&
751
+ stream -> pending_read_nblocks == 0 &&
766
752
stream -> per_buffer_data_size == 0 )
767
753
{
768
754
stream -> fast_path = true;
769
755
}
770
- else
771
- {
772
- stream -> fast_path = false;
773
- }
774
756
#endif
775
757
776
758
return buffer ;
@@ -790,6 +772,11 @@ read_stream_reset(ReadStream *stream)
790
772
/* Stop looking ahead. */
791
773
stream -> distance = 0 ;
792
774
775
+ /* Forget buffered block numbers and fast path state. */
776
+ stream -> blocknums_next = 0 ;
777
+ stream -> blocknums_count = 0 ;
778
+ stream -> fast_path = false;
779
+
793
780
/* Unpin anything that wasn't consumed. */
794
781
while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
795
782
ReleaseBuffer (buffer );
0 commit comments