From 379580dc600c079b8de3cc2f392376ad46429758 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 8 Sep 2016 20:34:06 +0300 Subject: [PATCH 2/3] Use larger read buffers in logtape. This makes the access pattern appear more sequential to the OS, making it more likely that the OS will do read-ahead for us. It will also ensure that there are more sequential blocks available when writing, because we can free more blocks in the underlying file at once. Sequential I/O is much cheaper than random I/O. We used to do pre-reading from each tape, in tuplesort.c, for the same reasons. But it seems simpler to do it in logtape.c, reading the raw data into a larger buffer, than converting every tuple to SortTuple format when pre-reading, like tuplesort.c used to do. --- src/backend/utils/sort/logtape.c | 134 +++++++++++++++++++++++++++++++------ src/backend/utils/sort/tuplesort.c | 35 +++++++++- src/include/utils/logtape.h | 1 + 3 files changed, 147 insertions(+), 23 deletions(-) diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 7745207..05d7697 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -131,9 +131,12 @@ typedef struct LogicalTape * reading. */ char *buffer; /* physical buffer (separately palloc'd) */ + int buffer_size; /* allocated size of the buffer */ long curBlockNumber; /* this block's logical blk# within tape */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ + + int read_buffer_size; /* buffer size to use when reading */ } LogicalTape; /* @@ -228,6 +231,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) } /* + * Read as many blocks as we can into the per-tape buffer. + * + * The caller can specify the next physical block number to read, in + * datablocknum, or -1 to fetch the next block number from the internal block. + * If datablocknum == -1, the caller must've already set curBlockNumber. 
+ * + * Returns true if anything was read, 'false' on EOF. + */ +static bool +ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum) +{ + lt->pos = 0; + lt->nbytes = 0; + + do + { + /* Fetch next block number (unless provided by caller) */ + if (datablocknum == -1) + { + datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen); + if (datablocknum == -1L) + break; /* EOF */ + lt->curBlockNumber++; + } + + /* Read the block */ + ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes)); + if (!lt->frozen) + ltsReleaseBlock(lts, datablocknum); + + if (lt->curBlockNumber < lt->numFullBlocks) + lt->nbytes += BLCKSZ; + else + { + /* EOF */ + lt->nbytes += lt->lastBlockBytes; + break; + } + + /* Advance to next block, if we have buffer space left */ + datablocknum = -1; + } while (lt->nbytes < lt->buffer_size); + + return (lt->nbytes > 0); +} + +/* * qsort comparator for sorting freeBlocks[] into decreasing order. */ static int @@ -546,6 +596,8 @@ LogicalTapeSetCreate(int ntapes) lt->numFullBlocks = 0L; lt->lastBlockBytes = 0; lt->buffer = NULL; + lt->buffer_size = 0; + lt->read_buffer_size = BLCKSZ; lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; @@ -628,7 +680,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, /* Allocate data buffer and first indirect block on first write */ if (lt->buffer == NULL) + { lt->buffer = (char *) palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } if (lt->indirect == NULL) { lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock)); @@ -636,6 +691,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, lt->indirect->nextup = NULL; } + Assert(lt->buffer_size == BLCKSZ); while (size > 0) { if (lt->pos >= BLCKSZ) @@ -709,18 +765,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) Assert(lt->frozen); datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); } + + /* Allocate a read buffer */ + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = 
palloc(lt->read_buffer_size); + lt->buffer_size = lt->read_buffer_size; + /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; if (datablocknum != -1L) - { - ltsReadBlock(lts, datablocknum, (void *) lt->buffer); - if (!lt->frozen) - ltsReleaseBlock(lts, datablocknum); - lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? - BLCKSZ : lt->lastBlockBytes; - } + ltsReadFillBuffer(lts, lt, datablocknum); } else { @@ -754,6 +811,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + if (lt->buffer) + { + pfree(lt->buffer); + lt->buffer = NULL; + lt->buffer_size = 0; + } } } @@ -779,20 +843,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum, if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ - long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, - lt->frozen); - - if (datablocknum == -1L) + if (!ltsReadFillBuffer(lts, lt, -1)) break; /* EOF */ - lt->curBlockNumber++; - lt->pos = 0; - ltsReadBlock(lts, datablocknum, (void *) lt->buffer); - if (!lt->frozen) - ltsReleaseBlock(lts, datablocknum); - lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? - BLCKSZ : lt->lastBlockBytes; - if (lt->nbytes <= 0) - break; /* EOF (possible here?) */ } nthistime = lt->nbytes - lt->pos; @@ -842,6 +894,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) lt->writing = false; lt->frozen = true; datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true); + + /* + * The seek and backspace functions assume a single block read buffer. + * That's OK with current usage. A larger buffer is helpful to make the + * read pattern of the backing file look more sequential to the OS, when + * we're reading from multiple tapes. But at the end of a sort, when a + * tape is frozen, we only read from a single tape anyway. 
+ */ + if (!lt->buffer || lt->buffer_size != BLCKSZ) + { + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } + /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; lt->pos = 0; @@ -875,6 +943,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = &lts->tapes[tapenum]; Assert(lt->frozen); + Assert(lt->buffer_size == BLCKSZ); /* * Easy case for seek within current block. @@ -941,6 +1010,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, lt = &lts->tapes[tapenum]; Assert(lt->frozen); Assert(offset >= 0 && offset <= BLCKSZ); + Assert(lt->buffer_size == BLCKSZ); /* * Easy case for seek within current block. @@ -1000,6 +1070,9 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, { LogicalTape *lt; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = &lts->tapes[tapenum]; + + /* With a larger buffer, 'pos' wouldn't be the same as offset within page */ + Assert(lt->buffer_size == BLCKSZ); *blocknum = lt->curBlockNumber; @@ -1014,3 +1087,24 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts) { return lts->nFileBlocks; } + +/* + * Set buffer size to use, when reading from given tape. + */ +void +LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem) +{ + LogicalTape *lt; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = &lts->tapes[tapenum]; + + /* + * The buffer size must be a multiple of BLCKSZ in size, so round the + * given value down to nearest BLCKSZ. Make sure we have at least one page. 
+ */ + if (avail_mem < BLCKSZ) + avail_mem = BLCKSZ; + avail_mem -= avail_mem % BLCKSZ; + lt->read_buffer_size = avail_mem; +} diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index b9fb99c..dc35fcf 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -2487,6 +2487,8 @@ mergeruns(Tuplesortstate *state) svDummy; char *p; int i; + int per_tape, cutoff; + long avail_blocks; Assert(state->status == TSS_BUILDRUNS); Assert(state->memtupcount == 0); @@ -2535,15 +2537,17 @@ mergeruns(Tuplesortstate *state) USEMEM(state, state->memtupsize * sizeof(SortTuple)); /* - * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to - * track memory usage. + * If we had fewer runs than tapes, refund buffers for tapes that were never + * allocated. */ - state->batchUsed = true; + if (state->currentRun < state->maxTapes) + FREEMEM(state, (state->maxTapes - state->currentRun) * TAPE_BUFFER_OVERHEAD); /* Initialize the merge tuple buffer arena. */ state->batchMemoryBegin = palloc((state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE); state->batchMemoryEnd = state->batchMemoryBegin + (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE; state->freeBufferHead = (MergeTupleBuffer *) state->batchMemoryBegin; + USEMEM(state, (state->maxTapes + 1) * MERGETUPLEBUFFER_SIZE); p = state->batchMemoryBegin; for (i = 0; i < state->maxTapes; i++) @@ -2553,6 +2557,31 @@ mergeruns(Tuplesortstate *state) } ((MergeTupleBuffer *) p)->nextfree = NULL; + /* + * Use all the spare memory we have available for read buffers. Divide it + * evenly among all the tapes. + */ + avail_blocks = state->availMem / BLCKSZ; + per_tape = avail_blocks / state->maxTapes; + cutoff = avail_blocks % state->maxTapes; + if (per_tape == 0) + { + per_tape = 1; + cutoff = 0; + } + for (tapenum = 0; tapenum < state->maxTapes; tapenum++) + { + LogicalTapeAssignReadBufferSize(state->tapeset, tapenum, + (per_tape + (tapenum < cutoff ? 
1 : 0)) * BLCKSZ); + } + USEMEM(state, avail_blocks * BLCKSZ); + + /* + * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism to + * track memory usage of individual tuples. + */ + state->batchUsed = true; + /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) LogicalTapeRewind(state->tapeset, tapenum, false); diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index fa1e992..03d0a6f 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -39,6 +39,7 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset); extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset); +extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); #endif /* LOGTAPE_H */ -- 2.9.3