From 32037da69873728bed286f0b4751936f15dd5c81 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 29 Oct 2020 05:59:39 -0400 Subject: [PATCH v14] Support-2PC-txn-base. Until now two-phase transaction commands were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase semantics were communicated to the subscriber. This patch provides infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. Include logical decoding plugin API infrastructure changes. Includes contrib/test_decoding changes. Includes documentation changes. --- contrib/test_decoding/test_decoding.c | 192 +++++++++++++++++++++++++ doc/src/sgml/logicaldecoding.sgml | 145 ++++++++++++++++++- src/backend/replication/logical/logical.c | 229 ++++++++++++++++++++++++++++++ src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 46 ++++++ src/include/replication/reorderbuffer.h | 56 ++++++++ 6 files changed, 666 insertions(+), 7 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 8e33614..7eb13ce 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -11,12 +11,16 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" +#include "access/transam.h" #include "catalog/pg_type.h" #include "replication/logical.h" #include "replication/origin.h" +#include "storage/procarray.h" + #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -36,6 +40,7 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + TransactionId check_xid_aborted; /* track abort of this txid */ } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -73,6 +78,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); @@ -88,6 +96,18 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); void _PG_init(void) @@ -112,10 +132,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_start_cb = pg_decode_stream_start; cb->stream_stop_cb = pg_decode_stream_stop; cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_prepare_cb = pg_decode_stream_prepare; cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; cb->stream_truncate_cb = pg_decode_stream_truncate; + cb->filter_prepare_cb = pg_decode_filter_prepare; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn; } @@ -127,6 +152,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ListCell *option; TestDecodingData *data; bool enable_streaming = false; + bool enable_2pc = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -136,6 +162,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->check_xid_aborted = InvalidTransactionId; ctx->output_plugin_private = data; @@ -227,6 +254,40 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "two-phase-commit") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &enable_2pc)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "check-xid-aborted") == 0) + { + if (elem->arg == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted needs an input value"))); + else + { + long xid; + + errno = 0; + xid = strtoul(strVal(elem->arg), NULL, 0); + if (xid == 0 || errno != 0) + data->check_xid_aborted = InvalidTransactionId; + else + data->check_xid_aborted = (TransactionId)xid; + + if (!TransactionIdIsValid(data->check_xid_aborted)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted is not a valid xid: \"%s\"", + strVal(elem->arg)))); + } + } else { ereport(ERROR, @@ -238,6 +299,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } ctx->streaming &= enable_streaming; + ctx->twophase &= enable_2pc; } /* cleanup this plugin's resources */ @@ -297,6 +359,93 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* + * Filter out two-phase transactions. + * + * Each plugin can implement its own filtering logic. Here + * we demonstrate a simple logic by checking the GID. If the + * GID contains the "_nodecode" substring, then we filter + * it out. + */ +static bool +pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + if (strstr(gid, "_nodecode") != NULL) + return true; + + return false; +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ROLLBACK PREPARED callback */ +static void +pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) @@ -455,6 +604,25 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } data->xact_wrote_changes = true; + /* if check_xid_aborted is a valid xid, then it was passed in + * as an option to check if the transaction having this xid would be aborted. + * This is to test concurrent aborts. + */ + if (TransactionIdIsValid(data->check_xid_aborted)) + { + elog(LOG, "waiting for %u to abort", data->check_xid_aborted); + while (TransactionIdIsInProgress(data->check_xid_aborted)) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); + } + if (!TransactionIdIsInProgress(data->check_xid_aborted) && + !TransactionIdDidCommit(data->check_xid_aborted)) + elog(LOG, "%u aborted", data->check_xid_aborted); + + Assert(TransactionIdDidAbort(data->check_xid_aborted)); + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -646,6 +814,30 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, } static void +pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "preparing streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "preparing streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 813a037..ad8991d 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -387,11 +387,16 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; @@ -413,10 +418,19 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); An output plugin may also define functions to support streaming of large, in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, - stream_commit_cb and stream_change_cb + stream_commit_cb, stream_change_cb, + and stream_prepare_cb are required, while stream_message_cb and stream_truncate_cb are optional. + + + An output plugin may also define functions to support two-phase commits, which are + decoded on PREPARE TRANSACTION. The prepare_cb, + stream_prepare_cb, commit_prepared_cb + and rollback_prepared_cb + callbacks are required, while filter_prepare_cb is optional. + @@ -477,7 +491,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. In that case, the logical decoding of this transaction will + be aborted too. @@ -578,6 +598,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + + Transaction Prepare Callback + + + The required prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Transaction Commit Prepared Callback + + + The required commit_prepared_cb callback is called whenever + a transaction commit prepared has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Transaction Rollback Prepared Callback + + + The required rollback_prepared_cb callback is called whenever + a transaction rollback prepared has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); + + + + Change Callback @@ -587,7 +656,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -664,6 +739,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; + false otherwise. When the callback is not + defined, false is assumed (i.e. nothing is + filtered). + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + + The ctx parameter has the same contents + as for the other callbacks. The txn parameter + contains meta information about the transaction. The xid + contains the XID because txn can be NULL in some cases. + The gid is the identifier that later identifies this + transaction for COMMIT PREPARED or ROLLBACK PREPARED. + + + The callback has to provide the same static answer for a given combination of + xid and gid every time it is + called. + + + Generic Message Callback @@ -685,7 +793,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. + The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds @@ -735,6 +849,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + + Stream Prepare Callback + + The stream_prepare_cb callback is called to prepare + a previously streamed transaction as part of a two-phase commit. + +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + Stream Commit Callback @@ -913,9 +1040,13 @@ OutputPluginWrite(ctx, true); When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded - changes are transmitted, the transaction is committed using the - stream_commit_cb callback (or possibly aborted using - the stream_abort_cb callback). + changes are transmitted, the transaction can be committed using the + the stream_commit_cb callback + (or possibly aborted using the stream_abort_cb callback). + If two-phase commits are supported, the transaction can be prepared using the + stream_prepare_cb callback, commit prepared using the + commit_prepared_cb callback or aborted using the + rollback_prepared_cb. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d5cfbea..a4f8113 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -59,6 +59,14 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -74,6 +82,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn); static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -207,6 +217,10 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; /* @@ -221,12 +235,27 @@ StartupDecodingContext(List *output_plugin_options, ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) || (ctx->callbacks.stream_stop_cb != NULL) || (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_prepare_cb != NULL) || (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* + * To support two-phase logical decoding, we require prepare/commit-prepare/abort-prepare + * callbacks. The filter-prepare callback is optional. We however enable two-phase logical + * decoding when at least one of the methods is enabled so that we can easily identify + * missing methods. + * + * We decide it here, but only check it later in the wrappers. + */ + ctx->twophase = (ctx->callbacks.prepare_cb != NULL) || + (ctx->callbacks.commit_prepared_cb != NULL) || + (ctx->callbacks.rollback_prepared_cb != NULL) || + (ctx->callbacks.stream_prepare_cb != NULL) || + (ctx->callbacks.filter_prepare_cb != NULL); + + /* * streaming callbacks * * stream_message and stream_truncate callbacks are optional, so we do not @@ -237,6 +266,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_start = stream_start_cb_wrapper; ctx->reorder->stream_stop = stream_stop_cb_wrapper; ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_prepare = stream_prepare_cb_wrapper; ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; @@ -783,6 +813,120 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin supports two-phase commits then prepare callback is mandatory */ + if (ctx->callbacks.prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register prepare_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support two-phase commits then commit prepared callback is mandatory */ + if (ctx->callbacks.commit_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register commit_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* We're only supposed to call this when two-phase commits are supported */ + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "rollback_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support two-phase commits then abort prepared callback is mandatory */ + if (ctx->callbacks.rollback_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register rollback_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.rollback_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -859,6 +1003,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* + * Skip if decoding of two-phase transactions at PREPARE time is not enabled. In that + * case all two-phase transactions are considered filtered out and will be + * applied as regular transactions at COMMIT PREPARED. + */ + if (!ctx->twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (!ctx->callbacks.filter_prepare_cb) + return false; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { @@ -1057,6 +1246,46 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming and two-phase commits are supported. */ + Assert(ctx->streaming); + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_prepare"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode with two-phase commits, stream_prepare_cb is required */ + if (ctx->callbacks.stream_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming commits requires a stream_prepare_cb callback"))); + + ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 40bab7e..7f4384b 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext bool streaming; /* + * Does the output plugin support two-phase decoding, and is it enabled? + */ + bool twophase; + + /* * State for writing output. */ bool accept_writes; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index b78c796..4c1341f 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -77,6 +77,39 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED and + * sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* * Called for the generic logical decoding messages. */ @@ -124,6 +157,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn); /* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* * Called to apply changes streamed to remote node from in-progress * transaction. */ @@ -171,12 +212,17 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes */ LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dfdda93..becd20e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -174,6 +175,9 @@ typedef struct ReorderBufferChange #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_TOAST_INSERT 0x0020 #define RBTXN_HAS_SPEC_INSERT 0x0040 +#define RBTXN_PREPARE 0x0080 +#define RBTXN_COMMIT_PREPARED 0x0100 +#define RBTXN_ROLLBACK_PREPARED 0x0200 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -233,6 +237,24 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* Has this transaction been prepared? */ +#define rbtxn_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ +) + +/* Has this prepared transaction been committed? */ +#define rbtxn_commit_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \ +) + +/* Has this prepared transaction been rollbacked? */ +#define rbtxn_rollback_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -244,6 +266,9 @@ typedef struct ReorderBufferTXN /* Xid of top-level transaction, if known */ TransactionId toplevel_xid; + /* In case of two-phase commit we need to pass GID to output plugin */ + char *gid; + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if @@ -405,6 +430,26 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +typedef bool (*ReorderBufferFilterPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* rollback prepared callback signature */ +typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* message callback signature */ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -431,6 +476,12 @@ typedef void (*ReorderBufferStreamAbortCB) ( ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + /* commit streamed transaction callback signature */ typedef void (*ReorderBufferStreamCommitCB) ( ReorderBuffer *rb, @@ -497,6 +548,10 @@ struct ReorderBuffer ReorderBufferApplyChangeCB apply_change; ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferRollbackPreparedCB rollback_prepared; ReorderBufferMessageCB message; /* @@ -505,6 +560,7 @@ struct ReorderBuffer ReorderBufferStreamStartCB stream_start; ReorderBufferStreamStopCB stream_stop; ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; -- 1.8.3.1