From 0cb1f9197350d76ad8ef1fc2115afb7abdfc4fdc Mon Sep 17 00:00:00 2001 From: Takashi Menjo Date: Wed, 24 Jun 2020 15:07:57 +0900 Subject: [PATCH v3 2/5] Non-volatile WAL buffer Now external WAL buffer becomes non-volatile. Bumps PG_CONTROL_VERSION. --- src/backend/access/transam/xlog.c | 1154 ++++++++++++++++-- src/backend/access/transam/xlogreader.c | 24 + src/bin/pg_controldata/pg_controldata.c | 3 + src/include/access/xlog.h | 8 + src/include/catalog/pg_control.h | 17 +- src/test/regress/expected/misc_functions.out | 14 +- src/test/regress/sql/misc_functions.sql | 14 +- 7 files changed, 1097 insertions(+), 137 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0681ba1262..45e05b9498 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -654,6 +654,13 @@ typedef struct XLogCtlData TimeLineID ThisTimeLineID; TimeLineID PrevTimeLineID; + /* + * Used for non-volatile WAL buffer (NVWAL). + * + * All the records up to this LSN are persistent in NVWAL. + */ + XLogRecPtr persistentUpTo; + /* * SharedRecoveryState indicates if we're still in crash or archive * recovery. Protected by info_lck. @@ -783,11 +790,13 @@ typedef enum XLOG_FROM_ANY = 0, /* request to read WAL from any source */ XLOG_FROM_ARCHIVE, /* restored using restore_command */ XLOG_FROM_PG_WAL, /* existing file in pg_wal */ - XLOG_FROM_STREAM /* streamed from master */ + XLOG_FROM_NVWAL, /* non-volatile WAL buffer */ + XLOG_FROM_STREAM, /* streamed from master via segment file */ + XLOG_FROM_STREAM_NVWAL /* same as above, but via NVWAL */ } XLogSource; /* human-readable names for XLogSources, for debugging output */ -static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"}; +static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "nvwal", "stream", "stream_nvwal"}; /* * openLogFile is -1 or a kernel FD for an open log file segment. 
@@ -922,6 +931,7 @@ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); +static void PreallocNonVolatileXlogBuffer(void); static void PreallocXlogFiles(XLogRecPtr endptr); static void RemoveTempXlogFiles(void); static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr); @@ -1204,6 +1214,43 @@ XLogInsertRecord(XLogRecData *rdata, } } + /* + * Request a checkpoint here if non-volatile WAL buffer is used and we + * have consumed too much WAL since the last checkpoint. + * + * We first screen under the condition (1) OR (2) below: + * + * (1) The record was the first one in a certain segment. + * (2) The record was inserted across segments. + * + * We then check the segment number which the record was inserted into. + */ + if (NvwalAvail && inserted && + (StartPos % wal_segment_size == SizeOfXLogLongPHD || + StartPos / wal_segment_size < EndPos / wal_segment_size)) + { + XLogSegNo end_segno; + + XLByteToSeg(EndPos, end_segno, wal_segment_size); + + /* + * NOTE: We do not signal walsender here because the inserted record + * have not drained by NVWAL buffer yet. + * + * NOTE: We do not signal walarchiver here because the inserted record + * have not flushed to a segment file. So we don't need to update + * XLogCtl->lastSegSwitch{Time,LSN}, used only by CheckArchiveTimeout. 
+ */ + + /* Two-step checking for speed (see also XLogWrite) */ + if (IsUnderPostmaster && XLogCheckpointNeeded(end_segno)) + { + (void) GetRedoRecPtr(); + if (XLogCheckpointNeeded(end_segno)) + RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); + } + } + #ifdef WAL_DEBUG if (XLOG_DEBUG) { @@ -2136,6 +2183,15 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) XLogRecPtr NewPageBeginPtr; XLogPageHeader NewPage; int npages = 0; + bool is_firstpage; + + if (NvwalAvail) + elog(DEBUG1, "XLogCtl->InitializedUpTo %X/%X; upto %X/%X; opportunistic %s", + (uint32) (XLogCtl->InitializedUpTo >> 32), + (uint32) XLogCtl->InitializedUpTo, + (uint32) (upto >> 32), + (uint32) upto, + opportunistic ? "true" : "false"); LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); @@ -2197,7 +2253,25 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) { /* Have to write it ourselves */ TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); - WriteRqst.Write = OldPageRqstPtr; + + if (NvwalAvail) + { + /* + * If we use non-volatile WAL buffer, it is a special + * but expected case to write the buffer pages out to + * segment files, and for simplicity, it is done in + * segment by segment. + */ + XLogRecPtr OldSegEndPtr; + + OldSegEndPtr = OldPageRqstPtr - XLOG_BLCKSZ + wal_segment_size; + Assert(OldSegEndPtr % wal_segment_size == 0); + + WriteRqst.Write = OldSegEndPtr; + } + else + WriteRqst.Write = OldPageRqstPtr; + WriteRqst.Flush = 0; XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); @@ -2224,7 +2298,20 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) * Be sure to re-zero the buffer so that bytes beyond what we've * written will look like zeroes and not valid XLOG records... */ - MemSet((char *) NewPage, 0, XLOG_BLCKSZ); + if (NvwalAvail) + { + /* + * We do not take the way that combines MemSet() and pmem_persist() + * because pmem_persist() may use slow and strong-ordered cache + * flush instruction if weak-ordered fast one is not supported. 
+ * Instead, we first fill the buffer with zero by + * pmem_memset_persist() that can leverage non-temporal fast store + * instructions, then make the header persistent later. + */ + nv_memset_persist(NewPage, 0, XLOG_BLCKSZ); + } + else + MemSet((char *) NewPage, 0, XLOG_BLCKSZ); /* * Fill the new page's header @@ -2256,7 +2343,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) /* * If first page of an XLOG segment file, make it a long header. */ - if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0) + is_firstpage = ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0); + if (is_firstpage) { XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; @@ -2271,7 +2359,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) * before the xlblocks update. GetXLogBuffer() reads xlblocks without * holding a lock. */ - pg_write_barrier(); + if (NvwalAvail) + { + /* Make the header persistent on PMEM */ + nv_persist(NewPage, is_firstpage ? SizeOfXLogLongPHD : SizeOfXLogShortPHD); + } + else + pg_write_barrier(); *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; @@ -2281,6 +2375,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) } LWLockRelease(WALBufMappingLock); + if (NvwalAvail) + elog(DEBUG1, "ControlFile->discardedUpTo %X/%X; XLogCtl->InitializedUpTo %X/%X", + (uint32) (ControlFile->discardedUpTo >> 32), + (uint32) ControlFile->discardedUpTo, + (uint32) (XLogCtl->InitializedUpTo >> 32), + (uint32) XLogCtl->InitializedUpTo); + #ifdef WAL_DEBUG if (XLOG_DEBUG && npages > 0) { @@ -2662,6 +2763,23 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; } + /* + * Update discardedUpTo if NVWAL is used. A new value should not fall + * behind the old one. 
+ */ + if (NvwalAvail) + { + Assert(LogwrtResult.Write == LogwrtResult.Flush); + + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + if (ControlFile->discardedUpTo < LogwrtResult.Write) + { + ControlFile->discardedUpTo = LogwrtResult.Write; + UpdateControlFile(); + } + LWLockRelease(ControlFileLock); + } + /* * Update shared-memory status * @@ -2866,6 +2984,123 @@ XLogFlush(XLogRecPtr record) return; } + if (NvwalAvail) + { + XLogRecPtr FromPos; + + /* + * No page on the NVWAL is to be flushed to segment files. Instead, + * we wait all the insertions preceding this one complete. We will + * wait for all the records to be persistent on the NVWAL below. + */ + record = WaitXLogInsertionsToFinish(record); + + /* + * Check if another backend already have done what I am doing. + * + * We can compare something <= XLogCtl->persistentUpTo without + * holding XLogCtl->info_lck spinlock because persistentUpTo is + * monotonically increasing and can be loaded atomically on each + * NVWAL-supported platform (now x64 only). + */ + FromPos = *((volatile XLogRecPtr *) &XLogCtl->persistentUpTo); + if (record <= FromPos) + return; + + /* + * In a very rare case, we rounded whole the NVWAL. We do not need + * to care old pages here because they already have been evicted to + * segment files at record insertion. + * + * In such a case, we flush whole the NVWAL. We also log it as + * warning because it can be time-consuming operation. + * + * TODO Advance XLogCtl->persistentUpTo at the end of XLogWrite, and + * we can remove the following first if-block. + */ + if (record - FromPos > NvwalSize) + { + elog(WARNING, "flush whole the NVWAL; FromPos %X/%X; record %X/%X", + (uint32) (FromPos >> 32), (uint32) FromPos, + (uint32) (record >> 32), (uint32) record); + + nv_flush(XLogCtl->pages, NvwalSize); + } + else + { + char *frompos; + char *uptopos; + size_t fromoff; + size_t uptooff; + + /* + * Flush each record that is probably not flushed yet. 
+ * + * We have two reasons why we say "probably". The first is because + * such a record copied with non-temporal store instruction has + * already "flushed" but we cannot distinguish it. nv_flush is + * harmless for it in consistency. + * + * The second reason is that the target record might have already + * been evicted to a segment file until now. Also in this case, + * nv_flush is harmless in consistency. + */ + uptooff = record % NvwalSize; + uptopos = XLogCtl->pages + uptooff; + fromoff = FromPos % NvwalSize; + frompos = XLogCtl->pages + fromoff; + + /* Handles rotation */ + if (uptopos <= frompos) + { + nv_flush(frompos, NvwalSize - fromoff); + fromoff = 0; + frompos = XLogCtl->pages; + } + + nv_flush(frompos, uptooff - fromoff); + } + + /* + * To guarantee durability ("D" of ACID), we should satisfy the + * following two for each transaction X: + * + * (1) All the WAL records inserted by X, including the commit record + * of X, should persist on NVWAL before the server commits X. + * + * (2) All the WAL records inserted by any other transactions than + * X, that have less LSN than the commit record just inserted + * by X, should persist on NVWAL before the server commits X. + * + * The (1) can be satisfied by a store barrier after the commit record + * of X is flushed because each WAL record on X is already flushed in + * the end of its insertion. The (2) can be satisfied by waiting for + * any record insertions that have less LSN than the commit record just + * inserted by X, and by a store barrier as well. + * + * Now is the time. Have a store barrier. + */ + nv_drain(); + + /* + * Remember where the last persistent record is. A new value should + * not fall behind the old one. + */ + SpinLockAcquire(&XLogCtl->info_lck); + if (XLogCtl->persistentUpTo < record) + XLogCtl->persistentUpTo = record; + SpinLockRelease(&XLogCtl->info_lck); + + /* + * The records up to the returned "record" have been persistent on + * NVWAL. Now signal walsenders. 
+ */ + WalSndWakeupRequest(); + WalSndWakeupProcessRequests(); + + return; + } + /* Quick exit if already known flushed */ if (record <= LogwrtResult.Flush) return; @@ -3049,6 +3284,13 @@ XLogBackgroundFlush(void) if (RecoveryInProgress()) return false; + /* + * Quick exit if NVWAL buffer is used and archiving is not active. In this + * case, we need no WAL segment file in pg_wal directory. + */ + if (NvwalAvail && !XLogArchivingActive()) + return false; + /* read LogwrtResult and update local state */ SpinLockAcquire(&XLogCtl->info_lck); LogwrtResult = XLogCtl->LogwrtResult; @@ -3067,6 +3309,18 @@ XLogBackgroundFlush(void) flexible = false; /* ensure it all gets written */ } + /* + * If NVWAL is used, back off to the last completed segment boundary + * for writing the buffer page to files in segment by segment. We do so + * nowhere but here after XLogCtl->asyncXactLSN is loaded because it + * should be considered. + */ + if (NvwalAvail) + { + WriteRqst.Write -= WriteRqst.Write % wal_segment_size; + flexible = false; /* ensure it all gets written */ + } + /* * If already known flushed, we're done. Just need to check if we are * holding an open file handle to a logfile that's no longer in use, @@ -3093,7 +3347,12 @@ XLogBackgroundFlush(void) flushbytes = WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ; - if (WalWriterFlushAfter == 0 || lastflush == 0) + if (NvwalAvail) + { + WriteRqst.Flush = WriteRqst.Write; + lastflush = now; + } + else if (WalWriterFlushAfter == 0 || lastflush == 0) { /* first call, or block based limits disabled */ WriteRqst.Flush = WriteRqst.Write; @@ -3152,7 +3411,28 @@ XLogBackgroundFlush(void) * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. 
*/ - AdvanceXLInsertBuffer(InvalidXLogRecPtr, true); + if (NvwalAvail && max_wal_senders == 0) + { + XLogRecPtr upto; + + /* + * If NVWAL is used and there is no walsender, nobody is to load + * segments on the buffer. So let's recycle segments up to {where we + * have requested to write and flush} + NvwalSize. + * + * Note that if NVWAL is used and a walsender seems running, we have to + * do nothing; keep the written pages on the buffer for walsenders to be + * loaded from the buffer, not from the segment files. Note that the + * buffer pages are eventually to be recycled by checkpoint. + */ + Assert(WriteRqst.Write == WriteRqst.Flush); + Assert(WriteRqst.Write % wal_segment_size == 0); + + upto = WriteRqst.Write + NvwalSize; + AdvanceXLInsertBuffer(upto - XLOG_BLCKSZ, false); + } + else + AdvanceXLInsertBuffer(InvalidXLogRecPtr, true); /* * If we determined that we need to write data, but somebody else @@ -3885,6 +4165,43 @@ XLogFileClose(void) ReleaseExternalFD(); } +/* + * Preallocate non-volatile XLOG buffers. + * + * This zeroes buffers and prepare page headers up to + * ControlFile->discardedUpTo + S, where S is the total size of + * the non-volatile XLOG buffers. + * + * It is caller's responsibility to update ControlFile->discardedUpTo + * and to set XLogCtl->InitializedUpTo properly. + */ +static void +PreallocNonVolatileXlogBuffer(void) +{ + XLogRecPtr newupto, + InitializedUpTo; + + Assert(NvwalAvail); + + LWLockAcquire(ControlFileLock, LW_SHARED); + newupto = ControlFile->discardedUpTo; + LWLockRelease(ControlFileLock); + + InitializedUpTo = XLogCtl->InitializedUpTo; + + newupto += NvwalSize; + Assert(newupto % wal_segment_size == 0); + + if (newupto <= InitializedUpTo) + return; + + /* + * Subtracting XLOG_BLCKSZ is important, because AdvanceXLInsertBuffer + * handles the first argument as the beginning of pages, not the end. + */ + AdvanceXLInsertBuffer(newupto - XLOG_BLCKSZ, false); +} + /* * Preallocate log files beyond the specified log endpoint. 
* @@ -4181,8 +4498,11 @@ RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr) * Before deleting the file, see if it can be recycled as a future log * segment. Only recycle normal files, pg_standby for example can create * symbolic links pointing to a separate archive directory. + * + * If NVWAL buffer is used, a log segment file is never to be recycled + * (that is, always go into else block). */ - if (wal_recycle && + if (!NvwalAvail && wal_recycle && endlogSegNo <= recycleSegNo && lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && InstallXLogFileSegment(&endlogSegNo, path, @@ -4600,6 +4920,7 @@ InitControlFile(uint64 sysidentifier) memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN); ControlFile->state = DB_SHUTDOWNED; ControlFile->unloggedLSN = FirstNormalUnloggedLSN; + ControlFile->discardedUpTo = (NvwalAvail) ? wal_segment_size : InvalidXLogRecPtr; /* Set important parameter values for use when replaying WAL */ ControlFile->MaxConnections = MaxConnections; @@ -5430,41 +5751,58 @@ BootStrapXLOG(void) record->xl_crc = crc; /* Create first XLOG segment file */ - use_existent = false; - openLogFile = XLogFileInit(1, &use_existent, false); + if (NvwalAvail) + { + pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE); + nv_memcpy_nodrain(XLogCtl->pages + wal_segment_size, page, XLOG_BLCKSZ); + pgstat_report_wait_end(); - /* - * We needn't bother with Reserve/ReleaseExternalFD here, since we'll - * close the file again in a moment. 
- */ + pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC); + nv_drain(); + pgstat_report_wait_end(); - /* Write the first page with the initial record */ - errno = 0; - pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE); - if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write bootstrap write-ahead log file: %m"))); + /* + * Other WAL stuffs will be initialized in startup process. + */ } - pgstat_report_wait_end(); + else + { + use_existent = false; + openLogFile = XLogFileInit(1, &use_existent, false); - pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC); - if (pg_fsync(openLogFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not fsync bootstrap write-ahead log file: %m"))); - pgstat_report_wait_end(); + /* + * We needn't bother with Reserve/ReleaseExternalFD here, since we'll + * close the file again in a moment. 
+ */ - if (close(openLogFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close bootstrap write-ahead log file: %m"))); + /* Write the first page with the initial record */ + errno = 0; + pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE); + if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write bootstrap write-ahead log file: %m"))); + } + pgstat_report_wait_end(); - openLogFile = -1; + pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC); + if (pg_fsync(openLogFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync bootstrap write-ahead log file: %m"))); + pgstat_report_wait_end(); + + if (close(openLogFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close bootstrap write-ahead log file: %m"))); + + openLogFile = -1; + } /* Now create pg_control */ InitControlFile(sysidentifier); @@ -5718,41 +6056,47 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) * happens in the middle of a segment, copy data from the last WAL segment * of the old timeline up to the switch point, to the starting WAL segment * on the new timeline. + * + * If non-volatile WAL buffer is used, no new segment file is created. Data + * up to the switch point will be copied into NVWAL buffer by StartupXLOG(). */ - if (endLogSegNo == startLogSegNo) - { - /* - * Make a copy of the file on the new timeline. - * - * Writing WAL isn't allowed yet, so there are no locking - * considerations. But we should be just as tense as XLogFileInit to - * avoid emplacing a bogus file. - */ - XLogFileCopy(endLogSegNo, endTLI, endLogSegNo, - XLogSegmentOffset(endOfLog, wal_segment_size)); - } - else + if (!NvwalAvail) { - /* - * The switch happened at a segment boundary, so just create the next - * segment on the new timeline. 
- */ - bool use_existent = true; - int fd; + if (endLogSegNo == startLogSegNo) + { + /* + * Make a copy of the file on the new timeline. + * + * Writing WAL isn't allowed yet, so there are no locking + * considerations. But we should be just as tense as XLogFileInit to + * avoid emplacing a bogus file. + */ + XLogFileCopy(endLogSegNo, endTLI, endLogSegNo, + XLogSegmentOffset(endOfLog, wal_segment_size)); + } + else + { + /* + * The switch happened at a segment boundary, so just create the next + * segment on the new timeline. + */ + bool use_existent = true; + int fd; - fd = XLogFileInit(startLogSegNo, &use_existent, true); + fd = XLogFileInit(startLogSegNo, &use_existent, true); - if (close(fd) != 0) - { - char xlogfname[MAXFNAMELEN]; - int save_errno = errno; + if (close(fd) != 0) + { + char xlogfname[MAXFNAMELEN]; + int save_errno = errno; - XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo, - wal_segment_size); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", xlogfname))); + XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo, + wal_segment_size); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", xlogfname))); + } } } @@ -7009,6 +7353,11 @@ StartupXLOG(void) InRecovery = true; } + /* Dump discardedUpTo just before REDO */ + elog(LOG, "ControlFile->discardedUpTo %X/%X", + (uint32) (ControlFile->discardedUpTo >> 32), + (uint32) ControlFile->discardedUpTo); + /* REDO */ if (InRecovery) { @@ -7795,10 +8144,88 @@ StartupXLOG(void) Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec); Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); + if (NvwalAvail) + { + XLogRecPtr discardedUpTo; + + discardedUpTo = ControlFile->discardedUpTo; + Assert(discardedUpTo == InvalidXLogRecPtr || + discardedUpTo % wal_segment_size == 0); + + if (discardedUpTo == InvalidXLogRecPtr) + { + elog(DEBUG1, "brand-new NVWAL"); + + /* The following "Tricky point" 
is to initialize the buffer */ + } + else if (EndOfLog <= discardedUpTo) + { + elog(DEBUG1, "no record on NVWAL has been UNDONE"); + + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + ControlFile->discardedUpTo = InvalidXLogRecPtr; + UpdateControlFile(); + LWLockRelease(ControlFileLock); + + nv_memset_persist(XLogCtl->pages, 0, NvwalSize); + + /* The following "Tricky point" is to initialize the buffer */ + } + else + { + int last_idx; + int idx; + XLogRecPtr ptr; + + elog(DEBUG1, "some records on NVWAL have been UNDONE; keep them"); + + /* + * Initialize xlblock array because we decided to keep UNDONE + * records on NVWAL buffer; or each page on the buffer that meets + * xlblocks == 0 (initialized as so by XLOGShmemInit) is to be + * accidentally cleared by the following AdvanceXLInsertBuffer! + * + * Two cases can be considered: + * + * 1) EndOfLog is on a page boundary (divisible by XLOG_BLCKSZ): + * Initialize up to (and including) the page containing the last + * record. That page should end with EndOfLog. The one more + * next page "N" beginning with EndOfLog is to be untouched + * because, in such a very corner case that all the NVWAL + * buffer pages are already filled, page N is on the same + * location as the first page "F" beginning with discardedUpTo. + * Of course we should not overwrite the page F. + * + * In this case, we first get XLogRecPtrToBufIdx(EndOfLog) as + * last_idx, indicating the page N. Then, we go forward from + * the page F up to (but excluding) page N that have the same + * index as the page F. + * + * 2) EndOfLog is not on a page boundary: Initialize all the pages + * but the page "L" having the last record. The page L is to be + * initialized by the following "Tricky point", including its + * content. + * + * In either case, XLogCtl->InitializedUpTo is to be initialized in + * the following "Tricky" if-else block. 
+ */ + + last_idx = XLogRecPtrToBufIdx(EndOfLog); + + ptr = discardedUpTo; + for (idx = XLogRecPtrToBufIdx(ptr); idx != last_idx; + idx = NextBufIdx(idx)) + { + ptr += XLOG_BLCKSZ; + XLogCtl->xlblocks[idx] = ptr; + } + } + } + /* - * Tricky point here: readBuf contains the *last* block that the LastRec - * record spans, not the one it starts in. The last block is indeed the - * one we want to use. + * Tricky point here: readBuf contains the *last* block that the + * LastRec record spans, not the one it starts in. The last block is + * indeed the one we want to use. */ if (EndOfLog % XLOG_BLCKSZ != 0) { @@ -7818,6 +8245,9 @@ StartupXLOG(void) memcpy(page, xlogreader->readBuf, len); memset(page + len, 0, XLOG_BLCKSZ - len); + if (NvwalAvail) + nv_persist(page, XLOG_BLCKSZ); + XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ; XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ; } @@ -7831,12 +8261,54 @@ StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; } - LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + if (NvwalAvail) + { + XLogRecPtr SegBeginPtr; + + /* + * If NVWAL buffer is used, writing records out to segment files should + * be done in segment by segment. So Logwrt{Rqst,Result} (and also + * discardedUpTo) should be multiple of wal_segment_size. Let's get + * them back off to the last segment boundary. + */ + + SegBeginPtr = EndOfLog - (EndOfLog % wal_segment_size); + LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr; + XLogCtl->LogwrtResult = LogwrtResult; + XLogCtl->LogwrtRqst.Write = SegBeginPtr; + XLogCtl->LogwrtRqst.Flush = SegBeginPtr; + + /* + * persistentUpTo does not need to be multiple of wal_segment_size, + * and should be drained-up-to LSN. walsender will use it to load + * records from NVWAL buffer. 
+ */ + XLogCtl->persistentUpTo = EndOfLog; + + /* Update discardedUpTo in pg_control if still invalid */ + if (ControlFile->discardedUpTo == InvalidXLogRecPtr) + { + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + ControlFile->discardedUpTo = SegBeginPtr; + UpdateControlFile(); + LWLockRelease(ControlFileLock); + } + + elog(DEBUG1, "EndOfLog: %X/%X", + (uint32) (EndOfLog >> 32), (uint32) EndOfLog); - XLogCtl->LogwrtResult = LogwrtResult; + elog(DEBUG1, "SegBeginPtr: %X/%X", + (uint32) (SegBeginPtr >> 32), (uint32) SegBeginPtr); + } + else + { + LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; - XLogCtl->LogwrtRqst.Write = EndOfLog; - XLogCtl->LogwrtRqst.Flush = EndOfLog; + XLogCtl->LogwrtResult = LogwrtResult; + + XLogCtl->LogwrtRqst.Write = EndOfLog; + XLogCtl->LogwrtRqst.Flush = EndOfLog; + } /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE @@ -7967,6 +8439,7 @@ StartupXLOG(void) char origpath[MAXPGPATH]; char partialfname[MAXFNAMELEN]; char partialpath[MAXPGPATH]; + XLogRecPtr discardedUpTo; XLogFilePath(origpath, EndOfLogTLI, endLogSegNo, wal_segment_size); snprintf(partialfname, MAXFNAMELEN, "%s.partial", origfname); @@ -7978,6 +8451,53 @@ StartupXLOG(void) */ XLogArchiveCleanup(partialfname); + /* + * If NVWAL is also used for archival recovery, write old + * records out to segment files to archive them. Note that we + * need locks related to WAL because LocalXLogInsertAllowed + * already got to -1. + */ + discardedUpTo = ControlFile->discardedUpTo; + if (NvwalAvail && discardedUpTo != InvalidXLogRecPtr && + discardedUpTo < EndOfLog) + { + XLogwrtRqst WriteRqst; + TimeLineID thisTLI = ThisTimeLineID; + XLogRecPtr SegBeginPtr = + EndOfLog - (EndOfLog % wal_segment_size); + + /* + * XXX Assume that all the records have the same TLI. 
+ */ + ThisTimeLineID = EndOfLogTLI; + + WriteRqst.Write = EndOfLog; + WriteRqst.Flush = 0; + + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + XLogWrite(WriteRqst, false); + + /* + * Force back-off to the last segment boundary. + */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + ControlFile->discardedUpTo = SegBeginPtr; + UpdateControlFile(); + LWLockRelease(ControlFileLock); + + LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr; + + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->LogwrtResult = LogwrtResult; + XLogCtl->LogwrtRqst.Write = SegBeginPtr; + XLogCtl->LogwrtRqst.Flush = SegBeginPtr; + SpinLockRelease(&XLogCtl->info_lck); + + LWLockRelease(WALWriteLock); + + ThisTimeLineID = thisTLI; + } + durable_rename(origpath, partialpath, ERROR); XLogArchiveNotify(partialfname); } @@ -7987,7 +8507,10 @@ StartupXLOG(void) /* * Preallocate additional log files, if wanted. */ - PreallocXlogFiles(EndOfLog); + if (NvwalAvail) + PreallocNonVolatileXlogBuffer(); + else + PreallocXlogFiles(EndOfLog); /* * Okay, we're officially UP. @@ -8550,10 +9073,24 @@ GetInsertRecPtr(void) /* * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL * position known to be fsync'd to disk. + * + * If NVWAL is used, this returns the last persistent WAL position instead. */ XLogRecPtr GetFlushRecPtr(void) { + if (NvwalAvail) + { + XLogRecPtr ret; + + SpinLockAcquire(&XLogCtl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + ret = XLogCtl->persistentUpTo; + SpinLockRelease(&XLogCtl->info_lck); + + return ret; + } + SpinLockAcquire(&XLogCtl->info_lck); LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); @@ -8853,6 +9390,9 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; + /* for non-volatile WAL buffer */ + XLogRecPtr newDiscardedUpTo = 0; + /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just * issued at a different time. 
@@ -9164,6 +9704,22 @@ CreateCheckPoint(int flags) */ PriorRedoPtr = ControlFile->checkPointCopy.redo; + /* + * If non-volatile WAL buffer is used, discardedUpTo should be updated and + * persisted in the control file. So the new value should be calculated + * here. + * + * TODO Do not copy and paste codes... + */ + if (NvwalAvail) + { + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(recptr, &_logSegNo); + _logSegNo--; + + newDiscardedUpTo = _logSegNo * wal_segment_size; + } + /* * Update the control file. */ @@ -9172,6 +9728,16 @@ CreateCheckPoint(int flags) ControlFile->state = DB_SHUTDOWNED; ControlFile->checkPoint = ProcLastRecPtr; ControlFile->checkPointCopy = checkPoint; + if (NvwalAvail) + { + /* + * A new value should not fall behind the old one. + */ + if (ControlFile->discardedUpTo < newDiscardedUpTo) + ControlFile->discardedUpTo = newDiscardedUpTo; + else + newDiscardedUpTo = ControlFile->discardedUpTo; + } ControlFile->time = (pg_time_t) time(NULL); /* crash recovery should always recover to the end of WAL */ ControlFile->minRecoveryPoint = InvalidXLogRecPtr; @@ -9189,6 +9755,44 @@ CreateCheckPoint(int flags) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * If we use non-volatile XLOG buffer, update XLogCtl->Logwrt{Rqst,Result} + * so that the XLOG records older than newDiscardedUpTo are treated as + * "already written and flushed." + */ + if (NvwalAvail) + { + Assert(newDiscardedUpTo > 0); + + /* Update process-local variables */ + LogwrtResult.Write = LogwrtResult.Flush = newDiscardedUpTo; + + /* + * Update shared-memory variables. We need both light-weight lock and + * spin lock to update them.
+ */ + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + SpinLockAcquire(&XLogCtl->info_lck); + + /* + * Note that there can be a corner case that process-local + * LogwrtResult falls behind shared XLogCtl->LogwrtResult if the whole + * non-volatile XLOG buffer is filled and some pages are written out + * to segment files between UpdateControlFile and LWLockAcquire above. + * + * TODO For now, we ignore that case because it can hardly occur. + */ + XLogCtl->LogwrtResult = LogwrtResult; + + if (XLogCtl->LogwrtRqst.Write < newDiscardedUpTo) + XLogCtl->LogwrtRqst.Write = newDiscardedUpTo; + if (XLogCtl->LogwrtRqst.Flush < newDiscardedUpTo) + XLogCtl->LogwrtRqst.Flush = newDiscardedUpTo; + + SpinLockRelease(&XLogCtl->info_lck); + LWLockRelease(WALWriteLock); + } + /* Update shared-memory copy of checkpoint XID/epoch */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->ckptFullXid = checkPoint.nextFullXid; @@ -9212,22 +9816,48 @@ CreateCheckPoint(int flags) if (PriorRedoPtr != InvalidXLogRecPtr) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); - /* - * Delete old log files, those no longer needed for last checkpoint to - * prevent the disk holding the xlog from growing full. - */ - XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(recptr, &_logSegNo); - InvalidateObsoleteReplicationSlots(_logSegNo); - _logSegNo--; - RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); + if (NvwalAvail) + { + /* + * We already set _logSegNo to the value equivalent to discardedUpTo. + * We first increment it to call InvalidateObsoleteReplicationSlots. + */ + _logSegNo++; + InvalidateObsoleteReplicationSlots(_logSegNo); + + /* + * Then we decrement _logSegNo again to remove WAL segment files + * having spilled out of non-volatile WAL buffer. + * + * Note that you should set wal_recycle to off to remove segment files.
+ */ + _logSegNo--; + RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); + } + else + { + /* + * Delete old log files, those no longer needed for last checkpoint to + * prevent the disk holding the xlog from growing full. + */ + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(recptr, &_logSegNo); + InvalidateObsoleteReplicationSlots(_logSegNo); + _logSegNo--; + RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); + } /* * Make more log segments if needed. (Do this after recycling old log * segments, since that may supply some of the needed files.) */ if (!shutdown) - PreallocXlogFiles(recptr); + { + if (NvwalAvail) + PreallocNonVolatileXlogBuffer(); + else + PreallocXlogFiles(recptr); + } /* * Truncate pg_subtrans if possible. We can throw away all data before @@ -11971,6 +12601,170 @@ CancelBackup(void) } } +/* + * Is NVWAL used? + */ +bool +IsNvwalAvail(void) +{ + return NvwalAvail; +} + +/* + * Returns the size we can load from NVWAL and sets nvwalptr to load-from LSN. + */ +Size +GetLoadableSizeFromNvwal(XLogRecPtr target, Size count, XLogRecPtr *nvwalptr) +{ + XLogRecPtr readUpTo; + XLogRecPtr discardedUpTo; + + Assert(IsNvwalAvail()); + Assert(nvwalptr != NULL); + + readUpTo = target + count; + + LWLockAcquire(ControlFileLock, LW_SHARED); + discardedUpTo = ControlFile->discardedUpTo; + LWLockRelease(ControlFileLock); + + /* Check if all the records are on WAL segment files */ + if (readUpTo <= discardedUpTo) + return 0; + + /* Check if all the records are on NVWAL */ + if (discardedUpTo <= target) + { + *nvwalptr = target; + return count; + } + + /* Some on WAL segment files, some on NVWAL */ + *nvwalptr = discardedUpTo; + return (Size) (readUpTo - discardedUpTo); +} + +/* + * It is like WALRead @ xlogreader.c, but loads from non-volatile WAL + * buffer. 
+ */ +bool +CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + Assert(NvwalAvail); + + p = buf; + recptr = startptr; + nbytes = count; + + /* + * Hold shared WALBufMappingLock to let others not rotate WAL buffer + * while copying WAL records from it. We do not need exclusive lock + * because we will not rotate the buffer in this function. + */ + LWLockAcquire(WALBufMappingLock, LW_SHARED); + + while (nbytes > 0) + { + char *q; + Size off; + Size max_copy; + Size copybytes; + XLogRecPtr discardedUpTo; + + LWLockAcquire(ControlFileLock, LW_SHARED); + discardedUpTo = ControlFile->discardedUpTo; + LWLockRelease(ControlFileLock); + + /* Check if the records we need have been already evicted or not */ + if (recptr < discardedUpTo) + { + LWLockRelease(WALBufMappingLock); + + /* TODO error handling? */ + return false; + } + + /* + * Get the target address on non-volatile WAL buffer and the size we + * can copy from it at once because the buffer can rotate and we + * might have to copy what we want divided into two or more pieces.
+ */ + off = recptr % NvwalSize; + q = XLogCtl->pages + off; + max_copy = NvwalSize - off; + copybytes = Min(nbytes, max_copy); + + memcpy(p, q, copybytes); + + /* Update state for copy */ + recptr += copybytes; + nbytes -= copybytes; + p += copybytes; + } + + LWLockRelease(WALBufMappingLock); + return true; +} + +static bool +IsXLogSourceFromStream(XLogSource source) +{ + switch (source) + { + case XLOG_FROM_STREAM: + case XLOG_FROM_STREAM_NVWAL: + return true; + + default: + return false; + } +} + +static bool +IsXLogSourceFromNvwal(XLogSource source) +{ + switch (source) + { + case XLOG_FROM_NVWAL: + case XLOG_FROM_STREAM_NVWAL: + return true; + + default: + return false; + } +} + +static bool +NeedsForMoreXLog(XLogRecPtr targetChunkEndPtr) +{ + switch (readSource) + { + case XLOG_FROM_ARCHIVE: + case XLOG_FROM_PG_WAL: + return (readFile < 0); + + case XLOG_FROM_NVWAL: + Assert(NvwalAvail); + return false; + + case XLOG_FROM_STREAM: + return (flushedUpto < targetChunkEndPtr); + + case XLOG_FROM_STREAM_NVWAL: + Assert(NvwalAvail); + return (flushedUpto < targetChunkEndPtr); + + default: /* XLOG_FROM_ANY */ + Assert(readFile < 0); + return true; + } +} + /* * Read the XLOG page containing RecPtr into readBuf (if not read already). * Returns number of bytes read, if the page is read successfully, or -1 @@ -12012,7 +12806,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, * See if we need to switch to a new segment because the requested record * is not in the currently open one. 
*/ - if (readFile >= 0 && + if ((readFile >= 0 || IsXLogSourceFromNvwal(readSource)) && !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) { /* @@ -12029,7 +12823,8 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, } } - close(readFile); + if (readFile >= 0) + close(readFile); readFile = -1; readSource = XLOG_FROM_ANY; } @@ -12038,9 +12833,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, retry: /* See if we need to retrieve more data */ - if (readFile < 0 || - (readSource == XLOG_FROM_STREAM && - flushedUpto < targetPagePtr + reqLen)) + if (NeedsForMoreXLog(targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, private->randAccess, @@ -12061,7 +12854,7 @@ retry: * At this point, we have the right segment open and if we're streaming we * know the requested record is in it. */ - Assert(readFile != -1); + Assert(readFile != -1 || IsXLogSourceFromNvwal(readSource)); /* * If the current segment is being streamed from master, calculate how @@ -12069,7 +12862,7 @@ retry: * requested record has been received, but this is for the benefit of * future calls, to allow quick exit at the top of this function. 
*/ - if (readSource == XLOG_FROM_STREAM) + if (IsXLogSourceFromStream(readSource)) { if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; @@ -12080,41 +12873,59 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); - if (r != XLOG_BLCKSZ) + if (IsXLogSourceFromNvwal(readSource)) { - char fname[MAXFNAMELEN]; - int save_errno = errno; + Size offset = (Size) (targetPagePtr % NvwalSize); + char *readpos = XLogCtl->pages + offset; + + Assert(offset % XLOG_BLCKSZ == 0); + /* Load the requested page from non-volatile WAL buffer */ + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); + memcpy(readBuf, readpos, readLen); pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); - if (r < 0) + + /* There are not any other clues of TLI... */ + xlogreader->seg.ws_tli = ((XLogPageHeader) readBuf)->xlp_tli; + } + else + { + /* Read the requested page from file */ + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + if (r != XLOG_BLCKSZ) { - errno = save_errno; - ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + char fname[MAXFNAMELEN]; + int save_errno = errno; + + pgstat_report_wait_end(); + XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + if (r < 0) + { + errno = save_errno; + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u: %m", + fname, readOff))); + } + else + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read from log segment %s, offset %u: read %d of %zu", + fname, readOff, r, 
(Size) XLOG_BLCKSZ))); + goto next_record_is_invalid; } - else - ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); - goto next_record_is_invalid; + pgstat_report_wait_end(); + + xlogreader->seg.ws_tli = curFileTLI; } - pgstat_report_wait_end(); Assert(targetSegNo == readSegNo); Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - xlogreader->seg.ws_tli = curFileTLI; - /* * Check the page header immediately, so that we can retry immediately if * it's not valid. This may seem unnecessary, because XLogReadRecord() @@ -12148,6 +12959,17 @@ retry: goto next_record_is_invalid; } + /* + * Updating curFileTLI on each page verified if non-volatile WAL buffer + * is used because there is no TimeLineID information in NVWAL's filename. + */ + if (IsXLogSourceFromNvwal(readSource) && + curFileTLI != xlogreader->latestPageTLI) + { + curFileTLI = xlogreader->latestPageTLI; + elog(DEBUG1, "curFileTLI: %u", curFileTLI); + } + return readLen; next_record_is_invalid: @@ -12229,7 +13051,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (!InArchiveRecovery) currentSource = XLOG_FROM_PG_WAL; else if (currentSource == XLOG_FROM_ANY || - (!StandbyMode && currentSource == XLOG_FROM_STREAM)) + (!StandbyMode && IsXLogSourceFromStream(currentSource))) { lastSourceFailed = false; currentSource = XLOG_FROM_ARCHIVE; @@ -12252,6 +13074,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, { case XLOG_FROM_ARCHIVE: case XLOG_FROM_PG_WAL: + case XLOG_FROM_NVWAL: /* * Check to see if the trigger file exists. 
Note that we @@ -12265,6 +13088,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, return false; } + /* Try NVWAL if available */ + if (NvwalAvail && currentSource != XLOG_FROM_NVWAL) + { + currentSource = XLOG_FROM_NVWAL; + break; + } + /* * Not in standby mode, and we've now tried the archive * and pg_wal. @@ -12276,11 +13106,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * Move to XLOG_FROM_STREAM state, and set to start a * walreceiver if necessary. */ - currentSource = XLOG_FROM_STREAM; + if (currentSource == XLOG_FROM_NVWAL) + currentSource = XLOG_FROM_STREAM_NVWAL; + else + currentSource = XLOG_FROM_STREAM; startWalReceiver = true; break; case XLOG_FROM_STREAM: + case XLOG_FROM_STREAM_NVWAL: /* * Failure while streaming. Most likely, we got here @@ -12386,6 +13220,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, { case XLOG_FROM_ARCHIVE: case XLOG_FROM_PG_WAL: + case XLOG_FROM_NVWAL: /* * WAL receiver must not be running when reading WAL from @@ -12403,6 +13238,59 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (randAccess) curFileTLI = 0; + /* Try to load from NVWAL */ + if (currentSource == XLOG_FROM_NVWAL) + { + XLogRecPtr discardedUpTo; + + Assert(NvwalAvail); + + /* + * Check if the target page exists on NVWAL. Note that + * RecPtr points to the end of the target chunk. + * + * TODO need ControlFileLock? + */ + discardedUpTo = ControlFile->discardedUpTo; + if (discardedUpTo != InvalidXLogRecPtr && + discardedUpTo < RecPtr && + RecPtr <= discardedUpTo + NvwalSize) + { + /* Report recovery progress in PS display */ + set_ps_display("recovering NVWAL"); + elog(DEBUG1, "recovering NVWAL"); + + /* Track source of data and receipt time */ + readSource = XLOG_FROM_NVWAL; + XLogReceiptSource = XLOG_FROM_NVWAL; + XLogReceiptTime = GetCurrentTimestamp(); + + /* + * Construct expectedTLEs. 
This is necessary to + * recover only from NVWAL because its filename does + * not have any TLI information. + */ + if (!expectedTLEs) + { + TimeLineHistoryEntry *entry; + + entry = palloc(sizeof(TimeLineHistoryEntry)); + entry->tli = recoveryTargetTLI; + entry->begin = entry->end = InvalidXLogRecPtr; + + expectedTLEs = list_make1(entry); + elog(DEBUG1, "expectedTLEs: [%u]", + (uint32) recoveryTargetTLI); + } + + return true; + } + + /* Target page does not exist on NVWAL */ + lastSourceFailed = true; + break; + } + /* * Try to restore the file from archive, or read an existing * file from pg_wal. @@ -12420,6 +13308,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, break; case XLOG_FROM_STREAM: + case XLOG_FROM_STREAM_NVWAL: { bool havedata; @@ -12544,21 +13433,34 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * info is set correctly and XLogReceiptTime isn't * changed. */ - if (readFile < 0) + if (currentSource == XLOG_FROM_STREAM_NVWAL) { if (!expectedTLEs) expectedTLEs = readTimeLineHistory(receiveTLI); - readFile = XLogFileRead(readSegNo, PANIC, - receiveTLI, - XLOG_FROM_STREAM, false); - Assert(readFile >= 0); + + /* TODO is it ok to return, not to break switch? */ + readSource = XLOG_FROM_STREAM_NVWAL; + XLogReceiptSource = XLOG_FROM_STREAM_NVWAL; + return true; } else { - /* just make sure source info is correct... */ - readSource = XLOG_FROM_STREAM; - XLogReceiptSource = XLOG_FROM_STREAM; - return true; + if (readFile < 0) + { + if (!expectedTLEs) + expectedTLEs = readTimeLineHistory(receiveTLI); + readFile = XLogFileRead(readSegNo, PANIC, + receiveTLI, + XLOG_FROM_STREAM, false); + Assert(readFile >= 0); + } + else + { + /* just make sure source info is correct... 
*/ + readSource = XLOG_FROM_STREAM; + XLogReceiptSource = XLOG_FROM_STREAM; + return true; + } } break; } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index cb76be4f46..77f629fda2 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1066,11 +1066,24 @@ WALRead(XLogReaderState *state, char *p; XLogRecPtr recptr; Size nbytes; +#ifndef FRONTEND + XLogRecPtr recptr_nvwal = 0; + Size nbytes_nvwal = 0; +#endif p = buf; recptr = startptr; nbytes = count; +#ifndef FRONTEND + /* Try to load records directly from NVWAL if used */ + if (IsNvwalAvail()) + { + nbytes_nvwal = GetLoadableSizeFromNvwal(startptr, count, &recptr_nvwal); + nbytes = count - nbytes_nvwal; + } +#endif + while (nbytes > 0) { uint32 startoff; @@ -1138,6 +1151,17 @@ WALRead(XLogReaderState *state, p += readbytes; } +#ifndef FRONTEND + if (IsNvwalAvail()) + { + if (!CopyXLogRecordsFromNVWAL(p, nbytes_nvwal, recptr_nvwal)) + { + /* TODO graceful error handling */ + elog(PANIC, "some records on NVWAL had been discarded"); + } + } +#endif + return true; } diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index e73639df74..4c594e915f 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -272,6 +272,9 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("discarded Up To: %X/%X\n"), + (uint32) (ControlFile->discardedUpTo >> 32), + (uint32) ControlFile->discardedUpTo); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0a05e79524..75433a6dc0 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -351,6 +351,14 @@ extern void 
XLogRequestWalReceiverReply(void); extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); +extern bool IsNvwalAvail(void); +extern Size GetLoadableSizeFromNvwal(XLogRecPtr target, + Size count, + XLogRecPtr *nvwalptr); +extern bool CopyXLogRecordsFromNVWAL(char *buf, + Size count, + XLogRecPtr startptr); + /* * Routines to start, stop, and get status of a base backup. */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index de5670e538..fe71992a69 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -22,7 +22,7 @@ /* Version identifier for this pg_control format */ -#define PG_CONTROL_VERSION 1300 +#define PG_CONTROL_VERSION 1301 /* Nonce key length, see below */ #define MOCK_AUTH_NONCE_LEN 32 @@ -132,6 +132,21 @@ typedef struct ControlFileData XLogRecPtr unloggedLSN; /* current fake LSN value, for unlogged rels */ + /* + * Used for non-volatile WAL buffer (NVWAL). + * + * discardedUpTo is updated to the oldest LSN in the NVWAL when either a + * checkpoint or a restartpoint is completed successfully, or the whole + * NVWAL is filled with WAL records and a new record is being inserted. + * This field tells that the NVWAL contains WAL records in the range of + * [discardedUpTo, discardedUpTo+S), where S is the size of the NVWAL. + * Note that the WAL records whose LSN are less than discardedUpTo would + * remain in WAL segment files and be needed for recovery. + * + * It is set to zero when NVWAL is not used.
+ */ + XLogRecPtr discardedUpTo; + /* * These two values determine the minimum point we must recover up to * before starting up: diff --git a/src/test/regress/expected/misc_functions.out b/src/test/regress/expected/misc_functions.out index d3acb98d04..bbd47e1663 100644 --- a/src/test/regress/expected/misc_functions.out +++ b/src/test/regress/expected/misc_functions.out @@ -142,14 +142,17 @@ HINT: No function matches the given name and argument types. You might need to select setting as segsize from pg_settings where name = 'wal_segment_size' \gset -select count(*) > 0 as ok from pg_ls_waldir(); +select setting as nvwal_path +from pg_settings where name = 'nvwal_path' +\gset +select count(*) > 0 or :'nvwal_path' <> '' as ok from pg_ls_waldir(); ok ---- t (1 row) -- Test ProjectSet as well as FunctionScan -select count(*) > 0 as ok from (select pg_ls_waldir()) ss; +select count(*) > 0 or :'nvwal_path' <> '' as ok from (select pg_ls_waldir()) ss; ok ---- t @@ -161,14 +164,15 @@ select * from pg_ls_waldir() limit 0; ------+------+-------------- (0 rows) -select count(*) > 0 as ok from (select * from pg_ls_waldir() limit 1) ss; +select count(*) > 0 or :'nvwal_path' <> '' as ok from (select * from pg_ls_waldir() limit 1) ss; ok ---- t (1 row) -select (w).size = :segsize as ok -from (select pg_ls_waldir() w) ss where length((w).name) = 24 limit 1; +select count(*) > 0 or :'nvwal_path' <> '' as ok from + (select * from pg_ls_waldir() w + where length((w).name) = 24 and (w).size = :segsize limit 1) ss; ok ---- t diff --git a/src/test/regress/sql/misc_functions.sql b/src/test/regress/sql/misc_functions.sql index 094e8f8296..09c326775d 100644 --- a/src/test/regress/sql/misc_functions.sql +++ b/src/test/regress/sql/misc_functions.sql @@ -39,15 +39,19 @@ SELECT num_nulls(); select setting as segsize from pg_settings where name = 'wal_segment_size' \gset +select setting as nvwal_path +from pg_settings where name = 'nvwal_path' +\gset -select count(*) > 0 as ok from 
pg_ls_waldir(); +select count(*) > 0 or :'nvwal_path' <> '' as ok from pg_ls_waldir(); -- Test ProjectSet as well as FunctionScan -select count(*) > 0 as ok from (select pg_ls_waldir()) ss; +select count(*) > 0 or :'nvwal_path' <> '' as ok from (select pg_ls_waldir()) ss; -- Test not-run-to-completion cases. select * from pg_ls_waldir() limit 0; -select count(*) > 0 as ok from (select * from pg_ls_waldir() limit 1) ss; -select (w).size = :segsize as ok -from (select pg_ls_waldir() w) ss where length((w).name) = 24 limit 1; +select count(*) > 0 or :'nvwal_path' <> '' as ok from (select * from pg_ls_waldir() limit 1) ss; +select count(*) > 0 or :'nvwal_path' <> '' as ok from + (select * from pg_ls_waldir() w + where length((w).name) = 24 and (w).size = :segsize limit 1) ss; select count(*) >= 0 as ok from pg_ls_archive_statusdir(); -- 2.17.1