 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
@@ -80,12 +81,14 @@ bool synchronize_seqscans = true;
 static HeapScanDesc heap_beginscan_internal(Relation relation,
                         Snapshot snapshot,
                         int nkeys, ScanKey key,
+                        ParallelHeapScanDesc parallel_scan,
                         bool allow_strat,
                         bool allow_sync,
                         bool allow_pagemode,
                         bool is_bitmapscan,
                         bool is_samplescan,
                         bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                         TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
      * results for a non-MVCC snapshot, the caller must hold some higher-level
      * lock that ensures the interesting tuple(s) won't change.)
      */
-    scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+    if (scan->rs_parallel != NULL)
+        scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+    else
+        scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
 
     /*
      * If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
      * behaviors, independently of the size of the table; also there is a GUC
      * variable that can disable synchronized scanning.)
      *
-     * During a rescan, don't make a new strategy object if we don't have to.
+     * Note that heap_parallelscan_initialize has a very similar test; if you
+     * change this, consider changing that one, too.
      */
     if (!RelationUsesLocalBuffers(scan->rs_rd) &&
         scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 
     if (allow_strat)
     {
+        /* During a rescan, keep the previous strategy object. */
         if (scan->rs_strategy == NULL)
             scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
     }
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
         scan->rs_strategy = NULL;
     }
 
-    if (keep_startblock)
+    if (scan->rs_parallel != NULL)
+    {
+        /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+        scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+    }
+    else if (keep_startblock)
     {
         /*
          * When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
             tuple->t_data = NULL;
             return;
         }
-        page = scan->rs_startblock; /* first page */
+        if (scan->rs_parallel != NULL)
+        {
+            page = heap_parallelscan_nextpage(scan);
+
+            /* Other processes might have already finished the scan. */
+            if (page == InvalidBlockNumber)
+            {
+                Assert(!BufferIsValid(scan->rs_cbuf));
+                tuple->t_data = NULL;
+                return;
+            }
+        }
+        else
+            page = scan->rs_startblock; /* first page */
         heapgetpage(scan, page);
         lineoff = FirstOffsetNumber;    /* first offnum */
         scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
     }
     else if (backward)
     {
+        /* backward parallel scan not supported */
+        Assert(scan->rs_parallel == NULL);
+
         if (!scan->rs_inited)
         {
             /*
@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
                 page = scan->rs_nblocks;
             page--;
         }
+        else if (scan->rs_parallel != NULL)
+        {
+            page = heap_parallelscan_nextpage(scan);
+            finished = (page == InvalidBlockNumber);
+        }
         else
         {
             page++;
@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
             tuple->t_data = NULL;
             return;
         }
-        page = scan->rs_startblock; /* first page */
+        if (scan->rs_parallel != NULL)
+        {
+            page = heap_parallelscan_nextpage(scan);
+
+            /* Other processes might have already finished the scan. */
+            if (page == InvalidBlockNumber)
+            {
+                Assert(!BufferIsValid(scan->rs_cbuf));
+                tuple->t_data = NULL;
+                return;
+            }
+        }
+        else
+            page = scan->rs_startblock; /* first page */
         heapgetpage(scan, page);
         lineindex = 0;
         scan->rs_inited = true;
@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
     }
     else if (backward)
     {
+        /* backward parallel scan not supported */
+        Assert(scan->rs_parallel == NULL);
+
         if (!scan->rs_inited)
         {
             /*
@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
                 page = scan->rs_nblocks;
             page--;
         }
+        else if (scan->rs_parallel != NULL)
+        {
+            page = heap_parallelscan_nextpage(scan);
+            finished = (page == InvalidBlockNumber);
+        }
         else
         {
             page++;
@@ -1341,7 +1396,7 @@ HeapScanDesc
 heap_beginscan(Relation relation, Snapshot snapshot,
                int nkeys, ScanKey key)
 {
-    return heap_beginscan_internal(relation, snapshot, nkeys, key,
+    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                    true, true, true, false, false, false);
 }
 
@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
     Oid         relid = RelationGetRelid(relation);
     Snapshot    snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
 
-    return heap_beginscan_internal(relation, snapshot, nkeys, key,
+    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                    true, true, true, false, false, true);
 }
 
@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
                      int nkeys, ScanKey key,
                      bool allow_strat, bool allow_sync)
 {
-    return heap_beginscan_internal(relation, snapshot, nkeys, key,
+    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                    allow_strat, allow_sync, true,
                                    false, false, false);
 }
@@ -1369,7 +1424,7 @@ HeapScanDesc
 heap_beginscan_bm(Relation relation, Snapshot snapshot,
                   int nkeys, ScanKey key)
 {
-    return heap_beginscan_internal(relation, snapshot, nkeys, key,
+    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                    false, false, true, true, false, false);
 }
 
@@ -1378,14 +1433,15 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
                         int nkeys, ScanKey key,
                         bool allow_strat, bool allow_sync, bool allow_pagemode)
 {
-    return heap_beginscan_internal(relation, snapshot, nkeys, key,
+    return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
                                    allow_strat, allow_sync, allow_pagemode,
                                    false, true, false);
 }
 
 static HeapScanDesc
 heap_beginscan_internal(Relation relation, Snapshot snapshot,
                         int nkeys, ScanKey key,
+                        ParallelHeapScanDesc parallel_scan,
                         bool allow_strat,
                         bool allow_sync,
                         bool allow_pagemode,
@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
     scan->rs_allow_strat = allow_strat;
     scan->rs_allow_sync = allow_sync;
     scan->rs_temp_snap = temp_snap;
+    scan->rs_parallel = parallel_scan;
 
     /*
      * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
      * reinitialize scan descriptor
      */
     initscan(scan, key, true);
+
+    /*
+     * reset parallel scan, if present
+     */
+    if (scan->rs_parallel != NULL)
+    {
+        ParallelHeapScanDesc parallel_scan;
+
+        /*
+         * Caller is responsible for making sure that all workers have
+         * finished the scan before calling this, so it really shouldn't be
+         * necessary to acquire the mutex at all.  We acquire it anyway, just
+         * to be tidy.
+         */
+        parallel_scan = scan->rs_parallel;
+        SpinLockAcquire(&parallel_scan->phs_mutex);
+        parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+        SpinLockRelease(&parallel_scan->phs_mutex);
+    }
 }
 
 /* ----------------
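The reset above only fixes up the shared state; ensuring that no worker is
still mid-scan, and relaunching workers afterwards, is the caller's job. As a
rough sketch of the leader-side rescan sequence, using the parallel-context
API from access/parallel.h (pcxt and the orchestration calls are assumptions
about the surrounding executor glue, not part of this patch):

    /* sketch only: pcxt is the ParallelContext the workers were launched from */
    WaitForParallelWorkersToFinish(pcxt);  /* no worker may still hold a page */
    heap_rescan(scan, NULL);               /* also resets phs_cblock, per the hunk above */
    ReinitializeParallelDSM(pcxt);         /* prepare shared memory for relaunch */
    LaunchParallelWorkers(pcxt);           /* workers re-join via heap_beginscan_parallel */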
@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
     pfree(scan);
 }
 
+/* ----------------
+ * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ * Sadly, this doesn't reduce to a constant, because the size required
+ * to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+    return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+                    EstimateSnapshotSpace(snapshot));
+}
+
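This is the usual sizing idiom for a struct that ends in a flexible array
member: offsetof counts the fixed header (including any padding before the
array) exactly once, and add_size is PostgreSQL's overflow-checked addition.
A self-contained toy version of the same calculation, with hypothetical names
standing in for the real descriptor:

    #include <stddef.h>
    #include <stdio.h>

    /* Toy stand-in for ParallelHeapScanDescData: a fixed header followed
     * by a variable-length serialized snapshot. */
    typedef struct
    {
        unsigned    relid;
        unsigned    nblocks;
        char        snapshot_data[];    /* flexible array member */
    } ToyScanDesc;

    /* Plain + here; PostgreSQL's add_size() would also check for overflow. */
    static size_t
    toy_estimate(size_t snapshot_size)
    {
        return offsetof(ToyScanDesc, snapshot_data) + snapshot_size;
    }

    int
    main(void)
    {
        printf("bytes for a 64-byte snapshot: %zu\n", toy_estimate(64));
        return 0;
    }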
+/* ----------------
+ * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ * Must allow as many bytes of shared memory as returned by
+ * heap_parallelscan_estimate.  Call this just once in the leader
+ * process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+                             Snapshot snapshot)
+{
+    target->phs_relid = RelationGetRelid(relation);
+    target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+    /* compare phs_syncscan initialization to similar logic in initscan */
+    target->phs_syncscan = synchronize_seqscans &&
+        !RelationUsesLocalBuffers(relation) &&
+        target->phs_nblocks > NBuffers / 4;
+    SpinLockInit(&target->phs_mutex);
+    target->phs_cblock = InvalidBlockNumber;
+    target->phs_startblock = InvalidBlockNumber;
+    SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ * heap_beginscan_parallel - join a parallel scan
+ *
+ * Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+    Snapshot    snapshot;
+
+    Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+    snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+    RegisterSnapshot(snapshot);
+
+    return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+                                   true, true, true, false, false, true);
+}
+
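Together, the three new functions define the intended call pattern: the leader
sizes and initializes the descriptor in dynamic shared memory once, and then
the leader and each worker attach to it. A sketch of that flow, assuming the
shm_toc helpers from storage/shm_toc.h and a ParallelContext pcxt from
access/parallel.h; PARALLEL_KEY_SCAN is an illustrative key, not something
this patch defines:

    /* leader: allocate and fill the shared descriptor */
    Size        size = heap_parallelscan_estimate(snapshot);
    ParallelHeapScanDesc pscan = shm_toc_allocate(pcxt->toc, size);

    heap_parallelscan_initialize(pscan, rel, snapshot);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_SCAN, pscan);

    /* leader and workers alike (a worker would first shm_toc_lookup the
     * descriptor): attach, then scan exactly as for a regular seqscan */
    HeapScanDesc scan = heap_beginscan_parallel(rel, pscan);
    HeapTuple   tup;

    while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* process tup; pages are handed out via heap_parallelscan_nextpage */
    }
    heap_endscan(scan);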
+/* ----------------
+ * heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan.  Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+    BlockNumber page = InvalidBlockNumber;
+    BlockNumber sync_startpage = InvalidBlockNumber;
+    BlockNumber report_page = InvalidBlockNumber;
+    ParallelHeapScanDesc parallel_scan;
+
+    Assert(scan->rs_parallel);
+    parallel_scan = scan->rs_parallel;
+
+retry:
+    /* Grab the spinlock. */
+    SpinLockAcquire(&parallel_scan->phs_mutex);
+
+    /*
+     * If the scan's startblock has not yet been initialized, we must do so
+     * now.  If this is not a synchronized scan, we just start at block 0, but
+     * if it is a synchronized scan, we must get the starting position from
+     * the synchronized scan machinery.  We can't hold the spinlock while
+     * doing that, though, so release the spinlock, get the information we
+     * need, and retry.  If nobody else has initialized the scan in the
+     * meantime, we'll fill in the value we fetched on the second time
+     * through.
+     */
+    if (parallel_scan->phs_startblock == InvalidBlockNumber)
+    {
+        if (!parallel_scan->phs_syncscan)
+            parallel_scan->phs_startblock = 0;
+        else if (sync_startpage != InvalidBlockNumber)
+            parallel_scan->phs_startblock = sync_startpage;
+        else
+        {
+            SpinLockRelease(&parallel_scan->phs_mutex);
+            sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+            goto retry;
+        }
+        parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+    }
+
+    /*
+     * The current block number is the next one that needs to be scanned,
+     * unless it's InvalidBlockNumber already, in which case there are no more
+     * blocks to scan.  After remembering the current value, we must advance
+     * it so that the next call to this function returns the next block to be
+     * scanned.
+     */
+    page = parallel_scan->phs_cblock;
+    if (page != InvalidBlockNumber)
+    {
+        parallel_scan->phs_cblock++;
+        if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+            parallel_scan->phs_cblock = 0;
+        if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+        {
+            parallel_scan->phs_cblock = InvalidBlockNumber;
+            report_page = parallel_scan->phs_startblock;
+        }
+    }
+
+    /* Release the lock. */
+    SpinLockRelease(&parallel_scan->phs_mutex);
+
+    /*
+     * Report scan location.  Normally, we report the current page number.
+     * When we reach the end of the scan, though, we report the starting page,
+     * not the ending page, just so the starting positions for later scans
+     * don't slew backwards.  We only report the position at the end of the
+     * scan once, though: subsequent callers will report nothing, since they
+     * will have page == InvalidBlockNumber.
+     */
+    if (scan->rs_syncscan)
+    {
+        if (report_page == InvalidBlockNumber)
+            report_page = page;
+        if (report_page != InvalidBlockNumber)
+            ss_report_location(scan->rs_rd, report_page);
+    }
+
+    return page;
+}
+
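The wraparound arithmetic in the middle of the function is easiest to check in
isolation: pages are dealt out starting at phs_startblock, wrap past the end
of the relation to block 0, and the cursor goes invalid once it comes back
around to the start. A self-contained, single-process model of just that
arithmetic (toy names, no locking or syncscan reporting):

    #include <stdio.h>

    #define INVALID_BLOCK ((unsigned) -1)

    static unsigned cblock;     /* models phs_cblock */
    static unsigned startblock; /* models phs_startblock */
    static unsigned nblocks;    /* models rs_nblocks */

    static unsigned
    next_page(void)
    {
        unsigned page = cblock;

        if (page != INVALID_BLOCK)
        {
            cblock++;
            if (cblock >= nblocks)
                cblock = 0;                 /* wrap past relation end */
            if (cblock == startblock)
                cblock = INVALID_BLOCK;     /* came back around: scan done */
        }
        return page;
    }

    int
    main(void)
    {
        unsigned    page;

        nblocks = 5;
        startblock = 3;         /* e.g. chosen by the syncscan machinery */
        cblock = startblock;

        while ((page = next_page()) != INVALID_BLOCK)
            printf("scan page %u\n", page); /* prints 3 4 0 1 2 */
        return 0;
    }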
 
 /* ----------------
  * heap_getnext - retrieve next tuple in scan
  *