From 6bfc0bc86f31e52ac0f6b52c5110c1f684df4549 Mon Sep 17 00:00:00 2001
From: Amit Kapila
Date: Wed, 15 Jul 2020 18:38:32 +0530
Subject: [PATCH v42 2/7] Implement streaming mode in ReorderBuffer.

Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke the new stream API methods. However, if we have an
incomplete toast insert or speculative insert, we spill to disk because we
cannot generate the complete tuple to stream; as soon as we get the
complete tuple, we stream the transaction, including the serialized
changes.

We can do this incremental processing thanks to having assignments
(associating subxacts with toplevel xacts) in WAL right away, and thanks
to logging the invalidation messages.

Now that we can stream in-progress transactions, concurrent aborts may
cause failures when the output plugin consults catalogs (both system and
user-defined). We handle such failures by returning the
ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from the system table scan APIs to
the backend or WALSender decoding a specific uncommitted transaction. On
receipt of this sqlerrcode, the decoding logic aborts the ongoing decoding
and returns gracefully.

Each ReorderBufferChange carries a ReorderBufferTXN pointer, by which we
know which xact a change belongs to. The output plugin can use this to
decide which changes to discard in case of stream_abort_cb (e.g. when a
subxact gets discarded).

We also provide a new option via the SQL APIs to fetch the changes being
streamed.
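As an illustrative sketch only (not part of this patch, and with my_*
handler names as placeholders): an output plugin opts into streaming by
filling in the stream_* callbacks introduced earlier in this patch series.

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    /* the usual non-streaming callbacks */
    cb->startup_cb = my_startup;
    cb->begin_cb = my_begin;
    cb->change_cb = my_change;
    cb->commit_cb = my_commit;
    cb->shutdown_cb = my_shutdown;

    /* callbacks for streaming of large in-progress transactions */
    cb->stream_start_cb = my_stream_start;
    cb->stream_stop_cb = my_stream_stop;
    cb->stream_abort_cb = my_stream_abort;
    cb->stream_commit_cb = my_stream_commit;
    cb->stream_change_cb = my_stream_change;
    cb->stream_message_cb = my_stream_message;
    cb->stream_truncate_cb = my_stream_truncate;
}

The reorder buffer treats streaming as possible only when these callbacks
are provided (see ReorderBufferCanStream below); test_decoding
additionally gates it behind its stream-changes option.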
Author: Dilip Kumar, Tomas Vondra, Amit Kapila Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/stream.out | 196 ++++ contrib/test_decoding/expected/truncate.out | 6 + contrib/test_decoding/sql/stream.sql | 21 + contrib/test_decoding/sql/truncate.sql | 1 + contrib/test_decoding/test_decoding.c | 13 + doc/src/sgml/logicaldecoding.sgml | 9 +- doc/src/sgml/test-decoding.sgml | 22 + src/backend/access/heap/heapam.c | 13 + src/backend/access/heap/heapam_visibility.c | 42 +- src/backend/access/index/genam.c | 53 + src/backend/access/table/tableam.c | 8 + src/backend/access/transam/xact.c | 19 + src/backend/replication/logical/decode.c | 17 +- src/backend/replication/logical/logical.c | 10 + .../replication/logical/reorderbuffer.c | 969 ++++++++++++++++-- src/include/access/heapam_xlog.h | 1 + src/include/access/tableam.h | 55 + src/include/access/xact.h | 4 + src/include/replication/logical.h | 1 + src/include/replication/reorderbuffer.h | 56 +- 21 files changed, 1412 insertions(+), 106 deletions(-) create mode 100644 contrib/test_decoding/expected/stream.out create mode 100644 contrib/test_decoding/sql/stream.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c582a5..ed9a3d6c0e 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate + spill slot truncate stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out new file mode 100644 index 0000000000..272aab2de8 --- /dev/null +++ b/contrib/test_decoding/expected/stream.out @@ -0,0 +1,196 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stream_test(data text); +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- streaming with subxact, nothing in main +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? 
+---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 500) || g.i FROM generate_series(1, 150) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 150) g(i); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + data +---------------------------------------------------------- + opening a streamed block for transaction + streaming message: transactional: 1 prefix: test, sz: 50 + closing a streamed block for transaction + aborting streamed (sub)transaction + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + 
streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + committing streamed transaction +(157 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out index 1cf2ae835c..e64d377214 100644 --- a/contrib/test_decoding/expected/truncate.out +++ b/contrib/test_decoding/expected/truncate.out @@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc COMMIT (9 rows) +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql new file mode 100644 index 0000000000..73c5c987da --- /dev/null +++ b/contrib/test_decoding/sql/stream.sql @@ -0,0 +1,21 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stream_test(data text); + +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 
'0', 'skip-empty-xacts', '1'); + +-- streaming with subxact, nothing in main +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 500) || g.i FROM generate_series(1, 150) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 150) g(i); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql index 5aecdf0881..5633854e0d 100644 --- a/contrib/test_decoding/sql/truncate.sql +++ b/contrib/test_decoding/sql/truncate.sql @@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; TRUNCATE tab1, tab2; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index dbef52a3af..d8e2b416c6 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { ListCell *option; TestDecodingData *data; + bool enable_streaming = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "stream-changes") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &enable_streaming)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, elem->arg ? strVal(elem->arg) : "(null)"))); } } + + ctx->streaming &= enable_streaming; } /* cleanup this plugin's resources */ diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 791a62b57c..1571d71a5b 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. That, among others, - includes writing to tables, performing DDL changes, and - calling pg_current_xact_id(). + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* + scan APIs only. Access via the heap_* scan APIs will + error out. Additionally, any actions leading to transaction ID assignment + are prohibited. That, among others, includes writing to tables, performing + DDL changes, and calling pg_current_xact_id(). 
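To make the systable_* requirement above concrete, here is a minimal
sketch (not part of the patch) of how an output plugin might look up a row
in a user catalog table; the relation and index OIDs, the key column, and
the helper name are hypothetical. Because the access goes through the
systable_* APIs, a concurrent abort of the transaction being streamed is
reported as ERRCODE_TRANSACTION_ROLLBACK instead of returning inconsistent
data.

#include "postgres.h"

#include "access/genam.h"
#include "access/htup_details.h"
#include "access/stratnum.h"
#include "access/table.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"

/* Hypothetical lookup in a user_catalog_table keyed by an oid column. */
static HeapTuple
lookup_user_catalog_row(Oid catalog_relid, Oid catalog_index_relid, Oid key)
{
    Relation    rel = table_open(catalog_relid, AccessShareLock);
    ScanKeyData skey;
    SysScanDesc scan;
    HeapTuple   tup;

    ScanKeyInit(&skey,
                1,      /* attno of the hypothetical oid key column */
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(key));

    /*
     * A NULL snapshot makes systable_beginscan use the catalog snapshot,
     * which is the historic snapshot during logical decoding; the call
     * also sets bsysscan while CheckXidAlive is set.
     */
    scan = systable_beginscan(rel, catalog_index_relid, true, NULL, 1, &skey);

    /*
     * systable_getnext errors out with ERRCODE_TRANSACTION_ROLLBACK if the
     * transaction being decoded has aborted concurrently.
     */
    tup = systable_getnext(scan);
    if (HeapTupleIsValid(tup))
        tup = heap_copytuple(tup);  /* copy out before ending the scan */

    systable_endscan(scan);
    table_close(rel, AccessShareLock);

    return tup;
}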
diff --git a/doc/src/sgml/test-decoding.sgml b/doc/src/sgml/test-decoding.sgml
index 8356a3d67b..fe7c9783fa 100644
--- a/doc/src/sgml/test-decoding.sgml
+++ b/doc/src/sgml/test-decoding.sgml
@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i
+
+  <para>
+   We can also get the changes of an in-progress transaction, and the
+   typical output might be:
+
+<programlisting>
+postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1');
+    lsn    | xid |                       data
+-----------+-----+--------------------------------------------------
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | streaming change for TXN 503
+ 0/16B2300 | 503 | streaming change for TXN 503
+ 0/16B2408 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16BECA8 | 503 | streaming change for TXN 503
+ 0/16BEDB0 | 503 | streaming change for TXN 503
+ 0/16BEEB8 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+(10 rows)
+</programlisting>
+  </para>
+
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d881f4cd46..33a45800b3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1288,6 +1288,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg_internal("only heap AM is supported")));
 
+	/*
+	 * We don't expect direct calls to heap_getnext with a valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments
+	 * in xact.c where these variables are declared.  Normally we have such
+	 * a check at the tableam API level, but heap_getnext is called from
+	 * many places, so we need to ensure it here as well.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
 	/* Note: no locking manipulations needed */
 
 	if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
@@ -1945,6 +1955,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	{
 		xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
 		bufflags |= REGBUF_KEEP_DATA;
+
+		if (IsToastRelation(relation))
+			xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
 	}
 
 	XLogBeginInsert();
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index dba10890aa..c77128087c 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 												 htup, buffer,
 												 &cmin, &cmax);
 
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means we
+		 * have not decoded the combocid yet. That means the cmin is
+		 * definitely in the future, and we're not supposed to see the tuple
+		 * yet.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions. In
+		 * regular logical decoding we only execute this code at commit time,
+		 * at which point we should have seen all relevant combocids. So
+		 * ideally, we should error out in this case but in practice, this
+		 * won't happen. If we are too worried about this then we can add an
+		 * elog inside ResolveCminCmaxDuringDecoding.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
 		if (!resolved)
-			elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+			return false;
 
 		Assert(cmin != InvalidCommandId);
 
@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 												 htup, buffer,
 												 &cmin, &cmax);
 
-		if (!resolved)
-			elog(ERROR, "could not resolve combocid to cmax");
-
-		Assert(cmax != InvalidCommandId);
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means we
+		 * have not decoded the combocid yet. That means the cmax is
+		 * definitely in the future, and we're still supposed to see the
+		 * tuple.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions. In
+		 * regular logical decoding we only execute this code at commit time,
+		 * at which point we should have seen all relevant combocids. So
+		 * ideally, we should error out in this case but in practice, this
+		 * won't happen. If we are too worried about this then we can add an
+		 * elog inside ResolveCminCmaxDuringDecoding.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
+		if (!resolved || cmax == InvalidCommandId)
+			return true;
 
 		if (cmax >= snapshot->curcid)
 			return true;		/* deleted after scan started */
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index dfba5ae39a..9d9a70a354 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation,
 		sysscan->iscan = NULL;
 	}
 
+	/*
+	 * If CheckXidAlive is set, then set a flag to indicate that a system
+	 * table scan is in progress.  See detailed comments in xact.c where
+	 * these variables are declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive))
+		bsysscan = true;
+
 	return sysscan;
 }
 
+/*
+ * HandleConcurrentAbort - Handle a concurrent abort of CheckXidAlive.
+ *
+ * Error out if CheckXidAlive has aborted. We can't directly use
+ * TransactionIdDidAbort, because after a crash such a transaction might
+ * not have been marked as aborted. See detailed comments in xact.c where
+ * the variable is declared.
+ */
+static inline void
+HandleConcurrentAbort(void)
+{
+	if (TransactionIdIsValid(CheckXidAlive) &&
+		!TransactionIdIsInProgress(CheckXidAlive) &&
+		!TransactionIdDidCommit(CheckXidAlive))
+		ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+}
+
 /*
  * systable_getnext --- get next tuple in a heap-or-index scan
  *
@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
 		}
 	}
 
+	/*
+	 * Handle a concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return htup;
 }
 
@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
 											sysscan->slot,
 											freshsnap);
 
+	/*
+	 * Handle a concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return result;
 }
 
@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
 	if (sysscan->snapshot)
 		UnregisterSnapshot(sysscan->snapshot);
 
+	/*
+	 * Reset the bsysscan flag at the end of the systable scan.  See
+	 * detailed comments in xact.c where these variables are declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive))
+		bsysscan = false;
+
 	pfree(sysscan);
 }
 
@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
 	if (htup && sysscan->iscan->xs_recheck)
 		elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
 
+	/*
+	 * Handle a concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return htup;
 }
 
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 4b2bb29559..a61e279d68 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -234,6 +234,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
 	Relation	rel = scan->rs_rd;
 	const TableAmRoutine *tableam = rel->rd_tableam;
 
+	/*
+	 * We don't expect direct calls to table_tuple_get_latest_tid with a
+	 * valid CheckXidAlive for catalog or regular tables.  See detailed
+	 * comments in xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
 	/*
 	 * Since this can be called with user-supplied TID, don't trust the input
 	 * too much.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d4f7c29847..99722eea4b 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -82,6 +82,19 @@ bool		XactDeferrable;
 
 int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
 
+/*
+ * CheckXidAlive is an xid value pointing to a possibly ongoing (sub)
+ * transaction.  Currently, it is used in logical decoding.  It's possible
+ * that such transactions can get aborted while the decoding is ongoing, in
+ * which case we skip decoding that particular transaction.  To detect that,
+ * we check whether CheckXidAlive has aborted after fetching each tuple from
+ * system tables.  We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs, because we check for concurrent
+ * aborts only in the systable_* APIs.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+bool		bsysscan = false;
+
 /*
  * When running as a parallel worker, we place only a single
  * TransactionStateData on the parallel worker's state stack, and the XID
@@ -2680,6 +2693,9 @@ AbortTransaction(void)
 	/* Forget about any active REINDEX. */
 	ResetReindexState(s->nestingLevel);
 
+	/* Reset logical streaming state. */
+	ResetLogicalStreamingState();
+
 	/* If in parallel mode, clean up workers and exit parallel mode. */
 	if (IsInParallelMode())
 	{
@@ -4982,6 +4998,9 @@ AbortSubTransaction(void)
 	/* Forget about any active REINDEX. */
 	ResetReindexState(s->nestingLevel);
 
+	/* Reset logical streaming state. */
+	ResetLogicalStreamingState();
+
 	/* Exit from parallel mode, if necessary.
*/ if (IsInParallelMode()) { diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f3a1c31a29..f21f61d5e1 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } /* @@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(change->data.truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); } /* @@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); /* move to the next xl_multi_insert_tuple entry */ data += datalen; @@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 05d24b93da..42f284b33f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Clear logical streaming state during (sub)transaction abort. 
+ */ +void +ResetLogicalStreamingState(void) +{ + CheckXidAlive = InvalidTransactionId; + bsysscan = false; +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ce6e62152f..c469536b5f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -100,6 +100,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -178,6 +179,21 @@ typedef struct ReorderBufferDiskChange /* data follows */ } ReorderBufferDiskChange; +#define IsSpecInsert(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ +) +#define IsSpecConfirm(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \ +) +#define IsInsertOrUpdate(action) \ +( \ + (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ + ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \ +) + /* * Maximum number of changes kept in memory, per transaction. After that, * changes are spooled to disk. @@ -236,6 +252,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -244,6 +261,16 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid); +/* + * --------------------------------------- + * Streaming support functions + * --------------------------------------- + */ +static inline bool ReorderBufferCanStream(ReorderBuffer *rb); +static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb); +static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); + /* --------------------------------------- * toast reassembly support * --------------------------------------- @@ -367,6 +394,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb) dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + /* InvalidCommandId is not zero, so set it explicitly */ + txn->command_id = InvalidCommandId; + return txn; } @@ -416,13 +446,15 @@ ReorderBufferGetChange(ReorderBuffer *rb) } /* - * Free an ReorderBufferChange. + * Free a ReorderBufferChange and update memory accounting, if requested. */ void -ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) +ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, + bool upd_mem) { /* update memory accounting info */ - ReorderBufferChangeMemoryUpdate(rb, change, false); + if (upd_mem) + ReorderBufferChangeMemoryUpdate(rb, change, false); /* free contained data */ switch (change->action) @@ -624,16 +656,101 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, } /* - * Queue a change into a transaction so it can be replayed upon commit. 
+ * Record the partial change for the streaming of in-progress transactions.
+ * We can stream only complete changes, so if we have a partial change like
+ * a toast table insert or a speculative insert, we mark such a 'txn' so
+ * that it can't be streamed.  We also ensure that if the changes in such a
+ * 'txn' are above the logical_decoding_work_mem threshold, we stream them
+ * as soon as we have a complete change.
+ */
+static void
+ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+								  ReorderBufferChange *change,
+								  bool toast_insert)
+{
+	ReorderBufferTXN *toptxn;
+
+	/*
+	 * The partial changes need to be processed only while streaming
+	 * in-progress transactions.
+	 */
+	if (!ReorderBufferCanStream(rb))
+		return;
+
+	/* Get the top transaction. */
+	if (txn->toptxn != NULL)
+		toptxn = txn->toptxn;
+	else
+		toptxn = txn;
+
+	/*
+	 * Set the toast insert bit whenever we get a toast insert, to indicate
+	 * a partial change, and clear it when we get the insert or update on
+	 * the main table (both update and insert will do the insert in the
+	 * toast table).
+	 */
+	if (toast_insert)
+		toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
+	else if (rbtxn_has_toast_insert(toptxn) &&
+			 IsInsertOrUpdate(change->action))
+		toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+
+	/*
+	 * Set the spec insert bit whenever we get a speculative insert, to
+	 * indicate the partial change, and clear it on speculative confirm.
+	 */
+	if (IsSpecInsert(change->action))
+		toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
+	else if (IsSpecConfirm(change->action))
+	{
+		/*
+		 * A speculative confirm change must be preceded by a speculative
+		 * insertion.
+		 */
+		Assert(rbtxn_has_spec_insert(toptxn));
+		toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
+	}
+
+	/*
+	 * Stream the transaction if it was serialized before and its changes
+	 * are now complete in the top-level transaction.
+	 *
+	 * The reason for streaming such a transaction as soon as we get the
+	 * complete change is that it has already crossed the memory threshold
+	 * and was not streamed earlier only because of its incomplete changes.
+	 * Delaying it further would increase the apply lag.
+	 */
+	if (ReorderBufferCanStartStreaming(rb) &&
+		!(rbtxn_has_incomplete_tuple(toptxn)) &&
+		rbtxn_is_serialized(txn))
+		ReorderBufferStreamTXN(rb, toptxn);
+}
+
+/*
+ * Queue a change into a transaction so it can be replayed upon commit, or
+ * streamed once we reach the logical_decoding_work_mem threshold.
  */
 void
 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
-						 ReorderBufferChange *change)
+						 ReorderBufferChange *change, bool toast_insert)
 {
 	ReorderBufferTXN *txn;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
+	/*
+	 * While streaming the previous changes we detected that the transaction
+	 * was aborted, so there is no point in collecting further changes for
+	 * it.
+	 */
+	if (txn->concurrent_abort)
+	{
+		/*
+		 * We don't need to update memory accounting for this change as we
+		 * have not added it to the queue yet.
+ */ + ReorderBufferReturnChange(rb, change, false); + return; + } + change->lsn = lsn; change->txn = txn; @@ -645,6 +762,9 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, /* update memory accounting information */ ReorderBufferChangeMemoryUpdate(rb, change, true); + /* process partial change */ + ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); + /* check the memory limits and evict something if needed */ ReorderBufferCheckMemoryLimit(rb); } @@ -674,7 +794,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change->data.msg.message = palloc(message_size); memcpy(change->data.msg.message, message, message_size); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -763,6 +883,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #endif } +/* + * AssertChangeLsnOrder + * + * Check ordering of changes in the (sub)transaction. + */ +static void +AssertChangeLsnOrder(ReorderBufferTXN *txn) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter iter; + XLogRecPtr prev_lsn = txn->first_lsn; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(txn->first_lsn <= cur_change->lsn); + + if (txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_change->lsn <= txn->end_lsn); + + Assert(prev_lsn <= cur_change->lsn); + + prev_lsn = cur_change->lsn; + } +#endif +} + /* * ReorderBufferGetOldestTXN * Return oldest transaction in reorderbuffer @@ -1018,6 +1170,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, *iter_state = NULL; + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(txn); + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -1032,6 +1187,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); + if (cur_txn->nentries > 0) nr_txns++; } @@ -1148,7 +1306,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) { change = dlist_container(ReorderBufferChange, node, dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); Assert(dlist_is_empty(&state->old_change)); } @@ -1234,7 +1392,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, change = dlist_container(ReorderBufferChange, node, dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); Assert(dlist_is_empty(&state->old_change)); } @@ -1280,7 +1438,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. 
 */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change);
+		ReorderBufferReturnChange(rb, change, true);
 	}
 
 	/*
@@ -1297,7 +1455,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		Assert(change->txn == txn);
 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
 
-		ReorderBufferReturnChange(rb, change);
+		ReorderBufferReturnChange(rb, change, true);
 	}
 
 	/*
@@ -1309,6 +1467,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		dlist_delete(&txn->base_snapshot_node);
 	}
 
+	/*
+	 * Clean up the snapshot from the last streamed run.
+	 */
+	if (txn->snapshot_now != NULL)
+	{
+		Assert(rbtxn_is_streamed(txn));
+		ReorderBufferFreeSnap(rb, txn->snapshot_now);
+	}
+
 	/*
 	 * Remove TXN from its containing list.
 	 *
@@ -1334,6 +1501,91 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	ReorderBufferReturnTXN(rb, txn);
 }
 
+/*
+ * Discard changes from a transaction (and its subtransactions), after
+ * streaming them.  Keep the remaining info - transactions, tuplecids,
+ * invalidations and snapshots.
+ */
+static void
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	dlist_mutable_iter iter;
+
+	/* cleanup subtransactions & their changes */
+	dlist_foreach_modify(iter, &txn->subtxns)
+	{
+		ReorderBufferTXN *subtxn;
+
+		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+		/*
+		 * Subtransactions are always associated with the toplevel TXN, even
+		 * if they originally were happening inside another subtxn, so we
+		 * won't ever recurse more than one level deep here.
+		 */
+		Assert(rbtxn_is_known_subxact(subtxn));
+		Assert(subtxn->nsubtxns == 0);
+
+		ReorderBufferTruncateTXN(rb, subtxn);
+	}
+
+	/* cleanup changes in the toplevel txn */
+	dlist_foreach_modify(iter, &txn->changes)
+	{
+		ReorderBufferChange *change;
+
+		change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+		/* Check we're not mixing changes from different transactions. */
+		Assert(change->txn == txn);
+
+		/* remove the change from its containing list */
+		dlist_delete(&change->node);
+
+		ReorderBufferReturnChange(rb, change, true);
+	}
+
+	/*
+	 * Mark the transaction as streamed.
+	 *
+	 * The toplevel transaction, identified by (toptxn==NULL), is always
+	 * marked as streamed, even if it does not contain any changes (that is,
+	 * when all the changes are in subtransactions).
+	 *
+	 * For subtransactions, we only mark them as streamed when there are
+	 * changes in them.
+	 *
+	 * We do it this way because of aborts - we don't want to send aborts
+	 * for XIDs the downstream is not aware of.  And of course, it always
+	 * knows about the toplevel xact (we send the XID in all messages), but
+	 * we never stream XIDs of empty subxacts.
+	 */
+	if ((!txn->toptxn) || (txn->nentries_mem != 0))
+		txn->txn_flags |= RBTXN_IS_STREAMED;
+
+	/*
+	 * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
+	 * memory.  We could also keep the hash table and update it with new
+	 * ctid values, but this seems simpler and good enough for now.
+	 */
+	if (txn->tuplecid_hash != NULL)
+	{
+		hash_destroy(txn->tuplecid_hash);
+		txn->tuplecid_hash = NULL;
+	}
+
+	/* If this txn was serialized, clean up the disk space. */
+	if (rbtxn_is_serialized(txn))
+	{
+		ReorderBufferRestoreCleanup(rb, txn);
+		txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+	}
+
+	/* also reset the number of entries in the transaction */
+	txn->nentries_mem = 0;
+	txn->nentries = 0;
+}
+
 /*
  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
  * HeapTupleSatisfiesHistoricMVCC.
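On the receiving side, a plugin's stream_abort callback can rely on the
invariant established above: it is only invoked for XIDs that were actually
streamed. A hedged sketch, where plugin_forget_changes() is a hypothetical
helper that drops whatever the plugin buffered for that (sub)transaction:

/*
 * Sketch of a plugin-side stream_abort callback (illustrative only).
 * Because empty subxacts are never marked as streamed, their aborts are
 * never sent, so any xid arriving here was previously streamed.
 */
static void
my_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                XLogRecPtr abort_lsn)
{
    /* txn->xid is the xid of the aborted (sub)transaction */
    plugin_forget_changes(txn->xid);    /* hypothetical helper */
}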
@@ -1485,57 +1737,178 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 }
 
 /*
- * Perform the replay of a transaction and its non-aborted subtransactions.
- *
- * Subtransactions previously have to be processed by
- * ReorderBufferCommitChild(), even if previously assigned to the toplevel
- * transaction with ReorderBufferAssignChild.
- *
- * We currently can only decode a transaction's contents when its commit
- * record is read because that's the only place where we know about cache
- * invalidations. Thus, once a toplevel commit is read, we iterate over the top
- * and subtransactions (using a k-way merge) and replay the changes in lsn
- * order.
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way.  That is, we first stream the remaining part of the
+ * transaction, and then invoke the stream_commit callback.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time,
-					RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	ReorderBufferTXN *txn;
-	volatile Snapshot snapshot_now;
-	volatile CommandId command_id = FirstCommandId;
-	bool		using_subtxn;
-	ReorderBufferIterTXNState *volatile iterstate = NULL;
+	/* we should only call this for previously streamed transactions */
+	Assert(rbtxn_is_streamed(txn));
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
+	ReorderBufferStreamTXN(rb, txn);
 
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
+	rb->stream_commit(rb, txn, txn->final_lsn);
 
-	txn->final_lsn = commit_lsn;
-	txn->end_lsn = end_lsn;
-	txn->commit_time = commit_time;
-	txn->origin_id = origin_id;
-	txn->origin_lsn = origin_lsn;
+	ReorderBufferCleanupTXN(rb, txn);
+}
 
+/*
+ * Set the xid for the concurrent-abort check.
+ *
+ * While streaming an in-progress transaction, there is a possibility that
+ * the (sub)transaction might get aborted concurrently.  In such a case, if
+ * the (sub)transaction has a catalog update, we might decode the tuple
+ * using the wrong catalog version.  So, to detect a concurrent abort, we
+ * set CheckXidAlive to the xid of the (sub)transaction to which the current
+ * change belongs.  Then, during a catalog scan, we can check the status of
+ * that xid, and if it has aborted we report a specific error so that we can
+ * stop streaming the current transaction and discard the already streamed
+ * changes.  We might have already streamed some of the changes for the
+ * aborted (sub)transaction, but that is fine: when we decode the abort we
+ * will stream the abort message to truncate the changes on the subscriber.
+ */
+static inline void
+SetupCheckXidLive(TransactionId xid)
+{
 	/*
-	 * If this transaction has no snapshot, it didn't make any changes to the
-	 * database, so there's nothing to decode. Note that
-	 * ReorderBufferCommitChild will have transferred any snapshots from
-	 * subtransactions if there were any.
+	 * If the input transaction id is already set as CheckXidAlive, there is
+	 * nothing to do.
 	 */
-	if (txn->base_snapshot == NULL)
-	{
-		Assert(txn->ninvalidations == 0);
-		ReorderBufferCleanupTXN(rb, txn);
+	if (TransactionIdEquals(CheckXidAlive, xid))
 		return;
+
+	/*
+	 * Set up CheckXidAlive if the transaction is not committed yet.  We
+	 * don't check whether the xid has aborted; that will happen during
+	 * catalog access.
+	 */
+	if (!TransactionIdDidCommit(xid))
+		CheckXidAlive = xid;
+	else
+		CheckXidAlive = InvalidTransactionId;
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to apply a change.
+ */
+static inline void
+ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						 Relation relation, ReorderBufferChange *change,
+						 bool streaming)
+{
+	if (streaming)
+		rb->stream_change(rb, txn, relation, change);
+	else
+		rb->apply_change(rb, txn, relation, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to apply a truncate.
+ */
+static inline void
+ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   int nrelations, Relation *relations,
+						   ReorderBufferChange *change, bool streaming)
+{
+	if (streaming)
+		rb->stream_truncate(rb, txn, nrelations, relations, change);
+	else
+		rb->apply_truncate(rb, txn, nrelations, relations, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to apply a message.
+ */
+static inline void
+ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						  ReorderBufferChange *change, bool streaming)
+{
+	if (streaming)
+		rb->stream_message(rb, txn, change->lsn, true,
+						   change->data.msg.prefix,
+						   change->data.msg.message_size,
+						   change->data.msg.message);
+	else
+		rb->message(rb, txn, change->lsn, true,
+					change->data.msg.prefix,
+					change->data.msg.message_size,
+					change->data.msg.message);
+}
+
+/*
+ * Store the command id and snapshot at the end of the current stream, so
+ * that we can reuse them when sending the next stream.
+ */
+static inline void
+ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 Snapshot snapshot_now, CommandId command_id)
+{
+	txn->command_id = command_id;
+
+	/* Avoid copying if it's already copied. */
+	if (snapshot_now->copied)
+		txn->snapshot_now = snapshot_now;
+	else
+		txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+												  txn, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to handle a concurrent
+ * abort of the transaction being streamed.  This resets the TXN so that
+ * it can be used to stream its remaining data.
+ */
+static void
+ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+					  Snapshot snapshot_now,
+					  CommandId command_id,
+					  XLogRecPtr last_lsn,
+					  ReorderBufferChange *specinsert)
+{
+	/* Discard the changes that we just streamed. */
+	ReorderBufferTruncateTXN(rb, txn);
+
+	/* Free all resources allocated for toast reconstruction. */
+	ReorderBufferToastReset(rb, txn);
+
+	/* Return the spec insert change if it is not NULL. */
+	if (specinsert != NULL)
+	{
+		ReorderBufferReturnChange(rb, specinsert, true);
+		specinsert = NULL;
 	}
 
-	snapshot_now = txn->base_snapshot;
+	/* Stop the stream. */
+	rb->stream_stop(rb, txn, last_lsn);
+
+	/* Remember the command ID and snapshot for the streaming run. */
+	ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ *
+ * Send the data of a transaction (and its subtransactions) to the output
+ * plugin.  We iterate over the top and subtransactions (using a k-way
+ * merge) and replay the changes in lsn order.
+ *
+ * If streaming is true, the data will be sent using the stream API.
+ */ +static void +ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn, + volatile Snapshot snapshot_now, + volatile CommandId command_id, + bool streaming) +{ + bool using_subtxn; + MemoryContext ccxt = CurrentMemoryContext; + ReorderBufferIterTXNState *volatile iterstate = NULL; + volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; + ReorderBufferChange *volatile specinsert = NULL; + volatile bool stream_started = false; + ReorderBufferTXN *volatile curtxn = NULL; /* build data to be able to lookup the CommandIds of catalog tuples */ ReorderBufferBuildTupleCidHash(rb, txn); @@ -1558,14 +1931,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { ReorderBufferChange *change; - ReorderBufferChange *specinsert = NULL; if (using_subtxn) - BeginInternalSubTransaction("replay"); + BeginInternalSubTransaction(streaming ? "stream" : "replay"); else StartTransactionCommand(); - rb->begin(rb, txn); + /* We only need to send begin/commit for non-streamed transactions. */ + if (!streaming) + rb->begin(rb, txn); ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -1573,6 +1947,36 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; + /* + * We can't call start stream callback before processing first + * change. + */ + if (prev_lsn == InvalidXLogRecPtr) + { + if (streaming) + { + txn->origin_id = change->origin_id; + rb->stream_start(rb, txn, change->lsn); + stream_started = true; + } + } + + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + + /* Set the xid for concurrent abort check. */ + if (streaming) + { + curtxn = change->txn; + SetupCheckXidLive(curtxn->xid); + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1649,7 +2053,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (!IsToastRelation(relation)) { ReorderBufferToastReplace(rb, txn, relation, change); - rb->apply_change(rb, txn, relation, change); + ReorderBufferApplyChange(rb, txn, relation, change, + streaming); /* * Only clear reassembled toast chunks if we're sure @@ -1685,11 +2090,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, */ if (specinsert != NULL) { - ReorderBufferReturnChange(rb, specinsert); + ReorderBufferReturnChange(rb, specinsert, true); specinsert = NULL; } - if (relation != NULL) + if (RelationIsValid(relation)) { RelationClose(relation); relation = NULL; @@ -1714,7 +2119,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* clear out a pending (and thus failed) speculation */ if (specinsert != NULL) { - ReorderBufferReturnChange(rb, specinsert); + ReorderBufferReturnChange(rb, specinsert, true); specinsert = NULL; } @@ -1747,7 +2152,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relations[nrelations++] = relation; } - rb->apply_truncate(rb, txn, nrelations, relations, change); + /* Apply the truncate. 
 */
+				ReorderBufferApplyTruncate(rb, txn, nrelations,
+										   relations, change,
+										   streaming);
 
 					for (i = 0; i < nrelations; i++)
 						RelationClose(relations[i]);
 
 					break;
 				}
 
 			case REORDER_BUFFER_CHANGE_MESSAGE:
-				rb->message(rb, txn, change->lsn, true,
-							change->data.msg.prefix,
-							change->data.msg.message_size,
-							change->data.msg.message);
+				ReorderBufferApplyMessage(rb, txn, change, streaming);
 				break;
 
 			case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -1790,7 +2195,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						snapshot_now = change->data.snapshot;
 					}
 
-					/* and continue with the new one */
 					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
 					break;
 
@@ -1837,7 +2241,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		 */
 		if (specinsert)
 		{
-			ReorderBufferReturnChange(rb, specinsert);
+			ReorderBufferReturnChange(rb, specinsert, true);
 			specinsert = NULL;
 		}
 
@@ -1845,14 +2249,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/*
+		 * Done with the current changes; send the last message for this set
+		 * of changes depending upon the streaming mode.
+		 */
+		if (streaming)
+		{
+			if (stream_started)
+			{
+				rb->stream_stop(rb, txn, prev_lsn);
+				stream_started = false;
+			}
+		}
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
 			elog(ERROR, "output plugin used XID %u",
 				 GetCurrentTransactionId());
 
+		/*
+		 * Remember the command ID and snapshot for the next set of changes
+		 * in streaming mode.
+		 */
+		if (streaming)
+			ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+		else if (snapshot_now->copied)
+			ReorderBufferFreeSnap(rb, snapshot_now);
+
 		/* cleanup */
 		TeardownHistoricSnapshot(false);
 
@@ -1870,14 +2295,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
 
-		if (snapshot_now->copied)
-			ReorderBufferFreeSnap(rb, snapshot_now);
+		/*
+		 * If we are streaming the in-progress transaction then discard the
+		 * changes that we just streamed, and mark the transactions as
+		 * streamed (if they contained changes). Otherwise, remove all the
+		 * changes and deallocate the ReorderBufferTXN.
+		 */
+		if (streaming)
+		{
+			ReorderBufferTruncateTXN(rb, txn);
 
-		/* remove potential on-disk data, and deallocate */
-		ReorderBufferCleanupTXN(rb, txn);
+			/* Reset the CheckXidAlive */
+			CheckXidAlive = InvalidTransactionId;
+		}
+		else
+			ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
 	{
+		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+		ErrorData  *errdata = CopyErrorData();
+
 		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
 		if (iterstate)
 			ReorderBufferIterTXNFinish(rb, iterstate);
@@ -1896,15 +2334,106 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
 
-		if (snapshot_now->copied)
-			ReorderBufferFreeSnap(rb, snapshot_now);
+		/*
+		 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
+		 * abort of the (sub)transaction we are streaming. We need to do the
+		 * cleanup and return gracefully on this error, see SetupCheckXidLive.
+		 */
+		if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
+		{
+			/*
+			 * This error can only occur when we are sending the data in
+			 * streaming mode and the streaming is not finished yet.
+			 */
+			Assert(streaming);
+			Assert(stream_started);
+
+			/* Clean up the temporary error state. */
+			FlushErrorState();
+			FreeErrorData(errdata);
+			errdata = NULL;
+			curtxn->concurrent_abort = true;
+
+			/* Reset the TXN so that it is allowed to stream the remaining data. */
+			ReorderBufferResetTXN(rb, txn, snapshot_now,
+								  command_id, prev_lsn,
+								  specinsert);
+		}
+		else
+		{
+			ReorderBufferCleanupTXN(rb, txn);
+			MemoryContextSwitchTo(ecxt);
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+}
 
-	/* remove potential on-disk data, and deallocate */
-	ReorderBufferCleanupTXN(rb, txn);
+/*
+ * Perform the replay of a transaction and its non-aborted subtransactions.
+ *
+ * Subtransactions previously have to be processed by
+ * ReorderBufferCommitChild(), even if previously assigned to the toplevel
+ * transaction with ReorderBufferAssignChild.
+ *
+ * This interface is called once a toplevel commit is read, for both
+ * streamed and non-streamed transactions.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+	Snapshot	snapshot_now;
+	CommandId	command_id = FirstCommandId;
 
-		PG_RE_THROW();
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+
+	/*
+	 * If the transaction was (partially) streamed, we need to commit it in
+	 * a 'streamed' way.  That is, we first stream the remaining part of the
+	 * transaction, and then invoke the stream_commit callback.
+	 *
+	 * XXX Called after everything (origin ID and LSN, ...) is stored in the
+	 * transaction, so we don't pass that directly.
+	 */
+	if (rbtxn_is_streamed(txn))
+	{
+		ReorderBufferStreamCommit(rb, txn);
+		return;
 	}
-	PG_END_TRY();
+
+	/*
+	 * If this transaction has no snapshot, it didn't make any changes to
+	 * the database, so there's nothing to decode. Note that
+	 * ReorderBufferCommitChild will have transferred any snapshots from
+	 * subtransactions if there were any.
+	 */
+	if (txn->base_snapshot == NULL)
+	{
+		Assert(txn->ninvalidations == 0);
+		ReorderBufferCleanupTXN(rb, txn);
+		return;
+	}
+
+	snapshot_now = txn->base_snapshot;
+
+	/* Process and send the changes to the output plugin. */
+	ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+							command_id, false);
 }
 
 /*
@@ -1931,6 +2460,22 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
+	/* For streamed transactions, notify the remote node about the abort. */
+	if (rbtxn_is_streamed(txn))
+	{
+		rb->stream_abort(rb, txn, lsn);
+
+		/*
+		 * We might have decoded changes for this transaction that could
+		 * load the cache as per the current transaction's view (consider
+		 * DDLs that happened in this transaction). We don't want the
+		 * decoding of future transactions to use those cache entries, so
+		 * execute invalidations.
+		 */
+		if (txn->ninvalidations > 0)
+			ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+											   txn->invalidations);
+	}
+
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
@@ -2000,6 +2545,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
+	/* For streamed transactions, notify the remote node about the abort. */
*/ + if (rbtxn_is_streamed(txn)) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2082,7 +2631,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, change->data.snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2131,12 +2680,21 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, change->data.command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* - * Update the memory accounting info. We track memory used by the whole - * reorder buffer and the transaction containing the change. + * Update memory counters to account for the new or removed change. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, while the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -2144,6 +2702,8 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition) { Size sz; + ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2155,19 +2715,41 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; + txn = change->txn; + + /* If streaming is supported, update the total size at the top level as well. */ + if (ReorderBufferCanStream(rb)) + { + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + } + sz = ReorderBufferChangeSize(change); if (addition) { - change->txn->size += sz; + txn->size += sz; rb->size += sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size += sz; } else { - Assert((rb->size >= sz) && (change->txn->size >= sz)); - change->txn->size -= sz; + Assert((rb->size >= sz) && (txn->size >= sz)); + txn->size -= sz; rb->size -= sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size -= sz; } + + Assert(txn->size <= rb->size); + Assert((txn->size >= 0) && (rb->size >= 0)); } /* @@ -2387,6 +2970,51 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) return largest; } +/* + * Find the largest toplevel transaction to evict (by streaming). + * + * This can be seen as an optimized version of ReorderBufferLargestTXN, which + * should give us the same transaction (because we don't update the memory + * accounting for subtransactions when streaming, so it's always 0). But we can + * simply iterate over the limited number of toplevel transactions. + * + * Note that we skip transactions that contain incomplete changes.
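+ * (An incomplete change here means the transaction currently has a toast insert without the matching main table insert, or a speculative insert without the speculative confirm; see rbtxn_has_incomplete_tuple.)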
There + * is room for optimization here: we could select the largest transaction + * that has only complete changes. But that would make the code and design quite + * complex, and it might not be worth the benefit. If we plan to stream transactions + * that contain incomplete changes then we need to find a way to partially + * stream/truncate the transaction changes in-memory and build a mechanism to + * partially truncate the spilled files. Additionally, whenever we partially + * stream a transaction we need to maintain the last streamed LSN so that next time + * we can restore from that segment and offset in the WAL. As we stream the + * changes from the top transaction and restore them subtransaction-wise, we would + * even need to remember the subxact from which we streamed the last change. + */ +static ReorderBufferTXN * +ReorderBufferLargestTopTXN(ReorderBuffer *rb) +{ + dlist_iter iter; + Size largest_size = 0; + ReorderBufferTXN *largest = NULL; + + /* Find the largest top-level transaction. */ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, node, iter.cur); + + if ((largest == NULL || txn->total_size > largest_size) && + (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn))) + { + largest = txn; + largest_size = txn->total_size; + } + } + + return largest; +} + /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to @@ -2419,11 +3047,33 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { /* * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * memory by streaming, if possible. Otherwise, spill to disk. */ - txn = ReorderBufferLargestTXN(rb); + if (ReorderBufferCanStartStreaming(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->total_size > 0); + Assert(rb->size >= txn->total_size); - ReorderBufferSerializeTXN(rb, txn); + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it + * from memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); + } /* * After eviction, the transaction should have no entries in memory, @@ -2501,7 +3151,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); spilled++; } @@ -2713,6 +3363,136 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(ondisk->change.action == change->action); } +/* Returns true if the output plugin supports streaming, false otherwise. */ +static inline bool +ReorderBufferCanStream(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->streaming; +} + +/* Returns true if streaming can be started now, false otherwise.
*/ +static inline bool +ReorderBufferCanStartStreaming(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + SnapBuild *builder = ctx->snapshot_builder; + + /* + * We can't start streaming immediately even if the streaming is enabled + * because we previously decoded this transaction and now just are + * restarting. + */ + if (ReorderBufferCanStream(rb) && + !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) + { + /* We must have a consistent snapshot by this time */ + Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT); + return true; + } + + return false; +} + +/* + * Send data of a large transaction (and its subtransactions) to the + * output plugin, but using the stream API. + */ +static void +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + Snapshot snapshot_now; + CommandId command_id; + + /* We can never reach here for a sub transaction. */ + Assert(txn->toptxn == NULL); + + /* + * We can't make any assumptions about base snapshot here, similar to what + * ReorderBufferCommit() does. That relies on base_snapshot getting + * transferred from subxact in ReorderBufferCommitChild(), but that was + * not yet called as the transaction is in-progress. + * + * So just walk the subxacts and use the same logic here. But we only need + * to do that once, when the transaction is streamed for the first time. + * After that we need to reuse the snapshot from the previous run. + * + * Unlike DecodeCommit which adds xids of all the subtransactions in + * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here + * but we do add them to subxip array instead via ReorderBufferCopySnap. + * This allows the catalog changes made in subtransactions decoded till + * now to be visible. + */ + if (txn->snapshot_now == NULL) + { + dlist_iter subxact_i; + + /* make sure this transaction is streamed for the first time */ + Assert(!rbtxn_is_streamed(txn)); + + /* at the beginning we should have invalid command ID */ + Assert(txn->command_id == InvalidCommandId); + + dlist_foreach(subxact_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); + ReorderBufferTransferSnapToParent(txn, subtxn); + } + + /* + * If this transaction has no snapshot, it didn't make any changes to + * the database till now, so there's nothing to decode. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + return; + } + + command_id = FirstCommandId; + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + else + { + /* the transaction must have been already streamed */ + Assert(rbtxn_is_streamed(txn)); + + /* + * Nah, we already have snapshot from the previous streaming run. We + * assume new subxacts can't move the LSN backwards, and so can't beat + * the LSN condition in the previous branch (so no need to walk + * through subxacts again). In fact, we must not do that as we may be + * using snapshot half-way through the subxact. + */ + command_id = txn->command_id; + + /* + * We can not use txn->snapshot_now directly because after the last + * streaming run, we might have got some new sub-transactions. So we + * need to add them to the snapshot. + */ + snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now, + txn, command_id); + + /* Free the previously copied snapshot. */ + Assert(txn->snapshot_now->copied); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + txn->snapshot_now = NULL; + } + + /* Process and send the changes to output plugin. 
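+ * (Here the regular ReorderBufferProcessTXN loop is reused with streaming set to true, so the run is bracketed by stream_start/stream_stop callbacks instead of being finished with a commit callback.)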
*/ + ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, + command_id, true); + + Assert(dlist_is_empty(&txn->changes)); + Assert(txn->nentries == 0); + Assert(txn->nentries_mem == 0); +} + /* * Size of a change in memory. */ @@ -2813,7 +3593,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_container(ReorderBufferChange, node, cleanup_iter.cur); dlist_delete(&cleanup->node); - ReorderBufferReturnChange(rb, cleanup); + ReorderBufferReturnChange(rb, cleanup, true); } txn->nentries_mem = 0; Assert(dlist_is_empty(&txn->changes)); @@ -3522,7 +4302,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_container(ReorderBufferChange, node, it.cur); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); } } @@ -3812,6 +4592,17 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, BlockNumber blockno; bool updated_mapping = false; + /* + * Return unresolved if tuplecid_data is not valid. That's because when + * streaming in-progress transactions we may run into tuples with the CID + * before actually decoding them. Think e.g. about INSERT followed by + * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the + * INSERT. So in such cases, we assume the CID is from the future + * command. + */ + if (tuplecid_data == NULL) + return false; + /* be careful about padding */ memset(&key, 0, sizeof(key)); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cdb12..aa17f7df84 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,7 @@ #define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) +#define XLH_INSERT_ON_TOAST_RELATION (1<<4) /* * xl_heap_update flag values, 8 bits are available. diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 0d28f01ca9..b188427563 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -19,6 +19,7 @@ #include "access/relscan.h" #include "access/sdir.h" +#include "access/xact.h" #include "utils/guc.h" #include "utils/rel.h" #include "utils/snapshot.h" @@ -903,6 +904,15 @@ static inline bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { slot->tts_tableOid = RelationGetRelid(sscan->rs_rd); + + /* + * We don't expect direct calls to table_scan_getnextslot with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding"); + return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } @@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan, TupleTableSlot *slot, bool *call_again, bool *all_dead) { + /* + * We don't expect direct calls to table_index_fetch_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding"); return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot, slot, call_again, @@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel, Snapshot snapshot, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_tuple_fetch_row_version with + * valid CheckXidAlive for catalog or regular tables. See detailed + * comments in xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding"); + return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot); } @@ -1713,6 +1738,14 @@ static inline bool table_scan_bitmap_next_block(TableScanDesc scan, struct TBMIterateResult *tbmres) { + /* + * We don't expect direct calls to table_scan_bitmap_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, tbmres); } @@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan, struct TBMIterateResult *tbmres, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_bitmap_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan, tbmres, slot); @@ -1748,6 +1789,13 @@ static inline bool table_scan_sample_next_block(TableScanDesc scan, struct SampleScanState *scanstate) { + /* + * We don't expect direct calls to table_scan_sample_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate); } @@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan, struct SampleScanState *scanstate, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_sample_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate, slot); } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 53480116a4..c18554bae2 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -81,6 +81,10 @@ typedef enum /* Synchronous commit level */ extern int synchronous_commit; +/* used during logical streaming of a transaction */ +extern TransactionId CheckXidAlive; +extern bool bsysscan; + /* * Miscellaneous flag bits to record events which occur on the top level * transaction. 
These flags are only persisted in MyXactFlags and are intended diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index deef31825d..b0fae9808b 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern void ResetLogicalStreamingState(void); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 42bc817648..1ae17d5f11 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -162,6 +162,9 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -181,6 +184,40 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction's changes contain a toast insert without the main table insert. */ +#define rbtxn_has_toast_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ +) +/* + * This transaction's changes contain a speculative insert without the + * speculative confirm. + */ +#define rbtxn_has_spec_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ +) + +/* Check whether this transaction has an incomplete change. */ +#define rbtxn_has_incomplete_tuple(txn) \ +( \ + rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \ +) + +/* + * Has this transaction been streamed to the downstream node? + * + * (It's not possible to deduce this from nentries and nentries_mem for + * various reasons. For example, all changes may be in subtransactions in + * which case we'd have nentries==0 for the toplevel one, which would say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction.) + */ +#define rbtxn_is_streamed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -248,6 +285,13 @@ typedef struct ReorderBufferTXN XLogRecPtr base_snapshot_lsn; dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ + /* + * Snapshot/CID from the previous streaming run. Only valid for already + * streamed transactions (NULL/InvalidCommandId otherwise). + */ + Snapshot snapshot_now; + CommandId command_id; + /* * How many ReorderBufferChange's do we have in this txn. * @@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN * Size of this transaction (changes currently in memory, in bytes). */ Size size; + + /* Size of the top-level transaction including sub-transactions. */ + Size total_size; + + /* If we have detected a concurrent abort then ignore future changes.
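+ * (This is set by ReorderBufferProcessTXN when decoding of a streamed transaction fails with ERRCODE_TRANSACTION_ROLLBACK.)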
*/ + bool concurrent_abort; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *); ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); -void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); +void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool toast_insert); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -- 2.23.0 v42/v42-0006-Add-TAP-test-for-streaming-vs.-DDL.patch000644 000765 000024 00000010616 13706560046 022774 0ustar00dilipkumarstaff000000 000000 From ad78f92f84ccb4e3a0557cf118d088cb7ece022a Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 19:15:35 +0200 Subject: [PATCH v42 6/7] Add TAP test for streaming vs. DDL --- .../subscription/t/014_stream_through_ddl.pl | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 src/test/subscription/t/014_stream_through_ddl.pl diff --git a/src/test/subscription/t/014_stream_through_ddl.pl b/src/test/subscription/t/014_stream_through_ddl.pl new file mode 100644 index 0000000000..b8d78b1972 --- /dev/null +++ b/src/test/subscription/t/014_stream_through_ddl.pl @@ -0,0 +1,98 @@ +# Test streaming of large transaction with DDL, subtransactions and rollbacks. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d text, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 1000) s(i); +SAVEPOINT s2; +ALTER TABLE test_tab ADD COLUMN c INT; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(1001, 2000) s(i); +SAVEPOINT s3; +ALTER TABLE test_tab ADD COLUMN d text; +SAVEPOINT s4; +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text) FROM generate_series(2001, 3000) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(3001, 4000) s(i); +SAVEPOINT s10; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(4001, 5000) s(i); +ALTER TABLE test_tab ADD COLUMN d text; +ROLLBACK TO SAVEPOINT s10; +RELEASE SAVEPOINT s10; +SAVEPOINT s10; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(5001, 6000) s(i); +SAVEPOINT s6; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(6001, 7000) s(i); +SAVEPOINT s7; +ALTER TABLE test_tab ADD COLUMN d text; +INSERT INTO test_tab (a, b, c, d, e) SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(7001, 8000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(a), count(b), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(7000|7000|7000|6000|4000|4000), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; -- 2.23.0 v42/v42-0004-Add-support-for-streaming-to-built-in-replicatio.patch000644 000765 000024 00000273335 13706560046 026303 0ustar00dilipkumarstaff000000 000000 From e4eeebf36fd51b3c3d2a110eb8a219879d1f3c3e Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 20 Jul 2020 10:37:49 +0530 Subject: [PATCH v42 4/7] Add support for streaming to built-in replication To add support for streaming of in-progress transactions into the built-in transaction, we need to do three things: * Extend the logical replication protocol, so identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). * Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle streamed in-progress transaction by spilling the data to disk and then replaying them on commit. 
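As a rough illustration of the extended protocol (simplified, not the exact wire format): a large in-progress transaction arrives as one or more chunks of the form STREAM START ('S', xid, first_segment), followed by ordinary change messages that are now prefixed with the XID of their (sub)transaction, and closed by STREAM END ('E'); the sequence ends with either STREAM COMMIT ('c') or STREAM ABORT ('A').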
We however must explicitly disable streaming replication during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover we don't have a replication connection open so we don't have where to send the data anyway. --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 11 + src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 49 +- src/backend/postmaster/pgstat.c | 12 + .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/proto.c | 140 ++- src/backend/replication/logical/worker.c | 946 +++++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 348 ++++++- src/include/catalog/pg_subscription.h | 3 + src/include/pgstat.h | 6 +- src/include/replication/logicalproto.h | 46 +- src/include/replication/walreceiver.h | 1 + src/test/subscription/t/009_stream_simple.pl | 86 ++ src/test/subscription/t/010_stream_subxact.pl | 102 ++ src/test/subscription/t/011_stream_ddl.pl | 95 ++ .../t/012_stream_subxact_abort.pl | 82 ++ .../t/013_stream_subxact_ddl_abort.pl | 84 ++ src/test/subscription/t/015_stream_binary.pl | 86 ++ 19 files changed, 2060 insertions(+), 47 deletions(-) create mode 100644 src/test/subscription/t/009_stream_simple.pl create mode 100644 src/test/subscription/t/010_stream_subxact.pl create mode 100644 src/test/subscription/t/011_stream_ddl.pl create mode 100644 src/test/subscription/t/012_stream_subxact_abort.pl create mode 100644 src/test/subscription/t/013_stream_subxact_ddl_abort.pl create mode 100644 src/test/subscription/t/015_stream_binary.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 81c4e70cdf..a81bd54efc 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -165,8 +165,9 @@ ALTER SUBSCRIPTION name RENAME TO < . See there for more information. The parameters that can be altered are slot_name, - synchronous_commit, and - binary. + synchronous_commit, + binary and + streaming. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index cdb22c54fe..b7d7457d00 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -228,6 +228,17 @@ CREATE SUBSCRIPTION subscription_name + + streaming (boolean) + + + Specifies whether streaming of in-progress transactions should + be enabled for this subscription. By default, all transactions + are fully decoded on the publisher, and only then sent to the + subscriber as a whole. 
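+ For example (illustrative object names, not part of the patch): + CREATE SUBSCRIPTION mysub CONNECTION 'dbname=postgres' PUBLICATION mypub WITH (streaming = true);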
+ + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 90bf5cf0c6..311d46225a 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->binary = subform->subbinary; + sub->stream = subform->substream; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 40b6377a85..4c58ad8b07 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -63,7 +63,8 @@ parse_subscription_options(List *options, bool *copy_data, char **synchronous_commit, bool *refresh, - bool *binary_given, bool *binary) + bool *binary_given, bool *binary, + bool *streaming_given, bool *streaming) { ListCell *lc; bool connect_given = false; @@ -99,6 +100,8 @@ parse_subscription_options(List *options, *binary_given = false; *binary = false; } + if (streaming) + *streaming_given = false; /* Parse options */ foreach(lc, options) @@ -194,6 +197,16 @@ parse_subscription_options(List *options, *binary_given = true; *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "streaming") == 0 && streaming) + { + if (*streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *streaming_given = true; + *streaming = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -337,6 +350,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool streaming; + bool streaming_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -360,7 +375,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ©_data, &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); /* * Since creating a replication slot is not transactional, rolling back @@ -439,6 +455,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + if (streaming_given) + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + else + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. 
*/ @@ -698,6 +721,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *synchronous_commit; bool binary_given; bool binary; + bool streaming_given; + bool streaming; parse_subscription_options(stmt->options, NULL, /* no "connect" */ @@ -707,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); if (slotname_given) { @@ -739,6 +765,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subbinary - 1] = true; } + if (streaming_given) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + replaces[Anum_pg_subscription_substream - 1] = true; + } + update_tuple = true; break; } @@ -756,7 +789,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no streaming */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -800,8 +834,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ &refresh, - NULL, NULL); /* no "binary" */ - + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -843,7 +877,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6c97f68671..479e3cadf9 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_WAL_WRITE: event_name = "WALWrite"; break; + case WAIT_EVENT_LOGICAL_CHANGES_READ: + event_name = "ReorderLogicalChangesRead"; + break; + case WAIT_EVENT_LOGICAL_CHANGES_WRITE: + event_name = "ReorderLogicalChangesWrite"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_READ: + event_name = "ReorderLogicalSubxactRead"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_WRITE: + event_name = "ReorderLogicalSubxactWrite"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e9057230e4..a6101ace30 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -408,6 +408,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, "proto_version '%u'", options->proto.logical.proto_version); + if (options->proto.logical.streaming && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfo(&cmd, ", streaming 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9ff8097bf5..ff25924e68 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write 
INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) +logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) * Write UPDATE to the output stream. */ void -logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary) +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) +logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b pq_sendbyte(out, 'D'); /* action DELETE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) */ void logicalrep_write_truncate(StringInfo out, + TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs) @@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out, pq_sendbyte(out, 'T'); /* action TRUNCATE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + pq_sendint32(out, nrelids); /* encode and send truncate flags */ @@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; pq_sendbyte(out, 'R'); /* sending RELATION */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in) * This function will always write base type info. 
*/ void -logicalrep_write_typ(StringInfo out, Oid typoid) +logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) { Oid basetypoid = getBaseType(typoid); HeapTuple tup; @@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid) pq_sendbyte(out, 'Y'); /* sending TYPE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", basetypoid); @@ -720,3 +747,104 @@ logicalrep_read_namespace(StringInfo in) return nspname; } + +void +logicalrep_write_stream_start(StringInfo out, + TransactionId xid, bool first_segment) +{ + pq_sendbyte(out, 'S'); /* action STREAM START */ + + Assert(TransactionIdIsValid(xid)); + + /* transaction ID (we're starting to stream, so must be valid) */ + pq_sendint32(out, xid); + + /* 1 if this is the first streaming segment for this xid */ + pq_sendbyte(out, first_segment ? 1 : 0); +} + +TransactionId +logicalrep_read_stream_start(StringInfo in, bool *first_segment) +{ + TransactionId xid; + + Assert(first_segment); + + xid = pq_getmsgint(in, 4); + *first_segment = (pq_getmsgbyte(in) == 1); + + return xid; +} + +void +logicalrep_write_stream_stop(StringInfo out) +{ + pq_sendbyte(out, 'E'); /* action STREAM END */ +} + +void +logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + + Assert(TransactionIdIsValid(txn->xid)); + + /* transaction ID */ + pq_sendint32(out, txn->xid); + + /* send the flags field (unused for now) */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +TransactionId +logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags (unused for now) */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit message", flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + return xid; +} + +void +logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid) +{ + pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + + Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); + + /* transaction ID */ + pq_sendint32(out, xid); + pq_sendint32(out, subxid); +} + +void +logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid) +{ + Assert(xid && subxid); + + *xid = pq_getmsgint(in, 4); + *subxid = pq_getmsgint(in, 4); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2fcf2e61bc..98e7fd0576 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -18,11 +18,43 @@ * This module includes server facing code and shares libpqwalreceiver * module with walreceiver for providing the libpq specific functionality. * + * + * STREAMED TRANSACTIONS + * --------------------- + * + * Streamed transactions (large transactions exceeding a memory limit on the + * upstream) are not applied immediately, but instead the data is written + * to temporary files and then applied at once when the final commit arrives. 
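+ * (Roughly, each STREAM START .. STREAM STOP chunk appends the incoming changes to a per-transaction file, and STREAM COMMIT replays that file through apply_dispatch; see apply_handle_stream_start, apply_handle_stream_stop and apply_handle_stream_commit below.)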
+ * + * Unlike the regular (non-streamed) case, handling streamed transactions has + * to handle aborts of both the toplevel transaction and subtransactions. This + * is achieved by tracking offsets for subtransactions, which is then used + * to truncate the file with serialized changes. + * + * The files are placed in tmp file directory by default, and the filenames + * include both the XID of the toplevel transaction and OID of the + * subscription. This is necessary so that different workers processing a + * remote transaction with the same XID don't interfere. + * + * We use BufFiles instead of using normal temporary files because (a) the + * BufFile infrastructure supports temporary files that exceed the OS file size + * limit, (b) provides a way for automatic clean up on the error and (c) provides + * a way to survive these files across local transactions and allow to open and + * close at stream start and close. We decided to use SharedFileSet + * infrastructure as without that it deletes the files on the closure of file + * and if we decide to keep stream files open across the start/stop stream then + * it will consume a lot of memory (more than 8K). Moreover, if we don't use + * SharedFileSet then we also need to invent a new way to pass filenames to + * BufFile APIs so that we can be allowed to open the file we desired across + * multiple stream open calls for the same transaction. *------------------------------------------------------------------------- */ #include "postgres.h" +#include +#include + #include "access/table.h" #include "access/tableam.h" #include "access/xact.h" @@ -33,7 +65,9 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_tablespace.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" @@ -63,7 +97,9 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" +#include "storage/buffile.h" #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -71,6 +107,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/catcache.h" +#include "utils/dynahash.h" #include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -99,9 +136,26 @@ typedef struct SlotErrCallbackArg int remote_attnum; } SlotErrCallbackArg; +/* + * Stream xid hash entry. Whenever we see a new xid we create this entry in the + * xidhash and along with it create the streaming file and store the fileset handle. + * The subxact file is created iff there is any suxact info under this xid. This + * entry is used on the subsequent streams for the xid to get the corresponding + * fileset handles. + */ +typedef struct StreamXidHash +{ + TransactionId xid; /* xid is the hash key and must be first */ + SharedFileSet *stream_fileset; /* shared file set for stream data */ + SharedFileSet *subxact_fileset; /* shared file set for subxact info */ +} StreamXidHash; + static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; +/* per stream context for streaming transactions. 
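+ * Reset at each stream stop (see apply_handle_stream_stop).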
*/ +static MemoryContext LogicalStreamingContext = NULL; + WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; @@ -110,12 +164,62 @@ bool MySubscriptionValid = false; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +/* fields valid only when processing streamed transaction */ +bool in_streamed_transaction = false; + +static TransactionId stream_xid = InvalidTransactionId; + +/* + * Hash table for storing the streaming xid information along with shared file + * set for streaming and subxact files. On every stream start we need to open + * the xid's files and for that we need the shared file set handle. So storing + * it in xid hash make it faster to search. + */ +static HTAB *xidhash = NULL; + +/* Buf file handle of the current streaming file. */ +static BufFile *stream_fd = NULL; + +typedef struct SubXactInfo +{ + TransactionId xid; /* XID of the subxact */ + int fileno; /* file number in the buffile */ + off_t offset; /* offset in the file */ +} SubXactInfo; + +static uint32 nsubxacts = 0; +static uint32 nsubxacts_max = 0; +static SubXactInfo *subxacts = NULL; +static TransactionId subxact_last = InvalidTransactionId; + +static void subxact_filename(char *path, Oid subid, TransactionId xid); +static void changes_filename(char *path, Oid subid, TransactionId xid); + +/* + * Information about subtransactions of a given toplevel transaction. + */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); +static inline void cleanup_subxact_info(void); + +/* + * Serialize and deserialize changes for a toplevel transaction. + */ +static void stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok); +static void stream_open_file(Oid subid, TransactionId xid, bool first); +static void stream_write_change(char action, StringInfo s); +static void stream_close_file(void); + static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +/* prototype needed because of stream_commit */ +static void apply_dispatch(StringInfo s); + static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot); static void apply_handle_update_internal(ResultRelInfo *relinfo, @@ -187,6 +291,42 @@ ensure_transaction(void) return true; } +/* + * Handle streamed transactions. + * + * If in streaming mode (receiving a block of streamed transaction), we + * simply redirect it to a file for the proper toplevel transaction. + * + * Returns true for streamed transactions, false otherwise (regular mode). + */ +static bool +handle_streamed_transaction(const char action, StringInfo s) +{ + TransactionId xid; + + /* not in streaming mode */ + if (!in_streamed_transaction) + return false; + + Assert(stream_fd != NULL); + Assert(TransactionIdIsValid(stream_xid)); + + /* + * We should have received XID of the subxact as the first part of the + * message, so extract it. + */ + xid = pq_getmsgint(s, 4); + + Assert(TransactionIdIsValid(xid)); + + /* Add the new subxact to the array (unless already there). 
*/ + subxact_info_add(xid); + + /* write the change to the current file */ + stream_write_change(action, s); + + return true; +} /* * Executor state preparation for evaluation of constraint expressions, @@ -612,16 +752,322 @@ static void apply_handle_origin(StringInfo s) { /* - * ORIGIN message can only come inside remote transaction and before any - * actual writes. + * ORIGIN message can only come inside remote transaction or + * inside streaming transaction and before any actual writes. */ - if (!in_remote_transaction || - (IsTransactionState() && !am_tablesync_worker())) + if ((!in_remote_transaction && !in_streamed_transaction) || + ((IsTransactionState() && !am_tablesync_worker()) && + !in_streamed_transaction)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("ORIGIN message sent out of order"))); } +/* + * Handle STREAM START message. + */ +static void +apply_handle_stream_start(StringInfo s) +{ + bool first_segment; + HASHCTL hash_ctl; + + Assert(!in_streamed_transaction); + + /* + * Start a transaction on stream start, this transaction will be committed + * on the stream stop. We need the transaction for handling the buffile, + * used for serializing the streaming data and subxact info. + */ + ensure_transaction(); + + /* notify handle methods we're processing a remote transaction */ + in_streamed_transaction = true; + + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + + /* Initialize the xidhash table if we haven't yet */ + if (xidhash == NULL) + { + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(StreamXidHash); + hash_ctl.hcxt = ApplyContext; + xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, + HASH_ELEM | HASH_CONTEXT); + } + + /* open the spool file for this transaction */ + stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + + /* if this is not the first segment, open existing file */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle STREAM STOP message. + */ +static void +apply_handle_stream_stop(StringInfo s) +{ + Assert(in_streamed_transaction); + + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + in_streamed_transaction = false; + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM abort message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + + Assert(!in_streamed_transaction); + + logicalrep_read_stream_abort(s, &xid, &subxid); + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just delete the files with serialized info. + */ + if (xid == subxid) + stream_cleanup_files(MyLogicalRepWorker->subid, xid, false); + else + { + /* + * OK, so it's a subxact. We need to read the subxact file for the + * toplevel transaction, determine the offset tracked for the subxact, + * and truncate the file with changes. We also remove the subxacts + * with higher offsets (or rather higher XIDs). 
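+ * (For example, if subxacts s1 < s2 < s3 are recorded and s2 aborts, the changes file is truncated at s2's starting offset, which also discards the changes of s3, and the subxact array shrinks to s1 alone.)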
+ * + * We intentionally scan the array from the tail, because we're likely + * aborting a change for the most recent subtransactions. + * + * XXX Can we rely on the subxact XIDs arriving in sorted order? That + * would allow us to use binary search here. + * + * XXX Or perhaps we can rely on the aborts to arrive in the reverse + * order, i.e. from the inner-most subxact (when nested)? In which + * case we could simply check the last element. + */ + + int64 i; + int64 subidx; + BufFile *fd; + bool found = false; + char path[MAXPGPATH]; + StreamXidHash *ent; + + subidx = -1; + ensure_transaction(); + subxact_info_read(MyLogicalRepWorker->subid, xid); + + /* XXX optimize the search by bsearch on sorted data */ + for (i = nsubxacts; i > 0; i--) + { + if (subxacts[i - 1].xid == subxid) + { + subidx = (i - 1); + found = true; + break; + } + } + + /* + * If it's an empty sub-transaction then we will not find the subxid + * here so just cleanup the subxact info and return. + */ + if (!found) + { + /* Cleanup the subxact info */ + cleanup_subxact_info(); + CommitTransactionCommand(); + return; + } + + Assert((subidx >= 0) && (subidx < nsubxacts)); + + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + + /* open the changes file */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + + /* OK, truncate the file at the right offset */ + BufFileTruncateShared(fd, subxacts[subidx].fileno, subxacts[subidx].offset); + BufFileClose(fd); + + /* discard the subxacts added later */ + nsubxacts = subidx; + + /* write the updated subxact list */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + CommitTransactionCommand(); + } +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + bool found; + LogicalRepCommitData commit_data; + StreamXidHash *ent; + MemoryContext oldcxt; + BufFile *fd; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* open the spool file for the committed transaction */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + elog(DEBUG1, "replaying changes from file '%s'", path); + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = commit_data.commit_lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + int nbytes; + int len; + + /* read length of the on-disk record */ + nbytes = BufFileRead(fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? 
*/ + if (nbytes != sizeof(len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file: %m"))); + + Assert(len > 0); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + if (BufFileRead(fd, buffer, len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file: %m"))); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. */ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file '%s'", + nchanges, path); + } + + BufFileClose(fd); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = commit_data.end_lsn; + replorigin_session_origin_timestamp = commit_data.committime; + + pfree(buffer); + pfree(s2.data); + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data.end_lsn); + + elog(DEBUG1, "replayed %d (all) changes from file '%s'", + nchanges, path); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid, false); +} + /* * Handle RELATION message. 
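* (In streaming mode this message, like the other changes below, is simply spooled to the transaction's file by handle_streamed_transaction and applied later.)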
 *
@@ -635,6 +1081,9 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	if (handle_streamed_transaction('R', s))
+		return;
+
 	rel = logicalrep_read_rel(s);
 	logicalrep_relmap_update(rel);
 }
@@ -650,6 +1099,9 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	if (handle_streamed_transaction('Y', s))
+		return;
+
 	logicalrep_read_typ(s, &typ);
 	logicalrep_typmap_update(&typ);
 }
@@ -686,6 +1138,9 @@ apply_handle_insert(StringInfo s)
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
 
+	if (handle_streamed_transaction('I', s))
+		return;
+
 	ensure_transaction();
 
 	relid = logicalrep_read_insert(s, &newtup);
@@ -801,6 +1256,9 @@ apply_handle_update(StringInfo s)
 	RangeTblEntry *target_rte;
 	MemoryContext oldctx;
 
+	if (handle_streamed_transaction('U', s))
+		return;
+
 	ensure_transaction();
 
 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -950,6 +1408,9 @@ apply_handle_delete(StringInfo s)
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
 
+	if (handle_streamed_transaction('D', s))
+		return;
+
 	ensure_transaction();
 
 	relid = logicalrep_read_delete(s, &oldtup);
@@ -1320,6 +1781,9 @@ apply_handle_truncate(StringInfo s)
 	List	   *relids_logged = NIL;
 	ListCell   *lc;
 
+	if (handle_streamed_transaction('T', s))
+		return;
+
 	ensure_transaction();
 
 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -1458,6 +1922,22 @@ apply_dispatch(StringInfo s)
 		case 'O':
 			apply_handle_origin(s);
 			break;
+			/* STREAM START */
+		case 'S':
+			apply_handle_stream_start(s);
+			break;
+			/* STREAM END */
+		case 'E':
+			apply_handle_stream_stop(s);
+			break;
+			/* STREAM ABORT */
+		case 'A':
+			apply_handle_stream_abort(s);
+			break;
+			/* STREAM COMMIT */
+		case 'c':
+			apply_handle_stream_commit(s);
+			break;
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1570,6 +2050,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 										  "ApplyMessageContext",
 										  ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * This memory context is used for per-stream data when streaming mode
+	 * is enabled. It is reset on each stream stop.
+	 */
+	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+													"LogicalStreamingContext",
+													ALLOCSET_DEFAULT_SIZES);
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -1674,7 +2162,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			/* confirm all writes so far */
 			send_feedback(last_received, false, false);
 
-			if (!in_remote_transaction)
+			if (!in_remote_transaction && !in_streamed_transaction)
 			{
 				/*
 				 * If we didn't get any transactions for a while there might be
@@ -1947,6 +2435,20 @@ maybe_reread_subscription(void)
 		proc_exit(0);
 	}
 
+	/*
+	 * Exit if the streaming option has changed. The launcher will start a
+	 * new worker.
+	 */
+	if (newsub->stream != MySubscription->stream)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"restart because the subscription's streaming option was changed",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid)
 	{
@@ -1979,6 +2481,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 	MySubscriptionValid = false;
 }
 
+/*
+ * subxact_info_write
+ *	  Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store the offset of its first change in the main file.
+ * The file is always overwritten as a whole.
+ *
+ * XXX We should only store subxacts that were not aborted yet.
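+ *
+ * For illustration (inferred from the write/read code below, not a formal
+ * spec), the on-disk layout of the subxact file is simply:
+ *
+ *	  <nsubxacts> <SubXactInfo 0> ... <SubXactInfo nsubxacts-1>
+ *
+ * where each SubXactInfo records the subxact XID and the (fileno, offset)
+ * position of its first change in the changes file.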
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+	char		path[MAXPGPATH];
+	bool		found;
+	Size		len;
+	StreamXidHash *ent;
+	BufFile    *fd;
+
+	Assert(TransactionIdIsValid(xid));
+
+	subxact_filename(path, subid, xid);
+
+	/* find the xid entry in the xidhash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_FIND,
+										&found);
+	/* we must have found the entry for its top transaction by this time */
+	Assert(found);
+
+	/*
+	 * If there is no subtransaction then there is nothing to do; but if a
+	 * subxact file already exists, delete it.
+	 */
+	if (nsubxacts == 0)
+	{
+		if (ent->subxact_fileset)
+		{
+			cleanup_subxact_info();
+			BufFileDeleteShared(ent->subxact_fileset, path);
+			pfree(ent->subxact_fileset);
+			ent->subxact_fileset = NULL;
+		}
+
+		return;
+	}
+
+	/*
+	 * Create the subxact file if it is not already created; otherwise, open
+	 * the existing file.
+	 */
+	if (ent->subxact_fileset == NULL)
+	{
+		MemoryContext oldctx;
+
+		/*
+		 * We need to maintain the shared fileset across multiple stream
+		 * start/stop calls, so allocate it in a persistent context.
+		 */
+		oldctx = MemoryContextSwitchTo(ApplyContext);
+		ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+		SharedFileSetInit(ent->subxact_fileset, NULL);
+		MemoryContextSwitchTo(oldctx);
+
+		fd = BufFileCreateShared(ent->subxact_fileset, path);
+	}
+	else
+		fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+	len = sizeof(SubXactInfo) * nsubxacts;
+
+	/* Write the subxact count and subxact info */
+	BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts));
+	BufFileWrite(fd, subxacts, len);
+
+	BufFileClose(fd);
+
+	/*
+	 * Free the memory allocated for the subxact info. There might be one
+	 * exceptional transaction with many subxacts, and we don't want to keep
+	 * the memory allocated forever.
+	 */
+	cleanup_subxact_info();
+}
+
+/*
+ * subxact_info_read
+ *	  Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the global variables.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+	char		path[MAXPGPATH];
+	bool		found;
+	Size		len;
+	BufFile    *fd;
+	StreamXidHash *ent;
+	MemoryContext oldctx;
+
+	Assert(TransactionIdIsValid(xid));
+	Assert(!subxacts);
+	Assert(nsubxacts == 0);
+	Assert(nsubxacts_max == 0);
+
+	/* Find the stream xid entry in the xidhash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_FIND,
+										&found);
+
+	/*
+	 * If subxact_fileset is not valid, that means we don't have any subxact
+	 * info.
+	 */
+	if (ent->subxact_fileset == NULL)
+		return;
+
+	subxact_filename(path, subid, xid);
+
+	fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+	/* read number of subxact items */
+	if (BufFileRead(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+						path)));
+
+	len = sizeof(SubXactInfo) * nsubxacts;
+
+	/* we keep the maximum as a power of 2 */
+	nsubxacts_max = 1 << my_log2(nsubxacts);
+
+	/*
+	 * Allocate the subxact information in the logical streaming context. We
+	 * need this information for the duration of the stream so that we can
+	 * add new subtransaction info to it. On stream stop we flush this
+	 * information to the subxact file and reset the logical streaming
+	 * context.
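+	 * (nsubxacts_max is kept at a power of two so that the doubling logic
+	 * in subxact_info_add() can grow the array without extra bookkeeping.)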
+	 */
+	oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+	subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
+	MemoryContextSwitchTo(oldctx);
+
+	if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+						path)));
+
+	BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ *	  Add information about a subxact (offset in the main file).
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+	int64		i;
+
+	/* We must have a valid top level stream xid and a stream fd. */
+	Assert(TransactionIdIsValid(stream_xid));
+	Assert(stream_fd != NULL);
+
+	/*
+	 * If the XID matches the toplevel transaction, we don't want to add it.
+	 */
+	if (stream_xid == xid)
+		return;
+
+	/*
+	 * In most cases we're checking the same subxact as we've already seen in
+	 * the last call, so make sure to ignore it (this change comes later).
+	 */
+	if (subxact_last == xid)
+		return;
+
+	/* OK, remember we're processing this XID. */
+	subxact_last = xid;
+
+	/*
+	 * Check if the transaction is already present in the array of subxacts.
+	 * We intentionally scan the array from the tail, because we're likely
+	 * adding a change for the most recent subtransactions.
+	 *
+	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+	 * would allow us to use binary search here.
+	 */
+	for (i = nsubxacts; i > 0; i--)
+	{
+		/* found, so we're done */
+		if (subxacts[i - 1].xid == xid)
+			return;
+	}
+
+	/* This is a new subxact, so we need to add it to the array. */
+	if (nsubxacts == 0)
+	{
+		MemoryContext oldctx;
+
+		nsubxacts_max = 128;
+
+		/* Allocate this in the per-stream context */
+		oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+		subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
+		MemoryContextSwitchTo(oldctx);
+	}
+	else if (nsubxacts == nsubxacts_max)
+	{
+		nsubxacts_max *= 2;
+		subxacts = repalloc(subxacts, nsubxacts_max * sizeof(SubXactInfo));
+	}
+
+	subxacts[nsubxacts].xid = xid;
+
+	/*
+	 * Get the current offset of the stream file and store it as the offset
+	 * of this subxact.
+	 */
+	BufFileTell(stream_fd, &subxacts[nsubxacts].fileno,
+				&subxacts[nsubxacts].offset);
+
+	nsubxacts++;
+}
+
+/* format filename for the file containing the info about subxacts */
+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/* format filename for the file containing serialized changes */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
+
+/*
+ * stream_cleanup_files
+ *	  Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ *
+ * Note: The files may not exist, so handle ENOENT as a non-error.
+ *
+ * missing_ok - don't report an error for a missing file if the flag is
+ * passed as true.
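+ *
+ * For example, with subscription OID 16394 and toplevel XID 512 (values
+ * purely illustrative), this removes "16394-512.changes" and, if present,
+ * "16394-512.subxacts", per the *_filename helpers above.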
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok)
+{
+	char		path[MAXPGPATH];
+	StreamXidHash *ent;
+
+	/* Remove the xid entry from the stream xid hash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_REMOVE,
+										NULL);
+	/* By this time we must have created the transaction entry */
+	Assert(ent != NULL);
+
+	/* Delete the change file and release the stream fileset memory */
+	changes_filename(path, subid, xid);
+	BufFileDeleteShared(ent->stream_fileset, path);
+	pfree(ent->stream_fileset);
+
+	/* Delete the subxact file and release the memory, if it exists */
+	if (ent->subxact_fileset)
+	{
+		subxact_filename(path, subid, xid);
+		BufFileDeleteShared(ent->subxact_fileset, path);
+		pfree(ent->subxact_fileset);
+	}
+}
+
+/*
+ * stream_open_file
+ *	  Open the file we'll use to serialize changes for a toplevel
+ *	  transaction.
+ *
+ * Open a file for streamed changes from a toplevel transaction identified
+ * by stream_xid (global variable). If it's the first chunk of streamed
+ * changes for this transaction, initialize the shared fileset and create the
+ * BufFile, otherwise open the previously created file.
+ *
+ * This can only be called at the beginning of a "streaming" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_open_file(Oid subid, TransactionId xid, bool first_segment)
+{
+	char		path[MAXPGPATH];
+	bool		found;
+	MemoryContext oldcxt;
+	StreamXidHash *ent;
+
+	Assert(in_streamed_transaction);
+	Assert(OidIsValid(subid));
+	Assert(TransactionIdIsValid(xid));
+	Assert(stream_fd == NULL);
+
+	/* create or find the xid entry in the xidhash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_ENTER | HASH_FIND,
+										&found);
+	Assert(first_segment || found);
+	changes_filename(path, subid, xid);
+	elog(DEBUG1, "opening file '%s' for streamed changes", path);
+
+	/*
+	 * Create/open the BufFiles under the logical streaming context so that
+	 * those files survive until stream stop.
+	 */
+	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
+
+	/*
+	 * If this is the first streamed segment, the file must not exist, so
+	 * make sure we're the ones creating it. Otherwise just open the file
+	 * for writing, in append mode.
+	 */
+	if (first_segment)
+	{
+		MemoryContext savectx;
+		SharedFileSet *fileset;
+
+		/*
+		 * We need to maintain the shared fileset across multiple stream
+		 * start/stop calls, so allocate it in a persistent context.
+		 */
+		savectx = MemoryContextSwitchTo(ApplyContext);
+		fileset = palloc(sizeof(SharedFileSet));
+
+		SharedFileSetInit(fileset, NULL);
+		MemoryContextSwitchTo(savectx);
+
+		stream_fd = BufFileCreateShared(fileset, path);
+
+		/* Remember the fileset for the next stream of the same transaction */
+		ent->xid = xid;
+		ent->stream_fileset = fileset;
+		ent->subxact_fileset = NULL;
+	}
+	else
+	{
+		/*
+		 * Open the file and seek to its end, because we always append to
+		 * the changes file.
+		 */
+		stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+		BufFileSeek(stream_fd, 0, 0, SEEK_END);
+	}
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * stream_close_file
+ *	  Close the currently open file with streamed changes.
+ *
+ * This can only be called at the end of a streaming block, i.e. at the
+ * stream_stop message from the upstream.
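+ *
+ * Note that this only closes the BufFile and resets stream_xid/stream_fd;
+ * the SharedFileSet itself stays registered in xidhash until
+ * stream_cleanup_files() removes it, so later streams of the same
+ * transaction can append to the file.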
+ */
+static void
+stream_close_file(void)
+{
+	Assert(in_streamed_transaction);
+	Assert(TransactionIdIsValid(stream_xid));
+	Assert(stream_fd != NULL);
+
+	BufFileClose(stream_fd);
+
+	stream_xid = InvalidTransactionId;
+	stream_fd = NULL;
+}
+
+/*
+ * stream_write_change
+ *	  Serialize a change to a file for the current toplevel transaction.
+ *
+ * The change is serialized in a simple format, with the length (not
+ * including the length field itself), an action code (identifying the
+ * message type), and the message contents (without the subxact
+ * TransactionId value).
+ *
+ * XXX The subxact file includes a CRC32C of the contents. Maybe we should
+ * include something like that here too, but doing so will not be as
+ * straightforward, because we write the file in chunks.
+ */
+static void
+stream_write_change(char action, StringInfo s)
+{
+	int			len;
+
+	Assert(in_streamed_transaction);
+	Assert(TransactionIdIsValid(stream_xid));
+	Assert(stream_fd != NULL);
+
+	/* total on-disk size, including the action type character */
+	len = (s->len - s->cursor) + sizeof(char);
+
+	/* first write the size */
+	BufFileWrite(stream_fd, &len, sizeof(len));
+
+	/* then the action */
+	BufFileWrite(stream_fd, &action, sizeof(action));
+
+	/* and finally the remaining part of the buffer (after the XID) */
+	len = (s->len - s->cursor);
+
+	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+}
+
+/*
+ * Clean up the memory for subxacts and reset the related variables.
+ */
+static inline void
+cleanup_subxact_info()
+{
+	if (subxacts)
+		pfree(subxacts);
+
+	subxacts = NULL;
+	subxact_last = InvalidTransactionId;
+	nsubxacts = 0;
+	nsubxacts_max = 0;
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -2145,6 +3080,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
 	options.proto.logical.binary = MySubscription->binary;
+	options.proto.logical.streaming = MySubscription->stream;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 81ef7dc4c1..3360bd5dd0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -47,29 +47,57 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr abort_lsn);
+
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+								   ReorderBufferTXN *txn,
+								   XLogRecPtr commit_lsn);
+
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn);
+
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+								 ReorderBufferTXN *txn);
 
 static bool publications_valid;
+static bool in_streaming;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+									LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
 *
+ * The schema_sent flag determines if the current schema record was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on the downstream side is, however, updated only at
+ * commit time, and with streamed transactions the commit order may differ
+ * from the order in which the transactions are sent. Also, the
+ * (sub)transactions might get aborted, so we need to send the schema for
+ * each (sub)transaction so that we don't lose the schema information on
+ * abort. To handle this, we maintain a list of XIDs (streamed_txns) for
+ * which we have already sent the schema.
+ *
 * For partitions, 'pubactions' considers not only the table's own
 * publications, but also those of all of its ancestors.
 */
 typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
-
+	TransactionId xid;			/* transaction that created the record */
 
 	/*
 	 * Did we send the schema? If ancestor relid is set, its schema must also
 	 * have been sent for this to be true.
 	 */
 	bool		schema_sent;
+	List	   *streamed_txns;	/* streamed toplevel transactions with this
+								 * schema */
 
 	bool		replicate_valid;
 	PublicationActions pubactions;
@@ -95,11 +123,17 @@ typedef struct RelationSyncEntry
 static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+											TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+											TransactionId xid);
+
 /*
  * Specify output plugin callbacks
  */
@@ -115,16 +149,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
+
+	/* transaction streaming */
+	cb->stream_change_cb = pgoutput_change;
+	cb->stream_truncate_cb = pgoutput_truncate;
+	cb->stream_abort_cb = pgoutput_stream_abort;
+	cb->stream_commit_cb = pgoutput_stream_commit;
+	cb->stream_start_cb = pgoutput_stream_start;
+	cb->stream_stop_cb = pgoutput_stream_stop;
 }
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary)
+						List **publication_names, bool *binary,
+						bool *enable_streaming)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		streaming_given = false;
 
 	*binary = false;
 
@@ -182,6 +226,23 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
 			*binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "streaming") == 0)
+		{
+			if (streaming_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			streaming_given = true;
+
+			/* the value must be on/off */
+			if (strcmp(strVal(defel->arg), "on") && strcmp(strVal(defel->arg), "off"))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid streaming value")));
+
+			/* enable streaming if it's 'on' */
+			*enable_streaming = (strcmp(strVal(defel->arg), "on") == 0);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -194,6 +255,7 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
+	bool
enable_streaming = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -217,7 +279,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, &data->publication_names, - &data->binary); + &data->binary, + &enable_streaming); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -237,6 +300,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("publication_names parameter missing"))); + /* + * Decide whether to enable streaming. It is disabled by default, in + * which case we just update the flag in decoding context. Otherwise + * we only allow it with sufficient version of the protocol, and when + * the output plugin supports it. + */ + if (!enable_streaming) + ctx->streaming = false; + else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support streaming, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM))); + else if (!ctx->streaming) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("streaming requested, but not supported by output plugin"))); + + /* Also remember we're currently not streaming any transaction. */ + in_streaming = false; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -247,6 +331,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Initialize relation schema cache. */ init_rel_sync_cache(CacheMemoryContext); } + else + { + /* Disable the streaming during the slot initialization mode. */ + ctx->streaming = false; + } } /* @@ -305,9 +394,41 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void maybe_send_schema(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { - if (relentry->schema_sent) + bool schema_sent; + TransactionId xid = InvalidTransactionId; + TransactionId topxid = InvalidTransactionId; + + /* + * Remember XID of the (sub)transaction for the change. We don't care if + * it's top-level transaction or not (we have already sent that XID in + * start the current streaming block). + * + * If we're not in a streaming block, just use InvalidTransactionId and + * the write methods will not include it. + */ + if (in_streaming) + xid = change->txn->xid; + + if (change->txn->toptxn) + topxid = change->txn->toptxn->xid; + else + topxid = xid; + + /* + * Do we need to send the schema? We do track streamed transactions + * separately, because those may not be applied later (and the regular + * transactions won't see their effects until then) and in an order + * that we don't know at this point. + */ + if (in_streaming) + schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); + else + schema_sent = relentry->schema_sent; + + if (schema_sent) return; /* If needed, send the ancestor's schema first. 
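 * (The partition's rows are then converted to the ancestor's row type via
 * the attribute map built just below.)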
*/ @@ -323,19 +444,25 @@ maybe_send_schema(LogicalDecodingContext *ctx, relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc), CreateTupleDescCopy(outdesc)); MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, ctx); + send_relation_and_attrs(ancestor, xid, ctx); RelationClose(ancestor); } - send_relation_and_attrs(relation, ctx); - relentry->schema_sent = true; + send_relation_and_attrs(relation, xid, ctx); + relentry->xid = change->txn->xid; + + if (in_streaming) + set_schema_sent_in_streamed_txn(relentry, topxid); + else + relentry->schema_sent = true; } /* * Sends a relation */ static void -send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -359,17 +486,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); + logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_rel(ctx->out, xid, relation); OutputPluginWrite(ctx, false); } /* * Sends the decoded DML over wire. + * + * XXX May be called both in streaming and non-streaming modes. */ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -378,6 +507,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; + TransactionId xid = InvalidTransactionId; + + if (in_streaming) + xid = change->txn->xid; if (!is_publishable_relation(relation)) return; @@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, relation, relentry); + maybe_send_schema(ctx, txn, change, relation, relentry); /* Send the data */ switch (change->action) @@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple, + logicalrep_write_insert(ctx->out, xid, relation, tuple, data->binary); OutputPluginWrite(ctx, true); break; @@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, - data->binary); + logicalrep_write_update(ctx->out, xid, relation, oldtuple, + newtuple, data->binary); OutputPluginWrite(ctx, true); break; } @@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple, + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, data->binary); OutputPluginWrite(ctx, true); } @@ -498,6 +631,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int i; int nrelids; Oid *relids; + TransactionId xid = InvalidTransactionId; + + if (in_streaming) + xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -526,13 +663,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; - maybe_send_schema(ctx, relation, relentry); + maybe_send_schema(ctx, txn, 
change, relation, relentry);
 	}
 
 	if (nrelids > 0)
 	{
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
+								  xid,
 								  nrelids,
 								  relids,
 								  change->data.truncate.cascade,
@@ -605,6 +743,113 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * its subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr abort_lsn)
+{
+	ReorderBufferTXN *toptxn;
+
+	/*
+	 * The abort should happen outside a streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+
+	/* determine the toplevel transaction */
+	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+	Assert(rbtxn_is_streamed(toptxn));
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * its subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn,
+					   XLogRecPtr commit_lsn)
+{
+	/*
+	 * The commit should happen outside a streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(txn->xid, true);
+}
+
+
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+	/* we can't nest streaming of transactions */
+	Assert(!in_streaming);
+
+	/*
+	 * If we already sent the first stream for this transaction then don't
+	 * send the origin id in the subsequent streams.
+	 */
+	if (rbtxn_is_streamed(txn))
+		send_replication_origin = false;
+
+	OutputPluginPrepareWrite(ctx, !send_replication_origin);
+	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+
+	if (send_replication_origin)
+	{
+		char	   *origin;
+
+		/* Message boundary */
+		OutputPluginWrite(ctx, false);
+		OutputPluginPrepareWrite(ctx, true);
+
+		if (replorigin_by_oid(txn->origin_id, true, &origin))
+			logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+	}
+
+	OutputPluginWrite(ctx, true);
+
+	/* we're streaming a chunk of transaction now */
+	in_streaming = true;
+}
+
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn)
+{
+	/* we should be streaming a transaction */
+	Assert(in_streaming);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_stop(ctx->out);
+	OutputPluginWrite(ctx, true);
+
+	/* we've stopped streaming a transaction */
+	in_streaming = false;
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
@@ -641,6 +886,38 @@ init_rel_sync_cache(MemoryContext cachectx)
 									  (Datum) 0);
 }
 
+/*
+ * We expect a relatively small number of streamed transactions.
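+ * per entry, so the plain linear search of streamed_txns in this function
+ * is cheap; each list element is a toplevel XID for which the schema has
+ * already been sent.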
+ */ +static bool +get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + ListCell *lc; + foreach (lc, entry->streamed_txns) + { + if (xid == lfirst_int(lc)) + return true; + } + + return false; +} + +/* + * Add the xid in the rel sync entry for which we have already sent the schema + * of the relation. + */ +static void +set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->streamed_txns = lappend_int(entry->streamed_txns, xid); + + MemoryContextSwitchTo(oldctx); +} + /* * Find or create entry in the relation schema cache. * @@ -771,11 +1048,44 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } if (!found) + { entry->schema_sent = false; + entry->streamed_txns = NULL; + } return entry; } +/* + * Cleanup list of streamed transactions and update the schema_sent flag. + * + * When a streamed transaction commits or aborts, we need to remove the + * toplevel XID from the schema cache. If the transaction aborted, the + * subscriber will simply throw away the schema records we streamed, so + * we don't need to do anything else. + * + * If the transaction committed, the subscriber will update the relation + * cache - so tweak the schema_sent flag accordingly. + */ +static void +cleanup_rel_sync_cache(TransactionId xid, bool is_commit) +{ + HASH_SEQ_STATUS hash_seq; + RelationSyncEntry *entry; + + Assert(RelationSyncCache != NULL); + + hash_seq_init(&hash_seq, RelationSyncCache); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (is_commit) + entry->schema_sent = true; + + /* Remove the xid from the schema sent list. */ + entry->streamed_txns = list_delete_int(entry->streamed_txns, xid); + } +} + /* * Relcache invalidation callback */ @@ -811,7 +1121,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * Reset schema sent status as the relation definition may have changed. */ if (entry != NULL) + { entry->schema_sent = false; + list_free(entry->streamed_txns); + entry->streamed_txns = NULL; + } } /* diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 9795c35000..1d091546bf 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -51,6 +51,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subbinary; /* True if the subscription wants the * publisher to send data in binary */ + bool substream; /* Stream in-progress transactions. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -78,6 +80,7 @@ typedef struct Subscription bool enabled; /* Indicates if the subscription is enabled */ bool binary; /* Indicates if the subscription wants data in * binary format */ + bool stream; /* Allow streaming in-progress transactions. 
*/ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 807a9c1edf..0dfbac46b4 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -982,7 +982,11 @@ typedef enum WAIT_EVENT_WAL_READ, WAIT_EVENT_WAL_SYNC, WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN, - WAIT_EVENT_WAL_WRITE + WAIT_EVENT_WAL_WRITE, + WAIT_EVENT_LOGICAL_CHANGES_READ, + WAIT_EVENT_LOGICAL_CHANGES_WRITE, + WAIT_EVENT_LOGICAL_SUBXACT_READ, + WAIT_EVENT_LOGICAL_SUBXACT_WRITE } WaitEventIO; /* ---------- diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 60a76bc85c..655144d03a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -23,9 +23,13 @@ * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we * have backwards compatibility for. The client requests protocol version at * connect time. + * + * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with + * support for streaming large transactions. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 -#define LOGICALREP_PROTO_VERSION_NUM 1 +#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_VERSION_NUM 2 /* * This struct stores a tuple received via logical replication. @@ -98,25 +102,49 @@ extern void logicalrep_read_commit(StringInfo in, extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); -extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple, bool binary); +extern void logicalrep_write_insert(StringInfo out, TransactionId xid, + Relation rel, HeapTuple newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); -extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, +extern void logicalrep_write_update(StringInfo out, TransactionId xid, + Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); -extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple, bool binary); +extern void logicalrep_write_delete(StringInfo out, TransactionId xid, + Relation rel, HeapTuple oldtuple, + bool binary); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); -extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], +extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, + int nrelids, Oid relids[], bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); -extern void logicalrep_write_rel(StringInfo out, Relation rel); +extern void logicalrep_write_rel(StringInfo out, TransactionId xid, + Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); -extern void logicalrep_write_typ(StringInfo out, Oid typoid); +extern void logicalrep_write_typ(StringInfo out, TransactionId xid, + Oid typoid); extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp); +extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid, + bool first_segment); +extern TransactionId 
logicalrep_read_stream_start(StringInfo in, + bool *first_segment); + +extern void logicalrep_write_stream_stop(StringInfo out); +extern TransactionId logicalrep_read_stream_stop(StringInfo in); + +extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern TransactionId logicalrep_read_stream_commit(StringInfo out, + LogicalRepCommitData *commit_data); + +extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid); +extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid); + #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c2d5dbee54..6c0a4e30a8 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -178,6 +178,7 @@ typedef struct uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ + bool streaming; /* Streaming of large transactions */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/009_stream_simple.pl new file mode 100644 index 0000000000..2f01133f69 --- /dev/null +++ b/src/test/subscription/t/009_stream_simple.pl @@ -0,0 +1,86 @@ +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/010_stream_subxact.pl new file mode 100644 index 0000000000..d2ae38592b --- /dev/null +++ b/src/test/subscription/t/010_stream_subxact.pl @@ -0,0 +1,102 @@ +# Test streaming of large transaction containing large subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 
999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rowsto exceed 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/011_stream_ddl.pl new file mode 100644 index 0000000000..0da39a1a8a --- /dev/null +++ b/src/test/subscription/t/011_stream_ddl.pl @@ -0,0 +1,95 @@ +# Test streaming of large transaction with DDL and subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to 
catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (3, md5(3::text)); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (4, md5(4::text), -4); +COMMIT; +}); + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); +COMMIT; +}); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(2002|1999|1002|1), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/012_stream_subxact_abort.pl new file mode 100644 index 0000000000..402df30f59 --- /dev/null +++ b/src/test/subscription/t/012_stream_subxact_abort.pl @@ -0,0 +1,82 @@ +# Test streaming of large transaction containing multiple subtransactions and rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM 
pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); +ROLLBACK TO s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(1000|0), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl new file mode 100644 index 0000000000..becbdd0578 --- /dev/null +++ b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl @@ -0,0 +1,84 @@ +# Test behavior with streaming transaction exceeding logical_decoding_work_mem +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = 
get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i); +ALTER TABLE test_tab DROP COLUMN c; +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(1000|500), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/015_stream_binary.pl b/src/test/subscription/t/015_stream_binary.pl new file mode 100644 index 0000000000..fa2362e32b --- /dev/null +++ b/src/test/subscription/t/015_stream_binary.pl @@ -0,0 +1,86 @@ +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 
64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, binary = true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; -- 2.23.0 v42/v42-0007-Add-streaming-option-in-pg_dump.patch000644 000765 000024 00000005262 13706560046 023051 0ustar00dilipkumarstaff000000 000000 From 202716074e871612c1471f6fd7ee41cde25be7d7 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 20 Jul 2020 11:09:18 +0530 Subject: [PATCH v42 7/7] Add streaming option in pg_dump --- src/bin/pg_dump/pg_dump.c | 17 +++++++++++++++-- src/bin/pg_dump/pg_dump.h | 1 + 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 94459b3539..f69d64cd16 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4201,6 +4201,7 @@ getSubscriptions(Archive *fout) int i_oid; int i_subname; int i_rolname; + int i_substream; int 
i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4240,10 +4241,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 140000) appendPQExpBuffer(query, - " s.subbinary\n"); + " s.subbinary,\n"); else appendPQExpBuffer(query, - " false AS subbinary\n"); + " false AS subbinary,\n"); + + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + " s.substream\n"); + else + appendPQExpBuffer(query, + " false AS substream\n"); appendPQExpBuffer(query, "FROM pg_subscription s\n" @@ -4263,6 +4271,7 @@ getSubscriptions(Archive *fout) i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); + i_substream = PQfnumber(res, "substream"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4286,6 +4295,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subpublications)); subinfo[i].subbinary = pg_strdup(PQgetvalue(res, i, i_subbinary)); + subinfo[i].substream = + pg_strdup(PQgetvalue(res, i, i_substream)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4357,6 +4368,8 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) if (strcmp(subinfo->subbinary, "t") == 0) appendPQExpBuffer(query, ", binary = true"); + if (strcmp(subinfo->substream, "f") != 0) + appendPQExpBuffer(query, ", streaming = on"); if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index da97b731b1..cc10c7c1cc 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -626,6 +626,7 @@ typedef struct _SubscriptionInfo char *subconninfo; char *subslotname; char *subbinary; + char *substream; char *subsynccommit; char *subpublications; } SubscriptionInfo; -- 2.23.0 v42/v42-0001-Extend-the-logical-decoding-output-plugin-API-wi.patch000644 000765 000024 00000114056 13706560046 026070 0ustar00dilipkumarstaff000000 000000 From f311ae1ba840d2540a39d7cf6e18c5fe61d9d2b3 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 30 Jun 2020 11:01:24 +0530 Subject: [PATCH v42 1/7] Extend the logical decoding output plugin API with stream methods. This adds seven methods to the output plugin API, adding support for streaming changes for large in-porgress transactions. * stream_start * stream_stop * stream_abort * stream_commit * stream_change * stream_message * stream_truncate Most of this is a simple extension of the existing methods, with the semantic difference that the transaction (or subtransaction) is incomplete and may be aborted later (which is something the regular API does not really need to deal with). This also extends the 'test_decoding' plugin, implementing these new stream methods. The stream_start/start_stop are used to demarcate a chunk of changes streamed for a particular toplevel transaction. This commit simply adds these new APIs and the upcoming patch to "allow the streaming mode in ReorderBuffer" will use these APIs. 
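A minimal sketch of how a plugin might opt in to these methods, using only the callback fields this patch adds; the my_* functions are hypothetical placeholders whose prototypes are omitted:

#include "replication/output_plugin.h"

/* Sketch only: the my_* callbacks are hypothetical and defined elsewhere. */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	cb->begin_cb = my_begin;
	cb->change_cb = my_change;
	cb->commit_cb = my_commit;

	/* streaming of large in-progress transactions */
	cb->stream_start_cb = my_stream_start;
	cb->stream_stop_cb = my_stream_stop;
	cb->stream_abort_cb = my_stream_abort;
	cb->stream_commit_cb = my_stream_commit;
	cb->stream_change_cb = my_stream_change;

	/* optional, like message_cb/truncate_cb in the regular API */
	cb->stream_message_cb = my_stream_message;
	cb->stream_truncate_cb = my_stream_truncate;
}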
Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com --- contrib/test_decoding/test_decoding.c | 176 +++++++++++ doc/src/sgml/logicaldecoding.sgml | 218 ++++++++++++++ src/backend/replication/logical/logical.c | 351 ++++++++++++++++++++++ src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 69 +++++ src/include/replication/reorderbuffer.h | 59 ++++ 6 files changed, 878 insertions(+) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 93c948856e..dbef52a3af 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); +static void pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); +static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change); void _PG_init(void) @@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->stream_start_cb = pg_decode_stream_start; + cb->stream_stop_cb = pg_decode_stream_stop; + cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_commit_cb = pg_decode_stream_commit; + cb->stream_change_cb = pg_decode_stream_change; + cb->stream_message_cb = pg_decode_stream_message; + cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx, appendBinaryStringInfo(ctx->out, message, sz); OutputPluginWrite(ctx, true); } + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "opening a streamed block for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. 
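+ * (The ReorderBuffer only streams transactions that already have decoded
+ * changes queued, so a streamed block is never empty.)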
+ */ +static void +pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "closing a streamed block for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "aborting streamed (sub)transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * We never try to stream any empty xact so we don't need any special handling + * for skip_empty_xacts in streaming mode APIs. + */ +static void +pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "committing streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the changes as the transaction can abort + * at a later point in time. We don't want users to see the changes until the + * transaction is committed. + */ +static void +pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "streaming change for transaction"); + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the contents for transactional messages + * as the transaction can abort at a later point in time. We don't want users to + * see the message contents until the transaction is committed. + */ +static void +pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + + if (transactional) + { + appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu", + transactional, prefix, sz); + } + else + { + appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + } + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the detailed information of Truncate. + * See pg_decode_stream_change. 
+ */ +static void +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "streaming truncate for transaction"); + OutputPluginWrite(ctx, true); +} diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index c89f93cf6b..791a62b57c 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -389,6 +389,13 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); @@ -401,6 +408,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored. + + + An output plugin may also define functions to support streaming of large, + in-progress transactions. The stream_start_cb, + stream_stop_cb, stream_abort_cb, + stream_commit_cb and stream_change_cb + are required, while stream_message_cb and + stream_truncate_cb are optional. + @@ -679,6 +695,117 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + + Stream Start Callback + + The stream_start_cb callback is called when opening + a block of streamed changes from an in-progress transaction. + +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + + + + + Stream Stop Callback + + The stream_stop_cb callback is called when closing + a block of streamed changes from an in-progress transaction. + +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + + + + + Stream Abort Callback + + The stream_abort_cb callback is called to abort + a previously streamed transaction. + +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Stream Commit Callback + + The stream_commit_cb callback is called to commit + a previously streamed transaction. + +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Stream Change Callback + + The stream_change_cb callback is called when sending + a change in a block of streamed changes (demarcated by + stream_start_cb and stream_stop_cb calls). + The actual changes are not displayed as the transaction can abort at a later + point in time and we don't decode changes for aborted transactions. 
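A minimal implementation of this callback, sketched after the test_decoding pattern (my_stream_change is a hypothetical name; a real plugin would add its own buffering or wire protocol):

static void
my_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 Relation relation, ReorderBufferChange *change)
{
	OutputPluginPrepareWrite(ctx, true);
	/* emit only a marker: tuple data is withheld since txn may yet abort */
	appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
	OutputPluginWrite(ctx, true);
}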
+ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + + + + + + Stream Message Callback + + The stream_message_cb callback is called when sending + a generic message in a block of streamed changes (demarcated by + stream_start_cb and stream_stop_cb calls). + The message contents for transactional messages are not displayed as the transaction + can abort at a later point in time and we don't decode changes for aborted + transactions. + +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + + + + + + Stream Truncate Callback + + The stream_truncate_cb callback is called for a + TRUNCATE command in a block of streamed changes + (demarcated by stream_start_cb and + stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + + The parameters are analogous to the stream_change_cb + callback. However, because TRUNCATE actions on + tables connected by foreign keys need to be executed together, this + callback receives an array of relations instead of just a single one. + See the description of the statement for + details. + + + @@ -747,4 +874,95 @@ OutputPluginWrite(ctx, true); + + + Streaming of Large Transactions for Logical Decoding + + + The basic output plugin callbacks (e.g. begin_cb, + change_cb, commit_cb and + message_cb) are only invoked when the transaction + actually commits. The changes are still decoded from the transaction + log, but are only passed to the output plugin at commit (and discarded + if the transaction aborts). + + + + This means that while the decoding happens incrementally, and may spill + to disk to keep memory usage under control, all the decoded changes have + to be transmitted when the transaction finally commits (or more precisely, + when the commit is decoded from the transaction log). Depending on the + size of the transaction and network bandwidth, the transfer time may + significantly increase the apply lag. + + + + To reduce the apply lag caused by large transactions, an output plugin + may provide additional callback to support incremental streaming of + in-progress transactions. There are multiple required streaming callbacks + (stream_start_cb, stream_stop_cb, + stream_abort_cb, stream_commit_cb + and stream_change_cb) and two optional callbacks + (stream_message_cb) and (stream_truncate_cb). + + + + When streaming an in-progress transaction, the changes (and messages) are + streamed in blocks demarcated by stream_start_cb + and stream_stop_cb callbacks. Once all the decoded + changes are transmitted, the transaction is committed using the + stream_commit_cb callback (or possibly aborted using + the stream_abort_cb callback). + + + + One example sequence of streaming callback calls for one transaction may + look like this: + +stream_start_cb(...); <-- start of first block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_message_cb(...); + stream_change_cb(...); + ... + stream_change_cb(...); +stream_stop_cb(...); <-- end of first block of changes + +stream_start_cb(...); <-- start of second block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_change_cb(...); + ... 
+ stream_message_cb(...); + stream_change_cb(...); +stream_stop_cb(...); <-- end of second block of changes + +stream_commit_cb(...); <-- commit of the streamed transaction + + + + + The actual sequence of callback calls may be more complicated, of course. + There may be blocks for multiple streamed transactions, some of the + transactions may get aborted, etc. + + + + Similar to spill-to-disk behavior, streaming is triggered when the total + amount of changes decoded from the WAL (for all in-progress transactions) + exceeds limit defined by logical_decoding_work_mem setting. + At that point the largest toplevel transaction (measured by amount of memory + currently used for decoded changes) is selected and streamed. However, in + some cases we still have to spill to the disk even if streaming is enabled + because if we cross the memory limit but we still have not decoded the + complete tuple e.g. only decoded toast table insert but not the main table + insert. + + + + Even when streaming large transactions, the changes are still applied in + commit order, preserving the same guarantees as the non-streaming mode. + + + diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61902be3b0..05d24b93da 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -65,6 +65,23 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +/* streaming callbacks */ +static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr first_lsn); +static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr last_lsn); +static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); +static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message); +static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); /* @@ -189,6 +206,39 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* + * To support streaming, we require start/stop/abort/commit/change + * callbacks. The message and truncate callbacks are optional, similar to + * regular output plugins. We however enable streaming when at least one + * of the methods is enabled so that we can easily identify missing + * methods. + * + * We decide it here, but only check it later in the wrappers. 
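+ * (That is: any one stream_* callback being set turns ctx->streaming on,
+ * and each wrapper for a required callback then ERRORs if its own
+ * callback is missing, which makes a partially wired-up plugin easy to
+ * spot.)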
+ */ + ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) || + (ctx->callbacks.stream_stop_cb != NULL) || + (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_commit_cb != NULL) || + (ctx->callbacks.stream_change_cb != NULL) || + (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_truncate_cb != NULL); + + /* + * streaming callbacks + * + * stream_message and stream_truncate callbacks are optional, so we do not + * fail with ERROR when missing, but the wrappers simply do nothing. We + * must set the ReorderBuffer callbacks to something, otherwise the calls + * from there will crash (we don't want to move the checks there). + */ + ctx->reorder->stream_start = stream_start_cb_wrapper; + ctx->reorder->stream_stop = stream_stop_cb_wrapper; + ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_commit = stream_commit_cb_wrapper; + ctx->reorder->stream_change = stream_change_cb_wrapper; + ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -866,6 +916,307 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr first_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_start"; + state.report_location = first_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this message's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = first_lsn; + + /* in streaming mode, stream_start_cb is required */ + if (ctx->callbacks.stream_start_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_start_cb callback"))); + + ctx->callbacks.stream_start_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr last_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. 
*/ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_stop"; + state.report_location = last_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this message's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = last_lsn; + + /* in streaming mode, stream_stop_cb is required */ + if (ctx->callbacks.stream_stop_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_stop_cb callback"))); + + ctx->callbacks.stream_stop_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_abort"; + state.report_location = abort_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = abort_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_abort_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_abort_cb callback"))); + + ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. 
*/ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_commit"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_commit_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_commit_cb callback"))); + + ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_change"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + /* in streaming mode, stream_change_cb is required */ + if (ctx->callbacks.stream_change_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_change_cb callback"))); + + ctx->callbacks.stream_change_cb(ctx, txn, relation, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_message"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? 
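+	/* txn may be NULL for a non-transactional message */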
txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (!ctx->callbacks.stream_truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c2f2475e5d..deef31825d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -79,6 +79,11 @@ typedef struct LogicalDecodingContext */ void *output_writer_private; + /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + /* * State for writing output. */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 3dd9236c57..b78c796450 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -99,6 +99,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct */ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); +/* + * Called when starting to stream a block of changes from in-progress + * transaction (may be called repeatedly, if it's streamed in multiple + * chunks). + */ +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called when stopping to stream a block of changes from in-progress + * transaction to a remote node (may be called repeatedly, if it's streamed + * in multiple chunks). + */ +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called to discard changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called to apply changes streamed to remote node from in-progress + * transaction. 
+ */ +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Callback for streaming individual changes from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for streaming generic logical decoding messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Callback for streaming truncates from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + /* * Output plugin callbacks */ @@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + /* streaming of changes */ + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; /* Functions in replication/logical/logical.c */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1055e99e2e..42bc817648 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* start streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStartCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr first_lsn); + +/* stop streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStopCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr last_lsn); + +/* discard streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* commit streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* stream change callback signature */ +typedef void (*ReorderBufferStreamChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* stream message callback signature */ +typedef void (*ReorderBufferStreamMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* stream truncate callback signature */ +typedef void (*ReorderBufferStreamTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + struct ReorderBuffer { /* @@ -386,6 +434,17 @@ struct ReorderBuffer ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + /* + * Callbacks to be called when streaming a transaction. 
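+ * These parallel the regular change/commit callbacks above, but are
+ * invoked for chunks of an in-progress transaction that may still abort.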
+ */ + ReorderBufferStreamStartCB stream_start; + ReorderBufferStreamStopCB stream_stop; + ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamCommitCB stream_commit; + ReorderBufferStreamChangeCB stream_change; + ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamTruncateCB stream_truncate; + /* * Pointer that will be passed untouched to the callbacks. */ -- 2.23.0 v42/v42-0003-Extend-the-BufFile-interface-for-the-streaming-o.patch000644 000765 000024 00000037647 13706560046 026101 0ustar00dilipkumarstaff000000 000000 From 18afc9fdf67f173cdc73ccdc8ad5d1f1e4c1e3f8 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 14 Jul 2020 10:56:51 +0530 Subject: [PATCH v42 3/7] Extend the BufFile interface for the streaming of in-progress transactions. Allow BufFile to support temporary files that can be used by the single backend when the corresponding files need to be survived across the transaction and need to be opened and closed multiple times. Implement the interface for BufFileTruncate interface to allow files to be truncated up to a particular offset. Extend BufFileSeek API to support SEEK_END case. Add an option to provide a mode while opening the shared BufFiles instead of always opening in read-only mode. --- src/backend/postmaster/pgstat.c | 3 + src/backend/storage/file/buffile.c | 86 +++++++++++++++++--- src/backend/storage/file/fd.c | 9 +-- src/backend/storage/file/sharedfileset.c | 98 +++++++++++++++++++++-- src/backend/utils/sort/logtape.c | 4 +- src/backend/utils/sort/sharedtuplestore.c | 2 +- src/include/pgstat.h | 1 + src/include/storage/buffile.h | 4 +- src/include/storage/fd.h | 2 +- src/include/storage/sharedfileset.h | 4 +- 10 files changed, 186 insertions(+), 27 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 88992c2da2..6c97f68671 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3940,6 +3940,9 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_BUFFILE_WRITE: event_name = "BufFileWrite"; break; + case WAIT_EVENT_BUFFILE_TRUNCATE: + event_name = "BufFileTruncate"; + break; case WAIT_EVENT_CONTROL_FILE_READ: event_name = "ControlFileRead"; break; diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 2d7a082320..a9ca5d929c 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -32,10 +32,14 @@ * (by opening multiple fd.c temporary files). This is an essential feature * for sorts and hashjoins on large amounts of data. * - * BufFile supports temporary files that can be made read-only and shared with - * other backends, as infrastructure for parallel execution. Such files need - * to be created as a member of a SharedFileSet that all participants are - * attached to. + * BufFile supports temporary files that can be shared with other backends, as + * infrastructure for parallel execution. Such files need to be created as a + * member of a SharedFileSet that all participants are attached to. + * + * BufFile supports temporary files that can be used by the single backend when + * the corresponding files need to be survived across the transaction and need + * to be opened and closed multiple times. Such files need to be created as a + * member of a SharedFileSet. *------------------------------------------------------------------------- */ @@ -277,7 +281,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) * backends and render it read-only. 
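 * (With the new mode argument, callers may instead pass O_RDWR to reopen
 * the file for writing, e.g. so it can later be appended to or truncated.)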
*/ BufFile * -BufFileOpenShared(SharedFileSet *fileset, const char *name) +BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode) { BufFile *file; char segment_name[MAXPGPATH]; @@ -301,7 +305,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) } /* Try to load a segment. */ SharedSegmentName(segment_name, name, nfiles); - files[nfiles] = SharedFileSetOpen(fileset, segment_name); + files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode); if (files[nfiles] <= 0) break; ++nfiles; @@ -321,7 +325,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) file = makeBufFileCommon(nfiles); file->files = files; - file->readOnly = true; /* Can't write to files opened this way */ + file->readOnly = (mode == O_RDONLY) ? true : false; file->fileset = fileset; file->name = pstrdup(name); @@ -364,6 +368,9 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name) if (!found) elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name); + + /* Unregister the shared fileset */ + SharedFileSetUnregister(fileset); } /* @@ -666,11 +673,22 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence) newFile = file->curFile; newOffset = (file->curOffset + file->pos) + offset; break; -#ifdef NOT_USED case SEEK_END: - /* could be implemented, not needed currently */ + + /* + * Get the file size of the last file to get the last offset of + * that file. + */ + newFile = file->numFiles - 1; + newOffset = FileSize(file->files[file->numFiles - 1]); + if (newOffset < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m", + FilePathName(file->files[file->numFiles - 1]), + file->name))); + break; break; -#endif default: elog(ERROR, "invalid whence: %d", whence); return EOF; @@ -838,3 +856,51 @@ BufFileAppend(BufFile *target, BufFile *source) return startBlock; } + +/* + * Truncate the file upto the given fileno and the offsets. + */ +void +BufFileTruncateShared(BufFile *file, int fileno, off_t offset) +{ + int newFile = file->numFiles; + off_t newOffset = file->curOffset; + char segment_name[MAXPGPATH]; + int i; + + /* Loop over all the files upto the fileno which we want to truncate. */ + for (i = file->numFiles - 1; i >= fileno; i--) + { + /* + * Except the fileno, we can directly delete other files. If the + * offset is 0 then we can delete the fileno file as well unless it is + * the first file. + */ + if ((i != fileno || offset == 0) && fileno != 0) + { + SharedSegmentName(segment_name, file->name, i); + FileClose(file->files[i]); + if (!SharedFileSetDelete(file->fileset, segment_name, true)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not delete shared fileset \"%s\": %m", + segment_name))); + newFile--; + newOffset = MAX_PHYSICAL_FILESIZE; + } + else + { + if (FileTruncate(file->files[i], offset, + WAIT_EVENT_BUFFILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(file->files[i])))); + + newOffset = offset; + } + } + + file->numFiles = newFile; + file->curOffset = newOffset; +} diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 5f6420efb2..f376a97ed6 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -1743,18 +1743,17 @@ PathNameCreateTemporaryFile(const char *path, bool error_on_failure) /* * Open a file that was created with PathNameCreateTemporaryFile, possibly in * another backend. 
Files opened this way don't count against the - * temp_file_limit of the caller, are read-only and are automatically closed - * at the end of the transaction but are not deleted on close. + * temp_file_limit of the caller, are automatically closed at the end of the + * transaction but are not deleted on close. */ File -PathNameOpenTemporaryFile(const char *path) +PathNameOpenTemporaryFile(const char *path, int mode) { File file; ResourceOwnerEnlargeFiles(CurrentResourceOwner); - /* We open the file read-only. */ - file = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + file = PathNameOpenFile(path, mode | PG_BINARY); /* If no such file, then we don't raise an error. */ if (file <= 0 && errno != ENOENT) diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c index 16b7594756..9a3dc102f5 100644 --- a/src/backend/storage/file/sharedfileset.c +++ b/src/backend/storage/file/sharedfileset.c @@ -13,6 +13,10 @@ * files can be discovered by name, and a shared ownership semantics so that * shared files survive until the last user detaches. * + * SharedFileSets can also be used by backends when the temporary files need + * to be opened/closed multiple times and the underlying files need to survive + * across transactions. + * *------------------------------------------------------------------------- */ @@ -25,19 +29,29 @@ #include "common/hashfn.h" #include "miscadmin.h" #include "storage/dsm.h" +#include "storage/ipc.h" #include "storage/sharedfileset.h" #include "utils/builtins.h" +static List *filesetlist = NIL; + static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum); +static void SharedFileSetDeleteOnProcExit(int status, Datum arg); static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace); static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name); static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); /* - * Initialize a space for temporary files that can be opened for read-only - * access by other backends. Other backends must attach to it before - * accessing it. Associate this SharedFileSet with 'seg'. Any contained - * files will be deleted when the last backend detaches. + * Initialize a space for temporary files that can be opened by other backends. + * Other backends must attach to it before accessing it. Associate this + * SharedFileSet with 'seg'. Any contained files will be deleted when the + * last backend detaches. + * + * We can also use this interface if the temporary files are used only by + * single backend but the files need to be opened and closed multiple times + * and also the underlying files need to survive across transactions. For + * such cases, dsm segment 'seg' should be passed as NULL. We remove such + * files on proc exit. * * Files will be distributed over the tablespaces configured in * temp_tablespaces. @@ -84,7 +98,25 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) } /* Register our cleanup callback. */ - on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); + if (seg) + on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); + else + { + static bool registered_cleanup = false; + + if (!registered_cleanup) + { + /* + * We must not have registered any fileset before registering the + * fileset clean up. 
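+ * (A fileset added before the callback was in place would never be
+ * deleted at process exit.)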
+ */ + Assert(filesetlist == NIL); + on_proc_exit(SharedFileSetDeleteOnProcExit, 0); + registered_cleanup = true; + } + + filesetlist = lcons((void *) fileset, filesetlist); + } } /* @@ -147,13 +179,13 @@ SharedFileSetCreate(SharedFileSet *fileset, const char *name) * another backend. */ File -SharedFileSetOpen(SharedFileSet *fileset, const char *name) +SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode) { char path[MAXPGPATH]; File file; SharedFilePath(path, fileset, name); - file = PathNameOpenTemporaryFile(path); + file = PathNameOpenTemporaryFile(path, mode); return file; } @@ -222,6 +254,58 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum) SharedFileSetDeleteAll(fileset); } +/* + * Callback function that will be invoked on the process exit. This will + * process the list of all the registered sharedfilesets and delete the + * underlying files. + */ +static void +SharedFileSetDeleteOnProcExit(int status, Datum arg) +{ + ListCell *l; + + /* Loop over all the pending shared fileset entry */ + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + SharedFileSetDeleteAll(fileset); + } + + filesetlist = NIL; +} + +/* + * Unregister the shared fileset entry registered for cleanup on proc exit. + */ +void +SharedFileSetUnregister(SharedFileSet *input_fileset) +{ + bool found = false; + ListCell *l; + + /* + * If the caller is following the dsm based cleanup then we don't + * maintain the filesetlist so return. + */ + if (filesetlist == NIL) + return; + + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + + /* Remove the entry from the list */ + if (input_fileset == fileset) + { + filesetlist = list_delete_cell(filesetlist, l); + found = true; + break; + } + } + + Assert(found); +} + /* * Build the path for the directory holding the files backing a SharedFileSet * in a given tablespace. diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 5517e59c50..788815cdab 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -78,6 +78,8 @@ #include "postgres.h" +#include + #include "storage/buffile.h" #include "utils/builtins.h" #include "utils/logtape.h" @@ -551,7 +553,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenShared(fileset, filename); + file = BufFileOpenShared(fileset, filename, O_RDONLY); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 6537a4303b..b83fb50dac 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -559,7 +559,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenShared(accessor->fileset, name); + BufFileOpenShared(accessor->fileset, name, O_RDONLY); } /* Seek and load the chunk header. 
*/ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 1387201382..807a9c1edf 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -916,6 +916,7 @@ typedef enum WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO, WAIT_EVENT_BUFFILE_READ, WAIT_EVENT_BUFFILE_WRITE, + WAIT_EVENT_BUFFILE_TRUNCATE, WAIT_EVENT_CONTROL_FILE_READ, WAIT_EVENT_CONTROL_FILE_SYNC, WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE, diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index f4752bab0d..fc34c49522 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -48,7 +48,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); extern void BufFileExportShared(BufFile *file); -extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name); +extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name, + int mode); extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); +extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 8cd125d7df..e209f047e8 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -94,7 +94,7 @@ extern mode_t FileGetRawMode(File file); /* Operations used for sharing named temporary files */ extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); -extern File PathNameOpenTemporaryFile(const char *name); +extern File PathNameOpenTemporaryFile(const char *path, int mode); extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure); extern void PathNameCreateTemporaryDir(const char *base, const char *name); extern void PathNameDeleteTemporaryDir(const char *name); diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h index 2d6cf077e5..d5edb600af 100644 --- a/src/include/storage/sharedfileset.h +++ b/src/include/storage/sharedfileset.h @@ -37,9 +37,11 @@ typedef struct SharedFileSet extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg); extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg); extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name); -extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name); +extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name, + int mode); extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name, bool error_on_failure); extern void SharedFileSetDeleteAll(SharedFileSet *fileset); +extern void SharedFileSetUnregister(SharedFileSet *input_fileset); #endif -- 2.23.0 v42/v42-0005-Enable-streaming-for-all-subscription-TAP-tests.patch000644 000765 000024 00000035515 13706560046 026044 0ustar00dilipkumarstaff000000 000000 From 2b39a173ebfe7f43e1b8cc592d1ca3af193741d1 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 20 Nov 2019 16:41:13 +0530 Subject: [PATCH v42 5/7] Enable streaming for all subscription TAP tests --- src/test/subscription/t/001_rep_changes.pl | 2 +- src/test/subscription/t/002_types.pl | 2 +- src/test/subscription/t/003_constraints.pl | 2 +- src/test/subscription/t/004_sync.pl | 8 ++++---- src/test/subscription/t/005_encoding.pl | 2 +- src/test/subscription/t/006_rewrite.pl | 2 +- src/test/subscription/t/007_ddl.pl | 2 +- src/test/subscription/t/008_diff_schema.pl | 2 +- src/test/subscription/t/009_matviews.pl | 2 +- src/test/subscription/t/009_stream_simple.pl | 2 +- 
src/test/subscription/t/010_stream_subxact.pl | 2 +- src/test/subscription/t/010_truncate.pl | 6 +++--- src/test/subscription/t/011_generated.pl | 2 +- src/test/subscription/t/011_stream_ddl.pl | 2 +- src/test/subscription/t/012_collation.pl | 2 +- src/test/subscription/t/012_stream_subxact_abort.pl | 2 +- src/test/subscription/t/013_stream_subxact_ddl_abort.pl | 2 +- src/test/subscription/t/100_bugs.pl | 2 +- 18 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 3f8318fc7c..6f7bedc130 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -78,7 +78,7 @@ $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only WITH (streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index aedcab2fbc..94c71f8ae2 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -108,7 +108,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot, streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 9f140b552b..21410fac1c 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -35,7 +35,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false, streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index e111ab9181..a6fae9c3f1 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -33,7 +33,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); @@ -56,7 +56,7 @@ $node_publisher->safe_psql('postgres', # recreate the subscription, it will try to do initial copy $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); # but it will be stuck on data copy as it will fail on constraint @@ -78,7 +78,7 @@ is($result, qq(20), 'initial data synced for second sub'); # now check another 
 # now check another subscription for the same node pair
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
+	"CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false, streaming = on)"
 );
 
 # wait for it to start
@@ -100,7 +100,7 @@ $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
 
 # recreate the subscription again
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 # and wait for data sync to finish again
diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl
index aec7a17a78..202871a658 100644
--- a/src/test/subscription/t/005_encoding.pl
+++ b/src/test/subscription/t/005_encoding.pl
@@ -26,7 +26,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION mypub FOR ALL TABLES;");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);"
 );
 
 $node_publisher->wait_for_catchup('mysub');
diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl
index c6cda10a19..70c86b22ac 100644
--- a/src/test/subscription/t/006_rewrite.pl
+++ b/src/test/subscription/t/006_rewrite.pl
@@ -22,7 +22,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION mypub FOR ALL TABLES;");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);"
 );
 
 $node_publisher->wait_for_catchup('mysub');
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 7fe6cc6d63..f9c8d1d348 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -22,7 +22,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION mypub FOR ALL TABLES;");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);"
 );
 
 $node_publisher->wait_for_catchup('mysub');
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
index 963334ed89..cdf9b8e7bb 100644
--- a/src/test/subscription/t/008_diff_schema.pl
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -32,7 +32,7 @@ $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub FOR ALL TABLES");
 
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 $node_publisher->wait_for_catchup('tap_sub');
diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl
index 7afc7bdba9..21f50c7012 100644
--- a/src/test/subscription/t/009_matviews.pl
+++ b/src/test/subscription/t/009_matviews.pl
@@ -18,7 +18,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION mypub FOR ALL TABLES;");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);"
 );
 
 $node_publisher->safe_psql('postgres',
diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/009_stream_simple.pl
index 2f01133f69..30561d8f96 100644
--- a/src/test/subscription/t/009_stream_simple.pl
+++ b/src/test/subscription/t/009_stream_simple.pl
@@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 wait_for_caught_up($node_publisher, $appname);
diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/010_stream_subxact.pl
index d2ae38592b..9a6bac6822 100644
--- a/src/test/subscription/t/010_stream_subxact.pl
+++ b/src/test/subscription/t/010_stream_subxact.pl
@@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 wait_for_caught_up($node_publisher, $appname);
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index be2c0bdc35..ed56fbf96c 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -52,13 +52,13 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)"
 );
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"
+	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (streaming = on)"
 );
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3"
+	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (streaming = on)"
 );
 
 # Wait for initial sync of all subscriptions
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
index f35d1cba4c..4df1ddef63 100644
--- a/src/test/subscription/t/011_generated.pl
+++ b/src/test/subscription/t/011_generated.pl
@@ -33,7 +33,7 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub1 FOR ALL TABLES");
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)"
 );
 
 # Wait for initial sync of all subscriptions
diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/011_stream_ddl.pl
index 0da39a1a8a..c3caff6149 100644
--- a/src/test/subscription/t/011_stream_ddl.pl
+++ b/src/test/subscription/t/011_stream_ddl.pl
@@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 wait_for_caught_up($node_publisher, $appname);
diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl
index 4bfcef7c2f..c62eb521e7 100644
--- a/src/test/subscription/t/012_collation.pl
+++ b/src/test/subscription/t/012_collation.pl
@@ -80,7 +80,7 @@ $node_publisher->safe_psql('postgres',
 	q{CREATE PUBLICATION pub1 FOR ALL TABLES});
 
 $node_subscriber->safe_psql('postgres',
-	qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)}
+	qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false, streaming = on)}
 );
 
 $node_publisher->wait_for_catchup('sub1');
diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/012_stream_subxact_abort.pl
index 402df30f59..2be7542831 100644
--- a/src/test/subscription/t/012_stream_subxact_abort.pl
+++ b/src/test/subscription/t/012_stream_subxact_abort.pl
@@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 wait_for_caught_up($node_publisher, $appname);
diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl
index becbdd0578..2da9607a7d 100644
--- a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl
+++ b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl
@@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes
 
 my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',
-"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
 wait_for_caught_up($node_publisher, $appname);
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 366a7a9435..96ffc091b0 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -53,7 +53,7 @@ $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub1 FOR ALL TABLES");
 
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)"
 );
 
 $node_publisher->wait_for_catchup('sub1');
-- 
2.23.0
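Every hunk in this patch makes the same one-line change: the subscription each test creates now requests streaming of in-progress transactions. Outside the TAP framework, a minimal hand-run equivalent looks like the following; the host, database, and object names are illustrative only. Note that streaming = on changes behaviour only once a transaction exceeds logical_decoding_work_mem on the publisher, which is why a low limit is typically configured when exercising it.

-- publisher: use a small decoding work-mem limit so streaming triggers early
ALTER SYSTEM SET logical_decoding_work_mem = '64kB';
SELECT pg_reload_conf();

-- subscriber: the option the tests above enable
CREATE SUBSCRIPTION tap_sub
  CONNECTION 'host=publisher dbname=postgres'
  PUBLICATION tap_pub
  WITH (streaming = on);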