From 20ece623faf93722bc18d4c8b71e81f376d5dab7 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Thu, 5 Apr 2018 19:43:01 +0530 Subject: [PATCH 3/5] Support decoding of two-phase transactions at PREPARE Until now two-phase transactions were decoded at COMMIT, just like regular transaction. During replay, two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. This patch allows PREPARE-time decoding two-phase transactions (if the output plugin supposts this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. On the subscriber, the transactions will be executed as two-phase transactions, with the same GID. This is important for various external transaction managers, that often encode information into the GID itself. All catalog access while decoding of such 2PC has to be carried out via the use of LogicalLockTransaction/LogicalUnlockTransaction APIs at relevant locations. This includes the location where the output plugin's change apply API is to be invoked. This protects any catalog access inside the output plugin's change apply API from concurrent rollback operations. Includes documentation changes. --- doc/src/sgml/logicaldecoding.sgml | 128 +++++++++++++- src/backend/access/transam/twophase.c | 8 + src/backend/replication/logical/decode.c | 147 +++++++++++++-- src/backend/replication/logical/logical.c | 202 +++++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 226 +++++++++++++++++++++--- src/include/replication/logical.h | 11 +- src/include/replication/output_plugin.h | 45 +++++ src/include/replication/reorderbuffer.h | 54 ++++++ 8 files changed, 783 insertions(+), 38 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index f6b14dccb0..b11752789d 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -384,7 +384,12 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; @@ -454,7 +459,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. In that case, the logical decoding of this transaction will + be aborted too. @@ -555,6 +566,74 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + + Transaction Prepare Callback + + + The optional prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Commit Prepared Transaction Callback + + + The optional commit_prepared_cb callback is called whenever + a commit prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Rollback Prepared Transaction Callback + + + The optional abort_prepared_cb callback is called whenever + a rollback prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Transaction Abort Callback + + + The required abort_cb callback is called whenever + a transaction abort has to be initiated. This can happen if we are + decoding a transaction that has been prepared for two-phase commit and + a concurrent rollback happens while we are decoding it. It might make + sense, even before we commence decoding, in such cases to check if the + rollback happened even before we start looking at the changes to + completely avoid the decoding of such transactions. + +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + Change Callback @@ -564,7 +643,12 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback is ensured sane access to catalog tables regardless of + simultaneous rollback by another backend of this very same transaction. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -619,6 +703,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; + false otherwise. When the callback is not + defined, false is assumed (i.e. nothing is + filtered). + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + + The ctx parameter has the same contents + as for the other callbacks. The txn parameter + contains meta information about the transaction. The xid + contains the XID because txn can be NULL in some cases. + The gid is the identifier that later identifies this + transaction for COMMIT PREPARED or ROLLBACK PREPARED. + + + The callback has to provide the same static answer for a given combination of + xid and gid every time it is + called. + + + Generic Message Callback @@ -640,7 +757,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback is ensured sane access to catalog tables regardless of + simultaneous rollback by another backend of this very same transaction. + The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index d6e4b7980f..30ebe5e72d 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1507,6 +1507,14 @@ FinishPreparedTransaction(const char *gid, bool isCommit) ProcArrayRemove(proc, latestXid); + /* + * Coordinate with logical decoding backends that may be already + * decoding this prepared transaction. When aborting a transaction, + * we need to wait for all of them to leave the decoding group. If + * committing, we simply remove all members from the group. + */ + LogicalDecodeRemoveTransaction(proc, isCommit); + /* * In case we fail while running the callbacks, mark the gxact invalid so * no one else will try to commit/rollback, and so it will be recycled if diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6eb0d5527e..51d544d0f5 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,7 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/twophase.h" #include "catalog/pg_control.h" @@ -72,6 +73,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -280,16 +283,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + /* check that output plugin is capable of twophase decoding */ + if (!ctx->enable_twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + ParsePrepareRecord(XLogRecGetInfo(buf->record), + XLogRecGetData(buf->record), &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -627,9 +647,90 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Decide if we're processing COMMIT PREPARED, or a regular COMMIT. + * Regular commit simply triggers a replay of transaction changes from the + * reorder buffer. For COMMIT PREPARED that however already happened at + * PREPARE time, and so we only need to notify the subscriber that the GID + * finally committed. + * + * For output plugins that do not support PREPARE-time decoding of + * two-phase transactions, we never even see the PREPARE and all two-phase + * transactions simply fall through to the second branch. + */ + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, + parsed->twophase_xid, parsed->twophase_gid)) + { + Assert(xid == parsed->twophase_xid); + /* we are processing COMMIT PREPARED */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +/* + * Decode PREPARE record. Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + /* + * Tell the reorderbuffer about the surviving subtransactions. We need to + * do this because the main transaction itself has not committed since we + * are in the prepare phase right now. So we need to be sure the snapshot + * is setup correctly for the main transaction in case all changes + * happened in subtransanctions + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); } /* @@ -641,6 +742,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * If it's ROLLBACK PREPARED then handle it via callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9bb382bb97..08052f6846 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -60,6 +60,16 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -125,6 +135,7 @@ StartupDecodingContext(List *output_plugin_options, MemoryContext context, old_context; LogicalDecodingContext *ctx; + int twophase_callbacks; /* shorter lines... */ slot = MyReplicationSlot; @@ -184,8 +195,38 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->abort = abort_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* + * Check that plugin implements all callbacks necessary to decode + * two-phase transactions - we either have to have all of them or none. + * The filter_prepare callback is optional, but can only be defined when + * two-phase decoding is enabled (i.e. the three other callbacks are + * defined). + */ + twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) + + (ctx->callbacks.commit_prepared_cb != NULL) + + (ctx->callbacks.abort_prepared_cb != NULL); + + ctx->enable_twophase = (twophase_callbacks == 3); + + /* Plugins with incorrect number of two-phase callbacks are broken. */ + if ((twophase_callbacks != 3) && (twophase_callbacks != 0)) + ereport(ERROR, + (errmsg("Output plugin registered only %d twophase callbacks. ", + twophase_callbacks))); + + /* filter_prepare is optional, but requires two-phase decoding */ + if ((ctx->callbacks.filter_prepare_cb != NULL) && (!ctx->enable_twophase)) + ereport(ERROR, + (errmsg("Output plugin does not support two-phase decoding, but " + "registered filter_prepared callback."))); + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -697,6 +738,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort"; + state.report_location = txn->final_lsn; /* beginning of abort record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) @@ -734,6 +891,51 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* + * Skip if decoding of twophase at PREPARE time is not enabled. In that + * case all twophase transactions are considered filtered out and will be + * applied as regular transactions at COMMIT PREPARED. + */ + if (!ctx->enable_twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (!ctx->callbacks.filter_prepare_cb) + return false; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 3c9af58640..178a99d158 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -337,6 +337,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } if (txn->tuplecid_hash != NULL) { @@ -1277,25 +1282,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) * the top and subtransactions (using a k-way merge) and replay the changes in * lsn order. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; volatile Snapshot snapshot_now; volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -1372,8 +1370,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, case REORDER_BUFFER_CHANGE_DELETE: Assert(snapshot_now); + /* Lock transaction before catalog access */ + if (!LogicalLockTransaction(txn)) + goto change_cleanup; reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, change->data.tp.relnode.relNode); + LogicalUnlockTransaction(txn); /* * Catalog tuple without data, emitted while catalog was @@ -1388,8 +1390,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); + /* Lock transaction before catalog access */ + if (!LogicalLockTransaction(txn)) + goto change_cleanup; + relation = RelationIdGetRelation(reloid); + LogicalUnlockTransaction(txn); + if (relation == NULL) elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", reloid, @@ -1418,8 +1426,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* user-triggered change */ if (!IsToastRelation(relation)) { + /* + * Output plugins can access catalog metadata and we + * do not have any control over that. We could ask + * them to call + * LogicalLockTransaction/LogicalUnlockTransaction + * APIs themselves, but that leads to unnecessary + * complications and expectations from plugin + * writers. We avoid this by calling these APIs + * here, thereby ensuring that the in-progress + * transaction will be around for the duration of + * the apply_change call below + */ + if (!LogicalLockTransaction(txn)) + goto change_cleanup; ReorderBufferToastReplace(rb, txn, relation, change); rb->apply_change(rb, txn, relation, change); + LogicalUnlockTransaction(txn); /* * Only clear reassembled toast chunks if we're sure @@ -1492,10 +1515,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, break; case REORDER_BUFFER_CHANGE_MESSAGE: + if (!LogicalLockTransaction(txn)) + goto change_cleanup; rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, change->data.msg.message_size, change->data.msg.message); + LogicalUnlockTransaction(txn); break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -1565,7 +1591,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, break; } } - +change_cleanup: /* * There's a speculative insertion remaining, just clean in up, it * can't have been successful, otherwise we'd gotten a confirmation @@ -1581,8 +1607,26 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + /* + * Call abort/commit/prepare callback, depending on the transaction + * state. + * + * If the transaction aborted during apply (which currently can happen + * only for prepared transactions), simply call the abort callback. + * + * Otherwise call either PREPARE (for twophase transactions) or COMMIT + * (for regular ones). + */ + if (rbtxn_rollback(txn)) + rb->abort(rb, txn, commit_lsn); + else if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + + /* remove ourself from the decodeGroupLeader */ + if (MyProc->decodeGroupLeader) + RemoveDecodeGroupMember(MyProc->decodeGroupLeader); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -1609,7 +1653,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions (GID is enough to + * commit/abort those later). + */ ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); @@ -1643,6 +1692,141 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_END_TRY(); } + +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a twophase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to subscribers. + * Called while handling commit|abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * Always call the prepare filter. It's the job of the prepare filter to + * give us the *same* response for a given xid across multiple calls + * (including ones on restart) + */ + return !(rb->filter_prepare(rb, txn, xid, gid)); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for example). + * Anyways, 2PC transactions do not contain any reorderbuffers. So allow + * it to be created below. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + /* this txn is obviously prepared */ + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferCleanupTXN(rb, txn); +} + /* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. @@ -1711,7 +1895,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) if (rbtxn_is_serialized(txn) && txn->final_lsn == 0) { ReorderBufferChange *last = - dlist_tail_element(ReorderBufferChange, node, &txn->changes); + dlist_tail_element(ReorderBufferChange, node, &txn->changes); txn->final_lsn = last->lsn; } @@ -2625,9 +2809,9 @@ ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid XLogSegNoOffsetToRecPtr(segno, 0, recptr, wal_segment_size); snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap", - NameStr(MyReplicationSlot->data.name), - xid, - (uint32) (recptr >> 32), (uint32) recptr); + NameStr(MyReplicationSlot->data.name), + xid, + (uint32) (recptr >> 32), (uint32) recptr); } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 63b14367f0..fbe18dff56 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -46,11 +46,11 @@ typedef struct LogicalDecodingContext struct SnapBuild *snapshot_builder; /* - * Marks the logical decoding context as fast forward decoding one. - * Such a context does not have plugin loaded so most of the the following + * Marks the logical decoding context as fast forward decoding one. Such a + * context does not have plugin loaded so most of the the following * properties are unused. */ - bool fast_forward; + bool fast_forward; OutputPluginCallbacks callbacks; OutputPluginOptions options; @@ -89,6 +89,11 @@ typedef struct LogicalDecodingContext bool prepared_write; XLogRecPtr write_location; TransactionId write_xid; + + /* + * Capabilities of the output plugin. + */ + bool enable_twophase; } LogicalDecodingContext; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 82875d6b3d..5254210a46 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -68,6 +68,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* + * Called for an implicit ABORT of a transaction. + */ +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and + * sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* * Called for the generic logical decoding messages. */ @@ -99,7 +139,12 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 385bb486bb..d890e6628c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -179,6 +180,9 @@ typedef struct ReorderBufferTXN */ TransactionId xid; + /* In case of 2PC we need to pass GID to output plugin */ + char *gid; + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if @@ -312,6 +316,37 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* abort callback signature */ +typedef void (*ReorderBufferAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + /* message callback signature */ typedef void (*ReorderBufferMessageCB) ( ReorderBuffer *rb, @@ -347,6 +382,11 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferAbortCB abort; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -394,6 +434,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -417,6 +462,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); -- 2.15.1 (Apple Git-101)