From f0cf243ef74b28248d068f43adeed0404fa39fec Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 18 Sep 2020 08:01:14 -0400 Subject: [PATCH v4] pgoutput output plugin support for logical decoding of 2pc --- src/backend/access/transam/twophase.c | 31 ++++++ src/backend/replication/logical/proto.c | 90 ++++++++++++++- src/backend/replication/logical/worker.c | 147 ++++++++++++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 54 ++++++++- src/include/access/twophase.h | 1 + src/include/replication/logicalproto.h | 37 ++++++- src/test/subscription/t/020_twophase.pl | 163 ++++++++++++++++++++++++++++ 7 files changed, 514 insertions(+), 9 deletions(-) create mode 100644 src/test/subscription/t/020_twophase.pl diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index ef4f998..bed87d5 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held) } /* + * LookupGXact + * Check if the prepared transaction with the given GID is around + */ +bool +LookupGXact(const char *gid) +{ + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs */ + if (!gxact->valid) + continue; + if (strcmp(gxact->gid, gid) != 0) + continue; + + LWLockRelease(TwoPhaseStateLock); + + return true; + } + + LWLockRelease(TwoPhaseStateLock); + + return false; +} + +/* * LockGXact * Locate the prepared transaction and mark it busy for COMMIT or PREPARE. */ diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142..291ed10 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) */ void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, bool is_commit) { uint8 flags = 0; pq_sendbyte(out, 'C'); /* sending COMMIT */ + if (is_commit) + flags |= LOGICALREP_IS_COMMIT; + else + flags |= LOGICALREP_IS_ABORT; + /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -88,16 +93,20 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, } /* - * Read transaction COMMIT from the stream. + * Read transaction COMMIT|ABORT from the stream. */ void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) { - /* read flags (unused for now) */ + /* read flags */ uint8 flags = pq_getmsgbyte(in); - if (flags != 0) - elog(ERROR, "unrecognized flags %u in commit message", flags); + if (!CommitFlagsAreValid(flags)) + elog(ERROR, "unrecognized flags %u in commit|abort message", + flags); + + /* the flag is either commit or abort */ + commit_data->is_commit = (flags == LOGICALREP_IS_COMMIT); /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); @@ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'P'); /* sending PREPARE protocol */ + + /* + * This should only ever happen for 2PC transactions. In which case we + * expect to have a non-empty GID. + */ + Assert(rbtxn_prepared(txn)); + Assert(strlen(txn->gid) > 0); + + /* + * Flags are determined from the state of the transaction. We know we + * always get PREPARE first and then [COMMIT|ROLLBACK] PREPARED, so if + * it's already marked as committed then it has to be COMMIT PREPARED (and + * likewise for abort / ROLLBACK PREPARED). + */ + if (rbtxn_commit_prepared(txn)) + flags |= LOGICALREP_IS_COMMIT_PREPARED; + else if (rbtxn_rollback_prepared(txn)) + flags |= LOGICALREP_IS_ROLLBACK_PREPARED; + else + flags |= LOGICALREP_IS_PREPARE; + + /* Make sure exactly one of the expected flags is set. */ + if (!PrepareFlagsAreValid(flags)) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (!PrepareFlagsAreValid(flags)) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* set the action (reuse the constants used for the flags) */ + prepare_data->prepare_type = flags; + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d239d28..62c571e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -729,7 +729,11 @@ apply_handle_commit(StringInfo s) replorigin_session_origin_lsn = commit_data.end_lsn; replorigin_session_origin_timestamp = commit_data.committime; - CommitTransactionCommand(); + if (commit_data.is_commit) + CommitTransactionCommand(); + else + AbortCurrentTransaction(); + pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); @@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +static void +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) +{ + Assert(prepare_data->prepare_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + PrepareTransactionBlock(prepare_data->gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + FinishPreparedTransaction(prepare_data->gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + /* + * During logical decoding, on the apply side, it's possible that a + * prepared transaction got aborted while decoding. In that case, we stop + * the decoding and abort the transaction immediately. However the + * ROLLBACK prepared processing still reaches the subscriber. In that case + * it's ok to have a missing gid + */ + if (LookupGXact(prepare_data->gid)) + { + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(prepare_data->gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPrepareData prepare_data; + + logicalrep_read_prepare(s, &prepare_data); + + switch (prepare_data.prepare_type) + { + case LOGICALREP_IS_PREPARE: + apply_handle_prepare_txn(&prepare_data); + break; + + case LOGICALREP_IS_COMMIT_PREPARED: + apply_handle_commit_prepared_txn(&prepare_data); + break; + + case LOGICALREP_IS_ROLLBACK_PREPARED: + apply_handle_rollback_prepared_txn(&prepare_data); + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected type of prepare message: %d", + prepare_data.prepare_type))); + } +} + /* * Handle ORIGIN message. * @@ -1909,10 +2048,14 @@ apply_dispatch(StringInfo s) case 'B': apply_handle_begin(s); break; - /* COMMIT */ + /* COMMIT/ABORT */ case 'C': apply_handle_commit(s); break; + /* PREPARE and [COMMIT|ROLLBACK] PREPARED */ + case 'P': + apply_handle_prepare(s); + break; /* INSERT */ case 'I': apply_handle_insert(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index eb1f230..729b655 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; + + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->abort_prepared_cb = pgoutput_abort_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginUpdateProgress(ctx); OutputPluginPrepareWrite(ctx, true); - logicalrep_write_commit(ctx->out, txn, commit_lsn); + logicalrep_write_commit(ctx->out, txn, commit_lsn, true); + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); } diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3..b2628ea 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -44,6 +44,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, extern void StartPrepare(GlobalTransaction gxact); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); +extern bool LookupGXact(const char *gid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 607a728..fb07580 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -85,20 +85,55 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { + bool is_commit; XLogRecPtr commit_lsn; XLogRecPtr end_lsn; TimestampTz committime; } LogicalRepCommitData; +/* types of the commit protocol message */ +#define LOGICALREP_IS_COMMIT 0x01 +#define LOGICALREP_IS_ABORT 0x02 + +/* commit message is COMMIT or ABORT, and there is nothing else */ +#define CommitFlagsAreValid(flags) \ + ((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT)) + +/* Prepare protocol information */ +typedef struct LogicalRepPrepareData +{ + uint8 prepare_type; + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + char gid[GIDSIZE]; +} LogicalRepPrepareData; + +/* types of the prepare protocol message */ +#define LOGICALREP_IS_PREPARE 0x01 +#define LOGICALREP_IS_COMMIT_PREPARED 0x02 +#define LOGICALREP_IS_ROLLBACK_PREPARED 0x04 + +/* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/ +#define PrepareFlagsAreValid(flags) \ + ((flags == LOGICALREP_IS_PREPARE) || \ + (flags == LOGICALREP_IS_COMMIT_PREPARED) || \ + (flags == LOGICALREP_IS_ROLLBACK_PREPARED)) + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, bool is_commit); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPrepareData * prepare_data); extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl new file mode 100644 index 0000000..c7f373d --- /dev/null +++ b/src/test/subscription/t/020_twophase.pl @@ -0,0 +1,163 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 12; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 + )); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf( + 'postgresql.conf', qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', +"ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2" +); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', + "BEGIN;INSERT INTO tab_full VALUES (11);PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); + is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(0), 'transaction is committed on subscriber'); + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', + "BEGIN;INSERT INTO tab_full VALUES (12);PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); + is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(0), 'transaction is aborted on subscriber'); + +# Check that commit prepared is decoded properly on crash restart +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (11,12);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +# TODO add test cases involving DDL. This can be added after we add functionality +# to replicate DDL changes to subscriber. + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 1.8.3.1