From 0b378ffc8bf286a4e9788d135c1061eaaa19d539 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 1 May 2020 19:56:35 +0530 Subject: [PATCH v20 05/12] Implement streaming mode in ReorderBuffer Instead of serializing the transaction to disk after reaching the maximum number of changes in memory (4096 changes), we consume the changes we have in memory and invoke new stream API methods. This happens in ReorderBufferStreamTXN() using about the same logic as in ReorderBufferCommit() logic. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages. This adds a second iterator for the streaming case, without the spill-to-disk functionality and only processing changes currently in memory. Theoretically, we could get rid of the k-way merge, and append the changes to the toplevel xact directly (and remember the position in the list in case the subxact gets aborted later). It also adds ReorderBufferTXN pointer to two places: * ReorderBufferChange, so that we know which xact it belongs to * ReorderBufferTXN, pointing to toplevel xact (from subxact) The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). --- src/backend/access/heap/heapam_visibility.c | 38 +- .../replication/logical/reorderbuffer.c | 712 ++++++++++++++++-- src/include/replication/reorderbuffer.h | 36 + 3 files changed, 699 insertions(+), 87 deletions(-) diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index dba10890aa..160b167adb 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1571,8 +1571,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); + /* + * If we haven't resolved the combocid to cmin/cmax, that means + * we have not decoded the combocid yet. That means the cmin is + * definitely in the future, and we're not supposed to see the + * tuple yet. + * + * XXX This only applies to decoding of in-progress transactions. + * In regular logical decoding we only execute this code at commit + * time, at which point we should have seen all relevant combocids. + * So we should error out in this case. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ if (!resolved) - elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + return false; Assert(cmin != InvalidCommandId); @@ -1642,10 +1657,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); - if (!resolved) - elog(ERROR, "could not resolve combocid to cmax"); - - Assert(cmax != InvalidCommandId); + /* + * If we haven't resolved the combocid to cmin/cmax, that means + * we have not decoded the combocid yet. That means the cmax is + * definitely in the future, and we're still supposed to see the + * tuple. + * + * XXX This only applies to decoding of in-progress transactions. + * In regular logical decoding we only execute this code at commit + * time, at which point we should have seen all relevant combocids. + * So we should error out in this case. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ + if (!resolved || cmax == InvalidCommandId) + return true; if (cmax >= snapshot->curcid) return true; /* deleted after scan started */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b889edf461..bc5821b2bf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -236,6 +236,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -244,6 +245,15 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid); +/* + * --------------------------------------- + * Streaming support functions + * --------------------------------------- + */ +static inline bool ReorderBufferCanStream(ReorderBuffer *rb); +static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); + /* --------------------------------------- * toast reassembly support * --------------------------------------- @@ -371,6 +381,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb) dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + /* InvalidCommandId is not zero, so set it explicitly */ + txn->command_id = InvalidCommandId; + return txn; } @@ -772,6 +785,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #endif } +/* + * AssertChangeLsnOrder + * + * Check ordering of changes in the toplevel transaction. + */ +static void +AssertChangeLsnOrder(ReorderBufferTXN *txn) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter iter; + XLogRecPtr prev_lsn = txn->first_lsn; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(txn->first_lsn <= cur_change->lsn); + + if (txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_change->lsn <= txn->end_lsn); + + Assert(prev_lsn <= cur_change->lsn); + + prev_lsn = cur_change->lsn; + } +#endif +} + /* * ReorderBufferGetOldestTXN * Return oldest transaction in reorderbuffer @@ -865,6 +910,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* set the reference to toplevel transaction */ + subtxn->toptxn = txn; + /* add to subtransaction list */ dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; @@ -988,7 +1036,7 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, */ /* - * Binary heap comparison function. + * Binary heap comparison function (regular non-streaming iterator). */ static int ReorderBufferIterCompare(Datum a, Datum b, void *arg) @@ -1024,6 +1072,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, *iter_state = NULL; + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(txn); + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -1038,6 +1089,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); + if (cur_txn->nentries > 0) nr_txns++; } @@ -1315,6 +1369,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_delete(&txn->base_snapshot_node); } + /* + * Cleanup the snapshot for the last streamed run. + */ + if (txn->snapshot_now != NULL) + { + Assert(rbtxn_is_streamed(txn)); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + } + /* * Remove TXN from its containing list. * @@ -1340,9 +1403,94 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferReturnTXN(rb, txn); } +/* + * Discard changes from a transaction (and subtransactions), after streaming + * them. Keep the remaining info - transactions, tuplecids and snapshots. + */ +static void +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + dlist_mutable_iter iter; + + /* cleanup subtransactions & their changes */ + dlist_foreach_modify(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* + * Subtransactions are always associated to the toplevel TXN, even if + * they originally were happening inside another subtxn, so we won't + * ever recurse more than one level deep here. + */ + Assert(rbtxn_is_known_subxact(subtxn)); + Assert(subtxn->nsubtxns == 0); + + ReorderBufferTruncateTXN(rb, subtxn); + } + + /* cleanup changes in the toplevel txn */ + dlist_foreach_modify(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* remove the change from it's containing list */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change); + } + + /* + * Mark the transaction as streamed. + * + * The toplevel transaction, identified by (toptxn==NULL), is marked + * as streamed always, even if it does not contain any changes (that + * is, when all the changes are in subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * any changes in them. + * + * We do it this way because of aborts - we don't want to send aborts + * for XIDs the downstream is not aware of. And of course, it always + * knows about the toplevel xact (we send the XID in all messages), + * but we never stream XIDs of empty subxacts. + */ + if ((!txn->toptxn) || (txn->nentries_mem != 0)) + txn->txn_flags |= RBTXN_IS_STREAMED; + + /* + * Destroy the (relfilenode, ctid) hashtable, so that we don't leak + * any memory. We could also keep the hash table and update it with + * new ctid values, but this seems simpler and good enough for now. + */ + if (txn->tuplecid_hash != NULL) + { + hash_destroy(txn->tuplecid_hash); + txn->tuplecid_hash = NULL; + } + + /* also reset the number of entries in the transaction */ + txn->nentries_mem = 0; + txn->nentries = 0; +} + /* * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by - * HeapTupleSatisfiesHistoricMVCC. + * tqual.c's HeapTupleSatisfiesHistoricMVCC. + * + * We do build the hash table even if there are no CIDs. That's + * because when streaming in-progress transactions we may run into + * tuples with the CID before actually decoding them. Think e.g. about + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded + * yet when applying the INSERT. So we build a hash table so that + * ResolveCminCmaxDuringDecoding does not segfault in this case. + * + * XXX We might limit this behavior to streaming mode, and just bail + * out when decoding transaction at commit time (at which point it's + * guaranteed to see all CIDs). */ static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_iter iter; HASHCTL hash_ctl; - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) - return; - memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); @@ -1491,59 +1636,76 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * Perform the replay of a transaction and its non-aborted subtransactions. - * - * Subtransactions previously have to be processed by - * ReorderBufferCommitChild(), even if previously assigned to the toplevel - * transaction with ReorderBufferAssignChild. - * - * We currently can only decode a transaction's contents when its commit - * record is read because that's the only place where we know about cache - * invalidations. Thus, once a toplevel commit is read, we iterate over the top - * and subtransactions (using a k-way merge) and replay the changes in lsn - * order. + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) { - ReorderBufferTXN *txn; - volatile Snapshot snapshot_now; - volatile CommandId command_id = FirstCommandId; - bool using_subtxn; - ReorderBufferIterTXNState *volatile iterstate = NULL; + /* we should only call this for previously streamed transactions */ + Assert(rbtxn_is_streamed(txn)); - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); + ReorderBufferStreamTXN(rb, txn); - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; + rb->stream_commit(rb, txn, txn->final_lsn); - txn->final_lsn = commit_lsn; - txn->end_lsn = end_lsn; - txn->commit_time = commit_time; - txn->origin_id = origin_id; - txn->origin_lsn = origin_lsn; + ReorderBufferCleanupTXN(rb, txn); +} +/* + * Set xid for concurrent abort check + * + * While streaming an in-progress transaction there is a possibility that the + * (sub)transaction might get aborted concurrently. In such case if the + * (sub)transaction has catalog update then we might decode the tuple using + * wrong catalog version. So for detecting the concurrent abort we set + * CheckXidAlive to the current (sub)transaction's xid for which this change + * belongs to. And, during catalog scan we can check the status of the xid and + * if it is aborted we will report a specific error that we can ignore. We + * might have already streamed some of the changes for the aborted + * (sub)transaction, but that is fine because when we decode the abort we will + * stream abort message to truncate the changes in the subscriber. + */ +static inline void +SetupCheckXidLive(TransactionId xid) +{ /* - * If this transaction has no snapshot, it didn't make any changes to the - * database, so there's nothing to decode. Note that - * ReorderBufferCommitChild will have transferred any snapshots from - * subtransactions if there were any. + * setup CheckXidAlive if it's not committed yet. We don't check if the xid + * aborted. That will happen during catalog access. Also reset the + * sysbegin_called flag. */ - if (txn->base_snapshot == NULL) + if (!TransactionIdDidCommit(xid)) { - Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); - return; + CheckXidAlive = xid; + bsysscan = false; } + else + CheckXidAlive = InvalidTransactionId; +} - snapshot_now = txn->base_snapshot; +/* + * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN + * + * Send data of a transaction (and its subtransactions) to the + * output plugin. If streaming is true then data will be sent using stream API. + */ +static void +ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn, + volatile Snapshot snapshot_now, + volatile CommandId command_id, + bool streaming) +{ + bool using_subtxn; + MemoryContext ccxt = CurrentMemoryContext; + ReorderBufferIterTXNState *volatile iterstate = NULL; + volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; + ReorderBufferChange *volatile specinsert = NULL; - /* build data to be able to lookup the CommandIds of catalog tuples */ + /* + * build data to be able to lookup the CommandIds of catalog tuples + */ ReorderBufferBuildTupleCidHash(rb, txn); /* setup the initial snapshot */ @@ -1564,14 +1726,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { ReorderBufferChange *change; - ReorderBufferChange *specinsert = NULL; if (using_subtxn) - BeginInternalSubTransaction("replay"); + BeginInternalSubTransaction("stream"); else StartTransactionCommand(); - rb->begin(rb, txn); + /* start streaming this chunk of transaction */ + if (streaming) + rb->stream_start(rb, txn); + else + rb->begin(rb, txn); ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -1579,6 +1744,19 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + + /* Set the xid for concurrent abort check. */ + if (streaming) + SetupCheckXidLive(change->txn->xid); + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1588,8 +1766,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * use as a normal record. It'll be cleaned up at the end * of INSERT processing. */ - if (specinsert == NULL) - elog(ERROR, "invalid ordering of speculative insertion changes"); Assert(specinsert->data.tp.oldtuple == NULL); change = specinsert; change->action = REORDER_BUFFER_CHANGE_INSERT; @@ -1655,7 +1831,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (!IsToastRelation(relation)) { ReorderBufferToastReplace(rb, txn, relation, change); - rb->apply_change(rb, txn, relation, change); + if (streaming) + { + rb->stream_change(rb, txn, relation, change); + + /* Remember that we have sent some data for this xid. */ + change->txn->any_data_sent = true; + } + else + rb->apply_change(rb, txn, relation, change); /* * Only clear reassembled toast chunks if we're sure @@ -1676,8 +1860,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * freed/reused while restoring spooled data from * disk. */ - Assert(change->data.tp.newtuple != NULL); - dlist_delete(&change->node); ReorderBufferToastAppendChunk(rb, txn, relation, change); @@ -1695,7 +1877,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = NULL; } - if (relation != NULL) + if (RelationIsValid(relation)) { RelationClose(relation); relation = NULL; @@ -1753,7 +1935,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relations[nrelations++] = relation; } - rb->apply_truncate(rb, txn, nrelations, relations, change); + if (streaming) + { + rb->stream_truncate(rb, txn, nrelations, relations, change); + + /* Remember that we have sent some data. */ + change->txn->any_data_sent = true; + } + else + rb->apply_truncate(rb, txn, nrelations, relations, change); for (i = 0; i < nrelations; i++) RelationClose(relations[i]); @@ -1762,10 +1952,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } case REORDER_BUFFER_CHANGE_MESSAGE: - rb->message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); + if (streaming) + rb->stream_message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + else + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -1796,7 +1992,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now = change->data.snapshot; } - /* and continue with the new one */ SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); break; @@ -1858,14 +2053,46 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + /* + * Done with current changes, call stream_stop callback for streaming + * transaction, commit callback otherwise. + */ + if (streaming) + { + /* + * Set the last of the stream as the final lsn before calling + * stream stop. + */ + if (!XLogRecPtrIsInvalid(prev_lsn)) + txn->final_lsn = prev_lsn; + rb->stream_stop(rb, txn); + } + else + rb->commit(rb, txn, commit_lsn); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) elog(ERROR, "output plugin used XID %u", GetCurrentTransactionId()); + /* + * Remember the command ID and snapshot if transaction is streaming + * otherwise free the snapshot if we have copied it. + */ + if (streaming) + { + txn->command_id = command_id; + + /* Avoid copying if it's already copied. */ + if (snapshot_now->copied) + txn->snapshot_now = snapshot_now; + else + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + } + else if (snapshot_now->copied) + ReorderBufferFreeSnap(rb, snapshot_now); + /* cleanup */ TeardownHistoricSnapshot(false); @@ -1884,14 +2111,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); - - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + /* + * If we are streaming the in-progress transaction then discard the + * changes that we just streamed, and mark the transactions as streamed + * (if they contained changes). Otherwise, remove all the changes and + * deallocate the ReorderBufferTXN. + */ + if (streaming) + ReorderBufferTruncateTXN(rb, txn); + else + ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); @@ -1911,17 +2146,130 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); - - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + if (streaming) + { + /* Discard the changes that we just streamed. */ + ReorderBufferTruncateTXN(rb, txn); - PG_RE_THROW(); + /* Re-throw only if it's not an abort. */ + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK) + { + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + else + { + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + + /* remember the command ID and snapshot for the streaming run */ + txn->command_id = command_id; + + /* Avoid copying if it's already copied. */ + if (snapshot_now->copied) + txn->snapshot_now = snapshot_now; + else + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + /* + * Set the last last of the stream as the final lsn before + * calling stream stop. + */ + txn->final_lsn = prev_lsn; + rb->stream_stop(rb, txn); + ReorderBufferToastReset(rb, txn); + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + } + } + else + { + ReorderBufferCleanupTXN(rb, txn); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } } PG_END_TRY(); } +/* + * Perform the replay of a transaction and its non-aborted subtransactions. + * + * Subtransactions previously have to be processed by + * ReorderBufferCommitChild(), even if previously assigned to the toplevel + * transaction with ReorderBufferAssignChild. + * + * We currently can only decode a transaction's contents when its commit + * record is read because that's the only place where we know about cache + * invalidations. Thus, once a toplevel commit is read, we iterate over the top + * and subtransactions (using a k-way merge) and replay the changes in lsn + * order. + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + Snapshot snapshot_now; + CommandId command_id = FirstCommandId; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + /* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + * + * XXX Called after everything (origin ID and LSN, ...) is stored in the + * transaction, so we don't pass that directly. + * + * XXX Somewhat hackish redirection, perhaps needs to be refactored? + */ + if (rbtxn_is_streamed(txn)) + { + ReorderBufferStreamCommit(rb, txn); + return; + } + + /* + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + ReorderBufferCleanupTXN(rb, txn); + return; + } + + snapshot_now = txn->base_snapshot; + + /* + * Access the main routine to decode the changes and send to output plugin. + */ + ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now, + command_id, false); +} + /* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. @@ -1946,6 +2294,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* + * If the (sub)transaction was streamed, notify the remote node + * about the abort only if we have sent any data for this transaction. + */ + if (rbtxn_is_streamed(txn) && txn->any_data_sent) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2015,6 +2370,13 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* + * If the (sub)transaction was streamed, notify the remote node + * about the abort only if we have sent any data for this transaction. + */ + if (rbtxn_is_streamed(txn) && txn->any_data_sent) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2150,8 +2512,17 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, } /* - * Update the memory accounting info. We track memory used by the whole - * reorder buffer and the transaction containing the change. + * Update memory counters to account for the new or removed change. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -2159,6 +2530,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition) { Size sz; + ReorderBufferTXN *txn; Assert(change->txn); @@ -2170,19 +2542,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; + txn = change->txn; + + /* if subxact, and streaming supported, use the toplevel instead */ + if (txn->toptxn && ReorderBufferCanStream(rb)) + txn = txn->toptxn; + sz = ReorderBufferChangeSize(change); if (addition) { - change->txn->size += sz; + txn->size += sz; rb->size += sz; } else { - Assert((rb->size >= sz) && (change->txn->size >= sz)); - change->txn->size -= sz; + Assert((rb->size >= sz) && (txn->size >= sz)); + txn->size -= sz; rb->size -= sz; } + + Assert(txn->size <= rb->size); + Assert((txn->size >= 0) && (rb->size >= 0)); } /* @@ -2211,6 +2592,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, change->lsn = lsn; change->txn = txn; change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + change->txn = txn; dlist_push_tail(&txn->tuplecids, &change->node); txn->ntuplecids++; @@ -2295,6 +2677,13 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * TOCHECK: Mark toplevel transaction as having catalog changes too + * if one of its children has. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } /* @@ -2398,6 +2787,38 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) return largest; } +/* + * Find the largest toplevel transaction to evict (by streaming). + * + * This can be seen as an optimized version of ReorderBufferLargestTXN, which + * should give us the same transaction (because we don't update memory account + * for subtransaction with streaming, so it's always 0). But we can simply + * iterate over the limited number of toplevel transactions. + */ +static ReorderBufferTXN * +ReorderBufferLargestTopTXN(ReorderBuffer *rb) +{ + dlist_iter iter; + ReorderBufferTXN *largest = NULL; + + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* if the current transaction is larger, remember it */ + if ((!largest) || (txn->size > largest->size)) + largest = txn; + } + + Assert(largest); + Assert(largest->size > 0); + Assert(largest->size <= rb->size); + + return largest; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the transaction to evict and spill the changes to disk. @@ -2418,15 +2839,46 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) /* * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * memory by streaming, if supported. Otherwise spill to disk. */ - txn = ReorderBufferLargestTXN(rb); + if (ReorderBufferCanStream(rb)) + { + /* + * Pick the largest toplevel transaction and evict it from memory by + * streaming the already decoded part. + */ + txn = ReorderBufferLargestTopTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); - ReorderBufferSerializeTXN(rb, txn); + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); + } /* * After eviction, the transaction should have no entries in memory, and * should use 0 bytes for changes. + * + * XXX Checking the size is fine for both cases - spill to disk and + * streaming. But for streaming we should really check nentries_mem for + * all subtransactions too. */ Assert(txn->size == 0); Assert(txn->nentries_mem == 0); @@ -2746,6 +3198,102 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(ondisk->change.action == change->action); } +static inline bool +ReorderBufferCanStream(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->streaming; +} + +/* + * Send data of a large transaction (and its subtransactions) to the + * output plugin, but using the stream API. + * + * XXX Do we need to check if the transaction has some changes to stream + * (maybe it got streamed right before the commit, which attempts to + * stream it again before the commit)? + */ +static void +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + Snapshot snapshot_now; + CommandId command_id; + + /* We can never reach here for a sub transaction. */ + Assert(txn->toptxn == NULL); + + /* + * XXX Not sure if we can make any assumptions about base snapshot here, + * similarly to what ReorderBufferCommit() does. That relies on + * base_snapshot getting transferred from subxact in + * ReorderBufferCommitChild(), but that was not yet called as the + * transaction is in-progress. + * + * So just walk the subxacts and use the same logic here. But we only need + * to do that once, when the transaction is streamed for the first time. + * After that we need to reuse the snapshot from the previous run. + */ + if (txn->snapshot_now == NULL) + { + dlist_iter subxact_i; + + /* make sure this transaction is streamed for the first time */ + Assert(!rbtxn_is_streamed(txn)); + + /* at the beginning we should have invalid command ID */ + Assert(txn->command_id == InvalidCommandId); + + dlist_foreach(subxact_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); + ReorderBufferTransferSnapToParent(txn, subtxn); + } + + command_id = FirstCommandId; + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + else + { + /* the transaction must have been already streamed */ + Assert(rbtxn_is_streamed(txn)); + + /* + * Nah, we already have snapshot from the previous streaming run. We + * assume new subxacts can't move the LSN backwards, and so can't beat + * the LSN condition in the previous branch (so no need to walk + * through subxacts again). In fact, we must not do that as we may be + * using snapshot half-way through the subxact. + */ + command_id = txn->command_id; + + /* + * We can not use txn->snapshot_now directly because after the last + * streaming run, we might have got some new sub-transactions. So we + * need to add them to the snapshot. + */ + snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now, + txn, command_id); + + /* Free the previously copied snapshot. */ + Assert(txn->snapshot_now->copied); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + } + + /* + * Access the main routine to decode the changes and send to output plugin. + */ + ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, + command_id, true); + + Assert(dlist_is_empty(&txn->changes)); + Assert(txn->nentries == 0); + Assert(txn->nentries_mem == 0); +} + /* * Size of a change in memory. */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e102840486..6d65986a82 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -171,6 +171,7 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_STREAMED 0x0008 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -190,6 +191,24 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* + * Has this transaction been streamed to downstream? + * + * (It's not possible to deduce this from nentries and nentries_mem for + * various reasons. For example, all changes may be in subtransactions in + * which case we'd have nentries==0 for the toplevel one, which would say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction.) + * + * Note: We never do both stream and serialize a transaction (we only spill + * to disk when streaming is not supported by the plugin), so only one of + * those two flags may be set at any given time. + */ +#define rbtxn_is_streamed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -224,6 +243,16 @@ typedef struct ReorderBufferTXN */ XLogRecPtr final_lsn; + /* + * Have we sent any changes for this transaction in output plugin? + */ + bool any_data_sent; + + /* + * Toplevel transaction for this subxact (NULL for top-level). + */ + struct ReorderBufferTXN *toptxn; + /* * LSN pointing to the end of the commit record + 1. */ @@ -254,6 +283,13 @@ typedef struct ReorderBufferTXN XLogRecPtr base_snapshot_lsn; dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ + /* + * Snapshot/CID from the previous streaming run. Only valid for already + * streamed transactions (NULL/InvalidCommandId otherwise). + */ + Snapshot snapshot_now; + CommandId command_id; + /* * How many ReorderBufferChange's do we have in this txn. * -- 2.23.0