diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index d157958..68b75b7 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1324,6 +1324,15 @@ FROM ( { numeric_literal |
+ autovacuum_vacuum_parallel_workers, toast.autovacuum_vacuum_parallel_workers (integer)
+
+
+ This sets the number of workers that can be used to vacuum this table. If not set, autovacuum runs without parallel workers (non-parallel).
+
+
+
+
+
autovacuum_multixact_freeze_table_age, toast.autovacuum_multixact_freeze_table_age (integer)
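
A minimal usage sketch of the new storage parameter (table name is illustrative, not part of the patch):

    ALTER TABLE pgbench_accounts SET (autovacuum_vacuum_parallel_workers = 2);
    -- back to the default, non-parallel behavior
    ALTER TABLE pgbench_accounts RESET (autovacuum_vacuum_parallel_workers);
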
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index f5bc87e..b504741 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -30,6 +30,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ table_and_columns is:
@@ -142,6 +143,20 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ N
+
+
+ Execute VACUUM in parallel with N
+ background workers. Garbage collection on the table is performed
+ in parallel at the block level. For tables with indexes, parallel vacuum
+ assigns each index to one parallel vacuum worker, and all garbage in an
+ index is processed by that particular worker. This option cannot be used
+ with the FULL option.
+
+
+
+
+
DISABLE_PAGE_SKIPPING
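
A minimal usage sketch of the new option, assuming the parenthesized option-list syntax (table name is illustrative):

    -- lazy vacuum with two parallel workers; PARALLEL cannot be combined with FULL
    VACUUM (PARALLEL 2, VERBOSE) pgbench_accounts;
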
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index ec10762..110a41b 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -283,6 +283,14 @@ static relopt_int intRelOpts[] =
},
{
{
+ "autovacuum_vacuum_parallel_workers",
+ "Number of parallel processes that can be used to vacuum for this relation",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ }, -1, 0, 1024
+ },
+ {
+ {
"log_autovacuum_min_duration",
"Sets the minimum execution time above which autovacuum actions will be logged",
RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
@@ -1342,6 +1350,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_max_age)},
{"autovacuum_multixact_freeze_table_age", RELOPT_TYPE_INT,
offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_table_age)},
+ {"autovacuum_vacuum_parallel_workers", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_parallel_workers)},
{"log_autovacuum_min_duration", RELOPT_TYPE_INT,
offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, log_min_duration)},
{"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL,
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 52dda41..113495e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -90,8 +90,6 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
-static void heap_parallelscan_startblock_init(HeapScanDesc scan);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -1667,7 +1665,7 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
* only to set the startblock once.
* ----------------
*/
-static void
+void
heap_parallelscan_startblock_init(HeapScanDesc scan)
{
BlockNumber sync_startpage = InvalidBlockNumber;
@@ -1715,7 +1713,7 @@ retry:
* first backend gets an InvalidBlockNumber return.
* ----------------
*/
-static BlockNumber
+BlockNumber
heap_parallelscan_nextpage(HeapScanDesc scan)
{
BlockNumber page;
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index d683050..7f79341 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -20,6 +20,7 @@
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/async.h"
+#include "commands/vacuum.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -122,6 +123,9 @@ static const struct
{
{
"ParallelQueryMain", ParallelQueryMain
+ },
+ {
+ "LazyVacuumWorkerMain", LazyVacuumWorkerMain
}
};
@@ -1125,6 +1129,9 @@ ParallelWorkerMain(Datum main_arg)
/* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id;
+ /* Report pid of master process for progress information */
+ pgstat_report_leader_pid(fps->parallel_master_pid);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
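
ParallelWorkerMain() now calls pgstat_report_leader_pid(), whose definition is not in these hunks. A minimal sketch of such a reporting function, assuming a new st_leader_pid field in PgBackendStatus and the usual changecount protocol:

	void
	pgstat_report_leader_pid(int leader_pid)
	{
		volatile PgBackendStatus *beentry = MyBEEntry;

		if (!beentry)
			return;

		/* follow the changecount protocol so readers see a consistent entry */
		pgstat_increment_changecount_before(beentry);
		beentry->st_leader_pid = leader_pid;	/* assumed new field */
		pgstat_increment_changecount_after(beentry);
	}
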
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dc40cde..be754e8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -883,8 +883,9 @@ CREATE VIEW pg_stat_bgwriter AS
CREATE VIEW pg_stat_progress_vacuum AS
SELECT
- S.pid AS pid, S.datid AS datid, D.datname AS datname,
- S.relid AS relid,
+ S.pid,
+ S.datid,
+ S.relid,
CASE S.param1 WHEN 0 THEN 'initializing'
WHEN 1 THEN 'scanning heap'
WHEN 2 THEN 'vacuuming indexes'
@@ -893,11 +894,22 @@ CREATE VIEW pg_stat_progress_vacuum AS
WHEN 5 THEN 'truncating heap'
WHEN 6 THEN 'performing final cleanup'
END AS phase,
- S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
- S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
- S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
- FROM pg_stat_get_progress_info('VACUUM') AS S
- LEFT JOIN pg_database D ON S.datid = D.oid;
+ S.param2 AS heap_blks_total,
+ W.heap_blks_scanned,
+ W.heap_blks_vacuumed,
+ W.index_vacuum_count,
+ S.param6 AS max_dead_tuples,
+ W.num_dead_tuples
+ FROM pg_stat_get_progress_info('VACUUM') AS S,
+ (SELECT leader_pid,
+ max(param3) AS heap_blks_scanned,
+ max(param4) AS heap_blks_vacuumed,
+ max(param5) AS index_vacuum_count,
+ max(param7) AS num_dead_tuples
+ FROM pg_stat_get_progress_info('VACUUM')
+ GROUP BY leader_pid) AS W
+ WHERE
+ S.pid = W.leader_pid;
CREATE VIEW pg_user_mappings AS
SELECT
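
An illustrative query against the underlying function, showing the per-backend rows that the rewritten view aggregates (this assumes the patched pg_stat_get_progress_info() exposes the new leader_pid column):

    SELECT leader_pid, pid, param3 AS heap_blks_scanned, param7 AS num_dead_tuples
      FROM pg_stat_get_progress_info('VACUUM')
     ORDER BY leader_pid, pid;
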
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index cbd6e9b..1484583 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -74,7 +74,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
MultiXactId minMulti,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options,
VacuumParams *params);
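
/*
 * Sketch of the new options type; its actual definition lives in vacuum.h,
 * outside these hunks.  The field names follow the usage below
 * (options.flags, options.nworkers).
 */
typedef struct VacuumOptions
{
	int		flags;		/* bitmask of VACOPT_* flags */
	int		nworkers;	/* # of parallel vacuum workers; 0 means non-parallel */
} VacuumOptions;
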
/*
@@ -89,15 +89,15 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
VacuumParams params;
/* sanity checks on options */
- Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
- Assert((vacstmt->options & VACOPT_VACUUM) ||
- !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
- Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
+ Assert(vacstmt->options.flags & (VACOPT_VACUUM | VACOPT_ANALYZE));
+ Assert((vacstmt->options.flags & VACOPT_VACUUM) ||
+ !(vacstmt->options.flags & (VACOPT_FULL | VACOPT_FREEZE)));
+ Assert(!(vacstmt->options.flags & VACOPT_SKIPTOAST));
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/
- if (!(vacstmt->options & VACOPT_ANALYZE))
+ if (!(vacstmt->options.flags & VACOPT_ANALYZE))
{
ListCell *lc;
@@ -116,7 +116,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
* All freeze ages are zero if the FREEZE option is given; otherwise pass
* them as -1 which means to use the default values.
*/
- if (vacstmt->options & VACOPT_FREEZE)
+ if (vacstmt->options.flags & VACOPT_FREEZE)
{
params.freeze_min_age = 0;
params.freeze_table_age = 0;
@@ -163,7 +163,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
* memory context that will not disappear at transaction commit.
*/
void
-vacuum(int options, List *relations, VacuumParams *params,
+vacuum(VacuumOptions options, List *relations, VacuumParams *params,
BufferAccessStrategy bstrategy, bool isTopLevel)
{
static bool in_vacuum = false;
@@ -174,7 +174,7 @@ vacuum(int options, List *relations, VacuumParams *params,
Assert(params != NULL);
- stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+ stmttype = (options.flags & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
/*
* We cannot run VACUUM inside a user transaction block; if we were inside
@@ -184,7 +184,7 @@ vacuum(int options, List *relations, VacuumParams *params,
*
* ANALYZE (without VACUUM) can run either way.
*/
- if (options & VACOPT_VACUUM)
+ if (options.flags & VACOPT_VACUUM)
{
PreventTransactionChain(isTopLevel, stmttype);
in_outer_xact = false;
@@ -206,17 +206,26 @@ vacuum(int options, List *relations, VacuumParams *params,
/*
* Sanity check DISABLE_PAGE_SKIPPING option.
*/
- if ((options & VACOPT_FULL) != 0 &&
- (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+ if ((options.flags & VACOPT_FULL) != 0 &&
+ (options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
/*
+ * Sanity check PARALLEL option.
+ */
+ if ((options.flags & VACOPT_FULL) != 0 &&
+ (options.flags & VACOPT_PARALLEL) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("VACUUM option PARALLEL cannnot be used with FULL")));
+
+ /*
* Send info about dead objects to the statistics collector, unless we are
* in autovacuum --- autovacuum.c does this for itself.
*/
- if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+ if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
pgstat_vacuum_stat();
/*
@@ -281,11 +290,11 @@ vacuum(int options, List *relations, VacuumParams *params,
* transaction block, and also in an autovacuum worker, use own
* transactions so we can release locks sooner.
*/
- if (options & VACOPT_VACUUM)
+ if (options.flags & VACOPT_VACUUM)
use_own_xacts = true;
else
{
- Assert(options & VACOPT_ANALYZE);
+ Assert(options.flags & VACOPT_ANALYZE);
if (IsAutoVacuumWorkerProcess())
use_own_xacts = true;
else if (in_outer_xact)
@@ -335,13 +344,13 @@ vacuum(int options, List *relations, VacuumParams *params,
{
VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
- if (options & VACOPT_VACUUM)
+ if (options.flags & VACOPT_VACUUM)
{
if (!vacuum_rel(vrel->oid, vrel->relation, options, params))
continue;
}
- if (options & VACOPT_ANALYZE)
+ if (options.flags & VACOPT_ANALYZE)
{
/*
* If using separate xacts, start one for analyze. Otherwise,
@@ -354,7 +363,7 @@ vacuum(int options, List *relations, VacuumParams *params,
PushActiveSnapshot(GetTransactionSnapshot());
}
- analyze_rel(vrel->oid, vrel->relation, options, params,
+ analyze_rel(vrel->oid, vrel->relation, options.flags, params,
vrel->va_cols, in_outer_xact, vac_strategy);
if (use_own_xacts)
@@ -390,7 +399,7 @@ vacuum(int options, List *relations, VacuumParams *params,
StartTransactionCommand();
}
- if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+ if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
{
/*
* Update pg_database.datfrozenxid, and truncate pg_xact if possible.
@@ -1321,7 +1330,7 @@ vac_truncate_clog(TransactionId frozenXID,
* At entry and exit, we are not inside a transaction.
*/
static bool
-vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options, VacuumParams *params)
{
LOCKMODE lmode;
Relation onerel;
@@ -1342,7 +1351,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
*/
PushActiveSnapshot(GetTransactionSnapshot());
- if (!(options & VACOPT_FULL))
+ if (!(options.flags & VACOPT_FULL))
{
/*
* In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -1382,7 +1391,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
* vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
* way, we can be sure that no other backend is vacuuming the same table.
*/
- lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+ lmode = (options.flags & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
/*
* Open the relation and get the appropriate lock on it.
@@ -1393,7 +1402,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
* If we've been asked not to wait for the relation lock, acquire it first
* in non-blocking mode, before calling try_relation_open().
*/
- if (!(options & VACOPT_NOWAIT))
+ if (!(options.flags & VACOPT_NOWAIT))
onerel = try_relation_open(relid, lmode);
else if (ConditionalLockRelationOid(relid, lmode))
onerel = try_relation_open(relid, NoLock);
@@ -1510,7 +1519,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
* us to process it. In VACUUM FULL, though, the toast table is
* automatically rebuilt by cluster_rel so we shouldn't recurse to it.
*/
- if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+ if (!(options.flags & VACOPT_SKIPTOAST) && !(options.flags & VACOPT_FULL))
toast_relid = onerel->rd_rel->reltoastrelid;
else
toast_relid = InvalidOid;
@@ -1529,7 +1538,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
/*
* Do the actual work --- either FULL or "lazy" vacuum
*/
- if (options & VACOPT_FULL)
+ if (options.flags & VACOPT_FULL)
{
/* close relation before vacuuming, but hold lock until commit */
relation_close(onerel, NoLock);
@@ -1537,7 +1546,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
/* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
cluster_rel(relid, InvalidOid, false,
- (options & VACOPT_VERBOSE) != 0);
+ (options.flags & VACOPT_VERBOSE) != 0);
}
else
lazy_vacuum_rel(onerel, options, params, vac_strategy);
@@ -1591,8 +1600,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
* hit dangling index pointers.
*/
void
-vac_open_indexes(Relation relation, LOCKMODE lockmode,
- int *nindexes, Relation **Irel)
+vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel)
{
List *indexoidlist;
ListCell *indexoidscan;
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 30b1c08..9b5da1f 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -22,6 +22,20 @@
* of index scans performed. So we don't use maintenance_work_mem memory for
* the TID array, just enough to hold as many heap tuples as fit on one page.
*
+ * In PostgreSQL 11, we support a parallel option for lazy vacuum. In parallel
+ * lazy vacuum, multiple vacuum worker processes fetch blocks in parallel using
+ * a parallel heap scan and process them. If the table has indexes, the
+ * parallel vacuum workers vacuum the heap and indexes in parallel. Since the
+ * dead tuple TIDs are shared by all vacuum processes, including the leader
+ * process, the processes must synchronize at two points during lazy vacuum
+ * processing: just before starting to vacuum and just before clearing the
+ * dead tuple TIDs. At these two points the leader acts as the arbiter of the
+ * shared dead tuple TIDs. The information required by parallel lazy vacuum,
+ * such as the table statistics and the parallel heap scan descriptor, has to
+ * be shared with all vacuum processes, and the table statistics are funneled
+ * back through the leader process once the workers finish. Note that dead
+ * tuple TIDs need to be shared only when the table has indexes; for a table
+ * with no indexes, each parallel worker processes blocks and vacuums them independently.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -41,8 +55,10 @@
#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/multixact.h"
+#include "access/relscan.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
+#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/storage.h"
@@ -54,6 +70,7 @@
#include "portability/instr_time.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
@@ -103,10 +120,81 @@
*/
#define PREFETCH_SIZE ((BlockNumber) 32)
+/* DSM key for parallel lazy vacuum */
+#define VACUUM_KEY_PARALLEL_SCAN UINT64CONST(0xFFFFFFFFFFF00001)
+#define VACUUM_KEY_VACUUM_STATS UINT64CONST(0xFFFFFFFFFFF00002)
+#define VACUUM_KEY_INDEX_STATS UINT64CONST(0xFFFFFFFFFFF00003)
+#define VACUUM_KEY_DEAD_TUPLE_CTL UINT64CONST(0xFFFFFFFFFFF00004)
+#define VACUUM_KEY_DEAD_TUPLES UINT64CONST(0xFFFFFFFFFFF00005)
+#define VACUUM_KEY_PARALLEL_STATE UINT64CONST(0xFFFFFFFFFFF00006)
+
+/*
+ * See the note in lazy_scan_get_nextpage about forcing the scan of the
+ * last page.
+ */
+#define FORCE_CHECK_PAGE(blk) \
+ (blkno == (blk - 1) && should_attempt_truncation(vacrelstats))
+
+/*
+ * Check if the given index is assigned to this vacuum process.  Indexes are
+ * dealt out round-robin across the nworkers workers plus the leader, whose
+ * ParallelWorkerNumber is -1.  For example, with nworkers = 2, index 0 goes
+ * to the leader, index 1 to worker 0, index 2 to worker 1, and index 3 to
+ * the leader again.
+ */
+#define IsAssignedIndex(i, pstate) \
+	((pstate) == NULL || \
+	 (((i) % (((LVParallelState *) (pstate))->nworkers + 1)) - 1 == ParallelWorkerNumber))
+
+#define IsDeadTupleShared(lvstate) \
+ (((LVState *)(lvstate))->parallel_mode && ((LVState *)(lvstate))->vacrelstats->nindexes > 0)
+
+/* Vacuum worker state for parallel lazy vacuum */
+#define VACSTATE_SCAN 0x1 /* heap scan phase */
+#define VACSTATE_VACUUM 0x2 /* vacuuming on table and index */
+
+/*
+ * Vacuum-relevant options and thresholds that we need to share with parallel
+ * vacuum workers.
+ */
+typedef struct VacuumInfo
+{
+ int options; /* VACUUM options */
+ bool aggressive; /* does each worker need to do aggressive vacuum? */
+ TransactionId oldestxmin;
+ TransactionId freezelimit;
+ MultiXactId multixactcutoff;
+ int elevel;
+} VacuumInfo;
+
+/* Struct for index statistics that are used for parallel lazy vacuum */
+typedef struct LVIndStats
+{
+ bool updated; /* need to be updated? */
+ BlockNumber num_pages;
+ BlockNumber num_tuples;
+} LVIndStats;
+
+/* Struct for parallel lazy vacuum state */
+typedef struct LVParallelState
+{
+ int nworkers; /* # of processes doing vacuum */
+ VacuumInfo info; /* vacuum-relevant options to share */
+ int state; /* current parallel vacuum status */
+ int finish_count; /* # of workers that have finished the current state */
+ ConditionVariable cv;
+ slock_t mutex; /* protect above fields */
+} LVParallelState;
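
/*
 * Sketch of the rendezvous that lazy_prepare_vacuum()/lazy_end_vacuum()
 * (declared below, bodies outside these hunks) presumably build on top of
 * this struct; the helper name and exact logic here are assumptions.  All
 * nworkers + 1 participants (the leader's ParallelWorkerNumber is -1) block
 * until the last one flips the state.
 */
static void
vacuum_sync_point(LVParallelState *pstate, int next_state)
{
	if (pstate == NULL)
		return;					/* non-parallel vacuum: nothing to do */

	SpinLockAcquire(&pstate->mutex);
	if (++pstate->finish_count == pstate->nworkers + 1)
	{
		/* last one in: reset the counter, flip the state, wake everyone */
		pstate->finish_count = 0;
		pstate->state = next_state;
		SpinLockRelease(&pstate->mutex);
		ConditionVariableBroadcast(&pstate->cv);
		return;
	}
	SpinLockRelease(&pstate->mutex);

	/* sleep until the last participant changes the state */
	ConditionVariablePrepareToSleep(&pstate->cv);
	for (;;)
	{
		int		state;

		SpinLockAcquire(&pstate->mutex);
		state = pstate->state;
		SpinLockRelease(&pstate->mutex);
		if (state == next_state)
			break;
		ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_FINISH);
	}
	ConditionVariableCancelSleep();
}
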
+
+/* Struct for control dead tuple TIDs array */
+typedef struct LVDeadTupleCtl
+{
+ int dt_max; /* # slots allocated in array */
+ int dt_count; /* # of dead tuple */
+
+ /* Used only for parallel lazy vacuum */
+ int dt_index; /* current index of dead tuple array used
+ in lazy_vacuum_heap */
+ slock_t mutex; /* protect above fields */
+} LVDeadTupleCtl;
+
typedef struct LVRelStats
{
- /* hasindex = true means two-pass strategy; false means one-pass */
- bool hasindex;
+ int nindexes; /* > 0 means two-pass strategy; = 0 means one-pass */
/* Overall statistics about rel */
BlockNumber old_rel_pages; /* previous value of pg_class.relpages */
BlockNumber rel_pages; /* total number of pages */
@@ -118,19 +206,46 @@ typedef struct LVRelStats
double old_rel_tuples; /* previous value of pg_class.reltuples */
double new_rel_tuples; /* new estimated total # of tuples */
double new_dead_tuples; /* new estimated total # of dead tuples */
- BlockNumber pages_removed;
double tuples_deleted;
- BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
- /* List of TIDs of tuples we intend to delete */
- /* NB: this list is ordered by TID address */
- int num_dead_tuples; /* current # of entries */
- int max_dead_tuples; /* # slots allocated in array */
- ItemPointer dead_tuples; /* array of ItemPointerData */
int num_index_scans;
+ BlockNumber pages_removed;
+ BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
TransactionId latestRemovedXid;
bool lock_waiter_detected;
} LVRelStats;
+/* Struct for lazy vacuum execution */
+typedef struct LVState
+{
+ bool parallel_mode;
+ LVRelStats *vacrelstats;
+ /*
+	 * Used in both parallel and non-parallel lazy vacuum.  In parallel lazy
+	 * vacuum on a table with indexes, dtctl and deadtuples point into dynamic
+	 * shared memory, and access is controlled through the dtctl struct.
+ */
+ LVDeadTupleCtl *dtctl;
+ ItemPointer deadtuples;
+
+ /* Used only for parallel lazy vacuum */
+ ParallelContext *pcxt;
+ LVParallelState *pstate;
+ ParallelHeapScanDesc pscan;
+ LVIndStats *indstats;
+} LVState;
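
/*
 * Sketch of how lazy_record_dead_tuple() (prototype below, body outside
 * these hunks) would append a TID under the structures above; this body is
 * an assumption: lock only when the dead tuple area lives in DSM and is
 * shared with other workers.
 */
static void
lazy_record_dead_tuple(LVState *lvstate, ItemPointer itemptr)
{
	LVDeadTupleCtl *dtctl = lvstate->dtctl;
	bool		shared = IsDeadTupleShared(lvstate);

	if (shared)
		SpinLockAcquire(&dtctl->mutex);

	/*
	 * The array should never overflow, since we pause and vacuum whenever it
	 * gets close to full, but be paranoid anyway.
	 */
	if (dtctl->dt_count < dtctl->dt_max)
		lvstate->deadtuples[dtctl->dt_count++] = *itemptr;

	if (shared)
		SpinLockRelease(&dtctl->mutex);
}
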
+
+/*
+ * Scan description data for lazy vacuum.  In parallel lazy vacuum,
+ * the heapscan field is used instead of the block-tracking fields.
+ */
+typedef struct LVScanDescData
+{
+ BlockNumber lv_cblock; /* current scanning block number */
+ BlockNumber lv_next_unskippable_block; /* next block number we cannot skip */
+ BlockNumber lv_nblocks; /* number of blocks in the relation */
+ HeapScanDesc heapscan; /* field for parallel lazy vacuum */
+} LVScanDescData;
+typedef struct LVScanDescData *LVScanDesc;
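
/*
 * Sketch of lv_beginscan() (declared below, body outside these hunks): pick
 * the parallel heap scan when one was set up, else a local block-by-block
 * scan.  Whether the patch goes through heap_beginscan_parallel() or drives
 * heap_parallelscan_nextpage() directly is an assumption here.
 */
static LVScanDesc
lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan)
{
	LVScanDesc	lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData));

	if (pscan != NULL)
	{
		/* parallel vacuum: blocks are handed out by the parallel heap scan */
		lvscan->heapscan = heap_beginscan_parallel(onerel, pscan);
	}
	else
	{
		/* single-process vacuum: walk blocks 0 .. nblocks-1 ourselves */
		lvscan->heapscan = NULL;
		lvscan->lv_cblock = 0;
		lvscan->lv_next_unskippable_block = 0;
		lvscan->lv_nblocks = RelationGetNumberOfBlocks(onerel);
	}

	return lvscan;
}
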
/* A few variables that don't seem worth passing around as parameters */
static int elevel = -1;
@@ -141,32 +256,47 @@ static MultiXactId MultiXactCutoff;
static BufferAccessStrategy vac_strategy;
-
-/* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
- LVRelStats *vacrelstats, Relation *Irel, int nindexes,
- bool aggressive);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+/* non-export function prototypes */
+static void lazy_vacuum_heap(Relation onerel, LVState *lvstate);
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
static void lazy_vacuum_index(Relation indrel,
IndexBulkDeleteResult **stats,
- LVRelStats *vacrelstats);
-static void lazy_cleanup_index(Relation indrel,
- IndexBulkDeleteResult *stats,
- LVRelStats *vacrelstats);
+ LVState *lvstate);
+static void lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats,
+ LVRelStats *vacrelstats, LVIndStats *indstat);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
- int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
+ int tupindex, LVState *lvstate, Buffer *vmbuffer);
static bool should_attempt_truncation(LVRelStats *vacrelstats);
static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
static BlockNumber count_nondeletable_pages(Relation onerel,
LVRelStats *vacrelstats);
-static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
-static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
- ItemPointer itemptr);
+static void lazy_space_alloc(LVState *lvstate, BlockNumber relblocks);
+static void lazy_record_dead_tuple(LVState *state, ItemPointer itemptr);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int vac_cmp_itemptr(const void *left, const void *right);
static bool heap_page_is_all_visible(Relation rel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
+static void do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irels,
+ int nindexes, int options, bool aggressive);
+static void lazy_scan_heap(Relation rel, LVState *lvstate, VacuumOptions options,
+ bool aggressive);
+
+/* function prototypes for parallel vacuum */
+static void lazy_gather_vacuum_stats(ParallelContext *pcxt,
+									  LVRelStats *vacrelstats);
+static void lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats);
+static void lazy_initialize_dsm(ParallelContext *pcxt, Relation onrel,
+ LVState *lvstate, int options, bool aggressive);
+static LVState *lazy_initialize_worker(shm_toc *toc);
+static LVScanDesc lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan);
+static void lv_endscan(LVScanDesc lvscan);
+static BlockNumber lazy_scan_get_nextpage(Relation onerel, LVState *lvstate,
+ LVScanDesc lvscan,
+ bool *all_visible_according_to_vm,
+ Buffer *vmbuffer, int options, bool aggressive);
+static void lazy_prepare_vacuum(LVState *lvstate);
+static void lazy_end_vacuum(LVState *lvstate);
+static long lazy_get_max_dead_tuples(LVRelStats *vacrelstats);
/*
@@ -179,12 +309,11 @@ static bool heap_page_is_all_visible(Relation rel, Buffer buf,
* and locked the relation.
*/
void
-lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
+lazy_vacuum_rel(Relation onerel, VacuumOptions options, VacuumParams *params,
BufferAccessStrategy bstrategy)
{
- LVRelStats *vacrelstats;
- Relation *Irel;
- int nindexes;
+ LVState *lvstate;
+ LVRelStats *vacrelstats;
PGRUsage ru0;
TimestampTz starttime = 0;
long secs;
@@ -211,7 +340,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
starttime = GetCurrentTimestamp();
}
- if (options & VACOPT_VERBOSE)
+ if (options.flags & VACOPT_VERBOSE)
elevel = INFO;
else
elevel = DEBUG2;
@@ -239,10 +368,12 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
xidFullScanLimit);
aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid,
mxactFullScanLimit);
- if (options & VACOPT_DISABLE_PAGE_SKIPPING)
+ if (options.flags & VACOPT_DISABLE_PAGE_SKIPPING)
aggressive = true;
+ lvstate = (LVState *) palloc0(sizeof(LVState));
vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
+ lvstate->vacrelstats = vacrelstats;
vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
@@ -250,15 +381,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
vacrelstats->pages_removed = 0;
vacrelstats->lock_waiter_detected = false;
- /* Open all indexes of the relation */
- vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
- vacrelstats->hasindex = (nindexes > 0);
-
/* Do the vacuuming */
- lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
-
- /* Done with indexes */
- vac_close_indexes(nindexes, Irel, NoLock);
+ lazy_scan_heap(onerel, lvstate, options, aggressive);
/*
* Compute whether we actually scanned the all unfrozen pages. If we did,
@@ -267,7 +391,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
* NB: We need to check this before truncating the relation, because that
* will change ->rel_pages.
*/
- if ((vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
+ if ((lvstate->vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
< vacrelstats->rel_pages)
{
Assert(!aggressive);
@@ -329,7 +453,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
new_rel_pages,
new_rel_tuples,
new_rel_allvisible,
- vacrelstats->hasindex,
+ (vacrelstats->nindexes != 0),
new_frozen_xid,
new_min_multi,
false);
@@ -439,28 +563,180 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
}
/*
- * lazy_scan_heap() -- scan an open heap relation
+ * lazy_scan_heap() -- top-level driver for scanning and vacuuming the heap.
+ *
+ * If the specified number of workers is greater than 0, we enter parallel lazy
+ * vacuum mode. In parallel lazy vacuum mode, we initialize dynamic shared memory
+ * and launch parallel vacuum workers. The leader process also vacuums the table
+ * itself after launching the workers and then waits for all of them to finish.
+ * Once all vacuum workers have finished, we gather the vacuum statistics of the
+ * table and indexes and update them.
+ */
+static void
+lazy_scan_heap(Relation onerel, LVState *lvstate, VacuumOptions options,
+ bool aggressive)
+{
+ ParallelContext *pcxt;
+ LVRelStats *vacrelstats = lvstate->vacrelstats;
+ Relation *Irel;
+ int nindexes;
+
+ lvstate->parallel_mode = options.nworkers > 0;
+
+ /* Open indexes */
+ vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
+ vacrelstats->nindexes = nindexes;
+
+ if (lvstate->parallel_mode)
+ {
+ EnterParallelMode();
+
+ /* Create parallel context and initialize it */
+ pcxt = CreateParallelContext("postgres", "LazyVacuumWorkerMain",
+ options.nworkers);
+ lvstate->pcxt = pcxt;
+
+ /* Estimate DSM size for parallel vacuum */
+ lazy_estimate_dsm(pcxt, lvstate->vacrelstats);
+
+ /* Initialize DSM for parallel vacuum */
+ InitializeParallelDSM(pcxt);
+ lazy_initialize_dsm(pcxt, onerel, lvstate, options.flags, aggressive);
+
+		/* Report ourselves as the leader for progress information */
+ pgstat_report_leader_pid(MyProcPid);
+
+ /* Launch workers */
+ LaunchParallelWorkers(pcxt);
+ }
+
+ do_lazy_scan_heap(lvstate, onerel, Irel, nindexes, options.flags, aggressive);
+
+ /*
+	 * We can update relation statistics, such as the number of scanned pages,
+	 * only after gathering statistics from all workers. Also, since index
+	 * statistics cannot be updated concurrently in parallel mode, the leader
+	 * process has to update them.
+	 *
+	 * XXX: If we allowed workers to update statistics tuples concurrently,
+	 * the index statistics could be updated in lazy_cleanup_index().
+ */
+ if (lvstate->parallel_mode)
+ {
+ int i;
+ LVIndStats *indstats = palloc(sizeof(LVIndStats) * lvstate->vacrelstats->nindexes);
+
+		/* Wait for the workers to finish vacuuming */
+ WaitForParallelWorkersToFinish(pcxt);
+
+ /* Gather the result of vacuum statistics from all workers */
+ lazy_gather_vacuum_stats(pcxt, vacrelstats);
+
+ /* Now we can compute the new value for pg_class.reltuples */
+ vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+ vacrelstats->rel_pages,
+ vacrelstats->scanned_pages,
+ vacrelstats->scanned_tuples);
+
+ /* Copy new index stats to local memory */
+ memcpy(indstats, lvstate->indstats, sizeof(LVIndStats) * vacrelstats->nindexes);
+
+ DestroyParallelContext(pcxt);
+ ExitParallelMode();
+
+		/* After exiting parallel mode, update index statistics */
+ for (i = 0; i < vacrelstats->nindexes; i++)
+ {
+ Relation ind = Irel[i];
+ LVIndStats *indstat = (LVIndStats *) &(indstats[i]);
+
+ if (indstat->updated)
+ vac_update_relstats(ind,
+ indstat->num_pages,
+ indstat->num_tuples,
+ 0,
+ false,
+ InvalidTransactionId,
+ InvalidMultiXactId,
+ false);
+ }
+
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
+ vacrelstats->scanned_pages);
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED,
+ vacrelstats->rel_pages);
+ pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
+ vacrelstats->num_index_scans);
+ }
+
+ vac_close_indexes(nindexes, Irel, RowExclusiveLock);
+}
+
+/*
+ * Entry point of parallel vacuum worker.
+ */
+void
+LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc)
+{
+ LVState *lvstate;
+ Relation rel;
+ Relation *indrel;
+ int nindexes_worker;
+
+ /* Look up dynamic shared memory and initialize */
+ lvstate = lazy_initialize_worker(toc);
+
+ Assert(lvstate != NULL);
+
+ rel = relation_open(lvstate->pscan->phs_relid, ShareUpdateExclusiveLock);
+
+ /* Open all indexes */
+ vac_open_indexes(rel, RowExclusiveLock, &nindexes_worker,
+ &indrel);
+
+ pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM,
+ RelationGetRelid(rel));
+ /* Do lazy vacuum */
+ do_lazy_scan_heap(lvstate, rel, indrel, lvstate->vacrelstats->nindexes,
+ lvstate->pstate->info.options, lvstate->pstate->info.aggressive);
+
+ pgstat_progress_end_command();
+
+ vac_close_indexes(lvstate->vacrelstats->nindexes, indrel, RowExclusiveLock);
+ heap_close(rel, ShareUpdateExclusiveLock);
+}
+
+/*
+ * do_lazy_scan_heap() -- scan an open heap relation
*
* This routine prunes each page in the heap, which will among other
* things truncate dead tuples to dead line pointers, defragment the
- * page, and set commit status bits (see heap_page_prune). It also builds
+ * page, and set commit status bits (see heap_page_prune). It also uses
* lists of dead tuples and pages with free space, calculates statistics
* on the number of live tuples in the heap, and marks pages as
* all-visible if appropriate. When done, or when we run low on space for
- * dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
- * to reclaim dead line pointers.
+ * dead-tuple TIDs, invoke vacuuming of assigned indexes and call lazy_vacuum_heap
+ * to reclaim dead line pointers. In parallel vacuum, we need to synchronize
+ * at the points where heap scanning finishes and where heap vacuuming
+ * finishes; the vacuum worker that reaches such a point first has to wait
+ * for the other vacuum workers to reach the same point.
+ *
+ * In a parallel lazy scan, we get the next page number from the parallel
+ * heap scan. Since the dead tuple TIDs are shared among all vacuum workers,
+ * we have to wait for all other workers both before starting to reclaim
+ * dead tuple TIDs and before clearing the dead tuple TID information in
+ * dynamic shared memory.
*
* If there are no indexes then we can reclaim line pointers on the fly;
* dead line pointers need only be retained until all index pointers that
* reference them have been killed.
*/
static void
-lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
- Relation *Irel, int nindexes, bool aggressive)
+do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irel,
+ int nindexes, int options, bool aggressive)
{
- BlockNumber nblocks,
- blkno;
+ LVRelStats *vacrelstats = lvstate->vacrelstats;
+ BlockNumber blkno;
+ BlockNumber nblocks;
HeapTupleData tuple;
+ LVScanDesc lvscan;
char *relname;
BlockNumber empty_pages,
vacuumed_pages;
@@ -468,14 +744,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
tups_vacuumed,
nkeep,
nunused;
+ int dt_vacuum_threshold;
IndexBulkDeleteResult **indstats;
int i;
PGRUsage ru0;
Buffer vmbuffer = InvalidBuffer;
- BlockNumber next_unskippable_block;
- bool skipping_blocks;
xl_heap_freeze_tuple *frozen;
StringInfoData buf;
+ bool all_visible_according_to_vm = false;
+
const int initprog_index[] = {
PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -504,89 +781,27 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
vacrelstats->nonempty_pages = 0;
vacrelstats->latestRemovedXid = InvalidTransactionId;
- lazy_space_alloc(vacrelstats, nblocks);
+ lazy_space_alloc(lvstate, nblocks);
frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
+ /* Begin heap scan for vacuum */
+ lvscan = lv_beginscan(onerel, lvstate->pscan);
+
/* Report that we're scanning the heap, advertising total # of blocks */
initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
initprog_val[1] = nblocks;
- initprog_val[2] = vacrelstats->max_dead_tuples;
+ initprog_val[2] = lvstate->dtctl->dt_max;
pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
- /*
- * Except when aggressive is set, we want to skip pages that are
- * all-visible according to the visibility map, but only when we can skip
- * at least SKIP_PAGES_THRESHOLD consecutive pages. Since we're reading
- * sequentially, the OS should be doing readahead for us, so there's no
- * gain in skipping a page now and then; that's likely to disable
- * readahead and so be counterproductive. Also, skipping even a single
- * page means that we can't update relfrozenxid, so we only want to do it
- * if we can skip a goodly number of pages.
- *
- * When aggressive is set, we can't skip pages just because they are
- * all-visible, but we can still skip pages that are all-frozen, since
- * such pages do not need freezing and do not affect the value that we can
- * safely set for relfrozenxid or relminmxid.
- *
- * Before entering the main loop, establish the invariant that
- * next_unskippable_block is the next block number >= blkno that we can't
- * skip based on the visibility map, either all-visible for a regular scan
- * or all-frozen for an aggressive scan. We set it to nblocks if there's
- * no such block. We also set up the skipping_blocks flag correctly at
- * this stage.
- *
- * Note: The value returned by visibilitymap_get_status could be slightly
- * out-of-date, since we make this test before reading the corresponding
- * heap page or locking the buffer. This is OK. If we mistakenly think
- * that the page is all-visible or all-frozen when in fact the flag's just
- * been cleared, we might fail to vacuum the page. It's easy to see that
- * skipping a page when aggressive is not set is not a very big deal; we
- * might leave some dead tuples lying around, but the next vacuum will
- * find them. But even when aggressive *is* set, it's still OK if we miss
- * a page whose all-frozen marking has just been cleared. Any new XIDs
- * just added to that page are necessarily newer than the GlobalXmin we
- * computed, so they'll have no effect on the value to which we can safely
- * set relfrozenxid. A similar argument applies for MXIDs and relminmxid.
- *
- * We will scan the table's last page, at least to the extent of
- * determining whether it has tuples or not, even if it should be skipped
- * according to the above rules; except when we've already determined that
- * it's not worth trying to truncate the table. This avoids having
- * lazy_truncate_heap() take access-exclusive lock on the table to attempt
- * a truncation that just fails immediately because there are tuples in
- * the last page. This is worth avoiding mainly because such a lock must
- * be replayed on any hot standby, where it can be disruptive.
- */
- next_unskippable_block = 0;
- if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
- {
- while (next_unskippable_block < nblocks)
- {
- uint8 vmstatus;
-
- vmstatus = visibilitymap_get_status(onerel, next_unskippable_block,
- &vmbuffer);
- if (aggressive)
- {
- if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
- break;
- }
- else
- {
- if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
- break;
- }
- vacuum_delay_point();
- next_unskippable_block++;
- }
- }
-
- if (next_unskippable_block >= SKIP_PAGES_THRESHOLD)
- skipping_blocks = true;
+ if (lvstate->parallel_mode)
+ dt_vacuum_threshold = MaxHeapTuplesPerPage *
+ (lvstate->pstate->nworkers + 1);
else
- skipping_blocks = false;
+ dt_vacuum_threshold = MaxHeapTuplesPerPage;
- for (blkno = 0; blkno < nblocks; blkno++)
+ while((blkno = lazy_scan_get_nextpage(onerel, lvstate, lvscan,
+ &all_visible_according_to_vm,
+ &vmbuffer, options, aggressive)) != InvalidBlockNumber)
{
Buffer buf;
Page page;
@@ -597,99 +812,35 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
int prev_dead_count;
int nfrozen;
Size freespace;
- bool all_visible_according_to_vm = false;
bool all_visible;
bool all_frozen = true; /* provided all_visible is also true */
bool has_dead_tuples;
TransactionId visibility_cutoff_xid = InvalidTransactionId;
+ int dtmax;
+ int dtcount;
- /* see note above about forcing scanning of last page */
-#define FORCE_CHECK_PAGE() \
- (blkno == nblocks - 1 && should_attempt_truncation(vacrelstats))
-
pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
- if (blkno == next_unskippable_block)
- {
- /* Time to advance next_unskippable_block */
- next_unskippable_block++;
- if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
- {
- while (next_unskippable_block < nblocks)
- {
- uint8 vmskipflags;
-
- vmskipflags = visibilitymap_get_status(onerel,
- next_unskippable_block,
- &vmbuffer);
- if (aggressive)
- {
- if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0)
- break;
- }
- else
- {
- if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0)
- break;
- }
- vacuum_delay_point();
- next_unskippable_block++;
- }
- }
-
- /*
- * We know we can't skip the current block. But set up
- * skipping_blocks to do the right thing at the following blocks.
- */
- if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
- skipping_blocks = true;
- else
- skipping_blocks = false;
-
- /*
- * Normally, the fact that we can't skip this block must mean that
- * it's not all-visible. But in an aggressive vacuum we know only
- * that it's not all-frozen, so it might still be all-visible.
- */
- if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
- all_visible_according_to_vm = true;
- }
- else
- {
- /*
- * The current block is potentially skippable; if we've seen a
- * long enough run of skippable blocks to justify skipping it, and
- * we're not forced to check it, then go ahead and skip.
- * Otherwise, the page must be at least all-visible if not
- * all-frozen, so we can set all_visible_according_to_vm = true.
- */
- if (skipping_blocks && !FORCE_CHECK_PAGE())
- {
- /*
- * Tricky, tricky. If this is in aggressive vacuum, the page
- * must have been all-frozen at the time we checked whether it
- * was skippable, but it might not be any more. We must be
- * careful to count it as a skipped all-frozen page in that
- * case, or else we'll think we can't update relfrozenxid and
- * relminmxid. If it's not an aggressive vacuum, we don't
- * know whether it was all-frozen, so we have to recheck; but
- * in this case an approximate answer is OK.
- */
- if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
- vacrelstats->frozenskipped_pages++;
- continue;
- }
- all_visible_according_to_vm = true;
- }
-
vacuum_delay_point();
/*
* If we are close to overrunning the available space for dead-tuple
* TIDs, pause and do a cycle of vacuuming before we tackle this page.
+	 * We don't need to acquire the lock for dt_max because it should not
+	 * change while vacuum is running.
*/
- if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
- vacrelstats->num_dead_tuples > 0)
+ if (IsDeadTupleShared(lvstate))
+ {
+ SpinLockAcquire(&lvstate->dtctl->mutex);
+ dtcount = lvstate->dtctl->dt_count;
+ SpinLockRelease(&lvstate->dtctl->mutex);
+ }
+ else
+ dtcount = lvstate->dtctl->dt_count;
+
+ dtmax = lvstate->dtctl->dt_max;
+ if (((dtmax - dtcount) < dt_vacuum_threshold) && dtcount > 0)
{
const int hvp_index[] = {
PROGRESS_VACUUM_PHASE,
@@ -698,6 +849,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
int64 hvp_val[2];
/*
+	 * Here we're about to actually vacuum the table and indexes. Before
+	 * entering the vacuum state, we have to wait for the other vacuum
+	 * workers to reach this point.
+ */
+ lazy_prepare_vacuum(lvstate);
+
+ /*
* Before beginning index vacuuming, we release any pin we may
* hold on the visibility map page. This isn't necessary for
* correctness, but we do it anyway to avoid holding the pin
@@ -716,11 +874,12 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
- /* Remove index entries */
+ /* Remove assigned index entries */
for (i = 0; i < nindexes; i++)
- lazy_vacuum_index(Irel[i],
- &indstats[i],
- vacrelstats);
+ {
+ if (IsAssignedIndex(i, lvstate->pstate))
+ lazy_vacuum_index(Irel[i], &indstats[i], lvstate);
+ }
/*
* Report that we are now vacuuming the heap. We also increase
@@ -733,19 +892,22 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
/* Remove tuples from heap */
- lazy_vacuum_heap(onerel, vacrelstats);
+ lazy_vacuum_heap(onerel, lvstate);
/*
- * Forget the now-vacuumed tuples, and press on, but be careful
- * not to reset latestRemovedXid since we want that value to be
- * valid.
+		 * Here we've finished vacuuming the heap and indexes and are going
+		 * to begin the next round of heap scanning. Wait until all vacuum
+		 * workers have finished vacuuming. After that, forget the
+		 * now-vacuumed tuples and press on, but be careful not to reset
+		 * latestRemovedXid since we want that value to be valid.
*/
- vacrelstats->num_dead_tuples = 0;
- vacrelstats->num_index_scans++;
+ lazy_end_vacuum(lvstate);
/* Report that we are once again scanning the heap */
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+
+ vacrelstats->num_index_scans++;
}
/*
@@ -771,7 +933,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
* it's OK to skip vacuuming pages we get a lock conflict on. They
* will be dealt with in some future vacuum.
*/
- if (!aggressive && !FORCE_CHECK_PAGE())
+ if (!aggressive && !FORCE_CHECK_PAGE(blkno))
{
ReleaseBuffer(buf);
vacrelstats->pinskipped_pages++;
@@ -923,7 +1085,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
has_dead_tuples = false;
nfrozen = 0;
hastup = false;
- prev_dead_count = vacrelstats->num_dead_tuples;
+ prev_dead_count = lvstate->dtctl->dt_count;
maxoff = PageGetMaxOffsetNumber(page);
/*
@@ -962,7 +1124,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
*/
if (ItemIdIsDead(itemid))
{
- lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ lazy_record_dead_tuple(lvstate, &(tuple.t_self));
all_visible = false;
continue;
}
@@ -1067,7 +1229,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
if (tupgone)
{
- lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ lazy_record_dead_tuple(lvstate, &(tuple.t_self));
HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
&vacrelstats->latestRemovedXid);
tups_vacuumed += 1;
@@ -1132,13 +1294,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
/*
* If there are no indexes then we can vacuum the page right now
- * instead of doing a second scan.
+	 * instead of doing a second scan. Because each parallel worker uses its
+	 * own dead tuple area, the workers can vacuum independently.
*/
- if (nindexes == 0 &&
- vacrelstats->num_dead_tuples > 0)
+ if (Irel == NULL && lvstate->dtctl->dt_count > 0)
{
/* Remove tuples from heap */
- lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+ lazy_vacuum_page(onerel, blkno, buf, 0, lvstate, &vmbuffer);
has_dead_tuples = false;
/*
@@ -1146,7 +1308,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
* not to reset latestRemovedXid since we want that value to be
* valid.
*/
- vacrelstats->num_dead_tuples = 0;
+ lvstate->dtctl->dt_count = 0;
+
vacuumed_pages++;
}
@@ -1249,12 +1412,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
* page, so remember its free space as-is. (This path will always be
* taken if there are no indexes.)
*/
- if (vacrelstats->num_dead_tuples == prev_dead_count)
+ if (lvstate->dtctl->dt_count == prev_dead_count)
RecordPageWithFreeSpace(onerel, blkno, freespace);
}
/* report that everything is scanned and vacuumed */
- pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
+ RelationGetNumberOfBlocks(onerel));
pfree(frozen);
@@ -1264,10 +1428,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
vacrelstats->new_dead_tuples = nkeep;
/* now we can compute the new value for pg_class.reltuples */
- vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
- nblocks,
- vacrelstats->tupcount_pages,
- num_tuples);
+ if (!lvstate->parallel_mode)
+ vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+ nblocks,
+ vacrelstats->tupcount_pages,
+ num_tuples);
/*
* Release any remaining pin on visibility map page.
@@ -1280,7 +1445,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
/* If any tuples need to be deleted, perform final vacuum cycle */
/* XXX put a threshold on min number of tuples here? */
- if (vacrelstats->num_dead_tuples > 0)
+ if (lvstate->dtctl->dt_count > 0)
{
const int hvp_index[] = {
PROGRESS_VACUUM_PHASE,
@@ -1288,6 +1453,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
};
int64 hvp_val[2];
+ /*
+	 * Here we're about to actually vacuum the table and indexes. Before
+	 * entering the vacuum state, we have to wait for the other vacuum
+	 * workers to reach this point.
+ */
+ lazy_prepare_vacuum(lvstate);
+
/* Log cleanup info before we touch indexes */
vacuum_log_cleanup_info(onerel, vacrelstats);
@@ -1297,9 +1469,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
/* Remove index entries */
for (i = 0; i < nindexes; i++)
- lazy_vacuum_index(Irel[i],
- &indstats[i],
- vacrelstats);
+ {
+ if (IsAssignedIndex(i, lvstate->pstate))
+ lazy_vacuum_index(Irel[i], &indstats[i], lvstate);
+ }
/* Report that we are now vacuuming the heap */
hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
@@ -1309,18 +1482,25 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
/* Remove tuples from heap */
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
- lazy_vacuum_heap(onerel, vacrelstats);
+
+ lazy_vacuum_heap(onerel, lvstate);
+
vacrelstats->num_index_scans++;
}
/* report all blocks vacuumed; and that we're cleaning up */
- pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED,
+ RelationGetNumberOfBlocks(onerel));
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
/* Do post-vacuum cleanup and statistics update for each index */
for (i = 0; i < nindexes; i++)
- lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+ {
+ if (IsAssignedIndex(i, lvstate->pstate))
+ lazy_cleanup_index(Irel[i], indstats[i], lvstate->vacrelstats,
+ lvstate->parallel_mode ? &(lvstate->indstats[i]) : NULL);
+ }
/* If no indexes, make log report that lazy_vacuum_heap would've made */
if (vacuumed_pages)
@@ -1329,6 +1509,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
RelationGetRelationName(onerel),
tups_vacuumed, vacuumed_pages)));
+ lv_endscan(lvscan);
+
/*
* This is pretty messy, but we split it up so that we can skip emitting
* individual parts of the message when not applicable.
@@ -1362,6 +1544,35 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
pfree(buf.data);
}
+/*
+ * lazy_gather_vacuum_stats() -- Gather vacuum statistics from workers
+ */
+static void
+lazy_gather_vacuum_stats(ParallelContext *pcxt, LVRelStats *vacrelstats)
+{
+ int i;
+ LVRelStats *lvstats_list;
+
+ lvstats_list = (LVRelStats *) shm_toc_lookup(pcxt->toc, VACUUM_KEY_VACUUM_STATS, false);
+
+ /* Gather each worker stats */
+ for (i = 0; i < pcxt->nworkers_launched; i++)
+ {
+ LVRelStats *wstats = (LVRelStats*) ((char *) lvstats_list + sizeof(LVRelStats) * i);
+
+ vacrelstats->scanned_pages += wstats->scanned_pages;
+ vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+ vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+ vacrelstats->scanned_tuples += wstats->scanned_tuples;
+ vacrelstats->new_dead_tuples += wstats->new_dead_tuples;
+ vacrelstats->pages_removed += wstats->pages_removed;
+ vacrelstats->tuples_deleted += wstats->tuples_deleted;
+ vacrelstats->nonempty_pages += wstats->nonempty_pages;
+ }
+
+	/* all vacuum workers have the same value of rel_pages */
+ vacrelstats->rel_pages = lvstats_list->rel_pages;
+}
/*
* lazy_vacuum_heap() -- second pass over the heap
@@ -1375,18 +1586,27 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
* process index entry removal in batches as large as possible.
*/
static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
+lazy_vacuum_heap(Relation onerel, LVState *lvstate)
{
int tupindex;
int npages;
PGRUsage ru0;
+ BlockNumber prev_tblk;
Buffer vmbuffer = InvalidBuffer;
+ ItemPointer deadtuples = lvstate->deadtuples;
+ LVDeadTupleCtl *dtctl = lvstate->dtctl;
+ BlockNumber ntuples = 0;
+ StringInfoData buf;
pg_rusage_init(&ru0);
npages = 0;
tupindex = 0;
- while (tupindex < vacrelstats->num_dead_tuples)
+ while (tupindex < dtctl->dt_count)
{
BlockNumber tblk;
Buffer buf;
@@ -1395,7 +1615,42 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
vacuum_delay_point();
- tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+ /*
+	 * If the dead tuple TIDs are shared among all vacuum workers, we
+	 * acquire the lock and advance tupindex before vacuuming.
+	 *
+	 * NB: The maximum number of tuples that can be stored on a single
+	 * page is not large in most cases, so a spinlock is sufficient
+	 * here.
+ */
+ if (IsDeadTupleShared(lvstate))
+ {
+ SpinLockAcquire(&(dtctl->mutex));
+
+ tupindex = dtctl->dt_index;
+
+ if (tupindex >= dtctl->dt_count)
+ {
+ SpinLockRelease(&(dtctl->mutex));
+ break;
+ }
+
+			/* Advance dtctl->dt_index */
+ for (prev_tblk = tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]);
+ dtctl->dt_index < dtctl->dt_count;
+ dtctl->dt_index++)
+ {
+ tblk = ItemPointerGetBlockNumber(&deadtuples[dtctl->dt_index]);
+ if (prev_tblk != tblk)
+ break;
+
+ ntuples++;
+ }
+
+ SpinLockRelease(&(dtctl->mutex));
+ }
+
+ tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]);
buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
vac_strategy);
if (!ConditionalLockBufferForCleanup(buf))
@@ -1404,7 +1659,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
++tupindex;
continue;
}
- tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
+ tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, lvstate,
&vmbuffer);
/* Now that we've compacted the page, record its available space */
@@ -1422,10 +1677,12 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
vmbuffer = InvalidBuffer;
}
+ initStringInfo(&buf);
+ appendStringInfo(&buf,
+ "\"%s\": removed %d row versions in %d pages",
+					 RelationGetRelationName(onerel),
+					 IsDeadTupleShared(lvstate) ? (int) ntuples : tupindex, npages);
ereport(elevel,
- (errmsg("\"%s\": removed %d row versions in %d pages",
- RelationGetRelationName(onerel),
- tupindex, npages),
+ (errmsg("%s", buf.data),
errdetail_internal("%s", pg_rusage_show(&ru0))));
}
@@ -1435,34 +1692,32 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
*
* Caller must hold pin and buffer cleanup lock on the buffer.
*
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
- * tuple for this page. We assume the rest follow sequentially.
- * The return value is the first tupindex after the tuples of this page.
+ * tupindex is the index in lvstate->deadtuples of the first dead
+ * tuple for this page. We assume the rest follow sequentially.
+ * The return value is the first tupindex after the tuples of this page.
+ */
static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
- int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
+ int tupindex, LVState *lvstate, Buffer *vmbuffer)
{
Page page = BufferGetPage(buffer);
OffsetNumber unused[MaxOffsetNumber];
int uncnt = 0;
TransactionId visibility_cutoff_xid;
bool all_frozen;
+ LVRelStats *vacrelstats = lvstate->vacrelstats;
pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
START_CRIT_SECTION();
- for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+ for (; tupindex < lvstate->dtctl->dt_count; tupindex++)
{
BlockNumber tblk;
OffsetNumber toff;
ItemId itemid;
- tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+ tblk = ItemPointerGetBlockNumber(&lvstate->deadtuples[tupindex]);
if (tblk != blkno)
break; /* past end of tuples for this block */
- toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+ toff = ItemPointerGetOffsetNumber(&lvstate->deadtuples[tupindex]);
itemid = PageGetItemId(page, toff);
ItemIdSetUnused(itemid);
unused[uncnt++] = toff;
@@ -1587,14 +1842,15 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
* lazy_vacuum_index() -- vacuum one index relation.
*
* Delete all the index entries pointing to tuples listed in
- * vacrelstats->dead_tuples, and update running statistics.
+ * lvstate->deadtuples, and update running statistics.
*/
static void
lazy_vacuum_index(Relation indrel,
IndexBulkDeleteResult **stats,
- LVRelStats *vacrelstats)
+ LVState *lvstate)
{
IndexVacuumInfo ivinfo;
+ StringInfoData buf;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -1603,17 +1859,19 @@ lazy_vacuum_index(Relation indrel,
ivinfo.analyze_only = false;
ivinfo.estimated_count = true;
ivinfo.message_level = elevel;
- ivinfo.num_heap_tuples = vacrelstats->old_rel_tuples;
+ ivinfo.num_heap_tuples = lvstate->vacrelstats->old_rel_tuples;
ivinfo.strategy = vac_strategy;
/* Do bulk deletion */
- *stats = index_bulk_delete(&ivinfo, *stats,
- lazy_tid_reaped, (void *) vacrelstats);
+ *stats = index_bulk_delete(&ivinfo, *stats, lazy_tid_reaped, (void *) lvstate);
+
+ initStringInfo(&buf);
+ appendStringInfo(&buf,
+ "scanned index \"%s\" to remove %d row versions",
+ RelationGetRelationName(indrel), lvstate->dtctl->dt_count);
ereport(elevel,
- (errmsg("scanned index \"%s\" to remove %d row versions",
- RelationGetRelationName(indrel),
- vacrelstats->num_dead_tuples),
+ (errmsg("%s", buf.data),
errdetail_internal("%s", pg_rusage_show(&ru0))));
}
@@ -1621,11 +1879,11 @@ lazy_vacuum_index(Relation indrel,
* lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
*/
static void
-lazy_cleanup_index(Relation indrel,
- IndexBulkDeleteResult *stats,
- LVRelStats *vacrelstats)
+lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats,
+ LVRelStats *vacrelstats, LVIndStats *indstat)
{
IndexVacuumInfo ivinfo;
+ StringInfoData buf;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -1639,34 +1897,52 @@ lazy_cleanup_index(Relation indrel,
stats = index_vacuum_cleanup(&ivinfo, stats);
+	/* Will be updated by the leader process after vacuuming */
+ if (indstat)
+ indstat->updated = false;
+
if (!stats)
return;
/*
* Now update statistics in pg_class, but only if the index says the count
- * is accurate.
+	 * is accurate. In parallel lazy vacuum, a worker cannot update pg_class
+	 * itself, so it saves the statistics to DSM and the leader process
+	 * updates them later.
*/
if (!stats->estimated_count)
- vac_update_relstats(indrel,
- stats->num_pages,
- stats->num_index_tuples,
- 0,
- false,
- InvalidTransactionId,
- InvalidMultiXactId,
- false);
+ {
+ if (indstat)
+ {
+ indstat->updated = true;
+ indstat->num_pages = stats->num_pages;
+ indstat->num_tuples = stats->num_index_tuples;
+ }
+ else
+ vac_update_relstats(indrel,
+ stats->num_pages,
+ stats->num_index_tuples,
+ 0,
+ false,
+ InvalidTransactionId,
+ InvalidMultiXactId,
+ false);
+ }

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
					   "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));
pfree(stats);
}
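Once parallel index vacuuming finishes, the leader is expected to flush the statistics that workers saved into DSM. A minimal sketch of that apply step, assuming the LVIndStats layout above and leader-side Irel/indstats arrays as in the existing vacuum code (the actual hook point is outside this excerpt):

	/* Sketch: leader applies worker-saved index stats to pg_class. */
	for (i = 0; i < vacrelstats->nindexes; i++)
	{
		if (!indstats[i].updated)
			continue;
		vac_update_relstats(Irel[i],
							indstats[i].num_pages,
							indstats[i].num_tuples,
							0,		/* num_all_visible_pages */
							false,	/* hasindex */
							InvalidTransactionId,
							InvalidMultiXactId,
							false);	/* in_outer_xact */
	}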
@@ -1976,59 +2252,70 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
/*
* lazy_space_alloc - space allocation decisions for lazy vacuum
*
+ * In parallel lazy vacuum, the space for dead tuple locations has already
+ * been allocated in dynamic shared memory, so we allocate it in local
+ * memory only when not in parallel lazy vacuum.
+ *
* See the comments at the head of this file for rationale.
*/
static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+lazy_space_alloc(LVState *lvstate, BlockNumber relblocks)
{
- long maxtuples;
- int vac_work_mem = IsAutoVacuumWorkerProcess() &&
- autovacuum_work_mem != -1 ?
- autovacuum_work_mem : maintenance_work_mem;
-
- if (vacrelstats->hasindex)
- {
- maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
- maxtuples = Min(maxtuples, INT_MAX);
- maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+ long maxtuples;
- /* curious coding here to ensure the multiplication can't overflow */
- if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
- maxtuples = relblocks * LAZY_ALLOC_TUPLES;
+ /*
+	 * In parallel mode, the pointer to the dead tuple array has already
+	 * been set during initialization.
+ */
+ if (lvstate->parallel_mode && lvstate->vacrelstats->nindexes > 0)
+ return;
- /* stay sane if small maintenance_work_mem */
- maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
- }
- else
- {
- maxtuples = MaxHeapTuplesPerPage;
- }
+ maxtuples = lazy_get_max_dead_tuples(lvstate->vacrelstats);
- vacrelstats->num_dead_tuples = 0;
- vacrelstats->max_dead_tuples = (int) maxtuples;
- vacrelstats->dead_tuples = (ItemPointer)
- palloc(maxtuples * sizeof(ItemPointerData));
+ /*
+	 * When not in parallel lazy vacuum, we need to allocate the dead
+	 * tuple array in local memory.
+ */
+	lvstate->deadtuples = palloc0(sizeof(ItemPointerData) * (int) maxtuples);
+	lvstate->dtctl = (LVDeadTupleCtl *) palloc(sizeof(LVDeadTupleCtl));
+	lvstate->dtctl->dt_max = maxtuples;
+	lvstate->dtctl->dt_count = 0;
}
/*
* lazy_record_dead_tuple - remember one deletable tuple
+ *
+ * The spinlock must be acquired before recording if the dead tuple TIDs
+ * are shared with other vacuum workers.
*/
static void
-lazy_record_dead_tuple(LVRelStats *vacrelstats,
- ItemPointer itemptr)
+lazy_record_dead_tuple(LVState *lvstate, ItemPointer itemptr)
{
+ LVDeadTupleCtl *dtctl = lvstate->dtctl;
+
+ if (IsDeadTupleShared(lvstate))
+ SpinLockAcquire(&(dtctl->mutex));
+
	/*
	 * The array must never overflow, since we rely on all deletable tuples
	 * being removed; inability to remove a tuple might cause an old XID to
	 * persist beyond the freeze limit, which could be disastrous later on.
	 */
-	if (vacrelstats->num_dead_tuples >= vacrelstats->max_dead_tuples)
-		elog(ERROR, "dead tuple array overflow");
+	if (dtctl->dt_count >= dtctl->dt_max)
+	{
+		/* Never ERROR out while still holding the spinlock */
+		if (IsDeadTupleShared(lvstate))
+			SpinLockRelease(&(dtctl->mutex));
+		elog(ERROR, "dead tuple array overflow");
+	}
+
-	vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
-	vacrelstats->num_dead_tuples++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
-								 vacrelstats->num_dead_tuples);
+	lvstate->deadtuples[dtctl->dt_count] = *itemptr;
+	(dtctl->dt_count)++;
+	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+								 dtctl->dt_count);
+ if (IsDeadTupleShared(lvstate))
+ SpinLockRelease(&(dtctl->mutex));
}
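IsDeadTupleShared() is not defined in this excerpt. Judging from lazy_space_alloc() and lazy_estimate_dsm(), a plausible definition is that the dead tuple area is shared exactly when we run in parallel mode on a table with indexes:

/* Assumed definition, for illustration; the real macro lives elsewhere. */
#define IsDeadTupleShared(lvstate) \
	((lvstate)->parallel_mode && (lvstate)->vacrelstats->nindexes > 0)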
/*
@@ -2041,16 +2328,23 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
- LVRelStats *vacrelstats = (LVRelStats *) state;
+ LVState *lvstate = (LVState *) state;
ItemPointer res;
+ /*
+	 * The dead tuple TIDs are assumed to be sorted by TID location, even
+	 * when they are shared with other vacuum workers; the leader sorts the
+	 * shared array in lazy_prepare_vacuum() before index vacuuming begins.
+ */
res = (ItemPointer) bsearch((void *) itemptr,
- (void *) vacrelstats->dead_tuples,
- vacrelstats->num_dead_tuples,
+ (void *) lvstate->deadtuples,
+ lvstate->dtctl->dt_count,
sizeof(ItemPointerData),
vac_cmp_itemptr);
	return (res != NULL);
}
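For reference, the comparator used here and by the leader's qsort() in lazy_prepare_vacuum() is the pre-existing vac_cmp_itemptr(), unchanged by this patch; it orders TIDs by block number, then offset:

static int
vac_cmp_itemptr(const void *left, const void *right)
{
	BlockNumber lblk,
				rblk;
	OffsetNumber loff,
				roff;

	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
	rblk = ItemPointerGetBlockNumber((ItemPointer) right);

	if (lblk < rblk)
		return -1;
	if (lblk > rblk)
		return 1;

	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
	roff = ItemPointerGetOffsetNumber((ItemPointer) right);

	if (loff < roff)
		return -1;
	if (loff > roff)
		return 1;

	return 0;
}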
/*
@@ -2194,3 +2488,627 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
return all_visible;
}
+
+/*
+ * Return the block number we need to scan next, or InvalidBlockNumber if scan
+ * is done.
+ *
+ * Except when aggressive is set, we want to skip pages that are
+ * all-visible according to the visibility map, but only when we can skip
+ * at least SKIP_PAGES_THRESHOLD consecutive pages. Since we're reading
+ * sequentially, the OS should be doing readahead for us, so there's no
+ * gain in skipping a page now and then; that's likely to disable
+ * readahead and so be counterproductive. Also, skipping even a single
+ * page means that we can't update relfrozenxid, so we only want to do it
+ * if we can skip a goodly number of pages.
+ *
+ * When aggressive is set, we can't skip pages just because they are
+ * all-visible, but we can still skip pages that are all-frozen, since
+ * such pages do not need freezing and do not affect the value that we can
+ * safely set for relfrozenxid or relminmxid.
+ *
+ * In non-parallel mode, before entering the main loop, we establish the
+ * invariant that next_unskippable_block is the next block number >= blkno
+ * that we can't skip based on the visibility map, either all-visible for
+ * a regular scan or all-frozen for an aggressive scan. We set it to
+ * nblocks if there's no such block. We also set up the skipping_blocks
+ * flag correctly at this stage.
+ *
+ * In parallel mode, we scan heap pages using the parallel heap scan
+ * description; each worker calls heap_parallelscan_nextpage() to claim
+ * the next block number exclusively. If the claimed block is all-visible
+ * according to the visibility map, we skip it immediately, since runs of
+ * consecutive skippable blocks are hard to track across workers.
+ *
+ * Note: The value returned by visibilitymap_get_status could be slightly
+ * out-of-date, since we make this test before reading the corresponding
+ * heap page or locking the buffer. This is OK. If we mistakenly think
+ * that the page is all-visible or all-frozen when in fact the flag's just
+ * been cleared, we might fail to vacuum the page. It's easy to see that
+ * skipping a page when aggressive is not set is not a very big deal; we
+ * might leave some dead tuples lying around, but the next vacuum will
+ * find them. But even when aggressive *is* set, it's still OK if we miss
+ * a page whose all-frozen marking has just been cleared. Any new XIDs
+ * just added to that page are necessarily newer than the GlobalXmin we
+ * computed, so they'll have no effect on the value to which we can safely
+ * set relfrozenxid. A similar argument applies for MXIDs and relminmxid.
+ *
+ * We will scan the table's last page, at least to the extent of
+ * determining whether it has tuples or not, even if it should be skipped
+ * according to the above rules; except when we've already determined that
+ * it's not worth trying to truncate the table. This avoids having
+ * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+ * a truncation that just fails immediately because there are tuples in
+ * the last page. This is worth avoiding mainly because such a lock must
+ * be replayed on any hot standby, where it can be disruptive.
+ */
+static BlockNumber
+lazy_scan_get_nextpage(Relation onerel, LVState *lvstate,
+ LVScanDesc lvscan, bool *all_visible_according_to_vm,
+ Buffer *vmbuffer, int options, bool aggressive)
+{
+ BlockNumber blkno;
+ LVRelStats *vacrelstats = lvstate->vacrelstats;
+
+ if (lvstate->parallel_mode)
+ {
+ /*
+		 * In parallel lazy vacuum, it is hard to know how many consecutive
+		 * all-visible pages exist in the table, so we skip each all-visible
+		 * page immediately.
+ */
+ while ((blkno = heap_parallelscan_nextpage(lvscan->heapscan)) != InvalidBlockNumber)
+ {
+ *all_visible_according_to_vm = false;
+ vacuum_delay_point();
+
+			/* Consider skipping this page according to the visibility map */
+ if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0 &&
+ !FORCE_CHECK_PAGE(blkno))
+ {
+ uint8 vmstatus;
+
+ vmstatus = visibilitymap_get_status(onerel, blkno, vmbuffer);
+
+ if (aggressive)
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+ {
+ vacrelstats->frozenskipped_pages++;
+ continue;
+ }
+ else if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ *all_visible_according_to_vm = true;
+ }
+ else
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0)
+ vacrelstats->frozenskipped_pages++;
+ continue;
+ }
+ }
+ }
+
+ /* We need to scan current blkno, break */
+ break;
+ }
+ }
+ else
+ {
+ bool skipping_blocks = false;
+
+		/* Initialize lv_next_unskippable_block if needed */
+ if (lvscan->lv_cblock == 0 && (options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+ {
+ while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+ {
+ uint8 vmstatus;
+
+ vmstatus = visibilitymap_get_status(onerel, lvscan->lv_next_unskippable_block,
+ vmbuffer);
+ if (aggressive)
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+ break;
+ }
+ else
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+ break;
+ }
+ vacuum_delay_point();
+ lvscan->lv_next_unskippable_block++;
+ }
+
+ if (lvscan->lv_next_unskippable_block >= SKIP_PAGES_THRESHOLD)
+ skipping_blocks = true;
+ else
+ skipping_blocks = false;
+ }
+
+ /* Decide the block number we need to scan */
+ for (blkno = lvscan->lv_cblock; blkno < lvscan->lv_nblocks; blkno++)
+ {
+ if (blkno == lvscan->lv_next_unskippable_block)
+ {
+ /* Time to advance next_unskippable_block */
+ lvscan->lv_next_unskippable_block++;
+ if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
+ {
+ while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks)
+ {
+ uint8 vmstatus;
+
+ vmstatus = visibilitymap_get_status(onerel,
+ lvscan->lv_next_unskippable_block,
+ vmbuffer);
+ if (aggressive)
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+ break;
+ }
+ else
+ {
+ if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+ break;
+ }
+ vacuum_delay_point();
+ lvscan->lv_next_unskippable_block++;
+ }
+ }
+
+ /*
+ * We know we can't skip the current block. But set up
+			 * skipping_blocks to do the right thing at the
+ * following blocks.
+ */
+ if (lvscan->lv_next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
+ skipping_blocks = true;
+ else
+ skipping_blocks = false;
+
+ /*
+ * Normally, the fact that we can't skip this block must mean that
+ * it's not all-visible. But in an aggressive vacuum we know only
+ * that it's not all-frozen, so it might still be all-visible.
+ */
+ if (aggressive && VM_ALL_VISIBLE(onerel, blkno, vmbuffer))
+ *all_visible_according_to_vm = true;
+
+				/* Found the next unskippable block */
+ break;
+ }
+ else
+ {
+ /*
+ * The current block is potentially skippable; if we've seen a
+ * long enough run of skippable blocks to justify skipping it, and
+ * we're not forced to check it, then go ahead and skip.
+ * Otherwise, the page must be at least all-visible if not
+ * all-frozen, so we can set all_visible_according_to_vm = true.
+ */
+ if (skipping_blocks && !FORCE_CHECK_PAGE(blkno))
+ {
+ /*
+ * Tricky, tricky. If this is in aggressive vacuum, the page
+ * must have been all-frozen at the time we checked whether it
+ * was skippable, but it might not be any more. We must be
+ * careful to count it as a skipped all-frozen page in that
+ * case, or else we'll think we can't update relfrozenxid and
+ * relminmxid. If it's not an aggressive vacuum, we don't
+ * know whether it was all-frozen, so we have to recheck; but
+ * in this case an approximate answer is OK.
+ */
+ if (aggressive || VM_ALL_FROZEN(onerel, blkno, vmbuffer))
+ vacrelstats->frozenskipped_pages++;
+ continue;
+ }
+
+ *all_visible_according_to_vm = true;
+
+ /* We need to scan current blkno, break */
+ break;
+ }
+ } /* for */
+
+ /* Advance the current block number for the next scan */
+ lvscan->lv_cblock = blkno + 1;
+ }
+
+ return (blkno == lvscan->lv_nblocks) ? InvalidBlockNumber : blkno;
+}
+
+/*
+ * Begin lazy vacuum scan. lvscan->heapscan is NULL if
+ * we're not in parallel lazy vacuum.
+ */
+static LVScanDesc
+lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan)
+{
+ LVScanDesc lvscan;
+
+ lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData));
+
+ lvscan->lv_cblock = 0;
+ lvscan->lv_next_unskippable_block = 0;
+ lvscan->lv_nblocks = RelationGetNumberOfBlocks(onerel);
+
+ if (pscan != NULL)
+ {
+ lvscan->heapscan = heap_beginscan_parallel(onerel, pscan);
+ heap_parallelscan_startblock_init(lvscan->heapscan);
+ }
+ else
+ lvscan->heapscan = NULL;
+
+ return lvscan;
+}
+
+/*
+ * End lazy vacuum scan.
+ */
+static void
+lv_endscan(LVScanDesc lvscan)
+{
+ if (lvscan->heapscan != NULL)
+ heap_endscan(lvscan->heapscan);
+ pfree(lvscan);
+}
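A sketch of how these two functions are meant to drive the block loop in lazy_scan_heap() (which is outside this excerpt), assuming the variables used above:

	lvscan = lv_beginscan(onerel,
						  lvstate->parallel_mode ? lvstate->pscan : NULL);

	while ((blkno = lazy_scan_get_nextpage(onerel, lvstate, lvscan,
										   &all_visible_according_to_vm,
										   &vmbuffer, options,
										   aggressive)) != InvalidBlockNumber)
	{
		/* read the block, prune it, and record dead tuples */
	}

	lv_endscan(lvscan);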
+
+/* ----------------------------------------------------------------
+ * Parallel Lazy Vacuum Support
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * Estimate storage for parallel lazy vacuum.
+ */
+static void
+lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats)
+{
+ Size size = 0;
+ int keys = 0;
+ int vacuum_workers = pcxt->nworkers + 1;
+ long maxtuples = lazy_get_max_dead_tuples(vacrelstats);
+
+ /* Estimate size for parallel heap scan */
+ size += heap_parallelscan_estimate(SnapshotAny);
+ keys++;
+
+	/* Estimate size for per-worker vacuum statistics */
+ size += BUFFERALIGN(mul_size(sizeof(LVRelStats), pcxt->nworkers));
+ keys++;
+
+ /* We have to share dead tuple information only when the table has indexes */
+ if (vacrelstats->nindexes > 0)
+ {
+ /* Estimate size for index statistics */
+ size += BUFFERALIGN(mul_size(sizeof(LVIndStats), vacrelstats->nindexes));
+ keys++;
+
+ /* Estimate size for dead tuple control */
+ size += BUFFERALIGN(sizeof(LVDeadTupleCtl));
+ keys++;
+
+ /* Estimate size for dead tuple array */
+ size += BUFFERALIGN(mul_size(
+ mul_size(sizeof(ItemPointerData), maxtuples),
+ vacuum_workers));
+ keys++;
+ }
+
+ /* Estimate size for parallel lazy vacuum state */
+ size += BUFFERALIGN(sizeof(LVParallelState));
+ keys++;
+
+ /* Estimate size for vacuum task */
+ size += BUFFERALIGN(sizeof(VacuumInfo));
+ keys++;
+
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, keys);
+}
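The VACUUM_KEY_* constants are defined outside this excerpt; they only need to be shm_toc keys that do not collide with the core PARALLEL_KEY_* entries in the same table of contents, for example (hypothetical values):

/* Hypothetical key values, for illustration only. */
#define VACUUM_KEY_PARALLEL_SCAN	UINT64CONST(0xB000000000000001)
#define VACUUM_KEY_VACUUM_STATS		UINT64CONST(0xB000000000000002)
#define VACUUM_KEY_INDEX_STATS		UINT64CONST(0xB000000000000003)
#define VACUUM_KEY_DEAD_TUPLE_CTL	UINT64CONST(0xB000000000000004)
#define VACUUM_KEY_DEAD_TUPLES		UINT64CONST(0xB000000000000005)
#define VACUUM_KEY_PARALLEL_STATE	UINT64CONST(0xB000000000000006)

Estimating slightly more space than lazy_initialize_dsm() ultimately inserts (the separate VacuumInfo chunk ends up embedded in LVParallelState) is harmless, since the estimate only needs to be an upper bound.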
+
+/*
+ * Initialize dynamic shared memory for parallel lazy vacuum. We store the
+ * parallel heap scan description, the dead tuple array, per-worker vacuum
+ * statistics, and some parameters for lazy vacuum.
+ */
+static void
+lazy_initialize_dsm(ParallelContext *pcxt, Relation onerel, LVState *lvstate,
+ int options, bool aggressive)
+{
+ ParallelHeapScanDesc pscan_ptr;
+ ItemPointer deadtuples_ptr;
+ char *lvrelstats_ptr;
+ LVParallelState *pstate_ptr;
+ LVIndStats *indstats_ptr;
+ LVDeadTupleCtl *dtctl_ptr;
+ int i;
+ int deadtuples_size;
+ int lvrelstats_size;
+ int vacuum_workers = pcxt->nworkers + 1;
+ long max_tuples = lazy_get_max_dead_tuples(lvstate->vacrelstats);
+
+ /* Allocate and initialize DSM for vacuum stats for each worker */
+ lvrelstats_size = mul_size(sizeof(LVRelStats), pcxt->nworkers);
+ lvrelstats_ptr = shm_toc_allocate(pcxt->toc, lvrelstats_size);
+ for (i = 0; i < pcxt->nworkers; i++)
+ {
+ char *start;
+
+ start = lvrelstats_ptr + i * sizeof(LVRelStats);
+ memcpy(start, lvstate->vacrelstats, sizeof(LVRelStats));
+ }
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_STATS, lvrelstats_ptr);
+
+ if (lvstate->vacrelstats->nindexes > 0)
+ {
+ /* Allocate and initialize DSM for dead tuple control */
+ dtctl_ptr = (LVDeadTupleCtl *) shm_toc_allocate(pcxt->toc, sizeof(LVDeadTupleCtl));
+ SpinLockInit(&(dtctl_ptr->mutex));
+ dtctl_ptr->dt_max = max_tuples * vacuum_workers;
+ dtctl_ptr->dt_count = 0;
+ dtctl_ptr->dt_index = 0;
+ lvstate->dtctl = dtctl_ptr;
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLE_CTL, dtctl_ptr);
+
+ /* Allocate and initialize DSM for dead tuple array */
+ deadtuples_size = mul_size(mul_size(sizeof(ItemPointerData), max_tuples),
+ vacuum_workers);
+ deadtuples_ptr = (ItemPointer) shm_toc_allocate(pcxt->toc,
+ deadtuples_size);
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLES, deadtuples_ptr);
+ lvstate->deadtuples = deadtuples_ptr;
+
+ /* Allocate DSM for index statistics */
+ indstats_ptr = (LVIndStats *) shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(LVIndStats),
+ lvstate->vacrelstats->nindexes));
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_INDEX_STATS, indstats_ptr);
+ lvstate->indstats = indstats_ptr;
+ }
+
+ /* Allocate and initialize DSM for parallel scan description */
+ pscan_ptr = (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc,
+ heap_parallelscan_estimate(SnapshotAny));
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_SCAN, pscan_ptr);
+ heap_parallelscan_initialize(pscan_ptr, onerel, SnapshotAny);
+ lvstate->pscan = pscan_ptr;
+
+ /* Allocate and initialize DSM for parallel vacuum state */
+ pstate_ptr = (LVParallelState *) shm_toc_allocate(pcxt->toc, sizeof(LVParallelState));
+ shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_STATE, pstate_ptr);
+
+ ConditionVariableInit(&(pstate_ptr->cv));
+ SpinLockInit(&(pstate_ptr->mutex));
+ pstate_ptr->nworkers = vacuum_workers;
+ pstate_ptr->state = VACSTATE_SCAN;
+ pstate_ptr->info.aggressive = aggressive;
+ pstate_ptr->info.options = options;
+ pstate_ptr->info.oldestxmin = OldestXmin;
+ pstate_ptr->info.freezelimit = FreezeLimit;
+ pstate_ptr->info.multixactcutoff = MultiXactCutoff;
+ pstate_ptr->info.elevel = elevel;
+ lvstate->pstate = pstate_ptr;
+}
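These two functions plug into the standard parallel-context machinery. An illustrative leader-side sequence, assuming the PostgreSQL 10-era CreateParallelContext() signature:

	EnterParallelMode();
	pcxt = CreateParallelContext("postgres", "LazyVacuumWorkerMain",
								 options.nworkers);
	lvstate->pcxt = pcxt;

	lazy_estimate_dsm(pcxt, lvstate->vacrelstats);
	InitializeParallelDSM(pcxt);
	lazy_initialize_dsm(pcxt, onerel, lvstate, options.flags, aggressive);

	LaunchParallelWorkers(pcxt);
	/* ... the leader participates in the scan itself ... */
	WaitForParallelWorkersToFinish(pcxt);
	DestroyParallelContext(pcxt);
	ExitParallelMode();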
+
+/*
+ * Initialize parallel lazy vacuum for worker.
+ */
+static LVState *
+lazy_initialize_worker(shm_toc *toc)
+{
+ LVState *lvstate;
+ char *lvstats;
+
+ lvstate = (LVState *) palloc(sizeof(LVState));
+ lvstate->parallel_mode = true;
+
+ /* Set up vacuum stats */
+ lvstats = shm_toc_lookup(toc, VACUUM_KEY_VACUUM_STATS, false);
+ lvstate->vacrelstats = (LVRelStats *) (lvstats +
+ sizeof(LVRelStats) * ParallelWorkerNumber);
+
+ if (lvstate->vacrelstats->nindexes > 0)
+ {
+ /* Set up dead tuple control */
+ lvstate->dtctl = (LVDeadTupleCtl *) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLE_CTL, false);
+
+ /* Set up dead tuple array */
+ lvstate->deadtuples = (ItemPointer) shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLES, false);
+
+ /* Set up index statistics */
+ lvstate->indstats = (LVIndStats *) shm_toc_lookup(toc, VACUUM_KEY_INDEX_STATS, false);
+ }
+
+ /* Set up parallel vacuum state */
+ lvstate->pstate = (LVParallelState *) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_STATE, false);
+
+ /* Set up parallel heap scan description */
+ lvstate->pscan = (ParallelHeapScanDesc) shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_SCAN, false);
+
+ /* Set up parameters for lazy vacuum */
+ OldestXmin = lvstate->pstate->info.oldestxmin;
+ FreezeLimit = lvstate->pstate->info.freezelimit;
+ MultiXactCutoff = lvstate->pstate->info.multixactcutoff;
+ elevel = lvstate->pstate->info.elevel;
+
+ return lvstate;
+}
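LazyVacuumWorkerMain(), declared in vacuum.h by this patch, is the worker entry point. A minimal sketch, with do_lazy_scan_heap() as a hypothetical driver shared by leader and workers:

void
LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc)
{
	LVState    *lvstate = lazy_initialize_worker(toc);
	Relation	onerel;

	/* The leader already holds this lock; each worker takes it as well. */
	onerel = heap_open(lvstate->pscan->phs_relid, ShareUpdateExclusiveLock);

	do_lazy_scan_heap(lvstate, onerel);		/* hypothetical shared driver */

	heap_close(onerel, ShareUpdateExclusiveLock);
}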
+
+/*
+ * At the end of the actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers to reach here before clearing the dead
+ * tuple TID information.
+ */
+static void
+lazy_end_vacuum(LVState *lvstate)
+{
+ LVParallelState *pstate = lvstate->pstate;
+ bool counted = false;
+
+	/* Exit if not in parallel vacuum */
+ if (!lvstate->parallel_mode)
+ {
+ lvstate->dtctl->dt_count = 0;
+ return;
+ }
+
+ while (true)
+ {
+ int finish_count;
+ int state;
+
+ SpinLockAcquire(&(pstate->mutex));
+
+ /* Fetch shared information */
+ if (!counted)
+ pstate->finish_count++;
+ finish_count = pstate->finish_count;
+ state = pstate->state;
+
+ SpinLockRelease(&(pstate->mutex));
+
+ if (state == VACSTATE_SCAN)
+ break;
+
+ /*
+		 * If this is a parallel worker reaching here for the first time,
+		 * wake up the other workers so they observe the updated count.
+ */
+ if (!counted && IsParallelWorker())
+ ConditionVariableBroadcast(&pstate->cv);
+
+ counted = true;
+
+ /*
+		 * Once all launched parallel vacuum workers have reached here, the
+		 * leader can clear the dead tuple TID information.
+ */
+ if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+ {
+ /* Clear dead tuples */
+ lvstate->dtctl->dt_count = 0;
+
+ SpinLockAcquire(&pstate->mutex);
+ pstate->finish_count = 0;
+ pstate->state = VACSTATE_SCAN;
+ SpinLockRelease(&pstate->mutex);
+
+ ConditionVariableBroadcast(&pstate->cv);
+ break;
+ }
+
+ ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_DONE);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
+/*
+ * Before starting the actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers so that every worker sees the same dead
+ * tuple TID information when vacuuming.
+ */
+static void
+lazy_prepare_vacuum(LVState *lvstate)
+{
+ LVParallelState *pstate = lvstate->pstate;
+ bool counted = false;
+
+	/* Exit if not in parallel vacuum */
+ if (!lvstate->parallel_mode)
+ return;
+
+ while (true)
+ {
+ int finish_count;
+ int state;
+
+ SpinLockAcquire(&(pstate->mutex));
+
+ if (!counted)
+ pstate->finish_count++;
+ state = pstate->state;
+ finish_count = pstate->finish_count;
+
+ SpinLockRelease(&(pstate->mutex));
+
+ if (state == VACSTATE_VACUUM)
+ break;
+
+ /*
+		 * If this is a parallel worker reaching here for the first time,
+		 * wake up the other workers so they observe the updated count.
+ */
+ if (!counted && IsParallelWorker())
+ ConditionVariableBroadcast(&pstate->cv);
+
+ counted = true;
+
+ /*
+		 * The leader process can change the parallel vacuum state once all
+		 * workers have reached here. It also sorts the shared dead tuple
+		 * array so that lazy_tid_reaped() can binary-search it.
+ */
+ if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+ {
+ qsort((void *) lvstate->deadtuples, lvstate->dtctl->dt_count,
+ sizeof(ItemPointerData), vac_cmp_itemptr);
+
+ SpinLockAcquire(&pstate->mutex);
+ pstate->finish_count = 0;
+ pstate->state = VACSTATE_VACUUM;
+ SpinLockRelease(&pstate->mutex);
+
+ ConditionVariableBroadcast(&pstate->cv);
+ break;
+ }
+
+ ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_PREPARE);
+ }
+
+ ConditionVariableCancelSleep();
+}
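Together, lazy_prepare_vacuum() and lazy_end_vacuum() implement a two-phase barrier over a small state machine. The VACSTATE_* values are defined elsewhere in the patch, plausibly along these lines (assumption):

/* Assumed definition, for illustration. */
typedef enum VacuumState
{
	VACSTATE_SCAN,				/* collecting dead tuple TIDs from the heap */
	VACSTATE_VACUUM				/* removing index entries and line pointers */
} VacuumState;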
+
+/*
+ * Return the maximum number of dead tuples that can be stored according
+ * to vac_work_mem.
+ */
+static long
+lazy_get_max_dead_tuples(LVRelStats *vacrelstats)
+{
+ long maxtuples;
+ int vac_work_mem = IsAutoVacuumWorkerProcess() &&
+ autovacuum_work_mem != -1 ?
+ autovacuum_work_mem : maintenance_work_mem;
+
+ if (vacrelstats->nindexes != 0)
+ {
+ maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+ maxtuples = Min(maxtuples, INT_MAX);
+ maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+ /* curious coding here to ensure the multiplication can't overflow */
+ if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > vacrelstats->old_rel_pages)
+ maxtuples = vacrelstats->old_rel_pages * LAZY_ALLOC_TUPLES;
+
+ /* stay sane if small maintenance_work_mem */
+ maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+ }
+ else
+ {
+ maxtuples = MaxHeapTuplesPerPage;
+ }
+
+ return maxtuples;
+}
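For example, with maintenance_work_mem set to 64MB and at least one index, the cap works out to 64 * 1024 * 1024 / sizeof(ItemPointerData) = 67108864 / 6, roughly 11 million TIDs, before the INT_MAX, MaxAllocSize, and per-page clamps above are applied. In parallel mode, lazy_estimate_dsm() multiplies this per-participant figure by the number of workers plus the leader when sizing the shared array.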
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 7a70001..8cba9c8 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1662,7 +1662,12 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
static bool
_equalVacuumStmt(const VacuumStmt *a, const VacuumStmt *b)
{
- COMPARE_SCALAR_FIELD(options);
+ if (a->options.flags != b->options.flags)
+ return false;
+
+ if (a->options.nworkers != b->options.nworkers)
+ return false;
+
COMPARE_NODE_FIELD(rels);
return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4c83a63..399640f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -187,6 +187,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
bool *deferrable, bool *initdeferred, bool *not_valid,
bool *no_inherit, core_yyscan_t yyscanner);
static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
+static VacuumOptions *makeVacOpt(VacuumOption flag, int nworkers);
%}
@@ -237,6 +238,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
struct ImportQual *importqual;
InsertStmt *istmt;
VariableSetStmt *vsetstmt;
+ VacuumOptions *vacopts;
PartitionElem *partelem;
PartitionSpec *partspec;
PartitionBoundSpec *partboundspec;
@@ -305,7 +307,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
create_extension_opt_item alter_extension_opt_item
%type <ival>	opt_lock lock_type cast_context
-%type <ival>	vacuum_option_list vacuum_option_elem
+%type <vacopts>	vacuum_option_list vacuum_option_elem
%type <boolean>	opt_or_replace
				opt_grant_grant_option opt_grant_admin_option
				opt_nowait opt_if_exists opt_with_data
@@ -10152,32 +10154,40 @@ cluster_index_specification:
VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
{
VacuumStmt *n = makeNode(VacuumStmt);
- n->options = VACOPT_VACUUM;
+ VacuumOptions *vacopts = makeVacOpt(VACOPT_VACUUM, 0);
if ($2)
- n->options |= VACOPT_FULL;
+ vacopts->flags |= VACOPT_FULL;
if ($3)
- n->options |= VACOPT_FREEZE;
+ vacopts->flags |= VACOPT_FREEZE;
if ($4)
- n->options |= VACOPT_VERBOSE;
+ vacopts->flags |= VACOPT_VERBOSE;
+
n->rels = $5;
+ n->options.flags = vacopts->flags;
+ n->options.nworkers = 0;
$$ = (Node *)n;
+ pfree(vacopts);
}
| VACUUM opt_full opt_freeze opt_verbose AnalyzeStmt
{
VacuumStmt *n = (VacuumStmt *) $5;
- n->options |= VACOPT_VACUUM;
+ n->options.flags |= VACOPT_VACUUM;
if ($2)
- n->options |= VACOPT_FULL;
+ n->options.flags |= VACOPT_FULL;
if ($3)
- n->options |= VACOPT_FREEZE;
+ n->options.flags |= VACOPT_FREEZE;
if ($4)
- n->options |= VACOPT_VERBOSE;
+ n->options.flags |= VACOPT_VERBOSE;
+ n->options.nworkers = 0;
$$ = (Node *)n;
}
| VACUUM '(' vacuum_option_list ')' opt_vacuum_relation_list
{
VacuumStmt *n = makeNode(VacuumStmt);
- n->options = VACOPT_VACUUM | $3;
+ VacuumOptions *vacopts = $3;
+
+ n->options.flags = vacopts->flags | VACOPT_VACUUM;
+ n->options.nworkers = vacopts->nworkers;
n->rels = $5;
$$ = (Node *) n;
}
@@ -10185,18 +10195,38 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
vacuum_option_list:
vacuum_option_elem { $$ = $1; }
- | vacuum_option_list ',' vacuum_option_elem { $$ = $1 | $3; }
+ | vacuum_option_list ',' vacuum_option_elem
+ {
+ VacuumOptions *vacopts1 = (VacuumOptions *)$1;
+ VacuumOptions *vacopts2 = (VacuumOptions *)$3;
+
+ vacopts1->flags |= vacopts2->flags;
+ if (vacopts2->flags == VACOPT_PARALLEL)
+ vacopts1->nworkers = vacopts2->nworkers;
+
+ $$ = vacopts1;
+ pfree(vacopts2);
+ }
;
vacuum_option_elem:
- analyze_keyword { $$ = VACOPT_ANALYZE; }
- | VERBOSE { $$ = VACOPT_VERBOSE; }
- | FREEZE { $$ = VACOPT_FREEZE; }
- | FULL { $$ = VACOPT_FULL; }
+ analyze_keyword { $$ = makeVacOpt(VACOPT_ANALYZE, 0); }
+ | VERBOSE { $$ = makeVacOpt(VACOPT_VERBOSE, 0); }
+ | FREEZE { $$ = makeVacOpt(VACOPT_FREEZE, 0); }
+ | FULL { $$ = makeVacOpt(VACOPT_FULL, 0); }
+ | PARALLEL ICONST
+ {
+ if ($2 < 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("parallel vacuum degree must be at least 1"),
+ parser_errposition(@1)));
+ $$ = makeVacOpt(VACOPT_PARALLEL, $2);
+ }
| IDENT
{
if (strcmp($1, "disable_page_skipping") == 0)
- $$ = VACOPT_DISABLE_PAGE_SKIPPING;
+ $$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 0);
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -10208,11 +10238,16 @@ vacuum_option_elem:
AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
{
VacuumStmt *n = makeNode(VacuumStmt);
- n->options = VACOPT_ANALYZE;
+ VacuumOptions *vacopts = makeVacOpt(VACOPT_ANALYZE, 0);
+
if ($2)
- n->options |= VACOPT_VERBOSE;
+ vacopts->flags |= VACOPT_VERBOSE;
+
+ n->options.flags = vacopts->flags;
+ n->options.nworkers = 0;
n->rels = $3;
$$ = (Node *)n;
+ pfree(vacopts);
}
;
@@ -15915,6 +15950,16 @@ makeRecursiveViewSelect(char *relname, List *aliases, Node *query)
return (Node *) s;
}
+static VacuumOptions *
+makeVacOpt(VacuumOption flag, int nworkers)
+{
+ VacuumOptions *vacopt = palloc(sizeof(VacuumOptions));
+
+ vacopt->flags = flag;
+ vacopt->nworkers = nworkers;
+ return vacopt;
+}
+
/* parser_init()
* Initialize to parse one query string
*/
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index c04c0b5..6d89052 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -189,7 +189,7 @@ typedef struct av_relation
typedef struct autovac_table
{
Oid at_relid;
- int at_vacoptions; /* bitmask of VacuumOption */
+ VacuumOptions at_vacoptions; /* contains bitmask of VacuumOption */
VacuumParams at_params;
int at_vacuum_cost_delay;
int at_vacuum_cost_limit;
@@ -2466,7 +2466,7 @@ do_autovacuum(void)
* next table in our list.
*/
HOLD_INTERRUPTS();
- if (tab->at_vacoptions & VACOPT_VACUUM)
+ if (tab->at_vacoptions.flags & VACOPT_VACUUM)
errcontext("automatic vacuum of table \"%s.%s.%s\"",
tab->at_datname, tab->at_nspname, tab->at_relname);
else
@@ -2818,6 +2818,7 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
int vac_cost_limit;
int vac_cost_delay;
int log_min_duration;
+ int vacuum_parallel_workers;
/*
* Calculate the vacuum cost parameters and the freeze ages. If there
@@ -2864,13 +2865,19 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
? avopts->multixact_freeze_table_age
: default_multixact_freeze_table_age;
+ vacuum_parallel_workers = (avopts &&
+ avopts->vacuum_parallel_workers >= 0)
+ ? avopts->vacuum_parallel_workers
+ : 0;
+
tab = palloc(sizeof(autovac_table));
tab->at_relid = relid;
tab->at_sharedrel = classForm->relisshared;
- tab->at_vacoptions = VACOPT_SKIPTOAST |
+ tab->at_vacoptions.flags = VACOPT_SKIPTOAST |
(dovacuum ? VACOPT_VACUUM : 0) |
(doanalyze ? VACOPT_ANALYZE : 0) |
(!wraparound ? VACOPT_NOWAIT : 0);
+ tab->at_vacoptions.nworkers = vacuum_parallel_workers;
tab->at_params.freeze_min_age = freeze_min_age;
tab->at_params.freeze_table_age = freeze_table_age;
tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
@@ -3116,10 +3123,10 @@ autovac_report_activity(autovac_table *tab)
int len;
/* Report the command and possible options */
- if (tab->at_vacoptions & VACOPT_VACUUM)
+ if (tab->at_vacoptions.flags & VACOPT_VACUUM)
snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
"autovacuum: VACUUM%s",
- tab->at_vacoptions & VACOPT_ANALYZE ? " ANALYZE" : "");
+ tab->at_vacoptions.flags & VACOPT_ANALYZE ? " ANALYZE" : "");
else
snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
"autovacuum: ANALYZE");
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3a0b49c..34d8a5c 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3210,6 +3210,25 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
pgstat_increment_changecount_after(beentry);
}
+/*-----------
+ * pgstat_report_leader_pid() -
+ *
+ * Report the process ID of the parallel leader that this backend is
+ * working with.
+ */
+void
+pgstat_report_leader_pid(int pid)
+{
+ volatile PgBackendStatus *beentry = MyBEEntry;
+
+ if (!beentry)
+ return;
+
+ pgstat_increment_changecount_before(beentry);
+ beentry->st_leader_pid = pid;
+ pgstat_increment_changecount_after(beentry);
+}
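An illustrative call pattern, assuming the leader's PID reaches workers through the vacuum DSM (leader_pid below is a hypothetical variable):

	if (IsParallelWorker())
		pgstat_report_leader_pid(leader_pid);	/* from shared state */
	else
		pgstat_report_leader_pid(MyProcPid);	/* leader groups itself */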
+
/* ----------
* pgstat_read_current_status() -
*
@@ -3610,6 +3629,12 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
event_name = "ParallelBitmapScan";
break;
+ case WAIT_EVENT_PARALLEL_VACUUM_PREPARE:
+ event_name = "ParallelVacuumPrepare";
+ break;
+ case WAIT_EVENT_PARALLEL_VACUUM_DONE:
+ event_name = "ParallelVacuumDone";
+ break;
case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
event_name = "ProcArrayGroupUpdate";
break;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 82a707a..ead81ff 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -669,7 +669,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
VacuumStmt *stmt = (VacuumStmt *) parsetree;
/* we choose to allow this during "read only" transactions */
- PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
+ PreventCommandDuringRecovery((stmt->options.flags & VACOPT_VACUUM) ?
"VACUUM" : "ANALYZE");
/* forbidden in parallel mode due to CommandIsReadOnly */
ExecVacuum(stmt, isTopLevel);
@@ -2498,7 +2498,7 @@ CreateCommandTag(Node *parsetree)
break;
case T_VacuumStmt:
- if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM)
+ if (((VacuumStmt *) parsetree)->options.flags & VACOPT_VACUUM)
tag = "VACUUM";
else
tag = "ANALYZE";
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 8d9e7c1..463f4d2 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -439,7 +439,7 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS)
Datum
pg_stat_get_progress_info(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_PROGRESS_COLS PGSTAT_NUM_PROGRESS_PARAM + 3
+#define PG_STAT_GET_PROGRESS_COLS PGSTAT_NUM_PROGRESS_PARAM + 4
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
char *cmd = text_to_cstring(PG_GETARG_TEXT_PP(0));
@@ -516,14 +516,16 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
if (has_privs_of_role(GetUserId(), beentry->st_userid))
{
values[2] = ObjectIdGetDatum(beentry->st_progress_command_target);
+ values[3] = Int32GetDatum(beentry->st_leader_pid);
for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
- values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]);
+ values[i + 4] = Int64GetDatum(beentry->st_progress_param[i]);
}
else
{
nulls[2] = true;
+ nulls[3] = true;
for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
- nulls[i + 3] = true;
+ nulls[i + 4] = true;
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 294ab70..159f2e4 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -2040,7 +2040,6 @@ EstimateSnapshotSpace(Snapshot snap)
Size size;
Assert(snap != InvalidSnapshot);
- Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
/* We allocate any XID arrays needed in the same palloc block. */
size = add_size(sizeof(SerializedSnapshotData),
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index a09c49d..4b44afe 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2008,6 +2008,7 @@ psql_completion(const char *text, int start, int end)
"autovacuum_multixact_freeze_max_age",
"autovacuum_multixact_freeze_min_age",
"autovacuum_multixact_freeze_table_age",
+ "autovacuum_vacuum_parallel_workers",
"autovacuum_vacuum_cost_delay",
"autovacuum_vacuum_cost_limit",
"autovacuum_vacuum_scale_factor",
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9f4367d..cbda797 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -132,6 +132,8 @@ extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
Relation relation, Snapshot snapshot);
extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+extern void heap_parallelscan_startblock_init(HeapScanDesc scan);
+extern BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf,
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 93c031a..5353beb 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2879,7 +2879,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 0 f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f f f t s r 1 0 2249 "23" "{23,26,23,26,25,25,25,25,25,1184,1184,1184,1184,869,25,23,28,28,25,16,25,25,23,16,25}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,sslclientdn}" _null_ _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
+DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,23,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,leader_pid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 7a7b793..e7950d5 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -15,6 +15,8 @@
#define VACUUM_H
#include "access/htup.h"
+#include "access/heapam.h"
+#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_type.h"
#include "nodes/parsenodes.h"
@@ -157,7 +159,7 @@ extern int vacuum_multixact_freeze_table_age;
/* in commands/vacuum.c */
extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
-extern void vacuum(int options, List *relations, VacuumParams *params,
+extern void vacuum(VacuumOptions options, List *relations, VacuumParams *params,
BufferAccessStrategy bstrategy, bool isTopLevel);
extern void vac_open_indexes(Relation relation, LOCKMODE lockmode,
int *nindexes, Relation **Irel);
@@ -187,8 +189,9 @@ extern void vac_update_datfrozenxid(void);
extern void vacuum_delay_point(void);
/* in commands/vacuumlazy.c */
-extern void lazy_vacuum_rel(Relation onerel, int options,
+extern void lazy_vacuum_rel(Relation onerel, VacuumOptions options,
VacuumParams *params, BufferAccessStrategy bstrategy);
+extern void LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc);
/* in commands/analyze.c */
extern void analyze_rel(Oid relid, RangeVar *relation, int options,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 732e5d6..a32917b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3095,9 +3095,16 @@ typedef enum VacuumOption
VACOPT_FULL = 1 << 4, /* FULL (non-concurrent) vacuum */
VACOPT_NOWAIT = 1 << 5, /* don't wait to get lock (autovacuum only) */
VACOPT_SKIPTOAST = 1 << 6, /* don't process the TOAST table, if any */
- VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7 /* don't skip any pages */
+ VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7, /* don't skip any pages */
+ VACOPT_PARALLEL = 1 << 8 /* do VACUUM parallelly */
} VacuumOption;
+typedef struct VacuumOptions
+{
+ VacuumOption flags; /* OR of VacuumOption flags */
+ int nworkers; /* # of parallel vacuum workers */
+} VacuumOptions;
+
/*
* Info about a single target table of VACUUM/ANALYZE.
*
@@ -3116,7 +3123,7 @@ typedef struct VacuumRelation
typedef struct VacuumStmt
{
NodeTag type;
- int options; /* OR of VacuumOption flags */
+ VacuumOptions options;
List *rels; /* list of VacuumRelation, or NIL for all */
} VacuumStmt;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3..5860b4a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -811,6 +811,8 @@ typedef enum
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+ WAIT_EVENT_PARALLEL_VACUUM_PREPARE,
+ WAIT_EVENT_PARALLEL_VACUUM_DONE,
WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
WAIT_EVENT_CLOG_GROUP_UPDATE,
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
@@ -1014,13 +1016,17 @@ typedef struct PgBackendStatus
/*
* Command progress reporting. Any command which wishes can advertise
- * that it is running by setting st_progress_command,
+ * that it is running by setting st_leader_pid, st_progress_command,
* st_progress_command_target, and st_progress_param[].
* st_progress_command_target should be the OID of the relation which the
* command targets (we assume there's just one, as this is meant for
* utility commands), but the meaning of each element in the
* st_progress_param array is command-specific.
+ * st_leader_pid can be used for command progress reporting of a parallel
+ * operation. By setting it to the leader's PID, monitoring queries can
+ * group the leader and its workers in progress-reporting views.
*/
+ int st_leader_pid;
ProgressCommandType st_progress_command;
Oid st_progress_command_target;
int64 st_progress_param[PGSTAT_NUM_PROGRESS_PARAM];
@@ -1180,6 +1186,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
extern void pgstat_report_tempfile(size_t filesize);
extern void pgstat_report_appname(const char *appname);
extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_leader_pid(int pid);
extern const char *pgstat_get_wait_event(uint32 wait_event_info);
extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 4bc61e5..2f44d39 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -268,6 +268,7 @@ typedef struct AutoVacOpts
int multixact_freeze_min_age;
int multixact_freeze_max_age;
int multixact_freeze_table_age;
+ int vacuum_parallel_workers;
int log_min_duration;
float8 vacuum_scale_factor;
float8 analyze_scale_factor;
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index c440c7e..b43ef0a 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,9 @@ CONTEXT: SQL function "do_analyze" statement 1
SQL function "wrap_do_analyze" statement 1
VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
-- partitioned table
CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 92eaca2..ab9bc4c 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,9 @@ VACUUM FULL vaccluster;
VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
-- partitioned table
CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);