From be90ef42b76823b394567a0113f3f7a762853e50 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 29 Oct 2020 06:03:03 -0400 Subject: [PATCH v14] Support 2PC txn backend and tests. Until now two-phase transactions were decoded at COMMIT, just like regular transactions. During replay, two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. This patch allows PREPARE-time decoding of two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. Includes backend changes to support decoding of PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED. Includes two-phase commit test code (for test_decoding). --- contrib/test_decoding/Makefile | 4 +- contrib/test_decoding/expected/two_phase.out | 228 +++++++++++++++++ .../test_decoding/expected/two_phase_stream.out | 177 +++++++++++++ contrib/test_decoding/sql/two_phase.sql | 119 +++++++++ contrib/test_decoding/sql/two_phase_stream.sql | 63 +++++ contrib/test_decoding/t/001_twophase.pl | 121 +++++++++ src/backend/replication/logical/decode.c | 250 +++++++++++++++++- src/backend/replication/logical/reorderbuffer.c | 282 ++++++++++++++++++--- src/include/replication/reorderbuffer.h | 12 + 9 files changed, 1204 insertions(+), 52 deletions(-) create mode 100644 contrib/test_decoding/expected/two_phase.out create mode 100644 contrib/test_decoding/expected/two_phase_stream.out create mode 100644 contrib/test_decoding/sql/two_phase.sql create mode 100644 contrib/test_decoding/sql/two_phase_stream.sql create mode 100644 contrib/test_decoding/t/001_twophase.pl diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a4c76f..49523fe 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -4,11 +4,13 @@ MODULES = test_decoding PGFILEDESC = "test_decoding - example of a logical decoding output 
plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ - decoding_into_rel binary prepared replorigin time messages \ + decoding_into_rel binary prepared replorigin time two_phase two_phase_stream messages \ spill slot truncate stream stats ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream +TAP_TESTS = 1 + REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/two_phase.out b/contrib/test_decoding/expected/two_phase.out new file mode 100644 index 0000000..e5e34b4 --- /dev/null +++ b/contrib/test_decoding/expected/two_phase.out @@ -0,0 +1,228 @@ +-- Test two-phased transactions. When two-phase-commit is enabled, transactions are +-- decoded at PREPARE time rather than at COMMIT PREPARED time. +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); +-- Test 1: +-- Test that commands in a two phase xact are only decoded at PREPARE. +-- Decoding after COMMIT PREPARED should only have the COMMIT PREPARED command and not the +-- rest of the commands in the transaction. +BEGIN; +INSERT INTO test_prepared1 VALUES (1); +INSERT INTO test_prepared1 VALUES (2); +-- should show nothing because the xact has not been prepared yet. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +PREPARE TRANSACTION 'test_prepared#1'; +-- should show both the above inserts and the PREPARE TRANSACTION. 
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:2 + PREPARE TRANSACTION 'test_prepared#1' +(4 rows) + +COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + +-- Test 2: +-- Test that rollback of a prepared xact is decoded. +BEGIN; +INSERT INTO test_prepared1 VALUES (3); +PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(3 rows) + +ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + +-- Test 3: +-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test. 
+BEGIN; +ALTER TABLE test_prepared1 ADD COLUMN data text; +INSERT INTO test_prepared1 VALUES (4, 'frakbar'); +PREPARE TRANSACTION 'test_prepared#3'; +-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + +-- The insert should show the newly altered column but not the DDL. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(3 rows) + +-- Test 4: +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. 
+-- +INSERT INTO test_prepared2 VALUES (5); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:5 + COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +----------------------------------- + COMMIT PREPARED 'test_prepared#3' +(1 row) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (6); +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:6 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 + COMMIT +(6 rows) + +-- Test 5: +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (8, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (9, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +SELECT 'test_prepared1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +----------------+----------+--------------------- + test_prepared1 | relation | RowExclusiveLock + test_prepared1 | relation | ShareLock + test_prepared1 | relation | AccessExclusiveLock +(3 rows) + +-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The call should return +-- within a second. 
+SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' +(4 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------- + COMMIT PREPARED 'test_prepared_lock' +(1 row) + +-- Test 6: +-- Test savepoints and sub-xacts. Creating savepoints will create sub-xacts implicitly. +BEGIN; +CREATE TABLE test_prepared_savepoint (a int); +INSERT INTO test_prepared_savepoint VALUES (1); +SAVEPOINT test_savepoint; +INSERT INTO test_prepared_savepoint VALUES (2); +ROLLBACK TO SAVEPOINT test_savepoint; +PREPARE TRANSACTION 'test_prepared_savepoint'; +-- should show only 1, not 2 +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------ + BEGIN + table public.test_prepared_savepoint: INSERT: a[integer]:1 + PREPARE TRANSACTION 'test_prepared_savepoint' +(3 rows) + +COMMIT PREPARED 'test_prepared_savepoint'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------- + COMMIT PREPARED 'test_prepared_savepoint' +(1 row) + +-- Test 7: +-- test that a GID containing "_nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO 
test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:20 data[text]:null + COMMIT +(3 rows) + +-- Test 8: +-- cleanup and make sure results are also empty +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/two_phase_stream.out b/contrib/test_decoding/expected/two_phase_stream.out new file mode 100644 index 0000000..cde4b83 --- /dev/null +++ b/contrib/test_decoding/expected/two_phase_stream.out @@ -0,0 +1,177 @@ +-- Test streaming of two-phase commits +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stream_test(data text); +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? 
+---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1'; +-- should show the inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +---------------------------------------------------------- + streaming message: transactional: 1 prefix: test, sz: 50 + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + preparing streamed transaction +(24 rows) + +COMMIT PREPARED 'test1'; +--should show the COMMIT PREPARED +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------- + COMMIT PREPARED 'test1' +(1 row) + +-- streaming test with sub-transaction and PREPARE/ROLLBACK PREPARED +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? 
+---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test2'; +-- should show the inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +---------------------------------------------------------- + streaming message: transactional: 1 prefix: test, sz: 50 + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + preparing streamed transaction +(24 rows) + +ROLLBACK PREPARED 'test2'; +-- should show the ROLLBACK PREPARED +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +--------------------------- + ROLLBACK PREPARED 'test2' +(1 row) + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with filtered gid +-- gids with '_nodecode' should not be handled as a two-phase commit. 
+BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? +---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1_nodecode'; +-- should NOT show inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +---------------------------------------------------------- + streaming message: transactional: 1 prefix: test, sz: 50 +(1 row) + +COMMIT PREPARED 'test1_nodecode'; +-- should show the inserts but not show a COMMIT PREPARED but a COMMIT +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------------------------------------------- + BEGIN + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14' + table public.stream_test: 
INSERT: data[text]:'aaaaaaaaaa15' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19' + table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20' + COMMIT +(22 rows) + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/two_phase.sql b/contrib/test_decoding/sql/two_phase.sql new file mode 100644 index 0000000..4ed5266 --- /dev/null +++ b/contrib/test_decoding/sql/two_phase.sql @@ -0,0 +1,119 @@ +-- Test two-phased transactions. When two-phase-commit is enabled, transactions are +-- decoded at PREPARE time rather than at COMMIT PREPARED time. +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); + +-- Test 1: +-- Test that commands in a two phase xact are only decoded at PREPARE. +-- Decoding after COMMIT PREPARED should only have the COMMIT PREPARED command and not the +-- rest of the commands in the transaction. +BEGIN; +INSERT INTO test_prepared1 VALUES (1); +INSERT INTO test_prepared1 VALUES (2); +-- should show nothing because the xact has not been prepared yet. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +PREPARE TRANSACTION 'test_prepared#1'; +-- should show both the above inserts and the PREPARE TRANSACTION. 
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared#1'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 2: +-- Test that rollback of a prepared xact is decoded. +BEGIN; +INSERT INTO test_prepared1 VALUES (3); +PREPARE TRANSACTION 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +ROLLBACK PREPARED 'test_prepared#2'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 3: +-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test. +BEGIN; +ALTER TABLE test_prepared1 ADD COLUMN data text; +INSERT INTO test_prepared1 VALUES (4, 'frakbar'); +PREPARE TRANSACTION 'test_prepared#3'; +-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; +-- The insert should show the newly altered column but not the DDL. +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 4: +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. 
+-- +INSERT INTO test_prepared2 VALUES (5); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +COMMIT PREPARED 'test_prepared#3'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (6); +INSERT INTO test_prepared2 VALUES (7); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 5: +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (8, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (9, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +SELECT 'test_prepared1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; +-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The call should return +-- within a second. +SET statement_timeout = '1s'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 6: +-- Test savepoints and sub-xacts. Creating savepoints will create sub-xacts implicitly. 
+BEGIN; +CREATE TABLE test_prepared_savepoint (a int); +INSERT INTO test_prepared_savepoint VALUES (1); +SAVEPOINT test_savepoint; +INSERT INTO test_prepared_savepoint VALUES (2); +ROLLBACK TO SAVEPOINT test_savepoint; +PREPARE TRANSACTION 'test_prepared_savepoint'; +-- should show only 1, not 2 +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_savepoint'; +-- consume the commit +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 7: +-- test that a GID containing "_nodecode" gets decoded at commit prepared time +BEGIN; +INSERT INTO test_prepared1 VALUES (20); +PREPARE TRANSACTION 'test_prepared_nodecode'; +-- should show nothing +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +COMMIT PREPARED 'test_prepared_nodecode'; +-- should be decoded now +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test 8: +-- cleanup and make sure results are also empty +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results. 
There should be nothing to show +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/two_phase_stream.sql b/contrib/test_decoding/sql/two_phase_stream.sql new file mode 100644 index 0000000..01510e4 --- /dev/null +++ b/contrib/test_decoding/sql/two_phase_stream.sql @@ -0,0 +1,63 @@ +-- Test streaming of two-phase commits + +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stream_test(data text); + +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1'; +-- should show the inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +COMMIT PREPARED 'test1'; +--should show the COMMIT PREPARED +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +-- streaming test with sub-transaction and PREPARE/ROLLBACK PREPARED +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to 
s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test2'; +-- should show the inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +ROLLBACK PREPARED 'test2'; +-- should show the ROLLBACK PREPARED +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + + +-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with filtered gid +-- gids with '_nodecode' should not be handled as a two-phase commit. +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +PREPARE TRANSACTION 'test1_nodecode'; +-- should NOT show inserts after a PREPARE +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +COMMIT PREPARED 'test1_nodecode'; +-- should show the inserts but not show a COMMIT PREPARED but a COMMIT +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl new file mode 100644 index 0000000..1555582 --- /dev/null +++ b/contrib/test_decoding/t/001_twophase.pl @@ -0,0 +1,121 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; 
+use Test::More tests => 2; +use Time::HiRes qw(usleep); +use Scalar::Util qw(looks_like_number); + +# Initialize node +my $node_logical = get_new_node('logical'); +$node_logical->init(allows_streaming => 'logical'); +$node_logical->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 +)); +$node_logical->start; + +# Create some pre-existing content on logical +$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_logical->safe_psql('postgres', + "INSERT INTO tab SELECT generate_series(1,10)"); +$node_logical->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');"); + +#Test 1: +# This test is specifically for testing concurrent abort while logical decode +# is ongoing. We will pass in the xid of the 2PC to the plugin as an option. +# On the receipt of a valid "check-xid-aborted", the change API in the test decoding +# plugin will wait for it to be aborted. +# +# We will fire off a ROLLBACK from another session when this decode +# is waiting. +# +# The status of "check-xid-aborted" will change from in-progress to not-committed +# (hence aborted) and we will stop decoding because the subsequent +# system catalog scan will error out. + +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + INSERT INTO tab VALUES (13,14); + PREPARE TRANSACTION 'test_prepared_tab';"); +# get XID of the above two-phase transaction +my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test_prepared_tab'"); +is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID'); + +# start decoding the above by passing the "check-xid-aborted" +my $logical_connstr = $node_logical->connstr . 
' dbname=postgres'; + +# decode now, it should include an ABORT entry because of the ROLLBACK below +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc');\" \&"); + +# check that decode starts waiting for this $xid2pc +poll_output_until("waiting for $xid2pc to abort") + or die "no wait happened for the abort"; + +# rollback the prepared transaction +$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +# check for occurrence of the log about stopping this decoding +poll_output_until("stopping decoding of test_prepared_tab") + or die "no decoding stop for the rollback"; + +# consume any remaining changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); + +# Test 2: +# Check that commit prepared is decoded properly on immediate restart +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + INSERT INTO tab VALUES (13, 11); + PREPARE TRANSACTION 'test_prepared_tab';"); +# consume changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); +$node_logical->stop('immediate'); +$node_logical->start; + +# commit post the restart +$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); + +# check inserts are visible +my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN 
(11,12) OR b IN (11);"); +is($result, qq(3), 'Rows inserted via 2PC are visible on restart'); + +$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');"); +$node_logical->stop('fast'); + +sub poll_output_until +{ + my ($expected) = @_; + + $expected = 'xxxxxx' unless defined($expected); # default junk value + + my $max_attempts = 180 * 10; + my $attempts = 0; + + my $output_file = ''; + while ($attempts < $max_attempts) + { + $output_file = slurp_file($node_logical->logfile()); + + if ($output_file =~ $expected) + { + return 1; + } + + # Wait 0.1 second before retrying. + usleep(100_000); + $attempts++; + } + + # The output result didn't change in 180 seconds. Give up + return 0; +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3f84ee9..fd961d4 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -68,8 +68,15 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); +static void DecodeCommitPrepared(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodeAbortPrepared(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -239,7 +246,6 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) switch (info) { case XLOG_XACT_COMMIT: - case XLOG_XACT_COMMIT_PREPARED: { xl_xact_commit *xlrec; 
xl_xact_parsed_commit parsed; @@ -256,8 +262,24 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeCommit(ctx, buf, &parsed, xid); break; } + case XLOG_XACT_COMMIT_PREPARED: + { + xl_xact_commit *xlrec; + xl_xact_parsed_commit parsed; + TransactionId xid; + + xlrec = (xl_xact_commit *) XLogRecGetData(r); + ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); + + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; + + DecodeCommitPrepared(ctx, buf, &parsed, xid); + break; + } case XLOG_XACT_ABORT: - case XLOG_XACT_ABORT_PREPARED: { xl_xact_abort *xlrec; xl_xact_parsed_abort parsed; @@ -274,6 +296,23 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeAbort(ctx, buf, &parsed, xid); break; } + case XLOG_XACT_ABORT_PREPARED: + { + xl_xact_abort *xlrec; + xl_xact_parsed_abort parsed; + TransactionId xid; + + xlrec = (xl_xact_abort *) XLogRecGetData(r); + ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); + + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; + + DecodeAbortPrepared(ctx, buf, &parsed, xid); + break; + } case XLOG_XACT_ASSIGNMENT: /* @@ -312,17 +351,35 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. 
- */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* check that output plugin is capable of two-phase decoding */ + if (!ctx->twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + xlrec = (xl_xact_prepare *)XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -659,6 +716,131 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* + * Consolidated commit record handling between the different form of commit + * records. + */ +static void +DecodeCommitPrepared(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_commit *parsed, TransactionId xid) +{ + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = parsed->xact_time; + RepOriginId origin_id = XLogRecGetOrigin(buf->record); + int i; + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, + parsed->nsubxacts, parsed->subxacts); + + /* ---- + * Check whether we are interested in this specific transaction, and tell + * the reorderbuffer to forget the content of the (sub-)transactions + * if not. + * + * There can be several reasons we might not be interested in this + * transaction: + * 1) We might not be interested in decoding transactions up to this + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. 
+ * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding + * + * We can't just use ReorderBufferAbort() here, because we need to execute + * the transaction's invalidations. This currently won't be needed if + * we're just skipping over the transaction because currently we only do + * so during startup, to get to the first transaction the client needs. As + * we have reset the catalog caches before starting to read WAL, and we + * haven't yet touched any catalogs, there can't be anything to invalidate. + * But if we're "forgetting" this commit because it happened in + * another database, the invalidations might be important, because they + * could be for shared catalogs and we might have loaded data into the + * relevant syscaches. + * --- + */ + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + + /* tell the reorderbuffer about the surviving subtransactions */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + /* + * For COMMIT PREPARED, the changes have already been replayed at + * PREPARE time, so we only need to notify the subscriber that the GID + * finally committed. + * If a prepare filter is present and it skips this GID, do a regular commit instead. 
+ */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } + else + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + +} + +/* + * Decode a PREPARE record; it is skipped under the same conditions as a COMMIT. + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare * parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; /* NOTE(review): confirm this is set when the record has no origin */ + RepOriginId origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + return; + + /* + * Tell the reorderbuffer about the surviving subtransactions. We need to + * do this because the main transaction itself has not committed since we + * are in the prepare phase right now. 
So we need to be sure the snapshot + * is set up correctly for the main transaction in case all changes + * happened in subtransactions. + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); +} + +/* * Get the data from the various forms of abort records and pass it on to * snapbuild.c and reorderbuffer.c */ @@ -681,6 +863,50 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* + * Decode a ROLLBACK PREPARED record: report it through + * ReorderBufferFinishPrepared(), or just abort the transaction if it is filtered out. + */ +static void +DecodeAbortPrepared(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid) +{ + int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + RepOriginId origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * Apply the same skip checks as DecodeCommitPrepared(); only if all pass, handle the ROLLBACK via callbacks + */ + if (!FilterByOrigin(ctx, origin_id) && !ctx->fast_forward && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + (parsed->dbId == InvalidOid || parsed->dbId == ctx->slot->data.database) && + !(ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(ctx->reorder, xid, parsed->twophase_gid))) + { + Assert(TransactionIdIsValid(xid)); + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } + + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); + } + + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); +} + +/* * Parse XLOG_HEAP_INSERT (not 
MULTI_INSERT!) records into tuplebufs. * * Deletes can contain the new tuple. diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1bd680..1456bc8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -421,6 +422,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } + if (txn->tuplecid_hash != NULL) { hash_destroy(txn->tuplecid_hash); @@ -1514,12 +1521,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Discard changes from a transaction (and subtransactions), after streaming - * them. Keep the remaining info - transactions, tuplecids, invalidations and - * snapshots. + * Discard changes from a transaction (and subtransactions), either after streaming or + * after a PREPARE. + * The flag txn_prepared indicates if this is called after a PREPARE. + * If streaming, keep the remaining info - transactions, tuplecids, invalidations and + * snapshots. If after a PREPARE, keep only the invalidations and snapshots. 
*/ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; @@ -1538,7 +1547,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); } /* cleanup changes in the txn */ @@ -1572,9 +1581,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for decoding + * catalog snapshot access. + * They are always stored in the toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* Remove the change from its containing list. */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + } + /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * memory. We could also keep the hash table and update it with new ctid @@ -1768,9 +1801,22 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); - - ReorderBufferCleanupTXN(rb, txn); + if (rbtxn_prepared(txn)) + { + rb->stream_prepare(rb, txn, txn->final_lsn); + /* This is a PREPARED transaction, part of a two-phase commit. 
+ * The full cleanup will happen as part of the COMMIT PREPAREDs, so now + * just truncate txn by removing changes and tuple_cids + */ + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* @@ -1898,8 +1944,12 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn, ReorderBufferChange *specinsert) { - /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn); + /* Discard the changes that we just streamed. + * This can only be called if streaming and not part of a PREPARE in + * a two-phase commit, so set prepared flag as false. + */ + Assert(!rbtxn_prepared(txn)); + ReorderBufferTruncateTXN(rb, txn, false); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -1921,6 +1971,11 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. * + * We are here due to one of the 3 scenarios: + * 1. As part of streaming an in-progress transactions + * 2. Prepare of a two-phase commit + * 3. Commit of a transaction. + * * Send data of a transaction (and its subtransactions) to the * output plugin. We iterate over the top and subtransactions (using a k-way * merge) and replay the changes in lsn order. @@ -2006,7 +2061,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; /* Set the current xid to detect concurrent aborts. */ - if (streaming) + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2297,7 +2352,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call either PREPARE (for two-phase transactions) or COMMIT + * (for regular ones). 
+ */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2331,18 +2395,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, RollbackAndReleaseCurrentSubTransaction(); /* + * We are here due to one of the 3 scenarios: + * 1. As part of streaming in-progress transactions + * 2. Prepare of a two-phase commit + * 3. Commit of a transaction. + * * If we are streaming the in-progress transaction then discard the * changes that we just streamed, and mark the transactions as - * streamed (if they contained changes). Otherwise, remove all the + * streamed (if they contained changes), set prepared flag as false. + * If part of a prepare of a two-phase commit set the prepared flag + * as true so that we can discard changes and cleanup tuplecids. + * Otherwise, remove all the * changes and deallocate the ReorderBufferTXN. */ if (streaming) { - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, false); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } + else if (rbtxn_prepared(txn)) + { + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } else ReorderBufferCleanupTXN(rb, txn); } @@ -2372,17 +2450,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent - * abort of the (sub)transaction we are streaming. We need to do the + * abort of the (sub)transaction we are streaming or preparing. We need to do the * cleanup and return gracefully on this error, see SetupCheckXidLive. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) { /* - * This error can only occur when we are sending the data in - * streaming mode and the streaming is not finished yet. 
+ * This error can only occur either when we are sending the data in + * streaming mode and the streaming is not finished yet or when we are + * sending the data out on a PREPARE during a two-phase commit. + * Both conditions can't be true either, it should be one of them. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); + Assert(!(streaming && rbtxn_prepared(txn))); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2390,10 +2471,21 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, errdata = NULL; curtxn->concurrent_abort = true; - /* Reset the TXN so that it is allowed to stream remaining data. */ - ReorderBufferResetTXN(rb, txn, snapshot_now, - command_id, prev_lsn, - specinsert); + /* + * If streaming, reset the TXN so that it is allowed to stream remaining data. + */ + if (streaming) + { + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", txn->xid); + ReorderBufferTruncateTXN(rb, txn, true); + } } else { @@ -2415,23 +2507,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * This interface is called once a toplevel commit is read for both streamed * as well as non-streamed transactions. 
*/ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2473,6 +2558,120 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a two-phase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. 
+ */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ROLLBACK PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for example). + * Anyway, two-phase transactions do not contain any reorderbuffers. So allow + * it to be created below. 
+ */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + /* this txn is obviously prepared */ + txn->txn_flags |= RBTXN_PREPARE; + if (txn->gid != NULL) /* ReorderBufferPrepare() may have stored a copy already; don't leak it */ + pfree(txn->gid); + txn->gid = pstrdup(gid); + + if (is_commit) + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + else + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + + if (rbtxn_commit_prepared(txn)) + rb->commit_prepared(rb, txn, commit_lsn); + else if (rbtxn_rollback_prepared(txn)) + rb->rollback_prepared(rb, txn, commit_lsn); + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2515,7 +2714,12 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* cosmetic... */ txn->final_lsn = lsn; - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions (GID is enough to + * commit/abort those later). 
+ */ ReorderBufferCleanupTXN(rb, txn); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index becd20e..03f777c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -635,6 +635,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -658,6 +663,13 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); -- 1.8.3.1