From 384d8bcfe984df9620310c091fadc535c2f47024 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 14 Apr 2020 11:11:37 +0530 Subject: [PATCH v20 02/12] Issue individual invalidations with wal_level=logical When wal_level=logical, write individual invalidations into WAL so that decoding can use this information. We still add the invalidations to the cache, and write them to WAL at commit time in RecordTransactionCommit(). This uses the existing XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource manager (see LogStandbyInvalidations for details). So existing code relying on those invalidations (e.g. redo) does not need to be changed. The individual invalidations are written using a new xlog record type XLOG_XACT_INVALIDATIONS, from the RM_XACT_ID resource manager. See LogLogicalInvalidations for details. These new xlog records are ignored by existing redo procedures, which still rely on the invalidations written to commit records. The invalidations are decoded and added as a new ReorderBufferChange type (REORDER_BUFFER_CHANGE_INVALIDATION), and then executed during replay, unlike the existing invalidations (which are either decoded as part of the commit record, or executed immediately during decoding and not added to the reorderbuffer at all). A toy standalone model of this queue-and-replay flow is sketched after the patch. LogStandbyInvalidations accumulates all the invalidations in memory and writes them out only once, at commit time, which may reduce the performance impact by amortizing the overhead and deduplicating the invalidations. --- src/backend/access/rmgrdesc/xactdesc.c | 40 +++++++ src/backend/access/transam/xact.c | 7 ++ src/backend/replication/logical/decode.c | 16 +++ .../replication/logical/reorderbuffer.c | 104 +++++++++++++++--- src/backend/utils/cache/inval.c | 49 +++++++++ src/include/access/xact.h | 13 ++- src/include/replication/reorderbuffer.h | 11 ++ 7 files changed, 226 insertions(+), 14 deletions(-) diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index fbc5942578..17c06f7062 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -20,6 +20,9 @@ #include "storage/standbydefs.h" #include "utils/timestamp.h" +static void xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs); + /* * Parse the WAL format of an xact commit and abort records into an easier to * understand format.
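As a minimal standalone illustration (not part of the patch, not PostgreSQL code): the new XLOG_XACT_INVALIDATIONS record described above is a fixed header carrying nmsgs followed by the messages themselves in a flexible array member, sized as MinSizeOfXactInvalidations plus nmsgs * sizeof(SharedInvalidationMessage). The sketch below mirrors that shape with mocked types; MockInvalMessage and MockXactInvalidations are simplified stand-ins, the real structs appear in the xact.h hunk later in this patch.

/*
 * Standalone sketch (not PostgreSQL code): header-plus-trailing-array
 * layout analogous to xl_xact_invalidations.
 */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct MockInvalMessage
{
	int			id;			/* catcache id, or a negative special id */
	unsigned	objId;		/* affected object, e.g. a relation OID */
} MockInvalMessage;

typedef struct MockXactInvalidations
{
	int			nmsgs;		/* number of trailing messages */
	MockInvalMessage msgs[];	/* flexible array member */
} MockXactInvalidations;

/* analogous to MinSizeOfXactInvalidations */
#define MinSizeOfMockInvalidations offsetof(MockXactInvalidations, msgs)

int
main(void)
{
	MockInvalMessage src[2] = {{7, 1259}, {-1, 2615}};	/* made-up messages */
	int			nmsgs = 2;
	size_t		total = MinSizeOfMockInvalidations + nmsgs * sizeof(MockInvalMessage);
	MockXactInvalidations *rec = malloc(total);

	/* build the record: header first, then the message array */
	rec->nmsgs = nmsgs;
	memcpy(rec->msgs, src, nmsgs * sizeof(MockInvalMessage));

	/* a reader only needs nmsgs from the header to walk the messages */
	for (int i = 0; i < rec->nmsgs; i++)
		printf("msg %d: id=%d objId=%u\n", i, rec->msgs[i].id, rec->msgs[i].objId);

	free(rec);
	return 0;
}

The real LogLogicalInvalidations() later in the patch registers the header (MinSizeOfXactInvalidations bytes) and then the message array as two pieces of the same WAL record, in exactly this header-plus-trailing-array shape.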
@@ -396,6 +399,12 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + xl_xact_invalidations *xlrec = (xl_xact_invalidations *) rec; + + xact_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs); + } } const char * @@ -423,7 +432,38 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_INVALIDATIONS: + id = "INVALIDATION"; + break; } return id; } + +static void +xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs) +{ + int i; + + appendStringInfoString(buf, "; inval msgs:"); + for (i = 0; i < nmsgs; i++) + { + SharedInvalidationMessage *msg = &msgs[i]; + + if (msg->id >= 0) + appendStringInfo(buf, " catcache %d", msg->id); + else if (msg->id == SHAREDINVALCATALOG_ID) + appendStringInfo(buf, " catalog %u", msg->cat.catId); + else if (msg->id == SHAREDINVALRELCACHE_ID) + appendStringInfo(buf, " relcache %u", msg->rc.relId); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALSMGR_ID) + appendStringInfoString(buf, " smgr"); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfo(buf, " relmap db %u", msg->rm.dbId); + else + appendStringInfo(buf, " unrecognized id %d", msg->id); + } +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c2604bb514..8e6b1a6ebc 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -6020,6 +6020,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + /* + * XXX we do ignore this for now, what matters are invalidations + * written into the commit record. 
+ */ + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 122c581d0f..69c1f45ef6 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -281,6 +281,22 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } case XLOG_XACT_ASSIGNMENT: break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invalidations *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invalidations *) XLogRecGetData(r); + + Assert(TransactionIdIsValid(xid)); + ReorderBufferAddInvalidation(reorder, xid, buf->origptr, + invals->nmsgs, invals->msgs); + + + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + break; case XLOG_XACT_PREPARE: /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4594cf9509..b889edf461 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -220,7 +220,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); -static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); /* * --------------------------------------- @@ -455,6 +455,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + if (change->data.inval.invalidations) + pfree(change->data.inval.invalidations); + change->data.inval.invalidations = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -1814,17 +1819,24 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, TeardownHistoricSnapshot(false); SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); - - /* - * Every time the CommandId is incremented, we could - * see new catalog contents, so execute all - * invalidations. - */ - ReorderBufferExecuteInvalidations(rb, txn); } break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + + /* + * Execute the invalidation messages locally. + * + * XXX Do we need to care about relcacheInitFileInval and + * the other fields added to ReorderBufferChange, or just + * about the message itself? 
+ */ + ReorderBufferExecuteInvalidations( + change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; @@ -1866,7 +1878,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -1892,7 +1905,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2202,6 +2216,33 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn->ntuplecids++; } +/* + * Setup the invalidation of the toplevel transaction. + */ +void +ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, int nmsgs, + SharedInvalidationMessage *msgs) +{ + MemoryContext oldcontext; + ReorderBufferChange *change; + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + MemoryContextAlloc(rb->context, + sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change); + + MemoryContextSwitchTo(oldcontext); +} + /* * Setup the invalidation of the toplevel transaction. * @@ -2234,12 +2275,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * in the changestream but we don't know which those are. 
*/ static void -ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) { int i; - for (i = 0; i < txn->ninvalidations; i++) - LocalExecuteInvalidationMessage(&txn->invalidations[i]); + for (i = 0; i < nmsgs; i++) + LocalExecuteInvalidationMessage(&msgs[i]); } /* @@ -2589,6 +2630,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + char *data; + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + sz += inval_size; + + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(data, change->data.inval.invalidations, inval_size); + data += inval_size; + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -2736,6 +2795,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } + case REORDER_BUFFER_CHANGE_INVALIDATION: + sz += sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -3002,6 +3066,20 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + change->data.inval.invalidations = + MemoryContextAlloc(rb->context, inval_size); + + /* read the message */ + memcpy(change->data.inval.invalidations, data, inval_size); + data += inval_size; + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 591dd33be6..cba5b6c64b 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -85,6 +85,12 @@ * worth trying to avoid sending such inval traffic in the future, if those * problems can be overcome cheaply. * + * When wal_level=logical, write invalidations into WAL at each command end to + * support the decoding of the in-progress transaction. As of now it was + * enough to log invalidation only at commit because we are only decoding the + * transaction at the commit time. We only need to log the catalog cache and + * relcache invalidation. There can not be any active MVCC scan in logical + * decoding so we don't need to log the snapshot invalidation. 
* * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -104,6 +110,7 @@ #include "catalog/pg_constraint.h" #include "miscadmin.h" #include "storage/sinval.h" +#include "storage/standby.h" #include "storage/smgr.h" #include "utils/catcache.h" #include "utils/inval.h" @@ -210,6 +217,8 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static void LogLogicalInvalidations(void); + /* ---------------------------------------------------------------- * Invalidation list support functions * @@ -1092,6 +1101,9 @@ CommandEndInvalidationMessages(void) if (transInvalInfo == NULL) return; + if (XLogLogicalInfoActive()) + LogLogicalInvalidations(); + ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, @@ -1501,3 +1513,40 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) i = ccitem->link - 1; } } + +/* + * Emit WAL for invalidations. + */ +static void +LogLogicalInvalidations() +{ + xl_xact_invalidations xlrec; + SharedInvalidationMessage *invalMessages; + int nmsgs = 0; + + if (transInvalInfo->CurrentCmdInvalidMsgs.cclist) + { + ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + MakeSharedInvalidMessagesArray); + invalMessages = SharedInvalidMessagesArray; + nmsgs = numSharedInvalidMessagesArray; + SharedInvalidMessagesArray = NULL; + numSharedInvalidMessagesArray = 0; + } + + if (nmsgs > 0) + { + /* prepare record */ + memset(&xlrec, 0, MinSizeOfXactInvalidations); + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvalidations); + XLogRegisterData((char *) invalMessages, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); + + pfree(invalMessages); + } +} diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 8645b3816c..b822c5e4b2 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_INVALIDATIONS 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -197,6 +197,17 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) +/* + * Invalidations logged with wal_level=logical. + */ +typedef struct xl_xact_invalidations +{ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_invalidations; + +#define MinSizeOfXactInvalidations offsetof(xl_xact_invalidations, msgs) + /* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. 
So we diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 626ecf4dc9..af35287896 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -57,6 +57,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE, + REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -149,6 +150,14 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* Invalidation. */ + struct + { + uint32 ninvalidations; /* Number of messages */ + SharedInvalidationMessage *invalidations; /* invalidation + * message */ + } inval; } data; /* @@ -459,6 +468,8 @@ void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr ls void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidation(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + int nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, -- 2.23.0
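To close, a toy, self-contained model (plain C, not PostgreSQL code) of the queue-and-replay flow the commit message describes: each per-command batch of invalidation messages is copied and queued in decode order, loosely analogous to ReorderBufferAddInvalidation(), and at replay every queued batch is executed at its position in the change stream, loosely analogous to ReorderBufferExecuteInvalidations(). The InvalMsg and Change types, the single global list, and the printf stand-in for LocalExecuteInvalidationMessage() are illustrative assumptions, not the patch's actual data structures.

/*
 * Toy model (not PostgreSQL code) of queueing per-command invalidation
 * batches and executing them during replay.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct InvalMsg
{
	int			id;			/* mock of SharedInvalidationMessage's id */
	unsigned	objId;		/* mock of the affected object's OID */
} InvalMsg;

typedef struct Change
{
	int			ninvalidations;
	InvalMsg   *invalidations;	/* private copy of the decoded messages */
	struct Change *next;
} Change;

static Change *queue_head;
static Change *queue_tail;

/* copy the messages and append a change, preserving decode order */
static void
add_invalidation(int nmsgs, const InvalMsg *msgs)
{
	Change	   *c = calloc(1, sizeof(Change));

	c->ninvalidations = nmsgs;
	c->invalidations = malloc(nmsgs * sizeof(InvalMsg));
	memcpy(c->invalidations, msgs, nmsgs * sizeof(InvalMsg));

	if (queue_tail)
		queue_tail->next = c;
	else
		queue_head = c;
	queue_tail = c;
}

/* act on one queued batch of messages */
static void
execute_invalidations(int nmsgs, const InvalMsg *msgs)
{
	for (int i = 0; i < nmsgs; i++)
		printf("invalidate id=%d objId=%u\n", msgs[i].id, msgs[i].objId);
}

int
main(void)
{
	InvalMsg	cmd1[] = {{7, 1259}};				/* batch from one command end */
	InvalMsg	cmd2[] = {{-1, 2615}, {7, 16384}};	/* batch from a later command */

	/* one queued change per XLOG_XACT_INVALIDATIONS record */
	add_invalidation(1, cmd1);
	add_invalidation(2, cmd2);

	/* at replay, execute each batch in the order it was queued */
	for (Change *c = queue_head; c != NULL; c = c->next)
		execute_invalidations(c->ninvalidations, c->invalidations);

	/* cleanup */
	while (queue_head)
	{
		Change	   *c = queue_head;

		queue_head = c->next;
		free(c->invalidations);
		free(c);
	}
	return 0;
}

Running it prints the messages in the order the batches were queued, which is the property the patch relies on: invalidations take effect at the point in the change stream where the corresponding command ended, not only at commit.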