From cf9a73649e99dbaf9b8c8747c19ca3dac3396a40 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 4 Nov 2020 18:11:58 +1100 Subject: [PATCH v16] Support 2PC txn - subscriber tests. This patch adds the two-phase commit subscriber test code (streaming and not streaming). --- src/test/subscription/t/020_twophase.pl | 345 ++++++++++++++ src/test/subscription/t/021_twophase_streaming.pl | 521 ++++++++++++++++++++++ 2 files changed, 866 insertions(+) create mode 100644 src/test/subscription/t/020_twophase.pl create mode 100644 src/test/subscription/t/021_twophase_streaming.pl diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl new file mode 100644 index 0000000..f489f47 --- /dev/null +++ b/src/test/subscription/t/020_twophase.pl @@ -0,0 +1,345 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 21; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full2 (x text)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub"); + +# Wait for subscriber to finish initialization +my $caughtup_query = + "SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +############################### +# check that 2PC gets replicated to subscriber +# then COMMIT PREPARED +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# check that 2PC gets replicated to subscriber +# then ROLLBACK PREPARED +############################### + +$node_publisher->safe_psql('postgres'," + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that ROLLBACK PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(0), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher and subscriber crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (subscriber only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (14); + INSERT INTO tab_full VALUES (15); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (14,15);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Check that COMMIT PREPARED is decoded properly on crash restart +# (publisher only crash) +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (16); + INSERT INTO tab_full VALUES (17); + PREPARE TRANSACTION 'test_prepared_tab';"); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (16,17);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Test nested transaction with 2PC +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# COMMIT +$node_publisher->safe_psql('postgres', " + COMMIT PREPARED 'outer';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check the tx state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';"); +is($result, qq(0), 'transaction is ended on subscriber'); + +# check inserts are visible. 22 should be rolled back. 21 should be committed. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (21);"); +is($result, qq(1), 'Rows committed are on the subscriber'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (22);"); +is($result, qq(0), 'Rows rolled back are not on the subscriber'); + +############################### +# Test using empty GID +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (51); + PREPARE TRANSACTION '';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = '';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# ROLLBACK +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED '';"); + +# check that 2PC gets aborted on subscriber +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = '';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Test cases involving DDL. +############################### + +# TODO This can be added after we add functionality to replicate DDL changes to subscriber. + +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/021_twophase_streaming.pl b/src/test/subscription/t/021_twophase_streaming.pl new file mode 100644 index 0000000..9a03b83 --- /dev/null +++ b/src/test/subscription/t/021_twophase_streaming.pl @@ -0,0 +1,521 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 28; + +############################### +# Test setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_publisher->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher (uses same DDL as 015_stream test) +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber (columns a and b are compatible with same table name on publisher) +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication (streaming = on) +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (streaming = on)"); + +# Wait for subscriber to finish initialization +my $caughtup_query = + "SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +############################### +# Check initial data was copied to subscriber +############################### +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +############################### +# Test 2PC PREPARE / COMMIT PREPARED +# 1. Data is streamed as a 2PC transaction. +# 2. Then do commit prepared. +# +# Expect all data is replicated on subscriber side after the commit. +############################### + +# check that 2PC gets replicated to subscriber +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Test 2PC PREPARE / ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC +# 3. Do rollback prepared. +# +# Expect data rolls back leaving only the original 2 rows. +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC tx gets aborted +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that 2PC ROLLBACK PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is rolled back. +# 3. After servers are restarted the pending transaction is rolled back. +# +# Expect all inserted data is gone. +# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. +# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. +# (Note: only subscriber crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. +# (Note: only publisher crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Do INSERT after the PREPARE but before ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE +# 4. Then do a ROLLBACK PREPARED. +# +# Expect the 2PC data rolls back leaving only 3 rows on the subscriber. +# (the original 2 + inserted 1) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC tx) +# Note: the 2PC tx still holds row locks so make sure this insert is for a separate primary key +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC tx gets aborted +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber, +# but the extra INSERT outside of the 2PC still was replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Do INSERT after the PREPARE but before COMMIT PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE. +# 4. Then do a COMMIT PREPARED. +# +# Expect 2PC data + the extra row are on the subscriber. +# (the 3334 + inserted 1 = 3335) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC tx) +# Note: the 2PC tx still holds row locks so make sure this insert is for a separare primary key +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC tx gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Do DELETE after PREPARE but before COMMIT PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row DELETE is done for one of the records that was inserted by the 2PC transaction +# 4. Then there is a COMMIT PREPARED. +# +# Expect all the 2PC data rows on the subscriber (since in fact delete at step 3 would do nothing +# because that record was not yet committed at the time of the delete). +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# DELETE one of the prepared 2PC records before they get committed (we are outside of the 2PC tx) +$node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a = 5"); + +# 2PC tx gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber. Nothing was deleted'); + +# confirm the "deleted" row was in fact not deleted +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM test_tab WHERE a = 5"); +is($result, qq(1), 'The row we deleted before the commit till exists on subscriber.'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Try 2PC tx works using an empty GID literal +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION '';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = '';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC tx gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED '';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Test cases involving DDL +############################### + +# TODO This can be added after we add functionality to replicate DDL changes to subscriber. + +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 1.8.3.1