diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 537913d1bb..5792040bc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 
 			/* Other processes might have already finished the scan. */
 			if (page == InvalidBlockNumber)
@@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 
 			/* Other processes might have already finished the scan. */
 			if (page == InvalidBlockNumber)
@@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_nkeys = nkeys;
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
+	scan->rs_base.rs_parallel_work =
+		palloc0(sizeof(ParallelBlockTableScanWorkData));
 	scan->rs_strategy = NULL;	/* set in initscan */
 
 	/*
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c814733b22..d388a744b9 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -25,10 +25,15 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "optimizer/plancat.h"
+#include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 
+/* The number of I/O chunks we try to break a parallel seqscan down into */
+#define PARALLEL_SEQSCAN_NCHUNKS			2048
+/* Ramp down size of allocations when we've only this number of chunks left */
+#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS	64
+
 /* GUC variables */
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 
@@ -404,10 +409,21 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  * to set the startblock once.
  */
 void
-table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_startblock_init(Relation rel,
+										 ParallelBlockTableScanWork workspace,
+										 ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber sync_startpage = InvalidBlockNumber;
 
+	/* Reset the state we use for controlling allocation size. */
+	memset(workspace, 0, sizeof(*workspace));
+
+	StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
+					 "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
+
+	workspace->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
+													  PARALLEL_SEQSCAN_NCHUNKS, 1));
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -447,12 +463,35 @@ retry:
  * backend gets an InvalidBlockNumber return.
 */
 BlockNumber
-table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_nextpage(Relation rel,
+								  ParallelBlockTableScanWork workspace,
+								  ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber page;
 	uint64		nallocated;
 
 	/*
+	 * The logic below allocates block numbers out to parallel workers in a
+	 * way that each worker will receive a set of consecutive block numbers
+	 * to scan.  Earlier versions of this would allocate the next highest
+	 * block number to the next worker to call this function, which
+	 * generally resulted in the scan being divided up into one-block
+	 * stripes.  Because each backend is a separate process, some operating
+	 * systems would fail to detect that sequential I/O pattern, which could
+	 * result in poor performance from inefficient or missing readahead.  To
+	 * work around this issue, we now allocate a range of block numbers to
+	 * each worker; when a worker comes back for another block, we give it
+	 * the next one in that range until the range is exhausted, and only
+	 * then allocate it another range of blocks to scan, returning the first
+	 * block number from that range.
+	 *
+	 * Here we name these ranges of blocks "chunks".  The initial size of
+	 * these chunks is determined in table_block_parallelscan_startblock_init
+	 * based on the size of the relation.  Towards the end of the scan, we
+	 * start making reductions in the size of the chunks in order to divide
+	 * the remaining work over all workers as evenly as possible.
+	 *
 	 * phs_nallocated tracks how many pages have been allocated to workers
 	 * already.  When phs_nallocated >= rs_nblocks, all blocks have been
 	 * allocated.
@@ -467,7 +506,48 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
 	 * The actual page to return is calculated by adding the counter to the
 	 * starting block number, modulo nblocks.
 	 */
-	nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+
+	/*
+	 * First check if we have any remaining blocks in a previous chunk for
+	 * this worker.  We must consume all of the blocks from that before we
+	 * allocate another chunk for the worker.
+	 */
+	if (workspace->phsw_alloc_remaining > 0)
+	{
+		/*
+		 * Give them the next page in the range and update the remaining
+		 * pages.
+		 */
+		nallocated = ++workspace->phsw_nallocated;
+		workspace->phsw_alloc_remaining--;
+	}
+	else
+	{
+		/*
+		 * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks
+		 * remaining in the scan, we halve the chunk size.  Since we reduce
+		 * the chunk size here, we'll hit this again after doing
+		 * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS more chunks at the new size.
+		 * After a few iterations of this, we'll end up doing the last few
+		 * blocks with the chunk size set to 1.
+		 */
+		if (workspace->phsw_chunk_size > 1 &&
+			workspace->phsw_nallocated > pbscan->phs_nblocks -
+			(workspace->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
+			workspace->phsw_chunk_size >>= 1;
+
+		nallocated = workspace->phsw_nallocated =
+			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+									workspace->phsw_chunk_size);
+
+#ifdef TBPN_DEBUG
+		elog(NOTICE, "chunksize = %u, allocated = " UINT64_FORMAT,
+			 workspace->phsw_chunk_size, nallocated);
+#endif
+
+		/*
+		 * Set the remaining number of pages in this chunk so that subsequent
+		 * calls from this worker continue on with this chunk until it's
+		 * done.
+		 */
+		workspace->phsw_alloc_remaining = workspace->phsw_chunk_size - 1;
+	}
 
 	if (nallocated >= pbscan->phs_nblocks)
 		page = InvalidBlockNumber;	/* all blocks have been allocated */
 	else
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6f0258831f..78d563aab1 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -42,9 +42,9 @@ typedef struct TableScanDescData
 	 */
 	uint32		rs_flags;
 
+	void	   *rs_parallel_work;
 	struct ParallelTableScanDescData *rs_parallel;	/* parallel scan
 													 * information */
-
 } TableScanDescData;
 typedef struct TableScanDescData *TableScanDesc;
 
@@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData
 } ParallelBlockTableScanDescData;
 typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc;
 
+/*
+ * Per-backend state for a parallel table scan, for block-oriented storage.
+ */
+typedef struct ParallelBlockTableScanWorkData
+{
+	uint64		phsw_nallocated;	/* Current # of pages into the scan */
+	uint32		phsw_alloc_remaining;	/* Pages left for this allocation */
+	uint32		phsw_chunk_size;	/* The number of pages to take in each
+									 * I/O chunk for the scan */
+} ParallelBlockTableScanWorkData;
+typedef struct ParallelBlockTableScanWorkData *ParallelBlockTableScanWork;
+
 /*
  * Base class for fetches from a table via an index.  This is the base-class
  * for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index eb18739c36..f0cefd7e26 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1790,8 +1790,10 @@ extern Size table_block_parallelscan_initialize(Relation rel,
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
 extern void table_block_parallelscan_startblock_init(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
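
---

For review purposes, below is a minimal single-process sketch of the chunk allocation scheme this patch implements, handy for eyeballing how a scan of a given size gets carved up.  The constants mirror PARALLEL_SEQSCAN_NCHUNKS and PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS, next_pow2() stands in for pg_nextpower2_32(), and everything else here (the names, the driver loop, the relation size) is illustrative rather than PostgreSQL code.  It also folds the per-worker state into one counter, which is equivalent when there is only one "worker":

#include <stdint.h>
#include <stdio.h>

#define NCHUNKS			2048	/* mirrors PARALLEL_SEQSCAN_NCHUNKS */
#define RAMPDOWN_CHUNKS	64		/* mirrors PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS */

/* smallest power of 2 >= n, for n >= 1; stands in for pg_nextpower2_32() */
static uint32_t
next_pow2(uint32_t n)
{
	uint32_t	p = 1;

	while (p < n)
		p <<= 1;
	return p;
}

int
main(void)
{
	uint64_t	nblocks = 1000000;	/* hypothetical relation size in blocks */
	uint64_t	nallocated = 0;		/* models phs_nallocated */
	uint64_t	q = nblocks / NCHUNKS;
	uint32_t	chunk_size;			/* models phsw_chunk_size */

	/* initial chunk size, as in table_block_parallelscan_startblock_init */
	chunk_size = next_pow2(q > 1 ? (uint32_t) q : 1);

	while (nallocated < nblocks)
	{
		/* Halve the chunk size near the end of the scan, as in the patch. */
		if (chunk_size > 1 &&
			nallocated > nblocks - (uint64_t) chunk_size * RAMPDOWN_CHUNKS)
			chunk_size >>= 1;

		printf("chunk of %u block(s) at block %llu\n",
			   chunk_size, (unsigned long long) nallocated);

		/* The real code advances this with pg_atomic_fetch_add_u64(). */
		nallocated += chunk_size;
	}
	return 0;
}

With nblocks = 1000000, the initial chunk size comes out as 512 (the next power of two above 1000000 / 2048, roughly 488), and halving begins once fewer than 64 chunks' worth of blocks remain, tailing off to single-block chunks.  As in the patch, the final fetch-and-add may run past nblocks; the real code maps any result >= phs_nblocks to InvalidBlockNumber.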