From 4bb24a77200ae43bada3c8df3f9f8085cc4acf6c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 4 Nov 2020 04:16:49 -0500 Subject: [PATCH v16] Support 2PC txn backend Until now two-phase transactions were decoded at COMMIT, just like regular transactions. During replay, two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. This patch allows PREPARE-time decoding of two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. Includes backend changes to support decoding of PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED. --- src/backend/replication/logical/decode.c | 213 ++++++++++++++--- src/backend/replication/logical/reorderbuffer.c | 290 ++++++++++++++++++++---- src/include/replication/reorderbuffer.h | 33 +++ 3 files changed, 463 insertions(+), 73 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3f84ee9..d21d92a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -67,9 +67,12 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid); + xl_xact_parsed_commit *parsed, TransactionId xid, bool prepared); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid, bool prepared); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf 
*tup); @@ -244,16 +247,22 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; xl_xact_parsed_commit parsed; TransactionId xid; + bool prepared; xlrec = (xl_xact_commit *) XLogRecGetData(r); ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); + /* + * If this is COMMIT_PREPARED and the output plugin supports two-phase commits + * then set the prepared flag to true. + */ + prepared = ((info == XLOG_XACT_COMMIT_PREPARED) && ctx->twophase)? true : false; if (!TransactionIdIsValid(parsed.twophase_xid)) xid = XLogRecGetXid(r); else xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, &parsed, xid); + DecodeCommit(ctx, buf, &parsed, xid, prepared); break; } case XLOG_XACT_ABORT: @@ -262,6 +271,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; xl_xact_parsed_abort parsed; TransactionId xid; + bool prepared; xlrec = (xl_xact_abort *) XLogRecGetData(r); ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -270,8 +280,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xid = XLogRecGetXid(r); else xid = parsed.twophase_xid; + /* + * If this is ABORT_PREPARED and the output plugin supports two-phase commits + * then set the prepared flag to true. + */ + prepared = ((info == XLOG_XACT_ABORT_PREPARED) && ctx->twophase) ? true : false; - DecodeAbort(ctx, buf, &parsed, xid); + DecodeAbort(ctx, buf, &parsed, xid, prepared); break; } case XLOG_XACT_ASSIGNMENT: @@ -312,17 +327,35 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. 
In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* check that output plugin is capable of two-phase decoding */ + if (!ctx->twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + xlrec = (xl_xact_prepare *)XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -581,16 +614,17 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Consolidated commit record handling between the different form of commit - * records. + * records. Handles both XLOG_XACT_COMMIT and XLOG_XACT_COMMIT_PREPARED. + * prepared is set to true if XLOG_XACT_COMMIT_PREPARED. */ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid) + xl_xact_parsed_commit *parsed, TransactionId xid, bool prepared) { - XLogRecPtr origin_lsn = InvalidXLogRecPtr; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz commit_time = parsed->xact_time; RepOriginId origin_id = XLogRecGetOrigin(buf->record); - int i; + int i; if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) { @@ -609,8 +643,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * There can be several reasons we might not be interested in this * transaction: * 1) We might not be interested in decoding transactions up to this - * LSN. 
This can happen because we previously decoded it and now just - * are restarting or if we haven't assembled a consistent snapshot yet. + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. * 3) The output plugin is not interested in the origin. * 4) We are doing fast-forwarding @@ -647,34 +681,153 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } - /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + /* + * This function could be called for COMMIT or COMMIT PREPARED (part of a + * two-phase commit) determined by the flag 'prepared'. + * If it is a regular COMMIT we need to replay all actions of the transaction + * in order by calling ReorderBufferCommit. + * + * If it is a COMMIT PREPARED, we check if this has been + * asked to be filtered using the filter prepare callback. If yes, then + * this transaction has not been decoded at PREPARE and needs to be + * handled like a regular COMMIT. + * + * If COMMIT PREPARED and not filtered we only need to call the corresponding + * callbacks as actions of the transaction were already replayed at PREPARE. + */ + if (!prepared || (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(ctx->reorder, xid, parsed->twophase_gid))) + { + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + /* + * Update the decoding stats at transaction commit/abort. It is not clear + * that sending more or less frequently than this would be better. 
+ */ + UpdateDecodingStats(ctx); + } + else + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + +} +/* + * Decode PREPARE record. Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* ---- + * Check whether we are interested in this specific transaction, and tell + * the reorderbuffer to forget the content of the (sub-)transactions + * if not. + * + * There can be several reasons we might not be interested in this + * transaction: + * 1) We might not be interested in decoding transactions up to this + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. + * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding + * + * We can't call ReorderBufferForget like we did in DecodeCommit as the + * txn hasn't yet been committed, removing the reorderbuffers before a + * commit might result in the computation of an incorrect restart_lsn. + * But we need to process cache invalidation if there are any. + * Even if we're not interested in the transaction's contents, it could + * have manipulated the catalog and we need to update the caches accordingly. + */ + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + return; + + /* + * Tell the reorderbuffer about the surviving subtransactions. 
We need to + * do this because the main transaction itself has not committed since we + * are in the prepare phase right now. So we need to be sure the snapshot + * is set up correctly for the main transaction in case all changes + * happened in subtransactions. + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); /* - * Update the decoding stats at transaction commit/abort. It is not clear + * Update the decoding stats at transaction commit/two-phase prepare/abort. It is not clear * that sending more or less frequently than this would be better. */ UpdateDecodingStats(ctx); } + /* * Get the data from the various forms of abort records and pass it on to - * snapbuild.c and reorderbuffer.c + * snapbuild.c and reorderbuffer.c. This could be called either on + * a XLOG_XACT_ABORT or on a XLOG_XACT_ABORT_PREPARED. The prepared flag + * is set if called on a XLOG_XACT_ABORT_PREPARED. 
*/ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid) + xl_xact_parsed_abort *parsed, TransactionId xid, bool prepared) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); - for (i = 0; i < parsed->nsubxacts; i++) + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) { - ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], - buf->record->EndRecPtr); + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; } - ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + /* + * If this is a regular ABORT or to be filtered then just clean up by calling + * ReorderBufferAbort, otherwise if not to be skipped or filtered and + * previously prepared then it is a ROLLBACK PREPARED. + */ + if(!prepared || + (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(ctx->reorder, xid, parsed->twophase_gid)) || + FilterByOrigin(ctx, origin_id) || + SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + parsed->dbId != ctx->slot->data.database) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); + } + + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + + } + else + { + /* ROLLBACK PREPARED of a previously prepared txn, need to call the callbacks.*/ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + } /* update the decoding stats */ UpdateDecodingStats(ctx); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1bd680..dff4cdd 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, 
ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -421,6 +422,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } + if (txn->tuplecid_hash != NULL) { hash_destroy(txn->tuplecid_hash); @@ -1514,12 +1521,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Discard changes from a transaction (and subtransactions), after streaming - * them. Keep the remaining info - transactions, tuplecids, invalidations and - * snapshots. + * Discard changes from a transaction (and subtransactions), either after streaming or + * after a PREPARE. + * The flag txn_prepared indicates if this is called after a PREPARE. + * If streaming, keep the remaining info - transactions, tuplecids, invalidations and + * snapshots. If after a PREPARE, keep only the invalidations and snapshots. 
*/ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; @@ -1538,7 +1547,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); } /* cleanup changes in the txn */ @@ -1572,9 +1581,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for decoding + * catalog snapshot access. + * They are always stored in the toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* Remove the change from its containing list. */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + } + /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * memory. 
We could also keep the hash table and update it with new ctid @@ -1768,9 +1801,23 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); - - ReorderBufferCleanupTXN(rb, txn); + if (rbtxn_prepared(txn)) + { + rb->stream_prepare(rb, txn, txn->final_lsn); + /* + * This is a PREPARED transaction, part of a two-phase commit. + * The full cleanup will happen as part of the COMMIT PREPAREDs, so now + * just truncate txn by removing changes and tuple_cids + */ + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* @@ -1898,8 +1945,12 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn, ReorderBufferChange *specinsert) { - /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn); + /* Discard the changes that we just streamed. + * This can only be called if streaming and not part of a PREPARE in + * a two-phase commit, so set prepared flag as false. + */ + Assert(!rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, false); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -1921,6 +1972,11 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. * + * We are here due to one of the 3 scenarios: + * 1. As part of streaming an in-progress transactions + * 2. Prepare of a two-phase commit + * 3. Commit of a transaction. + * * Send data of a transaction (and its subtransactions) to the * output plugin. We iterate over the top and subtransactions (using a k-way * merge) and replay the changes in lsn order. 
@@ -2006,7 +2062,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; /* Set the current xid to detect concurrent aborts. */ - if (streaming) + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2297,7 +2353,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call either PREPARE (for two-phase transactions) or COMMIT + * (for regular ones). + */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2331,18 +2396,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, RollbackAndReleaseCurrentSubTransaction(); /* - * If we are streaming the in-progress transaction then discard the - * changes that we just streamed, and mark the transactions as - * streamed (if they contained changes). Otherwise, remove all the - * changes and deallocate the ReorderBufferTXN. + * We are here due to one of the 4 scenarios: + * 1. Prepare of a two-phase commit. + * 2. Prepare of a two-phase commit and a part of streaming in-progress txn. + * 3. Streaming of an in-progress txn. + * 4. Commit of a transaction. + * + * Scenario 1 and 2, we handle the same way, pass in prepared as true to + * ReorderBufferTruncateTXN and allow more elaborate truncation of txn data + * as the entire transaction has been decoded, only commit is pending. + * Scenario 3, we pass in prepared as false to ReorderBufferTruncateTXN as + * the txn is not yet completely decoded. + * Scenario 4, all txn has been decoded and we can fully clean up the TXN reorder buffer. 
*/ - if (streaming) + if (rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, true); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } + else if (streaming) + { + ReorderBufferTruncateTXN(rb, txn, false); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } else ReorderBufferCleanupTXN(rb, txn); } @@ -2372,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent - * abort of the (sub)transaction we are streaming. We need to do the + * abort of the (sub)transaction we are streaming or preparing. We need to do the * cleanup and return gracefully on this error, see SetupCheckXidLive. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) { /* - * This error can only occur when we are sending the data in - * streaming mode and the streaming is not finished yet. + * This error can occur either when we are sending the data in + * streaming mode and the streaming is not finished yet or when we are + * sending the data out on a PREPARE during a two-phase commit. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2390,10 +2470,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, errdata = NULL; curtxn->concurrent_abort = true; - /* Reset the TXN so that it is allowed to stream remaining data. */ - ReorderBufferResetTXN(rb, txn, snapshot_now, - command_id, prev_lsn, - specinsert); + /* + * If streaming, reset the TXN so that it is allowed to stream remaining data. + * Streaming can also be on a prepared txn, handle it the same way. 
+ */ + if (streaming) + { + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + ReorderBufferTruncateTXN(rb, txn, true); + } } else { @@ -2415,23 +2507,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * This interface is called once a toplevel commit is read for both streamed * as well as non-streamed transactions. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2473,6 +2558,120 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. 
+ * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a two-phase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ROLLBACK PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for example). + * Anyway, two-phase transactions do not contain any reorderbuffers. So allow + * it to be created below. 
+ */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + /* this txn is obviously prepared */ + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + if (is_commit) + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + else + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + + if (rbtxn_commit_prepared(txn)) + rb->commit_prepared(rb, txn, commit_lsn); + else if (rbtxn_rollback_prepared(txn)) + rb->rollback_prepared(rb, txn, commit_lsn); + + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2515,7 +2714,12 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* cosmetic... */ txn->final_lsn = lsn; - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions (GID is enough to + * commit/abort those later). + */ ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 93c79c8..03f777c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -175,6 +175,9 @@ typedef struct ReorderBufferChange #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_TOAST_INSERT 0x0020 #define RBTXN_HAS_SPEC_INSERT 0x0040 +#define RBTXN_PREPARE 0x0080 +#define RBTXN_COMMIT_PREPARED 0x0100 +#define RBTXN_ROLLBACK_PREPARED 0x0200 /* Does the transaction have catalog changes? 
*/ #define rbtxn_has_catalog_changes(txn) \ @@ -234,6 +237,24 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* Has this transaction been prepared? */ +#define rbtxn_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ +) + +/* Has this prepared transaction been committed? */ +#define rbtxn_commit_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \ +) + +/* Has this prepared transaction been rolled back? */ +#define rbtxn_rollback_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -614,6 +635,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -637,6 +663,13 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId 
ReorderBufferGetOldestXmin(ReorderBuffer *rb); -- 1.8.3.1