From b82a30a2e4c1647ca5ba96124cdfc3d1e19e7b0c Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke
Date: Thu, 5 Apr 2018 19:44:58 +0530
Subject: [PATCH 5/5] Additional test case to demonstrate decoding/rollback
 interlocking

Introduce a decode-delay parameter in the test_decoding plugin. Based
on the value provided, the plugin sleeps for that many seconds while
holding the LogicalTransactionLock. A concurrent rollback is fired
off which aborts that transaction in the meanwhile.
---
 contrib/test_decoding/Makefile                  |   5 +-
 contrib/test_decoding/t/001_twophase.pl         | 101 ++++++++++++++++++++++++
 contrib/test_decoding/test_decoding.c           |  28 +++++++
 src/backend/replication/logical/reorderbuffer.c |   5 ++
 4 files changed, 138 insertions(+), 1 deletion(-)
 create mode 100755 contrib/test_decoding/t/001_twophase.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 6c18189d9d..79b9622600 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -26,7 +26,7 @@ installcheck:;
 # installation, allow to do so, but only if requested explicitly.
 installcheck-force: regresscheck-install-force isolationcheck-install-force
 
-check: regresscheck isolationcheck
+check: regresscheck isolationcheck 2pc-check
 
 submake-regress:
 	$(MAKE) -C $(top_builddir)/src/test/regress all
@@ -66,3 +66,6 @@ isolationcheck-install-force: all | submake-isolation submake-test_decoding temp
 	isolationcheck isolationcheck-install-force
 
 temp-install: EXTRA_INSTALL=contrib/test_decoding
+
+2pc-check: temp-install
+	$(prove_check)
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100755
index 0000000000..f154c89908
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,101 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+	'postgresql.conf', qq(
+	max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding');");
+
+# This test is specifically for testing concurrent abort while logical decode is
+# ongoing. The decode-delay value will allow for each change decode to sleep for
+# that many seconds. We also hold the LogicalLockTransaction while we sleep.
+# We will fire off a ROLLBACK from another session when this delayed decode is
+# ongoing. Since we are holding the lock from the call above, this ROLLBACK
+# will wait for the logical backends to do a LogicalUnlockTransaction. We will
+# stop decoding immediately post this and the next pg_logical_slot_get_changes call
+# should show only a few records decoded from the entire two phase transaction
+#
+# We use two slots to test multiple decoding backends here
+
+$node_logical->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab VALUES (11);
+	INSERT INTO tab VALUES (12);
+	ALTER TABLE tab ADD COLUMN b INT;
+	PREPARE TRANSACTION 'test_prepared_tab';");
+# start decoding the above with decode-delay in the background.
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should only decode 1 INSERT record and should include
+# an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1', 'decode-delay', '3');\" \&");
+
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1', 'decode-delay', '3');\" \&");
+
+# sleep for a little while (shorter than decode-delay)
+$node_logical->safe_psql('postgres', "select pg_sleep(1)");
+
+# rollback the prepared transaction whose first record is being decoded,
+# while the decoding backends are still sleeping in decode-delay
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# wait for decoding to stop
+$node_logical->psql('postgres', "select pg_sleep(4)");
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+# check for occurrence of log about waiting backends
+my $output_file = slurp_file($node_logical->logfile());
+my $waiting_str = "Waiting for backends to abort";
+like($output_file, qr/$waiting_str/, "Waiting log found in server log");
+
+# check for occurrence of log about stopping decoding
+my $abort_str = "stopping decoding of test_prepared_tab ";
+like($output_file, qr/$abort_str/, "ABORT found in server log");
+
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab VALUES (11);
+	INSERT INTO tab VALUES (12);
+	ALTER TABLE tab ADD COLUMN b INT;
+	INSERT INTO tab VALUES (13, 11);
+	PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2');");
+$node_logical->stop('fast');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 05b993fd7a..6824e11906 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -37,6 +37,7 @@ typedef struct
 	bool		xact_wrote_changes;
 	bool		only_local;
 	bool		enable_twophase;
+	int			decode_delay;	/* seconds to sleep after every change record; 0 = no delay */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -118,6 +119,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 	data->enable_twophase = false;
+	data->decode_delay = 0;
 
 	ctx->output_plugin_private = data;
 
@@ -209,6 +211,21 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "decode-delay") == 0)
+		{
+			if (elem->arg == NULL)
+				data->decode_delay = 2; /* default to 2 seconds */
+			else
+				data->decode_delay = pg_atoi(strVal(elem->arg),
+											 sizeof(int), 0);
+
+			if (data->decode_delay <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Specify positive value for parameter \"%s\","
+								" you specified \"%s\"",
+								elem->defname, strVal(elem->arg))));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -548,6 +565,17 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/*
+	 * if decode_delay is specified, sleep. Note that this
+	 * happens with LogicalLockTransaction held from the
+	 * decoding infrastructure
+	 */
+	if (data->decode_delay > 0)
+	{
+		elog(LOG, "sleeping for %d seconds", data->decode_delay);
+		pg_usleep(data->decode_delay * 1000000L);
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 178a99d158..460035cc76 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1372,7 +1372,12 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 
 					/* Lock transaction before catalog access */
 					if (!LogicalLockTransaction(txn))
+					{
+						elog(LOG, "stopping decoding of %s (%u)",
+							 txn->gid[0] != '\0' ? txn->gid : "",
+							 txn->xid);
 						goto change_cleanup;
+					}
 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
 												change->data.tp.relnode.relNode);
 					LogicalUnlockTransaction(txn);
-- 
2.15.1 (Apple Git-101)