From 71dfbc7c05b8b65fe6480a01b10079fafc698404 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 28 Jun 2019 18:09:01 +0900 Subject: [PATCH 7/9] Make Reorderbuffer encrypt spilled out file. --- src/backend/postmaster/pgstat.c | 6 - src/backend/replication/logical/reorderbuffer.c | 457 ++++++++++-------------- src/include/pgstat.h | 2 - src/include/replication/reorderbuffer.h | 4 - 4 files changed, 197 insertions(+), 272 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index b4f2b28..5478689 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4004,12 +4004,6 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_RELATION_MAP_WRITE: event_name = "RelationMapWrite"; break; - case WAIT_EVENT_REORDER_BUFFER_READ: - event_name = "ReorderBufferRead"; - break; - case WAIT_EVENT_REORDER_BUFFER_WRITE: - event_name = "ReorderBufferWrite"; - break; case WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ: event_name = "ReorderLogicalMappingRead"; break; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e7c32f2..844b67d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -71,6 +71,7 @@ #include "replication/slot.h" #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" +#include "storage/buffile.h" #include "storage/fd.h" #include "storage/sinval.h" #include "utils/builtins.h" @@ -109,7 +110,7 @@ typedef struct ReorderBufferIterTXNEntry XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + TransientBufFile *file; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -192,11 +193,17 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - int fd, ReorderBufferChange *change); + TransientBufFile *file, ReorderBufferChange *change); +static void ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size, + ReorderBufferTXN *txn); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + TransientBufFile **file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - char *change); + TransientBufFile **file); +static ReorderBufferTupleBuf *ReorderBufferRestoreTuple(ReorderBuffer *rb, + TransientBufFile *file); +static void ReorderBufferReadData(TransientBufFile *file, void *ptr, size_t size, + bool *no_data_p); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, @@ -267,9 +274,6 @@ ReorderBufferAllocate(void) buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; - buffer->outbuf = NULL; - buffer->outbufsize = 0; - buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -988,7 +992,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) for (off = 0; off < state->nr_txns; off++) { - state->entries[off].fd = -1; + state->entries[off].file = NULL; state->entries[off].segno = 0; } @@ -1013,7 +1017,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); - ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, &state->entries[off].segno); } @@ -1043,7 +1047,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].fd, + &state->entries[off].file, &state->entries[off].segno); } cur_change = dlist_head_element(ReorderBufferChange, node, @@ -1124,7 +1128,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); - if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, + if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { /* successfully restored changes from disk */ @@ -1163,8 +1167,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { - if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + if (state->entries[off].file) + BufFileCloseTransient(state->entries[off].file); } /* free memory we might have "leaked" in the last *Next call */ @@ -2212,24 +2216,6 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) */ /* - * Ensure the IO buffer is >= sz. - */ -static void -ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) -{ - if (!rb->outbufsize) - { - rb->outbuf = MemoryContextAlloc(rb->context, sz); - rb->outbufsize = sz; - } - else if (rb->outbufsize < sz) - { - rb->outbuf = repalloc(rb->outbuf, sz); - rb->outbufsize = sz; - } -} - -/* * Check whether the transaction tx should spill its data to disk. */ static void @@ -2254,7 +2240,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { dlist_iter subtxn_i; dlist_mutable_iter change_i; - int fd = -1; + TransientBufFile *file = NULL; XLogSegNo curOpenSegNo = 0; Size spilled = 0; @@ -2281,13 +2267,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * store in segment in which it belongs by start lsn, don't split over * multiple segments tho */ - if (fd == -1 || + if (file == NULL || !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) { char path[MAXPGPATH]; - if (fd != -1) - CloseTransientFile(fd); + if (file) + BufFileCloseTransient(file); XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); @@ -2299,16 +2285,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) curOpenSegNo); /* open segment, create it if necessary */ - fd = OpenTransientFile(path, - O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); - - if (fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", path))); + file = BufFileOpenTransient(path, + O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); } - ReorderBufferSerializeChange(rb, txn, fd, change); + ReorderBufferSerializeChange(rb, txn, file, change); dlist_delete(&change->node); ReorderBufferReturnChange(rb, change); @@ -2320,8 +2301,8 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->nentries_mem = 0; txn->serialized = true; - if (fd != -1) - CloseTransientFile(fd); + if (file) + BufFileCloseTransient(file); } /* @@ -2329,15 +2310,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) */ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - int fd, ReorderBufferChange *change) + TransientBufFile *file, ReorderBufferChange *change) { - ReorderBufferDiskChange *ondisk; + ReorderBufferDiskChange hdr; Size sz = sizeof(ReorderBufferDiskChange); - ReorderBufferSerializeReserve(rb, sz); - - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); + memcpy((char *) &hdr + offsetof(ReorderBufferDiskChange, change), + change, sizeof(ReorderBufferChange)); switch (change->action) { @@ -2347,7 +2326,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: { - char *data; ReorderBufferTupleBuf *oldtup, *newtup; Size oldlen = 0; @@ -2370,66 +2348,55 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sz += newlen; } - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange), + txn); if (oldlen) { - memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, oldtup->tuple.t_data, oldlen); - data += oldlen; + ReorderBufferWriteData(file, &oldtup->tuple, + sizeof(HeapTupleData), txn); + ReorderBufferWriteData(file, oldtup->tuple.t_data, oldlen, + txn); } if (newlen) { - memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, newtup->tuple.t_data, newlen); - data += newlen; + ReorderBufferWriteData(file, &newtup->tuple, + sizeof(HeapTupleData), txn); + ReorderBufferWriteData(file, newtup->tuple.t_data, newlen, + txn); } break; } case REORDER_BUFFER_CHANGE_MESSAGE: { - char *data; Size prefix_size = strlen(change->data.msg.prefix) + 1; sz += prefix_size + change->data.msg.message_size + sizeof(Size) + sizeof(Size); - ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, + sizeof(ReorderBufferDiskChange), + txn); /* write the prefix including the size */ - memcpy(data, &prefix_size, sizeof(Size)); - data += sizeof(Size); - memcpy(data, change->data.msg.prefix, - prefix_size); - data += prefix_size; + ReorderBufferWriteData(file, &prefix_size, sizeof(Size), txn); + ReorderBufferWriteData(file, change->data.msg.prefix, + prefix_size, txn); /* write the message including the size */ - memcpy(data, &change->data.msg.message_size, sizeof(Size)); - data += sizeof(Size); - memcpy(data, change->data.msg.message, - change->data.msg.message_size); - data += change->data.msg.message_size; + ReorderBufferWriteData(file, &change->data.msg.message_size, + sizeof(Size), txn); + ReorderBufferWriteData(file, change->data.msg.message, + change->data.msg.message_size, txn); break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; - char *data; snap = change->data.snapshot; @@ -2438,49 +2405,37 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(TransactionId) * snap->subxcnt ; - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, + sizeof(ReorderBufferDiskChange), txn); - memcpy(data, snap, sizeof(SnapshotData)); - data += sizeof(SnapshotData); + ReorderBufferWriteData(file, snap, sizeof(SnapshotData), txn); if (snap->xcnt) - { - memcpy(data, snap->xip, - sizeof(TransactionId) * snap->xcnt); - data += sizeof(TransactionId) * snap->xcnt; - } + ReorderBufferWriteData(file, snap->xip, + sizeof(TransactionId) * snap->xcnt, + txn); if (snap->subxcnt) - { - memcpy(data, snap->subxip, - sizeof(TransactionId) * snap->subxcnt); - data += sizeof(TransactionId) * snap->subxcnt; - } + ReorderBufferWriteData(file, snap->subxip, + sizeof(TransactionId) * snap->subxcnt, + txn); break; } case REORDER_BUFFER_CHANGE_TRUNCATE: { Size size; - char *data; /* account for the OIDs of truncated relations */ size = sizeof(Oid) * change->data.truncate.nrelids; sz += size; - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - memcpy(data, change->data.truncate.relids, size); - data += size; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange), + txn); + ReorderBufferWriteData(file, change->data.truncate.relids, size, + txn); break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -2489,27 +2444,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* ReorderBufferChange contains everything important */ break; } +} - ondisk->size = sz; - - errno = 0; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); - if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) - { - int save_errno = errno; - - CloseTransientFile(fd); - - /* if write didn't set errno, assume problem is no disk space */ - errno = save_errno ? save_errno : ENOSPC; +/* + * Wrapper for BufFileWriteTransient() that raises ERROR if the whole chunk + * was not written. XXX Should this be a macro? + */ +static void +ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size, + ReorderBufferTXN *txn) +{ + if (BufFileWriteTransient(file, ptr, size) != size) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to data file for XID %u: %m", txn->xid))); - } - pgstat_report_wait_end(); - - Assert(ondisk->change.action == change->action); } /* @@ -2517,7 +2466,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + TransientBufFile **file, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; @@ -2542,10 +2491,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, while (restored < max_changes_in_memory && *segno <= last_segno) { - int readBytes; - ReorderBufferDiskChange *ondisk; - - if (*fd == -1) + if (*file == NULL) { char path[MAXPGPATH]; @@ -2562,77 +2508,24 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); - if (*fd < 0 && errno == ENOENT) + *file = BufFileOpenTransient(path, O_RDONLY | PG_BINARY); + if (*file == NULL) { - *fd = -1; + Assert(errno == ENOENT); (*segno)++; continue; } - else if (*fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); } - /* - * Read the statically sized part of a change which has information - * about the total size. If we couldn't read a record, we're at the - * end of this file. - */ - ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); - - /* eof */ - if (readBytes == 0) + ReorderBufferRestoreChange(rb, txn, file); + if (*file) + restored++; + else { - CloseTransientFile(*fd); - *fd = -1; + /* No data could be restored. */ (*segno)++; continue; } - else if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != sizeof(ReorderBufferDiskChange)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", - readBytes, - (uint32) sizeof(ReorderBufferDiskChange)))); - - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - ReorderBufferSerializeReserve(rb, - sizeof(ReorderBufferDiskChange) + ondisk->size); - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); - - if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", - readBytes, - (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); - - /* - * ok, read a full change from disk, now restore it into proper - * in-memory format - */ - ReorderBufferRestoreChange(rb, txn, rb->outbuf); - restored++; } return restored; @@ -2642,25 +2535,36 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * Convert change from its on-disk format to in-memory format and queue it onto * the TXN's ->changes list. * - * Note: although "data" is declared char*, at entry it points to a - * maxalign'd buffer, making it safe in most of this function to assume - * that the pointed-to data is suitably aligned for direct access. + * If no data was found in the file, close it and set *file to NULL. */ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - char *data) + TransientBufFile **file) { - ReorderBufferDiskChange *ondisk; + ReorderBufferDiskChange ondisk; + bool no_data; ReorderBufferChange *change; - ondisk = (ReorderBufferDiskChange *) data; + /* + * Read the statically sized part of a change which has information about + * the total size. If we couldn't read a record, we're at the end of this + * file. + */ + ReorderBufferReadData(*file, &ondisk, sizeof(ReorderBufferDiskChange), + &no_data); + + /* eof */ + if (no_data) + { + BufFileCloseTransient(*file); + *file = NULL; + return; + } change = ReorderBufferGetChange(rb); /* copy static part */ - memcpy(change, &ondisk->change, sizeof(ReorderBufferChange)); - - data += sizeof(ReorderBufferDiskChange); + memcpy(change, &ondisk.change, sizeof(ReorderBufferChange)); /* restore individual stuff */ switch (change->action) @@ -2671,50 +2575,10 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.oldtuple) - { - uint32 tuplelen = ((HeapTuple) data)->t_len; - - change->data.tp.oldtuple = - ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - - /* restore ->tuple */ - memcpy(&change->data.tp.oldtuple->tuple, data, - sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - /* reset t_data pointer into the new tuplebuf */ - change->data.tp.oldtuple->tuple.t_data = - ReorderBufferTupleBufData(change->data.tp.oldtuple); - - /* restore tuple data itself */ - memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); - data += tuplelen; - } + change->data.tp.oldtuple = ReorderBufferRestoreTuple(rb, *file); if (change->data.tp.newtuple) - { - /* here, data might not be suitably aligned! */ - uint32 tuplelen; - - memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), - sizeof(uint32)); - - change->data.tp.newtuple = - ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - - /* restore ->tuple */ - memcpy(&change->data.tp.newtuple->tuple, data, - sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - /* reset t_data pointer into the new tuplebuf */ - change->data.tp.newtuple->tuple.t_data = - ReorderBufferTupleBufData(change->data.tp.newtuple); - - /* restore tuple data itself */ - memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); - data += tuplelen; - } + change->data.tp.newtuple = ReorderBufferRestoreTuple(rb, *file); break; case REORDER_BUFFER_CHANGE_MESSAGE: @@ -2722,44 +2586,44 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Size prefix_size; /* read prefix */ - memcpy(&prefix_size, data, sizeof(Size)); - data += sizeof(Size); + ReorderBufferReadData(*file, &prefix_size, sizeof(Size), NULL); change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size); - memcpy(change->data.msg.prefix, data, prefix_size); + ReorderBufferReadData(*file, change->data.msg.prefix, + prefix_size, NULL); Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); - data += prefix_size; /* read the message */ - memcpy(&change->data.msg.message_size, data, sizeof(Size)); - data += sizeof(Size); + ReorderBufferReadData(*file, &change->data.msg.message_size, + sizeof(Size), NULL); change->data.msg.message = MemoryContextAlloc(rb->context, change->data.msg.message_size); - memcpy(change->data.msg.message, data, - change->data.msg.message_size); - data += change->data.msg.message_size; + ReorderBufferReadData(*file, change->data.msg.message, + change->data.msg.message_size, NULL); break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { - Snapshot oldsnap; + SnapshotData oldsnap; Snapshot newsnap; Size size; - oldsnap = (Snapshot) data; + ReorderBufferReadData(*file, &oldsnap, sizeof(SnapshotData), NULL); size = sizeof(SnapshotData) + - sizeof(TransactionId) * oldsnap->xcnt + - sizeof(TransactionId) * (oldsnap->subxcnt + 0); + sizeof(TransactionId) * oldsnap.xcnt + + sizeof(TransactionId) * (oldsnap.subxcnt + 0); change->data.snapshot = MemoryContextAllocZero(rb->context, size); newsnap = change->data.snapshot; - memcpy(newsnap, data, size); + memcpy(newsnap, &oldsnap, sizeof(SnapshotData)); newsnap->xip = (TransactionId *) (((char *) newsnap) + sizeof(SnapshotData)); + ReorderBufferReadData(*file, newsnap->xip, + size - sizeof(SnapshotData), NULL); newsnap->subxip = newsnap->xip + newsnap->xcnt; newsnap->copied = true; break; @@ -2771,7 +2635,9 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, relids = ReorderBufferGetRelids(rb, change->data.truncate.nrelids); - memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid)); + ReorderBufferReadData(*file, relids, + change->data.truncate.nrelids * sizeof(Oid), + NULL); change->data.truncate.relids = relids; break; @@ -2787,6 +2653,77 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* + * Convert heap tuple from its on-disk format to in-memory format. + */ +static ReorderBufferTupleBuf * +ReorderBufferRestoreTuple(ReorderBuffer *rb, TransientBufFile *file) +{ + HeapTupleData tupdata; + uint32 tuplelen; + ReorderBufferTupleBuf *result; + + ReorderBufferReadData(file, &tupdata, sizeof(HeapTupleData), NULL); + tuplelen = tupdata.t_len; + + result = ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&result->tuple, &tupdata, sizeof(HeapTupleData)); + + /* reset t_data pointer into the new tuplebuf */ + result->tuple.t_data = ReorderBufferTupleBufData(result); + + /* restore tuple data itself */ + ReorderBufferReadData(file, result->tuple.t_data, tuplelen, NULL); + + return result; +} + +/* + * Wrapper for BufFileReadTransient() that raises ERROR if the expected amount + * of bytes was not read. + * + * If valid pointer is passed for no_data_p, set *no_data_p to indicate + * whether zero bytes was read. If NULL is passed, do not tolerate missing + * data. + */ +static void +ReorderBufferReadData(TransientBufFile *file, void *ptr, size_t size, + bool *no_data_p) +{ + int readBytes; + + /* + * Caller should not request zero bytes. This assumption simplifies + * setting of *no_data_p below. + */ + Assert(size > 0); + + if ((readBytes = BufFileReadTransient(file, ptr, size)) != size) + { + if (no_data_p) + *no_data_p = readBytes == 0; + + /* + * It is o.k. to receive exactly zero bytes if caller passed valid + * no_data_p. + */ + if (no_data_p && *no_data_p) + return; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", + readBytes, (uint32) size))); + } + else if (no_data_p) + { + /* Given that size is non-zero, readBytes must be non-zero too. */ + *no_data_p = false; + } +} + +/* * Remove all on-disk stored for the passed in transaction. */ static void diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 0a3ad3a..b923a18 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -912,8 +912,6 @@ typedef enum WAIT_EVENT_RELATION_MAP_READ, WAIT_EVENT_RELATION_MAP_SYNC, WAIT_EVENT_RELATION_MAP_WRITE, - WAIT_EVENT_REORDER_BUFFER_READ, - WAIT_EVENT_REORDER_BUFFER_WRITE, WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ, WAIT_EVENT_REPLICATION_SLOT_READ, WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 735e6d3..01fc91a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -387,10 +387,6 @@ struct ReorderBuffer MemoryContext tup_context; XLogRecPtr current_restart_decoding_lsn; - - /* buffer for disk<->memory conversions */ - char *outbuf; - Size outbufsize; }; -- 1.8.3.1