From e3a4da834a79770c63c26c9859dc179911a37540 Mon Sep 17 00:00:00 2001 From: Takashi Menjo Date: Wed, 24 Jun 2020 15:07:58 +0900 Subject: [PATCH v3 3/5] walreceiver supports non-volatile WAL buffer Now walreceiver stores received records directly to non-volatile WAL buffer if applicable. --- src/backend/access/transam/xlog.c | 31 +++++++++++++++- src/backend/replication/walreceiver.c | 53 ++++++++++++++++++++++++++- src/include/access/xlog.h | 4 ++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 45e05b9498..2a022be36a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -925,6 +925,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); +static bool CopyXLogRecordsOnNVWAL(char *buf, Size count, XLogRecPtr startptr, + bool store); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, @@ -12650,6 +12652,21 @@ GetLoadableSizeFromNvwal(XLogRecPtr target, Size count, XLogRecPtr *nvwalptr) */ bool CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr) +{ + return CopyXLogRecordsOnNVWAL(buf, count, startptr, false); +} + +/* + * Called by walreceiver. + */ +bool +CopyXLogRecordsToNVWAL(char *buf, Size count, XLogRecPtr startptr) +{ + return CopyXLogRecordsOnNVWAL(buf, count, startptr, true); +} + +static bool +CopyXLogRecordsOnNVWAL(char *buf, Size count, XLogRecPtr startptr, bool store) { char *p; XLogRecPtr recptr; @@ -12699,7 +12716,13 @@ CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr) max_copy = NvwalSize - off; copybytes = Min(nbytes, max_copy); - memcpy(p, q, copybytes); + if (store) + { + memcpy(q, p, copybytes); + nv_flush(q, copybytes); + } + else + memcpy(p, q, copybytes); /* Update state for copy */ recptr += copybytes; @@ -12711,6 +12734,12 @@ CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr) return true; } +void +SyncNVWAL(void) +{ + nv_drain(); +} + static bool IsXLogSourceFromStream(XLogSource source) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index d1ad75da87..20922ed230 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -130,6 +130,7 @@ static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *start static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvStore(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); @@ -856,7 +857,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart); + if (IsNvwalAvail()) + XLogWalRcvStore(buf, len, dataStart); + else + XLogWalRcvWrite(buf, len, dataStart); break; } case 'k': /* Keepalive */ @@ -991,6 +995,42 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } +/* + * Like XLogWalRcvWrite, but store to non-volatile WAL buffer. + */ +static void +XLogWalRcvStore(char *buf, Size nbytes, XLogRecPtr recptr) +{ + Assert(IsNvwalAvail()); + + CopyXLogRecordsToNVWAL(buf, nbytes, recptr); + + /* + * Also write out to file if we have to archive segments. + * + * We could do this segment by segment but we reuse existing method to + * do it record by record because the former gives us more complexity + * (locking WalBufMappingLock, getting the address of the segment on + * non-volatile WAL buffer, etc). + */ + if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS) + XLogWalRcvWrite(buf, nbytes, recptr); + else + { + /* + * Update status as like XLogWalRcvWrite does. + */ + + /* Update process-local status */ + XLByteToSeg(recptr + nbytes, recvSegNo, wal_segment_size); + recvFileTLI = ThisTimeLineID; + LogstreamResult.Write = recptr + nbytes; + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + } +} + /* * Flush the log to disk. * @@ -1004,7 +1044,16 @@ XLogWalRcvFlush(bool dying) { WalRcvData *walrcv = WalRcv; - issue_xlog_fsync(recvFile, recvSegNo); + /* + * We should call both SyncNVWAL and issue_xlog_fsync if we use NVWAL + * and WAL archive. So we have the following two if-statements, not + * one if-else-statement. + */ + if (IsNvwalAvail()) + SyncNVWAL(); + + if (recvFile >= 0) + issue_xlog_fsync(recvFile, recvSegNo); LogstreamResult.Flush = LogstreamResult.Write; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 75433a6dc0..e6ca151271 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -358,6 +358,10 @@ extern XLogRecPtr GetLoadableSizeFromNvwal(XLogRecPtr target, extern bool CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr); +extern bool CopyXLogRecordsToNVWAL(char *buf, + Size count, + XLogRecPtr startptr); +extern void SyncNVWAL(void); /* * Routines to start, stop, and get status of a base backup. -- 2.17.1