From cdfaedd530b5b60171952dc21ad3e2a5a3e6451b Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Tue, 7 Apr 2020 22:56:27 +1200 Subject: [PATCH v10 2/3] Allow XLogReadRecord() to be non-blocking. Extend read_local_xlog_page() to support non-blocking modes: 1. Reading as far as the WAL receiver has written so far. 2. Reading all the way to the end, when the end LSN is unknown. Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/access/transam/xlogreader.c | 37 ++++-- src/backend/access/transam/xlogutils.c | 149 +++++++++++++++++------- src/backend/replication/walsender.c | 2 +- src/include/access/xlogreader.h | 14 ++- src/include/access/xlogutils.h | 26 +++++ 5 files changed, 173 insertions(+), 55 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 67996018da..aad9fc2ce1 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -261,6 +261,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * If the reading fails for some other reason, NULL is also returned, and * *errormsg is set to a string with details of the failure. * + * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather + * than waiting for WAL to arrive, NULL is also returned in that case. + * * The returned pointer (or *errormsg) points to an internal buffer that's * valid until the next call to XLogReadRecord. */ @@ -550,10 +553,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) err: /* - * Invalidate the read state. We might read from a different source after - * failure. + * Invalidate the read state, if this was an error. We might read from a + * different source after failure. 
*/ - XLogReaderInvalReadState(state); + if (readOff != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -565,8 +569,9 @@ err: * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the page_read() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page + * cannot be read for some reason; errormsg_buf is set in the former case + * (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -663,8 +668,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: + if (readLen == XLOGPAGEREAD_WOULDBLOCK) + return XLOGPAGEREAD_WOULDBLOCK; + XLogReaderInvalReadState(state); - return -1; + return XLOGPAGEREAD_ERROR; } /* @@ -943,6 +951,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; char *errormsg; + int readLen; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -956,7 +965,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. 
It should typically be equal or greater than @@ -1037,7 +1045,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) } err: - XLogReaderInvalReadState(state); + if (readLen != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); return InvalidXLogRecPtr; } @@ -1096,8 +1105,16 @@ WALRead(XLogReaderState *state, XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); state->routine.segment_open(state, nextSegNo, &tli); - /* This shouldn't happen -- indicates a bug in segment_open */ - Assert(state->seg.ws_file >= 0); + /* callback reported that there was no such file */ + if (state->seg.ws_file < 0) + { + errinfo->wre_errno = errno; + errinfo->wre_req = 0; + errinfo->wre_read = 0; + errinfo->wre_off = startoff; + errinfo->wre_seg = state->seg; + return false; + } /* Update the current segment info. */ state->seg.ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b2ca0cd4cf..3bc647eff1 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -808,6 +809,29 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, path))); } +/* + * XLogReaderRoutine->segment_open callback that reports missing files rather + * than raising an error. 
+ */ +void +wal_segment_try_open(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; + + if (errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); +} + /* stock XLogReaderRoutine->segment_close callback */ void wal_segment_close(XLogReaderState *state) @@ -823,6 +847,10 @@ wal_segment_close(XLogReaderState *state) * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. * + * A pointer to an XLogReadLocalOptions struct may be passed in as + * XLogReaderRoutine->page_read_private to control the behavior of this + * function. + * * TODO: The walsender has its own version of this, but it relies on the * walsender's latch being set whenever WAL is flushed. No such infrastructure * exists for normal backends, so we have to do a check/sleep/repeat style of @@ -837,58 +865,89 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = + (XLogReadLocalOptions *) state->routine.page_read_private; loc = targetPagePtr + reqLen; /* Loop waiting for xlog to be available if necessary */ while (1) { - /* - * Determine the limit of xlog we can currently read to, and what the - * most recent timeline is. - * - * RecoveryInProgress() will update ThisTimeLineID when it first - * notices recovery finishes, so we only have to maintain it for the - * local process until recovery ends. - */ - if (!RecoveryInProgress()) - read_upto = GetFlushRecPtr(); - else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - tli = ThisTimeLineID; + switch (options ? 
options->read_upto_policy : -1) + { + case XLRO_WALRCV_WRITTEN: + /* + * We'll try to read as far as has been written by the WAL + * receiver, on the requested timeline. When we run out of valid + * data, we'll return an error. This is used by xlogprefetch.c + * while streaming. + */ + read_upto = GetWalRcvWriteRecPtr(); + state->currTLI = tli = options->tli; + break; - /* - * Check which timeline to get the record from. - * - * We have to do it each time through the loop because if we're in - * recovery as a cascading standby, the current timeline might've - * become historical. We can't rely on RecoveryInProgress() because in - * a standby configuration like - * - * A => B => C - * - * if we're a logical decoding session on C, and B gets promoted, our - * timeline will change while we remain in recovery. - * - * We can't just keep reading from the old timeline as the last WAL - * archive in the timeline will get renamed to .partial by - * StartupXLOG(). - * - * If that happens after our caller updated ThisTimeLineID but before - * we actually read the xlog page, we might still try to read from the - * old (now renamed) segment and fail. There's not much we can do - * about this, but it can only happen when we're a leaf of a cascading - * standby whose primary gets promoted while we're decoding, so a - * one-off ERROR isn't too bad. - */ - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + case XLRO_END: + /* + * We'll try to read as far as we can on one timeline. This is + * used by xlogprefetch.c for crash recovery. + */ + read_upto = (XLogRecPtr) -1; + state->currTLI = tli = options->tli; + break; + + default: + /* + * Determine the limit of xlog we can currently read to, and what the + * most recent timeline is. + * + * RecoveryInProgress() will update ThisTimeLineID when it first + * notices recovery finishes, so we only have to maintain it for + * the local process until recovery ends. 
+ */ + if (!RecoveryInProgress()) + read_upto = GetFlushRecPtr(); + else + read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); + tli = ThisTimeLineID; + + /* + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. We can't rely on RecoveryInProgress() + * because in a standby configuration like + * + * A => B => C + * + * if we're a logical decoding session on C, and B gets promoted, + * our timeline will change while we remain in recovery. + * + * We can't just keep reading from the old timeline as the last + * WAL archive in the timeline will get renamed to .partial by + * StartupXLOG(). + * + * If that happens after our caller updated ThisTimeLineID but + * before we actually read the xlog page, we might still try to + * read from the old (now renamed) segment and fail. There's not + * much we can do about this, but it can only happen when we're a + * leaf of a cascading standby whose primary gets promoted while + * we're decoding, so a one-off ERROR isn't too bad. + */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + break; + } - if (state->currTLI == ThisTimeLineID) + if (state->currTLI == tli) { if (loc <= read_upto) break; + /* not enough data there, but we were asked not to wait */ + if (options && options->nowait) + return XLOGPAGEREAD_WOULDBLOCK; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } @@ -930,7 +989,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + return XLOGPAGEREAD_ERROR; } else { @@ -945,7 +1004,17 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, */ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo)) + { + /* + * When not following timeline changes, we may read past the end of + * available segments. 
Report a missing file by + * returning XLOGPAGEREAD_ERROR rather than raising an error. + */ + if (errinfo.wre_errno == ENOENT) + return XLOGPAGEREAD_ERROR; + WALReadRaiseError(&errinfo); + } /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 460ca3f947..e6a3b5073b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -830,7 +830,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + return XLOGPAGEREAD_ERROR; if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index b976882229..ede9b71b64 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -57,6 +57,10 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; +/* Special negative return values for XLogPageReadCB functions */ +#define XLOGPAGEREAD_ERROR -1 +#define XLOGPAGEREAD_WOULDBLOCK -2 + /* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, @@ -76,10 +80,11 @@ typedef struct XLogReaderRoutine * This callback shall read at least reqLen valid bytes of the xlog page * starting at targetPagePtr, and store them in readBuf. The callback * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. + * XLOGPAGEREAD_ERROR on failure. 
The callback shall either sleep, if + * necessary, to wait for the requested bytes to become available, or + * return XLOGPAGEREAD_WOULDBLOCK. The callback will not be invoked again + * for the same page unless more than the returned number of bytes are + * needed. * * targetRecPtr is the position of the WAL record we're reading. Usually * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs @@ -91,6 +96,7 @@ typedef struct XLogReaderRoutine * read from. */ XLogPageReadCB page_read; + void *page_read_private; /* * Callback to open the specified WAL segment for reading. ->seg.ws_file diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index e59b6cf3a9..6325c23dc2 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,12 +47,38 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can be supplied as private data + * for an XLogReader, causing read_local_xlog_page() to modify its behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* + * For XLRO_WALRCV_WRITTEN and XLRO_END modes, the timeline ID must be + * provided. + */ + TimeLineID tli; + + /* How far to read. */ + enum { + XLRO_STANDARD, + XLRO_WALRCV_WRITTEN, + XLRO_END + } read_upto_policy; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); +extern void wal_segment_try_open(XLogReaderState *state, + XLogSegNo nextSegNo, + TimeLineID *tli_p); extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, -- 2.20.1