From 7d3851db059018c9ebff1a745bfdd752ac36ead7 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Thu, 5 Apr 2018 19:43:41 +0530 Subject: [PATCH 4/5] Teach test_decoding plugin to work with 2PC Includes a new option "enable_twophase". Depending on this options value, PREPARE TRANSACTION will either be decoded or treated as a single phase commit later. --- contrib/test_decoding/expected/prepared.out | 257 +++++++++++++++++++++++++--- contrib/test_decoding/sql/prepared.sql | 84 ++++++++- contrib/test_decoding/test_decoding.c | 137 +++++++++++++++ 3 files changed, 451 insertions(+), 27 deletions(-) diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d4ff..5df7b7ff20 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -6,19 +6,82 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + COMMIT +(3 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT +(3 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + INSERT INTO test_prepared1 VALUES (4); -- test prepared xact containing ddl BEGIN; @@ -26,45 +89,193 @@ INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. -INSERT INTO test_prepared2 VALUES (7); -COMMIT PREPARED 'test_prepared#3'; --- make sure stuff still works -INSERT INTO test_prepared1 VALUES (8); -INSERT INTO test_prepared2 VALUES (9); --- cleanup -DROP TABLE test_prepared1; -DROP TABLE test_prepared2; --- show results +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 + COMMIT +(3 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); data ------------------------------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:4 COMMIT BEGIN - table public.test_prepared1: INSERT: id[integer]:2 - COMMIT + table public.test_prepared1: INSERT: id[integer]:5 + table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(7 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:4 + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +---------------------------------------------------- BEGIN table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' COMMIT +(4 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#3' +(1 row) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (8); +INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:8 data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(6 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:9 + COMMIT +(6 rows) + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + relation | locktype | mode +----------+----------+------ +(0 rows) + +-- Shouldn't see anything with 2pc decoding off +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + COMMIT +(3 rows) + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + PREPARE TRANSACTION 'test_prepared_lock2' + COMMIT PREPARED 'test_prepared_lock2' +(8 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- Both will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + COMMIT +(4 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +-------------------------------------- + COMMIT PREPARED 'test_prepared_lock' +(1 row) + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +------------------------------------------- + COMMIT PREPARED 'test_prepared_savepoint' +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +------ +(0 rows) + +-- cleanup +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + data +------ +(0 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot @@ -72,3 +283,9 @@ SELECT pg_drop_replication_slot('regression_slot'); (1 row) +SELECT pg_drop_replication_slot('regression_slot_2pc'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e72639767e..e8eb8ad8d6 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -1,22 +1,31 @@ -- predictability SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); INSERT INTO test_prepared1 VALUES (4); @@ -27,24 +36,85 @@ ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); -- make sure stuff still works INSERT INTO test_prepared1 VALUES (8); INSERT INTO test_prepared2 VALUES (9); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; + +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + +-- Shouldn't see anything with 2pc decoding off +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); +RESET statement_timeout; + +COMMIT PREPARED 'test_prepared_lock'; + +-- Both will work normally after we commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + +-- test savepoints +BEGIN; +SAVEPOINT test_savepoint; +CREATE TABLE test_prepared_savepoint (a int); +PREPARE TRANSACTION 'test_prepared_savepoint'; +COMMIT PREPARED 'test_prepared_savepoint'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); -- cleanup DROP TABLE test_prepared1; DROP TABLE test_prepared2; --- show results +-- show results. There should be nothing to show SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1'); + SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_drop_replication_slot('regression_slot_2pc'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a94aeeae29..05b993fd7a 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -36,6 +36,7 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + bool enable_twophase; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -49,6 +50,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx, bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pg_decode_abort_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr abort_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); @@ -58,6 +61,18 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); void _PG_init(void) @@ -75,9 +90,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->abort_cb = pg_decode_abort_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->filter_prepare_cb = pg_decode_filter_prepare; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->abort_prepared_cb = pg_decode_abort_prepared_txn; } @@ -97,6 +117,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->enable_twophase = false; ctx->output_plugin_private = data; @@ -178,6 +199,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "enable-twophase") == 0) + { + if (elem->arg == NULL) + data->enable_twophase = true; + else if (!parse_bool(strVal(elem->arg), &data->enable_twophase)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -246,6 +277,112 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* ABORT callback */ +static void +pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "ABORT %u", txn->xid); + else + appendStringInfoString(ctx->out, "ABORT"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* Filter out two-phase transactions, if decoding not enabled. */ +static bool +pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + TestDecodingData *data = ctx->output_plugin_private; + + /* treat all transactions as one-phase */ + if (!data->enable_twophase) + return true; + + return false; +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ABORT PREPARED callback */ +static void +pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) -- 2.15.1 (Apple Git-101)