Skip to content

Commit ee7ca55

Browse files
committed
Add a C API for parallel heap scans.
Using this API, one backend can set up a ParallelHeapScanDesc to which multiple backends can then attach. Each tuple in the relation will be returned to exactly one of the scanning backends. Only forward scans are supported, and rescans must be carefully coordinated. This is not exposed to the planner or executor yet.

The original version of this code was written by me. Amit Kapila reviewed, tested, and improved it, including adding support for synchronized scans, per review comments from Jeff Davis. Extensive testing of this and related patches was performed by Haribabu Kommi. Final cleanup of this patch was done by me.
1 parent b0b0d84 commit ee7ca55

File tree

3 files changed

+261
-11
lines changed

3 files changed

+261
-11
lines changed

src/backend/access/heap/heapam.c

+234-10
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
#include "storage/predicate.h"
6464
#include "storage/procarray.h"
6565
#include "storage/smgr.h"
66+
#include "storage/spin.h"
6667
#include "storage/standby.h"
6768
#include "utils/datum.h"
6869
#include "utils/inval.h"
@@ -80,12 +81,14 @@ bool synchronize_seqscans = true;
8081
static HeapScanDesc heap_beginscan_internal(Relation relation,
8182
Snapshot snapshot,
8283
int nkeys, ScanKey key,
84+
ParallelHeapScanDesc parallel_scan,
8385
bool allow_strat,
8486
bool allow_sync,
8587
bool allow_pagemode,
8688
bool is_bitmapscan,
8789
bool is_samplescan,
8890
bool temp_snap);
91+
static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
8992
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
9093
TransactionId xid, CommandId cid, int options);
9194
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
226229
* results for a non-MVCC snapshot, the caller must hold some higher-level
227230
* lock that ensures the interesting tuple(s) won't change.)
228231
*/
229-
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
232+
if (scan->rs_parallel != NULL)
233+
scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
234+
else
235+
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
230236

231237
/*
232238
* If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
237243
* behaviors, independently of the size of the table; also there is a GUC
238244
* variable that can disable synchronized scanning.)
239245
*
240-
* During a rescan, don't make a new strategy object if we don't have to.
246+
* Note that heap_parallelscan_initialize has a very similar test; if you
247+
* change this, consider changing that one, too.
241248
*/
242249
if (!RelationUsesLocalBuffers(scan->rs_rd) &&
243250
scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
250257

251258
if (allow_strat)
252259
{
260+
/* During a rescan, keep the previous strategy object. */
253261
if (scan->rs_strategy == NULL)
254262
scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
255263
}
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
260268
scan->rs_strategy = NULL;
261269
}
262270

263-
if (keep_startblock)
271+
if (scan->rs_parallel != NULL)
272+
{
273+
/* For parallel scan, believe whatever ParallelHeapScanDesc says. */
274+
scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
275+
}
276+
else if (keep_startblock)
264277
{
265278
/*
266279
* When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
496509
tuple->t_data = NULL;
497510
return;
498511
}
499-
page = scan->rs_startblock; /* first page */
512+
if (scan->rs_parallel != NULL)
513+
{
514+
page = heap_parallelscan_nextpage(scan);
515+
516+
/* Other processes might have already finished the scan. */
517+
if (page == InvalidBlockNumber)
518+
{
519+
Assert(!BufferIsValid(scan->rs_cbuf));
520+
tuple->t_data = NULL;
521+
return;
522+
}
523+
}
524+
else
525+
page = scan->rs_startblock; /* first page */
500526
heapgetpage(scan, page);
501527
lineoff = FirstOffsetNumber; /* first offnum */
502528
scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
519545
}
520546
else if (backward)
521547
{
548+
/* backward parallel scan not supported */
549+
Assert(scan->rs_parallel == NULL);
550+
522551
if (!scan->rs_inited)
523552
{
524553
/*
@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
669698
page = scan->rs_nblocks;
670699
page--;
671700
}
701+
else if (scan->rs_parallel != NULL)
702+
{
703+
page = heap_parallelscan_nextpage(scan);
704+
finished = (page == InvalidBlockNumber);
705+
}
672706
else
673707
{
674708
page++;
@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
773807
tuple->t_data = NULL;
774808
return;
775809
}
776-
page = scan->rs_startblock; /* first page */
810+
if (scan->rs_parallel != NULL)
811+
{
812+
page = heap_parallelscan_nextpage(scan);
813+
814+
/* Other processes might have already finished the scan. */
815+
if (page == InvalidBlockNumber)
816+
{
817+
Assert(!BufferIsValid(scan->rs_cbuf));
818+
tuple->t_data = NULL;
819+
return;
820+
}
821+
}
822+
else
823+
page = scan->rs_startblock; /* first page */
777824
heapgetpage(scan, page);
778825
lineindex = 0;
779826
scan->rs_inited = true;
@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
793840
}
794841
else if (backward)
795842
{
843+
/* backward parallel scan not supported */
844+
Assert(scan->rs_parallel == NULL);
845+
796846
if (!scan->rs_inited)
797847
{
798848
/*
@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
932982
page = scan->rs_nblocks;
933983
page--;
934984
}
985+
else if (scan->rs_parallel != NULL)
986+
{
987+
page = heap_parallelscan_nextpage(scan);
988+
finished = (page == InvalidBlockNumber);
989+
}
935990
else
936991
{
937992
page++;
@@ -1341,7 +1396,7 @@ HeapScanDesc
13411396
heap_beginscan(Relation relation, Snapshot snapshot,
13421397
int nkeys, ScanKey key)
13431398
{
1344-
return heap_beginscan_internal(relation, snapshot, nkeys, key,
1399+
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
13451400
true, true, true, false, false, false);
13461401
}
13471402

@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
13511406
Oid relid = RelationGetRelid(relation);
13521407
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
13531408

1354-
return heap_beginscan_internal(relation, snapshot, nkeys, key,
1409+
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
13551410
true, true, true, false, false, true);
13561411
}
13571412

@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
13601415
int nkeys, ScanKey key,
13611416
bool allow_strat, bool allow_sync)
13621417
{
1363-
return heap_beginscan_internal(relation, snapshot, nkeys, key,
1418+
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
13641419
allow_strat, allow_sync, true,
13651420
false, false, false);
13661421
}
@@ -1369,7 +1424,7 @@ HeapScanDesc
13691424
heap_beginscan_bm(Relation relation, Snapshot snapshot,
13701425
int nkeys, ScanKey key)
13711426
{
1372-
return heap_beginscan_internal(relation, snapshot, nkeys, key,
1427+
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
13731428
false, false, true, true, false, false);
13741429
}
13751430

@@ -1378,14 +1433,15 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
13781433
int nkeys, ScanKey key,
13791434
bool allow_strat, bool allow_sync, bool allow_pagemode)
13801435
{
1381-
return heap_beginscan_internal(relation, snapshot, nkeys, key,
1436+
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
13821437
allow_strat, allow_sync, allow_pagemode,
13831438
false, true, false);
13841439
}
13851440

13861441
static HeapScanDesc
13871442
heap_beginscan_internal(Relation relation, Snapshot snapshot,
13881443
int nkeys, ScanKey key,
1444+
ParallelHeapScanDesc parallel_scan,
13891445
bool allow_strat,
13901446
bool allow_sync,
13911447
bool allow_pagemode,
@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
14181474
scan->rs_allow_strat = allow_strat;
14191475
scan->rs_allow_sync = allow_sync;
14201476
scan->rs_temp_snap = temp_snap;
1477+
scan->rs_parallel = parallel_scan;
14211478

14221479
/*
14231480
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
14731530
* reinitialize scan descriptor
14741531
*/
14751532
initscan(scan, key, true);
1533+
1534+
/*
1535+
* reset parallel scan, if present
1536+
*/
1537+
if (scan->rs_parallel != NULL)
1538+
{
1539+
ParallelHeapScanDesc parallel_scan;
1540+
1541+
/*
1542+
* Caller is responsible for making sure that all workers have
1543+
* finished the scan before calling this, so it really shouldn't be
1544+
* necessary to acquire the mutex at all. We acquire it anyway, just
1545+
* to be tidy.
1546+
*/
1547+
parallel_scan = scan->rs_parallel;
1548+
SpinLockAcquire(&parallel_scan->phs_mutex);
1549+
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
1550+
SpinLockRelease(&parallel_scan->phs_mutex);
1551+
}
14761552
}
14771553

14781554
/* ----------------
@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
15311607
pfree(scan);
15321608
}
15331609

1610+
/* ----------------
1611+
* heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
1612+
*
1613+
* Sadly, this doesn't reduce to a constant, because the size required
1614+
* to serialize the snapshot can vary.
1615+
* ----------------
1616+
*/
1617+
Size
1618+
heap_parallelscan_estimate(Snapshot snapshot)
1619+
{
1620+
return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
1621+
EstimateSnapshotSpace(snapshot));
1622+
}
1623+
1624+
/* ----------------
1625+
* heap_parallelscan_initialize - initialize ParallelHeapScanDesc
1626+
*
1627+
* Must allow as many bytes of shared memory as returned by
1628+
* heap_parallelscan_estimate. Call this just once in the leader
1629+
* process; then, individual workers attach via heap_beginscan_parallel.
1630+
* ----------------
1631+
*/
1632+
void
1633+
heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
1634+
Snapshot snapshot)
1635+
{
1636+
target->phs_relid = RelationGetRelid(relation);
1637+
target->phs_nblocks = RelationGetNumberOfBlocks(relation);
1638+
/* compare phs_syncscan initialization to similar logic in initscan */
1639+
target->phs_syncscan = synchronize_seqscans &&
1640+
!RelationUsesLocalBuffers(relation) &&
1641+
target->phs_nblocks > NBuffers / 4;
1642+
SpinLockInit(&target->phs_mutex);
1643+
target->phs_cblock = InvalidBlockNumber;
1644+
target->phs_startblock = InvalidBlockNumber;
1645+
SerializeSnapshot(snapshot, target->phs_snapshot_data);
1646+
}
1647+
1648+
/* ----------------
1649+
* heap_beginscan_parallel - join a parallel scan
1650+
*
1651+
* Caller must hold a suitable lock on the correct relation.
1652+
* ----------------
1653+
*/
1654+
HeapScanDesc
1655+
heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
1656+
{
1657+
Snapshot snapshot;
1658+
1659+
Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
1660+
snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
1661+
RegisterSnapshot(snapshot);
1662+
1663+
return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
1664+
true, true, true, false, false, true);
1665+
}
1666+
1667+
/* ----------------
1668+
* heap_parallelscan_nextpage - get the next page to scan
1669+
*
1670+
* Get the next page to scan. Even if there are no pages left to scan,
1671+
* another backend could have grabbed a page to scan and not yet finished
1672+
* looking at it, so it doesn't follow that the scan is done when the
1673+
* first backend gets an InvalidBlockNumber return.
1674+
* ----------------
1675+
*/
1676+
static BlockNumber
1677+
heap_parallelscan_nextpage(HeapScanDesc scan)
1678+
{
1679+
BlockNumber page = InvalidBlockNumber;
1680+
BlockNumber sync_startpage = InvalidBlockNumber;
1681+
BlockNumber report_page = InvalidBlockNumber;
1682+
ParallelHeapScanDesc parallel_scan;
1683+
1684+
Assert(scan->rs_parallel);
1685+
parallel_scan = scan->rs_parallel;
1686+
1687+
retry:
1688+
/* Grab the spinlock. */
1689+
SpinLockAcquire(&parallel_scan->phs_mutex);
1690+
1691+
/*
1692+
* If the scan's startblock has not yet been initialized, we must do so
1693+
* now. If this is not a synchronized scan, we just start at block 0, but
1694+
* if it is a synchronized scan, we must get the starting position from
1695+
* the synchronized scan machinery. We can't hold the spinlock while
1696+
* doing that, though, so release the spinlock, get the information we
1697+
* need, and retry. If nobody else has initialized the scan in the
1698+
* meantime, we'll fill in the value we fetched on the second time
1699+
* through.
1700+
*/
1701+
if (parallel_scan->phs_startblock == InvalidBlockNumber)
1702+
{
1703+
if (!parallel_scan->phs_syncscan)
1704+
parallel_scan->phs_startblock = 0;
1705+
else if (sync_startpage != InvalidBlockNumber)
1706+
parallel_scan->phs_startblock = sync_startpage;
1707+
else
1708+
{
1709+
SpinLockRelease(&parallel_scan->phs_mutex);
1710+
sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
1711+
goto retry;
1712+
}
1713+
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
1714+
}
1715+
1716+
/*
1717+
* The current block number is the next one that needs to be scanned,
1718+
* unless it's InvalidBlockNumber already, in which case there are no more
1719+
* blocks to scan. After remembering the current value, we must advance
1720+
* it so that the next call to this function returns the next block to be
1721+
* scanned.
1722+
*/
1723+
page = parallel_scan->phs_cblock;
1724+
if (page != InvalidBlockNumber)
1725+
{
1726+
parallel_scan->phs_cblock++;
1727+
if (parallel_scan->phs_cblock >= scan->rs_nblocks)
1728+
parallel_scan->phs_cblock = 0;
1729+
if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
1730+
{
1731+
parallel_scan->phs_cblock = InvalidBlockNumber;
1732+
report_page = parallel_scan->phs_startblock;
1733+
}
1734+
}
1735+
1736+
/* Release the lock. */
1737+
SpinLockRelease(&parallel_scan->phs_mutex);
1738+
1739+
/*
1740+
* Report scan location. Normally, we report the current page number.
1741+
* When we reach the end of the scan, though, we report the starting page,
1742+
* not the ending page, just so the starting positions for later scans
1743+
* doesn't slew backwards. We only report the position at the end of the
1744+
* scan once, though: subsequent callers will have report nothing, since
1745+
* they will have page == InvalidBlockNumber.
1746+
*/
1747+
if (scan->rs_syncscan)
1748+
{
1749+
if (report_page == InvalidBlockNumber)
1750+
report_page = page;
1751+
if (report_page != InvalidBlockNumber)
1752+
ss_report_location(scan->rs_rd, report_page);
1753+
}
1754+
1755+
return page;
1756+
}
1757+
15341758
/* ----------------
15351759
* heap_getnext - retrieve next tuple in scan
15361760
*

0 commit comments

Comments
 (0)