*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 49,54 **** PostgreSQL documentation --- 49,61 ---- + Unlike the standby's WAL receiver, pg_receivexlog + flushes WAL data only when WAL file is closed, by default. + --synchronous option must be specified to flush WAL data + in real time and ensure it's safely flushed to disk. + + + The transaction log is streamed over a regular PostgreSQL connection, and uses the replication protocol. The connection must be made with a superuser or a user *************** *** 86,106 **** PostgreSQL documentation - - - - - Specifies the maximum time to issue sync commands to ensure the - received WAL file is safely flushed to disk, in seconds. The default - value is zero, which disables issuing fsyncs except when WAL file is - closed. If -1 is specified, WAL file is flushed as - soon as possible, that is, as soon as there are WAL data which has - not been flushed yet. - - - - - --- 93,98 ---- *************** *** 135,150 **** PostgreSQL documentation When this option is used, pg_receivexlog will report a flush position to the server, indicating when each segment has been synchronized to disk so that the server can remove that segment if it ! is not otherwise needed. When using this parameter, it is important ! to make sure that pg_receivexlog cannot become the ! synchronous standby through an incautious setting of ! ; it does not flush ! data frequently enough for this to work correctly. --- 127,152 ---- When this option is used, pg_receivexlog will report a flush position to the server, indicating when each segment has been synchronized to disk so that the server can remove that segment if it ! is not otherwise needed. --synchronous option must ! be specified when making pg_receivexlog run as ! synchronous standby by using replication slot. Otherwise WAL data ! cannot be flushed frequently enough for this to work correctly. + + + + Issue sync commands as soon as there is WAL data which has not been + flushed yet. Also status packets are sent back to the server just after + WAL data is flushed whatever --status-interval is set to. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 370,376 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0)) /* * Any errors will already have been reported in the function process, --- 370,376 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, false)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 36,45 **** static char *basedir = NULL; static int verbose = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ - static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; static bool do_create_slot = false; static bool do_drop_slot = false; static void usage(void); --- 36,45 ---- static int verbose = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static volatile bool time_to_abort = false; static bool do_create_slot = false; static bool do_drop_slot = false; + static bool synchronous = false; static void usage(void); *************** *** 66,77 **** usage(void) printf(_(" %s [OPTION]...\n"), progname); printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); - printf(_(" -F --fsync-interval=SECS\n" - " time between fsyncs to transaction log files (default: %d)\n"), (fsync_interval / 1000)); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); --- 66,76 ---- printf(_(" %s [OPTION]...\n"), progname); printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); + printf(_(" --synchronous flush transaction log in real time\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); *************** *** 343,349 **** StreamLog(void) ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! fsync_interval); PQfinish(conn); conn = NULL; --- 342,348 ---- ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! synchronous); PQfinish(conn); conn = NULL; *************** *** 374,380 **** main(int argc, char **argv) {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, - {"fsync-interval", required_argument, NULL, 'F'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, --- 373,378 ---- *************** *** 383,388 **** main(int argc, char **argv) --- 381,387 ---- /* action */ {"create-slot", no_argument, NULL, 1}, {"drop-slot", no_argument, NULL, 2}, + {"synchronous", no_argument, NULL, 3}, {NULL, 0, NULL, 0} }; *************** *** 408,414 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv", long_options, &option_index)) != -1) { switch (c) --- 407,413 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv", long_options, &option_index)) != -1) { switch (c) *************** *** 455,469 **** main(int argc, char **argv) case 'n': noloop = 1; break; - case 'F': - fsync_interval = atoi(optarg) * 1000; - if (fsync_interval < -1000) - { - fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), - progname, optarg); - exit(1); - } - break; case 'v': verbose++; break; --- 454,459 ---- *************** *** 474,479 **** main(int argc, char **argv) --- 464,472 ---- case 2: do_drop_slot = true; break; + case 3: + synchronous = true; + break; default: /* *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 31,44 **** static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; - static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, --- 31,43 ---- static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! bool synchronous); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, *************** *** 55,62 **** static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, stream_stop_callback stream_stop, char *partial_suffix, XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status, int fsync_interval, ! XLogRecPtr blockpos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); --- 54,60 ---- stream_stop_callback stream_stop, char *partial_suffix, XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); *************** *** 209,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) progname, current_walfile_name, partial_suffix); lastFlushPosition = pos; - last_fsync = feGetCurrentTimestamp(); return true; } --- 207,212 ---- *************** *** 440,447 **** CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * ! * fsync_interval controls how often we flush to the received WAL file, ! * in milliseconds. * * Note: The log position *must* be at a log segment start! */ --- 437,444 ---- * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * ! * If 'synchronous' is true, the received WAL is flushed as soon as written, ! * otherwise only when the WAL file is closed. * * Note: The log position *must* be at a log segment start! */ *************** *** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval) { char query[128]; char slotcmd[128]; --- 447,453 ---- char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! bool synchronous) { char query[128]; char slotcmd[128]; *************** *** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval); if (res == NULL) goto error; --- 592,598 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, synchronous); if (res == NULL) goto error; *************** *** 760,766 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; int64 last_status = -1; --- 757,763 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, bool synchronous) { char *copybuf = NULL; int64 last_status = -1; *************** *** 784,797 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, now = feGetCurrentTimestamp(); /* ! * If fsync_interval has elapsed since last WAL flush and we've written ! * some WAL data, flush them to disk. */ if (lastFlushPosition < blockpos && ! walfile != -1 && ! ((fsync_interval > 0 && ! feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) || ! fsync_interval < 0)) { if (fsync(walfile) != 0) { --- 781,791 ---- now = feGetCurrentTimestamp(); /* ! * Issue sync command as soon as there are WAL data which ! * has not been flushed yet if synchronous option is true. */ if (lastFlushPosition < blockpos && ! walfile != -1 && synchronous) { if (fsync(walfile) != 0) { *************** *** 799,807 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, current_walfile_name, strerror(errno)); goto error; } - lastFlushPosition = blockpos; ! last_fsync = now; } /* --- 793,807 ---- progname, current_walfile_name, strerror(errno)); goto error; } lastFlushPosition = blockpos; ! ! /* ! * Send feedback so that the server sees the latest WAL locations ! * immediately if synchronous option is true. ! */ ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; ! last_status = now; } /* *************** *** 821,827 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Calculate how long send/receive loops should sleep */ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, ! last_status, fsync_interval, blockpos); r = CopyStreamReceive(conn, sleeptime, ©buf); while (r != 0) --- 821,827 ---- * Calculate how long send/receive loops should sleep */ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, ! last_status); r = CopyStreamReceive(conn, sleeptime, ©buf); while (r != 0) *************** *** 1244,1277 **** CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, */ static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status, int fsync_interval, XLogRecPtr blockpos) { - int64 targettime = 0; int64 status_targettime = 0; - int64 fsync_targettime = 0; long sleeptime; if (standby_message_timeout && still_sending) status_targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); ! if (fsync_interval > 0 && lastFlushPosition < blockpos) ! fsync_targettime = last_fsync + ! (fsync_interval - 1) * ((int64) 1000); ! ! if ((status_targettime < fsync_targettime && status_targettime > 0) || ! fsync_targettime == 0) ! targettime = status_targettime; ! else ! targettime = fsync_targettime; ! ! if (targettime > 0) { long secs; int usecs; feTimestampDifference(now, ! targettime, &secs, &usecs); /* Always sleep at least 1 sec */ --- 1244,1265 ---- */ static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status) { int64 status_targettime = 0; long sleeptime; if (standby_message_timeout && still_sending) status_targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); ! if (status_targettime > 0) { long secs; int usecs; feTimestampDifference(now, ! status_targettime, &secs, &usecs); /* Always sleep at least 1 sec */ *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 31,36 **** extern bool ReceiveXlogStream(PGconn *conn, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval); #endif /* RECEIVELOG_H */ --- 31,36 ---- stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! bool synchronous); #endif /* RECEIVELOG_H */