diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c
index 1fcb281..4eb5d16 100644
--- a/contrib/bloom/blinsert.c
+++ b/contrib/bloom/blinsert.c
@@ -135,7 +135,8 @@ blbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/* Do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- bloomBuildCallback, (void *) &buildstate);
+ bloomBuildCallback, (void *) &buildstate,
+ NULL);
/*
* There are could be some items in cached page. Flush this page if
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d360fc4..5097333 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2036,6 +2036,46 @@ include_dir 'conf.d'
+
+ max_parallel_workers_maintenance (integer)
+
+ max_parallel_workers_maintenance configuration parameter
+
+
+
+
+ Sets the maximum number of parallel workers that can be
+ started by a single utility command. Currently, the only
+ parallel utility command that supports the use of parallel
+ workers is CREATE INDEX, and only when building a
+ B-Tree index. Parallel workers are taken from the pool of
+ processes established by max_worker_processes, limited
+ by max_parallel_workers. Note that the requested
+ number of workers may not actually be available at runtime.
+ If this occurs, the utility operation will run with fewer
+ workers than expected, which may be inefficient. The default
+ value is 2. Setting this value to 0 disables the use of
+ parallel workers by utility commands.
+
+
+
+ Note that parallel utility commands should not consume
+ substantially more memory than equivalent non-parallel
+ operations. This strategy differs from that of parallel
+ query, where resource limits generally apply per worker
+ process. Parallel utility commands treat the resource limit
+ maintenance_work_mem as a limit to be applied to
+ the entire utility command, regardless of the number of
+ parallel worker processes. However, parallel utility
+ commands may still consume substantially more CPU resources
+ and I/O bandwidth. Note also that temp_file_limit
+ is a special case where a limit is applied per worker process
+ for a utility command.
+
+
+
+
max_parallel_workers (integer)
@@ -2047,7 +2087,8 @@ include_dir 'conf.d'
Sets the maximum number of workers that the system can support for
parallel queries. The default value is 8. When increasing or
decreasing this value, consider also adjusting
- .
+ max_parallel_workers_per_gather and
+ max_parallel_workers_maintenance.
Also, note that a setting for this value which is higher than
will have no effect,
since parallel workers are taken from the pool of worker processes
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6f82033..3d7c88b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3835,13 +3835,14 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
sort-start
- (int, bool, int, int, bool)
+ (int, bool, int, int, bool, int)
Probe that fires when a sort operation is started.
arg0 indicates heap, index or datum sort.
arg1 is true for unique-value enforcement.
arg2 is the number of key columns.
arg3 is the number of kilobytes of work memory allowed.
- arg4 is true if random access to the sort result is required.
+ arg4 is true if random access to the sort result is required.
+ arg5 indicates serial, parallel worker, or parallel leader sort.
sort-done
diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 6aac506..a53f31d 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -74,7 +74,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
occurrence is frequent, consider increasing
max_worker_processes and max_parallel_workers
so that more workers can be run simultaneously or alternatively reducing
- max_parallel_workers_per_gather so that the planner
+ max_parallel_workers_per_gather and/or
+ max_parallel_workers_maintenance so that the planner
requests fewer workers.
@@ -105,31 +106,45 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
-
- When Can Parallel Query Be Used?
+
+ When Can Parallelism Be Used?
There are several settings which can cause the query planner not to
- generate a parallel query plan under any circumstances. In order for
- any parallel query plans whatsoever to be generated, the following
- settings must be configured as indicated.
+ generate a parallel query plan under specific circumstances. In
+ order for any parallel query plans whatsoever to be generated, the
+ following settings must be configured as indicated.
- must be set to a
- value which is greater than zero. This is a special case of the more
- general principle that no more workers should be used than the number
- configured via max_parallel_workers_per_gather.
+ In order to execute any DML query in parallel,
+ max_parallel_workers_per_gather must be set to
+ a value which is greater than zero. This is a special case of
+ the more general principle that no more workers should be used
+ than the number configured via
+ max_parallel_workers_per_gather.
+
+
+
+
+
+ In order to execute any DDL query in parallel,
+ max_parallel_workers_maintenance must be set to
+ a value which is greater than zero. This is a special case of
+ the more general principle that no more workers should be used
+ than the number configured via
+ max_parallel_workers_maintenance.
must be set to a
- value other than none. Parallel query requires dynamic
- shared memory in order to pass data between cooperating processes.
+ value other than none. Parallel execution
+ requires dynamic shared memory in order to pass data between
+ cooperating processes.
@@ -177,7 +192,10 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
The query uses any function marked PARALLEL UNSAFE.
Most system-defined functions are PARALLEL SAFE,
but user-defined functions are marked PARALLEL
- UNSAFE by default. See the discussion of
+ UNSAFE by default. Parallel utility commands may also
+ be affected by this restriction, for example with a CREATE
+ INDEX command for an expression index which has a PARALLEL
+ UNSAFE function in its expression. See the discussion of
.
@@ -196,7 +214,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
The transaction isolation level is serializable. This is
- a limitation of the current implementation.
+ a limitation of the current implementation. Note that parallel
+ utility commands are not affected by this limitation.
@@ -207,7 +226,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
that plan in parallel at execution time. If this occurs, the leader
will execute the portion of the plan below the Gather
node entirely by itself, almost as if the Gather node were
- not present. This will happen if any of the following conditions are met:
+ not present. Similarly, parallel utility commands will fall back on
+ serial execution, which is exactly equivalent to conventional serial
+ operation. This will happen if any of the following conditions are met:
@@ -384,6 +405,30 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
+
+ Parallel Sorting
+
+
+ Currently, the use of parallel sorting is limited to
+ CREATE INDEX utility statement execution for B-Tree
+ indexes. Query plans used for DML queries cannot
+ directly parallelize any sort operation. For CREATE
+ INDEX, the underlying table is scanned in parallel. Worker
+ processes perform initial sorting of their portion of the
+ underlying data directly. These portions are later
+ serially merged in the leader process. The leader
+ process incrementally outputs the final index as it consumes this
+ input from temporary files. The leader process itself
+ participates as a worker, so there will be one more sort worker
+ than the number of parallel worker processes successfully
+ launched. When parallelism is used, CREATE INDEX
+ always makes use of temporary files, even where an equivalent
+ serial CREATE INDEX performs the same operation
+ entirely in memory.
+
+
+
+
Parallel Plan Tips
@@ -396,12 +441,23 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
plan even with very small values of these settings (e.g. after setting
them both to zero), there may be some reason why the query planner is
unable to generate a parallel plan for your query. See
- and
+ and
for information on why this may be
the case.
+ If a utility statement that is expected to produce a parallel
+ plan does not do so, reducing parallel_setup_cost
+ or parallel_tuple_cost will have no effect.
+ This is because these costs are only relevant to plans with a
+ Gather node. However, available working memory can be
+ a limiting factor for the use of parallelism by utility
+ statements. See for more
+ details.
+
+
+
When executing a parallel plan, you can use EXPLAIN (ANALYZE,
VERBOSE) to display per-worker statistics for each plan node.
This may be useful in determining whether the work is being evenly
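
The Parallel Sorting section added to parallel.sgml above describes the division of labor: worker processes each sort only their own portion of the underlying data, and the leader then merges the sorted runs serially while writing out the index. The following is a minimal standalone analogue of that pattern, not PostgreSQL code: the worker count, the in-memory arrays, and the random data are invented for illustration, and the real implementation spools index tuples through tuplesort.c and temporary files rather than arrays.

/*
 * Toy model of a parallel build: each "worker" sorts its own run, then a
 * single "leader" loop merges the sorted runs serially.
 */
#include <stdio.h>
#include <stdlib.h>

#define NWORKERS    3
#define PER_WORKER  5

static int
cmp_int(const void *a, const void *b)
{
    int x = *(const int *) a;
    int y = *(const int *) b;

    return (x > y) - (x < y);
}

int
main(void)
{
    int runs[NWORKERS][PER_WORKER];
    int pos[NWORKERS] = {0};
    int i, j;

    srand(42);

    /* "Worker" step: each participant sorts only its own portion */
    for (i = 0; i < NWORKERS; i++)
    {
        for (j = 0; j < PER_WORKER; j++)
            runs[i][j] = rand() % 100;
        qsort(runs[i], PER_WORKER, sizeof(int), cmp_int);
    }

    /* "Leader" step: serially merge the sorted runs into the final output */
    for (;;)
    {
        int best = -1;

        for (i = 0; i < NWORKERS; i++)
        {
            if (pos[i] < PER_WORKER &&
                (best < 0 || runs[i][pos[i]] < runs[best][pos[best]]))
                best = i;
        }
        if (best < 0)
            break;              /* all runs exhausted */
        printf("%d ", runs[best][pos[best]++]);
    }
    printf("\n");
    return 0;
}

The per-run qsort() calls stand in for the workers' partial tuplesorts; the loop that repeatedly takes the smallest remaining head element stands in for the leader's serial merge.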
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index 92c0090..ecb9c33 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -501,6 +501,11 @@ Indexes:
+ Parallel builds are also supported. Note, however, that the
+ second table scan is not performed in parallel.
+
+
+
Regular index builds permit other regular index builds on the
same table to occur in parallel, but only one concurrent index build
can occur on a table at a time. In both cases, no other types of schema
@@ -565,7 +570,22 @@ Indexes:
dependent on the setting of .
Larger values will reduce the time needed for index creation, so long
as you don't make it larger than the amount of memory really available,
- which would drive the machine into swapping.
+ which would drive the machine into swapping. For index methods
+ that support building indexes in parallel, all participating
+ processes collectively use working memory bound in
+ size by maintenance_work_mem. Allowing more
+ worker processes to be used, by increasing
+ max_parallel_workers_maintenance, may also reduce the
+ time needed for index creation. Parallel operations may benefit
+ from increasing maintenance_work_mem in cases where
+ equivalent serial execution sees little or no benefit. This is
+ particularly likely when access to underlying temporary files in
+ parallel enables better utilization of available I/O bandwidth.
+ Note that maintenance_work_mem also directly influences
+ the number of workers launched, since participant processes must
+ have at least a 32MB share of the total
+ maintenance_work_mem budget in order for parallelism to
+ be used at all.
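
The paragraph above says that participant processes must each have at least a 32MB share of the maintenance_work_mem budget for parallelism to be used. As a rough illustration only (the actual policy lives in plan_create_index_workers(), and the exact formula plus the 64MB budget and 8-worker request below are assumptions), the budget caps the participant count roughly like this:

/* Illustration of a 32MB-per-participant floor on maintenance_work_mem. */
#include <stdio.h>

int
main(void)
{
    int maintenance_work_mem = 64 * 1024;   /* 64MB, in KB */
    int min_share = 32 * 1024;              /* assumed 32MB floor, in KB */
    int requested = 8;                      /* max_parallel_workers_maintenance */
    int participants;

    /* The leader participates as a worker, so it needs a share too */
    participants = maintenance_work_mem / min_share;
    if (participants > requested + 1)
        participants = requested + 1;

    if (participants < 2)
        printf("budget too small: the build would run serially\n");
    else
        printf("%d participants (%d worker(s) plus the leader)\n",
               participants, participants - 1);
    return 0;
}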
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index cafc8fe..fbeba5c 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -706,7 +706,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* heap blocks in physical order.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- brinbuildCallback, (void *) state);
+ brinbuildCallback, (void *) state, NULL);
/* process the final batch */
form_and_insert_tuple(state);
@@ -1205,7 +1205,7 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
state->bs_currRangeStart = heapBlk;
IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
heapBlk, scanNumBlks,
- brinbuildCallback, (void *) state);
+ brinbuildCallback, (void *) state, NULL);
/*
* Now we update the values obtained by the scan with the placeholder
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 890b79c..6bc13e8 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -391,7 +391,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* prefers to receive tuples in TID order.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- ginBuildCallback, (void *) &buildstate);
+ ginBuildCallback, (void *) &buildstate, NULL);
/* dump remaining entries to the index */
oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 2415f00..36e3dcf 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -203,7 +203,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* Do the heap scan.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- gistBuildCallback, (void *) &buildstate);
+ gistBuildCallback, (void *) &buildstate, NULL);
/*
* If buffering was used, flush out all the tuples that are still in the
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 0fef60a..d1713dc 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -159,7 +159,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- hashbuildCallback, (void *) &buildstate);
+ hashbuildCallback, (void *) &buildstate, NULL);
if (buildstate.spool)
{
diff --git a/src/backend/access/hash/hashsort.c b/src/backend/access/hash/hashsort.c
index 41d615d..e7fcab1 100644
--- a/src/backend/access/hash/hashsort.c
+++ b/src/backend/access/hash/hashsort.c
@@ -82,6 +82,7 @@ _h_spoolinit(Relation heap, Relation index, uint32 num_buckets)
hspool->low_mask,
hspool->max_buckets,
maintenance_work_mem,
+ NULL,
false);
return hspool;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3acef27..ed1175a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1624,7 +1624,17 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
SpinLockInit(&target->phs_mutex);
target->phs_startblock = InvalidBlockNumber;
pg_atomic_init_u64(&target->phs_nallocated, 0);
- SerializeSnapshot(snapshot, target->phs_snapshot_data);
+ if (IsMVCCSnapshot(snapshot))
+ {
+ SerializeSnapshot(snapshot, target->phs_snapshot_data);
+ target->phs_snapshot_any = false;
+ }
+ else
+ {
+
+ Assert(snapshot == SnapshotAny);
+ target->phs_snapshot_any = true;
+ }
}
/* ----------------
@@ -1652,11 +1662,22 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
Snapshot snapshot;
Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
- snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
- RegisterSnapshot(snapshot);
+
+ if (!parallel_scan->phs_snapshot_any)
+ {
+ /* We store a snapshot, so restore it */
+ snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+ RegisterSnapshot(snapshot);
+ }
+ else
+ {
+ /* SnapshotAny passed by caller (not serialized) */
+ snapshot = SnapshotAny;
+ }
return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
- true, true, true, false, false, true);
+ true, true, true, false, false,
+ !parallel_scan->phs_snapshot_any);
}
/* ----------------
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 399e6a1..8e3b4e6 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -21,7 +21,6 @@
#include "access/nbtree.h"
#include "access/relscan.h"
#include "access/xlog.h"
-#include "catalog/index.h"
#include "commands/vacuum.h"
#include "pgstat.h"
#include "storage/condition_variable.h"
@@ -29,28 +28,11 @@
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
-#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
-/* Working state for btbuild and its callback */
-typedef struct
-{
- bool isUnique;
- bool haveDead;
- Relation heapRel;
- BTSpool *spool;
-
- /*
- * spool2 is needed only when the index is a unique index. Dead tuples are
- * put into spool2 instead of spool in order to avoid uniqueness check.
- */
- BTSpool *spool2;
- double indtuples;
-} BTBuildState;
-
/* Working state needed by btvacuumpage */
typedef struct
{
@@ -104,12 +86,6 @@ typedef struct BTParallelScanDescData
typedef struct BTParallelScanDescData *BTParallelScanDesc;
-static void btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state);
static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state,
BTCycleId cycleid);
@@ -172,107 +148,7 @@ bthandler(PG_FUNCTION_ARGS)
IndexBuildResult *
btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
- IndexBuildResult *result;
- double reltuples;
- BTBuildState buildstate;
-
- buildstate.isUnique = indexInfo->ii_Unique;
- buildstate.haveDead = false;
- buildstate.heapRel = heap;
- buildstate.spool = NULL;
- buildstate.spool2 = NULL;
- buildstate.indtuples = 0;
-
-#ifdef BTREE_BUILD_STATS
- if (log_btree_build_stats)
- ResetUsage();
-#endif /* BTREE_BUILD_STATS */
-
- /*
- * We expect to be called exactly once for any index relation. If that's
- * not the case, big trouble's what we have.
- */
- if (RelationGetNumberOfBlocks(index) != 0)
- elog(ERROR, "index \"%s\" already contains data",
- RelationGetRelationName(index));
-
- buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
-
- /*
- * If building a unique index, put dead tuples in a second spool to keep
- * them out of the uniqueness check.
- */
- if (indexInfo->ii_Unique)
- buildstate.spool2 = _bt_spoolinit(heap, index, false, true);
-
- /* do the heap scan */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- btbuildCallback, (void *) &buildstate);
-
- /* okay, all heap tuples are indexed */
- if (buildstate.spool2 && !buildstate.haveDead)
- {
- /* spool2 turns out to be unnecessary */
- _bt_spooldestroy(buildstate.spool2);
- buildstate.spool2 = NULL;
- }
-
- /*
- * Finish the build by (1) completing the sort of the spool file, (2)
- * inserting the sorted tuples into btree pages and (3) building the upper
- * levels.
- */
- _bt_leafbuild(buildstate.spool, buildstate.spool2);
- _bt_spooldestroy(buildstate.spool);
- if (buildstate.spool2)
- _bt_spooldestroy(buildstate.spool2);
-
-#ifdef BTREE_BUILD_STATS
- if (log_btree_build_stats)
- {
- ShowUsage("BTREE BUILD STATS");
- ResetUsage();
- }
-#endif /* BTREE_BUILD_STATS */
-
- /*
- * Return statistics
- */
- result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
-
- result->heap_tuples = reltuples;
- result->index_tuples = buildstate.indtuples;
-
- return result;
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state)
-{
- BTBuildState *buildstate = (BTBuildState *) state;
-
- /*
- * insert the index tuple into the appropriate spool file for subsequent
- * processing
- */
- if (tupleIsAlive || buildstate->spool2 == NULL)
- _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
- else
- {
- /* dead tuples are put into spool2 */
- buildstate->haveDead = true;
- _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
- }
-
- buildstate->indtuples += 1;
+ return _bt_dobuild(heap, index, indexInfo);
}
/*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index bf6c03c..b0e55bd 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -14,14 +14,17 @@
* its parent level. When we have only one page on a level, it must be
* the root -- it can be attached to the btree metapage and we are done.
*
- * This code is moderately slow (~10% slower) compared to the regular
- * btree (insertion) build code on sorted or well-clustered data. On
- * random data, however, the insertion build code is unusable -- the
- * difference on a 60MB heap is a factor of 15 because the random
- * probes into the btree thrash the buffer pool. (NOTE: the above
- * "10%" estimate is probably obsolete, since it refers to an old and
- * not very good external sort implementation that used to exist in
- * this module. tuplesort.c is almost certainly faster.)
+ * Parallel index builds are supported. We launch and coordinate worker
+ * processes when parallelism is requested. Each process has a "partial"
+ * tuplesort, whose input comes from the parallel heap scan that each worker
+ * must join. Finally, a separate leader tuplesort outputs source tuples
+ * serially, when all worker states are finished and quiescent. Source
+ * tuples are loaded in the same manner as conventional serial btree builds.
+ * Note that tuple queues are not used for transferring data between
+ * processes, in contrast to the executor's parallel infrastructure, which
+ * uses them extensively. Instead, most IPC is outsourced to tuplesort.c,
+ * which is optimized for the high latency bulk transfer of data that
+ * characterizes large sort operations.
*
* It is not wise to pack the pages entirely full, since then *any*
* insertion would cause a split (and not only of the leaf page; the need
@@ -67,28 +70,156 @@
#include "postgres.h"
#include "access/nbtree.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
+#include "catalog/catalog.h"
+#include "catalog/index.h"
#include "miscadmin.h"
+#include "optimizer/planner.h"
#include "storage/smgr.h"
-#include "tcop/tcopprot.h"
+#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BTREE_SHARED UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003)
+
+/*
+ * A parallel sort with one worker process, and without any leader-as-worker
+ * state, may be used for testing the parallel tuplesort infrastructure.
+ */
+#ifdef NOT_USED
+#define FORCE_SINGLE_WORKER
+#endif
+
/*
* Status record for spooling/sorting phase. (Note we may have two of
* these due to the special requirements for uniqueness-checking with
* dead tuples.)
*/
-struct BTSpool
+typedef struct BTSpool
{
Tuplesortstate *sortstate; /* state data for tuplesort.c */
Relation heap;
Relation index;
bool isunique;
-};
+} BTSpool;
+
+/*
+ * Status for index builds performed in parallel. This is allocated in a
+ * dynamic shared memory segment. Note that there is a separate tuplesort TOC
+ * entry, private to tuplesort.c but allocated by this module on its behalf.
+ */
+typedef struct BTShared
+{
+ /*
+ * These fields are not modified throughout the sort. They primarily
+ * exist for the benefit of worker processes, which need to create BTSpool
+ * state corresponding to that used by the leader.
+ */
+ Oid heaprelid;
+ Oid indexrelid;
+ bool isunique;
+ bool isconcurrent;
+ int scantuplesortstates;
+
+ /*
+ * mutex protects all fields before heapdesc.
+ *
+ * These fields contain status information of interest to B-Tree index
+ * builds, which must work just the same when an index is built in
+ * parallel.
+ */
+ slock_t mutex;
+
+ /*
+ * reltuples is the total number of input heap tuples for an index build.
+ *
+ * havedead and indtuples aggregate state that is collected by
+ * _bt_build_callback() across all workers. These are needed because a
+ * unified BTBuildState is only present in the leader process.
+ */
+ double reltuples;
+ bool havedead;
+ double indtuples;
+
+ /*
+ * This variable-sized field must come last.
+ *
+ * See _bt_parallel_estimate_shared().
+ */
+ ParallelHeapScanDescData heapdesc;
+} BTShared;
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BTLeader
+{
+ /* parallel context itself */
+ ParallelContext *pcxt;
+
+ /*
+ * nworkertuplesorts is the exact number of worker processes successfully
+ * launched, plus the leader process if it participates as a worker.
+ */
+ int nworkertuplesorts;
+
+ /*
+ * Leader-as-worker state.
+ *
+ * workersort is the leader-as-worker tuplesort state. workersort2 is the
+ * corresponding btspool2 state, used only when building unique indexes.
+ */
+ Tuplesortstate *workersort;
+ Tuplesortstate *workersort2;
+
+ /*
+ * Leader process convenience pointers to shared state (leader avoids TOC
+ * lookups).
+ *
+ * btshared is the shared state for the entire build. sharedsort is the
+ * shared, tuplesort-managed state passed to each process's tuplesort.
+ * sharedsort2 is the corresponding btspool2 shared state, used only when
+ * building unique indexes. snapshot is the snapshot used by the scan,
+ * which the leader reliably holds a reference to, regardless of whether
+ * it participates in the scan as a worker.
+ */
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ Snapshot snapshot;
+} BTLeader;
+
+/* Working state for btbuild and its callback */
+typedef struct BTBuildState
+{
+ bool isunique;
+ bool havedead;
+ Relation heap;
+ BTSpool *spool;
+
+ /*
+ * spool2 is needed only when the index is a unique index. Dead tuples are
+ * put into spool2 instead of spool in order to avoid uniqueness check.
+ */
+ BTSpool *spool2;
+ double indtuples;
+
+ /*
+ * btleader is only present when a parallel index build is performed, and
+ * only in the leader process (actually, only the leader has a BTBuildState.
+ * Workers have their own spool and spool2, though.)
+ */
+ BTLeader *btleader;
+} BTBuildState;
/*
* Status record for a btree page being built. We have one of these
@@ -128,6 +259,14 @@ typedef struct BTWriteState
} BTWriteState;
+static double _bt_heapscan(Relation heap, Relation index,
+ BTBuildState *buildstate, IndexInfo *indexInfo);
+static void _bt_spooldestroy(BTSpool *btspool);
+static void _bt_spool(BTSpool *btspool, ItemPointer self,
+ Datum *values, bool *isnull);
+static void _bt_leafbuild(BTBuildState buildstate);
+static void _bt_build_callback(Relation index, HeapTuple htup, Datum *values,
+ bool *isnull, bool tupleIsAlive, void *state);
static Page _bt_blnewpage(uint32 level);
static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
static void _bt_slideleft(Page page);
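
BTShared above carries havedead, reltuples, and indtuples behind a spinlock so that every worker can fold its results into the shared totals, which the leader reads back once the workers are done (see _bt_parallel_scan_and_sort() and _bt_leader_wait_for_workers() later in this file). Below is a standalone analogue of that aggregation pattern, with threads and a pthread mutex standing in for worker processes and the slock_t, and with made-up tuple counts; compile with -pthread.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct SharedCounts
{
    pthread_mutex_t mutex;
    double      reltuples;
    double      indtuples;
    bool        havedead;
} SharedCounts;

static SharedCounts shared = {PTHREAD_MUTEX_INITIALIZER, 0.0, 0.0, false};

static void *
worker(void *arg)
{
    double scanned = *(double *) arg;

    /* ... a real worker would scan and sort its portion of the heap here ... */

    pthread_mutex_lock(&shared.mutex);
    shared.reltuples += scanned;
    shared.indtuples += scanned;    /* no dead tuples in this toy example */
    pthread_mutex_unlock(&shared.mutex);
    return NULL;
}

int
main(void)
{
    pthread_t   tids[3];
    double      portions[3] = {1000.0, 950.0, 1050.0};
    int         i;

    for (i = 0; i < 3; i++)
        pthread_create(&tids[i], NULL, worker, &portions[i]);
    for (i = 0; i < 3; i++)
        pthread_join(tids[i], NULL);

    /* "Leader": the totals feed the ambuild statistics in the real code */
    printf("reltuples=%.0f indtuples=%.0f havedead=%d\n",
           shared.reltuples, shared.indtuples, (int) shared.havedead);
    return 0;
}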
@@ -138,45 +277,265 @@ static void _bt_buildadd(BTWriteState *wstate, BTPageState *state,
static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
static void _bt_load(BTWriteState *wstate,
BTSpool *btspool, BTSpool *btspool2);
+static BTLeader *_bt_begin_parallel(BTSpool *btspool, bool isconcurrent,
+ int request, bool leaderasworker);
+static void _bt_end_parallel(BTLeader *btleader);
+static void _bt_leader_sort_as_worker(BTBuildState *buildstate);
+static double _bt_leader_wait_for_workers(BTBuildState *buildstate);
+
+/* static void _bt_worker_main(dsm_segment *seg, shm_toc *toc); */
+static Size _bt_parallel_estimate_shared(Snapshot snapshot);
+static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+ BTShared * btshared, Sharedsort *sharedsort,
+ Sharedsort *sharedsort2, int sortmem);
/*
- * Interface routines
+ * Main interface routine
*/
+IndexBuildResult *
+_bt_dobuild(Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ IndexBuildResult *result;
+ BTBuildState buildstate;
+ double reltuples;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ ResetUsage();
+#endif /* BTREE_BUILD_STATS */
+
+ buildstate.isunique = indexInfo->ii_Unique;
+ buildstate.havedead = false;
+ buildstate.heap = heap;
+ buildstate.spool = NULL;
+ buildstate.spool2 = NULL;
+ buildstate.indtuples = 0;
+ buildstate.btleader = NULL;
+
+ /*
+ * We expect to be called exactly once for any index relation. If that's
+ * not the case, big trouble's what we have.
+ */
+ if (RelationGetNumberOfBlocks(index) != 0)
+ elog(ERROR, "index \"%s\" already contains data",
+ RelationGetRelationName(index));
+
+ reltuples = _bt_heapscan(heap, index, &buildstate, indexInfo);
+
+ /*
+ * Finish the build by (1) completing the sort of the spool file, (2)
+ * inserting the sorted tuples into btree pages and (3) building the upper
+ * levels.
+ */
+ _bt_leafbuild(buildstate);
+ _bt_spooldestroy(buildstate.spool);
+ if (buildstate.spool2)
+ _bt_spooldestroy(buildstate.spool2);
+
+ /* Shut down leader (and its workers) if required */
+ if (buildstate.btleader)
+ _bt_end_parallel(buildstate.btleader);
+
+ result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
+
+ result->heap_tuples = reltuples;
+ result->index_tuples = buildstate.indtuples;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ /*
+ * For serial builds, the entire build is summarized. Otherwise, just
+ * the leader processing portion of the build is summarized.
+ */
+ if (!buildstate.btleader)
+ ShowUsage("BTREE BUILD STATS");
+ else
+ ShowUsage("BTREE LEADER PROCESSING STATS");
+
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+ return result;
+}
+
+
+/*
+ * Internal routines.
+ */
/*
- * create and initialize a spool structure
+ * Create and initialize one or two spool structures, and save them in
+ * caller's buildstate argument (a second spool is required for unique
+ * index builds). Scan the heap, possibly in parallel, filling spools
+ * with IndexTuples.
+ *
+ * Returns the total number of heap tuples scanned.
*/
-BTSpool *
-_bt_spoolinit(Relation heap, Relation index, bool isunique, bool isdead)
+static double
+_bt_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
+ IndexInfo *indexInfo)
{
BTSpool *btspool = (BTSpool *) palloc0(sizeof(BTSpool));
- int btKbytes;
+ SortCoordinate coordinate = NULL;
+ double reltuples = 0;
+ bool force_single_worker = false;
- btspool->heap = heap;
- btspool->index = index;
- btspool->isunique = isunique;
+#ifdef FORCE_SINGLE_WORKER
+
+ /*
+ * Forcing there to be only one parallel worker and no leader-as-worker
+ * ensures that every process involved behaves deterministically
+ */
+ force_single_worker = true;
+#endif
/*
* We size the sort area as maintenance_work_mem rather than work_mem to
* speed index creation. This should be OK since a single backend can't
- * run multiple index creations in parallel. Note that creation of a
- * unique index actually requires two BTSpool objects. We expect that the
- * second one (for dead tuples) won't get very full, so we give it only
- * work_mem.
+ * run multiple index creations in parallel (see also: notes on
+ * parallelism and maintenance_work_mem below). Note that creation of a
+ * unique index actually requires two BTSpool objects.
+ */
+ btspool->heap = heap;
+ btspool->index = index;
+ btspool->isunique = indexInfo->ii_Unique;
+
+ /*
+ * Parallel index builds on catalog relations are unsupported. This is
+ * because reindex_relation() will call RelationSetIndexList() to forcibly
+ * disable the use of indexes on the table being reindexed (e.g., when
+ * REINDEX_REL_SUPPRESS_INDEX_USE is specified by CLUSTER caller), and
+ * that is not propagated to workers.
+ */
+ if (!IsCatalogRelation(heap))
+ {
+ Oid table = RelationGetRelid(btspool->heap);
+ Oid index = RelationGetRelid(btspool->index);
+ int request;
+
+ /* Determine optimal number of worker processes, and attempt launch */
+ request = plan_create_index_workers(table, index);
+
+ if (request > 0)
+ {
+ if (force_single_worker)
+ request = 1;
+ buildstate->btleader = _bt_begin_parallel(btspool,
+ indexInfo->ii_Concurrent,
+ request,
+ !force_single_worker);
+ }
+ }
+
+ /*
+ * If no workers could actually be launched (btleader is NULL), perform
+ * serial index build by passing NULL as coordinate argument
*/
- btKbytes = isdead ? work_mem : maintenance_work_mem;
- btspool->sortstate = tuplesort_begin_index_btree(heap, index, isunique,
- btKbytes, false);
+ if (buildstate->btleader)
+ {
+ coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = false;
+ coordinate->nLaunched = buildstate->btleader->nworkertuplesorts;
+ coordinate->sharedsort = buildstate->btleader->sharedsort;
+ }
+
+ /*
+ * Begin tuplesort for this process (there will be worker process
+ * tuplesorts already underway by now in parallel case).
+ *
+ * In cases where parallelism is involved, the leader always gets a
+ * standard maintenance_work_mem budget here, just like a serial sort.
+ * Parallel worker states receive only a fraction of maintenance_work_mem,
+ * though. With only limited exception, there is no overlap in memory
+ * allocations among the leader process and worker processes (we rely on
+ * tuplesort.c not allocating the vast majority of memory needed by leader
+ * until output is first consumed, and we rely on most memory being
+ * released by tuplesort.c when workers are put into a quiescent state).
+ *
+ * The overall effect is that maintenance_work_mem always represents an
+ * absolute high watermark on the amount of memory used by a CREATE INDEX
+ * operation, regardless of the use of parallelism or any other factor.
+ */
+ btspool->sortstate = tuplesort_begin_index_btree(heap, index,
+ buildstate->isunique,
+ maintenance_work_mem,
+ coordinate, false);
+
+ /* Save as primary spool (any worker processes have their own) */
+ buildstate->spool = btspool;
+
+ /*
+ * If building a unique index, put dead tuples in a second spool to keep
+ * them out of the uniqueness check.
+ */
+ if (indexInfo->ii_Unique)
+ {
+ BTSpool *btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+ SortCoordinate coordinate2 = NULL;
+
+ /* Initialize secondary spool */
+ btspool2->heap = heap;
+ btspool2->index = index;
+ btspool2->isunique = false;
+
+ if (buildstate->btleader)
+ {
+ /*
+ * Set up non-private state that is passed to
+ * tuplesort_begin_index_btree() about the basic high level
+ * coordination of a parallel sort.
+ */
+ coordinate2 = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+ coordinate2->isWorker = false;
+ coordinate2->nLaunched = buildstate->btleader->nworkertuplesorts;
+ coordinate2->sharedsort = buildstate->btleader->sharedsort2;
+ }
+
+ /*
+ * We expect that the second one (for dead tuples) won't get very
+ * full, so we give it only work_mem
+ */
+ btspool2->sortstate = tuplesort_begin_index_btree(heap, index,
+ false, work_mem,
+ coordinate2, false);
+ /* Save as secondary spool */
+ buildstate->spool2 = btspool2;
+ }
- return btspool;
+ if (!buildstate->btleader)
+ {
+ /* Spool is to be sorted serially. Do heap scan. */
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
+ _bt_build_callback, (void *) buildstate,
+ NULL);
+ /* okay, all heap tuples are indexed */
+ if (buildstate->spool2 && !buildstate->havedead)
+ {
+ /* spool2 turns out to be unnecessary */
+ _bt_spooldestroy(buildstate->spool2);
+ buildstate->spool2 = NULL;
+ }
+ }
+ else
+ {
+ /* Parallel scans are underway by now. Join as worker. */
+ if (!force_single_worker)
+ _bt_leader_sort_as_worker(buildstate);
+
+ /* Wait on worker processes to finish (should be almost instant) */
+ reltuples = _bt_leader_wait_for_workers(buildstate);
+ }
+
+ return reltuples;
}
/*
* clean up a spool structure and its substructures.
*/
-void
+static void
_bt_spooldestroy(BTSpool *btspool)
{
tuplesort_end(btspool->sortstate);
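
The comments in _bt_heapscan() above describe the working-memory budget: the leader's merge tuplesort is started with the full maintenance_work_mem, while each scan/sort participant later gets an equal slice of that budget, never less than the 64KB floor (see the sortmem computation in _bt_leader_sort_as_worker() and _bt_worker_main() further down). A tiny standalone sketch of that arithmetic, with the budget and participant count chosen arbitrarily:

#include <stdio.h>

#define Max(a,b) ((a) > (b) ? (a) : (b))

int
main(void)
{
    int maintenance_work_mem = 65536;   /* 64MB, in KB */
    int nparticipants = 3;              /* launched workers + leader-as-worker */
    int sortmem;

    /* Mirrors Max(maintenance_work_mem / nparticipants, 64) in nbtsort.c */
    sortmem = Max(maintenance_work_mem / nparticipants, 64);

    printf("each participant sorts runs within %dKB\n", sortmem);
    printf("the leader merges with the full %dKB budget\n",
           maintenance_work_mem);
    return 0;
}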
@@ -186,7 +545,7 @@ _bt_spooldestroy(BTSpool *btspool)
/*
* spool an index entry into the sort file.
*/
-void
+static void
_bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull)
{
tuplesort_putindextuplevalues(btspool->sortstate, btspool->index,
@@ -194,16 +553,18 @@ _bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull)
}
/*
- * given a spool loaded by successive calls to _bt_spool,
- * create an entire btree.
+ * given state with a spool loaded by successive calls to _bt_spool, create
+ * an entire btree.
*/
-void
-_bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
+static void
+_bt_leafbuild(BTBuildState buildstate)
{
+ BTSpool *btspool = buildstate.spool;
+ BTSpool *btspool2 = buildstate.spool2;
BTWriteState wstate;
#ifdef BTREE_BUILD_STATS
- if (log_btree_build_stats)
+ if (log_btree_build_stats && !buildstate.btleader)
{
ShowUsage("BTREE BUILD (Spool) STATISTICS");
ResetUsage();
@@ -214,6 +575,12 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
if (btspool2)
tuplesort_performsort(btspool2->sortstate);
+ /* Release worker tuplesorts within leader process as soon as possible */
+ if (buildstate.btleader && buildstate.btleader->workersort)
+ tuplesort_end(buildstate.btleader->workersort);
+ if (buildstate.btleader && buildstate.btleader->workersort2)
+ tuplesort_end(buildstate.btleader->workersort2);
+
wstate.heap = btspool->heap;
wstate.index = btspool->index;
@@ -231,11 +598,34 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
_bt_load(&wstate, btspool, btspool2);
}
-
/*
- * Internal routines.
+ * Per-tuple callback from IndexBuildHeapScan
*/
+static void
+_bt_build_callback(Relation index,
+ HeapTuple htup,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state)
+{
+ BTBuildState *buildstate = (BTBuildState *) state;
+
+ /*
+ * insert the index tuple into the appropriate spool file for subsequent
+ * processing
+ */
+ if (tupleIsAlive || buildstate->spool2 == NULL)
+ _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
+ else
+ {
+ /* dead tuples are put into spool2 */
+ buildstate->havedead = true;
+ _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
+ }
+ buildstate->indtuples += 1;
+}
/*
* allocate workspace for a new, clean btree page, not linked to any siblings.
@@ -819,3 +1209,515 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM);
}
}
+
+/*
+ * _bt_begin_parallel - Creates parallel context, and launches workers for
+ * leader.
+ *
+ * btspool argument should be initialized (with the exception of the tuplesort
+ * state, which may be created using shared state initially created here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY, which
+ * uses an MVCC snapshot.
+ *
+ * request is the target number of parallel workers to launch.
+ *
+ * leaderasworker indicates whether the leader process participates in
+ * scanning the target heap relation and sorting runs (i.e., whether it
+ * also acts as a worker).
+ *
+ * Returns a BTLeader, which contains state sufficient to have leader process
+ * coordinate entire parallel sort (or sorts, if a second spool is required).
+ * If not even a single worker process can be launched, returns NULL,
+ * indicating to caller that parallel sort should not be coordinated.
+ */
+static BTLeader *
+_bt_begin_parallel(BTSpool *btspool, bool isconcurrent, int request,
+ bool leaderasworker)
+{
+ ParallelContext *pcxt;
+ int scantuplesortstates;
+ Snapshot snapshot;
+ Size estspool;
+ Size estsort;
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+
+ /*
+ * Enter parallel mode, and create context for parallel build of btree
+ * index
+ */
+ EnterParallelMode();
+ pcxt = CreateParallelContext("postgres", "_bt_worker_main", request, true);
+
+ /*
+ * Caller typically but not always wants leader process to participate in
+ * scan and sort step (i.e. to behave as a worker does initially).
+ */
+ Assert(request > 0);
+ scantuplesortstates = leaderasworker ? request + 1 : request;
+
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we use
+ * SnapshotAny because we must retrieve all tuples and do our own time
+ * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+ * concurrent build, or during bootstrap, we take a regular MVCC snapshot
+ * and index whatever's live according to that.
+ */
+ if (!isconcurrent)
+ snapshot = SnapshotAny;
+ else
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ /*
+ * Estimate size for two keys -- nbtsort.c shared workspace, and
+ * tuplesort-private shared workspace
+ */
+ estspool = _bt_parallel_estimate_shared(snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, estspool);
+ estsort = tuplesort_estimate_shared(scantuplesortstates);
+ shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+ /* Unique case requires a second spool, and so a third shared workspace */
+ if (!btspool->isunique)
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+ else
+ {
+ /* Account for PARALLEL_KEY_TUPLESORT_SPOOL2 */
+ shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+ shm_toc_estimate_keys(&pcxt->estimator, 3);
+ }
+
+ /* Everyone's had a chance to ask for space, so now create the DSM */
+ InitializeParallelDSM(pcxt);
+
+ /*
+ * Store shared build state, for which we reserved space
+ */
+ btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estspool);
+ /* Initialize immutable state */
+ btshared->heaprelid = RelationGetRelid(btspool->heap);
+ btshared->indexrelid = RelationGetRelid(btspool->index);
+ btshared->isunique = btspool->isunique;
+ btshared->isconcurrent = isconcurrent;
+ btshared->scantuplesortstates = scantuplesortstates;
+ /* Initialize mutable state */
+ SpinLockInit(&btshared->mutex);
+ btshared->reltuples = 0.0;
+ btshared->havedead = false;
+ btshared->indtuples = 0.0;
+ heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
+
+ /*
+ * Store shared tuplesort-private state, for which we reserved space.
+ * Then, initialize opaque state using tuplesort routine.
+ */
+ sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+ tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+ pcxt->seg);
+
+ /* Add TOC entries */
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_BTREE_SHARED, btshared);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+ /* Unique case requires a second spool, and associated shared state */
+ if (!btspool->isunique)
+ sharedsort2 = NULL;
+ else
+ {
+ /*
+ * Store additional shared tuplesort-private state, for which we
+ * reserved space. Then, initialize opaque state using tuplesort
+ * routine.
+ */
+ sharedsort2 = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+ tuplesort_initialize_shared(sharedsort2, scantuplesortstates,
+ pcxt->seg);
+
+ /* Add additional TOC entry */
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT_SPOOL2, sharedsort2);
+ }
+
+ /*
+ * Launch workers to sort runs.
+ *
+ * Save parallel context information within leader state.
+ */
+ LaunchParallelWorkers(pcxt);
+ btleader->pcxt = pcxt;
+ btleader->nworkertuplesorts = pcxt->nworkers_launched;
+ if (leaderasworker)
+ btleader->nworkertuplesorts++;
+ btleader->workersort = NULL;
+ btleader->workersort2 = NULL;
+ btleader->btshared = btshared;
+ btleader->sharedsort = sharedsort;
+ btleader->sharedsort2 = sharedsort2;
+ btleader->snapshot = snapshot;
+
+ /* If no worker processes were launched, index build should be serial */
+ if (pcxt->nworkers_launched == 0)
+ {
+ _bt_end_parallel(btleader);
+ return NULL;
+ }
+
+ return btleader;
+}
+
+/*
+ * _bt_end_parallel - end parallel mode
+ *
+ * Shuts down workers, and destroys parallel context.
+ *
+ * This has to be called after _bt_load() has consumed any tuples that it
+ * requires, because quiesced worker processes need to remain around until
+ * leader processing completes, per tuplesort contract. Though passive, they
+ * still manage the lifetime of state that is shared with the leader.
+ */
+static void
+_bt_end_parallel(BTLeader *btleader)
+{
+ /* Shutdown worker processes */
+ WaitForParallelWorkersToFinish(btleader->pcxt);
+ /* Free last reference to MVCC snapshot, if one was used */
+ if (IsMVCCSnapshot(btleader->snapshot))
+ UnregisterSnapshot(btleader->snapshot);
+ DestroyParallelContext(btleader->pcxt);
+ ExitParallelMode();
+}
+
+/*
+ * _bt_leader_sort_as_worker - within leader, participate as a parallel worker
+ *
+ * The leader's Tuplesortstate for sorting runs is distinct from the
+ * Tuplesortstate it uses for consuming final output. The leader-as-worker
+ * case is minimally divergent from the regular worker case.
+ *
+ * Consuming even just the first index tuple output by the leader
+ * Tuplesortstate is dependent on workers having completely finished
+ * already, so there is nothing for the leader process to do when control
+ * reaches here. The leader process might as well participate as a worker
+ * (plus, we want to be consistent with other parallel operations).
+ */
+static void
+_bt_leader_sort_as_worker(BTBuildState *buildstate)
+{
+ BTLeader *btleader = buildstate->btleader;
+ BTSpool *leaderworker;
+ BTSpool *leaderworker2;
+ int sortmem;
+
+ /* Allocate memory for leader-as-worker's own private spool */
+ leaderworker = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Initialize leader-as-worker's own spool */
+ leaderworker->heap = buildstate->spool->heap;
+ leaderworker->index = buildstate->spool->index;
+ leaderworker->isunique = buildstate->spool->isunique;
+
+ /* Initialize leader-as-worker's own second spool, if required */
+ if (!btleader->btshared->isunique)
+ leaderworker2 = NULL;
+ else
+ {
+ /* Allocate memory for worker's own private secondary spool */
+ leaderworker2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Initialize worker's own secondary spool */
+ leaderworker2->heap = leaderworker->heap;
+ leaderworker2->index = leaderworker->index;
+ leaderworker2->isunique = false;
+ }
+
+ /*
+ * Call entry point also called by worker processes, to initialize
+ * sortstates, scan, and perform sort step.
+ *
+ * This will cause the parent BTBuildState to store leader-as-worker pg_class
+ * statistics (e.g., BTBuildState.indtuples), since BTBuildState for
+ * entire sort is reused. This does not matter, because
+ * _bt_parallel_scan_and_sort() also aggregates this in shared memory,
+ * which is what we'll go on to use later.
+ *
+ * Might as well use a reliable figure when doling out the leader-as-worker
+ * share of maintenance_work_mem (when the requested number of workers was
+ * not launched, this will be somewhat higher than it is for other workers).
+ */
+ sortmem = Max(maintenance_work_mem / btleader->nworkertuplesorts, 64);
+ _bt_parallel_scan_and_sort(leaderworker, leaderworker2, btleader->btshared,
+ btleader->sharedsort, btleader->sharedsort2,
+ sortmem);
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ ShowUsage("BTREE BUILD (Leader with Worker Spool) STATISTICS");
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+
+ /* Store tuplesort state for shut down later */
+ btleader->workersort = leaderworker->sortstate;
+ if (leaderworker2)
+ btleader->workersort2 = leaderworker2->sortstate;
+}
+
+/*
+ * _bt_leader_wait_for_workers - within leader, wait for worker processes to
+ * finish
+ */
+static double
+_bt_leader_wait_for_workers(BTBuildState *buildstate)
+{
+ BTLeader *btleader = buildstate->btleader;
+ double reltuples = 0;
+
+ /* Have tuplesort actually wait for workers to finish */
+ tuplesort_leader_wait(buildstate->spool->sortstate);
+ /* Be tidy; wait on second spool's tuplesort */
+ if (buildstate->spool2)
+ tuplesort_leader_wait(buildstate->spool2->sortstate);
+
+ /*
+ * Pass caller the sum of each worker's heap and index reltuples, which
+ * are accumulated for all workers. Caller needs this for ambuild
+ * statistics. This may overwrite leader-as-worker values already set.
+ */
+ SpinLockAcquire(&btleader->btshared->mutex);
+ buildstate->havedead = btleader->btshared->havedead;
+ reltuples = btleader->btshared->reltuples;
+ buildstate->indtuples = btleader->btshared->indtuples;
+ SpinLockRelease(&btleader->btshared->mutex);
+
+ return reltuples;
+}
+
+/*
+ * _bt_worker_main - Perform work within a launched parallel process.
+ *
+ * Parallel workers sort one or more runs only. Merging of these runs occurs
+ * serially, within the leader process (which also processes some input as a
+ * worker).
+ */
+void
+_bt_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ BTSpool *btspool;
+ BTSpool *btspool2;
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ Relation heapRel;
+ Relation indexRel;
+ LOCKMODE heapLockmode;
+ LOCKMODE indexLockmode;
+ int sortmem;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ ResetUsage();
+#endif /* BTREE_BUILD_STATS */
+
+ /* Allocate memory for worker's own private spool */
+ btspool = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Look up shared state */
+ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false);
+
+ /*
+ * Open relations using lockmodes known to be obtained by leader's caller
+ * within index.c
+ */
+ if (!btshared->isconcurrent)
+ heapLockmode = ShareLock;
+ else
+ heapLockmode = ShareUpdateExclusiveLock;
+
+ /* REINDEX and CREATE INDEX [CONCURRENTLY] use an AccessExclusiveLock */
+ indexLockmode = AccessExclusiveLock;
+
+ /* Open relations here, for worker */
+ heapRel = heap_open(btshared->heaprelid, heapLockmode);
+ indexRel = index_open(btshared->indexrelid, indexLockmode);
+
+ /* Initialize worker's own spool */
+ btspool->heap = heapRel;
+ btspool->index = indexRel;
+ btspool->isunique = btshared->isunique;
+
+ /* Look up shared state private to tuplesort.c */
+ sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+ tuplesort_attach_shared(sharedsort, seg);
+ if (!btshared->isunique)
+ {
+ btspool2 = NULL;
+ sharedsort2 = NULL;
+ }
+ else
+ {
+ /* Allocate memory for worker's own private secondary spool */
+ btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Initialize worker's own secondary spool */
+ btspool2->heap = btspool->heap;
+ btspool2->index = btspool->index;
+ btspool2->isunique = false;
+ /* Look up shared state private to tuplesort.c */
+ sharedsort2 = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT_SPOOL2, false);
+ tuplesort_attach_shared(sharedsort2, seg);
+ }
+
+ /* Perform sorting of spool, and possibly a spool2 */
+ sortmem = Max(maintenance_work_mem / btshared->scantuplesortstates, 64);
+ _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
+ sharedsort2, sortmem);
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ ShowUsage("BTREE BUILD (Worker Spool) STATISTICS");
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+
+ /*
+ * End sort operation. This worker is no longer required when released from
+ * the wait that is about to happen here.
+ */
+ tuplesort_end(btspool->sortstate);
+ /* Do the same for second worker spool, if any */
+ if (btspool2)
+ tuplesort_end(btspool2->sortstate);
+
+ index_close(indexRel, indexLockmode);
+ heap_close(heapRel, heapLockmode);
+}
+
+/*
+ * _bt_parallel_estimate_shared - returns the size needed to store the given
+ * shared state for a parallel btree index build. (This does not include any
+ * shared Tuplesortstates.)
+ *
+ * This includes room for the ParallelHeapScanDesc's serialized snapshot, if
+ * that's required.
+ */
+static Size
+_bt_parallel_estimate_shared(Snapshot snapshot)
+{
+ /* Frequently, only a special snapshot (SnapshotAny) is used */
+ if (!IsMVCCSnapshot(snapshot))
+ {
+ Assert(snapshot == SnapshotAny);
+ return sizeof(BTShared);
+ }
+
+ return add_size(offsetof(BTShared, heapdesc) +
+ offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+}
+
+/*
+ * _bt_parallel_scan_and_sort - perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed btspool, and a second tuplesort
+ * state if a second btspool is passed (for unique index builds).
+ *
+ * The second spool sort is not a good target for parallel sort, since
+ * typically there is very little to sort, but it's convenient to be
+ * consistent.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs. It must be at least the work_mem GUC's minimum allowed
+ * value, 64.
+ */
+static void
+_bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+ BTShared *btshared, Sharedsort *sharedsort,
+ Sharedsort *sharedsort2, int sortmem)
+{
+ SortCoordinate coordinate;
+ BTBuildState *heapcallbackstate;
+ HeapScanDesc scan;
+ double reltuples;
+ bool havedead = false;
+ IndexInfo *indexInfo;
+
+ /*
+ * Initialize local tuplesort coordination state.
+ */
+ coordinate = palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = true;
+ coordinate->nLaunched = -1;
+ coordinate->sharedsort = sharedsort;
+
+ /* Begin sort of run(s) */
+ btspool->sortstate = tuplesort_begin_index_btree(btspool->heap,
+ btspool->index,
+ btspool->isunique,
+ sortmem, coordinate,
+ false);
+
+ /*
+ * Initialize heap scan state, in preparation for joining scan.
+ */
+ heapcallbackstate = (BTBuildState *) palloc0(sizeof(BTBuildState));
+ heapcallbackstate->isunique = btshared->isunique;
+ heapcallbackstate->havedead = false;
+ heapcallbackstate->heap = btspool->heap;
+ heapcallbackstate->spool = btspool;
+ heapcallbackstate->spool2 = btspool2;
+ heapcallbackstate->indtuples = 0;
+ heapcallbackstate->btleader = NULL;
+
+ /*
+ * Just as in the serial case, we may require a second spool (this one is
+ * a worker-private instance, though; there will also be a leader-only
+ * second spool for merging the workers' second spools)
+ */
+ if (heapcallbackstate->spool2)
+ {
+ SortCoordinate coordinate2;
+
+ /*
+ * We expect that the second one (for dead tuples) won't get very
+ * full, so we give it only work_mem (unless sortmem is less for
+ * worker). Worker processes are generally permitted to allocate
+ * work_mem independently.
+ */
+ coordinate2 = palloc0(sizeof(SortCoordinateData));
+ coordinate2->isWorker = true;
+ coordinate2->nLaunched = -1;
+ coordinate2->sharedsort = sharedsort2;
+ heapcallbackstate->spool2->sortstate =
+ tuplesort_begin_index_btree(btspool->heap, btspool->index, false,
+ Min(sortmem, work_mem), coordinate2,
+ false);
+ }
+
+ /* Join parallel scan */
+ indexInfo = BuildIndexInfo(btspool->index);
+ scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
+ reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
+ true, _bt_build_callback,
+ (void *) heapcallbackstate, scan);
+
+ /* Execute this worker's part of the sort */
+ tuplesort_performsort(btspool->sortstate);
+
+ if (btspool2)
+ {
+ tuplesort_performsort(btspool2->sortstate);
+ havedead = heapcallbackstate->havedead;
+ }
+
+ /*
+ * Done. Leave a way for leader to determine we're finished. Record how
+ * many tuples were in this worker's share of the relation.
+ */
+ SpinLockAcquire(&btshared->mutex);
+ btshared->havedead = btshared->havedead || havedead;
+ btshared->reltuples += reltuples;
+ btshared->indtuples += heapcallbackstate->indtuples;
+ SpinLockRelease(&btshared->mutex);
+}
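
_bt_parallel_estimate_shared() above sizes BTShared with offsetof arithmetic rather than sizeof because the trailing ParallelHeapScanDescData ends in a variable-size serialized snapshot. The standalone sketch below shows the same sizing idea with made-up struct names and a fixed 128 bytes standing in for EstimateSnapshotSpace():

#include <stddef.h>
#include <stdio.h>

typedef struct FakeScanDesc
{
    unsigned long relid;
    char        snapshot_data[1];   /* actually variable-size */
} FakeScanDesc;

typedef struct FakeShared
{
    int         nparticipants;
    double      reltuples;
    FakeScanDesc scandesc;          /* variable-size field must come last */
} FakeShared;

int
main(void)
{
    size_t snapshot_bytes = 128;    /* EstimateSnapshotSpace() stand-in */
    size_t estimate;

    /* Fixed prefix of both structs, plus however long the snapshot is */
    estimate = offsetof(FakeShared, scandesc) +
        offsetof(FakeScanDesc, snapshot_data) +
        snapshot_bytes;

    printf("sizeof(FakeShared) = %zu, estimate with snapshot = %zu\n",
           sizeof(FakeShared), estimate);
    return 0;
}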
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index 80b82e1..0992f7e 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -138,7 +138,8 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
ALLOCSET_DEFAULT_SIZES);
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- spgistBuildCallback, (void *) &buildstate);
+ spgistBuildCallback, (void *) &buildstate,
+ NULL);
MemoryContextDelete(buildstate.tmpCtx);
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index 5c33c40..f4d62ef 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -82,8 +82,10 @@ are engaged via EnterParallelMode(), which should be called before creating
a parallel context, and disarmed via ExitParallelMode(), which should be
called after all parallel contexts have been destroyed. The most
significant restriction imposed by parallel mode is that all operations must
-be strictly read-only; we allow no writes to the database and no DDL. We
-might try to relax these restrictions in the future.
+be strictly read-only; we allow no writes to the database and no DDL. Note
+that parallel CREATE INDEX doesn't actually perform any WAL-logging in
+workers, and so sidesteps this restriction. We might try to relax these
+restrictions in the future.
To make as many operations as possible safe in parallel mode, we try to copy
the most important pieces of state from the initiating backend to each parallel
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 1f542ed..6a44d40 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
@@ -124,6 +125,9 @@ static const struct
{
{
"ParallelQueryMain", ParallelQueryMain
+ },
+ {
+ "_bt_worker_main", _bt_worker_main
}
};
@@ -140,7 +144,7 @@ static parallel_worker_main_type LookupParallelWorkerFunction(const char *librar
*/
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
- int nworkers)
+ int nworkers, bool serializable_okay)
{
MemoryContext oldcontext;
ParallelContext *pcxt;
@@ -163,7 +167,7 @@ CreateParallelContext(const char *library_name, const char *function_name,
* workers, at least not until somebody enhances that mechanism to be
* parallel-aware.
*/
- if (IsolationIsSerializable())
+ if (IsolationIsSerializable() && !serializable_okay)
nworkers = 0;
/* We might be running in a short-lived memory context. */
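
The parallel.c hunk above adds "_bt_worker_main" to the fixed table of internal entry points that LookupParallelWorkerFunction() searches by name when a parallel worker starts up. Below is a standalone analogue of that name-to-function lookup; the entry names and worker bodies are invented for the example.

#include <stdio.h>
#include <string.h>

typedef void (*worker_main_type) (void);

static void demo_query_worker(void)   { printf("query worker\n"); }
static void demo_btbuild_worker(void) { printf("btree build worker\n"); }

/* Fixed table mapping an entry-point name to the function to run */
static const struct
{
    const char *fn_name;
    worker_main_type fn_addr;
} InternalWorkers[] =
{
    {"demo_query_worker", demo_query_worker},
    {"demo_btbuild_worker", demo_btbuild_worker}
};

static worker_main_type
lookup_worker(const char *name)
{
    size_t i;

    for (i = 0; i < sizeof(InternalWorkers) / sizeof(InternalWorkers[0]); i++)
        if (strcmp(name, InternalWorkers[i].fn_name) == 0)
            return InternalWorkers[i].fn_addr;
    return NULL;
}

int
main(void)
{
    worker_main_type fn = lookup_worker("demo_btbuild_worker");

    if (fn)
        fn();               /* runs the "worker" in-process for the demo */
    return 0;
}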
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index c7b2f03..322aa22 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -866,7 +866,7 @@ index_create(Relation heapRelation,
Assert(indexRelationId == RelationGetRelid(indexRelation));
/*
- * Obtain exclusive lock on it. Although no other backends can see it
+ * Obtain exclusive lock on it. Although no other transactions can see it
* until we commit, this prevents deadlock-risk complaints from lock
* manager in cases such as CLUSTER.
*/
@@ -2177,13 +2177,14 @@ IndexBuildHeapScan(Relation heapRelation,
IndexInfo *indexInfo,
bool allow_sync,
IndexBuildCallback callback,
- void *callback_state)
+ void *callback_state,
+ HeapScanDesc scan)
{
return IndexBuildHeapRangeScan(heapRelation, indexRelation,
indexInfo, allow_sync,
false,
0, InvalidBlockNumber,
- callback, callback_state);
+ callback, callback_state, scan);
}
/*
@@ -2205,11 +2206,11 @@ IndexBuildHeapRangeScan(Relation heapRelation,
BlockNumber start_blockno,
BlockNumber numblocks,
IndexBuildCallback callback,
- void *callback_state)
+ void *callback_state,
+ HeapScanDesc scan)
{
bool is_system_catalog;
bool checking_uniqueness;
- HeapScanDesc scan;
HeapTuple heapTuple;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
@@ -2219,7 +2220,9 @@ IndexBuildHeapRangeScan(Relation heapRelation,
EState *estate;
ExprContext *econtext;
Snapshot snapshot;
- TransactionId OldestXmin;
+ bool need_register_snapshot = true;
+	TransactionId OldestXmin = InvalidTransactionId;	/* not used in
+														 * concurrent or
+														 * bootstrap builds */
BlockNumber root_blkno = InvalidBlockNumber;
OffsetNumber root_offsets[MaxHeapTuplesPerPage];
@@ -2255,34 +2258,56 @@ IndexBuildHeapRangeScan(Relation heapRelation,
/* Set up execution state for predicate, if any. */
predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
- /*
- * Prepare for scan of the base relation. In a normal index build, we use
- * SnapshotAny because we must retrieve all tuples and do our own time
- * qual checks (because we have to index RECENTLY_DEAD tuples). In a
- * concurrent build, or during bootstrap, we take a regular MVCC snapshot
- * and index whatever's live according to that.
- */
- if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
+ if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
{
- snapshot = RegisterSnapshot(GetTransactionSnapshot());
- OldestXmin = InvalidTransactionId; /* not used */
+		/* okay to ignore lazy VACUUMs here */
+		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
- /* "any visible" mode is not compatible with this */
- Assert(!anyvisible);
+		/*
+		 * We won't register a snapshot ourselves in this path (and in the
+		 * parallel, "scan != NULL" case our caller manages its own snapshot).
+		 */
+ need_register_snapshot = false;
}
- else
+
+ if (!scan)
{
- snapshot = SnapshotAny;
- /* okay to ignore lazy VACUUMs here */
- OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we
+ * use SnapshotAny because we must retrieve all tuples and do our own
+ * time qual checks (because we have to index RECENTLY_DEAD tuples).
+ * In a concurrent build, or during bootstrap, we take a regular MVCC
+ * snapshot and index whatever's live according to that.
+ */
+ if (need_register_snapshot)
+ {
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ /* "any visible" mode is not compatible with this */
+ Assert(!anyvisible);
+ }
+		else
+			snapshot = SnapshotAny;
+ scan =
+ heap_beginscan_strat(heapRelation, /* relation */
+ snapshot, /* snapshot */
+ 0, /* number of keys */
+ NULL, /* scan key */
+ true, /* buffer access strategy OK */
+ allow_sync); /* syncscan OK? */
}
+ else
+ {
- scan = heap_beginscan_strat(heapRelation, /* relation */
- snapshot, /* snapshot */
- 0, /* number of keys */
- NULL, /* scan key */
- true, /* buffer access strategy OK */
- allow_sync); /* syncscan OK? */
+		/*
+		 * Parallel index build: the caller set up the scan and manages any
+		 * snapshot registration itself, so don't register or unregister one
+		 * here.  Use the scan's snapshot for our own time qual checks below.
+		 */
+		need_register_snapshot = false;
+		snapshot = scan->rs_snapshot;
+ }
/* set our scan endpoints */
if (!allow_sync)
@@ -2613,8 +2638,8 @@ IndexBuildHeapRangeScan(Relation heapRelation,
heap_endscan(scan);
- /* we can now forget our snapshot, if set */
- if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
+ /* we can now forget our snapshot, if set and registered here directly */
+ if (need_register_snapshot)
UnregisterSnapshot(snapshot);
ExecDropSingleTupleTableSlot(slot);
@@ -2857,7 +2882,7 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator,
InvalidOid, false,
maintenance_work_mem,
- false);
+ NULL, false);
state.htups = state.itups = state.tups_inserted = 0;
(void) index_bulk_delete(&ivinfo, NULL,
@@ -3651,6 +3676,14 @@ reindex_relation(Oid relid, int flags, int options)
* for preventing catalog lookups from using that index. We also make use
* of this to catch attempted uses of user indexes during reindexing of
* those indexes.
+ *
+ * This support code isn't reliable when called from within a parallel
+ * worker process, because this state is not propagated to workers.  This is
+ * why parallel index builds are disallowed on catalogs.  We might fail to
+ * catch an attempted use of a user index that is undergoing reindexing
+ * because of that non-propagation, which is not ideal, but such a problem is
+ * unlikely to go undetected for long.
* ----------------------------------------------------------------
*/
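
Not part of the patch: a small standalone sketch of the optional caller-supplied-scan pattern that the revised IndexBuildHeapScan()/IndexBuildHeapRangeScan() signatures follow. The types and helpers here (ScanDesc, scan_begin, scan_end, build_scan) are invented purely for illustration; the point is only that a NULL argument means "set up and tear down your own scan and snapshot", while a non-NULL argument means "use, but do not own, the caller's".

```c
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for a heap scan descriptor and its snapshot */
typedef struct ScanDesc
{
    const char *snapshot;
} ScanDesc;

static ScanDesc *
scan_begin(const char *snapshot)
{
    ScanDesc   *scan = malloc(sizeof(ScanDesc));

    scan->snapshot = snapshot;
    return scan;
}

static void
scan_end(ScanDesc *scan)
{
    free(scan);
}

/*
 * Mirrors the shape of the patched scan routine: NULL means "serial build,
 * set up our own scan"; non-NULL means "parallel build, the caller owns the
 * scan and its snapshot".
 */
static void
build_scan(ScanDesc *scan)
{
    bool        own_scan = (scan == NULL);

    if (own_scan)
        scan = scan_begin("SnapshotAny");   /* serial: choose our own snapshot */

    printf("scanning with snapshot %s (%s)\n", scan->snapshot,
           own_scan ? "callee-owned" : "caller-owned");

    if (own_scan)
        scan_end(scan);         /* only tear down what we created */
}

int
main(void)
{
    ScanDesc   *parallel_scan = scan_begin("caller's MVCC snapshot");

    build_scan(NULL);           /* serial caller */
    build_scan(parallel_scan);  /* parallel caller */
    scan_end(parallel_scan);
    return 0;
}
```
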
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 48f1e6e..64205d4 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -891,7 +891,8 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
/* Set up sorting if wanted */
if (use_sort)
tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
- maintenance_work_mem, false);
+ maintenance_work_mem,
+ NULL, false);
else
tuplesort = NULL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 89114af..7fbe8fd 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -368,6 +368,10 @@ DefineIndex(Oid relationId,
* this will typically require the caller to have already locked the
* relation. To avoid lock upgrade hazards, that lock should be at least
* as strong as the one we take here.
+ *
+ * NB: If the lock strength here ever changes, code that is run by
+ * parallel workers under the control of certain particular ambuild
+ * functions will need to be updated, too.
*/
lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock;
rel = heap_open(relationId, lockmode);
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 1b477ba..71628e4 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -425,7 +425,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
- pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
+ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
pei->pcxt = pcxt;
/*
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index d26ce08..7943b9b 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -666,7 +666,7 @@ initialize_phase(AggState *aggstate, int newphase)
sortnode->collations,
sortnode->nullsFirst,
work_mem,
- false);
+ NULL, false);
}
aggstate->current_phase = newphase;
@@ -743,7 +743,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
pertrans->sortOperators[0],
pertrans->sortCollations[0],
pertrans->sortNullsFirst[0],
- work_mem, false);
+ work_mem, NULL, false);
}
else
pertrans->sortstates[aggstate->current_set] =
@@ -753,7 +753,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
pertrans->sortOperators,
pertrans->sortCollations,
pertrans->sortNullsFirst,
- work_mem, false);
+ work_mem, NULL, false);
}
/*
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 98bcaeb..a52ec41 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -93,7 +93,7 @@ ExecSort(PlanState *pstate)
plannode->collations,
plannode->nullsFirst,
work_mem,
- node->randomAccess);
+ NULL, node->randomAccess);
if (node->bounded)
tuplesort_set_bound(tuplesortstate, node->bound);
node->tuplesortstate = (void *) tuplesortstate;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 9b7a8fd..5022e45 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -275,10 +275,11 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* We can't use parallelism in serializable mode because the predicate
* locking code is not parallel-aware. It's not catastrophic if someone
* tries to run a parallel plan in serializable mode; it just won't get
- * any workers and will run serially. But it seems like a good heuristic
- * to assume that the same serialization level will be in effect at plan
- * time and execution time, so don't generate a parallel plan if we're in
- * serializable mode.
+ * any workers and will run serially (since serializable mode is not
+ * indicated as okay when CreateParallelContext() is called). But it
+ * seems like a good heuristic to assume that the same serialization level
+ * will be in effect at plan time and execution time, so don't generate a
+ * parallel plan if we're in serializable mode.
*/
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster &&
@@ -6194,3 +6195,144 @@ get_partitioned_child_rels_for_join(PlannerInfo *root, Relids join_relids)
return result;
}
+
+/*
+ * plan_create_index_workers
+ * Use the planner to decide how many parallel workers CREATE INDEX
+ * should use
+ *
+ * tableOid is the table on which the index is to be built.  indexOid is the
+ * OID of an index to be created or reindexed (which must be a btree index).
+ *
+ * Return value is the number of parallel workers to use, which can often be
+ * treated as a suggestion.  A return value of 0 means either that
+ * parallelism is disabled or that it is unsafe to proceed.  We assume that
+ * the leader process participates as a worker, but the return value does
+ * not count the leader process.
+ *
+ * Note: caller had better already hold some type of lock on the table and
+ * index.
+ */
+int
+plan_create_index_workers(Oid tableOid, Oid indexOid)
+{
+ PlannerInfo *root;
+ Query *query;
+ PlannerGlobal *glob;
+ RangeTblEntry *rte;
+ Relation heap;
+ Relation index;
+ RelOptInfo *rel;
+ int parallel_workers;
+ long min_sort_mem_blocks;
+ long sort_mem_blocks;
+ int min_parallel_workers;
+ BlockNumber heap_blocks;
+ double reltuples;
+ double allvisfrac;
+
+ /*
+ * Fast-path: Return immediately when parallelism disabled. Note that we
+ * deliberately don't consider force_parallel_mode.
+ */
+ if (max_parallel_workers_maintenance == 0)
+ return 0;
+
+ /* Set up largely-dummy planner state */
+ query = makeNode(Query);
+ query->commandType = CMD_SELECT;
+
+ glob = makeNode(PlannerGlobal);
+
+ root = makeNode(PlannerInfo);
+ root->parse = query;
+ root->glob = glob;
+ root->query_level = 1;
+ root->planner_cxt = CurrentMemoryContext;
+ root->wt_param_id = -1;
+
+ /*
+ * Build a minimal RTE.
+ *
+ * Set the target's table to be an inheritance parent. This is a kludge
+ * that prevents problems within get_relation_info(), which does not
+ * expect that any IndexOptInfo is currently undergoing REINDEX.
+ */
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = tableOid;
+ rte->relkind = RELKIND_RELATION; /* Don't be too picky. */
+ rte->lateral = false;
+ rte->inh = true;
+ rte->inFromCl = true;
+ query->rtable = list_make1(rte);
+
+ /* Set up RTE/RelOptInfo arrays */
+ setup_simple_rel_arrays(root);
+
+ /* Build RelOptInfo */
+ rel = build_simple_rel(root, 1, NULL);
+
+ heap = heap_open(tableOid, NoLock);
+ index = index_open(indexOid, NoLock);
+
+ /*
+ * Determine if it's safe to proceed.
+ *
+ * Currently, parallel workers can't access the leader's temporary tables.
+ * Furthermore, any index predicate or index expressions must be parallel
+ * safe.
+ */
+ if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP ||
+ !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) ||
+ !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index)))
+ {
+ parallel_workers = 0;
+ goto done;
+ }
+
+ /*
+ * Estimate heap relation size ourselves, since rel->pages cannot be
+ * trusted (heap RTE was marked as inheritance parent)
+ */
+ estimate_rel_size(heap, NULL, &heap_blocks, &reltuples, &allvisfrac);
+
+ /*
+ * Establish a minimum number of workers to impose from here on, based on
+ * force_parallel_mode GUC
+ */
+ min_parallel_workers = force_parallel_mode == FORCE_PARALLEL_OFF ? 0 : 1;
+
+ /* Determine number of workers as a parallel sequential scan would */
+ parallel_workers = compute_parallel_worker(rel, heap_blocks, -1);
+ parallel_workers = Max(min_parallel_workers, parallel_workers);
+
+ /*
+ * Cap workers based on available maintenance_work_mem as needed.
+ *
+	 * Note that each tuplesort participant receives an even share of the
+	 * total maintenance_work_mem budget.  Aim to leave each participant
+	 * (counting the leader-as-worker Tuplesortstate as one) with no less
+	 * than 32MB of memory.  This means that a maintenance_work_mem setting
+	 * of 64MB is just past the threshold needed to launch a single parallel
+	 * worker to sort.
+ */
+ sort_mem_blocks = (maintenance_work_mem * 1024L) / BLCKSZ;
+ min_sort_mem_blocks = (32768L * 1024L) / BLCKSZ;
+ while (parallel_workers > min_parallel_workers &&
+ sort_mem_blocks / (parallel_workers + 1) < min_sort_mem_blocks)
+ parallel_workers--;
+
+done:
+
+ /*
+ * In no case use more than max_parallel_workers_maintenance workers.
+ * (This includes cases where force_parallel_mode is not turned off.)
+ */
+ Assert(max_parallel_workers_maintenance > 0);
+ parallel_workers = Min(parallel_workers, max_parallel_workers_maintenance);
+ index_close(index, NoLock);
+ heap_close(heap, NoLock);
+
+	elog(DEBUG1, "planned %d parallel workers for index build", parallel_workers);
+ return parallel_workers;
+}
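
Not from the patch: a standalone sketch of the memory-based capping arithmetic above, using a hypothetical scan-based estimate of 4 workers and the default 8kB BLCKSZ, showing how a 64MB maintenance_work_mem budget backs the request off to a single parallel worker.

```c
#include <stdio.h>

#define BLCKSZ 8192             /* assume the default 8kB block size */

int
main(void)
{
    long        maintenance_work_mem = 64 * 1024;   /* in kB, i.e. 64MB */
    int         parallel_workers = 4;   /* hypothetical scan-based estimate */
    int         min_parallel_workers = 0;
    long        sort_mem_blocks = (maintenance_work_mem * 1024L) / BLCKSZ;
    long        min_sort_mem_blocks = (32768L * 1024L) / BLCKSZ;

    /*
     * Back off while the per-participant share (workers plus leader) would
     * fall below 32MB; mirrors the loop in plan_create_index_workers().
     */
    while (parallel_workers > min_parallel_workers &&
           sort_mem_blocks / (parallel_workers + 1) < min_sort_mem_blocks)
        parallel_workers--;

    printf("64MB budget -> %d parallel worker(s)\n", parallel_workers);
    return 0;
}
```
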
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 5c256ff..485df21 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3622,6 +3622,12 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_REPLICATION_SLOT_DROP:
event_name = "ReplicationSlotDrop";
break;
+ case WAIT_EVENT_PARALLEL_TUPLESORT_WORKER:
+ event_name = "ParallelTuplesortWorker";
+ break;
+ case WAIT_EVENT_PARALLEL_TUPLESORT_LEADER:
+ event_name = "ParallelTuplesortLeader";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index d1a6410..8f6cea1 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -805,3 +805,73 @@ BufFileTellBlock(BufFile *file)
}
#endif
+
+/*
+ * Return the current file size. Counts any holes created by
+ * BufFileViewAppend as part of the size.
+ */
+off_t
+BufFileSize(BufFile *file)
+{
+ return ((file->numFiles - 1) * (off_t) MAX_PHYSICAL_FILESIZE) +
+ FileGetSize(file->files[file->numFiles - 1]);
+}
+
+/*
+ * Create a read-only view of an existing BufFile. This is useful as a
+ * starting point for concatenating a set of files. The source BufFile must
+ * not be closed or modified while the view is used.
+ */
+BufFile *
+BufFileView(BufFile *source)
+{
+ BufFile *file = (BufFile *) palloc(sizeof(BufFile));
+ int i;
+
+ file->numFiles = source->numFiles;
+ file->files = (File *) palloc(sizeof(File) * source->numFiles);
+ file->offsets = (off_t *) palloc(sizeof(off_t) * source->numFiles);
+ for (i = 0; i < source->numFiles; ++i)
+ {
+ file->files[i] = source->files[i];
+ file->offsets[i] = 0L;
+ }
+ file->isInterXact = false;
+ file->dirty = false;
+ file->readOnly = true;
+ file->resowner = CurrentResourceOwner;
+ file->curFile = 0;
+ file->curOffset = 0L;
+ file->pos = 0;
+ file->nbytes = 0;
+
+ return file;
+}
+
+/*
+ * Append the contents of 'source' onto the end of 'file'.  This operation
+ * works by manipulating lists of segment files, so conceptually the file
+ * content is always appended at a MAX_PHYSICAL_FILESIZE boundary,
+ * potentially creating a hole which cannot be read.  Return the block
+ * number within 'file' at which the content from 'source' begins.
+ */
+long
+BufFileViewAppend(BufFile *file, BufFile *source)
+{
+ long start_block = file->numFiles * BUFFILE_SEG_SIZE;
+ int newNumFiles = file->numFiles + source->numFiles;
+ int i;
+
+ file->files = (File *)
+ repalloc(file->files, sizeof(File) * newNumFiles);
+ file->offsets = (off_t *)
+ repalloc(file->offsets, sizeof(off_t) * newNumFiles);
+ for (i = file->numFiles; i < newNumFiles; ++i)
+ {
+ file->files[i] = source->files[i - file->numFiles];
+ file->offsets[i] = 0L;
+ }
+ file->numFiles = newNumFiles;
+
+ return start_block;
+}
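
Not part of the patch: a short standalone calculation of why BufFileViewAppend() leaves holes. Appended content always starts at the next MAX_PHYSICAL_FILESIZE (1GB) segment boundary, so the unused tail of the target's last segment becomes an unreadable hole. The worker file sizes below are invented, and the block arithmetic assumes the default 8kB BLCKSZ.

```c
#include <stdio.h>

#define MAX_PHYSICAL_FILESIZE (1024L * 1024L * 1024L)       /* 1GB segments */
#define BLCKSZ 8192
#define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ)   /* blocks per segment */

int
main(void)
{
    /* Hypothetical sizes (in bytes) of two worker BufFiles to concatenate */
    long long   size_a = 300LL * 1024 * 1024;   /* worker 0: 300MB */
    long long   segs_a = (size_a + MAX_PHYSICAL_FILESIZE - 1) / MAX_PHYSICAL_FILESIZE;

    /* BufFileViewAppend returns numFiles * BUFFILE_SEG_SIZE as the start block */
    long long   start_b = segs_a * BUFFILE_SEG_SIZE;

    /* Blocks between the end of worker 0's data and worker 1's start are a hole */
    long long   hole_blocks = start_b - size_a / BLCKSZ;

    printf("worker 1 starts at block %lld; hole of %lld blocks (%lld MB)\n",
           start_b, hole_blocks, hole_blocks * BLCKSZ / (1024 * 1024));
    return 0;
}
```
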
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 825ab32..6a29cd2 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -2262,6 +2262,16 @@ FileGetRawMode(File file)
}
/*
+ * FileGetSize
+ */
+off_t
+FileGetSize(File file)
+{
+ Assert(FileIsValid(file));
+ return VfdCache[file].fileSize;
+}
+
+/*
* Make room for another allocatedDescs[] array entry if needed and possible.
* Returns true if an array element is available.
*/
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
index 1e323d9..07435ed 100644
--- a/src/backend/utils/adt/orderedsetaggs.c
+++ b/src/backend/utils/adt/orderedsetaggs.c
@@ -291,6 +291,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
qstate->sortCollations,
qstate->sortNullsFirsts,
work_mem,
+ NULL,
qstate->rescan_needed);
else
osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
@@ -298,6 +299,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
qstate->sortCollation,
qstate->sortNullsFirst,
work_mem,
+ NULL,
qstate->rescan_needed);
osastate->number_of_rows = 0;
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 9680a4b..49b1824 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -112,6 +112,7 @@ bool enableFsync = true;
bool allowSystemTableMods = false;
int work_mem = 1024;
int maintenance_work_mem = 16384;
+int max_parallel_workers_maintenance = 2;
/*
* Primary determinants of sizes of shared-memory structures.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c4c1afa..9665300 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2706,6 +2706,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_parallel_workers_maintenance", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
+ NULL
+ },
+ &max_parallel_workers_maintenance,
+ 2, 0, 1024,
+ NULL, NULL, NULL
+ },
+
+ {
{"max_parallel_workers_per_gather", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
gettext_noop("Sets the maximum number of parallel processes per executor node."),
NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 368b280..ec4ca9c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -162,6 +162,7 @@
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8 # (change requires restart)
+#max_parallel_workers_maintenance = 2 # taken from max_worker_processes
#max_parallel_workers_per_gather = 2 # taken from max_parallel_workers
#max_parallel_workers = 8 # maximum number of max_worker_processes that
# can be used in parallel queries
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 214dc71..e3d24b4 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -52,7 +52,7 @@ provider postgresql {
probe query__done(const char *);
probe statement__status(const char *);
- probe sort__start(int, bool, int, int, bool);
+ probe sort__start(int, bool, int, int, bool, int);
probe sort__done(bool, long);
probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 5ebb6fb..bfc206c 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -64,6 +64,13 @@
* care that all calls for a single LogicalTapeSet are made in the same
* palloc context.
*
+ * To support parallel sort operations involving coordinated callers to
+ * tuplesort.c routines across multiple workers, it is necessary to unify
+ * each worker's BufFile/tapeset into a single logical tapeset managed by the
+ * leader.  Each worker should have produced one final materialized tape (its
+ * entire output) by the time this happens in the leader; there will always
+ * be the same number of runs as input tapes, and the same number of input
+ * tapes as workers.
+ *
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@@ -75,9 +82,10 @@
#include "postgres.h"
 #include "storage/buffile.h"
+#include "utils/builtins.h"
 #include "utils/logtape.h"
 #include "utils/memutils.h"
/*
* A TapeBlockTrailer is stored at the end of each BLCKSZ block.
@@ -129,16 +137,21 @@ typedef struct LogicalTape
* a frozen tape. (When reading from an unfrozen tape, we use a larger
* read buffer that holds multiple blocks, so the "current" block is
* ambiguous.)
+ *
+	 * When worker tape BufFiles are unified, each tape's block numbers are
+	 * translated during reads by the offset of that worker's space within
+	 * the unified BufFile.
*/
long firstBlockNumber;
long curBlockNumber;
long nextBlockNumber;
+ long offsetBlockNumber;
/*
* Buffer for current data block(s).
*/
char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
+ int max_size; /* highest useful, safe buffer_size */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
} LogicalTape;
@@ -159,10 +172,13 @@ struct LogicalTapeSet
* by ltsGetFreeBlock(), and it is always greater than or equal to
* nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
* blocks that have been allocated for a tape, but have not been written
- * to the underlying file yet.
+ * to the underlying file yet. nHoleBlocks tracks the total number of
+ * blocks that are in unused holes between worker spaces following BufFile
+ * unification (these are still logically allocated).
*/
long nBlocksAllocated; /* # of blocks allocated */
long nBlocksWritten; /* # of blocks used in underlying file */
+ long nHoleBlocks; /* # of "hole" blocks left by unification */
/*
* We store the numbers of recycled-and-available blocks in freeBlocks[].
@@ -192,6 +208,8 @@ static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsUnify(LogicalTapeSet *lts, TapeShare *tapes,
+ SharedFileSet *fileset);
/*
@@ -213,6 +231,11 @@ ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
* previous tape isn't flushed to disk until the end of the sort, so you
* get one-block hole, where the last block of the previous tape will
* later go.
+ *
+	 * Note that BufFile unification can leave pockets of "holes" in the
+	 * unified BufFile, between worker-owned block ranges.  These are tracked
+	 * for reporting purposes only.  We never read from nor write to these
+	 * hole blocks, so they need no special handling here.
*/
while (blocknum > lts->nBlocksWritten)
{
@@ -267,15 +290,18 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
do
{
char *thisbuf = lt->buffer + lt->nbytes;
+ long datablocknum = lt->nextBlockNumber;
/* Fetch next block number */
- if (lt->nextBlockNumber == -1L)
+ if (datablocknum == -1L)
break; /* EOF */
+ /* Apply worker offset, needed for leader tapesets */
+ datablocknum += lt->offsetBlockNumber;
/* Read the block */
- ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
+ ltsReadBlock(lts, datablocknum, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, lt->nextBlockNumber);
+ ltsReleaseBlock(lts, datablocknum);
lt->curBlockNumber = lt->nextBlockNumber;
lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -371,12 +397,93 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
}
/*
+ * Claim ownership of a set of logical tapes from existing temp files.
+ *
+ * The caller should be the leader process.  Though tapes are marked as
+ * frozen in workers, they are not frozen when opened within the leader,
+ * since unfrozen tapes use a larger read buffer.  (Frozen tapes have a
+ * smaller read buffer, optimized for random access.)
+ */
+static void
+ltsUnify(LogicalTapeSet *lts, TapeShare *tapes, SharedFileSet *fileset)
+{
+ LogicalTape *lt = NULL;
+	long		tapeblocks = 0L;
+	long		nphysicalblocks = 0L;
+	int			i;
+
+	/* Should have at least one worker tape, plus the leader's own tape */
+	Assert(lts->nTapes >= 2);
+
+	/* Concatenate worker BufFile pieces; the final tape is the leader's own */
+	for (i = 0; i < lts->nTapes; i++)
+	{
+		lt = &lts->tapes[i];
+
+		/*
+		 * Build concatenated view of all worker BufFiles, remembering the
+		 * block number where each source file begins.
+		 */
+		if (i < lts->nTapes - 1)
+		{
+			char		filename[MAXPGPATH] = {0};
+			BufFile    *file;
+
+			pg_itoa(i, filename);
+			file = BufFileOpenShared(fileset, filename);
+
+			/* Alter worker's tape state (generic values okay for leader) */
+			lt->firstBlockNumber = tapes[i].firstblocknumber;
+			if (i == 0)
+			{
+				lts->pfile = BufFileView(file);
+				lt->offsetBlockNumber = 0L;
+			}
+			else
+				lt->offsetBlockNumber = BufFileViewAppend(lts->pfile, file);
+			/* Don't allocate more for read buffer than could possibly help */
+			lt->max_size = Min(MaxAllocSize, tapes[i].buffilesize);
+			tapeblocks = tapes[i].buffilesize / BLCKSZ;
+			nphysicalblocks += tapeblocks;
+		}
+	}
+
+	/*
+	 * Logically allocated blocks span the full extent of the concatenated
+	 * worker BufFiles, including any holes left between them; the last
+	 * worker's offset plus its own physical block count gives that extent.
+	 */
+	lts->nBlocksWritten = lts->nBlocksAllocated =
+		lts->tapes[lts->nTapes - 2].offsetBlockNumber + tapeblocks;
+	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
+
+	/*
+	 * The leader tape's offset is simply left at zero.  There are no
+	 * pre-existing block numbers in the leader-owned portion of the unified
+	 * BufFile that we'd otherwise fail to apply an offset to, since of
+	 * course it hasn't been written to yet.  This is why the write/recycle
+	 * routines don't need to know about offsets at all.
+	 */
+	lts->tapes[lts->nTapes - 1].offsetBlockNumber = 0L;
+}
+
+/*
* Create a set of logical tapes in a temporary underlying file.
*
- * Each tape is initialized in write state.
+ * Each tape is initialized in write state.  Serial callers pass ntapes, and
+ * NULL arguments for everything else.  Parallel worker callers pass a shared
+ * fileset and a worker number, but tapes should be NULL.  The leader passes
+ * worker -1, a shared fileset, and shared tape metadata.  These are used to
+ * claim ownership of worker tapes.
+ *
+ * The leader caller passes back an array of metadata that each worker
+ * captured when LogicalTapeFreeze() was called for its final result tape.
+ * The passed tapes array is sized ntapes - 1, because it covers only worker
+ * tapes, whereas the leader also requires its own leader tape.  Note that we
+ * rely on the assumption that reclaimed worker tapes will only be read from
+ * once by the leader, and never written to again (tapes are initialized for
+ * writing, but only for consistency's sake).  The leader may write only to
+ * its own tape.
*/
LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes)
+LogicalTapeSetCreate(int ntapes, TapeShare *tapes, SharedFileSet *fileset,
+ int worker)
{
LogicalTapeSet *lts;
LogicalTape *lt;
@@ -388,9 +495,9 @@ LogicalTapeSetCreate(int ntapes)
Assert(ntapes > 0);
lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
ntapes * sizeof(LogicalTape));
- lts->pfile = BufFileCreateTemp(false);
lts->nBlocksAllocated = 0L;
lts->nBlocksWritten = 0L;
+ lts->nHoleBlocks = 0L;
lts->forgetFreeSpace = false;
lts->blocksSorted = true; /* a zero-length array is sorted ... */
lts->freeBlocksLen = 32; /* reasonable initial guess */
@@ -412,11 +519,29 @@ LogicalTapeSetCreate(int ntapes)
lt->dirty = false;
lt->firstBlockNumber = -1L;
lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
 		lt->buffer = NULL;
 		lt->buffer_size = 0;
+		/* palloc() larger than MaxAllocSize would fail */
+		lt->max_size = MaxAllocSize;
lt->pos = 0;
lt->nbytes = 0;
}
+
+ /* Create temp BufFile storage as required */
+ if (tapes)
+ ltsUnify(lts, tapes, fileset);
+ else if (fileset)
+ {
+ char filename[MAXPGPATH] = {0};
+
+ pg_itoa(worker, filename);
+ lts->pfile = BufFileCreateShared(fileset, filename);
+ }
+ else
+ lts->pfile = BufFileCreateTemp(false);
+
return lts;
}
@@ -470,6 +595,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
/* Allocate data buffer and first block on first write */
if (lt->buffer == NULL)
@@ -566,12 +692,9 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
if (buffer_size < BLCKSZ)
buffer_size = BLCKSZ;
- /*
- * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
- * buffer is unlikely to be helpful, anyway)
- */
- if (buffer_size > MaxAllocSize)
- buffer_size = MaxAllocSize;
+ /* palloc() larger than max_size is unlikely to be helpful */
+ if (buffer_size > lt->max_size)
+ buffer_size = lt->max_size;
/* round down to BLCKSZ boundary */
buffer_size -= buffer_size % BLCKSZ;
@@ -698,15 +821,25 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
* tape is rewound (after rewind is too late!). It performs a rewind
* and switch to read mode "for free". An immediately following rewind-
* for-read call is OK but not necessary.
+ *
+ * Returns metadata about the storage used for the tape after freezing,
+ * which may later be passed to LogicalTapeSetCreate within the leader
+ * process.  This metadata is only of interest to worker callers, and only
+ * when freezing their final output (a single materialized tape) for the
+ * leader.  Note that this routine flushes out the dirty buffer as needed,
+ * ensuring that the leader will be able to unify the output tape later.
*/
-void
+TapeShare
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
+ TapeShare result;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
/*
* Completion of a write phase. Flush last partial data block, and rewind
@@ -749,6 +882,12 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
else
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
lt->nbytes = TapeBlockGetNBytes(lt->buffer);
+
+ /* Return metadata for unifiable (not unified) case */
+ result.firstblocknumber = lt->firstBlockNumber;
+ result.buffilesize = BufFileSize(lts->pfile);
+
+ return result;
}
/*
@@ -862,9 +1001,6 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
/*
* Obtain current position in a form suitable for a later LogicalTapeSeek.
- *
- * NOTE: it'd be OK to do this during write phase with intention of using
- * the position for a seek after freezing. Not clear if anyone needs that.
*/
void
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
@@ -874,6 +1010,7 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
Assert(tapenum >= 0 && tapenum < lts->nTapes);
 	lt = &lts->tapes[tapenum];
+ Assert(lt->frozen);
/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
Assert(lt->buffer_size == BLCKSZ);
@@ -888,5 +1025,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
- return lts->nBlocksAllocated;
+ return lts->nBlocksAllocated - lts->nHoleBlocks;
}
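
Not from the patch: a standalone sketch of the two pieces of bookkeeping the unified tapeset relies on, the per-tape block-number offset applied on reads and the hole-adjusted total that LogicalTapeSetBlocks() now reports. All offsets and sizes below are invented.

```c
#include <stdio.h>

/* Hypothetical per-tape state after unification (see ltsUnify above) */
typedef struct tape
{
    long        offsetBlockNumber;  /* where this worker's space begins */
    long        nphysicalblocks;    /* blocks the worker actually wrote */
} tape;

int
main(void)
{
    /* Two worker tapes: the second starts on a 1GB segment boundary */
    tape        tapes[] = {{0, 38400}, {131072, 25600}};
    int         ntapes = 2;
    long        worker_local_block = 17;    /* block recorded by worker 1 */

    /* Reads apply the tape's offset, as ltsReadFillBuffer now does */
    long        unified_block = worker_local_block + tapes[1].offsetBlockNumber;

    /* Allocated space spans to the end of the last worker's data ... */
    long        allocated = tapes[ntapes - 1].offsetBlockNumber +
                            tapes[ntapes - 1].nphysicalblocks;
    /* ... while holes between workers are subtracted when reporting usage */
    long        physical = tapes[0].nphysicalblocks + tapes[1].nphysicalblocks;
    long        holes = allocated - physical;

    printf("unified block %ld; allocated %ld, holes %ld, reported %ld\n",
           unified_block, allocated, holes, allocated - holes);
    return 0;
}
```
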
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 34af8d6..d849049 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -74,6 +74,16 @@
* above. Nonetheless, with large workMem we can have many tapes (but not
* too many -- see the comments in tuplesort_merge_order).
*
+ * This module supports parallel sorting.  Parallel sorts involve coordination
+ * among one or more worker processes and a leader process, each with its own
+ * tuplesort state.  The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a unified tapeset
+ * consisting of worker tapes, each holding exactly one run to merge (one run
+ * for every worker process).  The leader then merges these runs.  Each
+ * worker process is guaranteed to produce exactly one output run from its
+ * partial input, possibly by first merging several of its own runs (this
+ * happens when the worker's workMem budget cannot hold its entire input in
+ * memory).
+ *
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -91,17 +101,23 @@
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/hash.h"
+#include "access/parallel.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
+#include "storage/sharedfileset.h"
+#include "storage/spin.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"
@@ -113,6 +129,10 @@
#define DATUM_SORT 2
#define CLUSTER_SORT 3
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
+ (state)->worker >= 0 ? 1 : 2)
+
/* GUC variables */
#ifdef TRACE_SORT
bool trace_sort = false;
@@ -196,8 +216,8 @@ typedef enum
} TupSortStatus;
/*
- * Parameters for calculation of number of tapes to use --- see inittapes()
- * and tuplesort_merge_order().
+ * Parameters for calculation of number of tapes to use --- see
+ * tuplesort_merge_order().
*
* In this calculation we assume that each tape will cost us about 1 blocks
* worth of buffer space. This ignores the overhead of all the other data
@@ -375,6 +395,30 @@ struct Tuplesortstate
bool markpos_eof; /* saved "eof_reached" */
/*
+ * These variables are used during parallel sorting.
+ *
+	 * worker is our worker identifier.  By convention, -1 denotes a leader
+	 * tuplesort, and values >= 0 denote worker tuplesorts (though the leader
+	 * process typically has both at once).
+ *
+ * shared is shared memory state, which identifies the sort for the
+ * benefit of workers, and gives workers a place to stash metadata needed
+ * by the leader.
+ *
+	 * nLaunched is the number of worker Tuplesortstates known by the leader
+	 * to have actually been launched, which implies that each of them must
+	 * finish a run the leader can merge if the sort is to complete
+	 * successfully.  This typically includes a worker state held by the
+	 * leader process itself.  It is set only in the leader Tuplesortstate,
+	 * where it establishes the condition the leader waits on, and lets the
+	 * tuplesort caller release all worker processes it has launched.  That
+	 * release has to happen up-front, before the leader state waits on
+	 * workers to finish.
+ */
+ int worker;
+ Sharedsort *shared;
+ int nLaunched;
+
+ /*
* The sortKeys variable is used by every case other than the hash index
* case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
* MinimalTuple and CLUSTER routines, though.
@@ -436,6 +480,47 @@ struct Tuplesortstate
};
/*
+ * Private state of tuplesort-parallel-operation. This is allocated in shared
+ * memory. Multiple individual tuplesort-operation states (Tuplesortstate)
+ * are allocated in local memory in each of multiple processes, with one
+ * driving leader Tuplesortstate.
+ */
+struct Sharedsort
+{
+ /* mutex protects all fields prior to tapes */
+ slock_t mutex;
+
+ /*
+ * currentWorker generates ordinal identifier numbers for parallel sort
+ * workers. These start from 0, and are always gapless.
+ *
+	 * Workers increment workersFinished to indicate having finished.  Once
+	 * this equals state.nLaunched within the leader, the leader is ready to
+	 * merge worker runs.
+	 *
+	 * leaderHasTapeSet indicates whether the leader has taken ownership of
+	 * every worker's BufFile, which allows every worker's wait to end.
+ */
+ int currentWorker;
+ ConditionVariable workersFinishedCv;
+ int workersFinished;
+ ConditionVariable leaderHasTapeSetCv;
+ bool leaderHasTapeSet;
+
+ /* Size of tapes flexible array */
+ int nTapes;
+
+ /* Temporary file space. */
+ SharedFileSet fileset;
+
+ /*
+ * Tapes array used by workers to report back information needed by the
+ * leader to unify their tapes
+ */
+ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
+};
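
Not part of the patch: a standalone POSIX-threads sketch of the handshake described by the fields above. Workers bump workersFinished and then wait until the leader has taken ownership of their tapes; the leader waits for all launched workers before merging and then releases them. The real code uses PostgreSQL spinlocks and condition variables rather than pthreads, and the names here are invented.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NWORKERS 3

/* Hypothetical analogue of Sharedsort's coordination fields */
static struct
{
    pthread_mutex_t mutex;
    pthread_cond_t workersFinishedCv;
    pthread_cond_t leaderHasTapeSetCv;
    int         workersFinished;
    bool        leaderHasTapeSet;
} shared = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
            PTHREAD_COND_INITIALIZER, 0, false};

static void *
worker_main(void *arg)
{
    (void) arg;

    /* ... sort this worker's share and freeze its result tape ... */

    pthread_mutex_lock(&shared.mutex);
    shared.workersFinished++;
    pthread_cond_signal(&shared.workersFinishedCv);
    /* Keep temp files alive until the leader owns the unified tapeset */
    while (!shared.leaderHasTapeSet)
        pthread_cond_wait(&shared.leaderHasTapeSetCv, &shared.mutex);
    pthread_mutex_unlock(&shared.mutex);
    return NULL;
}

int
main(void)
{
    pthread_t   tids[NWORKERS];
    int         i;

    for (i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker_main, NULL);

    /* Leader: wait until every launched worker has produced its run */
    pthread_mutex_lock(&shared.mutex);
    while (shared.workersFinished < NWORKERS)
        pthread_cond_wait(&shared.workersFinishedCv, &shared.mutex);
    /* ... take over worker tapes, then release the workers ... */
    shared.leaderHasTapeSet = true;
    pthread_cond_broadcast(&shared.leaderHasTapeSetCv);
    pthread_mutex_unlock(&shared.mutex);

    for (i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    printf("all %d workers finished and released\n", NWORKERS);
    return 0;
}
```
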
+
+/*
* Is the given tuple allocated from the slab memory arena?
*/
#define IS_SLAB_SLOT(state, tuple) \
@@ -465,6 +550,9 @@ struct Tuplesortstate
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state) ((state)->shared == NULL)
+#define WORKER(state) ((state)->shared && (state)->worker != -1)
+#define LEADER(state) ((state)->shared && (state)->worker == -1)
/*
* NOTES about on-tape representation of tuples:
@@ -521,10 +609,13 @@ struct Tuplesortstate
} while(0)
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
@@ -572,6 +663,11 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_wait(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
@@ -604,13 +700,17 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
*/
static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+ bool randomAccess)
{
Tuplesortstate *state;
MemoryContext sortcontext;
MemoryContext tuplecontext;
MemoryContext oldcontext;
+ if (coordinate && randomAccess)
+		elog(ERROR, "random access disallowed under parallel sort");
+
/*
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
@@ -684,6 +784,34 @@ tuplesort_begin_common(int workMem, bool randomAccess)
state->result_tape = -1; /* flag that result tape has not been formed */
+ /*
+ * Initialize parallel-related state based on coordination information
+ * from caller
+ */
+ if (!coordinate)
+ {
+ /* Serial sort */
+ state->shared = NULL;
+ state->worker = -1;
+ state->nLaunched = -1;
+ }
+ else if (coordinate->isWorker)
+ {
+ /* Parallel worker produces exactly one final run from all input */
+ state->shared = coordinate->sharedsort;
+ state->worker = worker_get_identifier(state);
+ state->nLaunched = -1;
+ }
+ else
+ {
+ /* Parallel leader state only used for final merge */
+ state->shared = coordinate->sharedsort;
+ state->worker = -1;
+ /* Leader requires workers successfully launched (not an estimate) */
+ state->nLaunched = coordinate->nLaunched;
+ Assert(state->nLaunched >= 1);
+ }
+
MemoryContextSwitchTo(oldcontext);
return state;
@@ -694,9 +822,10 @@ tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess)
+ int workMem, SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int i;
@@ -705,7 +834,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
AssertArg(nkeys > 0);
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
elog(LOG,
"begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
nkeys, workMem, randomAccess ? 't' : 'f');
@@ -717,7 +846,8 @@ tuplesort_begin_heap(TupleDesc tupDesc,
false, /* no unique check */
nkeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_heap;
state->copytup = copytup_heap;
@@ -764,9 +894,11 @@ tuplesort_begin_heap(TupleDesc tupDesc,
Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
Relation indexRel,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
@@ -776,7 +908,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
elog(LOG,
"begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
RelationGetNumberOfAttributes(indexRel),
@@ -789,7 +921,8 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
false, /* no unique check */
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_cluster;
state->copytup = copytup_cluster;
@@ -857,9 +990,12 @@ Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
@@ -867,7 +1003,7 @@ tuplesort_begin_index_btree(Relation heapRel,
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
elog(LOG,
"begin index sort: unique = %c, workMem = %d, randomAccess = %c",
enforceUnique ? 't' : 'f',
@@ -880,7 +1016,8 @@ tuplesort_begin_index_btree(Relation heapRel,
enforceUnique,
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_index_btree;
state->copytup = copytup_index;
@@ -934,15 +1071,18 @@ tuplesort_begin_index_hash(Relation heapRel,
uint32 high_mask,
uint32 low_mask,
uint32 max_buckets,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
elog(LOG,
"begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
"max_buckets = 0x%x, workMem = %d, randomAccess = %c",
@@ -973,10 +1113,11 @@ tuplesort_begin_index_hash(Relation heapRel,
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
- bool nullsFirstFlag,
- int workMem, bool randomAccess)
+ bool nullsFirstFlag, int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int16 typlen;
bool typbyval;
@@ -984,7 +1125,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
elog(LOG,
"begin datum sort: workMem = %d, randomAccess = %c",
workMem, randomAccess ? 't' : 'f');
@@ -996,7 +1137,8 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
false, /* no unique check */
1,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_datum;
state->copytup = copytup_datum;
@@ -1054,7 +1196,9 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
* delayed calls at the moment.)
*
* This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested. Parallel leader tuplesorts will always ignore the hint,
+ * though it's likely that such cases should not have used parallelism in
+ * the first place.
*/
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
@@ -1063,6 +1207,7 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Assert(state->status == TSS_INITIAL);
Assert(state->memtupcount == 0);
Assert(!state->bounded);
+ Assert(!WORKER(state));
#ifdef DEBUG_BOUNDED_SORT
/* Honor GUC setting that disables the feature (for easy testing) */
@@ -1070,6 +1215,10 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
return;
#endif
+ /* Parallel leader ignores hint */
+ if (LEADER(state))
+ return;
+
/* We want to be able to compute bound * 2, so limit the setting */
if (bound > (int64) (INT_MAX / 2))
return;
@@ -1122,13 +1271,24 @@ tuplesort_end(Tuplesortstate *state)
* for two #ifdef TRACE_SORT sections.
*/
if (state->tapeset)
+ {
+ /* Wait for leader to finish if not leader-as-worker */
+ if (WORKER(state))
+ worker_wait(state);
+ /* Leader now has ownership of our tapeset */
LogicalTapeSetClose(state->tapeset);
+ }
+ /*
+	 * Only report spaceUsed via trace_sort for the leader or a serial sort
+	 * (don't double count disk blocks used).  Probes still receive
+	 * per-Tuplesortstate traces.
+ */
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && !WORKER(state))
{
if (state->tapeset)
- elog(LOG, "external sort ended, %ld disk blocks used: %s",
+ elog(LOG, "%s ended, %ld disk blocks used: %s",
+ SERIAL(state) ? "external sort" : "parallel external sort",
spaceUsed, pg_rusage_show(&state->ru_start));
else
elog(LOG, "internal sort ended, %ld KB used: %s",
@@ -1503,6 +1663,8 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
+ Assert(!LEADER(state));
+
switch (state->status)
{
case TSS_INITIAL:
@@ -1556,7 +1718,7 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
/*
* Nope; time to switch to tape-based operation.
*/
- inittapes(state);
+ inittapes(state, true);
/*
* Dump all tuples.
@@ -1659,8 +1821,8 @@ tuplesort_performsort(Tuplesortstate *state)
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "performsort starting: %s",
- pg_rusage_show(&state->ru_start));
+		elog(LOG, "performsort of worker %d starting: %s",
+			 state->worker, pg_rusage_show(&state->ru_start));
#endif
switch (state->status)
@@ -1669,14 +1831,39 @@ tuplesort_performsort(Tuplesortstate *state)
/*
* We were able to accumulate all the tuples within the allowed
- * amount of memory. Just qsort 'em and we're done.
+ * amount of memory, or leader to take over worker tapes
*/
- tuplesort_sort_memtuples(state);
+ if (SERIAL(state))
+ {
+ /* Just qsort 'em and we're done */
+ tuplesort_sort_memtuples(state);
+ state->status = TSS_SORTEDINMEM;
+ }
+ else if (WORKER(state))
+ {
+ /*
+ * Parallel workers must still dump out tuples to tape. No
+ * merge is required to produce single output run, though.
+ */
+ inittapes(state, false);
+ dumptuples(state, true);
+ worker_nomergeruns(state);
+ state->status = TSS_SORTEDONTAPE;
+ }
+ else
+ {
+ /*
+ * Leader will take over worker tapes and merge worker runs.
+ * Note that mergeruns sets the correct state->status.
+ */
+ leader_takeover_tapes(state);
+ mergeruns(state);
+ }
state->current = 0;
state->eof_reached = false;
+ state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
- state->status = TSS_SORTEDINMEM;
break;
case TSS_BOUNDED:
@@ -1699,8 +1886,8 @@ tuplesort_performsort(Tuplesortstate *state)
/*
* Finish tape-based sort. First, flush all tuples remaining in
* memory out to tape; then merge until we have a single remaining
- * run (or, if !randomAccess, one run per tape). Note that
- * mergeruns sets the correct state->status.
+ * run (or, if !randomAccess and !WORKER(), one run per tape).
+ * Note that mergeruns sets the correct state->status.
*/
dumptuples(state, true);
mergeruns(state);
@@ -1723,8 +1910,8 @@ tuplesort_performsort(Tuplesortstate *state)
state->activeTapes,
pg_rusage_show(&state->ru_start));
else
- elog(LOG, "performsort done: %s",
- pg_rusage_show(&state->ru_start));
+			elog(LOG, "performsort of worker %d done: %s",
+				 state->worker, pg_rusage_show(&state->ru_start));
}
#endif
@@ -1745,6 +1932,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
unsigned int tuplen;
size_t nmoved;
+ Assert(!WORKER(state));
+
switch (state->status)
{
case TSS_SORTEDINMEM:
@@ -2128,6 +2317,7 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
*/
Assert(forward);
Assert(ntuples >= 0);
+ Assert(!WORKER(state));
switch (state->status)
{
@@ -2225,54 +2415,42 @@ tuplesort_merge_order(int64 allowedMem)
* This is called only if we have found we don't have room to sort in memory.
*/
static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
{
int maxTapes,
j;
- int64 tapeSpace;
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ Assert(!LEADER(state));
- state->maxTapes = maxTapes;
- state->tapeRange = maxTapes - 1;
+ if (mergeruns)
+ {
+ /* Compute number of tapes to use: merge order plus 1 */
+ maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ }
+ else
+ {
+ /* Workers can sometimes produce single run, output without merge */
+ Assert(WORKER(state));
+ maxTapes = MINORDER + 1;
+ }
+ /*
+	 * In the serial case, report that an external sort is required.
+	 * Parallel workers must always dump out tuples, though, so this
+	 * information is not reported there; it would be almost redundant.
+ */
#ifdef TRACE_SORT
- if (trace_sort)
+ if (trace_sort && SERIAL(state))
elog(LOG, "switching to external sort with %d tapes: %s",
maxTapes, pg_rusage_show(&state->ru_start));
#endif
- /*
- * Decrease availMem to reflect the space needed for tape buffers, when
- * writing the initial runs; but don't decrease it to the point that we
- * have no room for tuples. (That case is only likely to occur if sorting
- * pass-by-value Datums; in all other scenarios the memtuples[] array is
- * unlikely to occupy more than half of allowedMem. In the pass-by-value
- * case it's not important to account for tuple space, so we don't care if
- * LACKMEM becomes inaccurate.)
- */
- tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
-
- if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
- USEMEM(state, tapeSpace);
-
- /*
- * Make sure that the temp file(s) underlying the tape set are created in
- * suitable temp tablespaces.
- */
- PrepareTempTablespaces();
-
- /*
- * Create the tape set and allocate the per-tape data arrays.
- */
- state->tapeset = LogicalTapeSetCreate(maxTapes);
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+ /* Create the tape set and allocate the per-tape data arrays */
+ inittapestate(state, maxTapes);
+ state->tapeset =
+ LogicalTapeSetCreate(maxTapes, NULL,
+ state->shared ? &state->shared->fileset : NULL,
+ state->worker);
state->currentRun = 0;
@@ -2296,6 +2474,49 @@ inittapes(Tuplesortstate *state)
}
/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+ int64 tapeSpace;
+
+ /*
+ * Decrease availMem to reflect the space needed for tape buffers; but
+ * don't decrease it to the point that we have no room for tuples. (That
+ * case is only likely to occur if sorting pass-by-value Datums; in all
+ * other scenarios the memtuples[] array is unlikely to occupy more than
+ * half of allowedMem. In the pass-by-value case it's not important to
+ * account for tuple space, so we don't care if LACKMEM becomes
+ * inaccurate.)
+ */
+ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+ if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+ USEMEM(state, tapeSpace);
+
+ /*
+ * Make sure that the temp file(s) underlying the tape set are created in
+	 * suitable temp tablespaces.  In the parallel sort case, the choice of
+	 * tablespace is a function of the ordinal worker identifier, which
+	 * ensures both determinism and balanced use of temp tablespaces.
+ */
+ PrepareTempTablespaces();
+
+ state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+ state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+ /* Record # of tapes allocated (for duration of sort) */
+ state->maxTapes = maxTapes;
+ /* Record maximum # of tapes usable as inputs when merging */
+ state->tapeRange = maxTapes - 1;
+}
+
+/*
* selectnewtape -- select new tape for new initial run.
*
* This is called after finishing a run when we know another run
@@ -2477,8 +2698,8 @@ mergeruns(Tuplesortstate *state)
*/
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- (state->availMem) / 1024, numInputTapes);
+		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+			 state->worker, state->availMem / 1024, numInputTapes);
#endif
state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
@@ -2496,7 +2717,7 @@ mergeruns(Tuplesortstate *state)
* pass remains. If we don't have to produce a materialized sorted
* tape, we can stop at this point and do the final merge on-the-fly.
*/
- if (!state->randomAccess)
+ if (!state->randomAccess && !WORKER(state))
{
bool allOneRun = true;
@@ -2581,7 +2802,10 @@ mergeruns(Tuplesortstate *state)
* a waste of cycles anyway...
*/
state->result_tape = state->tp_tapenum[state->tapeRange];
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ if (!WORKER(state))
+ LogicalTapeFreeze(state->tapeset, state->result_tape);
+ else
+ worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
/* Release the read buffers of all the other tapes, by rewinding them. */
@@ -2650,8 +2874,8 @@ mergeonerun(Tuplesortstate *state)
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
- pg_rusage_show(&state->ru_start));
+		elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+			 state->activeTapes, pg_rusage_show(&state->ru_start));
#endif
}
@@ -2785,8 +3009,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "starting quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "starting quicksort of run %d/%d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
/*
@@ -2797,8 +3022,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "finished quicksort of run %d/%d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
memtupwrite = state->memtupcount;
@@ -2824,8 +3050,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished writing run %d to tape %d: %s",
- state->currentRun, state->destTape,
+ elog(LOG, "finished writing run %d/%d to tape %d: %s",
+ state->worker, state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif
@@ -2936,6 +3162,9 @@ tuplesort_restorepos(Tuplesortstate *state)
*
* This can be called after tuplesort_performsort() finishes to obtain
* printable summary information about how the sort was performed.
+ *
+ * This currently reports nothing about the number of parallel workers
+ * used.
*/
void
tuplesort_get_stats(Tuplesortstate *state,
@@ -3037,6 +3266,7 @@ make_bounded_heap(Tuplesortstate *state)
Assert(state->status == TSS_INITIAL);
Assert(state->bounded);
Assert(tupcount >= state->bound);
+ Assert(SERIAL(state));
/* Reverse sort direction so largest entry will be at root */
reversedirection(state);
@@ -3084,6 +3314,7 @@ sort_bounded_heap(Tuplesortstate *state)
Assert(state->status == TSS_BOUNDED);
Assert(state->bounded);
Assert(tupcount == state->bound);
+ Assert(SERIAL(state));
/*
* We can unheapify in place because each delete-top call will remove the
@@ -3114,10 +3345,13 @@ sort_bounded_heap(Tuplesortstate *state)
* Sort all memtuples using specialized qsort() routines.
*
* Quicksort is used for small in-memory sorts, and external sort runs.
+ * Parallel workers always use quicksort, however.
*/
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
+ Assert(!LEADER(state));
+
if (state->memtupcount > 1)
{
/* Can we use the single-key sort function? */
@@ -4158,6 +4392,325 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
}
/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+ Size tapesSize;
+
+ Assert(nWorkers > 0);
+
+ /* Make sure that BufFile shared state is MAXALIGN'd */
+ tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+ tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+ return tapesSize;
+}
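
As an illustrative sketch of how a caller might use this estimate (not shown in the patch itself): it would typically be folded into the shm_toc estimator of the caller's ParallelContext before InitializeParallelDSM() runs. Here "pcxt" is assumed to be the caller's ParallelContext and PARALLEL_KEY_TUPLESORT is a hypothetical, caller-chosen toc key:

    /* Reserve room in the DSM segment for the Sharedsort struct */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           tuplesort_estimate_shared(nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
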
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates. nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+ int i;
+
+ Assert(nWorkers > 0);
+
+ /* Initialize state */
+ SpinLockInit(&shared->mutex);
+ shared->nTapes = nWorkers;
+ shared->currentWorker = 0;
+ ConditionVariableInit(&shared->workersFinishedCv);
+ shared->workersFinished = 0;
+ ConditionVariableInit(&shared->leaderHasTapeSetCv);
+ shared->leaderHasTapeSet = false;
+
+ /* Everything for each worker initially zeroed */
+ for (i = 0; i < nWorkers; i++)
+ {
+ shared->tapes[i].firstblocknumber = 0L;
+ shared->tapes[i].buffilesize = 0;
+ }
+
+ /* Initialize SharedFileSet */
+ SharedFileSetInit(&shared->fileset, seg);
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all workers.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+ /* Attach to SharedFileSet */
+ SharedFileSetAttach(&shared->fileset, seg);
+}
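
A hedged sketch of how these two routines might be paired by a caller, assuming the Sharedsort is stored in the ParallelContext's toc under a caller-chosen key (PARALLEL_KEY_TUPLESORT is again hypothetical):

    /* Leader, after InitializeParallelDSM(pcxt) */
    Sharedsort *sharedsort;

    sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc,
                                                 tuplesort_estimate_shared(nworkers));
    tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

    /* Worker, in its main function, given (dsm_segment *seg, shm_toc *toc) */
    Sharedsort *sharedsort;

    sharedsort = (Sharedsort *) shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
    tuplesort_attach_shared(sharedsort, seg);
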
+
+/*
+ * tuplesort_leader_wait - have leader wait on workers to finish
+ *
+ * The leader may want to collect a little supplementary information from
+ * all workers first, which is why the exact wait point is left to the
+ * caller (this function breaks the wait out as a separate call).
+ */
+void
+tuplesort_leader_wait(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+
+ Assert(LEADER(state));
+
+ /*
+ * Since the leader process typically participates as a worker itself, the
+ * wait should not be long. There may be no wait at all, so avoid
+ * preparing to sleep here.
+ */
+ for (;;)
+ {
+ bool workersDone;
+
+ /*
+ * Break when workers are done. Note that this generally includes the
+ * leader-as-worker state, which must already be done by here since it
+ * is managed from within the leader process.
+ */
+ SpinLockAcquire(&shared->mutex);
+ Assert(state->nLaunched >= shared->workersFinished);
+ workersDone = (state->nLaunched == shared->workersFinished);
+ SpinLockRelease(&shared->mutex);
+
+ if (workersDone)
+ break;
+
+ /* Wait for workers */
+ ConditionVariableSleep(&shared->workersFinishedCv,
+ WAIT_EVENT_PARALLEL_TUPLESORT_LEADER);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless. Assigning an identifier implies that logtape.c
+ * should be able to subsequently discover temp files for the worker on disk
+ * (subject to tuplesort contract), even when no tuples are passed to a
+ * worker Tuplesortstate.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber, to avoid making any assumption about the caller's
+ * requirements. However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1. This
+ * includes the leader, as well as serial Tuplesort processes. Worker
+ * states are numbered from 0 to suit the purposes of temp file management,
+ * which expects these conventions.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int worker;
+
+ Assert(WORKER(state));
+
+ SpinLockAcquire(&shared->mutex);
+ worker = shared->currentWorker++;
+ SpinLockRelease(&shared->mutex);
+
+ return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly. Workers require a few
+ * additional steps beyond the similar serial TSS_SORTEDONTAPE case; those
+ * extra steps also happen here. They involve freeing now-unneeded
+ * resources, and notifying the leader that the worker's input run is
+ * available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into the worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ TapeShare output;
+
+ Assert(WORKER(state));
+ Assert(state->result_tape != -1);
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Free most remaining memory, in case caller is sensitive to our holding
+ * on to it. memtuples may not be a tiny merge heap at this point.
+ * (Caller will probably completely shut down worker Tuplesortstates as
+ * soon as workers finish, though, so this is typically not critical.)
+ */
+ pfree(state->memtuples);
+ /* Be tidy */
+ state->memtuples = NULL;
+ state->memtupsize = 0;
+
+ /*
+ * A parallel worker must store its result tape's metadata in shared memory
+ * for the leader.
+ */
+ output = LogicalTapeFreeze(state->tapeset, state->result_tape);
+
+ /* Store properties of output tape, and update finished worker count */
+ SpinLockAcquire(&shared->mutex);
+ shared->tapes[state->worker] = output;
+ shared->workersFinished++;
+ SpinLockRelease(&shared->mutex);
+
+ /* Indicate to leader that this worker is finished */
+ ConditionVariableSignal(&shared->workersFinishedCv);
+}
+
+/*
+ * worker_wait - Have worker wait on leader to begin its merge
+ */
+static void
+worker_wait(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+
+ Assert(WORKER(state));
+
+ /* This is a no-op within leader's own backend */
+ if (!IsParallelWorker())
+ return;
+
+ /* Prepare to sleep */
+ ConditionVariablePrepareToSleep(&shared->leaderHasTapeSetCv);
+
+ for (;;)
+ {
+ bool leaderHasTapeSet;
+
+ SpinLockAcquire(&shared->mutex);
+ leaderHasTapeSet = shared->leaderHasTapeSet;
+ SpinLockRelease(&shared->mutex);
+
+ if (leaderHasTapeSet)
+ break;
+
+ ConditionVariableSleep(&shared->leaderHasTapeSetCv,
+ WAIT_EVENT_PARALLEL_TUPLESORT_WORKER);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called as an alternative to mergeruns() within a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+ Assert(WORKER(state));
+ Assert(state->result_tape == -1);
+
+ state->result_tape = state->tp_tapenum[state->destTape];
+ worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting. By now,
+ * all sorting has occurred in workers, which tuplesort caller already
+ * waited on using tuplesort_leader_wait.
+ *
+ * When this returns, workers are released, and leader process is left in a
+ * state that is virtually indistinguishable from it having generated runs
+ * as a serial external sort might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int nLaunched = state->nLaunched;
+ int j;
+
+ Assert(LEADER(state));
+ Assert(nLaunched >= 1);
+ Assert(nLaunched == shared->workersFinished);
+
+ /*
+ * Create the tapeset from worker tapes, including a leader-owned tape at
+ * the end. Parallel workers are far more expensive than logical tapes,
+ * so the number of tapes allocated here should never be excessive. We
+ * still create a leader tape for consistency with the serial case, even
+ * though it isn't actually used.
+ */
+ inittapestate(state, nLaunched + 1);
+ state->tapeset = LogicalTapeSetCreate(nLaunched + 1, shared->tapes,
+ &shared->fileset, state->worker);
+
+ /* Release workers, since their temp files are now held by leader */
+ SpinLockAcquire(&shared->mutex);
+ shared->leaderHasTapeSet = true;
+ SpinLockRelease(&shared->mutex);
+
+ /* Broadcast to workers that they may go away */
+ ConditionVariableBroadcast(&shared->leaderHasTapeSetCv);
+
+ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ state->currentRun = nLaunched;
+
+ /*
+ * Initialize variables of Algorithm D to be consistent with runs from
+ * workers having been generated in the leader.
+ *
+ * There will always be exactly 1 run per worker, and exactly one input
+ * tape per run, because workers always output exactly 1 run, even when
+ * there were no input tuples for workers to sort.
+ */
+ for (j = 0; j < state->maxTapes; j++)
+ {
+ /* One real run; no dummy runs for worker tapes */
+ state->tp_fib[j] = 1;
+ state->tp_runs[j] = 1;
+ state->tp_dummy[j] = 0;
+ state->tp_tapenum[j] = j;
+ }
+ /* Leader tape gets one dummy run, and no real runs */
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_runs[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 1;
+
+ state->Level = 1;
+ state->destTape = 0;
+
+ state->status = TSS_BUILDRUNS;
+}
+
+/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
static void
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 2d4c36d..1398613 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -20,7 +20,9 @@
#include "access/xlogreader.h"
#include "catalog/pg_index.h"
#include "lib/stringinfo.h"
+#include "nodes/execnodes.h"
#include "storage/bufmgr.h"
+#include "storage/shm_toc.h"
/* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
typedef uint16 BTCycleId;
@@ -547,13 +549,8 @@ extern bool btvalidate(Oid opclassoid);
/*
* prototypes for functions in nbtsort.c
*/
-typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
-
-extern BTSpool *_bt_spoolinit(Relation heap, Relation index,
- bool isunique, bool isdead);
-extern void _bt_spooldestroy(BTSpool *btspool);
-extern void _bt_spool(BTSpool *btspool, ItemPointer self,
- Datum *values, bool *isnull);
-extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
+extern IndexBuildResult *_bt_dobuild(Relation heap, Relation index,
+ IndexInfo *indexInfo);
+extern void _bt_worker_main(dsm_segment *seg, shm_toc *toc);
#endif /* NBTREE_H */
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index e3e0cec..82691b6 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -51,7 +51,7 @@ extern bool InitializingParallelWorker;
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
-extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers, bool serializable_okay);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 147f862..fbdf8dd 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -39,6 +39,7 @@ typedef struct ParallelHeapScanDescData
BlockNumber phs_startblock; /* starting block number */
pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to
* workers so far. */
+ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelHeapScanDescData;
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 1d4ec09..366747c 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -100,7 +100,8 @@ extern double IndexBuildHeapScan(Relation heapRelation,
IndexInfo *indexInfo,
bool allow_sync,
IndexBuildCallback callback,
- void *callback_state);
+ void *callback_state,
+ HeapScanDesc scan);
extern double IndexBuildHeapRangeScan(Relation heapRelation,
Relation indexRelation,
IndexInfo *indexInfo,
@@ -109,7 +110,8 @@ extern double IndexBuildHeapRangeScan(Relation heapRelation,
BlockNumber start_blockno,
BlockNumber end_blockno,
IndexBuildCallback callback,
- void *callback_state);
+ void *callback_state,
+ HeapScanDesc scan);
extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 3950054..adb6168 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -241,6 +241,7 @@ extern bool enableFsync;
extern bool allowSystemTableMods;
extern PGDLLIMPORT int work_mem;
extern PGDLLIMPORT int maintenance_work_mem;
+extern PGDLLIMPORT int max_parallel_workers_maintenance;
extern int VacuumCostPageHit;
extern int VacuumCostPageMiss;
diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h
index 2801bfd..1603b67 100644
--- a/src/include/optimizer/planner.h
+++ b/src/include/optimizer/planner.h
@@ -61,4 +61,6 @@ extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti);
extern List *get_partitioned_child_rels_for_join(PlannerInfo *root,
Relids join_relids);
+extern int plan_create_index_workers(Oid tableOid, Oid indexOid);
+
#endif /* PLANNER_H */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3..97f9fb5 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -811,6 +811,8 @@ typedef enum
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+ WAIT_EVENT_PARALLEL_TUPLESORT_WORKER,
+ WAIT_EVENT_PARALLEL_TUPLESORT_LEADER,
WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
WAIT_EVENT_CLOG_GROUP_UPDATE,
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 09ff2ef..5ffb04c 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -43,6 +43,9 @@ extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern off_t BufFileSize(BufFile *file);
+extern BufFile *BufFileView(BufFile *source);
+extern long BufFileViewAppend(BufFile *file, BufFile *source);
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
extern void BufFileExportShared(BufFile *file);
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 998797d..58fbfc5 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -78,6 +78,7 @@ extern char *FilePathName(File file);
extern int FileGetRawDesc(File file);
extern int FileGetRawFlags(File file);
extern mode_t FileGetRawMode(File file);
+extern off_t FileGetSize(File file);
/* Operations used for sharing named temporary files */
extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index a1e869b..e3c1d07 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -16,15 +16,36 @@
#ifndef LOGTAPE_H
#define LOGTAPE_H
+#include "storage/sharedfileset.h"
+
/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
typedef struct LogicalTapeSet LogicalTapeSet;
/*
+ * TapeShare is used for tape unification. It is minimal metadata about the
+ * on-disk structure of a tape.
+ */
+typedef struct TapeShare
+{
+ /*
+ * firstblocknumber is first block to read within tape's underlying
+ * storage during unification. (Should be -1 for leader tape, and is
+ * typically 0 for worker tapes.)
+ *
+ * buffilesize is the size of a worker's underlying BufFile within the
+ * worker. (Should be 0 for leader tape.)
+ */
+ long firstblocknumber;
+ off_t buffilesize;
+} TapeShare;
+
+/*
* prototypes for functions in logtape.c
*/
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *tapes,
+ SharedFileSet *fileset, int worker);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
@@ -34,7 +55,7 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
size_t buffer_size);
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern TapeShare LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index b6b8c8e..b8d3be0 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -8,7 +8,9 @@
* if necessary). It works efficiently for both small and large amounts
* of data. Small amounts are sorted in-memory using qsort(). Large
* amounts are sorted using temporary files and a standard external sort
- * algorithm.
+ * algorithm. Parallel sorts use a variant of this external sort
+ * algorithm, and are typically only used for large amounts of data.
+ * Currently, parallel sorts do not support random access to the sort result.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -23,13 +25,37 @@
#include "access/itup.h"
#include "executor/tuptable.h"
#include "fmgr.h"
+#include "storage/dsm.h"
#include "utils/relcache.h"
-/* Tuplesortstate is an opaque type whose details are not known outside
- * tuplesort.c.
+/*
+ * Tuplesortstate and Sharedsort are opaque types whose details are not
+ * known outside tuplesort.c.
*/
typedef struct Tuplesortstate Tuplesortstate;
+typedef struct Sharedsort Sharedsort;
+
+/*
+ * Tuplesort parallel coordination state. Caller initializes everything.
+ * Serial sorts should pass NULL coordinate argument. See usage notes below.
+ */
+typedef struct SortCoordinateData
+{
+ /* Worker process? If not, must be leader. */
+ bool isWorker;
+
+ /*
+ * Number of workers known to have been launched, as supplied by the leader
+ * process (workers set this to -1). This typically includes the
+ * leader-as-worker process.
+ */
+ int nLaunched;
+
+ /* Private opaque state in shared memory */
+ Sharedsort *sharedsort;
+} SortCoordinateData;
+
+typedef struct SortCoordinateData *SortCoordinate;
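
To illustrate the intended split (a sketch only; "sharedsort" is assumed to have been set up via tuplesort_initialize_shared()/tuplesort_attach_shared(), and "pcxt" is the caller's ParallelContext):

    /* Worker side */
    SortCoordinateData coordinate;

    coordinate.isWorker = true;
    coordinate.nLaunched = -1;          /* workers always pass -1 */
    coordinate.sharedsort = sharedsort;

    /* Leader side, once the number of launched workers is known */
    coordinate.isWorker = false;
    coordinate.nLaunched = pcxt->nworkers_launched;  /* plus one if the
                                                      * leader also sorted a
                                                      * run as a worker */
    coordinate.sharedsort = sharedsort;
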
/*
* Data structures for reporting sort statistics. Note that
@@ -84,30 +110,120 @@ typedef struct TuplesortInstrumentation
*
* The "index_hash" API is similar to index_btree, but the tuples are
* actually sorted by their hash codes not the raw data.
+ *
+ *
+ * Parallel sort callers are required to coordinate multiple tuplesort
+ * states in a leader process, and one or more worker processes. The
+ * leader process must launch workers, and have each perform an independent
+ * "partial" tuplesort, typically fed by the parallel heap interface. The
+ * leader later produces the final output (internally, it merges runs output
+ * by workers).
+ *
+ * Note that callers may use the leader process to sort runs, as if it were
+ * an independent worker process (prior to the process performing a leader
+ * sort to produce the final sorted output). Doing so only requires a
+ * second "partial" tuplesort within the leader process, initialized like
+ * any worker process.
+ *
+ * Callers must do the following to perform a sort in parallel using
+ * multiple worker processes:
+ *
+ * 1. Request tuplesort-private shared memory for n workers. Use
+ * tuplesort_estimate_shared() to get the required size.
+ * 2. Have leader process initialize allocated shared memory using
+ * tuplesort_initialize_shared(). This assigns a unique identifier for
+ * the sort. See BufFileGetHandle() for notes on resource management and
+ * the shared memory segment that is passed through from this point.
+ * 3. Initialize a "coordinate" argument (serial case just passes NULL
+ * here), within both the leader process, and for each worker process.
+ * Note that this has a pointer to the shared tuplesort-private
+ * structure.
+ * 4. Begin a tuplesort using some appropriate tuplesort_begin* routine,
+ * passing a "coordinate" argument, within each worker. The workMem
+ * argument need not be identical. All other arguments to the
+ * routine should be identical across workers and the leader.
+ * The workMem argument should be at least 64 (64KB) in all cases.
+ * 5. Feed tuples to each worker, and call tuplesort_performsort() for each
+ * when input is exhausted.
+ * 6. Optionally, workers may aggregate information/statistics about the
+ * heap scan someplace; caller must handle all those details. Then, call
+ * tuplesort_end() in each worker process (but not for any leader-as-worker
+ * Tuplesortstate). Worker processes can generally shut down as soon as
+ * underlying temp file state is handed over to the leader.
+ * 7. Begin a tuplesort in the leader using the same tuplesort_begin* routine,
+ * passing a leader-appropriate "coordinate" argument. The leader must now
+ * wait for workers to finish by calling tuplesort_leader_wait(), which
+ * waits until workers finish, and no longer. Note that the leader requires
+ * the number of workers actually launched at this point, so this can only
+ * happen after caller has established that number (after step 4). If there
+ * was a leader-as-worker Tuplesortstate, call tuplesort_end() with it now.
+ * 8. Call tuplesort_performsort() in leader. When this returns, sorting
+ * has completed, or leader will do final on-the-fly merge. Consume
+ * output using the appropriate tuplesort_get* routine as required.
+ * 9. Leader caller may now optionally combine any data that may have been
+ * aggregated by workers in step 6. (e.g., for custom instrumentation.)
+ * 10. Call tuplesort_end() in leader.
+ *
+ * This division of labor assumes nothing about how input tuples are produced,
+ * but does require that caller combine the state of multiple tuplesorts for
+ * any purpose other than producing the final output. For example, callers
+ * must consider that tuplesort_get_stats() reports on only one worker's role
+ * in a sort (or the leader's role), and not statistics for the sort as a
+ * whole.
+ *
+ * Note that there is an assumption that temp_tablespaces GUC matches across
+ * processes. Typically, this happens automatically because caller uses
+ * parallel infrastructure. Note also that only a very small amount of
+ * memory will be allocated prior to the leader state first consuming input,
+ * and that workers will free the vast majority of their memory upon
+ * reaching a quiescent state. Callers can rely on this to arrange for
+ * memory to be consumed in a way that respects a workMem-style budget
+ * across an entire sort operation, and not just within one backend.
+ *
+ * Callers are also responsible for parallel safety in general. However, they
+ * can at least rely on there being no parallel safety hazards within
+ * tuplesort, because tuplesort conceptualizes the sort as several independent
+ * sorts whose results are combined. Since, in general, the behavior of sort
+ * operators is immutable, caller need only worry about the parallel safety of
+ * whatever the process is through which input tuples are generated
+ * (typically, caller uses a parallel heap scan). Furthermore, note that
+ * callers must be careful in providing a perfectly consistent tuple
+ * descriptor across processes. This can be more subtle than it appears,
+ * since for example the RECORD pseudo-type uses transient typmods that are
+ * only meaningful within a single backend (see tqueue infrastructure to
+ * support transient record types). For the cluster, index_btree and
+ * index_hash APIs, callers automatically avoid problems by opening up the
+ * target relation from within worker processes, since the relation's
+ * cataloged attributes are necessarily not of transient types.
*/
extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
- Relation indexRel,
- int workMem, bool randomAccess);
+ Relation indexRel, int workMem,
+ SortCoordinate coordinate, bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel,
uint32 high_mask,
uint32 low_mask,
uint32 max_buckets,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation,
bool nullsFirstFlag,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
@@ -141,6 +257,12 @@ extern const char *tuplesort_space_type_name(TuplesortSpaceType t);
extern int tuplesort_merge_order(int64 allowedMem);
+extern Size tuplesort_estimate_shared(int nworkers);
+extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
+ dsm_segment *seg);
+extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg);
+extern void tuplesort_leader_wait(Tuplesortstate *state);
+
/*
* These routines may only be called if randomAccess was specified 'true'.
* Likewise, backwards scan in gettuple/getdatum is only allowed if
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index c440c7e..307e58a 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -65,6 +65,10 @@ CREATE FUNCTION do_analyze() RETURNS VOID VOLATILE LANGUAGE SQL
AS 'ANALYZE pg_am';
CREATE FUNCTION wrap_do_analyze(c INT) RETURNS INT IMMUTABLE LANGUAGE SQL
AS 'SELECT $1 FROM do_analyze()';
+-- Disable force_parallel_mode briefly, to ensure stable output when it happens
+-- to be in effect. If we did not do this, errors raised would concern running
+-- ANALYZE in parallel mode.
+SET force_parallel_mode = off;
CREATE INDEX ON vaccluster(wrap_do_analyze(i));
INSERT INTO vaccluster VALUES (1), (2);
ANALYZE vaccluster;
@@ -112,6 +116,7 @@ ANALYZE vactst, does_not_exist, vacparted;
ERROR: relation "does_not_exist" does not exist
ANALYZE vactst (i), vacparted (does_not_exist);
ERROR: column "does_not_exist" of relation "vacparted" does not exist
+RESET force_parallel_mode;
DROP TABLE vaccluster;
DROP TABLE vactst;
DROP TABLE vacparted;
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 92eaca2..c1089cd 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -50,6 +50,10 @@ CREATE FUNCTION do_analyze() RETURNS VOID VOLATILE LANGUAGE SQL
AS 'ANALYZE pg_am';
CREATE FUNCTION wrap_do_analyze(c INT) RETURNS INT IMMUTABLE LANGUAGE SQL
AS 'SELECT $1 FROM do_analyze()';
+-- Disable force_parallel_mode briefly, to ensure stable output when it happens
+-- to be in effect. If we did not do this, errors raised would concern running
+-- ANALYZE in parallel mode.
+SET force_parallel_mode = off;
CREATE INDEX ON vaccluster(wrap_do_analyze(i));
INSERT INTO vaccluster VALUES (1), (2);
ANALYZE vaccluster;
@@ -89,6 +93,7 @@ ANALYZE vacparted (b), vactst;
ANALYZE vactst, does_not_exist, vacparted;
ANALYZE vactst (i), vacparted (does_not_exist);
+RESET force_parallel_mode;
DROP TABLE vaccluster;
DROP TABLE vactst;
DROP TABLE vacparted;
diff --git a/src/tools/valgrind.supp b/src/tools/valgrind.supp
index af03051..775db0d 100644
--- a/src/tools/valgrind.supp
+++ b/src/tools/valgrind.supp
@@ -120,6 +120,15 @@
fun:RelationMapFinishBootstrap
}
+{
+ logtape_write
+ Memcheck:Param
+ write(buf)
+
+ ...
+ fun:LogicalTape*
+}
+
# gcc on ppc64 can generate a four-byte read to fetch the final "char" fields
# of a FormData_pg_cast. This is valid compiler behavior, because a proper