*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 49,54 **** PostgreSQL documentation
--- 49,61 ----
+ Unlike the standby's WAL receiver, pg_receivexlog>
+ flushes WAL data only when WAL file is closed, by default.
+ --synchronous> option must be specified to flush WAL data
+ in real time and ensure it's safely flushed to disk.
+
+
+
The transaction log is streamed over a regular
PostgreSQL connection, and uses the replication
protocol. The connection must be made with a superuser or a user
***************
*** 86,106 **** PostgreSQL documentation
-
-
-
-
- Specifies the maximum time to issue sync commands to ensure the
- received WAL file is safely flushed to disk, in seconds. The default
- value is zero, which disables issuing fsyncs except when WAL file is
- closed. If -1 is specified, WAL file is flushed as
- soon as possible, that is, as soon as there are WAL data which has
- not been flushed yet.
-
-
-
-
-
--- 93,98 ----
***************
*** 135,150 **** PostgreSQL documentation
When this option is used, pg_receivexlog> will report
a flush position to the server, indicating when each segment has been
synchronized to disk so that the server can remove that segment if it
! is not otherwise needed. When using this parameter, it is important
! to make sure that pg_receivexlog> cannot become the
! synchronous standby through an incautious setting of
! ; it does not flush
! data frequently enough for this to work correctly.
--- 127,152 ----
When this option is used, pg_receivexlog> will report
a flush position to the server, indicating when each segment has been
synchronized to disk so that the server can remove that segment if it
! is not otherwise needed. --synchronous option must
! be specified when making pg_receivexlog> run as
! synchronous standby by using replication slot. Otherwise WAL data
! cannot be flushed frequently enough for this to work correctly.
+
+
+
+ Issue sync commands as soon as there is WAL data which has not been
+ flushed yet. Also status packets are sent back to the server just after
+ WAL data is flushed whatever --status-interval> is set to.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, false))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,45 **** static char *basedir = NULL;
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
- static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
static bool do_create_slot = false;
static bool do_drop_slot = false;
static void usage(void);
--- 36,45 ----
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static volatile bool time_to_abort = false;
static bool do_create_slot = false;
static bool do_drop_slot = false;
+ static bool synchronous = false;
static void usage(void);
***************
*** 66,77 **** usage(void)
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
- printf(_(" -F --fsync-interval=SECS\n"
- " time between fsyncs to transaction log files (default: %d)\n"), (fsync_interval / 1000));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
--- 66,76 ----
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
+ printf(_(" --synchronous flush transaction log in real time\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
***************
*** 343,349 **** StreamLog(void)
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! fsync_interval);
PQfinish(conn);
conn = NULL;
--- 342,348 ----
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! synchronous);
PQfinish(conn);
conn = NULL;
***************
*** 374,380 **** main(int argc, char **argv)
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
- {"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
--- 373,378 ----
***************
*** 383,388 **** main(int argc, char **argv)
--- 381,387 ----
/* action */
{"create-slot", no_argument, NULL, 1},
{"drop-slot", no_argument, NULL, 2},
+ {"synchronous", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
};
***************
*** 408,414 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 407,413 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 455,469 **** main(int argc, char **argv)
case 'n':
noloop = 1;
break;
- case 'F':
- fsync_interval = atoi(optarg) * 1000;
- if (fsync_interval < -1000)
- {
- fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
- progname, optarg);
- exit(1);
- }
- break;
case 'v':
verbose++;
break;
--- 454,459 ----
***************
*** 474,479 **** main(int argc, char **argv)
--- 464,472 ----
case 2:
do_drop_slot = true;
break;
+ case 3:
+ synchronous = true;
+ break;
default:
/*
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 31,44 **** static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
- static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
--- 31,43 ----
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! bool synchronous);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
***************
*** 55,62 **** static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status, int fsync_interval,
! XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
--- 54,60 ----
stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
***************
*** 209,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
- last_fsync = feGetCurrentTimestamp();
return true;
}
--- 207,212 ----
***************
*** 440,447 **** CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
! * fsync_interval controls how often we flush to the received WAL file,
! * in milliseconds.
*
* Note: The log position *must* be at a log segment start!
*/
--- 437,444 ----
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
! * If 'synchronous' is true, the received WAL is flushed as soon as written,
! * otherwise only when the WAL file is closed.
*
* Note: The log position *must* be at a log segment start!
*/
***************
*** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! int fsync_interval)
{
char query[128];
char slotcmd[128];
--- 447,453 ----
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! bool synchronous)
{
char query[128];
char slotcmd[128];
***************
*** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval);
if (res == NULL)
goto error;
--- 592,598 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, synchronous);
if (res == NULL)
goto error;
***************
*** 760,766 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
--- 757,763 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, bool synchronous)
{
char *copybuf = NULL;
int64 last_status = -1;
***************
*** 784,797 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
now = feGetCurrentTimestamp();
/*
! * If fsync_interval has elapsed since last WAL flush and we've written
! * some WAL data, flush them to disk.
*/
if (lastFlushPosition < blockpos &&
! walfile != -1 &&
! ((fsync_interval > 0 &&
! feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
! fsync_interval < 0))
{
if (fsync(walfile) != 0)
{
--- 781,791 ----
now = feGetCurrentTimestamp();
/*
! * Issue sync command as soon as there are WAL data which
! * has not been flushed yet if synchronous option is true.
*/
if (lastFlushPosition < blockpos &&
! walfile != -1 && synchronous)
{
if (fsync(walfile) != 0)
{
***************
*** 799,807 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
progname, current_walfile_name, strerror(errno));
goto error;
}
-
lastFlushPosition = blockpos;
! last_fsync = now;
}
/*
--- 793,807 ----
progname, current_walfile_name, strerror(errno));
goto error;
}
lastFlushPosition = blockpos;
!
! /*
! * Send feedback so that the server sees the latest WAL locations
! * immediately if synchronous option is true.
! */
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
}
/*
***************
*** 821,827 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Calculate how long send/receive loops should sleep
*/
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! last_status, fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, ©buf);
while (r != 0)
--- 821,827 ----
* Calculate how long send/receive loops should sleep
*/
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! last_status);
r = CopyStreamReceive(conn, sleeptime, ©buf);
while (r != 0)
***************
*** 1244,1277 **** CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status, int fsync_interval, XLogRecPtr blockpos)
{
- int64 targettime = 0;
int64 status_targettime = 0;
- int64 fsync_targettime = 0;
long sleeptime;
if (standby_message_timeout && still_sending)
status_targettime = last_status +
(standby_message_timeout - 1) * ((int64) 1000);
! if (fsync_interval > 0 && lastFlushPosition < blockpos)
! fsync_targettime = last_fsync +
! (fsync_interval - 1) * ((int64) 1000);
!
! if ((status_targettime < fsync_targettime && status_targettime > 0) ||
! fsync_targettime == 0)
! targettime = status_targettime;
! else
! targettime = fsync_targettime;
!
! if (targettime > 0)
{
long secs;
int usecs;
feTimestampDifference(now,
! targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */
--- 1244,1265 ----
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status)
{
int64 status_targettime = 0;
long sleeptime;
if (standby_message_timeout && still_sending)
status_targettime = last_status +
(standby_message_timeout - 1) * ((int64) 1000);
! if (status_targettime > 0)
{
long secs;
int usecs;
feTimestampDifference(now,
! status_targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 31,36 **** extern bool ReceiveXlogStream(PGconn *conn,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! int fsync_interval);
#endif /* RECEIVELOG_H */
--- 31,36 ----
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! bool synchronous);
#endif /* RECEIVELOG_H */