commit fba430be4f6f1d18630ffa13dc5d73a96ccaff01 Author: Dmitry Shulga Date: Wed Sep 2 15:59:01 2020 +0700 Reduce time required to recover database from archive. Originally, recovering a database from archive was performed by sequentially receiving files with WAL records and applying them against the database. Delivery of files containing WAL records is performed by running the command specified by the GUC parameter restore_command. If receiving each file containing WAL records takes a long time, the recovery process stands idle most of the time, waiting until the files are received. If the time required to apply WAL records from an archive file is significantly less than the time required to deliver the file from the archive, it leads to unproductive idling after the current WAL segment is applied and before the next WAL segment is received. As a consequence, the wall-clock time required to recover a database from the archive log can be unacceptably long. To reduce the total time required to restore a database from archive, running of the command specified by the GUC parameter restore_command has been isolated into a separate bgworker process and runs in parallel with applying the records contained in archived files. The number of parallel bgworker processes started for delivering WAL segments from archive is limited by the new GUC parameter max_restore_command_workers. Additionally, refactoring was done to extract duplicate code used in files xlog.c and xlogrecovery.c and move it into standalone functions. 
Author: Ivan Taranov Reviewed-by: Anna Akenteva, Marina Polyakova, Dmitry Shulga Tested-by: Roman Zharkov diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de722..ffbf8090f45 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -32,7 +32,8 @@ OBJS = \ xlogfuncs.o \ xloginsert.o \ xlogreader.o \ - xlogutils.o + xlogutils.o \ + xlogrestore.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 09c01ed4ae4..a513174f0f6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -37,6 +37,7 @@ #include "access/xloginsert.h" #include "access/xlogreader.h" #include "access/xlogutils.h" +#include "access/xlogrestore.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "catalog/pg_database.h" @@ -3681,10 +3682,11 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, xlogfname); set_ps_display(activitymsg); - restoredFromArchive = RestoreArchivedFile(path, xlogfname, - "RECOVERYXLOG", - wal_segment_size, - InRedo); + restoredFromArchive = RestoreCommandXLog(path, xlogfname, + "RECOVERYXLOG", + wal_segment_size, + InRedo); + if (!restoredFromArchive) return -1; break; diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index 8f8734dc1d4..86afd48ae5b 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -22,6 +22,7 @@ #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" +#include "access/xlogutils.h" #include "common/archive.h" #include "miscadmin.h" #include "postmaster/startup.h" @@ -55,13 +56,8 @@ RestoreArchivedFile(char *path, const char *xlogfname, bool cleanupEnabled) { char xlogpath[MAXPGPATH]; - char *xlogRestoreCmd; char lastRestartPointFname[MAXPGPATH]; int rc; - struct stat stat_buf; - XLogSegNo restartSegNo; 
- XLogRecPtr restartRedoPtr; - TimeLineID restartTli; /* * Ignore restore_command when not in archive recovery (meaning we are in @@ -102,22 +98,7 @@ RestoreArchivedFile(char *path, const char *xlogfname, /* * Make sure there is no existing file named recovername. */ - if (stat(xlogpath, &stat_buf) != 0) - { - if (errno != ENOENT) - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - xlogpath))); - } - else - { - if (unlink(xlogpath) != 0) - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", - xlogpath))); - } + FileUnlink(xlogpath); /* * Calculate the archive file cutoff point for use during log shipping @@ -136,96 +117,24 @@ RestoreArchivedFile(char *path, const char *xlogfname, * flags to signify the point when we can begin deleting WAL files from * the archive. */ - if (cleanupEnabled) - { - GetOldestRestartPoint(&restartRedoPtr, &restartTli); - XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size); - XLogFileName(lastRestartPointFname, restartTli, restartSegNo, - wal_segment_size); - /* we shouldn't need anything earlier than last restart point */ - Assert(strcmp(lastRestartPointFname, xlogfname) <= 0); - } - else - XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size); + XLogFileNameLastPoint(lastRestartPointFname, cleanupEnabled); + Assert(strcmp(lastRestartPointFname, xlogfname) <= 0); - /* Build the restore command to execute */ - xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, - xlogpath, xlogfname, - lastRestartPointFname); - if (xlogRestoreCmd == NULL) - elog(ERROR, "could not build restore command \"%s\"", - recoveryRestoreCommand); - - ereport(DEBUG3, - (errmsg_internal("executing restore command \"%s\"", - xlogRestoreCmd))); - - /* - * Check signals before restore command and reset afterwards. 
- */ - PreRestoreCommand(); - - /* - * Copy xlog from archival storage to XLOGDIR - */ - rc = system(xlogRestoreCmd); - - PostRestoreCommand(); - pfree(xlogRestoreCmd); + rc = DoRestore(xlogpath, xlogfname, lastRestartPointFname); if (rc == 0) { - /* - * command apparently succeeded, but let's make sure the file is - * really there now and has the correct size. - */ - if (stat(xlogpath, &stat_buf) == 0) + bool file_not_found; + bool ret = FileValidateSize(xlogpath, expectedSize, xlogfname, + &file_not_found); + if (ret) { - if (expectedSize > 0 && stat_buf.st_size != expectedSize) - { - int elevel; + strcpy(path, xlogpath); + return true; + } - /* - * If we find a partial file in standby mode, we assume it's - * because it's just being copied to the archive, and keep - * trying. - * - * Otherwise treat a wrong-sized file as FATAL to ensure the - * DBA would notice it, but is that too strong? We could try - * to plow ahead with a local copy of the file ... but the - * problem is that there probably isn't one, and we'd - * incorrectly conclude we've reached the end of WAL and we're - * done recovering ... 
- */ - if (StandbyMode && stat_buf.st_size < expectedSize) - elevel = DEBUG1; - else - elevel = FATAL; - ereport(elevel, - (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu", - xlogfname, - (unsigned long) stat_buf.st_size, - (unsigned long) expectedSize))); - return false; - } - else - { - ereport(LOG, - (errmsg("restored log file \"%s\" from archive", - xlogfname))); - strcpy(path, xlogpath); - return true; - } - } - else - { - /* stat failed */ - if (errno != ENOENT) - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - xlogpath))); - } + if (!file_not_found) + return false; } /* diff --git a/src/backend/access/transam/xlogrestore.c b/src/backend/access/transam/xlogrestore.c new file mode 100644 index 00000000000..f5c6a14cbe1 --- /dev/null +++ b/src/backend/access/transam/xlogrestore.c @@ -0,0 +1,830 @@ +/*------------------------------------------------------------------------- + * + * xlogrestore.c + * Infrastructure for parallel executing restore commands + * + * Copyright (c) 2014-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogrestore.c + * + *------------------------------------------------------------------------- + */ + +#include "access/xlogrestore.h" + +#define __STDC_FORMAT_MACROS +#include +#include +#include + +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogarchive.h" +#include "access/xlogdefs.h" +#include "access/xlogutils.h" +#include "common/archive.h" +#include "common/file_perm.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "port.h" +#include "port/atomics.h" +#include "postmaster/bgworker.h" +#include "postmaster/startup.h" +#include "storage/ipc.h" +#include "storage/spin.h" +#include "storage/shmem.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/latch.h" +#include "storage/lock.h" +#include "tcop/tcopprot.h" +#include "utils/timestamp.h" +#include 
"utils/memutils.h" + +#define PREFETCH_DIR XLOGDIR "/" PG_TEMP_FILES_DIR +#define PREFETCH_RATIO (16) + +int max_restore_command_workers; + +/* + * Data for restore_command bgworker. + */ +typedef struct RestoreSlot +{ + slock_t spin; + + /* + * The name of the archive file %f. + */ + char xlogfname[MAXFNAMELEN]; + + /* + * The name of the last recovery point file %r. + */ + char pointfname[MAXFNAMELEN]; + + /* + * The restore_command was called. + */ + bool bgwdone; + + /* + * restore_command exit code + */ + int bgwrc; + + /* + * Used to check that this slot is still linked with the bgworker. + */ + uint64 bgwid; + + /* + * bgworker start time. + */ + TimestampTz bgwtime; + + /* + * The pointer is valid for the launcher only. + */ + BackgroundWorkerHandle *bgwhandle; +} RestoreSlot; + +typedef struct RestoreDataStruct +{ + /* + * Counter for bgworkers identification. + */ + uint64 counter; + + /* + * Number of prefetched files. Required to limit the number of prefetched + * files (max_restore_command_workers*PREFETCH_RATIO). Replaces direct + * counting of files in a PREFETCH_DIR. + */ + pg_atomic_uint32 nprefetched; + + /* + * Data for background workers. 
+ */ + RestoreSlot slots[FLEXIBLE_ARRAY_MEMBER]; +} RestoreDataStruct; + +RestoreDataStruct *RestoreData = NULL; + +static bool RestoreCommandPrefetch(char *, const char *, const char *); +static void RestoreCommandPredict(const char *); +static int RestoreCommandGarbage(void); +static bool RestoreSlotSpawn(RestoreSlot *); +static void RestoreSlotWait(RestoreSlot *); +static void RestoreSlotReset(RestoreSlot *); +static void RestoreSlotSetup(RestoreSlot *, const char *, const char *); +static RestoreSlot * RestoreSlotEmplace(bool); +static RestoreSlot * RestoreSlotFind(const char *); +static RestoreSlot * RestoreSlotPredict(const char *, const char *); +static void XLogFileNamePredict(char *, const char *); +static void XLogFilePathPrefetch(char *, const char *); +static bool FilePathExists(const char *); + +/* + * Calculate a size of shared memory used for storing bgworker slots. + */ +Size +RestoreCommandShmemSize(void) +{ + Size size; + + size = sizeof(RestoreDataStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_restore_command_workers, + sizeof(RestoreSlot))); + return size; + +} + +/* + * Create a temporary directory to store prepfetched files + * and initialize a shared memory used for storing bgworker slots. + */ +void +RestoreCommandShmemInit(void) +{ + bool found; + + RestoreData = (RestoreDataStruct *) + ShmemInitStruct("Restore Command Workers Data", + RestoreCommandShmemSize(), + &found); + + if (!found) + { + int i; + + /* nprefetched is also set to 0 by this */ + memset(RestoreData, 0, RestoreCommandShmemSize()); + + /* + * Initialize memory and spin locks for each worker slot. + */ + for (i = 0; i < max_restore_command_workers; ++i) + { + RestoreSlot *slot = &RestoreData->slots[i]; + + memset(slot, 0, sizeof(RestoreSlot)); + SpinLockInit(&slot->spin); + + } + + /* Create or clear temporary wals. 
*/ + PathNameCreateTemporaryDir(XLOGDIR, PREFETCH_DIR); + RemovePgTempFilesInDir(PREFETCH_DIR, true, true); + /* before_shmem_exit(pg_on_exit_callback function, Datum arg) */ + } +} + +/* + * This function is analogue of RestoreArchivedFile function with xlogs + * prefetching. + * + * If successful, subsequent WAL files will be predicted and + * bgworkers processes be run to restore the predicted files. + * + * The number of predicted files will be limited to the number of free slots. + * + * If not successful, then fallback RestoreArchivedFile will be called. + */ +bool +RestoreCommandXLog(char *path, const char *xlogfname, const char *recovername, + const off_t expectedSize, bool cleanupEnabled) +{ + + char pointfname[MAXFNAMELEN]; + char xlogpath[MAXPGPATH]; +#ifdef USE_ASSERT_CHECKING + uint32 new_val; +#endif + + /* + * Synchronous mode. + */ + if (max_restore_command_workers < 1) + goto fallback; + + /* + * Ignore restore_command when not in archive recovery (meaning we are in + * crash recovery). + */ + if (!ArchiveRecoveryRequested) + goto fallback; + + /* + * In standby mode, restore_command might not be supplied. + */ + if (recoveryRestoreCommand == NULL || + strcmp(recoveryRestoreCommand, "") == 0) + goto fallback; + + /* + * Create the last restart point file name. + */ + XLogFileNameLastPoint(pointfname, cleanupEnabled); + + /* + * We shouldn't need anything earlier than the last restart point. + */ + Assert(strcmp(pointfname, xlogfname) <= 0); + + /* + * If the restore failed, try fallback result. + */ + if (!RestoreCommandPrefetch(xlogpath, xlogfname, pointfname)) + goto fallback; + + /* + * Make sure the file is really there now and has the correct size. + */ + if (!FileValidateSize(xlogpath, expectedSize, xlogfname, NULL)) + { + /* Remove artifacts. */ + FileUnlink(xlogpath); + goto fallback; + } + + /* + * Move file to target path. 
+ */ + snprintf(path, MAXPGPATH, XLOGDIR "/%s", recovername); + durable_rename(xlogpath, path, ERROR); + +#ifdef USE_ASSERT_CHECKING + new_val = pg_atomic_sub_fetch_u32(&RestoreData->nprefetched, 1); + + /* + * new_val expected to be >= 0. The assert below checks that + * RestoreData->nprefetched is not wrapped around 0 after atomic decrement. + */ + Assert(new_val != UINT_MAX); +#else + pg_atomic_sub_fetch_u32(&RestoreData->nprefetched, 1); +#endif + + /* + * Log message like in RestoreArchivedFile. + */ + ereport(LOG, + (errmsg("restored log file \"%s\" from archive", + xlogfname))); + + /* + * Remove obsolete slots. + */ + if (RestoreCommandGarbage() > 0) + { + /* + * Predict next logs and spawn bgworkers. + */ + RestoreCommandPredict(xlogfname); + } + + return true; + +fallback: + + /* + * On any errors - try default implementation + */ + return RestoreArchivedFile(path, xlogfname, recovername, expectedSize, + cleanupEnabled); +} + +/* + * Attempt to retrieve the specified file from off-line archival storage + * to PREFDIR directory. + * + * Fill "path" with its complete path. + * + * Return true if command was executed successfully and file exists, or the + * file is found in the PREFDIR directory. + */ +static bool +RestoreCommandPrefetch(char *xlogpath, const char *xlogfname, + const char *pointfname) +{ + RestoreSlot *slot; + bool bgwdone; + int rc; + + /* + * Make prefetched path for file. + */ + XLogFilePathPrefetch(xlogpath, xlogfname); + + /* + * Check if file already on bgworker pool. + */ + if (!(slot = RestoreSlotFind(xlogfname))) + { + /* + * Check if file already on prefetch dir. + */ + if (FilePathExists(xlogpath)) + return true; + + /* + * Emplace a new slot and spawn bgworker. + */ + slot = RestoreSlotEmplace(true); + Assert(slot); + + /* + * When the function RestoreSlotEmplace is invoked with the 'force' + * argument having true value this function calls the function + * RestoreSlotReset(). 
The function RestoreSlotReset() terminates active + * bgworker process if there is a bgwhandle associated with the slot. + * In result, when RestoreSlotEmplace(true) returns a control flow, + * the process that executes RestoreCommandWorkerMain() has been already + * finished or being finished. Anyway, it is safe to reset slot's data + * used from RestoreCommandWorkerMain() without first taking a lock + * on the spin lock slot->spin. + */ + RestoreSlotSetup(slot, xlogfname, pointfname); + + if (!RestoreSlotSpawn(slot)) + ereport(FATAL, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase max_worker_processes.") + )); + + } + + RestoreSlotWait(slot); + + bgwdone = slot->bgwdone; + rc = slot->bgwrc; + + RestoreSlotReset(slot); + + /* + * bgworker didn't execute restore_command + */ + if (!bgwdone) + return false; + + /* + * bgworker failed + */ + if (rc) + { + ereport(wait_result_is_any_signal(rc, true) ? FATAL : DEBUG2, + (errmsg("could not restore file \"%s\" from archive: %s", + xlogfname, wait_result_to_str(rc)))); + } + + return FilePathExists(xlogpath); +} + +/* + * Predict next logs and spawn bgworkers while possible. 
+ */ +static void +RestoreCommandPredict(const char *xlogfname) +{ + char pointfname[MAXFNAMELEN]; + const char *xlogfnext; + RestoreSlot *slot; + int spawn_limit; + + XLogFileNameLastPoint(pointfname, false); + xlogfnext = xlogfname; + + spawn_limit = PREFETCH_RATIO * max_restore_command_workers - + pg_atomic_read_u32(&RestoreData->nprefetched); + + while (spawn_limit-- > 0) + { + if (!(slot = RestoreSlotPredict(xlogfnext, pointfname))) + break; + + if (!RestoreSlotSpawn(slot)) + { + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase max_worker_processes.") + )); + break; + } + + ereport(DEBUG1, + (errmsg("predicted log file \"%s\"", + slot->xlogfname))); + + xlogfnext = slot->xlogfname; + } +} + +/* + * Cleaning garbage on pool. + * + * Reset the slots linked to completed bgworkers. + * + * Return free slots count. + */ +static int +RestoreCommandGarbage(void) +{ + int capacity; + int i; + + for (i = 0, capacity = 0; i < max_restore_command_workers; ++i) + { + RestoreSlot *slot = &RestoreData->slots[i]; + + SpinLockAcquire(&slot->spin); + if (slot->bgwdone) + { + int bgwrc = slot->bgwrc; + + /* + * RestoreSlotReset() terminates a process if it does exist and + * releases a memory pointed by slot->bgwhandle. RestoreSlotReset() + * must be called before invoking of ereport() since the latter + * can result in termination of current process. In this case, + * bgworker process be orphaned and memory referenced by the + * slot->bgwhandle data member wouldn't be released. Although this + * memory will be released anyway on process termination, some + * memory sanitizer tools could produce error report for such memory + * chunks. + */ + RestoreSlotReset(slot); + + if (bgwrc) + { + ereport(wait_result_is_any_signal(bgwrc, true) ? 
FATAL : + DEBUG2, + (errmsg("restore_command error: %s", + wait_result_to_str(bgwrc)))); + } + + capacity++; + } + else if (slot->bgwhandle == NULL) + /* + * Slots that are not assigned to any bgworker should be also + * counted. + */ + capacity++; + + SpinLockRelease(&slot->spin); + } + + return capacity; +} + +/* + * The main entry point for bgworker. + */ +void +RestoreCommandWorkerMain(Datum main_arg) +{ + int rc; + RestoreSlot *slot; + uint64 bgwid; + bool linked; + char xlogfname[MAXFNAMELEN]; + char pointfname[MAXFNAMELEN]; + char xlogpath[MAXPGPATH]; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, die); + /* We're now ready to receive signals. */ + BackgroundWorkerUnblockSignals(); + + /* Get bgwid */ + memcpy(&bgwid, MyBgworkerEntry->bgw_extra, sizeof(bgwid)); + + /* Get RestoreSlot */ + slot = &RestoreData->slots[DatumGetInt16(main_arg)]; + SpinLockAcquire(&slot->spin); + if ((linked = (slot->bgwid == bgwid))) + { + strlcpy(xlogfname, slot->xlogfname, sizeof(xlogfname)); + strlcpy(pointfname, slot->pointfname, sizeof(pointfname)); + } + SpinLockRelease(&slot->spin); + + if (!linked) + ereport(FATAL, + (errmsg("slot " UINT64_FORMAT " is unlinked during a restore", + bgwid))); + + /* Prepare path. */ + XLogFilePathPrefetch(xlogpath, xlogfname); + + /* Make sure there is no existing file named recovername. */ + FileUnlink(xlogpath); + + /* Prepare and execute the restore command. */ + if ((rc = DoRestore(xlogpath, xlogfname, pointfname))) + FileUnlink(xlogpath); + + CHECK_FOR_INTERRUPTS(); + + /* Keep the results. */ + SpinLockAcquire(&slot->spin); + /* + * Retesting of the condition 'slot->bgwid == bgwid' is required to + * guard against reusing of a slot inside RestoreCommandPrefetch function + * when RestoreSlotEmplace function called with argument value equals true. 
+ */ + if ((linked = (slot->bgwid == bgwid))) + { + slot->bgwdone = true; + slot->bgwrc = rc; + + if (FilePathExists(xlogpath)) + pg_atomic_add_fetch_u32(&RestoreData->nprefetched, 1); + + } + SpinLockRelease(&slot->spin); + + /* If slot was unlinked - delete restored file. */ + if (!linked) + { + FileUnlink(xlogpath); + + ereport(FATAL, + (errmsg("slot %s is unlinked during a restore", + xlogfname))); + } + else + ereport(DEBUG2, + (errmsg_internal("slot %s done %d \"%s\"", + xlogfname, rc, xlogpath))); + + proc_exit(0); +} + + +/* + * Setup and spawn bgworker. + * Link it to the slot by bgwhandle. + */ +static bool +RestoreSlotSpawn(RestoreSlot *slot) +{ + BackgroundWorker bgw; + + memset(&bgw, 0, sizeof(bgw)); + snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "restore %s", slot->xlogfname); + + /* + * Length of the string literal "Restore Command Worker" is less than + * size of a buffer referenced by the data member bgw.bgw_type (the size is + * limited by the constant BGW_MAXLEN that currently has value 96). + * Therefore we can use function strcpy() instead of strncpy/strlcpy to copy + * the string literal into the buffer bgw.bgw_type. The same is true for + * other two string literals "postgres" and "RestoreCommandWorkerMain" and + * their corresponding destination buffers referenced by the data members + * bgw.bgw_library_name, bgw.bgw_function_name. + * To guards against further possible change of limit represented by the + * constant BGW_MAXLEN the asserts have been inserted before invoking of + * the function strcpy() as a sanity check. In case some of these asserts be + * fired it means that some really drastic change was done in the core + * source code that should be carefully studied. 
+ */ + Assert(sizeof(bgw.bgw_type) >= sizeof("Restore Command Worker")); + Assert(sizeof(bgw.bgw_library_name) >= sizeof("postgres")); + Assert(sizeof(bgw.bgw_function_name) >= sizeof("RestoreCommandWorkerMain")); + + strcpy(bgw.bgw_type, "Restore Command Worker"); + strcpy(bgw.bgw_library_name, "postgres"); + strcpy(bgw.bgw_function_name, "RestoreCommandWorkerMain"); + + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + + /* + * BgWorkerStart_PostmasterStart for PM_RECOVERY, PM_STARTUP + * BgWorkerStart_ConsistentState for PM_HOT_STANDBY + */ + bgw.bgw_start_time = HotStandbyActive() ? BgWorkerStart_ConsistentState : + BgWorkerStart_PostmasterStart; + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_main_arg = Int16GetDatum(slot - RestoreData->slots); + memcpy(bgw.bgw_extra, &slot->bgwid, sizeof(slot->bgwid)); + bgw.bgw_notify_pid = MyProcPid; + return RegisterDynamicBackgroundWorker(&bgw, &slot->bgwhandle); +} + +/* + * Wait linked bgworker to shutdown. + */ +static void +RestoreSlotWait(RestoreSlot *slot) +{ + BgwHandleStatus status; + pid_t pid; + + /* is linked slot by bgworker */ + if (slot->bgwhandle == NULL) + return; + + /* WaitForBackgroundWorkerShutdown */ + for (;;) + { + status = GetBackgroundWorkerPid(slot->bgwhandle, &pid); + if (status == BGWH_STOPPED) + break; + + WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 8, + PG_WAIT_EXTENSION); + + ResetLatch(MyLatch); + HandleStartupProcInterrupts(); + } + + pfree(slot->bgwhandle); + + slot->bgwhandle = NULL; +} + +/* + * Reset slot params and terminate linked bgworker if exists. + */ +static void +RestoreSlotReset(RestoreSlot *slot) +{ + slot->xlogfname[0] = '\0'; + slot->pointfname[0] = '\0'; + slot->bgwdone = false; + slot->bgwrc = 0; + slot->bgwid = 0; + slot->bgwtime = 0; + if (slot->bgwhandle) + { + /* If there's a bgworker tied up, stop it. */ + TerminateBackgroundWorker(slot->bgwhandle); + + /* Release memory. 
*/ + pfree(slot->bgwhandle); + slot->bgwhandle = NULL; + } +} + +/* + * Configure slot options. + */ +static void +RestoreSlotSetup(RestoreSlot * slot, const char *xlogfname, + const char *pointfname) +{ + strlcpy(slot->xlogfname, xlogfname, sizeof(slot->xlogfname)); + strlcpy(slot->pointfname, pointfname, sizeof(slot->pointfname)); + slot->bgwid = RestoreData->counter++; + slot->bgwtime = GetCurrentTimestamp(); +} + +/* + * Get a free slot. + * + * Return NULL if no free slots. + * + * If force param is TRUE and there are no free slots, + * then return the earliest slot. + */ +static RestoreSlot * +RestoreSlotEmplace(bool force) +{ + int i; + RestoreSlot *reslot; + + reslot = NULL; + for (i = 0; i < max_restore_command_workers; ++i) + { + RestoreSlot *slot = &RestoreData->slots[i]; + + if (!slot->bgwhandle) + { + reslot = slot; + break; + } + + if (reslot == NULL || slot->bgwtime < reslot->bgwtime) + reslot = slot; + } + + if (!reslot) + return NULL; + + /* Do not use linked slots in unforced mode. */ + if (!force && reslot->bgwhandle) + return NULL; + + /* Reset slot (unlink if linked). */ + SpinLockAcquire(&reslot->spin); + RestoreSlotReset(reslot); + SpinLockRelease(&reslot->spin); + + return reslot; +} + +/* + * Find a slot on pool by WAL name. + */ +static RestoreSlot * +RestoreSlotFind(const char *xlogfname) +{ + int i; + + for (i = 0; i < max_restore_command_workers; ++i) + { + RestoreSlot *slot = &RestoreData->slots[i]; + + if (!strcmp(slot->xlogfname, xlogfname)) + return slot; + } + + return NULL; +} + +/* + * Predict the next WAL name and allocate a slot for it. + * Return NULL if no slots are available. 
+ */ +static RestoreSlot * +RestoreSlotPredict(const char *xlogfname, const char *pointfname) +{ + char xlogfnext[MAXFNAMELEN]; + char xlogpath[MAXPGPATH]; + + strlcpy(xlogfnext, xlogfname, sizeof(xlogfnext)); + + for (;;) + { + RestoreSlot *slot; + + /* already in pool */ + XLogFileNamePredict(xlogfnext, xlogfnext); + + if (RestoreSlotFind(xlogfnext)) + continue; + + /* already restored */ + XLogFilePathPrefetch(xlogpath, xlogfnext); + + if (FilePathExists(xlogpath)) + continue; + + if (!(slot = RestoreSlotEmplace(false))) + break; + + RestoreSlotSetup(slot, xlogfnext, pointfname); + return slot; + } + + return NULL; +} + +/* + * Predict the name of the next WAL file "xlognext", + * based on the specified "xlogfname". + */ +static void +XLogFileNamePredict(char *xlogfnext, const char *xlogfname) +{ + uint32 tli; + XLogSegNo segno; + + XLogFromFileName(xlogfname, &tli, &segno, wal_segment_size); + XLogFileName(xlogfnext, tli, segno + 1, wal_segment_size); +} + +static void +XLogFilePathPrefetch(char *path, const char *xlogfname) +{ + snprintf(path, MAXPGPATH, PREFETCH_DIR "/%s", xlogfname); +} + +/* + * Check that the path does exist. 
+ */ +static bool +FilePathExists(const char *xlogpath) +{ + struct stat statbuf; + + if (stat(xlogpath, &statbuf) == 0) + return true; + + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + xlogpath))); + + return false; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 7e915bcadf1..78afaf31f6b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -18,13 +18,16 @@ #include "postgres.h" #include +#include #include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "common/archive.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/startup.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -979,3 +982,169 @@ WALReadRaiseError(WALReadError *errinfo) (Size) errinfo->wre_req))); } } + +/* + * Remove a file if it does exist. + */ +void +FileUnlink(const char *file_path) +{ + struct stat statbuf; + + if (stat(file_path, &statbuf)) + { + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + file_path))); + } + else + { + if (unlink(file_path) != 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + file_path))); + } +} + +/* + * Get the last valid restart point file name. + * + * If cleanup is not enabled, initialise the last restart point file name + * with InvalidXLogRecPtr, which will prevent the deletion of any WAL files + * from the archive because of the alphabetic sorting property of WAL + * filenames. 
+ */ +void +XLogFileNameLastPoint(char *lastRestartPointFname, bool cleanupEnabled) +{ + XLogSegNo restartSegNo; + XLogRecPtr restartRedoPtr; + TimeLineID restartTli; + + if (cleanupEnabled) + { + GetOldestRestartPoint(&restartRedoPtr, &restartTli); + XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size); + XLogFileName(lastRestartPointFname, restartTli, restartSegNo, + wal_segment_size); + } + else + XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size); +} + +/* + * Check a file is really there now and has correct size. + * + * Return true if the file does exist and has correct size, + * else return false. + * + * If the output variable file_not_found is not null it's assigned + * either true or false value depending on whether the file does exist + * or not. + */ +bool +FileValidateSize(const char *xlogpath, off_t expectedSize, + const char *xlogfname, bool *file_not_found) +{ + struct stat stat_buf; + + if (stat(xlogpath, &stat_buf) == 0) + { + if (file_not_found) + *file_not_found = false; + + if (expectedSize > 0 && stat_buf.st_size != expectedSize) + { + int elevel; + + /* + * If we find a partial file in standby mode, we assume it's + * because it's just being copied to the archive, and keep + * trying. + * + * Otherwise treat a wrong-sized file as FATAL to ensure the + * DBA would notice it, but is that too strong? We could try + * to plow ahead with a local copy of the file ... but the + * problem is that there probably isn't one, and we'd + * incorrectly conclude we've reached the end of WAL and we're + * done recovering ... 
+ */ + if (StandbyMode && stat_buf.st_size < expectedSize) + elevel = DEBUG1; + else + elevel = FATAL; + ereport(elevel, + (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu", + xlogfname, + (unsigned long) stat_buf.st_size, + (unsigned long) expectedSize))); + return false; + } + else + { + ereport(LOG, + (errmsg("restored log file \"%s\" from archive", + xlogfname))); + return true; + } + } + else + { + /* stat failed */ + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + xlogpath))); + if (file_not_found) + *file_not_found = true; + + return false; + } + +} + +/* + * Build and execute restore_command. + * + * Return the result of command execution (the exit status of the shell), + * or -1 if a system error occurred. A return value of 127 means + * the execution of the shell failed. + */ +int +DoRestore(const char *xlogpath, const char *xlogfname, const char *pointfname) +{ + char *xlogRestoreCmd; + int rc; + + /* Build a restore command to execute */ + xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, xlogpath, + xlogfname, pointfname); + + if (xlogRestoreCmd == NULL) + elog(PANIC, "could not build restore command \"%s\"", + recoveryRestoreCommand); + + ereport(DEBUG3, + (errmsg_internal("executing restore command \"%s\"", + xlogRestoreCmd))); + + /* + * Check signals before restore command and reset afterwards. 
+ */ + PreRestoreCommand(); + + /* + * Execute + */ + rc = system(xlogRestoreCmd); + + PostRestoreCommand(); + + pfree(xlogRestoreCmd); + + return rc; +} diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index beb5e85434c..55c21490945 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/xlogrestore.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -128,6 +129,9 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "RestoreCommandWorkerMain", RestoreCommandWorkerMain } }; diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 96c2aaabbd6..93e0167454f 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/subtrans.h" #include "access/syncscan.h" #include "access/twophase.h" +#include "access/xlogrestore.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, RestoreCommandShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -259,6 +261,7 @@ CreateSharedMemoryAndSemaphores(void) WalSndShmemInit(); WalRcvShmemInit(); ApplyLauncherShmemInit(); + RestoreCommandShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index de87ad6ef70..d3e633dd12e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -37,6 +37,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogrestore.h" #include "catalog/namespace.h" #include "catalog/pg_authid.h" #include 
"catalog/storage.h" @@ -196,6 +197,7 @@ static const char *show_tcp_keepalives_count(void); static const char *show_tcp_user_timeout(void); static bool check_maxconnections(int *newval, void **extra, GucSource source); static bool check_max_worker_processes(int *newval, void **extra, GucSource source); +static bool check_max_restore_command_workers(int *newval, void **extra, GucSource source); static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source); static bool check_max_wal_senders(int *newval, void **extra, GucSource source); static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source); @@ -2982,6 +2984,18 @@ static struct config_int ConfigureNamesInt[] = check_max_worker_processes, NULL, NULL }, + { + {"max_restore_command_workers", + PGC_POSTMASTER, + WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum number of restore_command worker processes."), + NULL, + }, + &max_restore_command_workers, + 0, 0, MAX_PARALLEL_WORKER_LIMIT, + check_max_restore_command_workers, NULL, NULL + }, + { {"max_logical_replication_workers", PGC_POSTMASTER, @@ -11568,6 +11582,19 @@ check_max_worker_processes(int *newval, void **extra, GucSource source) return true; } +static bool +check_max_restore_command_workers(int *newval, void **extra, GucSource source) +{ + if (*newval > max_worker_processes) + { + GUC_check_errdetail("A value of max_restore_command_worker can't " + "exceed a value of max_worker_processes=%d", max_worker_processes); + return false; + } + + return true; +} + static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source) { diff --git a/src/include/access/xlogrestore.h b/src/include/access/xlogrestore.h new file mode 100644 index 00000000000..3e5cf867a56 --- /dev/null +++ b/src/include/access/xlogrestore.h @@ -0,0 +1,16 @@ +#ifndef XLOGRESTORE_H +#define XLOGRESTORE_H + +#include "postgres.h" + +extern int max_restore_command_workers; + +extern Size RestoreCommandShmemSize(void); +extern void 
RestoreCommandShmemInit(void); +extern bool RestoreCommandXLog(char *path, const char *xlogfname, + const char *recovername, + const off_t expectedSize, + bool cleanupEnabled); +extern void RestoreCommandWorkerMain(Datum main_arg) pg_attribute_noreturn(); + +#endif /* XLOGRESTORE_H */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index e59b6cf3a9f..c040a8ced70 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -60,4 +60,13 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state, extern void WALReadRaiseError(WALReadError *errinfo); +extern void FileUnlink(const char *xlogpath); +extern void XLogFileNameLastPoint(char *lastRestartPointFname, + bool cleanupEnabled); +extern bool FileValidateSize(char const *xlogpath, off_t expectedSize, + char const *xlogfname, bool *file_not_found); + +extern int DoRestore(char const *xlogpath, char const *xlogfname, + char const *pointfname); + #endif diff --git a/src/test/recovery/t/021_xlogrestore.pl b/src/test/recovery/t/021_xlogrestore.pl new file mode 100644 index 00000000000..970ae6778ca --- /dev/null +++ b/src/test/recovery/t/021_xlogrestore.pl @@ -0,0 +1,143 @@ +# +# Test for xlogrestore with max_restore_command_workers parameter +# + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 9; + +sub measure_replica_restore_time +{ + my ( $replica_name, $node_primary, $backup_name, $last_lsn, $tab_int_count, $config ) = @_; + my $timer = time(); + + # Initialize replica node from backup, fetching WAL from archives + my $node_replica = get_new_node( $replica_name ); + $node_replica->init_from_backup( $node_primary, $backup_name, + has_restoring => 1 ); + $node_replica->append_conf( 'postgresql.conf', $config ); + $node_replica->start(); + + # Wait until necessary replay has been done on replica + my $caughtup_query = + "SELECT '$last_lsn'::pg_lsn <= pg_last_wal_replay_lsn()"; + $node_replica->poll_query_until( 'postgres', 
$caughtup_query ) + or die "Timed out while waiting for replica to catch up"; + + # Check tab_int's rows count + my $replica_tab_int_count = + $node_replica->safe_psql( 'postgres', "SELECT count(*) FROM tab_int" ); + is( $replica_tab_int_count, $tab_int_count, 'tab_int sizes are equal' ); + + # Check the presence of temporary files specifically generated during + # archive recovery. + $node_replica->promote(); + + my $node_replica_data = $node_replica->data_dir; + ok( !-f "$node_replica_data/pg_wal/RECOVERYHISTORY", + "RECOVERYHISTORY removed after promotion"); + ok( !-f "$node_replica_data/pg_wal/RECOVERYXLOG", + "RECOVERYXLOG removed after promotion"); + ok( !-d "$node_replica_data/pg_wal/prefetch", + "pg_wal/prefetch dir removed after promotion"); + + my $res = time() - $timer; + + $node_replica->stop(); + return $res; +} + +# WAL produced count +my $wal_count = 64; + +# Size of data portion +my $wal_data_portion = 128; + +# Restore bgworkers count +my $max_restore_command_workers = 4; + +# Sleep to imitate restore delays +my $restore_sleep = 0.256; + +# Minimum expected acceleration of the restore process. +# Is this formula correct? +my $handicap = ( $wal_count * $restore_sleep ) / $max_restore_command_workers; + +# Initialize primary node, doing archives +my $node_primary = get_new_node( 'primary' ); +$node_primary->init( + has_archiving => 1, + allows_streaming => 1 +); + +# Start it +$node_primary->start; + +# Take backup for replica. +my $backup_name = 'my_backup'; +$node_primary->backup( $backup_name ); + +# Create some content on primary server that will be not present on replicas. 
+for ( my $i = 0; $i < $wal_count; $i++ ) +{ + if ( $i == 0 ) { + $node_primary->safe_psql('postgres', + "CREATE TABLE tab_int ( a SERIAL NOT NULL PRIMARY KEY );") + } else { + $node_primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT FROM generate_series( 1, $wal_data_portion );") + } + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal()"); +} + +my $last_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();"); +my $tab_int_count = $node_primary->safe_psql('postgres', "SELECT count(*) FROM tab_int;"); + +$node_primary->stop(); + +# Restore command: sleep for $restore_sleep seconds first, then copy the requested segment from the primary's archive directory +my $restore_command; +my $path = TestLib::perl2host( $node_primary->archive_dir ); +if ( $TestLib::windows_os ) { + $path =~ s{\\}{\\\\}g; + $restore_command = qq(perl -e "select( undef, undef, undef, $restore_sleep );" & copy "$path\\\\%f" "%p"); +} else { + $restore_command = qq(sleep ) . $restore_sleep . qq( && cp "$path/%f" "%p"); +} + +#diag( $restore_command ); + +# Compare the replica restore times with different max_restore_command_workers values.
+#diag( 'multiple_workers_restore_time' ); +my $multiple_workers_restore_time = measure_replica_restore_time( + 'fast_restored_replica', + $node_primary, + $backup_name, + $last_lsn, + $tab_int_count, +qq( +wal_retrieve_retry_interval = '100ms' +max_restore_command_workers = $max_restore_command_workers +restore_command = '$restore_command' +)); + +#diag( 'single_worker_restore_time' ); +my $single_worker_restore_time = measure_replica_restore_time( + 'normal_restored_replica', + $node_primary, + $backup_name, + $last_lsn, + $tab_int_count, +qq( +wal_retrieve_retry_interval = '100ms' +max_restore_command_workers = 0 +restore_command = '$restore_command' +)); + +#diag( $multiple_workers_restore_time ); +#diag( $single_worker_restore_time ); +#diag( $handicap ); + +ok( $multiple_workers_restore_time + $handicap < $single_worker_restore_time, "Multiple workers are faster than a single worker" ); diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out index 811f80a0976..be35f346df6 100644 --- a/src/test/regress/expected/guc.out +++ b/src/test/regress/expected/guc.out @@ -776,3 +776,24 @@ set default_with_oids to f; -- Should not allow to set it to true. 
set default_with_oids to t; ERROR: tables declared WITH OIDS are not supported +-- +-- PGPRO-3691: Check that a value for the new configuration parameter +-- max_restore_command_workers is limited by a value of the parameter +-- max_worker_processes +SHOW max_worker_processes; + max_worker_processes +---------------------- + 8 +(1 row) + +-- max_worker_processes has default value 8 +-- Check that an attempt to set the parameter max_restore_command_workers +-- to a value exceeding this limit results in error +ALTER SYSTEM SET max_restore_command_workers = 16; -- fails, it is expected behaviour +ERROR: invalid value for parameter "max_restore_command_workers": 16 +DETAIL: A value of max_restore_command_worker can't exceed a value of max_worker_processes=8 +-- Check that a value less than max_worker_processes can be assigned +-- to the parameter max_restore_command_workers +ALTER SYSTEM SET max_restore_command_workers = 7; -- ok since 7 < max_worker_processes +-- Reset to default +ALTER SYSTEM RESET max_restore_command_workers; diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql index 43dbba3775e..e7b8a4c0c5e 100644 --- a/src/test/regress/sql/guc.sql +++ b/src/test/regress/sql/guc.sql @@ -296,3 +296,22 @@ reset check_function_bodies; set default_with_oids to f; -- Should not allow to set it to true.
set default_with_oids to t; + +-- +-- PGPRO-3691: Check that a value for the new configuration parameter +-- max_restore_command_workers is limited by a value of the parameter +-- max_worker_processes +SHOW max_worker_processes; + +-- max_worker_processes has default value 8 +-- Check that an attempt to set the parameter max_restore_command_workers +-- to a value exceeding this limit results in error + +ALTER SYSTEM SET max_restore_command_workers = 16; -- fails, it is expected behaviour + +-- Check that a value less than max_worker_processes can be assigned +-- to the parameter max_restore_command_workers +ALTER SYSTEM SET max_restore_command_workers = 7; -- ok since 7 < max_worker_processes + +-- Reset to default +ALTER SYSTEM RESET max_restore_command_workers;