From 3716f5eee7130f9d40b42a59440fefe13849bb8a Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 9 Sep 2020 05:33:35 -0400 Subject: [PATCH 1/2] Support decoding of two-phase transactions at PREPARE Until now two-phase transactions were decoded at COMMIT, just like regular transaction. During replay, two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. This patch allows PREPARE-time decoding two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. On the subscriber, the transactions will be executed as two-phase transactions, with the same GID. This is important for various external transaction managers, that often encode information into the GID itself. Includes documentation changes. --- contrib/test_decoding/expected/prepared.out | 185 ++++++++++++++++++--- contrib/test_decoding/sql/prepared.sql | 77 ++++++++- contrib/test_decoding/test_decoding.c | 181 ++++++++++++++++++++ doc/src/sgml/logicaldecoding.sgml | 127 +++++++++++++- src/backend/replication/logical/decode.c | 141 ++++++++++++++-- src/backend/replication/logical/logical.c | 194 ++++++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 209 +++++++++++++++++++++--- src/include/replication/output_plugin.h | 46 ++++++ src/include/replication/reorderbuffer.h | 78 ++++++++- 9 files changed, 1165 insertions(+), 73 deletions(-) diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d..94fb0c9 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -6,19 +6,50 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + INSERT INTO test_prepared1 VALUES (4); -- test prepared xact containing ddl BEGIN; @@ -26,45 +57,149 @@ INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. -INSERT INTO test_prepared2 VALUES (7); -COMMIT PREPARED 'test_prepared#3'; --- make sure stuff still works -INSERT INTO test_prepared1 VALUES (8); -INSERT INTO test_prepared2 VALUES (9); --- cleanup -DROP TABLE test_prepared1; -DROP TABLE test_prepared2; --- show results +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:1 - COMMIT - BEGIN - table public.test_prepared1: INSERT: id[integer]:2 - COMMIT - BEGIN table public.test_prepared1: INSERT: id[integer]:4 COMMIT BEGIN - table public.test_prepared2: INSERT: id[integer]:7 - COMMIT - BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(7 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#3' +(1 row) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (8); +INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:8 data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(6 rows) + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + relation | locktype | mode +----------+----------+------ +(0 rows) + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + PREPARE TRANSACTION 'test_prepared_lock2' + COMMIT PREPARED 'test_prepared_lock2' +(8 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------- + COMMIT PREPARED 'test_prepared_lock' +(1 row) + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------- + COMMIT PREPARED 'test_prepared_savepoint' +(1 row) + +-- test that a GID containing "nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:20 data[text]:null + COMMIT +(3 rows) + +-- cleanup +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e726397..ca801e4 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -2,21 +2,25 @@ SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); INSERT INTO test_prepared1 VALUES (4); @@ -27,24 +31,83 @@ ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- make sure stuff still works INSERT INTO test_prepared1 VALUES (8); INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; + +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +RESET statement_timeout; + +COMMIT PREPARED 'test_prepared_lock'; + +-- will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test that a GID containing "nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- cleanup DROP TABLE test_prepared1; DROP TABLE test_prepared2; --- show results +-- show results. There should be nothing to show SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 3474515..d5438c5 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -11,12 +11,16 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" +#include "access/transam.h" #include "catalog/pg_type.h" #include "replication/logical.h" #include "replication/origin.h" +#include "storage/procarray.h" + #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -36,6 +40,7 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + TransactionId check_xid; /* track abort of this txid */ } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -49,6 +54,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx, bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pg_decode_abort_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr abort_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); @@ -84,6 +91,19 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + void _PG_init(void) @@ -102,6 +122,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pg_decode_change; cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; + cb->abort_cb = pg_decode_abort_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; @@ -112,6 +133,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; cb->stream_truncate_cb = pg_decode_stream_truncate; + cb->filter_prepare_cb = pg_decode_filter_prepare; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->abort_prepared_cb = pg_decode_abort_prepared_txn; + } @@ -132,11 +158,14 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->check_xid = InvalidTransactionId; ctx->output_plugin_private = data; opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; + /* this plugin supports decoding of 2pc */ + opt->enable_twophase = true; foreach(option, ctx->output_plugin_options) { @@ -223,6 +252,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "check-xid") == 0) + { + if (elem->arg) + { + errno = 0; + data->check_xid = (TransactionId) + strtoul(strVal(elem->arg), NULL, 0); + + if (errno == EINVAL || errno == ERANGE) + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid is not a valid number: \"%s\"", + strVal(elem->arg)))); + } + else + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid needs an input value"))); + + if (data->check_xid <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Specify positive value for parameter \"%s\"," + " you specified \"%s\"", + elem->defname, strVal(elem->arg)))); + } else { ereport(ERROR, @@ -293,6 +348,116 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* ABORT callback */ +static void +pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "ABORT %u", txn->xid); + else + appendStringInfoString(ctx->out, "ABORT"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* + * Filter out two-phase transactions. + * + * Each plugin can implement its own filtering logic. Here + * we demonstrate a simple logic by checking the GID. If the + * GID contains the "_nodecode" substring, then we filter + * it out. + */ +static bool +pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + if (strstr(gid, "_nodecode") != NULL) + return true; + + return false; +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ABORT PREPARED callback */ +static void +pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) @@ -451,6 +616,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } data->xact_wrote_changes = true; + /* if check_xid is specified */ + if (TransactionIdIsValid(data->check_xid)) + { + elog(LOG, "waiting for %u to abort", data->check_xid); + while (TransactionIdIsInProgress(data->check_xid)) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); + } + if (!TransactionIdIsInProgress(data->check_xid) && + !TransactionIdDidCommit(data->check_xid)) + elog(LOG, "%u aborted", data->check_xid); + + Assert(TransactionIdDidAbort(data->check_xid)); + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 8d4fdf6..22ee70f 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -386,7 +386,12 @@ typedef struct OutputPluginCallbacks LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeStreamStartCB stream_start_cb; @@ -477,7 +482,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. In that case, the logical decoding of this transaction will + be aborted too. @@ -578,6 +589,71 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + + Transaction Prepare Callback + + + The optional prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Commit Prepared Transaction Callback + + + The optional commit_prepared_cb callback is called whenever + a commit prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Rollback Prepared Transaction Callback + + + The optional abort_prepared_cb callback is called whenever + a rollback prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Transaction Abort Callback + + + The required abort_cb callback is called whenever + a transaction abort has to be initiated. This can happen if we are + decoding a transaction that has been prepared for two-phase commit and + a concurrent rollback happens while we are decoding it. + +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + Change Callback @@ -587,7 +663,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -664,6 +746,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; + false otherwise. When the callback is not + defined, false is assumed (i.e. nothing is + filtered). + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + + The ctx parameter has the same contents + as for the other callbacks. The txn parameter + contains meta information about the transaction. The xid + contains the XID because txn can be NULL in some cases. + The gid is the identifier that later identifies this + transaction for COMMIT PREPARED or ROLLBACK PREPARED. + + + The callback has to provide the same static answer for a given combination of + xid and gid every time it is + called. + + + Generic Message Callback @@ -685,7 +800,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. + The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f21f61d..63d5acf 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; + /* check that output plugin is capable of twophase decoding */ + if (!ctx->options.enable_twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* ok, parse it */ + xlrec = (xl_xact_prepare *)XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -647,9 +667,82 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Decide if we're processing COMMIT PREPARED, or a regular COMMIT. + * Regular commit simply triggers a replay of transaction changes from the + * reorder buffer. For COMMIT PREPARED that however already happened at + * PREPARE time, and so we only need to notify the subscriber that the GID + * finally committed. + * + * For output plugins that do not support PREPARE-time decoding of + * two-phase transactions, we never even see the PREPARE and all two-phase + * transactions simply fall through to the second branch. + */ + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, + parsed->twophase_xid, parsed->twophase_gid)) + { + Assert(xid == parsed->twophase_xid); + /* we are processing COMMIT PREPARED */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +/* + * Decode PREPARE record. Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + /* + * Tell the reorderbuffer about the surviving subtransactions. We need to + * do this because the main transaction itself has not committed since we + * are in the prepare phase right now. So we need to be sure the snapshot + * is setup correctly for the main transaction in case all changes + * happened in subtransanctions + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + return; + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); } /* @@ -661,6 +754,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * If it's ROLLBACK PREPARED then handle it via callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 0f6af95..a569ab8 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -58,6 +58,16 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -206,6 +216,11 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->abort = abort_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; /* @@ -782,6 +797,140 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort"; + state.report_location = txn->final_lsn; /* beginning of abort record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then prepare callback is mandatory */ + if (ctx->options.enable_twophase && ctx->callbacks.prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register prepare_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then commit prepared callback is mandatory */ + if (ctx->options.enable_twophase && ctx->callbacks.commit_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register commit_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then abort prepared callback is mandatory */ + if (ctx->options.enable_twophase && ctx->callbacks.abort_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register abort_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -858,6 +1007,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* + * Skip if decoding of twophase at PREPARE time is not enabled. In that + * case all twophase transactions are considered filtered out and will be + * applied as regular transactions at COMMIT PREPARED. + */ + if (!ctx->options.enable_twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (!ctx->callbacks.filter_prepare_cb) + return false; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1975d62..d6556be 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -413,6 +413,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } if (txn->tuplecid_hash != NULL) { @@ -1987,7 +1992,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; /* Set the current xid to detect concurrent aborts. */ - if (streaming) + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2249,7 +2254,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } } - /* * There's a speculative insertion remaining, just clean in up, it * can't have been successful, otherwise we'd gotten a confirmation @@ -2278,7 +2282,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call abort/commit/prepare callback, depending on the transaction + * state. + * + * If the transaction aborted during apply (which currently can happen + * only for prepared transactions), simply call the abort callback. + * + * Otherwise call either PREPARE (for twophase transactions) or COMMIT + * (for regular ones). + */ + if (rbtxn_rollback(txn)) + rb->abort(rb, txn, commit_lsn); + else if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2361,8 +2382,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * This error can only occur when we are sending the data in * streaming mode and the streaming is not finished yet. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2370,10 +2391,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, errdata = NULL; curtxn->concurrent_abort = true; - /* Reset the TXN so that it is allowed to stream remaining data. */ - ReorderBufferResetTXN(rb, txn, snapshot_now, - command_id, prev_lsn, - specinsert); + /* If streaming, reset the TXN so that it is allowed to stream remaining data. */ + if (streaming && stream_started) + { + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + rb->abort(rb, txn, commit_lsn); + } } else { @@ -2395,23 +2425,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * This interface is called once a toplevel commit is read for both streamed * as well as non-streamed transactions. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2453,6 +2476,140 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a twophase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to subscribers. + * Called while handling commit|abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * Always call the prepare filter. It's the job of the prepare filter to + * give us the *same* response for a given xid across multiple calls + * (including ones on restart) + */ + return !(rb->filter_prepare(rb, txn, xid, gid)); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for example). + * Anyways, 2PC transactions do not contain any reorderbuffers. So allow + * it to be created below. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + /* this txn is obviously prepared */ + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2495,7 +2652,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* cosmetic... */ txn->final_lsn = lsn; - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions (GID is enough to + * commit/abort those later). + */ + ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index b78c796..f6ca87f 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -27,6 +27,7 @@ typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; + bool enable_twophase; } OutputPluginOptions; /* @@ -78,6 +79,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, XLogRecPtr commit_lsn); /* + * Called for an implicit ABORT of a transaction. + */ +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and + * sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* * Called for the generic logical decoding messages. */ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, @@ -170,7 +211,12 @@ typedef struct OutputPluginCallbacks LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1ae17d5..820840a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -162,9 +163,14 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_STREAMED 0x0008 -#define RBTXN_HAS_TOAST_INSERT 0x0010 -#define RBTXN_HAS_SPEC_INSERT 0x0020 +#define RBTXN_PREPARE 0x0008 +#define RBTXN_COMMIT_PREPARED 0x0010 +#define RBTXN_ROLLBACK_PREPARED 0x0020 +#define RBTXN_COMMIT 0x0040 +#define RBTXN_ROLLBACK 0x0080 +#define RBTXN_IS_STREAMED 0x0100 +#define RBTXN_HAS_TOAST_INSERT 0x0200 +#define RBTXN_HAS_SPEC_INSERT 0x0400 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -218,6 +224,17 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* is this txn prepared? */ +#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE) +/* was this prepared txn committed in the meanwhile? */ +#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED) +/* was this prepared txn aborted in the meanwhile? */ +#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED) +/* was this txn committed in the meanwhile? */ +#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT) +/* was this prepared txn aborted in the meanwhile? */ +#define rbtxn_rollback(txn) (txn->txn_flags & RBTXN_ROLLBACK) + typedef struct ReorderBufferTXN { /* See above */ @@ -229,6 +246,9 @@ typedef struct ReorderBufferTXN /* Xid of top-level transaction, if known */ TransactionId toplevel_xid; + /* In case of 2PC we need to pass GID to output plugin */ + char *gid; + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if @@ -390,6 +410,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* abort callback signature */ +typedef void (*ReorderBufferAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + /* message callback signature */ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -482,6 +535,11 @@ struct ReorderBuffer ReorderBufferApplyChangeCB apply_change; ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; + ReorderBufferAbortCB abort; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -548,6 +606,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -571,6 +634,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); -- 1.8.3.1