diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d4ff..56c6e7287f 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -6,19 +6,84 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); +-- Reused queries +\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');' +\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');' +\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');' -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +:get_with2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + +:get_no2pc + data +------ +(0 rows) + COMMIT PREPARED 'test_prepared#1'; +:get_with2pc + data +------ +(0 rows) + +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + COMMIT +(3 rows) + INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT +(3 rows) + +:get_with2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + ROLLBACK PREPARED 'test_prepared#2'; +:get_no2pc + data +------ +(0 rows) + +:get_with2pc + data +------ +(0 rows) + INSERT INTO test_prepared1 VALUES (4); -- test prepared xact containing ddl BEGIN; @@ -26,49 +91,169 @@ INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. -INSERT INTO test_prepared2 VALUES (7); -COMMIT PREPARED 'test_prepared#3'; --- make sure stuff still works -INSERT INTO test_prepared1 VALUES (8); -INSERT INTO test_prepared2 VALUES (9); --- cleanup -DROP TABLE test_prepared1; -DROP TABLE test_prepared2; --- show results -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data -------------------------------------------------------------------------- +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + +:get_no2pc + data +---------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:4 COMMIT +(3 rows) + +:get_with2pc + data +---------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:2 + table public.test_prepared1: INSERT: id[integer]:4 COMMIT +(3 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. Our 2pc filter callback will skip decoding of xacts +-- with catalog changes at PREPARE time, so we don't decode it now. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- +INSERT INTO test_prepared2 VALUES (7); +:get_with2pc + data +---------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:4 + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +:get_no2pc + data +---------------------------------------------------- BEGIN table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +:get_no2pc + data +------------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' COMMIT +(4 rows) + +:get_with2pc + data +------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:5 + table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3'; + COMMIT PREPARED 'test_prepared#3'; +(5 rows) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (8); +INSERT INTO test_prepared2 VALUES (9); +:get_with2pc + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:9 + COMMIT +(6 rows) + +:get_no2pc + data +-------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:8 data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(6 rows) + +-- If we do something that takes a strong lock on a catalog relation we need to +-- read in order to decode a transaction we deadlock; we can't finish decoding +-- until the lock is released, but we're waiting for decoding to finish so we +-- can make a commit/abort decision. +--- +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + relation | locktype | mode +----------+----------+------ +(0 rows) +-- Shouldn't see anything with 2pc decoding off +:get_no2pc + data +------ +(0 rows) + +-- If we try to decode it now we'll deadlock +SET statement_timeout = '10s'; +:get_with2pc_nofilter +-- FIXME we expect a timeout here, but it actually works... +ERROR: statement timed out + +RESET statement_timeout; +-- we can decode past it by skipping xacts with catalog changes +-- and let it be decoded after COMMIT PREPARED, though. +:get_with2pc + data +------ +(0 rows) + +COMMIT PREPARED 'test_prepared_lock'; +-- Both will work normally after we commit +:get_no2pc + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + COMMIT +(4 rows) + +:get_with2pc + data +------ +(0 rows) + +-- cleanup +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- (1 row) +SELECT pg_drop_replication_slot('regression_slot_2pc'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e72639767e..a94503c8b8 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -1,22 +1,36 @@ -- predictability SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); + +-- Reused queries +\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');' +\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');' +\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');' -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +:get_with2pc +:get_no2pc COMMIT PREPARED 'test_prepared#1'; +:get_with2pc +:get_no2pc INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +:get_no2pc +:get_with2pc ROLLBACK PREPARED 'test_prepared#2'; +:get_no2pc +:get_with2pc INSERT INTO test_prepared1 VALUES (4); @@ -27,24 +41,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + +:get_no2pc +:get_with2pc --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. Our 2pc filter callback will skip decoding of xacts +-- with catalog changes at PREPARE time, so we don't decode it now. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- INSERT INTO test_prepared2 VALUES (7); +:get_with2pc +:get_no2pc COMMIT PREPARED 'test_prepared#3'; +:get_no2pc +:get_with2pc -- make sure stuff still works INSERT INTO test_prepared1 VALUES (8); INSERT INTO test_prepared2 VALUES (9); +:get_with2pc +:get_no2pc + +-- If we do something that takes a strong lock on a catalog relation we need to +-- read in order to decode a transaction we deadlock; we can't finish decoding +-- until the lock is released, but we're waiting for decoding to finish so we +-- can make a commit/abort decision. +--- +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + +-- Shouldn't see anything with 2pc decoding off +:get_no2pc + +-- If we try to decode it now we'll deadlock +SET statement_timeout = '10s'; +:get_with2pc_nofilter +RESET statement_timeout; + +-- we can decode past it by skipping xacts with catalog changes +-- and let it be decoded after COMMIT PREPARED, though. +:get_with2pc + +COMMIT PREPARED 'test_prepared_lock'; + +-- Both will work normally after we commit +:get_no2pc +:get_with2pc -- cleanup DROP TABLE test_prepared1; DROP TABLE test_prepared2; --- show results -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_drop_replication_slot('regression_slot_2pc'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 135b3b7638..fb0deacfb3 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -24,6 +24,8 @@ #include "replication/message.h" #include "replication/origin.h" +#include "storage/procarray.h" + #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -46,6 +48,8 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + bool twophase_decoding; + bool twophase_decode_with_catalog_changes; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -68,6 +72,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static bool pg_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + char *gid); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + void _PG_init(void) @@ -85,9 +102,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + + cb->filter_prepare_cb = pg_filter_prepare; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->abort_prepared_cb = pg_decode_abort_prepared_txn; } @@ -107,6 +130,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->twophase_decoding = false; + data->twophase_decode_with_catalog_changes = false; ctx->output_plugin_private = data; @@ -176,6 +201,27 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "twophase-decoding") == 0) + { + + if (elem->arg == NULL) + data->twophase_decoding = true; + else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0) + { + if (elem->arg == NULL) + data->twophase_decode_with_catalog_changes = true; + else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -232,10 +278,163 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "COMMIT"); + if (data->include_xids) - appendStringInfo(ctx->out, "COMMIT %u", txn->xid); - else - appendStringInfoString(ctx->out, "COMMIT"); + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + + +/* Filter out unnecessary two-phase transactions */ +static bool +pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + char *gid) +{ + TestDecodingData *data = ctx->output_plugin_private; + + /* treat all transaction as one-phase */ + if (!data->twophase_decoding) + return true; + + /* + * Two-phase transactions that accessed catalog require special + * treatment. + * + * Right now we don't have a safe way to decode catalog changes made in + * prepared transaction that was already aborted by the time of + * decoding. + * + * That kind of problem arises only when we are trying to + * retrospectively decode aborted transactions with catalog changes - + * including if a transaction aborts while we're decoding it. If one + * wants to code distributed commit based on prepare decoding then + * commits/aborts will happend strictly after decoding will be + * completed, so it is possible to skip any checks/locks here. + * + * We'll also get stuck trying to acquire locks on catalog relations + * we need for decoding if the prepared xact holds a strong lock on + * one of them and we also need to decode row changes. + */ + if (txn->has_catalog_changes) + { + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + if (TransactionIdIsInProgress(txn->xid)) + { + /* + * For the sake of simplicity, by default we just + * ignore in-progess prepared transactions with catalog + * changes in this extension. If they abort during + * decoding then tuples we need to decode them may be + * overwritten while we're still decoding, causing + * wrong catalog lookups. + * + * It is possible to move that LWLockRelease() to + * pg_decode_prepare_txn() and allow decoding of + * running prepared tx, but such lock will prevent any + * 2pc transaction commit during decoding time. That + * can be a long time in case of lots of + * changes/inserts in that tx or if the downstream is + * slow/unresonsive. + * + * (Continuing to decode without the lock is unsafe, XXX) + */ + LWLockRelease(TwoPhaseStateLock); + return !data->twophase_decode_with_catalog_changes; + } + else if (TransactionIdDidAbort(txn->xid)) + { + /* + * Here we know that it is already aborted and there is + * not much sense in doing something with this + * transaction. Consequently ABORT PREPARED will be + * suppressed. + */ + LWLockRelease(TwoPhaseStateLock); + return true; + } + + LWLockRelease(TwoPhaseStateLock); + } + + return false; +} + + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ABORT PREPARED callback */ +static void +pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ABORT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 3aafa79e52..8756e4ed18 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -100,8 +100,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; parsed->twophase_xid = xl_twophase->xid; - data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + strcpy(parsed->twophase_gid, data); + data += strlen(parsed->twophase_gid) + 1; + } } if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -139,6 +144,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo); } + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) { xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; @@ -166,8 +181,26 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; parsed->twophase_xid = xl_twophase->xid; - data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + strcpy(parsed->twophase_gid, data); + data += strlen(parsed->twophase_gid) + 1; + } + } + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + xl_xact_origin xl_origin; + + /* we're only guaranteed 4 byte alignment, so copy onto stack */ + memcpy(&xl_origin, data, sizeof(xl_origin)); + + parsed->origin_lsn = xl_origin.origin_lsn; + parsed->origin_timestamp = xl_origin.origin_timestamp; + + data += sizeof(xl_xact_origin); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index b715152e8d..c764c6c22b 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -148,7 +148,6 @@ int max_prepared_xacts = 0; * Note that the max value of GIDSIZE must fit in the uint16 gidlen, * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -211,12 +210,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -898,7 +899,7 @@ TwoPhaseGetDummyProc(TransactionId xid) /* * Header for a 2PC state file */ -#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */ +#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */ typedef struct TwoPhaseFileHeader { @@ -914,6 +915,8 @@ typedef struct TwoPhaseFileHeader int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? */ uint16 gidlen; /* length of the GID - GID follows the header */ + XLogRecPtr origin_lsn; /* lsn of this record at origin node */ + TimestampTz origin_timestamp; /* time of prepare at origin node */ } TwoPhaseFileHeader; /* @@ -1065,6 +1068,7 @@ EndPrepare(GlobalTransaction gxact) { TwoPhaseFileHeader *hdr; StateFileChunk *record; + bool replorigin; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1075,6 +1079,21 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); + + if (replorigin) + { + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr); + hdr->origin_lsn = replorigin_session_origin_lsn; + hdr->origin_timestamp = replorigin_session_origin_timestamp; + } + else + { + hdr->origin_lsn = InvalidXLogRecPtr; + hdr->origin_timestamp = 0; + } + /* * If the data size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. Check for that now, rather than fail in the case @@ -1105,9 +1124,19 @@ EndPrepare(GlobalTransaction gxact) MyPgXact->delayChkpt = true; XLogBeginInsert(); + for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + gxact->prepare_end_lsn); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1283,6 +1312,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->origin_lsn = hdr->origin_lsn; + parsed->origin_timestamp = hdr->origin_timestamp; + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->ncommitrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + strncpy(parsed->twophase_gid, bufptr, hdr->gidlen); + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->commitrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + /* * Reads 2PC data from xlog. During checkpoint this data will be moved to @@ -1435,11 +1501,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); @@ -2165,7 +2232,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2193,7 +2261,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ninvalmsgs, invalmsgs, initfileinval, false, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); if (replorigin) @@ -2255,7 +2323,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2278,7 +2347,7 @@ RecordTransactionAbortPrepared(TransactionId xid, nchildren, children, nrels, rels, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c06fabca10..e22622bfb2 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1226,7 +1226,7 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, MyXactFlags, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - MyXactFlags, InvalidTransactionId); + MyXactFlags, InvalidTransactionId, + NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -5256,7 +5257,8 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5268,6 +5270,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5330,6 +5333,13 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } } /* dump transaction origin information */ @@ -5380,8 +5390,13 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + XLogRegisterData((char *) twophase_gid, gidlen); + } + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5401,15 +5416,19 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; + xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5421,7 +5440,6 @@ XactLogAbortRecord(TimestampTz abort_time, else info = XLOG_XACT_ABORT_PREPARED; - /* First figure out and collect all the information needed */ xlrec.xact_time = abort_time; @@ -5445,6 +5463,31 @@ XactLogAbortRecord(TimestampTz abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + /* dump transaction origin information only for abort prepared */ + if ( (replorigin_session_origin != InvalidRepOriginId) && + TransactionIdIsValid(twophase_xid) && + XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_session_origin_lsn; + xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -5459,6 +5502,9 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5476,8 +5522,22 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + XLogRegisterData((char *) twophase_gid, gidlen); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + if (TransactionIdIsValid(twophase_xid)) + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + return XLogInsert(RM_XACT_ID, info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 486fd0c988..7b2eec2402 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,7 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/twophase.h" #include "catalog/pg_control.h" @@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -277,17 +280,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* check that output plugin capable of twophase decoding */ + if (!ctx->twophase_hadling) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + ParsePrepareRecord(XLogRecGetInfo(buf->record), + XLogRecGetData(buf->record), &parsed); + + /* does output plugin wants this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -551,8 +570,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * Process invalidation messages, even if we're not interested in the * transaction's contents, since the various caches need to always be * consistent. + * + * Also if that transaction was sent to prepare callback then both + * this function were called during prepare. */ - if (parsed->nmsgs > 0) + if (parsed->nmsgs > 0 && + !(TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid))) { ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, parsed->nmsgs, parsed->msgs); @@ -607,9 +631,81 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid)) + { + /* + * We are processing COMMIT PREPARED and know that reorder buffer is + * empty. So we can skip use shortcut for coomiting bare xact. + */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + + +/* + * Decode PREPARE record. Same logic as in COMMIT, but diffent calls + * to SnapshotBuilder as we need to mark this transaction as commited + * instead of running to properly decode it. When prepared transation + * is decoded we mark it in snapshot as running again. + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + SnapBuildPrepareTxnStart(ctx->snapshot_builder, buf->origptr, xid, + parsed->nsubxacts, parsed->subxacts); + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + + /* tell the reorderbuffer about the surviving subtransactions */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); + + SnapBuildPrepareTxnFinish(ctx->snapshot_builder, xid); } /* @@ -621,6 +717,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * If that is ROLLBACK PREPARED than send that to callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index bca585fc27..93ba3fbc5a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions bool is_init); static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -124,6 +132,7 @@ StartupDecodingContext(List *output_plugin_options, MemoryContext context, old_context; LogicalDecodingContext *ctx; + int twophase_callbacks; /* shorter lines... */ slot = MyReplicationSlot; @@ -182,8 +191,25 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* check that plugin implements all necessary callbacks to perform 2PC */ + twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) + + (ctx->callbacks.commit_prepared_cb != NULL) + + (ctx->callbacks.abort_prepared_cb != NULL); + + ctx->twophase_hadling = (twophase_callbacks == 3); + + if (twophase_callbacks != 3 && twophase_callbacks != 0) + ereport(WARNING, + (errmsg("Output plugin registered only %d twophase callbacks out of 3. " + "Twophase transactions will be decoded as ordinary ones.", + twophase_callbacks))); + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -680,6 +706,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -714,6 +827,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9b126b2957..6952cbc28d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -101,6 +101,66 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) commit_data->commit_lsn = pq_getmsgint64(in); commit_data->end_lsn = pq_getmsgint64(in); commit_data->committime = pq_getmsgint64(in); + + /* set gid to empty */ + commit_data->gid[0] = '\0'; +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'P'); /* sending PREPARE protocol */ + + if (txn->txn_flags & TXN_COMMIT_PREPARED) + flags |= LOGICALREP_IS_COMMIT_PREPARED; + else if (txn->txn_flags & TXN_ROLLBACK_PREPARED) + flags |= LOGICALREP_IS_ROLLBACK_PREPARED; + else if (txn->txn_flags & TXN_PREPARE) + flags |= LOGICALREP_IS_PREPARE; + + if (flags == 0) + elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags) +{ + /* read flags */ + uint8 prep_flags = pq_getmsgbyte(in); + + if (!(prep_flags & LOGICALREP_PREPARE_MASK)) + elog(ERROR, "unrecognized flags %u in prepare message", prep_flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + /* read gid */ + strcpy(commit_data->gid, pq_getmsgstring(in)); + + /* set flags */ + *flags = prep_flags; } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 0f607bab70..3d9598aab8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1307,25 +1307,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) * the top and subtransactions (using a k-way merge) and replay the changes in * lsn order. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; volatile Snapshot snapshot_now; volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -1604,8 +1597,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + /* call commit or prepare callback */ + if (txn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -1632,8 +1628,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + /* + * remove potential on-disk data, and deallocate or postpone that + * till the finish of two-phase tx + */ + if (!txn_prepared(txn)) + ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); { @@ -1667,6 +1667,125 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as one-phase later on commit. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, gid); +} + + +/* + * Commit non-twophase transaction. See comments to ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare twophase transaction. It calls ReorderBufferCommitInternal() + * since all transaction changes should be decoded on PREPARE. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= TXN_PREPARE; + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to receiver. + * Called upon commit/abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * If txn == NULL then presumably subscriber confirmed prepare + * but we are rebooted. + */ + return txn == NULL ? true : txn_prepared(txn); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + strcpy(txn->gid, gid); + + if (is_commit) + { + txn->txn_flags |= TXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= TXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ad65b9831d..3ba6841770 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -901,7 +901,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) /* copy xids that still are interesting to workspace */ for (off = 0; off < builder->committed.xcnt; off++) { - if (NormalTransactionIdPrecedes(builder->committed.xip[off], + if (TransactionIdPrecedes(builder->committed.xip[off], builder->xmin)) ; /* remove */ else @@ -1079,6 +1079,52 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } +/* + * Just a wrapper to clarify DecodePrepare(). + * Right now we can't extract correct historic catalog data that + * was produced by aborted prepared transaction, so it work of + * decoding plugin to avoid such situation and here we just construct usual + * snapshot to able to decode prepare. + */ +void +SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, + int nsubxacts, TransactionId *subxacts) +{ + SnapBuildCommitTxn(builder, lsn, xid, nsubxacts, subxacts); +} + + +/* + * When decoding of preppare is finished we want should exclude our xid + * from list of committed xids to have correct snapshot between prepare + * and commit. + * + * However, this is not sctrictly needed. Prepared transaction holds locks + * between prepare and commit so nodody can produce new version of our + * catalog tuples. In case of abort we will have this xid in array of + * commited xids, but it also will not cause a problem since checks of + * HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC() + * have higher priority then checks for xip array. Anyway let's be consistent + * about definitions and delete this xid from xip array. + */ +void +SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid) +{ + TransactionId *search = bsearch(&xid, builder->committed.xip, + builder->committed.xcnt, sizeof(TransactionId), xidComparator); + + if (search == NULL) + return; + + /* delete that xid */ + memmove(search, search + 1, + ((builder->committed.xip + builder->committed.xcnt - 1) - search) * sizeof(TransactionId)); + builder->committed.xcnt--; + + /* update min/max */ + builder->xmin = builder->committed.xip[0]; + builder->xmax = builder->committed.xip[builder->committed.xcnt - 1]; +} /* ----------------------------------- * Snapshot building functions dealing with xlog records diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fa5d9bb120..f1e91efeec 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -487,6 +487,121 @@ apply_handle_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +static void +apply_handle_prepare_txn(LogicalRepCommitData *commit_data) +{ + Assert(commit_data->commit_lsn == remote_final_lsn); + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + PrepareTransactionBlock(commit_data->gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + /* TODO: what to do here for prepared transactions?? */ + Assert(false); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data) +{ + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + FinishPreparedTransaction(commit_data->gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data) +{ + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + FinishPreparedTransaction(commit_data->gid, false); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepCommitData commit_data; + uint8 flags = 0; + + logicalrep_read_prepare(s, &commit_data, &flags); + + if (flags & LOGICALREP_IS_PREPARE) + apply_handle_prepare_txn(&commit_data); + else if (flags & LOGICALREP_IS_COMMIT_PREPARED) + apply_handle_commit_prepared_txn(&commit_data); + else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED) + apply_handle_rollback_prepared_txn(&commit_data); + else + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("wrong [commit|rollback] prepare message"))); +} + /* * Handle ORIGIN message. * @@ -888,6 +1003,10 @@ apply_dispatch(StringInfo s) case 'C': apply_handle_commit(s); break; + /* [COMMIT|ROLLBACK] PREPARE */ + case 'P': + apply_handle_prepare(s); + break; /* INSERT */ case 'I': apply_handle_insert(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index c3126545b4..d55aa5b5a2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -42,6 +42,14 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pgoutput_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, char *gid); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; @@ -79,6 +87,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->commit_cb = pgoutput_commit_txn; + + cb->filter_prepare_cb = pgoutput_filter_prepare; + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->abort_prepared_cb = pgoutput_abort_prepared_txn; + cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; } @@ -254,6 +268,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} +/* + * PREPARE callback + */ +static void +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Sends the decoded DML over wire. */ static void @@ -364,6 +419,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * Filter out unnecessary two-phase transactions. + * + * Currently, we forward all two-phase transactions + */ +static bool +pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + char *gid) +{ + return false; +} + +/* * Currently we always forward. */ static bool diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 54dec4eeaf..11ff0511fd 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -57,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); +extern void ParsePrepareRecord(uint8 info, char *xlrec, + xl_xact_parsed_prepare *parsed); #endif /* TWOPHASE_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 118b0a8432..1f093fb7b4 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -21,6 +21,10 @@ #include "storage/sinval.h" #include "utils/datetime.h" +/* + * Maximum size of Global Transaction ID (including '\0'). + */ +#define GIDSIZE 200 /* * Xact isolation levels @@ -156,6 +160,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_ORIGIN (1U << 5) #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) +#define XACT_XINFO_HAS_GID (1U << 7) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -302,13 +307,40 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; XLogRecPtr origin_lsn; TimestampTz origin_timestamp; } xl_xact_parsed_commit; +typedef struct xl_xact_parsed_prepare +{ + Oid dbId; /* MyDatabaseId */ + + int nsubxacts; + TransactionId *subxacts; + + int ncommitrels; + RelFileNode *commitrels; + + int nabortrels; + RelFileNode *abortrels; + + int nmsgs; + SharedInvalidationMessage *msgs; + + TransactionId twophase_xid; + char twophase_gid[GIDSIZE]; + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; +} xl_xact_parsed_prepare; + typedef struct xl_xact_parsed_abort { + Oid dbId; + Oid tsId; + TimestampTz xact_time; uint32 xinfo; @@ -319,6 +351,10 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_abort; @@ -386,12 +422,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, int xactflags, - TransactionId twophase_xid); + TransactionId twophase_xid, const char *twophase_gid); extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid); + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7f0e0fa881..4a1ca4a2b9 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -82,6 +82,11 @@ typedef struct LogicalDecodingContext bool prepared_write; XLogRecPtr write_location; TransactionId write_xid; + + /* + * Capabilities of decoding plugin used. + */ + bool twophase_hadling; } LogicalDecodingContext; @@ -111,5 +116,4 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); - #endif diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index a9736e1bf6..99f0c50de8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -69,11 +69,18 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +#define LOGICALREP_IS_COMMIT 0x01 +#define LOGICALREP_IS_PREPARE 0x02 +#define LOGICALREP_IS_COMMIT_PREPARED 0x04 +#define LOGICALREP_IS_ROLLBACK_PREPARED 0x08 +#define LOGICALREP_PREPARE_MASK (LOGICALREP_IS_PREPARE | LOGICALREP_IS_COMMIT_PREPARED | LOGICALREP_IS_ROLLBACK_PREPARED) typedef struct LogicalRepCommitData { + uint8 flag; XLogRecPtr commit_lsn; XLogRecPtr end_lsn; TimestampTz committime; + char gid[GIDSIZE]; } LogicalRepCommitData; extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); @@ -81,8 +88,12 @@ extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepCommitData *commit_data, uint8 *flags); extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 26ff024882..11a7af7da8 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -68,6 +68,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, XLogRecPtr commit_lsn); /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare + * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED + * and sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* * Called for the generic logical decoding messages. */ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, @@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 86effe106b..ee18fa346b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -137,13 +138,28 @@ typedef struct ReorderBufferChange dlist_node node; } ReorderBufferChange; + +/* TODO: convert existing bools into flags later */ +/* values for txn_flags */ +#define TXN_HAS_CATALOG_CHANGES 0x0001 +#define TXN_IS_SUBXACT 0x0002 +#define TXN_PREPARE 0x0004 +#define TXN_COMMIT_PREPARED 0x0008 +#define TXN_ROLLBACK_PREPARED 0x0010 +#define txn_prepared(txn) (txn->txn_flags & TXN_PREPARE) + typedef struct ReorderBufferTXN { + int txn_flags; + /* * The transactions transaction id, can be a toplevel or sub xid. */ TransactionId xid; + /* In case of 2PC we need to pass GID to output plugin */ + char gid[GIDSIZE]; + /* did the TX have catalog changes */ bool has_catalog_changes; @@ -292,6 +308,29 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* message callback signature */ typedef void (*ReorderBufferMessageCB) ( ReorderBuffer *rb, @@ -327,6 +366,10 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -382,6 +425,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -405,6 +453,13 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 7653717f83..7fcd479d8a 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -86,5 +86,9 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern void SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid); #endif /* SNAPBUILD_H */