diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml index e712226..59e90cb 100644 --- a/doc/src/sgml/ref/vacuum.sgml +++ b/doc/src/sgml/ref/vacuum.sgml @@ -30,6 +30,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ table_and_columns is: @@ -142,6 +143,20 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ N + + + Execute VACUUM in parallel by launching N + background workers. Garbage collection on the table is performed + in block-level parallel using a parallel heap scan. For tables with indexes, parallel vacuum assigns each + index to one of the parallel vacuum workers, and all garbage in an index is processed + by that particular worker. For example, VACUUM (PARALLEL 2) tbl vacuums the table + tbl using two background workers. This option cannot be used with the FULL + option. + + + + + DISABLE_PAGE_SKIPPING diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c435482..8e64650 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -91,7 +91,6 @@ static HeapScanDesc heap_beginscan_internal(Relation relation, bool is_samplescan, bool temp_snap); static void heap_parallelscan_startblock_init(HeapScanDesc scan); -static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -1715,7 +1714,7 @@ retry: * first backend gets an InvalidBlockNumber return. * ---------------- */ -static BlockNumber +BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan) { BlockNumber page; diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index c6f7b7a..dd22bb9 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -20,6 +20,7 @@ #include "access/xlog.h" #include "catalog/namespace.h" #include "commands/async.h" +#include "commands/vacuum.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -122,6 +123,9 @@ static const struct { { "ParallelQueryMain", ParallelQueryMain + }, + { + "LazyVacuumWorkerMain", LazyVacuumWorkerMain } }; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index f439b55..4c9ac4f 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -74,7 +74,7 @@ static void vac_truncate_clog(TransactionId frozenXID, MultiXactId minMulti, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); -static bool vacuum_rel(Oid relid, RangeVar *relation, int options, +static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options, VacuumParams *params); /* @@ -89,15 +89,15 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) VacuumParams params; /* sanity checks on options */ - Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE)); - Assert((vacstmt->options & VACOPT_VACUUM) || - !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE))); - Assert(!(vacstmt->options & VACOPT_SKIPTOAST)); + Assert(vacstmt->options.flags & (VACOPT_VACUUM | VACOPT_ANALYZE)); + Assert((vacstmt->options.flags & VACOPT_VACUUM) || + !(vacstmt->options.flags & (VACOPT_FULL | VACOPT_FREEZE))); + Assert(!(vacstmt->options.flags & VACOPT_SKIPTOAST)); /* * Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/ - if (!(vacstmt->options & VACOPT_ANALYZE)) + if (!(vacstmt->options.flags & VACOPT_ANALYZE)) { ListCell *lc; @@ -116,7 +116,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) * All freeze ages are zero if the FREEZE option is given; otherwise pass * them as -1 which means to use the default values. */ - if (vacstmt->options & VACOPT_FREEZE) + if (vacstmt->options.flags & VACOPT_FREEZE) { params.freeze_min_age = 0; params.freeze_table_age = 0; @@ -163,7 +163,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel) * memory context that will not disappear at transaction commit. */ void -vacuum(int options, List *relations, VacuumParams *params, +vacuum(VacuumOptions options, List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, bool isTopLevel) { static bool in_vacuum = false; @@ -174,7 +174,7 @@ vacuum(int options, List *relations, VacuumParams *params, Assert(params != NULL); - stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; + stmttype = (options.flags & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; /* * We cannot run VACUUM inside a user transaction block; if we were inside * * ANALYZE (without VACUUM) can run either way. */ - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) { PreventTransactionChain(isTopLevel, stmttype); in_outer_xact = false; @@ -206,17 +206,26 @@ vacuum(int options, List *relations, VacuumParams *params, /* * Sanity check DISABLE_PAGE_SKIPPING option. */ - if ((options & VACOPT_FULL) != 0 && - (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0) + if ((options.flags & VACOPT_FULL) != 0 && + (options.flags & VACOPT_DISABLE_PAGE_SKIPPING) != 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL"))); /* + * Sanity check PARALLEL option. + */ + if ((options.flags & VACOPT_FULL) != 0 && + (options.flags & VACOPT_PARALLEL) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("VACUUM option PARALLEL cannot be used with FULL"))); + + /* * Send info about dead objects to the statistics collector, unless we are * in autovacuum --- autovacuum.c does this for itself. */ - if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) + if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) pgstat_vacuum_stat(); /* @@ -281,11 +290,11 @@ vacuum(int options, List *relations, VacuumParams *params, * transaction block, and also in an autovacuum worker, use own * transactions so we can release locks sooner. */ - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) use_own_xacts = true; else { - Assert(options & VACOPT_ANALYZE); + Assert(options.flags & VACOPT_ANALYZE); if (IsAutoVacuumWorkerProcess()) use_own_xacts = true; else if (in_outer_xact) @@ -335,13 +344,13 @@ vacuum(int options, List *relations, VacuumParams *params, { VacuumRelation *vrel = lfirst_node(VacuumRelation, cur); - if (options & VACOPT_VACUUM) + if (options.flags & VACOPT_VACUUM) { if (!vacuum_rel(vrel->oid, vrel->relation, options, params)) continue; } - if (options & VACOPT_ANALYZE) + if (options.flags & VACOPT_ANALYZE) { /* * If using separate xacts, start one for analyze.
Otherwise, @@ -354,7 +363,7 @@ vacuum(int options, List *relations, VacuumParams *params, PushActiveSnapshot(GetTransactionSnapshot()); } - analyze_rel(vrel->oid, vrel->relation, options, params, + analyze_rel(vrel->oid, vrel->relation, options.flags, params, vrel->va_cols, in_outer_xact, vac_strategy); if (use_own_xacts) @@ -390,7 +399,7 @@ vacuum(int options, List *relations, VacuumParams *params, StartTransactionCommand(); } - if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) + if ((options.flags & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess()) { /* * Update pg_database.datfrozenxid, and truncate pg_xact if possible. @@ -1318,7 +1327,7 @@ vac_truncate_clog(TransactionId frozenXID, * At entry and exit, we are not inside a transaction. */ static bool -vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) +vacuum_rel(Oid relid, RangeVar *relation, VacuumOptions options, VacuumParams *params) { LOCKMODE lmode; Relation onerel; @@ -1339,7 +1348,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) */ PushActiveSnapshot(GetTransactionSnapshot()); - if (!(options & VACOPT_FULL)) + if (!(options.flags & VACOPT_FULL)) { /* * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets @@ -1379,7 +1388,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either * way, we can be sure that no other backend is vacuuming the same table. */ - lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock; + lmode = (options.flags & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock; /* * Open the relation and get the appropriate lock on it. @@ -1390,7 +1399,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) * If we've been asked not to wait for the relation lock, acquire it first * in non-blocking mode, before calling try_relation_open(). */ - if (!(options & VACOPT_NOWAIT)) + if (!(options.flags & VACOPT_NOWAIT)) onerel = try_relation_open(relid, lmode); else if (ConditionalLockRelationOid(relid, lmode)) onerel = try_relation_open(relid, NoLock); @@ -1510,7 +1519,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) * us to process it. In VACUUM FULL, though, the toast table is * automatically rebuilt by cluster_rel so we shouldn't recurse to it. */ - if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL)) + if (!(options.flags & VACOPT_SKIPTOAST) && !(options.flags & VACOPT_FULL)) toast_relid = onerel->rd_rel->reltoastrelid; else toast_relid = InvalidOid; @@ -1529,7 +1538,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) /* * Do the actual work --- either FULL or "lazy" vacuum */ - if (options & VACOPT_FULL) + if (options.flags & VACOPT_FULL) { /* close relation before vacuuming, but hold lock until commit */ relation_close(onerel, NoLock); @@ -1537,7 +1546,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ cluster_rel(relid, InvalidOid, false, - (options & VACOPT_VERBOSE) != 0); + (options.flags & VACOPT_VERBOSE) != 0); } else lazy_vacuum_rel(onerel, options, params, vac_strategy); @@ -1591,8 +1600,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params) * hit dangling index pointers. 
*/ void -vac_open_indexes(Relation relation, LOCKMODE lockmode, - int *nindexes, Relation **Irel) +vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel) { List *indexoidlist; ListCell *indexoidscan; diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c index 30b1c08..f9be38e 100644 --- a/src/backend/commands/vacuumlazy.c +++ b/src/backend/commands/vacuumlazy.c @@ -22,6 +22,20 @@ * of index scans performed. So we don't use maintenance_work_mem memory for * the TID array, just enough to hold as many heap tuples as fit on one page. * + * In PostgreSQL 11, we support a parallel option for lazy vacuum. In parallel + * lazy vacuum, multiple vacuum worker processes get blocks in parallel using + * the parallel heap scan and process them. If the table has indexes, the + * parallel vacuum workers vacuum the heap and indexes in parallel. Also, since + * the dead tuple TIDs are shared with all vacuum processes including the leader + * process, the parallel vacuum processes have to pass two synchronization points + * during lazy vacuum processing: before starting to vacuum and before clearing + * the dead tuple TIDs. At these two points the leader process acts as the + * arbiter of the dead tuple TIDs. The information required by parallel lazy + * vacuum, such as the table statistics and the parallel heap scan description, + * has to be shared with all vacuum processes, and the table statistics are + * funneled back to the leader process after the workers finish. Note that the + * dead tuple TIDs need to be shared only when the table has indexes. For a + * table with no indexes, each parallel worker processes blocks and vacuums + * them independently. * * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -41,8 +55,10 @@ #include "access/heapam_xlog.h" #include "access/htup_details.h" #include "access/multixact.h" +#include "access/relscan.h" #include "access/transam.h" #include "access/visibilitymap.h" +#include "access/xact.h" #include "access/xlog.h" #include "catalog/catalog.h" #include "catalog/storage.h" @@ -54,6 +70,7 @@ #include "portability/instr_time.h" #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/freespace.h" #include "storage/lmgr.h" #include "utils/lsyscache.h" @@ -62,6 +79,7 @@ #include "utils/timestamp.h" #include "utils/tqual.h" +/* #define PLV_TIME */ /* * Space/time tradeoff parameters: do these need to be user-tunable?
@@ -103,10 +121,81 @@ */ #define PREFETCH_SIZE ((BlockNumber) 32) +/* DSM keys for parallel lazy vacuum */ +#define VACUUM_KEY_PARALLEL_SCAN UINT64CONST(0xFFFFFFFFFFF00001) +#define VACUUM_KEY_VACUUM_STATS UINT64CONST(0xFFFFFFFFFFF00002) +#define VACUUM_KEY_INDEX_STATS UINT64CONST(0xFFFFFFFFFFF00003) +#define VACUUM_KEY_DEAD_TUPLE_CTL UINT64CONST(0xFFFFFFFFFFF00004) +#define VACUUM_KEY_DEAD_TUPLES UINT64CONST(0xFFFFFFFFFFF00005) +#define VACUUM_KEY_PARALLEL_STATE UINT64CONST(0xFFFFFFFFFFF00006) + +/* + * See the note in lazy_scan_get_nextpage about forcing scanning of the + * last page. + */ +#define FORCE_CHECK_PAGE(blk) \ + ((blk) == vacrelstats->rel_pages - 1 && should_attempt_truncation(vacrelstats)) + +/* Check if the given index is assigned to this parallel vacuum worker */ +#define IsAssignedIndex(i, pstate) \ + ((pstate) == NULL || \ + ((((i) % ((LVParallelState *) (pstate))->nworkers) - 1) == ParallelWorkerNumber)) + +#define IsDeadTupleShared(lvstate) \ + (((LVState *) (lvstate))->parallel_mode && \ + ((LVState *) (lvstate))->vacrelstats->nindexes > 0) + +/* Vacuum worker state for parallel lazy vacuum */ +#define VACSTATE_SCAN 0x1 /* heap scan phase */ +#define VACSTATE_VACUUM 0x2 /* vacuuming table and indexes phase */ + +/* + * Vacuum-relevant options and thresholds we need to share with parallel + * vacuum workers. + */ +typedef struct VacuumInfo +{ + int options; /* VACUUM options */ + bool aggressive; /* does each worker need to do aggressive vacuum? */ + TransactionId oldestxmin; + TransactionId freezelimit; + MultiXactId multixactcutoff; + int elevel; +} VacuumInfo; + +/* Struct for index statistics that are used for parallel lazy vacuum */ +typedef struct LVIndStats +{ + bool updated; /* needs to be updated? */ + BlockNumber num_pages; + BlockNumber num_tuples; +} LVIndStats; + +/* Struct for parallel lazy vacuum state */ +typedef struct LVParallelState +{ + int nworkers; /* # of processes doing vacuum */ + VacuumInfo info; + int state; /* current parallel vacuum status */ + int finish_count; + ConditionVariable cv; + slock_t mutex; +} LVParallelState; + +/* Struct for controlling the dead tuple TIDs array */ +typedef struct LVDeadTupleCtl +{ + int dt_max; /* # slots allocated in array */ + int dt_count; /* # of dead tuples */ + + /* Used only for parallel lazy vacuum */ + int dt_index; + slock_t mutex; +} LVDeadTupleCtl; + typedef struct LVRelStats { - /* hasindex = true means two-pass strategy; false means one-pass */ - bool hasindex; + int nindexes; /* > 0 means two-pass strategy; = 0 means one-pass */ /* Overall statistics about rel */ BlockNumber old_rel_pages; /* previous value of pg_class.relpages */ BlockNumber rel_pages; /* total number of pages */ @@ -118,19 +207,46 @@ typedef struct LVRelStats double old_rel_tuples; /* previous value of pg_class.reltuples */ double new_rel_tuples; /* new estimated total # of tuples */ double new_dead_tuples; /* new estimated total # of dead tuples */ - BlockNumber pages_removed; double tuples_deleted; - BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ - /* List of TIDs of tuples we intend to delete */ - /* NB: this list is ordered by TID address */ - int num_dead_tuples; /* current # of entries */ - int max_dead_tuples; /* # slots allocated in array */ - ItemPointer dead_tuples; /* array of ItemPointerData */ int num_index_scans; + BlockNumber pages_removed; + BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ TransactionId latestRemovedXid; bool lock_waiter_detected; } LVRelStats; +/* Struct for lazy vacuum execution */ +typedef struct LVState +{ +
bool parallel_mode; + LVRelStats *vacrelstats; + /* + * Used by both parallel and non-parallel lazy vacuum. In parallel lazy + * vacuum on a table with indexes, deadtuples points into dynamic shared + * memory, and the array is controlled through the dtctl struct. + */ + LVDeadTupleCtl *dtctl; + ItemPointer deadtuples; + + /* Used only for parallel lazy vacuum */ + ParallelContext *pcxt; + LVParallelState *pstate; + ParallelHeapScanDesc pscan; + LVIndStats *indstats; +} LVState; + +/* + * Scan description data for lazy vacuum. In parallel lazy vacuum, + * we use only the heapscan field instead of the block fields. + */ +typedef struct LVScanDescData +{ + BlockNumber lv_cblock; /* current scanning block number */ + BlockNumber lv_next_unskippable_block; /* next block number we cannot skip */ + BlockNumber lv_nblocks; /* the number of blocks in the relation */ + HeapScanDesc heapscan; /* field for parallel lazy vacuum */ +} LVScanDescData; +typedef struct LVScanDescData *LVScanDesc; /* A few variables that don't seem worth passing around as parameters */ static int elevel = -1; @@ -141,32 +257,47 @@ static MultiXactId MultiXactCutoff; static BufferAccessStrategy vac_strategy; - -/* non-export function prototypes */ -static void lazy_scan_heap(Relation onerel, int options, - LVRelStats *vacrelstats, Relation *Irel, int nindexes, - bool aggressive); -static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats); +/* non-export function prototypes */ +static void lazy_vacuum_heap(Relation onerel, LVState *lvstate); static bool lazy_check_needs_freeze(Buffer buf, bool *hastup); static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats, - LVRelStats *vacrelstats); -static void lazy_cleanup_index(Relation indrel, - IndexBulkDeleteResult *stats, - LVRelStats *vacrelstats); + LVState *lvstate); +static void lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats, + LVRelStats *vacrelstats, LVIndStats *indstat); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, - int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer); + int tupindex, LVState *lvstate, Buffer *vmbuffer); static bool should_attempt_truncation(LVRelStats *vacrelstats); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); -static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks); -static void lazy_record_dead_tuple(LVRelStats *vacrelstats, - ItemPointer itemptr); +static void lazy_space_alloc(LVState *lvstate, BlockNumber relblocks); +static void lazy_record_dead_tuple(LVState *state, ItemPointer itemptr); static bool lazy_tid_reaped(ItemPointer itemptr, void *state); static int vac_cmp_itemptr(const void *left, const void *right); static bool heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); +static void do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irel, + int nindexes, int options, bool aggressive); +static void lazy_scan_heap(Relation rel, LVState *lvstate, VacuumOptions options, + bool aggressive); + +/* function prototypes for parallel vacuum */ +static void lazy_gather_vacuum_stats(ParallelContext *pcxt, + LVRelStats *vacrelstats); +static void lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats); +static void lazy_initialize_dsm(ParallelContext *pcxt, Relation onrel, + LVState *lvstate, int options, bool aggressive); +static LVState *lazy_initialize_worker(shm_toc *toc); +static LVScanDesc
lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan); +static void lv_endscan(LVScanDesc lvscan); +static BlockNumber lazy_scan_get_nextpage(Relation onerel, LVState *lvstate, + LVScanDesc lvscan, + bool *all_visible_according_to_vm, + Buffer *vmbuffer, int options, bool aggressive); +static void lazy_prepare_vacuum(LVState *lvstate); +static void lazy_end_vacuum(LVState *lvstate); +static long lazy_get_max_dead_tuples(LVRelStats *vacrelstats); /* @@ -179,12 +310,11 @@ static bool heap_page_is_all_visible(Relation rel, Buffer buf, * and locked the relation. */ void -lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, +lazy_vacuum_rel(Relation onerel, VacuumOptions options, VacuumParams *params, BufferAccessStrategy bstrategy) { - LVRelStats *vacrelstats; - Relation *Irel; - int nindexes; + LVState *lvstate; + LVRelStats *vacrelstats; PGRUsage ru0; TimestampTz starttime = 0; long secs; @@ -211,7 +341,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, starttime = GetCurrentTimestamp(); } - if (options & VACOPT_VERBOSE) + if (options.flags & VACOPT_VERBOSE) elevel = INFO; else elevel = DEBUG2; @@ -239,10 +369,12 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, xidFullScanLimit); aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid, mxactFullScanLimit); - if (options & VACOPT_DISABLE_PAGE_SKIPPING) + if (options.flags & VACOPT_DISABLE_PAGE_SKIPPING) aggressive = true; + lvstate = (LVState *) palloc0(sizeof(LVState)); vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats)); + lvstate->vacrelstats = vacrelstats; vacrelstats->old_rel_pages = onerel->rd_rel->relpages; vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples; @@ -250,15 +382,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, vacrelstats->pages_removed = 0; vacrelstats->lock_waiter_detected = false; - /* Open all indexes of the relation */ - vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel); - vacrelstats->hasindex = (nindexes > 0); - /* Do the vacuuming */ - lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive); - - /* Done with indexes */ - vac_close_indexes(nindexes, Irel, NoLock); + lazy_scan_heap(onerel, lvstate, options, aggressive); /* * Compute whether we actually scanned the all unfrozen pages. If we did, * NB: We need to check this before truncating the relation, because that * will change ->rel_pages. */ - if ((vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages) + if ((lvstate->vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages) < vacrelstats->rel_pages) { Assert(!aggressive); @@ -329,7 +454,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params, new_rel_pages, new_rel_tuples, new_rel_allvisible, - vacrelstats->hasindex, + (vacrelstats->nindexes != 0), new_frozen_xid, new_min_multi, false); @@ -439,28 +564,166 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) } /* - * lazy_scan_heap() -- scan an open heap relation + * If the number of workers is greater than 0, we enter parallel lazy vacuum + * mode. In parallel lazy vacuum mode, we initialize dynamic shared memory and + * launch parallel vacuum workers. The leader process also vacuums the table + * after launching the workers and then waits for all vacuum workers to finish. + * After all vacuum workers have finished, we gather the vacuum statistics of + * the table and indexes and update them.
+ */ +static void +lazy_scan_heap(Relation onerel, LVState *lvstate, VacuumOptions options, + bool aggressive) +{ + ParallelContext *pcxt; + LVRelStats *vacrelstats = lvstate->vacrelstats; + Relation *Irel; + int nindexes; + + lvstate->parallel_mode = options.nworkers > 0; + + /* Open indexes */ + vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel); + vacrelstats->nindexes = nindexes; + + if (lvstate->parallel_mode) + { + EnterParallelMode(); + + /* Create parallel context and initialize it */ + pcxt = CreateParallelContext("postgres", "LazyVacuumWorkerMain", + options.nworkers); + lvstate->pcxt = pcxt; + + /* Estimate DSM size for parallel vacuum */ + lazy_estimate_dsm(pcxt, lvstate->vacrelstats); + + /* Initialize DSM for parallel vacuum */ + InitializeParallelDSM(pcxt); + lazy_initialize_dsm(pcxt, onerel, lvstate, options.flags, aggressive); + + /* Launch workers */ + LaunchParallelWorkers(pcxt); + } + + do_lazy_scan_heap(lvstate, onerel, Irel, nindexes, options.flags, aggressive); + + /* + * We can update relation statistics such as scanned pages only after + * gathering statistics from all workers. Also, since in parallel mode + * the workers cannot update index statistics concurrently, the leader + * process has to do it. + * + * XXX : If we allowed workers to update statistics tuples at the same + * time, updating index statistics could be done in lazy_cleanup_index(). + */ + if (lvstate->parallel_mode) + { + int i; + LVIndStats *indstats = palloc(sizeof(LVIndStats) * lvstate->vacrelstats->nindexes); + + /* Wait for workers to finish vacuuming */ + WaitForParallelWorkersToFinish(pcxt); + + /* Gather the result of vacuum statistics from all workers */ + lazy_gather_vacuum_stats(pcxt, vacrelstats); + + /* Now we can compute the new value for pg_class.reltuples */ + vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false, + vacrelstats->rel_pages, + vacrelstats->scanned_pages, + vacrelstats->scanned_tuples); + + /* Copy new index stats to local memory */ + memcpy(indstats, lvstate->indstats, sizeof(LVIndStats) * vacrelstats->nindexes); + + DestroyParallelContext(pcxt); + ExitParallelMode(); + + /* After exiting parallel mode, update index statistics */ + for (i = 0; i < vacrelstats->nindexes; i++) + { + Relation ind = Irel[i]; + LVIndStats *indstat = (LVIndStats *) &(indstats[i]); + + if (indstat->updated) + vac_update_relstats(ind, + indstat->num_pages, + indstat->num_tuples, + 0, + false, + InvalidTransactionId, + InvalidMultiXactId, + false); + } + } + + vac_close_indexes(nindexes, Irel, RowExclusiveLock); +} + +/* + * Entry point of parallel vacuum worker.
+ */ +void +LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc) +{ + LVState *lvstate; + Relation rel; + Relation *indrel; + int nindexes_worker; + + /* Look up dynamic shared memory and initialize */ + lvstate = lazy_initialize_worker(toc); + + Assert(lvstate != NULL); + + rel = relation_open(lvstate->pscan->phs_relid, ShareUpdateExclusiveLock); + + /* Open all indexes */ + vac_open_indexes(rel, RowExclusiveLock, &nindexes_worker, + &indrel); + + /* Do lazy vacuum */ + do_lazy_scan_heap(lvstate, rel, indrel, lvstate->vacrelstats->nindexes, + lvstate->pstate->info.options, lvstate->pstate->info.aggressive); + + vac_close_indexes(lvstate->vacrelstats->nindexes, indrel, RowExclusiveLock); + heap_close(rel, ShareUpdateExclusiveLock); +} + +/* + * do_lazy_scan_heap() -- scan an open heap relation * * This routine prunes each page in the heap, which will among other * things truncate dead tuples to dead line pointers, defragment the - * page, and set commit status bits (see heap_page_prune). It also builds + * page, and set commit status bits (see heap_page_prune). It also uses * lists of dead tuples and pages with free space, calculates statistics * on the number of live tuples in the heap, and marks pages as * all-visible if appropriate. When done, or when we run low on space for - * dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap - * to reclaim dead line pointers. + * dead-tuple TIDs, invoke vacuuming of assigned indexes and call lazy_vacuum_heap + * to reclaim dead line pointers. In parallel vacuum, we need to synchronize + * at the points where scanning the heap finishes and where vacuuming the heap + * finishes. The vacuum worker that reaches such a point first needs to wait + * for the other vacuum workers to reach the same point. + * + * In parallel lazy scan, we get the next page number using the parallel heap + * scan. Since the dead tuple TIDs are shared with all vacuum workers, we have + * to wait for all other workers to reach the same point before starting to + * reclaim dead tuple TIDs and before clearing the dead tuple TID information + * in dynamic shared memory. * * If there are no indexes then we can reclaim line pointers on the fly; * dead line pointers need only be retained until all index pointers that * reference them have been killed.
*/ static void -lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, - Relation *Irel, int nindexes, bool aggressive) +do_lazy_scan_heap(LVState *lvstate, Relation onerel, Relation *Irel, + int nindexes, int options, bool aggressive) { - BlockNumber nblocks, - blkno; + LVRelStats *vacrelstats = lvstate->vacrelstats; + BlockNumber blkno; + BlockNumber nblocks; HeapTupleData tuple; + LVScanDesc lvscan; char *relname; BlockNumber empty_pages, vacuumed_pages; @@ -471,11 +734,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, IndexBulkDeleteResult **indstats; int i; PGRUsage ru0; +#ifdef PLV_TIME + PGRUsage ru_scan; + PGRUsage ru_vacuum; +#endif Buffer vmbuffer = InvalidBuffer; - BlockNumber next_unskippable_block; - bool skipping_blocks; xl_heap_freeze_tuple *frozen; StringInfoData buf; + bool all_visible_according_to_vm = false; + const int initprog_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_TOTAL_HEAP_BLKS, @@ -504,89 +771,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, vacrelstats->nonempty_pages = 0; vacrelstats->latestRemovedXid = InvalidTransactionId; - lazy_space_alloc(vacrelstats, nblocks); + lazy_space_alloc(lvstate, nblocks); frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage); + /* Begin heap scan for vacuum */ + lvscan = lv_beginscan(onerel, lvstate->pscan); + /* Report that we're scanning the heap, advertising total # of blocks */ initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP; initprog_val[1] = nblocks; - initprog_val[2] = vacrelstats->max_dead_tuples; + initprog_val[2] = lvstate->dtctl->dt_max; pgstat_progress_update_multi_param(3, initprog_index, initprog_val); - /* - * Except when aggressive is set, we want to skip pages that are - * all-visible according to the visibility map, but only when we can skip - * at least SKIP_PAGES_THRESHOLD consecutive pages. Since we're reading - * sequentially, the OS should be doing readahead for us, so there's no - * gain in skipping a page now and then; that's likely to disable - * readahead and so be counterproductive. Also, skipping even a single - * page means that we can't update relfrozenxid, so we only want to do it - * if we can skip a goodly number of pages. - * - * When aggressive is set, we can't skip pages just because they are - * all-visible, but we can still skip pages that are all-frozen, since - * such pages do not need freezing and do not affect the value that we can - * safely set for relfrozenxid or relminmxid. - * - * Before entering the main loop, establish the invariant that - * next_unskippable_block is the next block number >= blkno that we can't - * skip based on the visibility map, either all-visible for a regular scan - * or all-frozen for an aggressive scan. We set it to nblocks if there's - * no such block. We also set up the skipping_blocks flag correctly at - * this stage. - * - * Note: The value returned by visibilitymap_get_status could be slightly - * out-of-date, since we make this test before reading the corresponding - * heap page or locking the buffer. This is OK. If we mistakenly think - * that the page is all-visible or all-frozen when in fact the flag's just - * been cleared, we might fail to vacuum the page. It's easy to see that - * skipping a page when aggressive is not set is not a very big deal; we - * might leave some dead tuples lying around, but the next vacuum will - * find them. But even when aggressive *is* set, it's still OK if we miss - * a page whose all-frozen marking has just been cleared. 
Any new XIDs - * just added to that page are necessarily newer than the GlobalXmin we - * computed, so they'll have no effect on the value to which we can safely - * set relfrozenxid. A similar argument applies for MXIDs and relminmxid. - * - * We will scan the table's last page, at least to the extent of - * determining whether it has tuples or not, even if it should be skipped - * according to the above rules; except when we've already determined that - * it's not worth trying to truncate the table. This avoids having - * lazy_truncate_heap() take access-exclusive lock on the table to attempt - * a truncation that just fails immediately because there are tuples in - * the last page. This is worth avoiding mainly because such a lock must - * be replayed on any hot standby, where it can be disruptive. - */ - next_unskippable_block = 0; - if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) - { - while (next_unskippable_block < nblocks) - { - uint8 vmstatus; - - vmstatus = visibilitymap_get_status(onerel, next_unskippable_block, - &vmbuffer); - if (aggressive) - { - if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) - break; - } - else - { - if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) - break; - } - vacuum_delay_point(); - next_unskippable_block++; - } - } - - if (next_unskippable_block >= SKIP_PAGES_THRESHOLD) - skipping_blocks = true; - else - skipping_blocks = false; - - for (blkno = 0; blkno < nblocks; blkno++) +#ifdef PLV_TIME + pg_rusage_init(&ru_scan); +#endif + while((blkno = lazy_scan_get_nextpage(onerel, lvstate, lvscan, + &all_visible_according_to_vm, + &vmbuffer, options, aggressive)) != InvalidBlockNumber) { Buffer buf; Page page; @@ -597,99 +799,31 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, int prev_dead_count; int nfrozen; Size freespace; - bool all_visible_according_to_vm = false; bool all_visible; bool all_frozen = true; /* provided all_visible is also true */ bool has_dead_tuples; TransactionId visibility_cutoff_xid = InvalidTransactionId; - - /* see note above about forcing scanning of last page */ -#define FORCE_CHECK_PAGE() \ - (blkno == nblocks - 1 && should_attempt_truncation(vacrelstats)) + int dtmax; + int dtcount; pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); - if (blkno == next_unskippable_block) - { - /* Time to advance next_unskippable_block */ - next_unskippable_block++; - if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) - { - while (next_unskippable_block < nblocks) - { - uint8 vmskipflags; - - vmskipflags = visibilitymap_get_status(onerel, - next_unskippable_block, - &vmbuffer); - if (aggressive) - { - if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0) - break; - } - else - { - if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0) - break; - } - vacuum_delay_point(); - next_unskippable_block++; - } - } - - /* - * We know we can't skip the current block. But set up - * skipping_blocks to do the right thing at the following blocks. - */ - if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD) - skipping_blocks = true; - else - skipping_blocks = false; - - /* - * Normally, the fact that we can't skip this block must mean that - * it's not all-visible. But in an aggressive vacuum we know only - * that it's not all-frozen, so it might still be all-visible. 
- */ - if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer)) - all_visible_according_to_vm = true; - } - else - { - /* - * The current block is potentially skippable; if we've seen a - * long enough run of skippable blocks to justify skipping it, and - * we're not forced to check it, then go ahead and skip. - * Otherwise, the page must be at least all-visible if not - * all-frozen, so we can set all_visible_according_to_vm = true. - */ - if (skipping_blocks && !FORCE_CHECK_PAGE()) - { - /* - * Tricky, tricky. If this is in aggressive vacuum, the page - * must have been all-frozen at the time we checked whether it - * was skippable, but it might not be any more. We must be - * careful to count it as a skipped all-frozen page in that - * case, or else we'll think we can't update relfrozenxid and - * relminmxid. If it's not an aggressive vacuum, we don't - * know whether it was all-frozen, so we have to recheck; but - * in this case an approximate answer is OK. - */ - if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer)) - vacrelstats->frozenskipped_pages++; - continue; - } - all_visible_according_to_vm = true; - } - vacuum_delay_point(); /* * If we are close to overrunning the available space for dead-tuple * TIDs, pause and do a cycle of vacuuming before we tackle this page. */ - if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage && - vacrelstats->num_dead_tuples > 0) + if (IsDeadTupleShared(lvstate)) + SpinLockAcquire(&lvstate->dtctl->mutex); + + dtmax = lvstate->dtctl->dt_max; + dtcount = lvstate->dtctl->dt_count; + + if (IsDeadTupleShared(lvstate)) + SpinLockRelease(&lvstate->dtctl->mutex); + + if (((dtmax - dtcount) < MaxHeapTuplesPerPage) && dtcount > 0) { const int hvp_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_NUM_INDEX_VACUUMS }; int64 hvp_val[2]; +#ifdef PLV_TIME + elog(WARNING, "%d Scan %s", ParallelWorkerNumber, pg_rusage_show(&ru_scan)); +#endif + /* + * Here we're about to actually vacuum the table and indexes. Before + * entering the vacuum state, we have to wait for the other vacuum + * workers to reach this point. + */ + lazy_prepare_vacuum(lvstate); +#ifdef PLV_TIME + pg_rusage_init(&ru_vacuum); +#endif + /* * Before beginning index vacuuming, we release any pin we may * hold on the visibility map page. This isn't necessary for @@ -716,11 +863,12 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_PHASE_VACUUM_INDEX); - /* Remove index entries */ + /* Remove assigned index entries */ for (i = 0; i < nindexes; i++) - lazy_vacuum_index(Irel[i], - &indstats[i], - vacrelstats); + { + if (IsAssignedIndex(i, lvstate->pstate)) + lazy_vacuum_index(Irel[i], &indstats[i], lvstate); + } /* * Report that we are now vacuuming the heap. We also increase @@ -733,19 +881,28 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, pgstat_progress_update_multi_param(2, hvp_index, hvp_val); /* Remove tuples from heap */ - lazy_vacuum_heap(onerel, vacrelstats); + lazy_vacuum_heap(onerel, lvstate); +#ifdef PLV_TIME + elog(WARNING, "%d VACUUM : %s", ParallelWorkerNumber, pg_rusage_show(&ru_vacuum)); +#endif /* - * Forget the now-vacuumed tuples, and press on, but be careful - * not to reset latestRemovedXid since we want that value to be - * valid. + * Here we've finished vacuuming the heap and indexes and are going + * to begin the next round of heap scanning.
Wait until all vacuum workers + * have finished vacuuming. After that, forget the now-vacuumed tuples, + * and press on, but be careful not to reset latestRemovedXid since we + * want that value to be valid. */ - vacrelstats->num_dead_tuples = 0; - vacrelstats->num_index_scans++; + lazy_end_vacuum(lvstate); +#ifdef PLV_TIME + pg_rusage_init(&ru_scan); +#endif /* Report that we are once again scanning the heap */ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_PHASE_SCAN_HEAP); + + vacrelstats->num_index_scans++; } /* @@ -771,7 +928,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * it's OK to skip vacuuming pages we get a lock conflict on. They * will be dealt with in some future vacuum. */ - if (!aggressive && !FORCE_CHECK_PAGE()) + if (!aggressive && !FORCE_CHECK_PAGE(blkno)) { ReleaseBuffer(buf); vacrelstats->pinskipped_pages++; @@ -923,7 +1080,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, has_dead_tuples = false; nfrozen = 0; hastup = false; - prev_dead_count = vacrelstats->num_dead_tuples; + prev_dead_count = lvstate->dtctl->dt_count; maxoff = PageGetMaxOffsetNumber(page); /* @@ -962,7 +1119,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, */ if (ItemIdIsDead(itemid)) { - lazy_record_dead_tuple(vacrelstats, &(tuple.t_self)); + lazy_record_dead_tuple(lvstate, &(tuple.t_self)); all_visible = false; continue; } @@ -1067,7 +1224,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, if (tupgone) { - lazy_record_dead_tuple(vacrelstats, &(tuple.t_self)); + lazy_record_dead_tuple(lvstate, &(tuple.t_self)); HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data, &vacrelstats->latestRemovedXid); tups_vacuumed += 1; @@ -1132,13 +1289,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* * If there are no indexes then we can vacuum the page right now - * instead of doing a second scan. + * instead of doing a second scan. Because each parallel worker uses + * its own dead tuple area in that case, the workers can vacuum + * independently. */ - if (nindexes == 0 && - vacrelstats->num_dead_tuples > 0) + if (Irel == NULL && lvstate->dtctl->dt_count > 0) { /* Remove tuples from heap */ - lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer); + lazy_vacuum_page(onerel, blkno, buf, 0, lvstate, &vmbuffer); has_dead_tuples = false; /* @@ -1146,7 +1303,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * not to reset latestRemovedXid since we want that value to be * valid. */ - vacrelstats->num_dead_tuples = 0; + lvstate->dtctl->dt_count = 0; + vacuumed_pages++; } @@ -1249,7 +1407,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * page, so remember its free space as-is. (This path will always be * taken if there are no indexes.) */ - if (vacrelstats->num_dead_tuples == prev_dead_count) + if (lvstate->dtctl->dt_count == prev_dead_count) RecordPageWithFreeSpace(onerel, blkno, freespace); } @@ -1264,10 +1422,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, vacrelstats->new_dead_tuples = nkeep; /* now we can compute the new value for pg_class.reltuples */ - vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false, - nblocks, - vacrelstats->tupcount_pages, - num_tuples); + if (!lvstate->parallel_mode) + vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false, + nblocks, + vacrelstats->tupcount_pages, + num_tuples); /* * Release any remaining pin on visibility map page.
@@ -1280,13 +1439,25 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* If any tuples need to be deleted, perform final vacuum cycle */ /* XXX put a threshold on min number of tuples here? */ - if (vacrelstats->num_dead_tuples > 0) + if (lvstate->dtctl->dt_count > 0) { const int hvp_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_NUM_INDEX_VACUUMS }; int64 hvp_val[2]; +#ifdef PLV_TIME + elog(WARNING, "%d Scan %s", ParallelWorkerNumber, pg_rusage_show(&ru_scan)); +#endif + /* + * Here we're about to actually vacuum the table and indexes. Before + * entering the vacuum state, we have to wait for the other vacuum + * workers to reach this point. + */ + lazy_prepare_vacuum(lvstate); +#ifdef PLV_TIME + pg_rusage_init(&ru_vacuum); +#endif /* Log cleanup info before we touch indexes */ vacuum_log_cleanup_info(onerel, vacrelstats); @@ -1297,9 +1468,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* Remove index entries */ for (i = 0; i < nindexes; i++) - lazy_vacuum_index(Irel[i], - &indstats[i], - vacrelstats); + { + if (IsAssignedIndex(i, lvstate->pstate)) + lazy_vacuum_index(Irel[i], &indstats[i], lvstate); + } /* Report that we are now vacuuming the heap */ hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP; hvp_val[1] = vacrelstats->num_index_scans + 1; pgstat_progress_update_multi_param(2, hvp_index, hvp_val); /* Remove tuples from heap */ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_PHASE_VACUUM_HEAP); - lazy_vacuum_heap(onerel, vacrelstats); + + lazy_vacuum_heap(onerel, lvstate); + vacrelstats->num_index_scans++; +#ifdef PLV_TIME + elog(WARNING, "%d VACUUM : %s", ParallelWorkerNumber, pg_rusage_show(&ru_vacuum)); +#endif } /* report all blocks vacuumed; and that we're cleaning up */ @@ -1320,7 +1497,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, /* Do post-vacuum cleanup and statistics update for each index */ for (i = 0; i < nindexes; i++) - lazy_cleanup_index(Irel[i], indstats[i], vacrelstats); + { + if (IsAssignedIndex(i, lvstate->pstate)) + lazy_cleanup_index(Irel[i], indstats[i], lvstate->vacrelstats, + lvstate->parallel_mode ? &(lvstate->indstats[i]) : NULL); + } /* If no indexes, make log report that lazy_vacuum_heap would've made */ if (vacuumed_pages) ereport(elevel, (errmsg("\"%s\": removed %.0f row versions in %u pages", RelationGetRelationName(onerel), tups_vacuumed, vacuumed_pages))); + lv_endscan(lvscan); + /* * This is pretty messy, but we split it up so that we can skip emitting * individual parts of the message when not applicable.
*/ initStringInfo(&buf); appendStringInfo(&buf, + "------- worker %d TOTAL stats -------\n", ParallelWorkerNumber); + appendStringInfo(&buf, _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"), nkeep, OldestXmin); appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"), @@ -1362,6 +1547,35 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, pfree(buf.data); } +/* + * lazy_gather_vacuum_stats() -- Gather vacuum statistics from workers + */ +static void +lazy_gather_vacuum_stats(ParallelContext *pcxt, LVRelStats *vacrelstats) +{ + int i; + LVRelStats *lvstats_list; + + lvstats_list = (LVRelStats *) shm_toc_lookup(pcxt->toc, VACUUM_KEY_VACUUM_STATS, false); + + /* Gather each worker's stats */ + for (i = 0; i < pcxt->nworkers_launched; i++) + { + LVRelStats *wstats = (LVRelStats *) ((char *) lvstats_list + sizeof(LVRelStats) * i); + + vacrelstats->scanned_pages += wstats->scanned_pages; + vacrelstats->pinskipped_pages += wstats->pinskipped_pages; + vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages; + vacrelstats->scanned_tuples += wstats->scanned_tuples; + vacrelstats->new_dead_tuples += wstats->new_dead_tuples; + vacrelstats->pages_removed += wstats->pages_removed; + vacrelstats->tuples_deleted += wstats->tuples_deleted; + vacrelstats->nonempty_pages += wstats->nonempty_pages; + } + + /* All vacuum workers have the same value of rel_pages */ + vacrelstats->rel_pages = lvstats_list->rel_pages; +} /* * lazy_vacuum_heap() -- second pass over the heap @@ -1375,18 +1589,24 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, * process index entry removal in batches as large as possible. */ static void -lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) +lazy_vacuum_heap(Relation onerel, LVState *lvstate) { int tupindex; int npages; PGRUsage ru0; Buffer vmbuffer = InvalidBuffer; + ItemPointer deadtuples = lvstate->deadtuples; + LVDeadTupleCtl *dtctl = lvstate->dtctl; + BlockNumber ntuples = 0; + StringInfoData buf; pg_rusage_init(&ru0); npages = 0; tupindex = 0; - while (tupindex < vacrelstats->num_dead_tuples) + + while (tupindex < dtctl->dt_count) { BlockNumber tblk; Buffer buf; @@ -1395,7 +1615,40 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) vacuum_delay_point(); - tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]); + /* + * If the dead tuple TIDs are shared with all vacuum workers, + * we acquire the lock and advance tupindex before vacuuming. + * + * NB: The maximum number of tuples that can be stored in a single + * page is not large in most cases, so we can use a spinlock + * here.
+ */ + if (IsDeadTupleShared(lvstate)) + { + SpinLockAcquire(&(dtctl->mutex)); + + tupindex = dtctl->dt_index; + + if (tupindex >= dtctl->dt_count) + { + SpinLockRelease(&(dtctl->mutex)); + break; + } + + /* Advance dtctl->dt_index past the dead tuples on this block */ + tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]); + while (dtctl->dt_index < dtctl->dt_count && + ItemPointerGetBlockNumber(&deadtuples[dtctl->dt_index]) == tblk) + { + dtctl->dt_index++; + ntuples++; + } + + SpinLockRelease(&(dtctl->mutex)); + } + + tblk = ItemPointerGetBlockNumber(&deadtuples[tupindex]); buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL, vac_strategy); if (!ConditionalLockBufferForCleanup(buf)) { ReleaseBuffer(buf); ++tupindex; continue; } - tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, + tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, lvstate, &vmbuffer); /* Now that we've compacted the page, record its available space */ @@ -1422,10 +1675,17 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) vmbuffer = InvalidBuffer; } +#ifdef PLV_TIME + elog(WARNING, "%d TABLE %s", ParallelWorkerNumber, pg_rusage_show(&ru0)); +#endif + initStringInfo(&buf); + appendStringInfo(&buf, + "------- worker %d VACUUM HEAP stats -------\n", ParallelWorkerNumber); + appendStringInfo(&buf, + "\"%s\": removed %d row versions in %d pages", + RelationGetRelationName(onerel), ntuples, npages); ereport(elevel, - (errmsg("\"%s\": removed %d row versions in %d pages", - RelationGetRelationName(onerel), - tupindex, npages), + (errmsg("%s", buf.data), errdetail_internal("%s", pg_rusage_show(&ru0)))); } /* @@ -1435,34 +1695,32 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) * * Caller must hold pin and buffer cleanup lock on the buffer. * - * tupindex is the index in vacrelstats->dead_tuples of the first dead - * tuple for this page. We assume the rest follow sequentially. - * The return value is the first tupindex after the tuples of this page. */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, - int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer) + int tupindex, LVState *lvstate, Buffer *vmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; TransactionId visibility_cutoff_xid; bool all_frozen; + LVRelStats *vacrelstats = lvstate->vacrelstats; pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno); START_CRIT_SECTION(); - for (; tupindex < vacrelstats->num_dead_tuples; tupindex++) + for (; tupindex < lvstate->dtctl->dt_count; tupindex++) { BlockNumber tblk; OffsetNumber toff; ItemId itemid; - tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]); + tblk = ItemPointerGetBlockNumber(&lvstate->deadtuples[tupindex]); if (tblk != blkno) break; /* past end of tuples for this block */ - toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]); + toff = ItemPointerGetOffsetNumber(&lvstate->deadtuples[tupindex]); itemid = PageGetItemId(page, toff); ItemIdSetUnused(itemid); unused[uncnt++] = toff; @@ -1587,14 +1845,15 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup) * lazy_vacuum_index() -- vacuum one index relation. * * Delete all the index entries pointing to tuples listed in - * vacrelstats->dead_tuples, and update running statistics. + * lvstate->deadtuples, and update running statistics.
*/ static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats, - LVRelStats *vacrelstats) + LVState *lvstate) { IndexVacuumInfo ivinfo; + StringInfoData buf; PGRUsage ru0; pg_rusage_init(&ru0); @@ -1603,17 +1862,25 @@ lazy_vacuum_index(Relation indrel, ivinfo.analyze_only = false; ivinfo.estimated_count = true; ivinfo.message_level = elevel; - ivinfo.num_heap_tuples = vacrelstats->old_rel_tuples; + ivinfo.num_heap_tuples = lvstate->vacrelstats->old_rel_tuples; ivinfo.strategy = vac_strategy; /* Do bulk deletion */ - *stats = index_bulk_delete(&ivinfo, *stats, - lazy_tid_reaped, (void *) vacrelstats); + *stats = index_bulk_delete(&ivinfo, *stats, lazy_tid_reaped, (void *) lvstate); + +#ifdef PLV_TIME + elog(WARNING, "%d INDEX(%d) %s", ParallelWorkerNumber, RelationGetRelid(indrel), + pg_rusage_show(&ru0)); +#endif + initStringInfo(&buf); + appendStringInfo(&buf, + "------- worker %d VACUUM INDEX stats -------\n", ParallelWorkerNumber); + appendStringInfo(&buf, + "scanned index \"%s\" to remove %d row versions", + RelationGetRelationName(indrel), lvstate->dtctl->dt_count); ereport(elevel, - (errmsg("scanned index \"%s\" to remove %d row versions", - RelationGetRelationName(indrel), - vacrelstats->num_dead_tuples), + (errmsg("%s", buf.data), errdetail_internal("%s", pg_rusage_show(&ru0)))); } @@ -1621,11 +1888,11 @@ lazy_vacuum_index(Relation indrel, * lazy_cleanup_index() -- do post-vacuum cleanup for one index relation. */ static void -lazy_cleanup_index(Relation indrel, - IndexBulkDeleteResult *stats, - LVRelStats *vacrelstats) +lazy_cleanup_index(Relation indrel, IndexBulkDeleteResult *stats, + LVRelStats *vacrelstats, LVIndStats *indstat) { IndexVacuumInfo ivinfo; + StringInfoData buf; PGRUsage ru0; pg_rusage_init(&ru0); @@ -1639,34 +1906,54 @@ lazy_cleanup_index(Relation indrel, stats = index_vacuum_cleanup(&ivinfo, stats); + /* Will be updated by the leader process after vacuuming */ + if (indstat) + indstat->updated = false; + if (!stats) return; /* * Now update statistics in pg_class, but only if the index says the count - * is accurate. + * is accurate. In parallel lazy vacuum, the worker cannot update this + * information by itself, so it is saved to DSM and the leader process + * updates it later.
*/ if (!stats->estimated_count) - vac_update_relstats(indrel, - stats->num_pages, - stats->num_index_tuples, - 0, - false, - InvalidTransactionId, - InvalidMultiXactId, - false); + { + if (indstat) + { + indstat->updated = true; + indstat->num_pages = stats->num_pages; + indstat->num_tuples = stats->num_index_tuples; + } + else + vac_update_relstats(indrel, + stats->num_pages, + stats->num_index_tuples, + 0, + false, + InvalidTransactionId, + InvalidMultiXactId, + false); + } + initStringInfo(&buf); + appendStringInfo(&buf, + "------- worker %d CLEANUP INDEX stats -------\n", ParallelWorkerNumber); + appendStringInfo(&buf, + "index \"%s\" now contains %.0f row versions in %u pages", + RelationGetRelationName(indrel), + stats->num_index_tuples, + stats->num_pages); ereport(elevel, - (errmsg("index \"%s\" now contains %.0f row versions in %u pages", - RelationGetRelationName(indrel), - stats->num_index_tuples, - stats->num_pages), - errdetail("%.0f index row versions were removed.\n" - "%u index pages have been deleted, %u are currently reusable.\n" - "%s.", - stats->tuples_removed, - stats->pages_deleted, stats->pages_free, - pg_rusage_show(&ru0)))); + (errmsg("%s", buf.data), + errdetail("%.0f index row versions were removed.\n" + "%u index pages have been deleted, %u are currently reusable.\n" + "%s.", + stats->tuples_removed, + stats->pages_deleted, stats->pages_free, + pg_rusage_show(&ru0)))); pfree(stats); } @@ -1976,59 +2263,69 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats) /* * lazy_space_alloc - space allocation decisions for lazy vacuum * + * In parallel lazy vacuum, the space for dead tuple locations has already + * been allocated in dynamic shared memory, so we allocate space for dead + * tuple locations in local memory only when not in parallel lazy vacuum. + * * See the comments at the head of this file for rationale. */ static void -lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks) +lazy_space_alloc(LVState *lvstate, BlockNumber relblocks) { - long maxtuples; - int vac_work_mem = IsAutoVacuumWorkerProcess() && - autovacuum_work_mem != -1 ? - autovacuum_work_mem : maintenance_work_mem; + long maxtuples; - if (vacrelstats->hasindex) - { - maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData); - maxtuples = Min(maxtuples, INT_MAX); - maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData)); + /* + * In parallel mode, we have already set the pointer to the dead tuple + * array during initialization. + */ + if (lvstate->parallel_mode && lvstate->vacrelstats->nindexes > 0) + return; - /* curious coding here to ensure the multiplication can't overflow */ - if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks) - maxtuples = relblocks * LAZY_ALLOC_TUPLES; + maxtuples = lazy_get_max_dead_tuples(lvstate->vacrelstats); - /* stay sane if small maintenance_work_mem */ - maxtuples = Max(maxtuples, MaxHeapTuplesPerPage); - } - else - { - maxtuples = MaxHeapTuplesPerPage; - } - - vacrelstats->num_dead_tuples = 0; - vacrelstats->max_dead_tuples = (int) maxtuples; - vacrelstats->dead_tuples = (ItemPointer) - palloc(maxtuples * sizeof(ItemPointerData)); + /* + * If not in parallel lazy vacuum, we need to allocate the dead + * tuple array in local memory.
*/ + lvstate->deadtuples = palloc0(sizeof(ItemPointerData) * (int) maxtuples); + lvstate->dtctl = (LVDeadTupleCtl *) palloc(sizeof(LVDeadTupleCtl)); + lvstate->dtctl->dt_max = (int) maxtuples; + lvstate->dtctl->dt_count = 0; } /* * lazy_record_dead_tuple - remember one deletable tuple + * + * Acquiring the spinlock before remembering is required if the dead tuple + * TIDs are shared with other vacuum workers. */ static void -lazy_record_dead_tuple(LVRelStats *vacrelstats, - ItemPointer itemptr) +lazy_record_dead_tuple(LVState *lvstate, ItemPointer itemptr) { + LVDeadTupleCtl *dtctl = lvstate->dtctl; + + if (IsDeadTupleShared(lvstate)) + SpinLockAcquire(&(dtctl->mutex)); + /* * The array must never overflow, since we rely on all deletable tuples * being removed; inability to remove a tuple might cause an old XID to * persist beyond the freeze limit, which could be disastrous later on. */ - if (vacrelstats->num_dead_tuples >= vacrelstats->max_dead_tuples) + if (dtctl->dt_count >= dtctl->dt_max) elog(ERROR, "dead tuple array overflow"); - vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr; - vacrelstats->num_dead_tuples++; - pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES, - vacrelstats->num_dead_tuples); + lvstate->deadtuples[dtctl->dt_count] = *itemptr; + (dtctl->dt_count)++; + /* XXX : Update progress information here */ + + if (IsDeadTupleShared(lvstate)) + SpinLockRelease(&(dtctl->mutex)); } /* @@ -2041,16 +2338,23 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats, static bool lazy_tid_reaped(ItemPointer itemptr, void *state) { - LVRelStats *vacrelstats = (LVRelStats *) state; + LVState *lvstate = (LVState *) state; ItemPointer res; + /* + * We can assume that the dead tuple TIDs are sorted by TID location + * even when we share the dead tuple TIDs with other vacuum workers. + */ res = (ItemPointer) bsearch((void *) itemptr, - (void *) vacrelstats->dead_tuples, - vacrelstats->num_dead_tuples, + (void *) lvstate->deadtuples, + lvstate->dtctl->dt_count, sizeof(ItemPointerData), vac_cmp_itemptr); return (res != NULL); } /* @@ -2194,3 +2498,622 @@ heap_page_is_all_visible(Relation rel, Buffer buf, return all_visible; } + +/* + * Return the block number we need to scan next, or InvalidBlockNumber if the + * scan is done. + * + * Except when aggressive is set, we want to skip pages that are + * all-visible according to the visibility map, but only when we can skip + * at least SKIP_PAGES_THRESHOLD consecutive pages. Since we're reading + * sequentially, the OS should be doing readahead for us, so there's no + * gain in skipping a page now and then; that's likely to disable + * readahead and so be counterproductive. Also, skipping even a single + * page means that we can't update relfrozenxid, so we only want to do it + * if we can skip a goodly number of pages. + * + * When aggressive is set, we can't skip pages just because they are + * all-visible, but we can still skip pages that are all-frozen, since + * such pages do not need freezing and do not affect the value that we can + * safely set for relfrozenxid or relminmxid. + * + * In non-parallel mode, before entering the main loop, establish the + * invariant that next_unskippable_block is the next block number >= blkno + * that we can't skip based on the visibility map, either all-visible for a + * regular scan or all-frozen for an aggressive scan. We set it to nblocks + * if there's
+ * We set it to nblocks if there's no such block.  We also set up the
+ * skipping_blocks flag correctly at this stage.
+ *
+ * In parallel mode, pstate is not NULL and we scan heap pages using the
+ * parallel heap scan description.  Each worker calls
+ * heap_parallelscan_nextpage() to get, exclusively, the next block number
+ * it needs to scan.  If the given block is all-visible according to the
+ * visibility map, we skip it immediately, unlike in the non-parallel
+ * lazy scan.
+ *
+ * Note: The value returned by visibilitymap_get_status could be slightly
+ * out-of-date, since we make this test before reading the corresponding
+ * heap page or locking the buffer.  This is OK.  If we mistakenly think
+ * that the page is all-visible or all-frozen when in fact the flag's just
+ * been cleared, we might fail to vacuum the page.  It's easy to see that
+ * skipping a page when aggressive is not set is not a very big deal; we
+ * might leave some dead tuples lying around, but the next vacuum will
+ * find them.  But even when aggressive *is* set, it's still OK if we miss
+ * a page whose all-frozen marking has just been cleared.  Any new XIDs
+ * just added to that page are necessarily newer than the GlobalXmin we
+ * computed, so they'll have no effect on the value to which we can safely
+ * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
+ *
+ * We will scan the table's last page, at least to the extent of
+ * determining whether it has tuples or not, even if it should be skipped
+ * according to the above rules; except when we've already determined that
+ * it's not worth trying to truncate the table.  This avoids having
+ * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+ * a truncation that just fails immediately because there are tuples in
+ * the last page.  This is worth avoiding mainly because such a lock must
+ * be replayed on any hot standby, where it can be disruptive.
+ */
+static BlockNumber
+lazy_scan_get_nextpage(Relation onerel, LVState *lvstate,
+					   LVScanDesc lvscan, bool *all_visible_according_to_vm,
+					   Buffer *vmbuffer, int options, bool aggressive)
+{
+	BlockNumber blkno;
+	LVRelStats *vacrelstats = lvstate->vacrelstats;
+
+	if (lvstate->parallel_mode)
+	{
+		/*
+		 * In parallel lazy vacuum, it's hard to know how many consecutive
+		 * all-visible pages exist in the table, so when a page is
+		 * all-visible we skip scanning it immediately, without applying
+		 * SKIP_PAGES_THRESHOLD.
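+		 *
+		 * For instance (an illustration, not from the original comment),
+		 * with two workers the block numbers handed to one worker by
+		 * heap_parallelscan_nextpage() may be 0, 2, 5, 8, ... rather than
+		 * a contiguous range, so a single worker can never observe a run
+		 * of SKIP_PAGES_THRESHOLD consecutive all-visible blocks the way
+		 * the serial code path does.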
+ */ + while ((blkno = heap_parallelscan_nextpage(lvscan->heapscan)) != InvalidBlockNumber) + { + *all_visible_according_to_vm = false; + vacuum_delay_point(); + + /* Consider to skip scan page according visibility map */ + if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0 && + !FORCE_CHECK_PAGE(blkno)) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(onerel, blkno, vmbuffer); + + if (aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0) + { + vacrelstats->frozenskipped_pages++; + continue; + } + else if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0) + *all_visible_according_to_vm = true; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) != 0) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) != 0) + vacrelstats->frozenskipped_pages++; + continue; + } + } + } + + /* We need to scan current blkno, break */ + break; + } + } + else + { + bool skipping_blocks = false; + + /* Initialize lv_nextunskippable_page if needed */ + if (lvscan->lv_cblock == 0 && (options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) + { + while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(onerel, lvscan->lv_next_unskippable_block, + vmbuffer); + if (aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) + break; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) + break; + } + vacuum_delay_point(); + lvscan->lv_next_unskippable_block++; + } + + if (lvscan->lv_next_unskippable_block >= SKIP_PAGES_THRESHOLD) + skipping_blocks = true; + else + skipping_blocks = false; + } + + /* Decide the block number we need to scan */ + for (blkno = lvscan->lv_cblock; blkno < lvscan->lv_nblocks; blkno++) + { + if (blkno == lvscan->lv_next_unskippable_block) + { + /* Time to advance next_unskippable_block */ + lvscan->lv_next_unskippable_block++; + if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0) + { + while (lvscan->lv_next_unskippable_block < lvscan->lv_nblocks) + { + uint8 vmstatus; + + vmstatus = visibilitymap_get_status(onerel, + lvscan->lv_next_unskippable_block, + vmbuffer); + if (aggressive) + { + if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0) + break; + } + else + { + if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0) + break; + } + vacuum_delay_point(); + lvscan->lv_next_unskippable_block++; + } + } + + /* + * We know we can't skip the current block. But set up + * skipping_all_visible_blocks to do the right thing at the + * following blocks. + */ + if (lvscan->lv_next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD) + skipping_blocks = true; + else + skipping_blocks = false; + + /* + * Normally, the fact that we can't skip this block must mean that + * it's not all-visible. But in an aggressive vacuum we know only + * that it's not all-frozen, so it might still be all-visible. + */ + if (aggressive && VM_ALL_VISIBLE(onerel, blkno, vmbuffer)) + *all_visible_according_to_vm = true; + + /* Found out that next unskippable block number */ + break; + } + else + { + /* + * The current block is potentially skippable; if we've seen a + * long enough run of skippable blocks to justify skipping it, and + * we're not forced to check it, then go ahead and skip. + * Otherwise, the page must be at least all-visible if not + * all-frozen, so we can set all_visible_according_to_vm = true. + */ + if (skipping_blocks && !FORCE_CHECK_PAGE(blkno)) + { + /* + * Tricky, tricky. 
If this is in aggressive vacuum, the page + * must have been all-frozen at the time we checked whether it + * was skippable, but it might not be any more. We must be + * careful to count it as a skipped all-frozen page in that + * case, or else we'll think we can't update relfrozenxid and + * relminmxid. If it's not an aggressive vacuum, we don't + * know whether it was all-frozen, so we have to recheck; but + * in this case an approximate answer is OK. + */ + if (aggressive || VM_ALL_FROZEN(onerel, blkno, vmbuffer)) + vacrelstats->frozenskipped_pages++; + continue; + } + + *all_visible_according_to_vm = true; + + /* We need to scan current blkno, break */ + break; + } + } /* for */ + + /* Advance the current block number for the next scan */ + lvscan->lv_cblock = blkno + 1; + } + + return (blkno == lvscan->lv_nblocks) ? InvalidBlockNumber : blkno; +} + +/* + * Begin lazy vacuum scan. lvscan->heapscan is NULL if + * we're not in parallel lazy vacuum. + */ +static LVScanDesc +lv_beginscan(Relation onerel, ParallelHeapScanDesc pscan) +{ + LVScanDesc lvscan; + + lvscan = (LVScanDesc) palloc(sizeof(LVScanDescData)); + + lvscan->lv_cblock = 0; + lvscan->lv_next_unskippable_block = 0; + lvscan->lv_nblocks = RelationGetNumberOfBlocks(onerel); + + if (pscan != NULL) + lvscan->heapscan = heap_beginscan_parallel(onerel, pscan); + else + lvscan->heapscan = NULL; + + return lvscan; +} + +/* + * End lazy vacuum scan. + */ +static void +lv_endscan(LVScanDesc lvscan) +{ + if (lvscan->heapscan != NULL) + heap_endscan(lvscan->heapscan); + pfree(lvscan); +} + +/* ---------------------------------------------------------------- + * Parallel Lazy Vacuum Support + * ---------------------------------------------------------------- + */ + +/* + * Estimate storage for parallel lazy vacuum. + */ +static void +lazy_estimate_dsm(ParallelContext *pcxt, LVRelStats *vacrelstats) +{ + Size size = 0; + int keys = 0; + int vacuum_workers = pcxt->nworkers + 1; + long maxtuples = lazy_get_max_dead_tuples(vacrelstats); + + /* Estimate size for parallel heap scan */ + size += heap_parallelscan_estimate(SnapshotAny); + keys++; + + /* Estimate size for vacuum statistics for only workers*/ + size += BUFFERALIGN(mul_size(sizeof(LVRelStats), pcxt->nworkers)); + keys++; + + /* We have to share dead tuple information only when the table has indexes */ + if (vacrelstats->nindexes > 0) + { + /* Estimate size for index statistics */ + size += BUFFERALIGN(mul_size(sizeof(LVIndStats), vacrelstats->nindexes)); + keys++; + + /* Estimate size for dead tuple control */ + size += BUFFERALIGN(sizeof(LVDeadTupleCtl)); + keys++; + + /* Estimate size for dead tuple array */ + size += BUFFERALIGN(mul_size( + mul_size(sizeof(ItemPointerData), maxtuples), + vacuum_workers)); + keys++; + } + + /* Estimate size for parallel lazy vacuum state */ + size += BUFFERALIGN(sizeof(LVParallelState)); + keys++; + + /* Estimate size for vacuum task */ + size += BUFFERALIGN(sizeof(VacuumInfo)); + keys++; + + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, keys); +} + +/* + * Initialize dynamic shared memory for parallel lazy vacuum. We store + * relevant informations of parallel heap scanning, dead tuple array + * and vacuum statistics for each worker and some parameters for lazy vacuum. 
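+ *
+ * The resulting shm_toc layout is, in outline (a sketch derived from the
+ * keys and sizes used below, not part of the original comment):
+ *
+ *   VACUUM_KEY_VACUUM_STATS    LVRelStats[nworkers]
+ *   VACUUM_KEY_DEAD_TUPLE_CTL  LVDeadTupleCtl                 (nindexes > 0)
+ *   VACUUM_KEY_DEAD_TUPLES     ItemPointerData[maxtuples * (nworkers + 1)]
+ *                                                             (nindexes > 0)
+ *   VACUUM_KEY_INDEX_STATS     LVIndStats[nindexes]           (nindexes > 0)
+ *   VACUUM_KEY_PARALLEL_SCAN   ParallelHeapScanDesc
+ *   VACUUM_KEY_PARALLEL_STATE  LVParallelState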
+ */ +static void +lazy_initialize_dsm(ParallelContext *pcxt, Relation onerel, LVState *lvstate, + int options, bool aggressive) +{ + ParallelHeapScanDesc pscan_ptr; + ItemPointer deadtuples_ptr; + char *lvrelstats_ptr; + LVParallelState *pstate_ptr; + LVIndStats *indstats_ptr; + LVDeadTupleCtl *dtctl_ptr; + int i; + int deadtuples_size; + int lvrelstats_size; + int vacuum_workers = pcxt->nworkers + 1; + long max_tuples = lazy_get_max_dead_tuples(lvstate->vacrelstats); + + /* Allocate and initialize DSM for vacuum stats for each worker */ + lvrelstats_size = mul_size(sizeof(LVRelStats), pcxt->nworkers); + lvrelstats_ptr = shm_toc_allocate(pcxt->toc, lvrelstats_size); + for (i = 0; i < pcxt->nworkers; i++) + { + char *start; + + start = lvrelstats_ptr + i * sizeof(LVRelStats); + memcpy(start, lvstate->vacrelstats, sizeof(LVRelStats)); + } + shm_toc_insert(pcxt->toc, VACUUM_KEY_VACUUM_STATS, lvrelstats_ptr); + + if (lvstate->vacrelstats->nindexes > 0) + { + /* Allocate and initialize DSM for dead tuple control */ + dtctl_ptr = (LVDeadTupleCtl *) shm_toc_allocate(pcxt->toc, sizeof(LVDeadTupleCtl)); + SpinLockInit(&(dtctl_ptr->mutex)); + dtctl_ptr->dt_max = max_tuples * vacuum_workers; + dtctl_ptr->dt_count = 0; + dtctl_ptr->dt_index = 0; + lvstate->dtctl = dtctl_ptr; + shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLE_CTL, dtctl_ptr); + + /* Allocate and initialize DSM for dead tuple array */ + deadtuples_size = mul_size(mul_size(sizeof(ItemPointerData), max_tuples), + vacuum_workers); + deadtuples_ptr = (ItemPointer) shm_toc_allocate(pcxt->toc, + deadtuples_size); + shm_toc_insert(pcxt->toc, VACUUM_KEY_DEAD_TUPLES, deadtuples_ptr); + lvstate->deadtuples = deadtuples_ptr; + + /* Allocate DSM for index statistics */ + indstats_ptr = (LVIndStats *) shm_toc_allocate(pcxt->toc, + mul_size(sizeof(LVIndStats), + lvstate->vacrelstats->nindexes)); + shm_toc_insert(pcxt->toc, VACUUM_KEY_INDEX_STATS, indstats_ptr); + lvstate->indstats = indstats_ptr; + } + + /* Allocate and initialize DSM for parallel scan description */ + pscan_ptr = (ParallelHeapScanDesc) shm_toc_allocate(pcxt->toc, + heap_parallelscan_estimate(SnapshotAny)); + shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_SCAN, pscan_ptr); + heap_parallelscan_initialize(pscan_ptr, onerel, SnapshotAny); + lvstate->pscan = pscan_ptr; + + /* Allocate and initialize DSM for parallel vacuum state */ + pstate_ptr = (LVParallelState *) shm_toc_allocate(pcxt->toc, sizeof(LVParallelState)); + shm_toc_insert(pcxt->toc, VACUUM_KEY_PARALLEL_STATE, pstate_ptr); + + ConditionVariableInit(&(pstate_ptr->cv)); + SpinLockInit(&(pstate_ptr->mutex)); + pstate_ptr->nworkers = vacuum_workers; + pstate_ptr->state = VACSTATE_SCAN; + pstate_ptr->info.aggressive = aggressive; + pstate_ptr->info.options = options; + pstate_ptr->info.oldestxmin = OldestXmin; + pstate_ptr->info.freezelimit = FreezeLimit; + pstate_ptr->info.multixactcutoff = MultiXactCutoff; + pstate_ptr->info.elevel = elevel; + lvstate->pstate = pstate_ptr; +} + +/* + * Initialize parallel lazy vacuum for worker. 
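+ *
+ * A worker is expected to use this roughly as follows (a sketch only;
+ * the actual scan and vacuum loop lives elsewhere in this file):
+ *
+ *   lvstate = lazy_initialize_worker(toc);
+ *   ... scan heap pages via lazy_scan_get_nextpage() and record dead
+ *       tuples with lazy_record_dead_tuple() ...
+ *   lazy_prepare_vacuum(lvstate);    -- barrier before vacuuming starts
+ *   ... vacuum indexes and heap ...
+ *   lazy_end_vacuum(lvstate);        -- barrier before the scan resumes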
+ */
+static LVState *
+lazy_initialize_worker(shm_toc *toc)
+{
+	LVState    *lvstate;
+	char	   *lvstats;
+
+	lvstate = (LVState *) palloc(sizeof(LVState));
+	lvstate->parallel_mode = true;
+
+	/* Set up vacuum stats */
+	lvstats = shm_toc_lookup(toc, VACUUM_KEY_VACUUM_STATS, false);
+	lvstate->vacrelstats = (LVRelStats *)
+		(lvstats + sizeof(LVRelStats) * ParallelWorkerNumber);
+
+	if (lvstate->vacrelstats->nindexes > 0)
+	{
+		/* Set up dead tuple control */
+		lvstate->dtctl = (LVDeadTupleCtl *)
+			shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLE_CTL, false);
+
+		/* Set up dead tuple array */
+		lvstate->deadtuples = (ItemPointer)
+			shm_toc_lookup(toc, VACUUM_KEY_DEAD_TUPLES, false);
+
+		/* Set up index statistics */
+		lvstate->indstats = (LVIndStats *)
+			shm_toc_lookup(toc, VACUUM_KEY_INDEX_STATS, false);
+	}
+
+	/* Set up parallel vacuum state */
+	lvstate->pstate = (LVParallelState *)
+		shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_STATE, false);
+
+	/* Set up parallel heap scan description */
+	lvstate->pscan = (ParallelHeapScanDesc)
+		shm_toc_lookup(toc, VACUUM_KEY_PARALLEL_SCAN, false);
+
+	/* Set up parameters for lazy vacuum */
+	OldestXmin = lvstate->pstate->info.oldestxmin;
+	FreezeLimit = lvstate->pstate->info.freezelimit;
+	MultiXactCutoff = lvstate->pstate->info.multixactcutoff;
+	elevel = lvstate->pstate->info.elevel;
+
+	return lvstate;
+}
+
+/*
+ * At the end of actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers to reach here before clearing the
+ * dead tuple TID information.
+ */
+static void
+lazy_end_vacuum(LVState *lvstate)
+{
+	LVParallelState *pstate = lvstate->pstate;
+	bool		counted = false;
+
+	/* Exit quickly if not in parallel vacuum */
+	if (!lvstate->parallel_mode)
+	{
+		lvstate->dtctl->dt_count = 0;
+		return;
+	}
+
+	while (true)
+	{
+		int			finish_count;
+		int			state;
+
+		SpinLockAcquire(&(pstate->mutex));
+
+		/* Fetch shared information */
+		if (!counted)
+			pstate->finish_count++;
+		finish_count = pstate->finish_count;
+		state = pstate->state;
+
+		SpinLockRelease(&(pstate->mutex));
+
+		if (state == VACSTATE_SCAN)
+			break;
+
+		/*
+		 * If this is a parallel worker and the first time we reach here,
+		 * wake up the other workers.
+		 */
+		if (!counted && IsParallelWorker())
+			ConditionVariableBroadcast(&pstate->cv);
+
+		counted = true;
+
+		/*
+		 * Once all launched parallel vacuum workers have reached here, we
+		 * can clear the dead tuple TID information.
+		 */
+		if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+		{
+			/* Clear dead tuples */
+			lvstate->dtctl->dt_count = 0;
+
+			/* XXX: need spinlock? */
+			pstate->finish_count = 0;
+			pstate->state = VACSTATE_SCAN;
+
+			ConditionVariableBroadcast(&pstate->cv);
+			break;
+		}
+
+		ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_DONE);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Before starting actual vacuuming of the table and indexes, we have to
+ * wait for all other vacuum workers so that every worker sees the same
+ * dead tuple TID information when vacuuming.
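+ *
+ * In outline, the wait loop below works like a simple barrier (a sketch
+ * of the code that follows):
+ *
+ *   SpinLockAcquire(&pstate->mutex);
+ *   pstate->finish_count++;                  -- once per process
+ *   SpinLockRelease(&pstate->mutex);
+ *   while (pstate->state != VACSTATE_VACUUM)
+ *       ConditionVariableSleep(&pstate->cv, ...);
+ *
+ * The last process to arrive (the leader) sorts the shared dead tuple
+ * array with qsort()/vac_cmp_itemptr, resets finish_count, sets the state
+ * to VACSTATE_VACUUM and broadcasts on the condition variable.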
+ */
+static void
+lazy_prepare_vacuum(LVState *lvstate)
+{
+	LVParallelState *pstate = lvstate->pstate;
+	bool		counted = false;
+
+	/* Exit quickly if not in parallel vacuum */
+	if (!lvstate->parallel_mode)
+		return;
+
+	while (true)
+	{
+		int			finish_count;
+		int			state;
+
+		SpinLockAcquire(&(pstate->mutex));
+
+		if (!counted)
+			pstate->finish_count++;
+		state = pstate->state;
+		finish_count = pstate->finish_count;
+
+		SpinLockRelease(&(pstate->mutex));
+
+		if (state == VACSTATE_VACUUM)
+			break;
+
+		/*
+		 * If this is a parallel worker and the first time we reach here,
+		 * wake up the other workers.
+		 */
+		if (!counted && IsParallelWorker())
+			ConditionVariableBroadcast(&pstate->cv);
+
+		counted = true;
+
+		/*
+		 * The leader process can change the parallel vacuum state once
+		 * all workers have reached here.
+		 */
+		if (!IsParallelWorker() && finish_count == (lvstate->pcxt->nworkers_launched + 1))
+		{
+			qsort((void *) lvstate->deadtuples, lvstate->dtctl->dt_count,
+				  sizeof(ItemPointerData), vac_cmp_itemptr);
+
+			/* XXX: need spinlock? */
+			pstate->finish_count = 0;
+			pstate->state = VACSTATE_VACUUM;
+
+			ConditionVariableBroadcast(&pstate->cv);
+			break;
+		}
+
+		ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_VACUUM_PREPARE);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Return the maximum number of dead tuples that can be stored, according
+ * to vac_work_mem.
+ */
+static long
+lazy_get_max_dead_tuples(LVRelStats *vacrelstats)
+{
+	long		maxtuples;
+	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
+	autovacuum_work_mem != -1 ?
+	autovacuum_work_mem : maintenance_work_mem;
+
+	if (vacrelstats->nindexes != 0)
+	{
+		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
+		maxtuples = Min(maxtuples, INT_MAX);
+		maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
+
+		/* curious coding here to ensure the multiplication can't overflow */
+		if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > vacrelstats->old_rel_pages)
+			maxtuples = vacrelstats->old_rel_pages * LAZY_ALLOC_TUPLES;
+
+		/* stay sane if small maintenance_work_mem */
+		maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
+	}
+	else
+	{
+		maxtuples = MaxHeapTuplesPerPage;
+	}
+
+	return maxtuples;
+}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 7a70001..8cba9c8 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1662,7 +1662,12 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
 static bool
 _equalVacuumStmt(const VacuumStmt *a, const VacuumStmt *b)
 {
-	COMPARE_SCALAR_FIELD(options);
+	if (a->options.flags != b->options.flags)
+		return false;
+
+	if (a->options.nworkers != b->options.nworkers)
+		return false;
+
 	COMPARE_NODE_FIELD(rels);
 
 	return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4c83a63..85f5d07 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -187,6 +187,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 							   bool *deferrable, bool *initdeferred, bool *not_valid,
 							   bool *no_inherit, core_yyscan_t yyscanner);
 static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
+static VacuumOptions *makeVacOpt(VacuumOption flag, int nworkers);
 
 %}
 
@@ -237,6 +238,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	struct ImportQual	*importqual;
 	InsertStmt			*istmt;
 	VariableSetStmt		*vsetstmt;
+	VacuumOptions		*vacopts;
 	PartitionElem		*partelem;
 	PartitionSpec		*partspec;
 	PartitionBoundSpec	*partboundspec;
@@ -305,7 +307,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				create_extension_opt_item alter_extension_opt_item
 
 %type <ival>	opt_lock lock_type cast_context
-%type <ival>	vacuum_option_list vacuum_option_elem
+%type <vacopts>	vacuum_option_list vacuum_option_elem
 %type <boolean>	opt_or_replace
 				opt_grant_grant_option opt_grant_admin_option
 				opt_nowait opt_if_exists opt_with_data
@@ -10152,32 +10154,40 @@ cluster_index_specification:
 
 VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_VACUUM, 1);
 					if ($2)
-						n->options |= VACOPT_FULL;
+						vacopts->flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						vacopts->flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
 					n->rels = $5;
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 0;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 			| VACUUM opt_full opt_freeze opt_verbose AnalyzeStmt
 				{
 					VacuumStmt *n = (VacuumStmt *) $5;
-					n->options |= VACOPT_VACUUM;
+					n->options.flags |= VACOPT_VACUUM;
 					if ($2)
-						n->options |= VACOPT_FULL;
+						n->options.flags |= VACOPT_FULL;
 					if ($3)
-						n->options |= VACOPT_FREEZE;
+						n->options.flags |= VACOPT_FREEZE;
 					if ($4)
-						n->options |= VACOPT_VERBOSE;
+						n->options.flags |= VACOPT_VERBOSE;
+					n->options.nworkers = 0;
 					$$ = (Node *)n;
 				}
 			| VACUUM '(' vacuum_option_list ')' opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_VACUUM | $3;
+					VacuumOptions *vacopts = $3;
+
+					n->options.flags = vacopts->flags | VACOPT_VACUUM;
+					n->options.nworkers = vacopts->nworkers;
 					n->rels = $5;
 					$$ = (Node *) n;
 				}
@@ -10185,18 +10195,38 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_vacuum_relation_list
 
 vacuum_option_list:
 			vacuum_option_elem								{ $$ = $1; }
-			| vacuum_option_list ',' vacuum_option_elem		{ $$ = $1 | $3; }
+			| vacuum_option_list ',' vacuum_option_elem
+				{
+					VacuumOptions *vacopts1 = (VacuumOptions *) $1;
+					VacuumOptions *vacopts2 = (VacuumOptions *) $3;
+
+					vacopts1->flags |= vacopts2->flags;
+					if (vacopts2->flags == VACOPT_PARALLEL)
+						vacopts1->nworkers = vacopts2->nworkers;
+
+					$$ = vacopts1;
+					pfree(vacopts2);
+				}
 		;
 
 vacuum_option_elem:
-			analyze_keyword		{ $$ = VACOPT_ANALYZE; }
-			| VERBOSE			{ $$ = VACOPT_VERBOSE; }
-			| FREEZE			{ $$ = VACOPT_FREEZE; }
-			| FULL				{ $$ = VACOPT_FULL; }
+			analyze_keyword		{ $$ = makeVacOpt(VACOPT_ANALYZE, 0); }
+			| VERBOSE			{ $$ = makeVacOpt(VACOPT_VERBOSE, 0); }
+			| FREEZE			{ $$ = makeVacOpt(VACOPT_FREEZE, 0); }
+			| FULL				{ $$ = makeVacOpt(VACOPT_FULL, 0); }
+			| PARALLEL ICONST
+				{
+					if ($2 < 1)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("parallel vacuum degree must be at least 1"),
+								 parser_errposition(@1)));
+					$$ = makeVacOpt(VACOPT_PARALLEL, $2);
+				}
 			| IDENT
 				{
 					if (strcmp($1, "disable_page_skipping") == 0)
-						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
+						$$ = makeVacOpt(VACOPT_DISABLE_PAGE_SKIPPING, 1);
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
@@ -10208,11 +10238,16 @@ vacuum_option_elem:
 
 AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
 				{
 					VacuumStmt *n = makeNode(VacuumStmt);
-					n->options = VACOPT_ANALYZE;
+					VacuumOptions *vacopts = makeVacOpt(VACOPT_ANALYZE, 1);
+
 					if ($2)
-						n->options |= VACOPT_VERBOSE;
+						vacopts->flags |= VACOPT_VERBOSE;
+
+					n->options.flags = vacopts->flags;
+					n->options.nworkers = 0;
 					n->rels = $3;
 					$$ = (Node *)n;
+					pfree(vacopts);
 				}
 		;
 
@@ -15915,6 +15950,16 @@ makeRecursiveViewSelect(char *relname, List *aliases, Node *query)
return (Node *) s; } +static VacuumOptions * +makeVacOpt(VacuumOption flag, int nworkers) +{ + VacuumOptions *vacopt = palloc(sizeof(VacuumOptions)); + + vacopt->flags = flag; + vacopt->nworkers = nworkers; + return vacopt; +} + /* parser_init() * Initialize to parse one query string */ diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index c04c0b5..ee7e6be 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -189,7 +189,7 @@ typedef struct av_relation typedef struct autovac_table { Oid at_relid; - int at_vacoptions; /* bitmask of VacuumOption */ + VacuumOptions at_vacoptions; /* contains bitmask of VacuumOption */ VacuumParams at_params; int at_vacuum_cost_delay; int at_vacuum_cost_limit; @@ -2466,7 +2466,7 @@ do_autovacuum(void) * next table in our list. */ HOLD_INTERRUPTS(); - if (tab->at_vacoptions & VACOPT_VACUUM) + if (tab->at_vacoptions.flags & VACOPT_VACUUM) errcontext("automatic vacuum of table \"%s.%s.%s\"", tab->at_datname, tab->at_nspname, tab->at_relname); else @@ -2867,10 +2867,11 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map, tab = palloc(sizeof(autovac_table)); tab->at_relid = relid; tab->at_sharedrel = classForm->relisshared; - tab->at_vacoptions = VACOPT_SKIPTOAST | + tab->at_vacoptions.flags = VACOPT_SKIPTOAST | (dovacuum ? VACOPT_VACUUM : 0) | (doanalyze ? VACOPT_ANALYZE : 0) | (!wraparound ? VACOPT_NOWAIT : 0); + tab->at_vacoptions.nworkers = 1; tab->at_params.freeze_min_age = freeze_min_age; tab->at_params.freeze_table_age = freeze_table_age; tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age; @@ -3116,10 +3117,10 @@ autovac_report_activity(autovac_table *tab) int len; /* Report the command and possible options */ - if (tab->at_vacoptions & VACOPT_VACUUM) + if (tab->at_vacoptions.flags & VACOPT_VACUUM) snprintf(activity, MAX_AUTOVAC_ACTIV_LEN, "autovacuum: VACUUM%s", - tab->at_vacoptions & VACOPT_ANALYZE ? " ANALYZE" : ""); + tab->at_vacoptions.flags & VACOPT_ANALYZE ? " ANALYZE" : ""); else snprintf(activity, MAX_AUTOVAC_ACTIV_LEN, "autovacuum: ANALYZE"); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3a0b49c..93e1138 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3610,6 +3610,12 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PARALLEL_BITMAP_SCAN: event_name = "ParallelBitmapScan"; break; + case WAIT_EVENT_PARALLEL_VACUUM_PREPARE: + event_name = "ParallelVacuumPrepare"; + break; + case WAIT_EVENT_PARALLEL_VACUUM_DONE: + event_name = "ParallelVacuumDone"; + break; case WAIT_EVENT_PROCARRAY_GROUP_UPDATE: event_name = "ProcArrayGroupUpdate"; break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 82a707a..ead81ff 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -669,7 +669,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, VacuumStmt *stmt = (VacuumStmt *) parsetree; /* we choose to allow this during "read only" transactions */ - PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ? + PreventCommandDuringRecovery((stmt->options.flags & VACOPT_VACUUM) ? 
"VACUUM" : "ANALYZE"); /* forbidden in parallel mode due to CommandIsReadOnly */ ExecVacuum(stmt, isTopLevel); @@ -2498,7 +2498,7 @@ CreateCommandTag(Node *parsetree) break; case T_VacuumStmt: - if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM) + if (((VacuumStmt *) parsetree)->options.flags & VACOPT_VACUUM) tag = "VACUUM"; else tag = "ANALYZE"; diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 294ab70..159f2e4 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -2040,7 +2040,6 @@ EstimateSnapshotSpace(Snapshot snap) Size size; Assert(snap != InvalidSnapshot); - Assert(snap->satisfies == HeapTupleSatisfiesMVCC); /* We allocate any XID arrays needed in the same palloc block. */ size = add_size(sizeof(SerializedSnapshotData), diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4e41024..57bea54 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -132,6 +132,7 @@ extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, Snapshot snapshot); extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan); extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); +extern BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool keep_buf, diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 7a7b793..e7950d5 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -15,6 +15,8 @@ #define VACUUM_H #include "access/htup.h" +#include "access/heapam.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" #include "nodes/parsenodes.h" @@ -157,7 +159,7 @@ extern int vacuum_multixact_freeze_table_age; /* in commands/vacuum.c */ extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel); -extern void vacuum(int options, List *relations, VacuumParams *params, +extern void vacuum(VacuumOptions options, List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, bool isTopLevel); extern void vac_open_indexes(Relation relation, LOCKMODE lockmode, int *nindexes, Relation **Irel); @@ -187,8 +189,9 @@ extern void vac_update_datfrozenxid(void); extern void vacuum_delay_point(void); /* in commands/vacuumlazy.c */ -extern void lazy_vacuum_rel(Relation onerel, int options, +extern void lazy_vacuum_rel(Relation onerel, VacuumOptions options, VacuumParams *params, BufferAccessStrategy bstrategy); +extern void LazyVacuumWorkerMain(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ extern void analyze_rel(Oid relid, RangeVar *relation, int options, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 50eec73..d9ef28c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3095,9 +3095,16 @@ typedef enum VacuumOption VACOPT_FULL = 1 << 4, /* FULL (non-concurrent) vacuum */ VACOPT_NOWAIT = 1 << 5, /* don't wait to get lock (autovacuum only) */ VACOPT_SKIPTOAST = 1 << 6, /* don't process the TOAST table, if any */ - VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7 /* don't skip any pages */ + VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7, /* don't skip any pages */ + VACOPT_PARALLEL = 1 << 8 /* do VACUUM parallelly */ } VacuumOption; +typedef struct VacuumOptions +{ + VacuumOption flags; /* OR of VacuumOption flags */ + int nworkers; /* # of parallel vacuum workers */ +} 
VacuumOptions;
+
 /*
  * Info about a single target table of VACUUM/ANALYZE.
  *
@@ -3116,7 +3123,7 @@ typedef struct VacuumRelation
 typedef struct VacuumStmt
 {
 	NodeTag		type;
-	int			options;		/* OR of VacuumOption flags */
+	VacuumOptions	options;
 	List	   *rels;			/* list of VacuumRelation, or NIL for all */
 } VacuumStmt;
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3..e87a7bc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -811,6 +811,8 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+	WAIT_EVENT_PARALLEL_VACUUM_PREPARE,
+	WAIT_EVENT_PARALLEL_VACUUM_DONE,
 	WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
 	WAIT_EVENT_CLOG_GROUP_UPDATE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index c440c7e..b43ef0a 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,9 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
 CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 92eaca2..ab9bc4c 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,9 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (PARALLEL 2) vactst;
+VACUUM (PARALLEL 2, DISABLE_PAGE_SKIPPING) vactst;
+VACUUM (PARALLEL 2, FREEZE) vactst;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
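
To summarize how the pieces above fit together, here is a minimal sketch of
the leader-side control flow. This is an illustration only, not code from the
patch: lazy_estimate_dsm(), lazy_initialize_dsm() and LazyVacuumWorkerMain()
are names from the patch, the surrounding calls are PostgreSQL's standard
parallel-context API, and the exact call sites inside lazy_vacuum_rel() may
differ.

    /*
     * Sketch: leader-side flow for parallel lazy vacuum.
     * Assumes options.nworkers > 0 and that lvstate has been set up.
     */
    static void
    lazy_vacuum_parallel_sketch(Relation onerel, LVState *lvstate,
                                VacuumOptions options, bool aggressive)
    {
        ParallelContext *pcxt;

        EnterParallelMode();

        /* Request one background worker per PARALLEL N */
        pcxt = CreateParallelContext("postgres", "LazyVacuumWorkerMain",
                                     options.nworkers);

        /* Estimate, create and fill the DSM segment (shm_toc keys above) */
        lazy_estimate_dsm(pcxt, lvstate->vacrelstats);
        InitializeParallelDSM(pcxt);
        lazy_initialize_dsm(pcxt, onerel, lvstate, options.flags, aggressive);

        LaunchParallelWorkers(pcxt);

        /*
         * The leader participates too: it scans heap pages, records dead
         * tuples, and vacuums indexes and heap between the
         * lazy_prepare_vacuum()/lazy_end_vacuum() barriers.
         */

        WaitForParallelWorkersToFinish(pcxt);
        DestroyParallelContext(pcxt);
        ExitParallelMode();
    }

The leader acting as an additional vacuum process is also why the DSM sizing
code above consistently uses pcxt->nworkers + 1.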