From 05a90d44eeb75ed9684835fe1abefd58fbaf1774 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Wed, 4 Apr 2018 13:45:44 +0530 Subject: [PATCH 5/5] Additional test case to demonstrate decoding/rollback interlocking Introduce a decode-delay parameter in the test_decoding plugin. Based on the value provided the plugin sleep for those many seconds while holding the LogicalTransactionLock. A concurrent rollback is fired off which aborts that transaction in the meanwhile. --- contrib/test_decoding/Makefile | 5 ++++- contrib/test_decoding/test_decoding.c | 24 ++++++++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 5 +++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 6c18189d9d..79b9622600 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -26,7 +26,7 @@ installcheck:; # installation, allow to do so, but only if requested explicitly. installcheck-force: regresscheck-install-force isolationcheck-install-force -check: regresscheck isolationcheck +check: regresscheck isolationcheck 2pc-check submake-regress: $(MAKE) -C $(top_builddir)/src/test/regress all @@ -66,3 +66,6 @@ isolationcheck-install-force: all | submake-isolation submake-test_decoding temp isolationcheck isolationcheck-install-force temp-install: EXTRA_INSTALL=contrib/test_decoding + +2pc-check: temp-install + $(prove_check) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 05b993fd7a..db7becdc44 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -37,6 +37,7 @@ typedef struct bool xact_wrote_changes; bool only_local; bool enable_twophase; + bool decode_delay; /* seconds to sleep after every change record */ } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -118,6 +119,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->skip_empty_xacts = false; data->only_local = false; data->enable_twophase = false; + data->decode_delay = 0; ctx->output_plugin_private = data; @@ -209,6 +211,21 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "decode-delay") == 0) + { + if (elem->arg == NULL) + data->decode_delay = 2; /* default to 2 seconds */ + else + data->decode_delay = pg_atoi(strVal(elem->arg), + sizeof(int), 0); + + if (data->decode_delay <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Specify positive value for parameter \"%s\"," + " you specified \"%s\"", + elem->defname, strVal(elem->arg)))); + } else { ereport(ERROR, @@ -548,6 +565,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } data->xact_wrote_changes = true; + /* if decode_delay is specified, sleep with above lock held */ + if (data->decode_delay > 0) + { + elog(LOG, "sleeping for %d seconds", data->decode_delay); + pg_usleep(data->decode_delay * 1000000L); + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1c7dbd3ade..adb6adef88 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1372,7 +1372,12 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn, /* Lock transaction before catalog access */ if (!LogicalLockTransaction(txn)) + { + elog(LOG, "stopping decoding of %s (%u)", + txn->gid[0] != '\0'? txn->gid:"", + txn->xid); break; + } reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, change->data.tp.relnode.relNode); LogicalUnlockTransaction(txn); -- 2.15.1 (Apple Git-101)