From 95cb7a2f0ee195bb1ce74f66ba413d88b38cdb69 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 20 Mar 2020 15:03:01 +0530 Subject: [PATCH v20 01/12] Immediately WAL-log assignments The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead). However, we can not remove the existing XLOG_XACT_ASSIGNMENT wal as that is required for avoiding overflow in the hot standby snapshot. --- src/backend/access/transam/xact.c | 46 ++++++++++++++++++++++++ src/backend/access/transam/xloginsert.c | 22 ++++++++++-- src/backend/access/transam/xlogreader.c | 5 +++ src/backend/replication/logical/decode.c | 39 ++++++++++---------- src/include/access/xact.h | 3 ++ src/include/access/xlog.h | 1 + src/include/access/xlogreader.h | 3 ++ src/include/access/xlogrecord.h | 1 + 8 files changed, 99 insertions(+), 21 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 3984dd3e1a..c2604bb514 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -191,6 +191,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to toplevel XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -223,6 +224,7 @@ typedef struct SerializedTransactionState static TransactionStateData TopTransactionStateData = { .state = TRANS_DEFAULT, .blockState = TBLOCK_DEFAULT, + .assigned = false, }; /* @@ -5118,6 +5120,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -6020,3 +6023,46 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This returns true if we are inside a valid substransaction, for which + * the assignment was not yet written to any WAL record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it needs to have 'assigned' */ + return !CurrentTransactionState->assigned; + +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. + */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; + +} diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 4259309dba..3c49954b57 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -89,11 +89,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -195,6 +197,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -398,7 +404,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -748,6 +754,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogInsertRecord) */ + XLogSetRecordFlags(XLOG_INCLUDE_XID); + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474..7b5257fe81 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1189,6 +1189,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1227,6 +1228,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3abf8..122c581d0f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -93,12 +93,28 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { - XLogRecordBuffer buf; + XLogRecordBuffer buf; + TransactionId txid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; buf.record = record; + txid = XLogRecGetTopXid(record); + + /* + * If the toplevel xid is valid, we need to assign the subxact to the + * toplevel xact. We need to do this for all records, hence we do it before + * the switch. + */ + if (TransactionIdIsValid(txid)) + { + ReorderBufferAssignChild(ctx->reorder, + txid, + record->decoded_record->xl_xid, + buf.origptr); + } + /* cast so we get a warning when new rmgrs are added */ switch ((RmgrId) XLogRecGetRmid(record)) { @@ -217,12 +233,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * If the snapshot isn't yet fully built, we cannot decode anything, so * bail out. * - * However, it's critical to process XLOG_XACT_ASSIGNMENT records even + * However, it's critical to process records with subxid assignment even * when the snapshot is being built: it is possible to get later records * that require subxids to be properly assigned. */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && - info != XLOG_XACT_ASSIGNMENT) + !TransactionIdIsValid(XLogRecGetTopXid(r))) return; switch (info) @@ -264,22 +280,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_ASSIGNMENT: - { - xl_xact_assignment *xlrec; - int i; - TransactionId *sub_xid; - - xlrec = (xl_xact_assignment *) XLogRecGetData(r); - - sub_xid = &xlrec->xsub[0]; - - for (i = 0; i < xlrec->nsubxacts; i++) - { - ReorderBufferAssignChild(reorder, xlrec->xtop, - *(sub_xid++), buf->origptr); - } - break; - } + break; case XLOG_XACT_PREPARE: /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7ee04babc2..8645b3816c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); +extern bool IsSubTransactionAssignmentPending(void); +extern void MarkSubTransactionAssigned(void); + extern int xactGetCommittedChildren(TransactionId **ptr); extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0a12afb59e..3289ad753a 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -237,6 +237,7 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +#define XLOG_INCLUDE_XID 0x04 /* include XID of toplevel xact */ /* Checkpoint statistics */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 4582196e18..756f6df8cf 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -150,6 +150,8 @@ struct XLogReaderState RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of toplevel transaction */ + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -285,6 +287,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index acd9af0194..2f0c8bf589 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_ORIGIN 253 +#define XLR_BLOCK_ID_TOPLEVEL_XID 252 #endif /* XLOGRECORD_H */ -- 2.23.0