From c9736171b0480c57ce8f457a3ce1a8ee29ce02f6 Mon Sep 17 00:00:00 2001
From: Takashi Menjo
Date: Wed, 24 Jun 2020 15:07:59 +0900
Subject: [PATCH v3 4/5] pg_basebackup supports non-volatile WAL buffer

Now pg_basebackup copies received WAL segments onto a non-volatile WAL
buffer if you run it in "nvwal" mode (-Fn).  You should specify a new
NVWAL path with the --nvwal-path option.  The path will be written to
postgresql.auto.conf or recovery.conf.  The size of the new NVWAL is
the same as the master's one.
---
 src/bin/pg_basebackup/pg_basebackup.c | 363 +++++++++++++++++++++++++-
 src/bin/pg_basebackup/streamutil.c    |  69 ++++++
 src/bin/pg_basebackup/streamutil.h    |   3 +
 src/bin/pg_rewind/pg_rewind.c         |   4 +-
 src/fe_utils/recovery_gen.c           |   9 +-
 src/include/fe_utils/recovery_gen.h   |   3 +-
 6 files changed, 434 insertions(+), 17 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 4f29671d0c..e56fae7f47 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -25,6 +25,9 @@
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 #endif
+#ifdef USE_NVWAL
+#include <libpmem.h>
+#endif
 
 #include "access/xlog_internal.h"
 #include "common/file_perm.h"
@@ -127,7 +130,8 @@ typedef enum
 static char *basedir = NULL;
 static TablespaceList tablespace_dirs = {NULL, NULL};
 static char *xlog_dir = NULL;
-static char format = 'p';		/* p(lain)/t(ar) */
+static char format = 'p';		/* p(lain)/t(ar); 'p' even if 'nvwal' given */
+static bool format_nvwal = false;	/* true if 'nvwal' given */
 static char *label = "pg_basebackup base backup";
 static bool noclean = false;
 static bool checksum_failure = false;
@@ -150,14 +154,24 @@ static bool verify_checksums = true;
 static bool manifest = true;
 static bool manifest_force_encode = false;
 static char *manifest_checksums = NULL;
+static char *nvwal_path = NULL;
+#ifdef USE_NVWAL
+static size_t nvwal_size = 0;
+static char *nvwal_pages = NULL;
+static size_t nvwal_mapped_len = 0;
+#endif
 
 static bool success = false;
+static bool xlogdir_is_pg_xlog = false;
 static bool made_new_pgdata = false;
 static bool found_existing_pgdata = false;
 static bool made_new_xlogdir = false;
 static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
+#ifdef USE_NVWAL
+static bool made_new_nvwal = false;
+#endif
 
 /* Progress counters */
 static uint64 totalsize_kb;
@@ -381,7 +395,7 @@ usage(void)
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
-	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -F, --format=p|t|n     output format (plain (default), tar, nvwal)\n"));
 	printf(_("  -r, --max-rate=RATE    maximum transfer rate to transfer data directory\n"
 			 "                         (in kB/s, or use suffix \"k\" or \"M\")\n"));
 	printf(_("  -R, --write-recovery-conf\n"
@@ -389,6 +403,7 @@ usage(void)
 	printf(_("  -T, --tablespace-mapping=OLDDIR=NEWDIR\n"
 			 "                         relocate tablespace in OLDDIR to NEWDIR\n"));
 	printf(_("      --waldir=WALDIR    location for the write-ahead log directory\n"));
+	printf(_("      --nvwal-path=NVWAL location for the NVWAL file\n"));
 	printf(_("  -X, --wal-method=none|fetch|stream\n"
 			 "                         include required WAL files with specified method\n"));
 	printf(_("  -z, --gzip             compress tar output\n"));
@@ -629,9 +644,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 
 	/* In post-10 cluster, pg_xlog has been renamed to pg_wal */
 	snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
-			 basedir,
-			 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
-			 "pg_xlog" : "pg_wal");
+			 basedir, xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal");
 
 	/* Temporary replication slots are only supported in 10 and newer */
 	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
@@ -668,9 +681,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		 * tar file may arrive later.
 		 */
 		snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
-				 basedir,
-				 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
-				 "pg_xlog" : "pg_wal");
+				 basedir, xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal");
 
 		if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
 		{
@@ -1787,6 +1798,154 @@ ReceiveBackupManifestInMemoryChunk(size_t r, char *copybuf,
 	appendPQExpBuffer(buf, copybuf, r);
 }
 
+#ifdef USE_NVWAL
+/*
+ * atexit callback: unless the backup succeeded or we are the log streamer,
+ * unmap the NVWAL and remove the NVWAL file if this run created it.
+ */
+static void
+cleanup_nvwal_atexit(void)
+{
+	if (success || in_log_streamer)
+		return;
+
+	if (nvwal_pages != NULL)
+	{
+		pg_log_info("unmapping NVWAL");
+		if (pmem_unmap(nvwal_pages, nvwal_mapped_len) < 0)
+		{
+			pg_log_error("could not unmap NVWAL: %m");
+			return;
+		}
+	}
+
+	if (nvwal_path != NULL && made_new_nvwal)
+	{
+		pg_log_info("removing NVWAL file \"%s\"", nvwal_path);
+		if (unlink(nvwal_path) < 0)
+		{
+			pg_log_error("could not remove NVWAL file \"%s\": %m", nvwal_path);
+			return;
+		}
+	}
+}
+
+/*
+ * scandir() filter: accept only regular files under the WAL directory whose
+ * name looks like a WAL segment and whose size is exactly WalSegSz.
+ */
+static int
+filter_walseg(const struct dirent *d)
+{
+	char		fullpath[MAXPGPATH];
+	struct stat statbuf;
+
+	if (!IsXLogFileName(d->d_name))
+		return 0;
+
+	snprintf(fullpath, sizeof(fullpath), "%s/%s/%s",
+			 basedir, xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal", d->d_name);
+
+	if (stat(fullpath, &statbuf) < 0)
+		return 0;
+
+	if (!S_ISREG(statbuf.st_mode))
+		return 0;
+
+	if (statbuf.st_size != WalSegSz)
+		return 0;
+
+	return 1;
+}
+
+/* scandir() comparator: order entries by file name, ascending */
+static int
+compare_walseg(const struct dirent **a, const struct dirent **b)
+{
+	return strcmp((*a)->d_name, (*b)->d_name);
+}
+
+/* Free a name list allocated by scandir(), entries first */
+static void
+free_namelist(struct dirent **namelist, int nr)
+{
+	for (int i = 0; i < nr; ++i)
+		free(namelist[i]);
+
+	free(namelist);
+}
+
+/*
+ * Copy one WAL segment file from the WAL directory into the mapped NVWAL,
+ * at the offset derived from its segment number, and flush it.
+ *
+ * Returns true on success; otherwise logs an error and returns false.
+ */
+static bool
+copy_walseg_onto_nvwal(const char *segname)
+{
+	char		fullpath[MAXPGPATH];
+	int			fd;
+	size_t		off;
+	struct stat statbuf;
+	TimeLineID	tli;
+	XLogSegNo	segno;
+
+	snprintf(fullpath, sizeof(fullpath), "%s/%s/%s",
+			 basedir, xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal", segname);
+
+	fd = open(fullpath, O_RDONLY);
+	if (fd < 0)
+	{
+		pg_log_error("could not open xlog segment \"%s\": %m", fullpath);
+		return false;
+	}
+
+	if (fstat(fd, &statbuf) < 0)
+	{
+		pg_log_error("could not fstat xlog segment \"%s\": %m", fullpath);
+		goto close_on_error;
+	}
+
+	if (!S_ISREG(statbuf.st_mode))
+	{
+		pg_log_error("xlog segment \"%s\" is not a regular file", fullpath);
+		goto close_on_error;
+	}
+
+	if (statbuf.st_size != WalSegSz)
+	{
+		pg_log_error("invalid size of xlog segment \"%s\"; expected %d, actual %zd",
+					 fullpath, WalSegSz, (ssize_t) statbuf.st_size);
+		goto close_on_error;
+	}
+
+	XLogFromFileName(segname, &tli, &segno, WalSegSz);
+	off = ((size_t) segno * WalSegSz) % nvwal_size;
+
+	if (read(fd, &nvwal_pages[off], WalSegSz) < WalSegSz)
+	{
+		pg_log_error("could not fully read xlog segment \"%s\": %m", fullpath);
+		goto close_on_error;
+	}
+
+	/* read() stores through the CPU cache; flush so the copy is durable */
+	pmem_persist(&nvwal_pages[off], WalSegSz);
+
+	if (close(fd) < 0)
+	{
+		pg_log_error("could not close xlog segment \"%s\": %m", fullpath);
+		return false;
+	}
+
+	return true;
+
+close_on_error:
+	(void) close(fd);
+	return false;
+}
+#endif
+
 static void
 BaseBackup(void)
 {
@@ -1845,7 +2004,8 @@ BaseBackup(void)
 	 * Build contents of configuration file if requested
 	 */
 	if (writerecoveryconf)
-		recoveryconfcontents = GenerateRecoveryConfig(conn, replication_slot);
+		recoveryconfcontents = GenerateRecoveryConfig(conn, replication_slot,
+													  nvwal_path);
 
 	/*
 	 * Run IDENTIFY_SYSTEM so we can get the timeline
@@ -2214,6 +2374,69 @@ BaseBackup(void)
 			exit(1);
 	}
 
+#ifdef USE_NVWAL
+	/* Copy xlog segments into NVWAL when nvwal mode */
+	if (format_nvwal)
+	{
+		char		xldr_path[MAXPGPATH];
+		int			nr_segs;
+		struct dirent **namelist;
+
+		/* clear NVWAL before copying xlog segments */
+		pmem_memset_persist(nvwal_pages, 0, nvwal_size);
+
+		snprintf(xldr_path, sizeof(xldr_path), "%s/%s",
+				 basedir, xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal");
+
+		/*
+		 * Sort xlog segments in ascending order, filtering out non-segment
+		 * files and directories.
+		 */
+		nr_segs = scandir(xldr_path, &namelist, filter_walseg, compare_walseg);
+		if (nr_segs < 0)
+		{
+			pg_log_error("could not scan xlog directory \"%s\": %m", xldr_path);
+			exit(1);
+		}
+
+		/* Copy xlog segments onto NVWAL */
+		for (int i = 0; i < nr_segs; ++i)
+		{
+			if (!copy_walseg_onto_nvwal(namelist[i]->d_name))
+			{
+				free_namelist(namelist, nr_segs);
+				exit(1);
+			}
+		}
+
+		/* Copy complete; now remove all the xlog segments */
+		for (int i = 0; i < nr_segs; ++i)
+		{
+			char		fullpath[MAXPGPATH];
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s",
+					 xldr_path, namelist[i]->d_name);
+
+			if (unlink(fullpath) < 0)
+			{
+				pg_log_error("could not remove xlog segment \"%s\": %m", fullpath);
+				free_namelist(namelist, nr_segs);
+				exit(1);
+			}
+		}
+
+		free_namelist(namelist, nr_segs);
+
+		if (pmem_unmap(nvwal_pages, nvwal_mapped_len) < 0)
+		{
+			pg_log_error("could not unmap NVWAL: %m");
+			exit(1);
+		}
+		nvwal_pages = NULL;
+		nvwal_mapped_len = 0;
+	}
+#endif
+
 	if (verbose)
 		pg_log_info("base backup completed");
 }
@@ -2255,6 +2478,7 @@ main(int argc, char **argv)
 		{"no-manifest", no_argument, NULL, 5},
 		{"manifest-force-encode", no_argument, NULL, 6},
 		{"manifest-checksums", required_argument, NULL, 7},
+		{"nvwal-path", required_argument, NULL, 8},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2295,12 +2519,30 @@ main(int argc, char **argv)
 				break;
 			case 'F':
 				if (strcmp(optarg, "p") == 0 || strcmp(optarg, "plain") == 0)
+				{
+					/* See the comment for "nvwal" below */
 					format = 'p';
+					format_nvwal = false;
+				}
 				else if (strcmp(optarg, "t") == 0 || strcmp(optarg, "tar") == 0)
+				{
+					/* See the comment for "nvwal" below */
 					format = 't';
+					format_nvwal = false;
+				}
+				else if (strcmp(optarg, "n") == 0 || strcmp(optarg, "nvwal") == 0)
+				{
+					/*
+					 * If "nvwal" mode given, we set two variables as follows
+					 * because it is almost same as "plain" mode, except NVWAL
+					 * handling.
+					 */
+					format = 'p';
+					format_nvwal = true;
+				}
 				else
 				{
-					pg_log_error("invalid output format \"%s\", must be \"plain\" or \"tar\"",
+					pg_log_error("invalid output format \"%s\", must be \"plain\", \"tar\", or \"nvwal\"",
 								 optarg);
 					exit(1);
 				}
@@ -2352,6 +2594,9 @@ main(int argc, char **argv)
 			case 1:
 				xlog_dir = pg_strdup(optarg);
 				break;
+			case 8:
+				nvwal_path = pg_strdup(optarg);
+				break;
 			case 'l':
 				label = pg_strdup(optarg);
 				break;
@@ -2533,7 +2778,7 @@ main(int argc, char **argv)
 	{
 		if (format != 'p')
 		{
-			pg_log_error("WAL directory location can only be specified in plain mode");
+			pg_log_error("WAL directory location can only be specified in plain or nvwal mode");
 			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 					progname);
 			exit(1);
@@ -2550,6 +2795,44 @@ main(int argc, char **argv)
 		}
 	}
 
+#ifdef USE_NVWAL
+	if (format_nvwal)
+	{
+		if (nvwal_path == NULL)
+		{
+			pg_log_error("NVWAL file location must be given in nvwal mode");
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+					progname);
+			exit(1);
+		}
+
+		/* clean up NVWAL file name and check if it is absolute */
+		canonicalize_path(nvwal_path);
+		if (!is_absolute_path(nvwal_path))
+		{
+			pg_log_error("NVWAL file location must be an absolute path");
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+					progname);
+			exit(1);
+		}
+
+		/* We do not map NVWAL file here because we do not know its size yet */
+	}
+	else if (nvwal_path != NULL)
+	{
+		pg_log_error("NVWAL file location can only be specified in nvwal mode");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+#else
+	if (format_nvwal || nvwal_path != NULL)
+	{
+		pg_log_error("this build does not support nvwal mode");
+		exit(1);
+	}
+#endif							/* USE_NVWAL */
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2594,6 +2877,9 @@ main(int argc, char **argv)
 	}
 	atexit(disconnect_atexit);
 
+	/* Remember the predicate for use after disconnection */
+	xlogdir_is_pg_xlog = (PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL);
+
 	/*
 	 * Set umask so that directories/files are created with the same
 	 * permissions as directories/files in the source data directory.
@@ -2620,6 +2906,23 @@ main(int argc, char **argv)
 	if (!RetrieveWalSegSize(conn))
 		exit(1);
 
+#ifdef USE_NVWAL
+	/* determine remote server's NVWAL size */
+	if (format_nvwal)
+	{
+		nvwal_size = RetrieveNvwalSize(conn);
+		if (nvwal_size == 0)
+			exit(1);
+		/* segments are copied into WalSegSz-sized slots of the NVWAL */
+		if (nvwal_size % (size_t) WalSegSz != 0)
+		{
+			pg_log_error("NVWAL size %zu is not a multiple of WAL segment size %d",
+						 nvwal_size, WalSegSz);
+			exit(1);
+		}
+	}
+#endif
+
 	/* Create pg_wal symlink, if required */
 	if (xlog_dir)
 	{
@@ -2632,8 +2935,7 @@ main(int argc, char **argv)
 		 * renamed to pg_wal in post-10 clusters.
 		 */
 		linkloc = psprintf("%s/%s", basedir,
-						   PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
-						   "pg_xlog" : "pg_wal");
+						   xlogdir_is_pg_xlog ? "pg_xlog" : "pg_wal");
 
 #ifdef HAVE_SYMLINK
 		if (symlink(xlog_dir, linkloc) != 0)
@@ -2648,6 +2950,41 @@ main(int argc, char **argv)
 		free(linkloc);
 	}
 
+#ifdef USE_NVWAL
+	/* Create and map NVWAL file if required */
+	if (format_nvwal)
+	{
+		int			is_pmem = 0;
+
+		nvwal_pages = pmem_map_file(nvwal_path, nvwal_size,
+									PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+									pg_file_create_mode,
+									&nvwal_mapped_len, &is_pmem);
+		if (nvwal_pages == NULL)
+		{
+			pg_log_error("could not map a new NVWAL file \"%s\": %m",
+						 nvwal_path);
+			exit(1);
+		}
+
+		made_new_nvwal = true;
+		atexit(cleanup_nvwal_atexit);
+
+		if (!is_pmem)
+		{
+			pg_log_error("NVWAL file \"%s\" is not on PMEM", nvwal_path);
+			exit(1);
+		}
+
+		if (nvwal_size != nvwal_mapped_len)
+		{
+			pg_log_error("invalid size of NVWAL file \"%s\"; expected %zu, actual %zu",
+						 nvwal_path, nvwal_size, nvwal_mapped_len);
+			exit(1);
+		}
+	}
+#endif
+
 	BaseBackup();
 
 	success = true;
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 410116492e..af2bb21e4c 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -397,6 +397,75 @@ RetrieveDataDirCreatePerm(PGconn *conn)
 	return true;
 }
 
+#ifdef USE_NVWAL
+/*
+ * Returns nvwal_size in bytes if available, 0 otherwise.
+ * Note that it is caller's responsibility to check if the returned
+ * nvwal_size is really valid, that is, multiple of WAL segment size.
+ */
+size_t
+RetrieveNvwalSize(PGconn *conn)
+{
+	PGresult   *res;
+	char		unit[3];
+	int			val;
+	size_t		nvwal_size;
+
+	/* check connection existence */
+	Assert(conn != NULL);
+
+	/* fail if we do not have SHOW command */
+	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
+	{
+		pg_log_error("SHOW command is not supported for retrieving nvwal_size");
+		return 0;
+	}
+
+	res = PQexec(conn, "SHOW nvwal_size");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SHOW nvwal_size", PQerrorMessage(conn));
+
+		PQclear(res);
+		return 0;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 1)
+	{
+		pg_log_error("could not fetch NVWAL size: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 1);
+
+		PQclear(res);
+		return 0;
+	}
+
+	/* fetch value and unit; width-limit the unit so it cannot overrun unit[] */
+	if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &val, unit) != 2 || val <= 0)
+	{
+		pg_log_error("NVWAL size could not be parsed");
+		PQclear(res);
+		return 0;
+	}
+
+	PQclear(res);
+
+	/* convert to bytes */
+	if (strcmp(unit, "MB") == 0)
+		nvwal_size = ((size_t) val) << 20;
+	else if (strcmp(unit, "GB") == 0)
+		nvwal_size = ((size_t) val) << 30;
+	else if (strcmp(unit, "TB") == 0)
+		nvwal_size = ((size_t) val) << 40;
+	else
+	{
+		pg_log_error("unsupported NVWAL unit");
+		return 0;
+	}
+
+	return nvwal_size;
+}
+#endif
+
 /*
  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
  * some result information if requested:
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 57448656e3..b4c2ab1a74 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -41,6 +41,9 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  XLogRecPtr *startpos,
 							  char **db_name);
 extern bool RetrieveWalSegSize(PGconn *conn);
+#ifdef USE_NVWAL
+extern size_t RetrieveNvwalSize(PGconn *conn);
+#endif
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
 								  long *secs, int *microsecs);
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 0015d3b461..578b37b588 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -360,7 +360,7 @@ main(int argc, char **argv)
 		pg_log_info("no rewind required");
 		if (writerecoveryconf && !dry_run)
 			WriteRecoveryConfig(conn, datadir_target,
-								GenerateRecoveryConfig(conn, NULL));
+								GenerateRecoveryConfig(conn, NULL, NULL));
 		exit(0);
 	}
 
@@ -460,7 +460,7 @@ main(int argc, char **argv)
 
 	if (writerecoveryconf && !dry_run)
 		WriteRecoveryConfig(conn, datadir_target,
-							GenerateRecoveryConfig(conn, NULL));
+							GenerateRecoveryConfig(conn, NULL, NULL));
 
 	pg_log_info("Done!");
 
diff --git a/src/fe_utils/recovery_gen.c b/src/fe_utils/recovery_gen.c
index 46ca20e20b..1e08ec3fa8 100644
--- a/src/fe_utils/recovery_gen.c
+++ b/src/fe_utils/recovery_gen.c
@@ -20,7 +20,7 @@ static char *escape_quotes(const char *src);
  * return it.
  */
 PQExpBuffer
-GenerateRecoveryConfig(PGconn *pgconn, char *replication_slot)
+GenerateRecoveryConfig(PGconn *pgconn, char *replication_slot, char *nvwal_path)
 {
 	PQconninfoOption *connOptions;
 	PQExpBufferData conninfo_buf;
@@ -95,6 +95,13 @@ GenerateRecoveryConfig(PGconn *pgconn, char *replication_slot)
 						  replication_slot);
 	}
 
+	if (nvwal_path)
+	{
+		escaped = escape_quotes(nvwal_path);
+		appendPQExpBuffer(contents, "nvwal_path = '%s'\n", escaped);
+		free(escaped);
+	}
+
 	if (PQExpBufferBroken(contents))
 	{
 		pg_log_error("out of memory");
diff --git a/src/include/fe_utils/recovery_gen.h b/src/include/fe_utils/recovery_gen.h
index c8655cd294..061c59125b 100644
--- a/src/include/fe_utils/recovery_gen.h
+++ b/src/include/fe_utils/recovery_gen.h
@@ -21,7 +21,8 @@
 #define MINIMUM_VERSION_FOR_RECOVERY_GUC 120000
 
 extern PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn,
-										  char *pg_replication_slot);
+										  char *pg_replication_slot,
+										  char *nvwal_path);
 extern void WriteRecoveryConfig(PGconn *pgconn, char *target_dir,
 								PQExpBuffer contents);
 
-- 
2.17.1