From f962b2527e54e607ea795c6c009f0d36fe5eb00c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 29 Oct 2020 06:25:46 -0400 Subject: [PATCH v14] Support 2PC txn - pgoutput. This patch adds support in the pgoutput plugin and subscriber for handling of two-phase commits. Includes pgoutput changes. Includes subscriber changes. Includes two-phase commit test code (streaming and not streaming). --- src/backend/access/transam/twophase.c | 27 ++ src/backend/replication/logical/proto.c | 142 +++++- src/backend/replication/logical/worker.c | 288 +++++++++++- src/backend/replication/pgoutput/pgoutput.c | 75 +++- src/include/access/twophase.h | 1 + src/include/replication/logicalproto.h | 33 ++ src/test/subscription/t/020_twophase.pl | 345 ++++++++++++++ src/test/subscription/t/021_twophase_streaming.pl | 521 ++++++++++++++++++++++ 8 files changed, 1409 insertions(+), 23 deletions(-) create mode 100644 src/test/subscription/t/020_twophase.pl create mode 100644 src/test/subscription/t/021_twophase_streaming.pl diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 7940060..129afe9 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -548,6 +548,33 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held) } /* + * LookupGXact + * Check if the prepared transaction with the given GID is around + */ +bool +LookupGXact(const char *gid) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs */ + if (gxact->valid && strcmp(gxact->gid, gid) == 0) + { + found = true; + break; + } + + } + LWLockRelease(TwoPhaseStateLock); + return found; +} + +/* * LockGXact * Locate the prepared transaction and mark it busy for COMMIT or PREPARE. */ diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142..9deb1ef 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -78,7 +78,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, 'C'); /* sending COMMIT */ - /* send the flags field (unused for now) */ + /* send the flags field */ pq_sendbyte(out, flags); /* send fields */ @@ -99,6 +99,7 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) if (flags != 0) elog(ERROR, "unrecognized flags %u in commit message", flags); + /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); commit_data->end_lsn = pq_getmsgint64(in); @@ -106,6 +107,145 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'P'); /* sending PREPARE protocol */ + + /* + * This should only ever happen for two-phase commit transactions. In which case we + * expect to have a valid GID. + */ + Assert(rbtxn_prepared(txn)); + Assert(txn->gid != NULL); + + /* + * Flags are determined from the state of the transaction. We know we + * always get PREPARE first and then [COMMIT|ROLLBACK] PREPARED, so if + * it's already marked as committed then it has to be COMMIT PREPARED (and + * likewise for abort / ROLLBACK PREPARED). 
+ */ + if (rbtxn_commit_prepared(txn)) + flags = LOGICALREP_IS_COMMIT_PREPARED; + else if (rbtxn_rollback_prepared(txn)) + flags = LOGICALREP_IS_ROLLBACK_PREPARED; + else + flags = LOGICALREP_IS_PREPARE; + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (!PrepareFlagsAreValid(flags)) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* set the action (reuse the constants used for the flags) */ + prepare_data->prepare_type = flags; + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write STREAM PREPARE to the output stream. + */ +void +logicalrep_write_stream_prepare(StringInfo out, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'p'); /* sending STREAM PREPARE protocol */ + + /* + * This should only ever happen for two-phase transactions. In which case we + * expect to have a valid GID. + */ + Assert(rbtxn_prepared(txn)); + Assert(txn->gid != NULL); + + /* + * For streaming APIs only PREPARE is supported. [COMMIT|ROLLBACK] PREPARED + * uses non-streaming APIs + */ + flags = LOGICALREP_IS_PREPARE; + + /* transaction ID */ + Assert(TransactionIdIsValid(txn->xid)); + pq_sendint32(out, txn->xid); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read STREAM PREPARE from the output stream. + */ +TransactionId +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPrepareData *prepare_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags */ + flags = pq_getmsgbyte(in); + + if (flags != LOGICALREP_IS_PREPARE) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* set the action (reuse the constants used for the flags) */ + prepare_data->prepare_type = flags; + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); + + return xid; +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b0f27e0..593af82 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -244,6 +244,8 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry, CmdType operation); +static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); + /* * Should this worker apply changes for given relation. 
* @@ -720,6 +722,7 @@ apply_handle_commit(StringInfo s) replorigin_session_origin_timestamp = commit_data.committime; CommitTransactionCommand(); + pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); @@ -740,6 +743,225 @@ apply_handle_commit(StringInfo s) } /* + * Called from apply_handle_prepare to handle a PREPARE TRANSACTION. + */ +static void +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) +{ + Assert(prepare_data->prepare_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + PrepareTransactionBlock(prepare_data->gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Called from apply_handle_prepare to handle a COMMIT PREPARED of a previously + * PREPARED transaction. + */ +static void +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + FinishPreparedTransaction(prepare_data->gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Called from apply_handle_prepare to handle a ROLLBACK PREPARED of a previously + * PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + /* + * During logical decoding, on the apply side, it's possible that a + * prepared transaction got aborted while decoding. In that case, we stop + * the decoding and abort the transaction immediately. However the + * ROLLBACK prepared processing still reaches the subscriber. 
In that case + * it's ok to have a missing gid + */ + if (LookupGXact(prepare_data->gid)) + { + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(prepare_data->gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPrepareData prepare_data; + + logicalrep_read_prepare(s, &prepare_data); + + switch (prepare_data.prepare_type) + { + case LOGICALREP_IS_PREPARE: + apply_handle_prepare_txn(&prepare_data); + break; + + case LOGICALREP_IS_COMMIT_PREPARED: + apply_handle_commit_prepared_txn(&prepare_data); + break; + + case LOGICALREP_IS_ROLLBACK_PREPARED: + apply_handle_rollback_prepared_txn(&prepare_data); + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected type of prepare message: %d", + prepare_data.prepare_type))); + } +} + +/* + * Handle STREAM PREPARE. + * + * Logic is in two parts: + * 1. Replay all the spooled operations + * 2. Mark the transaction as prepared + */ +static void +apply_handle_stream_prepare(StringInfo s) +{ + int nchanges = 0; + LogicalRepPrepareData prepare_data; + TransactionId xid; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_prepare(s, &prepare_data); + elog(DEBUG1, "received prepare for streamed transaction %u", xid); + + /* + * This should be a PREPARE only. The COMMIT PREPARED and ROLLBACK PREPARED + * for streaming are handled by the non-streaming APIs. + */ + Assert(prepare_data.prepare_type == LOGICALREP_IS_PREPARE); + + /* + * ======================================== + * 1. Replay all the spooled operations + * - This code is same as what apply_handle_stream_commit does for NON two-phase stream commit + * ======================================== + */ + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn); + + /* + * ======================================== + * 2. Mark the transaction as prepared. + * - This code is same as what apply_handle_prepare_txn does for two-phase prepare of the non-streamed tx + * ======================================== + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(prepare_data.gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + elog(DEBUG1, "apply_handle_stream_prepare_txn: replayed %d (all) changes.", nchanges); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -933,30 +1155,21 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. 
+ * Common spoolfile processing. + * Returns how many changes were applied. */ -static void -apply_handle_stream_commit(StringInfo s) +static int +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; bool found; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - Assert(!in_streamed_transaction); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - - ensure_transaction(); - /* * Allocate file handle and memory required to process all the messages in * TopTransactionContext to avoid them getting reset after each message is @@ -964,7 +1177,7 @@ apply_handle_stream_commit(StringInfo s) */ oldcxt = MemoryContextSwitchTo(TopTransactionContext); - /* open the spool file for the committed transaction */ + /* open the spool file for the committed/prepared transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); ent = (StreamXidHash *) hash_search(xidhash, @@ -979,7 +1192,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1048,6 +1261,35 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); + pfree(buffer); + pfree(s2.data); + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + return nchanges; +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + int nchanges = 0; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, commit_data.commit_lsn); + /* * Update origin state so we can restart streaming from correct position * in case of crash. 
@@ -1055,16 +1297,12 @@ apply_handle_stream_commit(StringInfo s) replorigin_session_origin_lsn = commit_data.end_lsn; replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); - pfree(s2.data); - CommitTransactionCommand(); pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); - elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", - nchanges, path); + elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges); in_remote_transaction = false; @@ -1904,10 +2142,14 @@ apply_dispatch(StringInfo s) case 'B': apply_handle_begin(s); break; - /* COMMIT */ + /* COMMIT/ABORT */ case 'C': apply_handle_commit(s); break; + /* PREPARE and [COMMIT|ROLLBACK] PREPARED */ + case 'P': + apply_handle_prepare(s); + break; /* INSERT */ case 'I': apply_handle_insert(s); @@ -1952,6 +2194,10 @@ apply_dispatch(StringInfo s) case 'c': apply_handle_stream_commit(s); break; + /* STREAM PREPARE */ + case 'p': + apply_handle_stream_prepare(s); + break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c997ae..b4f2c9a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -57,7 +63,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); - +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; static bool in_streaming; @@ -143,6 +150,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; + + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -153,6 +164,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void @@ -378,6 +391,48 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* 
+ * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Write the current schema of the relation and its ancestor (if any) if not * done yet. */ @@ -857,6 +912,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, } /* + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the transaction. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. While diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3..b2628ea 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -44,6 +44,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, extern void StartPrepare(GlobalTransaction gxact); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); +extern bool LookupGXact(const char *gid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0c2cda2..ee38f89 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -87,6 +87,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -94,6 +95,28 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; + +/* Prepare protocol information */ +typedef struct LogicalRepPrepareData +{ + uint8 prepare_type; + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + char gid[GIDSIZE]; +} LogicalRepPrepareData; + +/* types of the prepare protocol message */ +#define LOGICALREP_IS_PREPARE 0x01 +#define LOGICALREP_IS_COMMIT_PREPARED 0x02 +#define LOGICALREP_IS_ROLLBACK_PREPARED 0x04 + +/* prepare can be exactly one of PREPARE, [COMMIT|ROLLBACK] PREPARED*/ +#define PrepareFlagsAreValid(flags) \ + (((flags) == LOGICALREP_IS_PREPARE) || \ + ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \ + ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED)) + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -101,6 +124,10 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void 
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPrepareData * prepare_data); extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); @@ -144,4 +171,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern TransactionId logicalrep_read_stream_prepare(StringInfo in, + LogicalRepPrepareData *prepare_data); + + #endif /* LOGICAL_PROTO_H */ diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl new file mode 100644 index 0000000..f489f47 --- /dev/null +++ b/src/test/subscription/t/020_twophase.pl @@ -0,0 +1,345 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 21; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full2 (x text)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub"); + +# Wait for subscriber to finish initialization +my $caughtup_query = + "SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +############################### +# check that 2PC gets replicated to subscriber +# then COMMIT PREPARED +############################### + +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# check that 2PC gets replicated to subscriber +# then ROLLBACK PREPARED +############################### + +$node_publisher->safe_psql('postgres'," + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = 
$node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that ROLLBACK PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (12);
+	INSERT INTO tab_full VALUES (13);
+	PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(0), 'Rows inserted via 2PC are not present on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (12);
+	INSERT INTO tab_full VALUES (13);
+	PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (subscriber only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (14);
+	INSERT INTO tab_full VALUES (15);
+	PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (14,15);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (16);
+	INSERT INTO tab_full VALUES (17);
+	PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are
visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (16,17);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +############################### +# Test nested transaction with 2PC +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# COMMIT +$node_publisher->safe_psql('postgres', " + COMMIT PREPARED 'outer';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check the tx state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';"); +is($result, qq(0), 'transaction is ended on subscriber'); + +# check inserts are visible. 22 should be rolled back. 21 should be committed. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (21);"); +is($result, qq(1), 'Rows committed are on the subscriber'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a IN (22);"); +is($result, qq(0), 'Rows rolled back are not on the subscriber'); + +############################### +# Test using empty GID +############################### + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (51); + PREPARE TRANSACTION '';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = '';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# ROLLBACK +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED '';"); + +# check that 2PC gets aborted on subscriber +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = '';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Test cases involving DDL. +############################### + +# TODO This can be added after we add functionality to replicate DDL changes to subscriber. 
+ +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/021_twophase_streaming.pl b/src/test/subscription/t/021_twophase_streaming.pl new file mode 100644 index 0000000..9a03b83 --- /dev/null +++ b/src/test/subscription/t/021_twophase_streaming.pl @@ -0,0 +1,521 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 28; + +############################### +# Test setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_publisher->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher (uses same DDL as 015_stream test) +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber (columns a and b are compatible with same table name on publisher) +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication (streaming = on) +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (streaming = on)"); + +# Wait for subscriber to finish initialization +my $caughtup_query = + "SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +############################### +# Check initial data was copied to subscriber +############################### +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +############################### +# Test 2PC PREPARE / COMMIT PREPARED +# 1. Data is streamed as a 2PC transaction. +# 2. Then do commit prepared. +# +# Expect all data is replicated on subscriber side after the commit. +############################### + +# check that 2PC gets replicated to subscriber +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Test 2PC PREPARE / ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC +# 3. Do rollback prepared. +# +# Expect data rolls back leaving only the original 2 rows. +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. 
+$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC tx gets aborted +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that 2PC ROLLBACK PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is rolled back. +# 3. After servers are restarted the pending transaction is rolled back. +# +# Expect all inserted data is gone. +# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. 
+# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. +# (Note: only subscriber crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. 
+# (Note: only publisher crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Do INSERT after the PREPARE but before ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE +# 4. Then do a ROLLBACK PREPARED. +# +# Expect the 2PC data rolls back leaving only 3 rows on the subscriber. +# (the original 2 + inserted 1) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC tx) +# Note: the 2PC tx still holds row locks so make sure this insert is for a separate primary key +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC tx gets aborted +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber, +# but the extra INSERT outside of the 2PC still was replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Do INSERT after the PREPARE but before COMMIT PREPARED. +# 1. 
Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE. +# 4. Then do a COMMIT PREPARED. +# +# Expect 2PC data + the extra row are on the subscriber. +# (the 3334 + inserted 1 = 3335) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC tx) +# Note: the 2PC tx still holds row locks so make sure this insert is for a separare primary key +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC tx gets committed +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Do DELETE after PREPARE but before COMMIT PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row DELETE is done for one of the records that was inserted by the 2PC transaction +# 4. Then there is a COMMIT PREPARED. +# +# Expect all the 2PC data rows on the subscriber (since in fact delete at step 3 would do nothing +# because that record was not yet committed at the time of the delete). +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', q{ + DELETE FROM test_tab WHERE a > 2;}); + +# Then insert, update and delete enough rows to exceed the 64kB limit. 
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# DELETE one of the prepared 2PC records before they get committed (we are outside of the 2PC tx)
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_tab WHERE a = 5");
+
+# 2PC tx gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber. Nothing was deleted');
+
+# confirm the "deleted" row was in fact not deleted
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_tab WHERE a = 5");
+is($result, qq(1), 'The row we deleted before the commit still exists on the subscriber.');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Check that 2PC works when using an empty GID literal
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+	DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION '';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC tx gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED '';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Test cases involving DDL
+###############################
+
+# TODO This can be added after we add functionality to replicate DDL changes to subscriber.
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
1.8.3.1
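
For reference, an output plugin other than pgoutput would opt in to the same behaviour by filling in the new callback fields during _PG_output_plugin_init(), mirroring the pgoutput registration above. The sketch below is illustrative only: the plugin name and the empty callback bodies are placeholders, and it assumes the prepare_cb, commit_prepared_cb, rollback_prepared_cb and stream_prepare_cb fields introduced by this patch series.

#include "postgres.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/output_plugin.h"

PG_MODULE_MAGIC;

/* Callback signatures follow what pgoutput uses in this patch. */
static void
my_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
			   XLogRecPtr prepare_lsn)
{
	/* write a PREPARE message for txn->gid to the output stream */
}

static void
my_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					   XLogRecPtr prepare_lsn)
{
	/* write a COMMIT PREPARED message for txn->gid */
}

static void
my_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
						 XLogRecPtr prepare_lsn)
{
	/* write a ROLLBACK PREPARED message for txn->gid */
}

static void
my_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					  XLogRecPtr prepare_lsn)
{
	/* PREPARE of a streamed transaction; the PREPARED variants arrive via the callbacks above */
}

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	/* begin_cb, change_cb, commit_cb, stream_*_cb etc. are set up as usual */
	cb->prepare_cb = my_prepare_txn;
	cb->commit_prepared_cb = my_commit_prepared_txn;
	cb->rollback_prepared_cb = my_rollback_prepared_txn;
	cb->stream_prepare_cb = my_stream_prepare_txn;
}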