From f175d1360bec3646315ccbf5699748448704a38b Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 18 Sep 2020 07:51:49 -0400 Subject: [PATCH v4] Support decoding of two-phase transactions Until now two-phase transactions were decoded at COMMIT, just like regular transaction. During replay, two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. This patch allows PREPARE-time decoding two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. Includes documentation changes. --- contrib/test_decoding/expected/prepared.out | 185 ++++++++++++-- contrib/test_decoding/sql/prepared.sql | 77 +++++- contrib/test_decoding/test_decoding.c | 154 ++++++++++++ doc/src/sgml/logicaldecoding.sgml | 110 ++++++++- src/backend/replication/logical/decode.c | 141 ++++++++++- src/backend/replication/logical/logical.c | 175 ++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 309 +++++++++++++++++++++--- src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 37 +++ src/include/replication/reorderbuffer.h | 75 +++++- 10 files changed, 1183 insertions(+), 85 deletions(-) diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d..94fb0c9 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -6,19 +6,50 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + INSERT INTO test_prepared1 VALUES (4); -- test prepared xact containing ddl BEGIN; @@ -26,45 +57,149 @@ INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. -INSERT INTO test_prepared2 VALUES (7); -COMMIT PREPARED 'test_prepared#3'; --- make sure stuff still works -INSERT INTO test_prepared1 VALUES (8); -INSERT INTO test_prepared2 VALUES (9); --- cleanup -DROP TABLE test_prepared1; -DROP TABLE test_prepared2; --- show results +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:1 - COMMIT - BEGIN - table public.test_prepared1: INSERT: id[integer]:2 - COMMIT - BEGIN table public.test_prepared1: INSERT: id[integer]:4 COMMIT BEGIN - table public.test_prepared2: INSERT: id[integer]:7 - COMMIT - BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(7 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#3' +(1 row) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (8); +INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:8 data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(6 rows) + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + relation | locktype | mode +----------+----------+------ +(0 rows) + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + PREPARE TRANSACTION 'test_prepared_lock2' + COMMIT PREPARED 'test_prepared_lock2' +(8 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------- + COMMIT PREPARED 'test_prepared_lock' +(1 row) + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------- + COMMIT PREPARED 'test_prepared_savepoint' +(1 row) + +-- test that a GID containing "nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:20 data[text]:null + COMMIT +(3 rows) + +-- cleanup +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e726397..ca801e4 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -2,21 +2,25 @@ SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); INSERT INTO test_prepared1 VALUES (4); @@ -27,24 +31,83 @@ ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- make sure stuff still works INSERT INTO test_prepared1 VALUES (8); INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; + +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +RESET statement_timeout; + +COMMIT PREPARED 'test_prepared_lock'; + +-- will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test that a GID containing "nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- cleanup DROP TABLE test_prepared1; DROP TABLE test_prepared2; --- show results +-- show results. There should be nothing to show SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e60ab34..185a70e 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -11,12 +11,16 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" +#include "access/transam.h" #include "catalog/pg_type.h" #include "replication/logical.h" #include "replication/origin.h" +#include "storage/procarray.h" + #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -36,6 +40,7 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + TransactionId check_xid; /* track abort of this txid */ } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -88,6 +93,19 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + void _PG_init(void) @@ -116,6 +134,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; cb->stream_truncate_cb = pg_decode_stream_truncate; + cb->filter_prepare_cb = pg_decode_filter_prepare; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->abort_prepared_cb = pg_decode_abort_prepared_txn; + } @@ -136,6 +159,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->check_xid = InvalidTransactionId; ctx->output_plugin_private = data; @@ -227,6 +251,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "check-xid") == 0) + { + if (elem->arg) + { + errno = 0; + data->check_xid = (TransactionId) + strtoul(strVal(elem->arg), NULL, 0); + + if (errno == EINVAL || errno == ERANGE) + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid is not a valid number: \"%s\"", + strVal(elem->arg)))); + } + else + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid needs an input value"))); + + if (data->check_xid <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Specify positive value for parameter \"%s\"," + " you specified \"%s\"", + elem->defname, strVal(elem->arg)))); + } else { ereport(ERROR, @@ -297,6 +347,94 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +/* + * Filter out two-phase transactions. + * + * Each plugin can implement its own filtering logic. Here + * we demonstrate a simple logic by checking the GID. If the + * GID contains the "_nodecode" substring, then we filter + * it out. + */ +static bool +pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + if (strstr(gid, "_nodecode") != NULL) + return true; + + return false; +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ABORT PREPARED callback */ +static void +pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) @@ -455,6 +593,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } data->xact_wrote_changes = true; + /* if check_xid is specified */ + if (TransactionIdIsValid(data->check_xid)) + { + elog(LOG, "waiting for %u to abort", data->check_xid); + while (TransactionIdIsInProgress(data->check_xid)) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); + } + if (!TransactionIdIsInProgress(data->check_xid) && + !TransactionIdDidCommit(data->check_xid)) + elog(LOG, "%u aborted", data->check_xid); + + Assert(TransactionIdDidAbort(data->check_xid)); + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 813a037..bd4542e 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeStreamStartCB stream_start_cb; @@ -477,7 +481,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. In that case, the logical decoding of this transaction will + be aborted too. @@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + + Transaction Prepare Callback + + + The optional prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Commit Prepared Transaction Callback + + + The optional commit_prepared_cb callback is called whenever + a commit prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Rollback Prepared Transaction Callback + + + The optional abort_prepared_cb callback is called whenever + a rollback prepared transaction has been decoded. The gid field, + which is part of the txn parameter can be used in this + callback. + +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + Change Callback @@ -587,7 +646,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. In case of decoding a prepared (but yet + uncommitted) transaction or decoding of an uncommitted transaction, this + change callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -664,6 +729,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decode + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; + false otherwise. When the callback is not + defined, false is assumed (i.e. nothing is + filtered). + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + + The ctx parameter has the same contents + as for the other callbacks. The txn parameter + contains meta information about the transaction. The xid + contains the XID because txn can be NULL in some cases. + The gid is the identifier that later identifies this + transaction for COMMIT PREPARED or ROLLBACK PREPARED. + + + The callback has to provide the same static answer for a given combination of + xid and gid every time it is + called. + + + Generic Message Callback @@ -685,7 +783,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says - if the message was sent as transactional or not. + if the message was sent as transactional or not. Similar to the change + callback, in case of decoding a prepared (but yet uncommitted) + transaction or decoding of an uncommitted transaction, this message + callback might also error out due to simultaneous rollback of + this very same transaction. In that case, the logical decoding of this + aborted transaction is stopped gracefully. + The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f21f61d..c0b0bce 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; + /* check that output plugin is capable of twophase decoding */ + if (!ctx->enable_twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* ok, parse it */ + xlrec = (xl_xact_prepare *)XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -647,9 +667,82 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Decide if we're processing COMMIT PREPARED, or a regular COMMIT. + * Regular commit simply triggers a replay of transaction changes from the + * reorder buffer. For COMMIT PREPARED that however already happened at + * PREPARE time, and so we only need to notify the subscriber that the GID + * finally committed. + * + * For output plugins that do not support PREPARE-time decoding of + * two-phase transactions, we never even see the PREPARE and all two-phase + * transactions simply fall through to the second branch. + */ + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, + parsed->twophase_xid, parsed->twophase_gid)) + { + Assert(xid == parsed->twophase_xid); + /* we are processing COMMIT PREPARED */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +/* + * Decode PREPARE record. Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + /* + * Tell the reorderbuffer about the surviving subtransactions. We need to + * do this because the main transaction itself has not committed since we + * are in the prepare phase right now. So we need to be sure the snapshot + * is setup correctly for the main transaction in case all changes + * happened in subtransanctions + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + return; + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); } /* @@ -661,6 +754,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * If it's ROLLBACK PREPARED then handle it via callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 0f6af95..4e95337 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -58,6 +58,14 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -206,6 +214,10 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; /* @@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_message_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); + /* + * To support two phase logical decoding, we require prepare/commit-prepare/abort-prepare + * callbacks. The filter-prepare callback is optional. We however enable two phase logical + * decoding when at least one of the methods is enabled so that we can easily identify + * missing methods. + * + * We decide it here, but only check it later in the wrappers. + */ + ctx->enable_twophase = (ctx->callbacks.prepare_cb != NULL) || + (ctx->callbacks.commit_prepared_cb != NULL) || + (ctx->callbacks.abort_prepared_cb != NULL) || + (ctx->callbacks.filter_prepare_cb != NULL); + /* * streaming callbacks * @@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then prepare callback is mandatory */ + if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register prepare_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then commit prepared callback is mandatory */ + if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register commit_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* If the plugin support 2 phase commits then abort prepared callback is mandatory */ + if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Output plugin did not register abort_prepared_cb callback"))); + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* + * Skip if decoding of twophase at PREPARE time is not enabled. In that + * case all twophase transactions are considered filtered out and will be + * applied as regular transactions at COMMIT PREPARED. + */ + if (!ctx->enable_twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (!ctx->callbacks.filter_prepare_cb) + return false; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1975d62..d96be77 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } if (txn->tuplecid_hash != NULL) { @@ -1401,6 +1407,59 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, } /* + * Cleanup the leftover contents of a transaction, usually after the transaction + * has been COMMIT PREPARED or ROLLBACK PREPARED. This does the rest of the cleanup + * that was not done when the transaction was PREPARED + */ +static void +ReorderBufferCleanupPreparedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + bool found; + + /* + * Cleanup the base snapshot, if set. + */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + /* + * Cleanup the snapshot for the last streamed run. + */ + if (txn->snapshot_now != NULL) + { + Assert(rbtxn_is_streamed(txn)); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + } + + /* + * Remove TXN from its containing list. + * + * Note: if txn is known as subxact, we are deleting the TXN from its + * parent's list of known subxacts; this leaves the parent's nsubxacts + * count too high, but we don't care. Otherwise, we are deleting the TXN + * from the LSN-ordered list of toplevel TXNs. + */ + dlist_delete(&txn->node); + + /* now remove reference from buffer */ + hash_search(rb->by_txn, + (void *) &txn->xid, + HASH_REMOVE, + &found); + Assert(found); + + /* remove entries spilled to disk */ + if (rbtxn_is_serialized(txn)) + ReorderBufferRestoreCleanup(rb, txn); + + /* deallocate */ + ReorderBufferReturnTXN(rb, txn); +} + +/* * Cleanup the contents of a transaction, usually after the transaction * committed or aborted. */ @@ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Discard changes from a transaction (and subtransactions), after streaming - * them. Keep the remaining info - transactions, tuplecids, invalidations and - * snapshots. + * Discard changes from a transaction (and subtransactions), either after streaming or + * after a PREPARE. + * The flag txn_prepared indicates if this is called after a PREPARE. + * If streaming, keep the remaining info - transactions, tuplecids, invalidations and + * snapshots.If after a PREPARE, keep only the invalidations and snapshots. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; @@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); } /* cleanup changes in the toplevel txn */ @@ -1560,9 +1621,30 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for decoding + * catalog snapshot access. + * They are always stored in the toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + ReorderBufferReturnChange(rb, change, true); + } + } + /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * memory. We could also keep the hash table and update it with new ctid @@ -1880,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, false); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -1987,7 +2069,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; /* Set the current xid to detect concurrent aborts. */ - if (streaming) + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2249,7 +2331,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } } - /* * There's a speculative insertion remaining, just clean in up, it * can't have been successful, otherwise we'd gotten a confirmation @@ -2278,7 +2359,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call either PREPARE (for twophase transactions) or COMMIT + * (for regular ones). + */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2319,11 +2409,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (streaming) { - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, false); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } + else if (rbtxn_prepared(txn)) + { + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } else ReorderBufferCleanupTXN(rb, txn); } @@ -2352,17 +2448,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent - * abort of the (sub)transaction we are streaming. We need to do the + * abort of the (sub)transaction we are streaming or preparing. We need to do the * cleanup and return gracefully on this error, see SetupCheckXidLive. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) { /* - * This error can only occur when we are sending the data in - * streaming mode and the streaming is not finished yet. + * This error can only occur either when we are sending the data in + * streaming mode and the streaming is not finished yet or when we are + * sending the data out on a PREPARE during a twoi phase commit. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2370,10 +2467,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, errdata = NULL; curtxn->concurrent_abort = true; - /* Reset the TXN so that it is allowed to stream remaining data. */ - ReorderBufferResetTXN(rb, txn, snapshot_now, - command_id, prev_lsn, - specinsert); + /* If streaming, reset the TXN so that it is allowed to stream remaining data. */ + if (streaming && stream_started) + { + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + ReorderBufferTruncateTXN(rb, txn, true); + } } else { @@ -2395,23 +2501,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * This interface is called once a toplevel commit is read for both streamed * as well as non-streamed transactions. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2453,6 +2552,140 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a twophase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to subscribers. + * Called while handling commit|abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * Always call the prepare filter. It's the job of the prepare filter to + * give us the *same* response for a given xid across multiple calls + * (including ones on restart) + */ + return !(rb->filter_prepare(rb, txn, xid, gid)); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for example). + * Anyways, 2PC transactions do not contain any reorderbuffers. So allow + * it to be created below. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + /* this txn is obviously prepared */ + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferCleanupPreparedTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2495,7 +2728,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* cosmetic... */ txn->final_lsn = lsn; - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions (GID is enough to + * commit/abort those later). + */ + ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 45abc44..ee63e7b 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -84,6 +84,11 @@ typedef struct LogicalDecodingContext */ bool streaming; + /* + * Does the output plugin support two phase decoding, and is it enabled? + */ + bool enable_twophase; + /* * State for writing output. */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index b78c796..96e269b 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -77,6 +77,39 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and + * sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* * Called for the generic logical decoding messages. */ @@ -171,6 +204,10 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1ae17d5..4d4e35d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -162,9 +163,13 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_STREAMED 0x0008 -#define RBTXN_HAS_TOAST_INSERT 0x0010 -#define RBTXN_HAS_SPEC_INSERT 0x0020 +#define RBTXN_PREPARE 0x0008 +#define RBTXN_COMMIT_PREPARED 0x0010 +#define RBTXN_ROLLBACK_PREPARED 0x0020 +#define RBTXN_COMMIT 0x0040 +#define RBTXN_IS_STREAMED 0x0080 +#define RBTXN_HAS_TOAST_INSERT 0x0100 +#define RBTXN_HAS_SPEC_INSERT 0x0200 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -218,6 +223,15 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* is this txn prepared? */ +#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE) +/* was this prepared txn committed in the meanwhile? */ +#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED) +/* was this prepared txn aborted in the meanwhile? */ +#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED) +/* was this txn committed in the meanwhile? */ +#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT) + typedef struct ReorderBufferTXN { /* See above */ @@ -229,6 +243,9 @@ typedef struct ReorderBufferTXN /* Xid of top-level transaction, if known */ TransactionId toplevel_xid; + /* In case of 2PC we need to pass GID to output plugin */ + char *gid; + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if @@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* abort callback signature */ +typedef void (*ReorderBufferAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + /* message callback signature */ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -482,6 +532,11 @@ struct ReorderBuffer ReorderBufferApplyChangeCB apply_change; ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; + ReorderBufferAbortCB abort; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -548,6 +603,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -571,6 +631,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); -- 1.8.3.1