From d7a66453369bc4c05a0d561ac9f0f85246fe119a Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 4 Nov 2020 18:05:48 +1100 Subject: [PATCH v16] Support 2PC txn - pgoutput. This patch adds support in the pgoutput plugin and subscriber for handling of two-phase commits. Includes pgoutput changes. Includes subscriber changes. --- src/backend/access/transam/twophase.c | 27 ++++ src/backend/replication/logical/proto.c | 141 ++++++++++++++++- src/backend/replication/logical/worker.c | 227 ++++++++++++++++++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 74 +++++++++ src/include/access/twophase.h | 1 + src/include/replication/logicalproto.h | 37 ++++- 6 files changed, 505 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 7940060..129afe9 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -548,6 +548,33 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held) } /* + * LookupGXact + * Return true if a valid prepared transaction with the given GID exists (scans TwoPhaseState while holding TwoPhaseStateLock in shared mode) + */ +bool +LookupGXact(const char *gid) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs; stop at the first valid entry whose GID matches */ + if (gxact->valid && strcmp(gxact->gid, gid) == 0) + { + found = true; + break; + } + + } + LWLockRelease(TwoPhaseStateLock); + return found; +} + +/* * LockGXact * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
*/ diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index fdb3118..26e43f7 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -78,7 +78,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); - /* send the flags field (unused for now) */ + /* send the flags field */ pq_sendbyte(out, flags); /* send fields */ @@ -106,6 +106,145 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write a PREPARE, COMMIT PREPARED or ROLLBACK PREPARED message to the output stream; the flags byte tells the reader which of the three it is. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions. In which case we + * expect to have a valid GID. + */ + Assert(rbtxn_prepared(txn)); + Assert(txn->gid != NULL); + + /* + * Flags are determined from the state of the transaction. We know we + * always get PREPARE first and then [COMMIT|ROLLBACK] PREPARED, so if + * it's already marked as committed then it has to be COMMIT PREPARED (and + * likewise for abort / ROLLBACK PREPARED). + */ + if (rbtxn_commit_prepared(txn)) + flags = LOGICALREP_IS_COMMIT_PREPARED; + else if (rbtxn_rollback_prepared(txn)) + flags = LOGICALREP_IS_ROLLBACK_PREPARED; + else + flags = LOGICALREP_IS_PREPARE; + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields: prepare LSN, end LSN, commit timestamp (read back in the same order) */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream.
+ */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data) +{ + /* read flags; anything other than exactly one known prepare type is rejected */ + uint8 flags = pq_getmsgbyte(in); + + if (!PrepareFlagsAreValid(flags)) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* set the action (reuse the constants used for the flags) */ + prepare_data->prepare_type = flags; + + /* read fields (same order the writer sends them: prepare LSN, end LSN, timestamp) */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (bounded copy: the destination is a fixed-size array and the source comes off the wire) */ + strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); +} + +/* + * Write STREAM PREPARE to the output stream. + */ +void +logicalrep_write_stream_prepare(StringInfo out, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_PREPARE); + + /* + * This should only ever happen for two-phase transactions. In which case we + * expect to have a valid GID. + */ + Assert(rbtxn_prepared(txn)); + Assert(txn->gid != NULL); + + /* + * For streaming APIs only PREPARE is supported. [COMMIT|ROLLBACK] PREPARED + * uses non-streaming APIs + */ + flags = LOGICALREP_IS_PREPARE; + + /* transaction ID */ + Assert(TransactionIdIsValid(txn->xid)); + pq_sendint32(out, txn->xid); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read STREAM PREPARE from the input stream.
+ */ +TransactionId +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPrepareData *prepare_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags (a streamed prepare may only carry LOGICALREP_IS_PREPARE) */ + flags = pq_getmsgbyte(in); + + if (flags != LOGICALREP_IS_PREPARE) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* set the action (reuse the constants used for the flags) */ + prepare_data->prepare_type = flags; + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (bounded copy: the destination is a fixed-size array and the source comes off the wire) */ + strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); + + return xid; +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d282336..e99cf74 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -742,6 +742,225 @@ apply_handle_commit(StringInfo s) } /* + * Called from apply_handle_prepare to handle a PREPARE TRANSACTION. + */ +static void +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) +{ + Assert(prepare_data->prepare_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash.
+ */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + PrepareTransactionBlock(prepare_data->gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Called from apply_handle_prepare to handle a COMMIT PREPARED of a previously + * PREPARED transaction. + */ +static void +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + FinishPreparedTransaction(prepare_data->gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Called from apply_handle_prepare to handle a ROLLBACK PREPARED of a previously + * PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data) +{ + /* + * Update origin state so we can restart streaming from correct position + * in case of crash.
+ */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->preparetime; + + /* + * During logical decoding, on the apply side, it's possible that a + * prepared transaction got aborted while decoding. In that case, we stop + * the decoding and abort the transaction immediately. However the + * ROLLBACK prepared processing still reaches the subscriber. In that case + * it's ok to have a missing gid + */ + if (LookupGXact(prepare_data->gid)) + { + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(prepare_data->gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(prepare_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPrepareData prepare_data; + + logicalrep_read_prepare(s, &prepare_data); + + switch (prepare_data.prepare_type) + { + case LOGICALREP_IS_PREPARE: + apply_handle_prepare_txn(&prepare_data); + break; + + case LOGICALREP_IS_COMMIT_PREPARED: + apply_handle_commit_prepared_txn(&prepare_data); + break; + + case LOGICALREP_IS_ROLLBACK_PREPARED: + apply_handle_rollback_prepared_txn(&prepare_data); + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected type of prepare message: %d", + prepare_data.prepare_type))); + } +} + +/* + * Handle STREAM PREPARE. + * + * Logic is in two parts: + * 1. Replay all the spooled operations + * 2.
Mark the transaction as prepared + */ +static void +apply_handle_stream_prepare(StringInfo s) +{ + int nchanges = 0; + LogicalRepPrepareData prepare_data; + TransactionId xid; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_prepare(s, &prepare_data); + elog(DEBUG1, "received prepare for streamed transaction %u", xid); + + /* + * This should be a PREPARE only. The COMMIT PREPARED and ROLLBACK PREPARED + * for streaming are handled by the non-streaming APIs. + */ + Assert(prepare_data.prepare_type == LOGICALREP_IS_PREPARE); + + /* + * ======================================== + * 1. Replay all the spooled operations + * - This code is the same as what apply_handle_stream_commit does for a NON two-phase stream commit + * ======================================== + */ + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn); + + /* + * ======================================== + * 2. Mark the transaction as prepared. + * - This code is the same as what apply_handle_prepare_txn does for a two-phase prepare of a non-streamed tx + * ======================================== + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(prepare_data.gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + elog(DEBUG1, "apply_handle_stream_prepare_txn: replayed %d (all) changes.", nchanges); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel.
*/ + process_syncing_tables(prepare_data.end_lsn); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -1969,6 +2188,14 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_STREAM_PREPARE: + apply_handle_stream_prepare(s); + return; } ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c997ae..9f27234 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -57,6 +63,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; static bool in_streaming; @@ -143,6 +151,10 @@
_PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; + + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -153,6 +165,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void @@ -378,6 +392,48 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * PREPARE callback: update progress, then write the prepare message for txn via logicalrep_write_prepare + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback: same write path as PREPARE; logicalrep_write_prepare picks the message flags from the txn state + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback: same write path as PREPARE; logicalrep_write_prepare picks the message flags from the txn state + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Write the current schema of the relation and its ancestor (if any) if not * done yet.
*/ @@ -857,6 +913,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, } /* + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the streamed transaction by writing a STREAM PREPARE message. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. While diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3..b2628ea 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -44,6 +44,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, extern void StartPrepare(GlobalTransaction gxact); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); +extern bool LookupGXact(const char *gid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index cca13da..d28292d 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -54,10 +54,12 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', - LOGICAL_REP_MSG_STREAM_ABORT = 'A' + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } LogicalRepMsgType; /* @@ -114,6 +116,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and
abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -121,6 +124,28 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; + +/* Prepare protocol information (filled in by logicalrep_read_prepare / logicalrep_read_stream_prepare) */ +typedef struct LogicalRepPrepareData +{ + uint8 prepare_type; + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + char gid[GIDSIZE]; +} LogicalRepPrepareData; + +/* types of the prepare protocol message (sent as the flags byte, stored in prepare_type) */ +#define LOGICALREP_IS_PREPARE 0x01 +#define LOGICALREP_IS_COMMIT_PREPARED 0x02 +#define LOGICALREP_IS_ROLLBACK_PREPARED 0x04 + +/* prepare can be exactly one of PREPARE, [COMMIT|ROLLBACK] PREPARED */ +#define PrepareFlagsAreValid(flags) \ + (((flags) == LOGICALREP_IS_PREPARE) || \ + ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \ + ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED)) + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -128,6 +153,10 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPrepareData * prepare_data); extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); @@ -171,4 +200,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern TransactionId logicalrep_read_stream_prepare(StringInfo in, + LogicalRepPrepareData *prepare_data); + +
#endif /* LOGICAL_PROTO_H */ -- 1.8.3.1