From 6839ee902bcc4e725e2144b4aa2bcb125efd05eb Mon Sep 17 00:00:00 2001
From: Takashi Menjo
Date: Tue, 4 Aug 2020 13:03:02 +0900
Subject: [PATCH v4 3/3] Walreceiver WAL IO using PMDK

Author: Yoshimi Ichiyanagi
---
Review notes (not part of the commit message):
 - mappedFileAddr is now declared static, matching recvFile, recvFileTLI and
   recvSegNo declared next to it.  Nothing in this patch needs external
   linkage for it; confirm that no other file in the series references it.
 - Dropped "static uint32 recvOff = 0;": no line of this patch reads or
   writes it, so it would only produce an unused-variable warning.
 - New-file hunk offsets and the diffstat were adjusted for the dropped line.
   The post-image blob id on the "index" line was not recomputed.
 - Line breaks and indentation were reconstructed from a whitespace-collapsed
   copy of the patch; verify against the target tree before applying.

 src/backend/replication/walreceiver.c | 61 ++++++++++++++++-----------
 1 file changed, 37 insertions(+), 24 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index d5a9b568a6..b7fbd841ae 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -68,6 +68,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
+#include "storage/pmem.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
@@ -103,6 +104,7 @@ WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 static int	recvFile = -1;
 static TimeLineID recvFileTLI = 0;
 static XLogSegNo recvSegNo = 0;
+static void *mappedFileAddr = NULL;
 
 /*
  * Flags set by interrupt handlers of walreceiver for later service in the
@@ -610,13 +612,13 @@ WalReceiverMain(void)
 		 * End of WAL reached on the requested timeline. Close the last
 		 * segment, and await for new orders from the startup process.
 		 */
-		if (recvFile >= 0)
+		if (recvFile >= 0 || mappedFileAddr != NULL)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
 			XLogWalRcvFlush(false);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			if (close(recvFile) != 0)
+			if (do_XLogFileClose(recvFile, mappedFileAddr) != 0)
 				ereport(PANIC,
 						(errcode_for_file_access(),
 						 errmsg("could not close log segment %s: %m",
@@ -632,6 +634,7 @@ WalReceiverMain(void)
 				XLogArchiveNotify(xlogfname);
 		}
 		recvFile = -1;
+		mappedFileAddr = NULL;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
@@ -902,7 +905,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 	{
 		int			segbytes;
 
-		if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+		if ((recvFile < 0 && mappedFileAddr == NULL) ||
+			!XLByteInSeg(recptr, recvSegNo, wal_segment_size))
 		{
 			bool		use_existent;
 
@@ -910,7 +914,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 			 * fsync() and close current file before we switch to next one. We
 			 * would otherwise have to reopen this file to fsync it later
 			 */
-			if (recvFile >= 0)
+			if (recvFile >= 0 || mappedFileAddr != NULL)
 			{
 				char		xlogfname[MAXFNAMELEN];
 
@@ -923,7 +927,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				 * process soon, so we don't advise the OS to release cache
 				 * pages associated with the file like XLogFileClose() does.
 				 */
-				if (close(recvFile) != 0)
+				if (do_XLogFileClose(recvFile, mappedFileAddr) != 0)
 					ereport(PANIC,
 							(errcode_for_file_access(),
 							 errmsg("could not close log segment %s: %m",
@@ -939,11 +943,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 					XLogArchiveNotify(xlogfname);
 			}
 			recvFile = -1;
+			mappedFileAddr = NULL;
 
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
 			use_existent = true;
-			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
+			recvFile = XLogFileInit(recvSegNo, &use_existent, true, &mappedFileAddr);
 			recvFileTLI = ThisTimeLineID;
 		}
 
@@ -955,27 +960,35 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 		else
 			segbytes = nbytes;
 
-		/* OK to write the logs */
-		errno = 0;
-
-		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
-		if (byteswritten <= 0)
+		if (mappedFileAddr)
+		{
+			PmemFileWrite((char *) mappedFileAddr + startoff, buf, segbytes);
+			byteswritten = segbytes;
+		}
+		else
 		{
-			char		xlogfname[MAXFNAMELEN];
-			int			save_errno;
+			/* OK to write the logs */
+			errno = 0;
+
+			byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+			if (byteswritten <= 0)
+			{
+				char		xlogfname[MAXFNAMELEN];
+				int			save_errno;
 
-			/* if write didn't set errno, assume no disk space */
-			if (errno == 0)
-				errno = ENOSPC;
+				/* if write didn't set errno, assume no disk space */
+				if (errno == 0)
+					errno = ENOSPC;
 
-			save_errno = errno;
-			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			errno = save_errno;
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not write to log segment %s "
-							"at offset %u, length %lu: %m",
-							xlogfname, startoff, (unsigned long) segbytes)));
+				save_errno = errno;
+				XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
+				errno = save_errno;
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not write to log segment %s "
+								"at offset %u, length %lu: %m",
+								xlogfname, startoff, (unsigned long) segbytes)));
+			}
 		}
 
 		/* Update state for write */
-- 
2.25.1