From 3e7189c5617d43dc0fc36903abfb9cec5537642b Mon Sep 17 00:00:00 2001
From: Amit Kapila
Date: Wed, 15 Jul 2020 18:38:32 +0530
Subject: [PATCH v45 1/6] Implement streaming mode in ReorderBuffer.

Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke the new stream API methods. However, if we have an
incomplete toast or speculative insert, we spill to disk because we cannot
generate the complete tuple to stream; as soon as we get the complete
tuple, we stream the transaction including the serialized changes.

We can do this incremental processing thanks to having assignments
(associating subxacts with toplevel xacts) in WAL right away, and thanks
to logging the invalidation messages.

Now that we can stream in-progress transactions, concurrent aborts may
cause failures when the output plugin consults catalogs (both system and
user-defined). We handle such failures by returning the
ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from the system table scan APIs to
the backend or WALSender decoding a specific uncommitted transaction. On
receipt of such an sqlerrcode, the decoding logic aborts the ongoing
decoding and returns gracefully.

Each ReorderBufferChange carries a ReorderBufferTXN pointer, so we know
which xact it belongs to. The output plugin can use this to decide which
changes to discard in case of stream_abort_cb (e.g. when a subxact gets
discarded).

We also provide a new option via the SQL APIs to fetch the changes being
streamed.
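As a complement to the commit message (not part of the patch itself): a minimal sketch of how an output plugin could register the streaming callbacks this machinery invokes. The stream_* fields of OutputPluginCallbacks are the existing streaming callback API; the pg_decode_* and pg_decode_stream_* names are placeholders for the plugin's own implementations, assumed to be defined elsewhere in the plugin.

/*
 * Sketch only: register streaming callbacks in addition to the usual ones.
 * If no stream_* callbacks are provided, the core keeps ctx->streaming
 * disabled and large transactions keep being serialized to disk instead of
 * streamed.
 */
#include "postgres.h"

#include "fmgr.h"
#include "replication/output_plugin.h"

PG_MODULE_MAGIC;

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	/* regular decoding callbacks */
	cb->startup_cb = pg_decode_startup;
	cb->begin_cb = pg_decode_begin_txn;
	cb->change_cb = pg_decode_change;
	cb->commit_cb = pg_decode_commit_txn;
	cb->shutdown_cb = pg_decode_shutdown;

	/* streaming of in-progress transactions */
	cb->stream_start_cb = pg_decode_stream_start;
	cb->stream_stop_cb = pg_decode_stream_stop;
	cb->stream_abort_cb = pg_decode_stream_abort;
	cb->stream_commit_cb = pg_decode_stream_commit;
	cb->stream_change_cb = pg_decode_stream_change;
	cb->stream_message_cb = pg_decode_stream_message;
	cb->stream_truncate_cb = pg_decode_stream_truncate;
}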
Author: Dilip Kumar, Tomas Vondra, Amit Kapila Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/stream.out | 66 ++ contrib/test_decoding/expected/truncate.out | 6 + contrib/test_decoding/sql/stream.sql | 21 + contrib/test_decoding/sql/truncate.sql | 1 + contrib/test_decoding/test_decoding.c | 13 + doc/src/sgml/logicaldecoding.sgml | 9 +- doc/src/sgml/test-decoding.sgml | 22 + src/backend/access/heap/heapam.c | 13 + src/backend/access/heap/heapam_visibility.c | 42 +- src/backend/access/index/genam.c | 53 ++ src/backend/access/table/tableam.c | 8 + src/backend/access/transam/xact.c | 19 + src/backend/replication/logical/decode.c | 17 +- src/backend/replication/logical/logical.c | 10 + src/backend/replication/logical/reorderbuffer.c | 980 +++++++++++++++++++++--- src/include/access/heapam_xlog.h | 1 + src/include/access/tableam.h | 55 ++ src/include/access/xact.h | 4 + src/include/replication/logical.h | 1 + src/include/replication/reorderbuffer.h | 56 +- 21 files changed, 1293 insertions(+), 106 deletions(-) create mode 100644 contrib/test_decoding/expected/stream.out create mode 100644 contrib/test_decoding/sql/stream.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c58..ed9a3d6 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate + spill slot truncate stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out new file mode 100644 index 0000000..2cdd79a --- /dev/null +++ b/contrib/test_decoding/expected/stream.out @@ -0,0 +1,66 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stream_test(data text); +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- streaming test with sub-transaction +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? 
+---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + data +---------------------------------------------------------- + opening a streamed block for transaction + streaming message: transactional: 1 prefix: test, sz: 50 + closing a streamed block for transaction + aborting streamed (sub)transaction + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + committing streamed transaction +(27 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out index 1cf2ae8..e64d377 100644 --- a/contrib/test_decoding/expected/truncate.out +++ b/contrib/test_decoding/expected/truncate.out @@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc COMMIT (9 rows) +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql new file mode 100644 index 0000000..65e8289 --- /dev/null +++ b/contrib/test_decoding/sql/stream.sql @@ -0,0 +1,21 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stream_test(data text); + +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- streaming test with sub-transaction +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql index 5aecdf0..5633854 100644 --- a/contrib/test_decoding/sql/truncate.sql +++ b/contrib/test_decoding/sql/truncate.sql @@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; TRUNCATE tab1, tab2; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT 
pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index dbef52a..d8e2b41 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { ListCell *option; TestDecodingData *data; + bool enable_streaming = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "stream-changes") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &enable_streaming)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, elem->arg ? strVal(elem->arg) : "(null)"))); } } + + ctx->streaming &= enable_streaming; } /* cleanup this plugin's resources */ diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 791a62b..1571d71 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); - Any actions leading to transaction ID assignment are prohibited. That, among others, - includes writing to tables, performing DDL changes, and - calling pg_current_xact_id(). + Note that access to user catalog tables or regular system catalog tables + in the output plugins has to be done via the systable_* + scan APIs only. Access via the heap_* scan APIs will + error out. Additionally, any actions leading to transaction ID assignment + are prohibited. That, among others, includes writing to tables, performing + DDL changes, and calling pg_current_xact_id(). 
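Since the documentation hunk above requires catalog access in output plugins to go through the systable_* scan APIs, here is a hedged sketch (not part of the patch) of how a plugin callback might read a row from a user_catalog_table. The relation OID, oid-typed key column, and helper name are assumptions for illustration; systable_beginscan/systable_getnext/systable_endscan are the real APIs, and it is their path that performs the concurrent-abort check this patch adds.

/*
 * Sketch: look up one row in a user_catalog_table by an oid-typed key
 * column, using only the systable_* scan APIs so that a concurrent abort
 * of a streamed transaction is reported as ERRCODE_TRANSACTION_ROLLBACK.
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "access/table.h"
#include "utils/fmgroids.h"

static HeapTuple
lookup_user_catalog(Oid reloid, AttrNumber keyattno, Oid keyvalue)
{
	Relation	rel;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	tuple;

	rel = table_open(reloid, AccessShareLock);

	ScanKeyInit(&key, keyattno, BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(keyvalue));

	/*
	 * Sequential systable scan with the active (historic) snapshot; an
	 * index scan is also possible by passing an index OID and indexOK.
	 */
	scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, &key);

	tuple = systable_getnext(scan);
	if (HeapTupleIsValid(tuple))
		tuple = heap_copytuple(tuple);	/* copy before ending the scan */

	systable_endscan(scan);
	table_close(rel, AccessShareLock);

	return tuple;
}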
diff --git a/doc/src/sgml/test-decoding.sgml b/doc/src/sgml/test-decoding.sgml index 8356a3d..fe7c978 100644 --- a/doc/src/sgml/test-decoding.sgml +++ b/doc/src/sgml/test-decoding.sgml @@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i + + We can also get the changes of the in-progress transaction and the typical + output, might be: + + +postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1'); + lsn | xid | data +-----------+-----+-------------------------------------------------- + 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503 + 0/16B21F8 | 503 | streaming change for TXN 503 + 0/16B2300 | 503 | streaming change for TXN 503 + 0/16B2408 | 503 | streaming change for TXN 503 + 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503 + 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503 + 0/16BECA8 | 503 | streaming change for TXN 503 + 0/16BEDB0 | 503 | streaming change for TXN 503 + 0/16BEEB8 | 503 | streaming change for TXN 503 + 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503 +(10 rows) + + + diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 5eef225..0016900 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg_internal("only heap AM is supported"))); + /* + * We don't expect direct calls to heap_getnext with valid CheckXidAlive + * for catalog or regular tables. See detailed comments in xact.c where + * these variables are declared. Normally we have such a check at tableam + * level API but this is called from many places so we need to ensure it + * here. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected heap_getnext call during logical decoding"); + /* Note: no locking manipulations needed */ if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) @@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, { xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; + + if (IsToastRelation(relation)) + xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } XLogBeginInsert(); diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index dba1089..c771280 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); + /* + * If we haven't resolved the combocid to cmin/cmax, that means we + * have not decoded the combocid yet. That means the cmin is + * definitely in the future, and we're not supposed to see the tuple + * yet. + * + * XXX This only applies to decoding of in-progress transactions. In + * regular logical decoding we only execute this code at commit time, + * at which point we should have seen all relevant combocids. So + * ideally, we should error out in this case but in practice, this + * won't happen. If we are too worried about this then we can add an + * elog inside ResolveCminCmaxDuringDecoding. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). 
+ */ if (!resolved) - elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + return false; Assert(cmin != InvalidCommandId); @@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, htup, buffer, &cmin, &cmax); - if (!resolved) - elog(ERROR, "could not resolve combocid to cmax"); - - Assert(cmax != InvalidCommandId); + /* + * If we haven't resolved the combocid to cmin/cmax, that means we + * have not decoded the combocid yet. That means the cmax is + * definitely in the future, and we're still supposed to see the + * tuple. + * + * XXX This only applies to decoding of in-progress transactions. In + * regular logical decoding we only execute this code at commit time, + * at which point we should have seen all relevant combocids. So + * ideally, we should error out in this case but in practice, this + * won't happen. If we are too worried about this then we can add an + * elog inside ResolveCminCmaxDuringDecoding. + * + * XXX For the streaming case, we can track the largest combocid + * assigned, and error out based on this (when unable to resolve + * combocid below that observed maximum value). + */ + if (!resolved || cmax == InvalidCommandId) + return true; if (cmax >= snapshot->curcid) return true; /* deleted after scan started */ diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index dfba5ae..9d9a70a 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -28,6 +28,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/bufmgr.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -429,10 +430,37 @@ systable_beginscan(Relation heapRelation, sysscan->iscan = NULL; } + /* + * If CheckXidAlive is set then set a flag to indicate that system table + * scan is in-progress. See detailed comments in xact.c where these + * variables are declared. + */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = true; + return sysscan; } /* + * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive. + * + * Error out, if CheckXidAlive is aborted. We can't directly use + * TransactionIdDidAbort as after crash such transaction might not have been + * marked as aborted. See detailed comments in xact.c where the variable + * is declared. + */ +static inline void +HandleConcurrentAbort() +{ + if (TransactionIdIsValid(CheckXidAlive) && + !TransactionIdIsInProgress(CheckXidAlive) && + !TransactionIdDidCommit(CheckXidAlive)) + ereport(ERROR, + (errcode(ERRCODE_TRANSACTION_ROLLBACK), + errmsg("transaction aborted during system catalog scan"))); +} + +/* * systable_getnext --- get next tuple in a heap-or-index scan * * Returns NULL if no more tuples available. @@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan) } } + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return htup; } @@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup) sysscan->slot, freshsnap); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return result; } @@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan) if (sysscan->snapshot) UnregisterSnapshot(sysscan->snapshot); + /* + * Reset the sysbegin_called flag at the end of the systable scan. See + * detailed comments in xact.c where these variables are declared. 
+ */ + if (TransactionIdIsValid(CheckXidAlive)) + bsysscan = false; + pfree(sysscan); } @@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction) if (htup && sysscan->iscan->xs_recheck) elog(ERROR, "system catalog scans with lossy index conditions are not implemented"); + /* + * Handle the concurrent abort while fetching the catalog tuple during + * logical streaming of a transaction. + */ + HandleConcurrentAbort(); + return htup; } diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 3afb63b..c638319 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -249,6 +249,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid) const TableAmRoutine *tableam = rel->rd_tableam; /* + * We don't expect direct calls to table_tuple_get_latest_tid with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding"); + + /* * Since this can be called with user-supplied TID, don't trust the input * too much. */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d4f7c29..99722ee 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -83,6 +83,19 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; /* + * CheckXidAlive is a xid value pointing to a possibly ongoing (sub) + * transaction.  Currently, it is used in logical decoding.  It's possible + * that such transactions can get aborted while the decoding is ongoing in + * which case we skip decoding that particular transaction. To ensure that we + * check whether the CheckXidAlive is aborted after fetching the tuple from + * system tables.  We also ensure that during logical decoding we never + * directly access the tableam or heap APIs because we are checking for the + * concurrent aborts only in systable_* APIs. + */ +TransactionId CheckXidAlive = InvalidTransactionId; +bool bsysscan = false; + +/* * When running as a parallel worker, we place only a single * TransactionStateData on the parallel worker's state stack, and the XID * reflected there will be that of the *innermost* currently-active @@ -2680,6 +2693,9 @@ AbortTransaction(void) /* Forget about any active REINDEX. */ ResetReindexState(s->nestingLevel); + /* Reset logical streaming state. */ + ResetLogicalStreamingState(); + /* If in parallel mode, clean up workers and exit parallel mode. */ if (IsInParallelMode()) { @@ -4982,6 +4998,9 @@ AbortSubTransaction(void) /* Forget about any active REINDEX. */ ResetReindexState(s->nestingLevel); + /* Reset logical streaming state. */ + ResetLogicalStreamingState(); + /* Exit from parallel mode, if necessary. 
*/ if (IsInParallelMode()) { diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f3a1c31..f21f61d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } /* @@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(change->data.truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); } /* @@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); /* move to the next xl_multi_insert_tuple entry */ data += datalen; @@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 05d24b9..42f284b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Clear logical streaming state during (sub)transaction abort. 
+ */ +void +ResetLogicalStreamingState(void) +{ + CheckXidAlive = InvalidTransactionId; + bsysscan = false; +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ce6e621..9d7af4b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -100,6 +100,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/procarray.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -178,6 +179,21 @@ typedef struct ReorderBufferDiskChange /* data follows */ } ReorderBufferDiskChange; +#define IsSpecInsert(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ +) +#define IsSpecConfirm(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \ +) +#define IsInsertOrUpdate(action) \ +( \ + (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ + ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \ +) + /* * Maximum number of changes kept in memory, per transaction. After that, * changes are spooled to disk. @@ -236,6 +252,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -244,6 +261,16 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid); +/* + * --------------------------------------- + * Streaming support functions + * --------------------------------------- + */ +static inline bool ReorderBufferCanStream(ReorderBuffer *rb); +static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb); +static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); + /* --------------------------------------- * toast reassembly support * --------------------------------------- @@ -367,6 +394,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb) dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + /* InvalidCommandId is not zero, so set it explicitly */ + txn->command_id = InvalidCommandId; + return txn; } @@ -416,13 +446,15 @@ ReorderBufferGetChange(ReorderBuffer *rb) } /* - * Free an ReorderBufferChange. + * Free a ReorderBufferChange and update memory accounting, if requested. */ void -ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) +ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, + bool upd_mem) { /* update memory accounting info */ - ReorderBufferChangeMemoryUpdate(rb, change, false); + if (upd_mem) + ReorderBufferChangeMemoryUpdate(rb, change, false); /* free contained data */ switch (change->action) @@ -624,16 +656,101 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, } /* - * Queue a change into a transaction so it can be replayed upon commit. 
+ * Record the partial change for the streaming of in-progress transactions. We + * can stream only complete changes so if we have a partial change like toast + * table insert or speculative insert then we mark such a 'txn' so that it + * can't be streamed. We also ensure that if the changes in such a 'txn' are + * above logical_decoding_work_mem threshold then we stream them as soon as we + * have a complete change. + */ +static void +ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, + bool toast_insert) +{ + ReorderBufferTXN *toptxn; + + /* + * The partial changes need to be processed only while streaming + * in-progress transactions. + */ + if (!ReorderBufferCanStream(rb)) + return; + + /* Get the top transaction. */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + /* + * Set the toast insert bit whenever we get toast insert to indicate a partial + * change and clear it when we get the insert or update on main table (Both + * update and insert will do the insert in the toast table). + */ + if (toast_insert) + toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT; + else if (rbtxn_has_toast_insert(toptxn) && + IsInsertOrUpdate(change->action)) + toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + + /* + * Set the spec insert bit whenever we get the speculative insert to + * indicate the partial change and clear the same on speculative confirm. + */ + if (IsSpecInsert(change->action)) + toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT; + else if (IsSpecConfirm(change->action)) + { + /* + * Speculative confirm change must be preceded by speculative insertion. + */ + Assert(rbtxn_has_spec_insert(toptxn)); + toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; + } + + /* + * Stream the transaction if it is serialized before and the changes are + * now complete in the top-level transaction. + * + * The reason for doing the streaming of such a transaction as soon as + * we get the complete change for it is that previously it would have + * reached the memory threshold and wouldn't get streamed because of + * incomplete changes. Delaying such transactions would increase apply + * lag for them. + */ + if (ReorderBufferCanStartStreaming(rb) && + !(rbtxn_has_incomplete_tuple(toptxn)) && + rbtxn_is_serialized(txn)) + ReorderBufferStreamTXN(rb, toptxn); +} + +/* + * Queue a change into a transaction so it can be replayed upon commit or will be + * streamed when we reach logical_decoding_work_mem threshold. */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change, bool toast_insert) { ReorderBufferTXN *txn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + /* + * While streaming the previous changes we have detected that the transaction + * is aborted. So there is no point in collecting further changes for it. + */ + if (txn->concurrent_abort) + { + /* + * We don't need to update memory accounting for this change as we have + * not added it to the queue yet. 
+ */ + ReorderBufferReturnChange(rb, change, false); + return; + } + change->lsn = lsn; change->txn = txn; @@ -645,6 +762,9 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, /* update memory accounting information */ ReorderBufferChangeMemoryUpdate(rb, change, true); + /* process partial change */ + ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); + /* check the memory limits and evict something if needed */ ReorderBufferCheckMemoryLimit(rb); } @@ -674,7 +794,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change->data.msg.message = palloc(message_size); memcpy(change->data.msg.message, message, message_size); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -764,6 +884,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb) } /* + * AssertChangeLsnOrder + * + * Check ordering of changes in the (sub)transaction. + */ +static void +AssertChangeLsnOrder(ReorderBufferTXN *txn) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter iter; + XLogRecPtr prev_lsn = txn->first_lsn; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(txn->first_lsn <= cur_change->lsn); + + if (txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_change->lsn <= txn->end_lsn); + + Assert(prev_lsn <= cur_change->lsn); + + prev_lsn = cur_change->lsn; + } +#endif +} + +/* * ReorderBufferGetOldestTXN * Return oldest transaction in reorderbuffer */ @@ -1018,6 +1170,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, *iter_state = NULL; + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(txn); + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -1032,6 +1187,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); + if (cur_txn->nentries > 0) nr_txns++; } @@ -1148,7 +1306,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) { change = dlist_container(ReorderBufferChange, node, dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); Assert(dlist_is_empty(&state->old_change)); } @@ -1234,7 +1392,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, change = dlist_container(ReorderBufferChange, node, dlist_pop_head_node(&state->old_change)); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); Assert(dlist_is_empty(&state->old_change)); } @@ -1280,7 +1438,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. 
*/ Assert(change->txn == txn); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); } /* @@ -1297,7 +1455,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(change->txn == txn); Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); } /* @@ -1310,6 +1468,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* + * Cleanup the snapshot for the last streamed run. + */ + if (txn->snapshot_now != NULL) + { + Assert(rbtxn_is_streamed(txn)); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + } + + /* * Remove TXN from its containing list. * * Note: if txn is known as subxact, we are deleting the TXN from its @@ -1335,6 +1502,91 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* + * Discard changes from a transaction (and subtransactions), after streaming + * them. Keep the remaining info - transactions, tuplecids, invalidations and + * snapshots. + */ +static void +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + dlist_mutable_iter iter; + + /* cleanup subtransactions & their changes */ + dlist_foreach_modify(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* + * Subtransactions are always associated to the toplevel TXN, even if + * they originally were happening inside another subtxn, so we won't + * ever recurse more than one level deep here. + */ + Assert(rbtxn_is_known_subxact(subtxn)); + Assert(subtxn->nsubtxns == 0); + + ReorderBufferTruncateTXN(rb, subtxn); + } + + /* cleanup changes in the toplevel txn */ + dlist_foreach_modify(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + + /* remove the change from it's containing list */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + + /* + * Mark the transaction as streamed. + * + * The toplevel transaction, identified by (toptxn==NULL), is marked as + * streamed always, even if it does not contain any changes (that is, when + * all the changes are in subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * changes in them. + * + * We do it this way because of aborts - we don't want to send aborts for + * XIDs the downstream is not aware of. And of course, it always knows + * about the toplevel xact (we send the XID in all messages), but we never + * stream XIDs of empty subxacts. + */ + if ((!txn->toptxn) || (txn->nentries_mem != 0)) + txn->txn_flags |= RBTXN_IS_STREAMED; + + /* + * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any + * memory. We could also keep the hash table and update it with new ctid + * values, but this seems simpler and good enough for now. + */ + if (txn->tuplecid_hash != NULL) + { + hash_destroy(txn->tuplecid_hash); + txn->tuplecid_hash = NULL; + } + + /* If this txn is serialized then clean the disk space. */ + if (rbtxn_is_serialized(txn)) + { + ReorderBufferRestoreCleanup(rb, txn); + txn->txn_flags &= ~RBTXN_IS_SERIALIZED; + } + + /* also reset the number of entries in the transaction */ + txn->nentries_mem = 0; + txn->nentries = 0; +} + +/* * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by * HeapTupleSatisfiesHistoricMVCC. 
*/ @@ -1485,57 +1737,189 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * Perform the replay of a transaction and its non-aborted subtransactions. - * - * Subtransactions previously have to be processed by - * ReorderBufferCommitChild(), even if previously assigned to the toplevel - * transaction with ReorderBufferAssignChild. - * - * We currently can only decode a transaction's contents when its commit - * record is read because that's the only place where we know about cache - * invalidations. Thus, once a toplevel commit is read, we iterate over the top - * and subtransactions (using a k-way merge) and replay the changes in lsn - * order. + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, - XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time, - RepOriginId origin_id, XLogRecPtr origin_lsn) +static void +ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) { - ReorderBufferTXN *txn; - volatile Snapshot snapshot_now; - volatile CommandId command_id = FirstCommandId; - bool using_subtxn; - ReorderBufferIterTXNState *volatile iterstate = NULL; + /* we should only call this for previously streamed transactions */ + Assert(rbtxn_is_streamed(txn)); - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); + ReorderBufferStreamTXN(rb, txn); - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; + rb->stream_commit(rb, txn, txn->final_lsn); - txn->final_lsn = commit_lsn; - txn->end_lsn = end_lsn; - txn->commit_time = commit_time; - txn->origin_id = origin_id; - txn->origin_lsn = origin_lsn; + ReorderBufferCleanupTXN(rb, txn); +} +/* + * Set xid for concurrent abort check + * + * While streaming an in-progress transaction there is a possibility that the + * (sub)transaction might get aborted concurrently. In such case if the + * (sub)transaction has catalog update then we might decode the tuple using + * wrong catalog version. For example, suppose there is one catalog tuple with + * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple + * and now we will have two tuples (xmin: 500, xmax: 501) and + * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction + * say 502 updates the same catalog tuple then the first tuple will be changed + * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode + * the the tuple inserted/updated in 501 after the catalog update, will see the + * first catalog tuple instead of second one because it will assume it is + * deleted by xid 502 which is not visible to our snapshot so we can see that + * tuple. + * + * For detecting the concurrent abort we set CheckXidAlive to the current + * (sub)transaction's xid for which this change belongs to. And, during + * catalog scan we can check the status of the xid and if it is aborted we will + * report a specific error so that we can stop streaming current transaction + * and discard the already streamed changes on such an error. We might have + * already streamed some of the changes for the aborted (sub)transaction, but + * that is fine because when we decode the abort we will stream abort message + * to truncate the changes in the subscriber. 
+ */ +static inline void +SetupCheckXidLive(TransactionId xid) +{ /* - * If this transaction has no snapshot, it didn't make any changes to the - * database, so there's nothing to decode. Note that - * ReorderBufferCommitChild will have transferred any snapshots from - * subtransactions if there were any. + * If the input transaction id is already set as a CheckXidAlive then + * nothing to do. */ - if (txn->base_snapshot == NULL) - { - Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); + if (TransactionIdEquals(CheckXidAlive, xid)) return; + + /* + * setup CheckXidAlive if it's not committed yet. We don't check if the + * xid aborted. That will happen during catalog access. + */ + if (!TransactionIdDidCommit(xid)) + CheckXidAlive = xid; + else + CheckXidAlive = InvalidTransactionId; +} + +/* + * Helper function for ReorderBufferProcessTXN for applying change. + */ +static inline void +ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, + bool streaming) +{ + if (streaming) + rb->stream_change(rb, txn, relation, change); + else + rb->apply_change(rb, txn, relation, change); +} + +/* + * Helper function for ReorderBufferProcessTXN for applying the truncate + */ +static inline void +ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, + int nrelations, Relation *relations, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_truncate(rb, txn, nrelations, relations, change); + else + rb->apply_truncate(rb, txn, nrelations, relations, change); +} + +/* + * Helper function for ReorderBufferProcessTXN for applying the message + */ +static inline void +ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + else + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); +} + +/* + * Function to store the command id and snapshot at the end of the current + * stream so that we can reuse the same while sending the next stream. + */ +static inline void +ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, + Snapshot snapshot_now, CommandId command_id) +{ + txn->command_id = command_id; + + /* Avoid copying if it's already copied. */ + if (snapshot_now->copied) + txn->snapshot_now = snapshot_now; + else + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); +} + +/* + * Helper function for ReorderBufferProcessTXN to handle the concurrent + * abort of the streaming transaction. This resets the TXN such that it + * can be used to stream the remaining data of transaction being processed. + */ +static void +ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + Snapshot snapshot_now, + CommandId command_id, + XLogRecPtr last_lsn, + ReorderBufferChange *specinsert) +{ + /* Discard the changes that we just streamed. */ + ReorderBufferTruncateTXN(rb, txn); + + /* Free all resources allocated for toast reconstruction */ + ReorderBufferToastReset(rb, txn); + + /* Return the spec insert change if it is not NULL */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; } - snapshot_now = txn->base_snapshot; + /* Stop the stream. 
*/ + rb->stream_stop(rb, txn, last_lsn); + + /* Remember the command ID and snapshot for the streaming run. */ + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); +} + +/* + * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. + * + * Send data of a transaction (and its subtransactions) to the + * output plugin. We iterate over the top and subtransactions (using a k-way + * merge) and replay the changes in lsn order. + * + * If streaming is true then data will be sent using stream API. + */ +static void +ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn, + volatile Snapshot snapshot_now, + volatile CommandId command_id, + bool streaming) +{ + bool using_subtxn; + MemoryContext ccxt = CurrentMemoryContext; + ReorderBufferIterTXNState *volatile iterstate = NULL; + volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; + ReorderBufferChange *volatile specinsert = NULL; + volatile bool stream_started = false; + ReorderBufferTXN *volatile curtxn = NULL; /* build data to be able to lookup the CommandIds of catalog tuples */ ReorderBufferBuildTupleCidHash(rb, txn); @@ -1558,14 +1942,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { ReorderBufferChange *change; - ReorderBufferChange *specinsert = NULL; if (using_subtxn) - BeginInternalSubTransaction("replay"); + BeginInternalSubTransaction(streaming ? "stream" : "replay"); else StartTransactionCommand(); - rb->begin(rb, txn); + /* We only need to send begin/commit for non-streamed transactions. */ + if (!streaming) + rb->begin(rb, txn); ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -1573,6 +1958,36 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; + /* + * We can't call start stream callback before processing first + * change. + */ + if (prev_lsn == InvalidXLogRecPtr) + { + if (streaming) + { + txn->origin_id = change->origin_id; + rb->stream_start(rb, txn, change->lsn); + stream_started = true; + } + } + + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + + /* Set the xid for concurrent abort check. 
*/ + if (streaming) + { + curtxn = change->txn; + SetupCheckXidLive(curtxn->xid); + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1649,7 +2064,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (!IsToastRelation(relation)) { ReorderBufferToastReplace(rb, txn, relation, change); - rb->apply_change(rb, txn, relation, change); + ReorderBufferApplyChange(rb, txn, relation, change, + streaming); /* * Only clear reassembled toast chunks if we're sure @@ -1685,11 +2101,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, */ if (specinsert != NULL) { - ReorderBufferReturnChange(rb, specinsert); + ReorderBufferReturnChange(rb, specinsert, true); specinsert = NULL; } - if (relation != NULL) + if (RelationIsValid(relation)) { RelationClose(relation); relation = NULL; @@ -1714,7 +2130,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* clear out a pending (and thus failed) speculation */ if (specinsert != NULL) { - ReorderBufferReturnChange(rb, specinsert); + ReorderBufferReturnChange(rb, specinsert, true); specinsert = NULL; } @@ -1747,7 +2163,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relations[nrelations++] = relation; } - rb->apply_truncate(rb, txn, nrelations, relations, change); + /* Apply the truncate. */ + ReorderBufferApplyTruncate(rb, txn, nrelations, + relations, change, + streaming); for (i = 0; i < nrelations; i++) RelationClose(relations[i]); @@ -1756,10 +2175,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } case REORDER_BUFFER_CHANGE_MESSAGE: - rb->message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); + ReorderBufferApplyMessage(rb, txn, change, streaming); break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -1790,7 +2206,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, snapshot_now = change->data.snapshot; } - /* and continue with the new one */ SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); break; @@ -1837,7 +2252,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, */ if (specinsert) { - ReorderBufferReturnChange(rb, specinsert); + ReorderBufferReturnChange(rb, specinsert, true); specinsert = NULL; } @@ -1845,14 +2260,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + /* + * Done with current changes, send the last message for this set of + * changes depending upon streaming mode. + */ + if (streaming) + { + if (stream_started) + { + rb->stream_stop(rb, txn, prev_lsn); + stream_started = false; + } + } + else + rb->commit(rb, txn, commit_lsn); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) elog(ERROR, "output plugin used XID %u", GetCurrentTransactionId()); + /* + * Remember the command ID and snapshot for the next set of changes in + * streaming mode. 
+ */ + if (streaming) + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + else if (snapshot_now->copied) + ReorderBufferFreeSnap(rb, snapshot_now); + /* cleanup */ TeardownHistoricSnapshot(false); @@ -1870,14 +2306,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); + /* + * If we are streaming the in-progress transaction then discard the + * changes that we just streamed, and mark the transactions as + * streamed (if they contained changes). Otherwise, remove all the + * changes and deallocate the ReorderBufferTXN. + */ + if (streaming) + { + ReorderBufferTruncateTXN(rb, txn); - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ if (iterstate) ReorderBufferIterTXNFinish(rb, iterstate); @@ -1896,15 +2345,106 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); - if (snapshot_now->copied) - ReorderBufferFreeSnap(rb, snapshot_now); + /* + * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent + * abort of the (sub)transaction we are streaming. We need to do the + * cleanup and return gracefully on this error, see SetupCheckXidLive. + */ + if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) + { + /* + * This error can only occur when we are sending the data in + * streaming mode and the streaming in not finished yet. + */ + Assert(streaming); + Assert(stream_started); + + /* Cleanup the temporary error state. */ + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + curtxn->concurrent_abort = true; + + /* Reset the TXN so that it is allowed to stream remaining data. */ + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + ReorderBufferCleanupTXN(rb, txn); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + } + PG_END_TRY(); +} - /* remove potential on-disk data, and deallocate */ - ReorderBufferCleanupTXN(rb, txn); +/* + * Perform the replay of a transaction and its non-aborted subtransactions. + * + * Subtransactions previously have to be processed by + * ReorderBufferCommitChild(), even if previously assigned to the toplevel + * transaction with ReorderBufferAssignChild. + * + * This interface is called once a toplevel commit is read for both streamed + * as well as non-streamed transactions. + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + Snapshot snapshot_now; + CommandId command_id = FirstCommandId; - PG_RE_THROW(); + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + /* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. 
That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + * + * XXX Called after everything (origin ID and LSN, ...) is stored in the + * transaction, so we don't pass that directly. + */ + if (rbtxn_is_streamed(txn)) + { + ReorderBufferStreamCommit(rb, txn); + return; } - PG_END_TRY(); + + /* + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + ReorderBufferCleanupTXN(rb, txn); + return; + } + + snapshot_now = txn->base_snapshot; + + /* Process and send the changes to output plugin. */ + ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now, + command_id, false); } /* @@ -1931,6 +2471,22 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* For streamed transactions notify the remote node about the abort. */ + if (rbtxn_is_streamed(txn)) + { + rb->stream_abort(rb, txn, lsn); + + /* + * We might have decoded changes for this transaction that could load + * the cache as per the current transaction's view (consider DDL's + * happened in this transaction). We don't want the decoding of future + * transactions to use those cache entries so execute invalidations. + */ + if (txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + } + /* cosmetic... */ txn->final_lsn = lsn; @@ -2000,6 +2556,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + /* For streamed transactions notify the remote node about the abort. */ + if (rbtxn_is_streamed(txn)) + rb->stream_abort(rb, txn, lsn); + /* cosmetic... */ txn->final_lsn = lsn; @@ -2082,7 +2642,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, change->data.snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2131,12 +2691,21 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, change->data.command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* - * Update the memory accounting info. We track memory used by the whole - * reorder buffer and the transaction containing the change. + * Update memory counters to account for the new or removed change. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. 
*/ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -2144,6 +2713,8 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition) { Size sz; + ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2155,19 +2726,41 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; + txn = change->txn; + + /* If streaming supported, update the total size in top level as well. */ + if (ReorderBufferCanStream(rb)) + { + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + } + sz = ReorderBufferChangeSize(change); if (addition) { - change->txn->size += sz; + txn->size += sz; rb->size += sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size += sz; } else { - Assert((rb->size >= sz) && (change->txn->size >= sz)); - change->txn->size -= sz; + Assert((rb->size >= sz) && (txn->size >= sz)); + txn->size -= sz; rb->size -= sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size -= sz; } + + Assert(txn->size <= rb->size); + Assert((txn->size >= 0) && (rb->size >= 0)); } /* @@ -2196,6 +2789,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, change->lsn = lsn; change->txn = txn; change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + change->txn = txn; dlist_push_tail(&txn->tuplecids, &change->node); txn->ntuplecids++; @@ -2388,6 +2982,51 @@ ReorderBufferLargestTXN(ReorderBuffer *rb) } /* + * Find the largest toplevel transaction to evict (by streaming). + * + * This can be seen as an optimized version of ReorderBufferLargestTXN, which + * should give us the same transaction (because we don't update memory account + * for subtransaction with streaming, so it's always 0). But we can simply + * iterate over the limited number of toplevel transactions. + * + * Note that, we skip transactions that contains incomplete changes. There + * is a scope of optimization here such that we can select the largest transaction + * which has complete changes. But that will make the code and design quite complex + * and that might not be worth the benefit. If we plan to stream the transactions + * that contains incomplete changes then we need to find a way to partially + * stream/truncate the transaction changes in-memory and build a mechanism to + * partially truncate the spilled files. Additionally, whenever we partially + * stream the transaction we need to maintain the last streamed lsn and next time + * we need to restore from that segment and the offset in WAL. As we stream the + * changes from the top transaction and restore them subtransaction wise, we need + * to even remember the subxact from where we streamed the last change. + */ +static ReorderBufferTXN * +ReorderBufferLargestTopTXN(ReorderBuffer *rb) +{ + dlist_iter iter; + Size largest_size = 0; + ReorderBufferTXN *largest = NULL; + + /* Find the largest top-level transaction. */ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, node, iter.cur); + + if ((largest != NULL || txn->total_size > largest_size) && + (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn))) + { + largest = txn; + largest_size = txn->total_size; + } + } + + return largest; +} + +/* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to * disk until we reach under the memory limit. 
@@ -2419,11 +3058,33 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { /* * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. + * memory by streaming, if possible. Otherwise, spill to disk. */ - txn = ReorderBufferLargestTXN(rb); + if (ReorderBufferCanStartStreaming(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->total_size > 0); + Assert(rb->size >= txn->total_size); - ReorderBufferSerializeTXN(rb, txn); + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it + * from memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); + } /* * After eviction, the transaction should have no entries in memory, @@ -2501,7 +3162,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); spilled++; } @@ -2713,6 +3374,136 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(ondisk->change.action == change->action); } +/* Returns true, if the output plugin supports streaming, false, otherwise. */ +static inline bool +ReorderBufferCanStream(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->streaming; +} + +/* Returns true, if the streaming can be started now, false, otherwise. */ +static inline bool +ReorderBufferCanStartStreaming(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + SnapBuild *builder = ctx->snapshot_builder; + + /* + * We can't start streaming immediately even if the streaming is enabled + * because we previously decoded this transaction and now just are + * restarting. + */ + if (ReorderBufferCanStream(rb) && + !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) + { + /* We must have a consistent snapshot by this time */ + Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT); + return true; + } + + return false; +} + +/* + * Send data of a large transaction (and its subtransactions) to the + * output plugin, but using the stream API. + */ +static void +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + Snapshot snapshot_now; + CommandId command_id; + + /* We can never reach here for a sub transaction. */ + Assert(txn->toptxn == NULL); + + /* + * We can't make any assumptions about base snapshot here, similar to what + * ReorderBufferCommit() does. That relies on base_snapshot getting + * transferred from subxact in ReorderBufferCommitChild(), but that was + * not yet called as the transaction is in-progress. + * + * So just walk the subxacts and use the same logic here. But we only need + * to do that once, when the transaction is streamed for the first time. + * After that we need to reuse the snapshot from the previous run. + * + * Unlike DecodeCommit which adds xids of all the subtransactions in + * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here + * but we do add them to subxip array instead via ReorderBufferCopySnap. + * This allows the catalog changes made in subtransactions decoded till + * now to be visible. 
+ */ + if (txn->snapshot_now == NULL) + { + dlist_iter subxact_i; + + /* make sure this transaction is streamed for the first time */ + Assert(!rbtxn_is_streamed(txn)); + + /* at the beginning we should have invalid command ID */ + Assert(txn->command_id == InvalidCommandId); + + dlist_foreach(subxact_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); + ReorderBufferTransferSnapToParent(txn, subtxn); + } + + /* + * If this transaction has no snapshot, it didn't make any changes to + * the database till now, so there's nothing to decode. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + return; + } + + command_id = FirstCommandId; + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + else + { + /* the transaction must have been already streamed */ + Assert(rbtxn_is_streamed(txn)); + + /* + * Nah, we already have snapshot from the previous streaming run. We + * assume new subxacts can't move the LSN backwards, and so can't beat + * the LSN condition in the previous branch (so no need to walk + * through subxacts again). In fact, we must not do that as we may be + * using snapshot half-way through the subxact. + */ + command_id = txn->command_id; + + /* + * We can not use txn->snapshot_now directly because after the last + * streaming run, we might have got some new sub-transactions. So we + * need to add them to the snapshot. + */ + snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now, + txn, command_id); + + /* Free the previously copied snapshot. */ + Assert(txn->snapshot_now->copied); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + txn->snapshot_now = NULL; + } + + /* Process and send the changes to output plugin. */ + ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, + command_id, true); + + Assert(dlist_is_empty(&txn->changes)); + Assert(txn->nentries == 0); + Assert(txn->nentries_mem == 0); +} + /* * Size of a change in memory. */ @@ -2813,7 +3604,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_container(ReorderBufferChange, node, cleanup_iter.cur); dlist_delete(&cleanup->node); - ReorderBufferReturnChange(rb, cleanup); + ReorderBufferReturnChange(rb, cleanup, true); } txn->nentries_mem = 0; Assert(dlist_is_empty(&txn->changes)); @@ -3522,7 +4313,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_container(ReorderBufferChange, node, it.cur); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change); + ReorderBufferReturnChange(rb, change, true); } } @@ -3812,6 +4603,17 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, BlockNumber blockno; bool updated_mapping = false; + /* + * Return unresolved if tuplecid_data is not valid. That's because when + * streaming in-progress transactions we may run into tuples with the CID + * before actually decoding them. Think e.g. about INSERT followed by + * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the + * INSERT. So in such cases, we assume the CID is from the future + * command. 
+ */ + if (tuplecid_data == NULL) + return false; + /* be careful about padding */ memset(&key, 0, sizeof(key)); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cd..aa17f7d 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,7 @@ #define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) +#define XLH_INSERT_ON_TOAST_RELATION (1<<4) /* * xl_heap_update flag values, 8 bits are available. diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 7ba72c8..387eb34 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -19,6 +19,7 @@ #include "access/relscan.h" #include "access/sdir.h" +#include "access/xact.h" #include "utils/guc.h" #include "utils/rel.h" #include "utils/snapshot.h" @@ -903,6 +904,15 @@ static inline bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { slot->tts_tableOid = RelationGetRelid(sscan->rs_rd); + + /* + * We don't expect direct calls to table_scan_getnextslot with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding"); + return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } @@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan, TupleTableSlot *slot, bool *call_again, bool *all_dead) { + /* + * We don't expect direct calls to table_index_fetch_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding"); return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot, slot, call_again, @@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel, Snapshot snapshot, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_tuple_fetch_row_version with + * valid CheckXidAlive for catalog or regular tables. See detailed + * comments in xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding"); + return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot); } @@ -1713,6 +1738,14 @@ static inline bool table_scan_bitmap_next_block(TableScanDesc scan, struct TBMIterateResult *tbmres) { + /* + * We don't expect direct calls to table_scan_bitmap_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, tbmres); } @@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan, struct TBMIterateResult *tbmres, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_bitmap_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. 
+ */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding"); + return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan, tbmres, slot); @@ -1748,6 +1789,13 @@ static inline bool table_scan_sample_next_block(TableScanDesc scan, struct SampleScanState *scanstate) { + /* + * We don't expect direct calls to table_scan_sample_next_block with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate); } @@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan, struct SampleScanState *scanstate, TupleTableSlot *slot) { + /* + * We don't expect direct calls to table_scan_sample_next_tuple with valid + * CheckXidAlive for catalog or regular tables. See detailed comments in + * xact.c where these variables are declared. + */ + if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) + elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding"); return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate, slot); } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5348011..c18554b 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -81,6 +81,10 @@ typedef enum /* Synchronous commit level */ extern int synchronous_commit; +/* used during logical streaming of a transaction */ +extern TransactionId CheckXidAlive; +extern bool bsysscan; + /* * Miscellaneous flag bits to record events which occur on the top level * transaction. These flags are only persisted in MyXactFlags and are intended diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index deef318..b0fae98 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern void ResetLogicalStreamingState(void); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 42bc817..1ae17d5 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -162,6 +162,9 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -181,6 +184,40 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction's changes has toast insert, without main table insert. */ +#define rbtxn_has_toast_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ +) +/* + * This transaction's changes has speculative insert, without speculative + * confirm. + */ +#define rbtxn_has_spec_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ +) + +/* Check whether this transaction has an incomplete change. 
*/ +#define rbtxn_has_incomplete_tuple(txn) \ +( \ + rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \ +) + +/* + * Has this transaction been streamed to downstream? + * + * (It's not possible to deduce this from nentries and nentries_mem for + * various reasons. For example, all changes may be in subtransactions in + * which case we'd have nentries==0 for the toplevel one, which would say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction.) + */ +#define rbtxn_is_streamed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -249,6 +286,13 @@ typedef struct ReorderBufferTXN dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ /* + * Snapshot/CID from the previous streaming run. Only valid for already + * streamed transactions (NULL/InvalidCommandId otherwise). + */ + Snapshot snapshot_now; + CommandId command_id; + + /* * How many ReorderBufferChange's do we have in this txn. * * Changes in subtransactions are *not* included but tracked separately. @@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN * Size of this transaction (changes currently in memory, in bytes). */ Size size; + + /* Size of top-transaction including sub-transactions. */ + Size total_size; + + /* If we have detected concurrent abort then ignore future changes. */ + bool concurrent_abort; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *); ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); -void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); +void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool toast_insert); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -- 1.8.3.1 v45/v45-0002-Extend-the-BufFile-interface-for-the-streaming-o.patch0000664000175000017500000003753413712204536025657 0ustar dilipkumardilipkumarFrom 4207c6d4c9802911ab5be39b6c97627657e7f4d9 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 14 Jul 2020 10:56:51 +0530 Subject: [PATCH v45 2/6] Extend the BufFile interface for the streaming of in-progress transactions. Allow BufFile to support temporary files that can be used by the single backend when the corresponding files need to be survived across the transaction and need to be opened and closed multiple times. Implement the interface for BufFileTruncate interface to allow files to be truncated up to a particular offset. Extend BufFileSeek API to support SEEK_END case. Add an option to provide a mode while opening the shared BufFiles instead of always opening in read-only mode. 
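Before the file-by-file diff, here is a rough usage sketch of the extended interface as a single backend is expected to drive it. The function below is illustrative only (the file name is a placeholder and error handling is omitted), but the calls and signatures match the ones this patch introduces or changes: a SharedFileSet initialized with a NULL dsm_segment, a BufFile re-opened read-write across transactions, SEEK_END to append, and BufFileTruncateShared to throw away data past a remembered position.

    #include "postgres.h"

    #include <fcntl.h>

    #include "storage/buffile.h"
    #include "storage/sharedfileset.h"

    static void
    buffile_usage_sketch(void)
    {
        SharedFileSet *fileset = palloc0(sizeof(SharedFileSet));
        BufFile    *file;
        int         fileno;
        off_t       offset;
        char        data[] = "some serialized change";

        /* backend-local fileset: pass NULL instead of a dsm_segment */
        SharedFileSetInit(fileset, NULL);

        /* create the named file and write an initial record */
        file = BufFileCreateShared(fileset, "xid-1234-changes");
        BufFileWrite(file, data, sizeof(data));
        BufFileClose(file);

        /* later, possibly in another transaction: re-open read-write */
        file = BufFileOpenShared(fileset, "xid-1234-changes", O_RDWR);

        /* append at the end, remembering the position before the append */
        BufFileSeek(file, 0, 0, SEEK_END);
        BufFileTell(file, &fileno, &offset);
        BufFileWrite(file, data, sizeof(data));

        /* discard everything written after the remembered position */
        BufFileTruncateShared(file, fileno, offset);
        BufFileClose(file);
    }

The SharedFileSet is used here only to get stable, name-addressable files that survive transaction end and are removed at process exit; the streaming apply worker in patch 0003 follows essentially this pattern.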
--- src/backend/postmaster/pgstat.c | 3 + src/backend/storage/file/buffile.c | 86 +++++++++++++++++++++++---- src/backend/storage/file/fd.c | 9 ++- src/backend/storage/file/sharedfileset.c | 98 ++++++++++++++++++++++++++++--- src/backend/utils/sort/logtape.c | 4 +- src/backend/utils/sort/sharedtuplestore.c | 2 +- src/include/pgstat.h | 1 + src/include/storage/buffile.h | 4 +- src/include/storage/fd.h | 2 +- src/include/storage/sharedfileset.h | 4 +- 10 files changed, 186 insertions(+), 27 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 15f92b6..3804412 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3940,6 +3940,9 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_BUFFILE_WRITE: event_name = "BufFileWrite"; break; + case WAIT_EVENT_BUFFILE_TRUNCATE: + event_name = "BufFileTruncate"; + break; case WAIT_EVENT_CONTROL_FILE_READ: event_name = "ControlFileRead"; break; diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 2d7a082..a9ca5d9 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -32,10 +32,14 @@ * (by opening multiple fd.c temporary files). This is an essential feature * for sorts and hashjoins on large amounts of data. * - * BufFile supports temporary files that can be made read-only and shared with - * other backends, as infrastructure for parallel execution. Such files need - * to be created as a member of a SharedFileSet that all participants are - * attached to. + * BufFile supports temporary files that can be shared with other backends, as + * infrastructure for parallel execution. Such files need to be created as a + * member of a SharedFileSet that all participants are attached to. + * + * BufFile supports temporary files that can be used by the single backend when + * the corresponding files need to be survived across the transaction and need + * to be opened and closed multiple times. Such files need to be created as a + * member of a SharedFileSet. *------------------------------------------------------------------------- */ @@ -277,7 +281,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name) * backends and render it read-only. */ BufFile * -BufFileOpenShared(SharedFileSet *fileset, const char *name) +BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode) { BufFile *file; char segment_name[MAXPGPATH]; @@ -301,7 +305,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) } /* Try to load a segment. */ SharedSegmentName(segment_name, name, nfiles); - files[nfiles] = SharedFileSetOpen(fileset, segment_name); + files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode); if (files[nfiles] <= 0) break; ++nfiles; @@ -321,7 +325,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name) file = makeBufFileCommon(nfiles); file->files = files; - file->readOnly = true; /* Can't write to files opened this way */ + file->readOnly = (mode == O_RDONLY) ? 
true : false; file->fileset = fileset; file->name = pstrdup(name); @@ -364,6 +368,9 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name) if (!found) elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name); + + /* Unregister the shared fileset */ + SharedFileSetUnregister(fileset); } /* @@ -666,11 +673,22 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence) newFile = file->curFile; newOffset = (file->curOffset + file->pos) + offset; break; -#ifdef NOT_USED case SEEK_END: - /* could be implemented, not needed currently */ + + /* + * Get the file size of the last file to get the last offset of + * that file. + */ + newFile = file->numFiles - 1; + newOffset = FileSize(file->files[file->numFiles - 1]); + if (newOffset < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m", + FilePathName(file->files[file->numFiles - 1]), + file->name))); + break; break; -#endif default: elog(ERROR, "invalid whence: %d", whence); return EOF; @@ -838,3 +856,51 @@ BufFileAppend(BufFile *target, BufFile *source) return startBlock; } + +/* + * Truncate the file upto the given fileno and the offsets. + */ +void +BufFileTruncateShared(BufFile *file, int fileno, off_t offset) +{ + int newFile = file->numFiles; + off_t newOffset = file->curOffset; + char segment_name[MAXPGPATH]; + int i; + + /* Loop over all the files upto the fileno which we want to truncate. */ + for (i = file->numFiles - 1; i >= fileno; i--) + { + /* + * Except the fileno, we can directly delete other files. If the + * offset is 0 then we can delete the fileno file as well unless it is + * the first file. + */ + if ((i != fileno || offset == 0) && fileno != 0) + { + SharedSegmentName(segment_name, file->name, i); + FileClose(file->files[i]); + if (!SharedFileSetDelete(file->fileset, segment_name, true)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not delete shared fileset \"%s\": %m", + segment_name))); + newFile--; + newOffset = MAX_PHYSICAL_FILESIZE; + } + else + { + if (FileTruncate(file->files[i], offset, + WAIT_EVENT_BUFFILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(file->files[i])))); + + newOffset = offset; + } + } + + file->numFiles = newFile; + file->curOffset = newOffset; +} diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 5f6420e..f376a97 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -1743,18 +1743,17 @@ PathNameCreateTemporaryFile(const char *path, bool error_on_failure) /* * Open a file that was created with PathNameCreateTemporaryFile, possibly in * another backend. Files opened this way don't count against the - * temp_file_limit of the caller, are read-only and are automatically closed - * at the end of the transaction but are not deleted on close. + * temp_file_limit of the caller, are automatically closed at the end of the + * transaction but are not deleted on close. */ File -PathNameOpenTemporaryFile(const char *path) +PathNameOpenTemporaryFile(const char *path, int mode) { File file; ResourceOwnerEnlargeFiles(CurrentResourceOwner); - /* We open the file read-only. */ - file = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + file = PathNameOpenFile(path, mode | PG_BINARY); /* If no such file, then we don't raise an error. 
*/ if (file <= 0 && errno != ENOENT) diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c index 16b7594..9a3dc10 100644 --- a/src/backend/storage/file/sharedfileset.c +++ b/src/backend/storage/file/sharedfileset.c @@ -13,6 +13,10 @@ * files can be discovered by name, and a shared ownership semantics so that * shared files survive until the last user detaches. * + * SharedFileSets can also be used by backends when the temporary files need + * to be opened/closed multiple times and the underlying files need to survive + * across transactions. + * *------------------------------------------------------------------------- */ @@ -25,19 +29,29 @@ #include "common/hashfn.h" #include "miscadmin.h" #include "storage/dsm.h" +#include "storage/ipc.h" #include "storage/sharedfileset.h" #include "utils/builtins.h" +static List *filesetlist = NIL; + static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum); +static void SharedFileSetDeleteOnProcExit(int status, Datum arg); static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace); static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name); static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); /* - * Initialize a space for temporary files that can be opened for read-only - * access by other backends. Other backends must attach to it before - * accessing it. Associate this SharedFileSet with 'seg'. Any contained - * files will be deleted when the last backend detaches. + * Initialize a space for temporary files that can be opened by other backends. + * Other backends must attach to it before accessing it. Associate this + * SharedFileSet with 'seg'. Any contained files will be deleted when the + * last backend detaches. + * + * We can also use this interface if the temporary files are used only by + * single backend but the files need to be opened and closed multiple times + * and also the underlying files need to survive across transactions. For + * such cases, dsm segment 'seg' should be passed as NULL. We remove such + * files on proc exit. * * Files will be distributed over the tablespaces configured in * temp_tablespaces. @@ -84,7 +98,25 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) } /* Register our cleanup callback. */ - on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); + if (seg) + on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); + else + { + static bool registered_cleanup = false; + + if (!registered_cleanup) + { + /* + * We must not have registered any fileset before registering the + * fileset clean up. + */ + Assert(filesetlist == NIL); + on_proc_exit(SharedFileSetDeleteOnProcExit, 0); + registered_cleanup = true; + } + + filesetlist = lcons((void *) fileset, filesetlist); + } } /* @@ -147,13 +179,13 @@ SharedFileSetCreate(SharedFileSet *fileset, const char *name) * another backend. */ File -SharedFileSetOpen(SharedFileSet *fileset, const char *name) +SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode) { char path[MAXPGPATH]; File file; SharedFilePath(path, fileset, name); - file = PathNameOpenTemporaryFile(path); + file = PathNameOpenTemporaryFile(path, mode); return file; } @@ -223,6 +255,58 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum) } /* + * Callback function that will be invoked on the process exit. This will + * process the list of all the registered sharedfilesets and delete the + * underlying files. 
+ */ +static void +SharedFileSetDeleteOnProcExit(int status, Datum arg) +{ + ListCell *l; + + /* Loop over all the pending shared fileset entry */ + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + SharedFileSetDeleteAll(fileset); + } + + filesetlist = NIL; +} + +/* + * Unregister the shared fileset entry registered for cleanup on proc exit. + */ +void +SharedFileSetUnregister(SharedFileSet *input_fileset) +{ + bool found = false; + ListCell *l; + + /* + * If the caller is following the dsm based cleanup then we don't + * maintain the filesetlist so return. + */ + if (filesetlist == NIL) + return; + + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + + /* Remove the entry from the list */ + if (input_fileset == fileset) + { + filesetlist = list_delete_cell(filesetlist, l); + found = true; + break; + } + } + + Assert(found); +} + +/* * Build the path for the directory holding the files backing a SharedFileSet * in a given tablespace. */ diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 5517e59..788815c 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -78,6 +78,8 @@ #include "postgres.h" +#include + #include "storage/buffile.h" #include "utils/builtins.h" #include "utils/logtape.h" @@ -551,7 +553,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, lt = <s->tapes[i]; pg_itoa(i, filename); - file = BufFileOpenShared(fileset, filename); + file = BufFileOpenShared(fileset, filename, O_RDONLY); filesize = BufFileSize(file); /* diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 6537a43..b83fb50 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -559,7 +559,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) sts_filename(name, accessor, accessor->read_participant); accessor->read_file = - BufFileOpenShared(accessor->fileset, name); + BufFileOpenShared(accessor->fileset, name, O_RDONLY); } /* Seek and load the chunk header. 
*/ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 1387201..807a9c1 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -916,6 +916,7 @@ typedef enum WAIT_EVENT_BASEBACKUP_READ = PG_WAIT_IO, WAIT_EVENT_BUFFILE_READ, WAIT_EVENT_BUFFILE_WRITE, + WAIT_EVENT_BUFFILE_TRUNCATE, WAIT_EVENT_CONTROL_FILE_READ, WAIT_EVENT_CONTROL_FILE_SYNC, WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE, diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index f4752ba..fc34c49 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -48,7 +48,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source); extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); extern void BufFileExportShared(BufFile *file); -extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name); +extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name, + int mode); extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); +extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset); #endif /* BUFFILE_H */ diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 8cd125d..e209f04 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -94,7 +94,7 @@ extern mode_t FileGetRawMode(File file); /* Operations used for sharing named temporary files */ extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); -extern File PathNameOpenTemporaryFile(const char *name); +extern File PathNameOpenTemporaryFile(const char *path, int mode); extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure); extern void PathNameCreateTemporaryDir(const char *base, const char *name); extern void PathNameDeleteTemporaryDir(const char *name); diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h index 2d6cf07..d5edb60 100644 --- a/src/include/storage/sharedfileset.h +++ b/src/include/storage/sharedfileset.h @@ -37,9 +37,11 @@ typedef struct SharedFileSet extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg); extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg); extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name); -extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name); +extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name, + int mode); extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name, bool error_on_failure); extern void SharedFileSetDeleteAll(SharedFileSet *fileset); +extern void SharedFileSetUnregister(SharedFileSet *input_fileset); #endif -- 1.8.3.1 v45/v45-0003-Add-support-for-streaming-to-built-in-replicatio.patch0000664000175000017500000027354513712204536026071 0ustar dilipkumardilipkumarFrom ac24187438e68ec702f5952dbc7fd7127e739724 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 20 Jul 2020 10:37:49 +0530 Subject: [PATCH v45 3/6] Add support for streaming to built-in replication To add support for streaming of in-progress transactions into the built-in transaction, we need to do three things: * Extend the logical replication protocol, so identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). * Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol. 
* Modify the replication apply worker, to properly handle streamed in-progress transaction by spilling the data to disk and then replaying them on commit. We however must explicitly disable streaming replication during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover we don't have a replication connection open so we don't have where to send the data anyway. --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 11 + src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 49 +- src/backend/postmaster/pgstat.c | 12 + .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/proto.c | 140 ++- src/backend/replication/logical/worker.c | 946 ++++++++++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 348 +++++++- src/include/catalog/pg_subscription.h | 3 + src/include/pgstat.h | 6 +- src/include/replication/logicalproto.h | 46 +- src/include/replication/walreceiver.h | 1 + src/test/subscription/t/009_stream_simple.pl | 86 ++ src/test/subscription/t/010_stream_subxact.pl | 102 +++ src/test/subscription/t/011_stream_ddl.pl | 95 +++ .../subscription/t/012_stream_subxact_abort.pl | 82 ++ .../subscription/t/013_stream_subxact_ddl_abort.pl | 84 ++ src/test/subscription/t/015_stream_binary.pl | 86 ++ 19 files changed, 2060 insertions(+), 47 deletions(-) create mode 100644 src/test/subscription/t/009_stream_simple.pl create mode 100644 src/test/subscription/t/010_stream_subxact.pl create mode 100644 src/test/subscription/t/011_stream_ddl.pl create mode 100644 src/test/subscription/t/012_stream_subxact_abort.pl create mode 100644 src/test/subscription/t/013_stream_subxact_ddl_abort.pl create mode 100644 src/test/subscription/t/015_stream_binary.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 81c4e70..a81bd54 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -165,8 +165,9 @@ ALTER SUBSCRIPTION name RENAME TO < . See there for more information. The parameters that can be altered are slot_name, - synchronous_commit, and - binary. + synchronous_commit, + binary and + streaming. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index cdb22c5..b7d7457 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -228,6 +228,17 @@ CREATE SUBSCRIPTION subscription_name + + streaming (boolean) + + + Specifies whether streaming of in-progress transactions should + be enabled for this subscription. By default, all transactions + are fully decoded on the publisher, and only then sent to the + subscriber as a whole. 
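How this option reaches the publisher can be seen in the libpqwalreceiver.c hunk further down: the walreceiver only adds streaming 'on' to the START_REPLICATION option list when the logical options request it and the publisher reports a new enough server version. A self-contained toy of just that decision (the function and parameter names here are illustrative, not the ones in libpqrcv_startstreaming):

    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    /*
     * Build the logical replication option string: streaming is requested
     * only when the subscription enabled it and the server understands it.
     */
    static void
    build_replication_options(char *buf, size_t buflen,
                              unsigned int proto_version, bool streaming,
                              int server_version_num)
    {
        snprintf(buf, buflen, "proto_version '%u'", proto_version);

        if (streaming && server_version_num >= 140000)
            strncat(buf, ", streaming 'on'", buflen - strlen(buf) - 1);
    }

    int
    main(void)
    {
        char        buf[128];

        build_replication_options(buf, sizeof(buf), 2, true, 140000);
        printf("(%s)\n", buf);      /* (proto_version '2', streaming 'on') */
        return 0;
    }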
+ + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 90bf5cf..311d462 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->binary = subform->subbinary; + sub->stream = subform->substream; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 40b6377..4c58ad8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -63,7 +63,8 @@ parse_subscription_options(List *options, bool *copy_data, char **synchronous_commit, bool *refresh, - bool *binary_given, bool *binary) + bool *binary_given, bool *binary, + bool *streaming_given, bool *streaming) { ListCell *lc; bool connect_given = false; @@ -99,6 +100,8 @@ parse_subscription_options(List *options, *binary_given = false; *binary = false; } + if (streaming) + *streaming_given = false; /* Parse options */ foreach(lc, options) @@ -194,6 +197,16 @@ parse_subscription_options(List *options, *binary_given = true; *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "streaming") == 0 && streaming) + { + if (*streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *streaming_given = true; + *streaming = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -337,6 +350,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool streaming; + bool streaming_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -360,7 +375,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ©_data, &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); /* * Since creating a replication slot is not transactional, rolling back @@ -439,6 +455,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + if (streaming_given) + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + else + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. 
*/ @@ -698,6 +721,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *synchronous_commit; bool binary_given; bool binary; + bool streaming_given; + bool streaming; parse_subscription_options(stmt->options, NULL, /* no "connect" */ @@ -707,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); if (slotname_given) { @@ -739,6 +765,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subbinary - 1] = true; } + if (streaming_given) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + replaces[Anum_pg_subscription_substream - 1] = true; + } + update_tuple = true; break; } @@ -756,7 +789,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no streaming */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -800,8 +834,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ &refresh, - NULL, NULL); /* no "binary" */ - + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -843,7 +877,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3804412..bb0f95a 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_WAL_WRITE: event_name = "WALWrite"; break; + case WAIT_EVENT_LOGICAL_CHANGES_READ: + event_name = "ReorderLogicalChangesRead"; + break; + case WAIT_EVENT_LOGICAL_CHANGES_WRITE: + event_name = "ReorderLogicalChangesWrite"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_READ: + event_name = "ReorderLogicalSubxactRead"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_WRITE: + event_name = "ReorderLogicalSubxactWrite"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e905723..a6101ac 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -408,6 +408,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, "proto_version '%u'", options->proto.logical.proto_version); + if (options->proto.logical.streaming && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfo(&cmd, ", streaming 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9ff8097..ff25924 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output 
stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) +logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) * Write UPDATE to the output stream. */ void -logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary) +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) +logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b pq_sendbyte(out, 'D'); /* action DELETE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) */ void logicalrep_write_truncate(StringInfo out, + TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs) @@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out, pq_sendbyte(out, 'T'); /* action TRUNCATE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + pq_sendint32(out, nrelids); /* encode and send truncate flags */ @@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; pq_sendbyte(out, 'R'); /* sending RELATION */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in) * This function will always write base type info. 
*/ void -logicalrep_write_typ(StringInfo out, Oid typoid) +logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) { Oid basetypoid = getBaseType(typoid); HeapTuple tup; @@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid) pq_sendbyte(out, 'Y'); /* sending TYPE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", basetypoid); @@ -720,3 +747,104 @@ logicalrep_read_namespace(StringInfo in) return nspname; } + +void +logicalrep_write_stream_start(StringInfo out, + TransactionId xid, bool first_segment) +{ + pq_sendbyte(out, 'S'); /* action STREAM START */ + + Assert(TransactionIdIsValid(xid)); + + /* transaction ID (we're starting to stream, so must be valid) */ + pq_sendint32(out, xid); + + /* 1 if this is the first streaming segment for this xid */ + pq_sendbyte(out, first_segment ? 1 : 0); +} + +TransactionId +logicalrep_read_stream_start(StringInfo in, bool *first_segment) +{ + TransactionId xid; + + Assert(first_segment); + + xid = pq_getmsgint(in, 4); + *first_segment = (pq_getmsgbyte(in) == 1); + + return xid; +} + +void +logicalrep_write_stream_stop(StringInfo out) +{ + pq_sendbyte(out, 'E'); /* action STREAM END */ +} + +void +logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + + Assert(TransactionIdIsValid(txn->xid)); + + /* transaction ID */ + pq_sendint32(out, txn->xid); + + /* send the flags field (unused for now) */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +TransactionId +logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags (unused for now) */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit message", flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + return xid; +} + +void +logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid) +{ + pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + + Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); + + /* transaction ID */ + pq_sendint32(out, xid); + pq_sendint32(out, subxid); +} + +void +logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid) +{ + Assert(xid && subxid); + + *xid = pq_getmsgint(in, 4); + *subxid = pq_getmsgint(in, 4); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2fcf2e6..98e7fd0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -18,11 +18,43 @@ * This module includes server facing code and shares libpqwalreceiver * module with walreceiver for providing the libpq specific functionality. * + * + * STREAMED TRANSACTIONS + * --------------------- + * + * Streamed transactions (large transactions exceeding a memory limit on the + * upstream) are not applied immediately, but instead the data is written + * to temporary files and then applied at once when the final commit arrives. 
+ * + * Unlike the regular (non-streamed) case, handling streamed transactions has + * to handle aborts of both the toplevel transaction and subtransactions. This + * is achieved by tracking offsets for subtransactions, which is then used + * to truncate the file with serialized changes. + * + * The files are placed in tmp file directory by default, and the filenames + * include both the XID of the toplevel transaction and OID of the + * subscription. This is necessary so that different workers processing a + * remote transaction with the same XID don't interfere. + * + * We use BufFiles instead of using normal temporary files because (a) the + * BufFile infrastructure supports temporary files that exceed the OS file size + * limit, (b) provides a way for automatic clean up on the error and (c) provides + * a way to survive these files across local transactions and allow to open and + * close at stream start and close. We decided to use SharedFileSet + * infrastructure as without that it deletes the files on the closure of file + * and if we decide to keep stream files open across the start/stop stream then + * it will consume a lot of memory (more than 8K). Moreover, if we don't use + * SharedFileSet then we also need to invent a new way to pass filenames to + * BufFile APIs so that we can be allowed to open the file we desired across + * multiple stream open calls for the same transaction. *------------------------------------------------------------------------- */ #include "postgres.h" +#include +#include + #include "access/table.h" #include "access/tableam.h" #include "access/xact.h" @@ -33,7 +65,9 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_tablespace.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" @@ -63,7 +97,9 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" +#include "storage/buffile.h" #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -71,6 +107,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/catcache.h" +#include "utils/dynahash.h" #include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -99,9 +136,26 @@ typedef struct SlotErrCallbackArg int remote_attnum; } SlotErrCallbackArg; +/* + * Stream xid hash entry. Whenever we see a new xid we create this entry in the + * xidhash and along with it create the streaming file and store the fileset handle. + * The subxact file is created iff there is any suxact info under this xid. This + * entry is used on the subsequent streams for the xid to get the corresponding + * fileset handles. + */ +typedef struct StreamXidHash +{ + TransactionId xid; /* xid is the hash key and must be first */ + SharedFileSet *stream_fileset; /* shared file set for stream data */ + SharedFileSet *subxact_fileset; /* shared file set for subxact info */ +} StreamXidHash; + static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; +/* per stream context for streaming transactions. 
*/ +static MemoryContext LogicalStreamingContext = NULL; + WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; @@ -110,12 +164,62 @@ bool MySubscriptionValid = false; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +/* fields valid only when processing streamed transaction */ +bool in_streamed_transaction = false; + +static TransactionId stream_xid = InvalidTransactionId; + +/* + * Hash table for storing the streaming xid information along with shared file + * set for streaming and subxact files. On every stream start we need to open + * the xid's files and for that we need the shared file set handle. So storing + * it in xid hash make it faster to search. + */ +static HTAB *xidhash = NULL; + +/* Buf file handle of the current streaming file. */ +static BufFile *stream_fd = NULL; + +typedef struct SubXactInfo +{ + TransactionId xid; /* XID of the subxact */ + int fileno; /* file number in the buffile */ + off_t offset; /* offset in the file */ +} SubXactInfo; + +static uint32 nsubxacts = 0; +static uint32 nsubxacts_max = 0; +static SubXactInfo *subxacts = NULL; +static TransactionId subxact_last = InvalidTransactionId; + +static void subxact_filename(char *path, Oid subid, TransactionId xid); +static void changes_filename(char *path, Oid subid, TransactionId xid); + +/* + * Information about subtransactions of a given toplevel transaction. + */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); +static inline void cleanup_subxact_info(void); + +/* + * Serialize and deserialize changes for a toplevel transaction. + */ +static void stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok); +static void stream_open_file(Oid subid, TransactionId xid, bool first); +static void stream_write_change(char action, StringInfo s); +static void stream_close_file(void); + static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +/* prototype needed because of stream_commit */ +static void apply_dispatch(StringInfo s); + static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot); static void apply_handle_update_internal(ResultRelInfo *relinfo, @@ -187,6 +291,42 @@ ensure_transaction(void) return true; } +/* + * Handle streamed transactions. + * + * If in streaming mode (receiving a block of streamed transaction), we + * simply redirect it to a file for the proper toplevel transaction. + * + * Returns true for streamed transactions, false otherwise (regular mode). + */ +static bool +handle_streamed_transaction(const char action, StringInfo s) +{ + TransactionId xid; + + /* not in streaming mode */ + if (!in_streamed_transaction) + return false; + + Assert(stream_fd != NULL); + Assert(TransactionIdIsValid(stream_xid)); + + /* + * We should have received XID of the subxact as the first part of the + * message, so extract it. + */ + xid = pq_getmsgint(s, 4); + + Assert(TransactionIdIsValid(xid)); + + /* Add the new subxact to the array (unless already there). 
*/ + subxact_info_add(xid); + + /* write the change to the current file */ + stream_write_change(action, s); + + return true; +} /* * Executor state preparation for evaluation of constraint expressions, @@ -612,17 +752,323 @@ static void apply_handle_origin(StringInfo s) { /* - * ORIGIN message can only come inside remote transaction and before any - * actual writes. + * ORIGIN message can only come inside remote transaction or + * inside streaming transaction and before any actual writes. */ - if (!in_remote_transaction || - (IsTransactionState() && !am_tablesync_worker())) + if ((!in_remote_transaction && !in_streamed_transaction) || + ((IsTransactionState() && !am_tablesync_worker()) && + !in_streamed_transaction)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("ORIGIN message sent out of order"))); } /* + * Handle STREAM START message. + */ +static void +apply_handle_stream_start(StringInfo s) +{ + bool first_segment; + HASHCTL hash_ctl; + + Assert(!in_streamed_transaction); + + /* + * Start a transaction on stream start, this transaction will be committed + * on the stream stop. We need the transaction for handling the buffile, + * used for serializing the streaming data and subxact info. + */ + ensure_transaction(); + + /* notify handle methods we're processing a remote transaction */ + in_streamed_transaction = true; + + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + + /* Initialize the xidhash table if we haven't yet */ + if (xidhash == NULL) + { + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(StreamXidHash); + hash_ctl.hcxt = ApplyContext; + xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, + HASH_ELEM | HASH_CONTEXT); + } + + /* open the spool file for this transaction */ + stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + + /* if this is not the first segment, open existing file */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle STREAM STOP message. + */ +static void +apply_handle_stream_stop(StringInfo s) +{ + Assert(in_streamed_transaction); + + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + in_streamed_transaction = false; + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM abort message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + + Assert(!in_streamed_transaction); + + logicalrep_read_stream_abort(s, &xid, &subxid); + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just delete the files with serialized info. + */ + if (xid == subxid) + stream_cleanup_files(MyLogicalRepWorker->subid, xid, false); + else + { + /* + * OK, so it's a subxact. We need to read the subxact file for the + * toplevel transaction, determine the offset tracked for the subxact, + * and truncate the file with changes. We also remove the subxacts + * with higher offsets (or rather higher XIDs). 
+ * + * We intentionally scan the array from the tail, because we're likely + * aborting a change for the most recent subtransactions. + * + * XXX Can we rely on the subxact XIDs arriving in sorted order? That + * would allow us to use binary search here. + * + * XXX Or perhaps we can rely on the aborts to arrive in the reverse + * order, i.e. from the inner-most subxact (when nested)? In which + * case we could simply check the last element. + */ + + int64 i; + int64 subidx; + BufFile *fd; + bool found = false; + char path[MAXPGPATH]; + StreamXidHash *ent; + + subidx = -1; + ensure_transaction(); + subxact_info_read(MyLogicalRepWorker->subid, xid); + + /* XXX optimize the search by bsearch on sorted data */ + for (i = nsubxacts; i > 0; i--) + { + if (subxacts[i - 1].xid == subxid) + { + subidx = (i - 1); + found = true; + break; + } + } + + /* + * If it's an empty sub-transaction then we will not find the subxid + * here so just cleanup the subxact info and return. + */ + if (!found) + { + /* Cleanup the subxact info */ + cleanup_subxact_info(); + CommitTransactionCommand(); + return; + } + + Assert((subidx >= 0) && (subidx < nsubxacts)); + + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + + /* open the changes file */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + + /* OK, truncate the file at the right offset */ + BufFileTruncateShared(fd, subxacts[subidx].fileno, subxacts[subidx].offset); + BufFileClose(fd); + + /* discard the subxacts added later */ + nsubxacts = subidx; + + /* write the updated subxact list */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + CommitTransactionCommand(); + } +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + bool found; + LogicalRepCommitData commit_data; + StreamXidHash *ent; + MemoryContext oldcxt; + BufFile *fd; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* open the spool file for the committed transaction */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + elog(DEBUG1, "replaying changes from file '%s'", path); + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = commit_data.commit_lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + int nbytes; + int len; + + /* read length of the on-disk record */ + nbytes = BufFileRead(fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? 
*/ + if (nbytes != sizeof(len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file: %m"))); + + Assert(len > 0); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + if (BufFileRead(fd, buffer, len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file: %m"))); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. */ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file '%s'", + nchanges, path); + } + + BufFileClose(fd); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = commit_data.end_lsn; + replorigin_session_origin_timestamp = commit_data.committime; + + pfree(buffer); + pfree(s2.data); + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data.end_lsn); + + elog(DEBUG1, "replayed %d (all) changes from file '%s'", + nchanges, path); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid, false); +} + +/* * Handle RELATION message. * * Note we don't do validation against local schema here. 
The validation @@ -635,6 +1081,9 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + if (handle_streamed_transaction('R', s)) + return; + rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); } @@ -650,6 +1099,9 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; + if (handle_streamed_transaction('Y', s)) + return; + logicalrep_read_typ(s, &typ); logicalrep_typmap_update(&typ); } @@ -686,6 +1138,9 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (handle_streamed_transaction('I', s)) + return; + ensure_transaction(); relid = logicalrep_read_insert(s, &newtup); @@ -801,6 +1256,9 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; + if (handle_streamed_transaction('U', s)) + return; + ensure_transaction(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, @@ -950,6 +1408,9 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (handle_streamed_transaction('D', s)) + return; + ensure_transaction(); relid = logicalrep_read_delete(s, &oldtup); @@ -1320,6 +1781,9 @@ apply_handle_truncate(StringInfo s) List *relids_logged = NIL; ListCell *lc; + if (handle_streamed_transaction('T', s)) + return; + ensure_transaction(); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); @@ -1458,6 +1922,22 @@ apply_dispatch(StringInfo s) case 'O': apply_handle_origin(s); break; + /* STREAM START */ + case 'S': + apply_handle_stream_start(s); + break; + /* STREAM END */ + case 'E': + apply_handle_stream_stop(s); + break; + /* STREAM ABORT */ + case 'A': + apply_handle_stream_abort(s); + break; + /* STREAM COMMIT */ + case 'c': + apply_handle_stream_commit(s); + break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1570,6 +2050,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "ApplyMessageContext", ALLOCSET_DEFAULT_SIZES); + /* + * This memory context used for per stream data when streaming mode is + * enabled. This context is reset on each stream stop. + */ + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1674,7 +2162,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); - if (!in_remote_transaction) + if (!in_remote_transaction && !in_streamed_transaction) { /* * If we didn't get any transactions for a while there might be @@ -1947,6 +2435,20 @@ maybe_reread_subscription(void) proc_exit(0); } + /* + * Exit if streaming option is changed. The launcher will start new + * worker. + */ + if (newsub->stream != MySubscription->stream) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will " + "restart because subscription's streaming option were changed", + MySubscription->name))); + + proc_exit(0); + } + /* Check for other changes that should never happen too. */ if (newsub->dbid != MySubscription->dbid) { @@ -1979,6 +2481,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* + * subxact_info_write + * Store information about subxacts for a toplevel transaction. + * + * For each subxact we store offset of it's first change in the main file. + * The file is always over-written as a whole. + * + * XXX We should only store subxacts that were not aborted yet. 
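subxact_info_write and subxact_info_read, defined just below, persist one array of per-subtransaction entries for each streamed toplevel transaction, and apply_handle_stream_abort, seen earlier, uses the recorded (fileno, offset) pair to truncate the changes file when a subtransaction is rolled back. A minimal sketch of the same on-disk layout, using plain stdio and an ordinary path in place of the patch's BufFile/SharedFileSet machinery (only the SubXactInfo layout mirrors the patch; the helpers are illustrative):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>

typedef uint32_t TransactionId;		/* stand-in for the real typedef */

typedef struct SubXactInfo
{
	TransactionId xid;				/* XID of the subxact */
	int			fileno;				/* file number in the changes file */
	off_t		offset;				/* offset of its first change */
} SubXactInfo;

/* Rewrite the whole file, count first, as subxact_info_write does on stream stop. */
void
write_subxact_file(const char *path, const SubXactInfo *subxacts, uint32_t nsubxacts)
{
	FILE	   *f = fopen(path, "wb");

	fwrite(&nsubxacts, sizeof(nsubxacts), 1, f);
	fwrite(subxacts, sizeof(SubXactInfo), nsubxacts, f);
	fclose(f);
}

/* Read it back, as subxact_info_read does at the start of the next stream segment. */
SubXactInfo *
read_subxact_file(const char *path, uint32_t *nsubxacts)
{
	FILE	   *f = fopen(path, "rb");
	SubXactInfo *subxacts;

	fread(nsubxacts, sizeof(*nsubxacts), 1, f);
	subxacts = malloc(*nsubxacts * sizeof(SubXactInfo));
	fread(subxacts, sizeof(SubXactInfo), *nsubxacts, f);
	fclose(f);
	return subxacts;
}

On abort of entry i, everything from subxacts[i].offset onward in the changes file is thrown away and the array is cut back to i entries, which is what the BufFileTruncateShared call in apply_handle_stream_abort accomplishes.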
+ */ +static void +subxact_info_write(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + bool found; + Size len; + StreamXidHash *ent; + BufFile *fd; + + Assert(TransactionIdIsValid(xid)); + + subxact_filename(path, subid, xid); + + /* find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + /* we must found the entry for its top transaction by this time */ + Assert(found); + + /* + * If there is no subtransaction then nothing to do, but if already have + * subxact file then delete that. + */ + if (nsubxacts == 0) + { + if (ent->subxact_fileset) + { + cleanup_subxact_info(); + BufFileDeleteShared(ent->subxact_fileset, path); + pfree(ent->subxact_fileset); + ent->subxact_fileset = NULL; + } + + return; + } + + /* + * Create the subxact file if it not already created, otherwise open the + * existing file. + */ + if (ent->subxact_fileset == NULL) + { + MemoryContext oldctx; + + /* + * We need to maintain shared fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + ent->subxact_fileset = palloc(sizeof(SharedFileSet)); + SharedFileSetInit(ent->subxact_fileset, NULL); + MemoryContextSwitchTo(oldctx); + + fd = BufFileCreateShared(ent->subxact_fileset, path); + } + else + fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR); + + len = sizeof(SubXactInfo) * nsubxacts; + + /* Write the subxact count and subxact info */ + BufFileWrite(fd, &nsubxacts, sizeof(nsubxacts)); + BufFileWrite(fd, subxacts, len); + + BufFileClose(fd); + + /* + * But we free the memory allocated for subxact info. There might be one + * exceptional transaction with many subxacts, and we don't want to keep + * the memory allocated forever. + */ + cleanup_subxact_info(); +} + +/* + * subxact_info_read + * Restore information about subxacts of a streamed transaction. + * + * Read information about subxacts into the global variables. + */ +static void +subxact_info_read(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + bool found; + Size len; + BufFile *fd; + StreamXidHash *ent; + MemoryContext oldctx; + + Assert(TransactionIdIsValid(xid)); + Assert(!subxacts); + Assert(nsubxacts == 0); + Assert(nsubxacts_max == 0); + + /* Find the stream xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + + /* + * If subxact_fileset is not valid that mean we don't have any subxact + * info + */ + if (ent->subxact_fileset == NULL) + return; + + subxact_filename(path, subid, xid); + + fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY); + + /* read number of subxact items */ + if (BufFileRead(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's subxact file \"%s\": %m", + path))); + + len = sizeof(SubXactInfo) * nsubxacts; + + /* we keep the maximum as a power of 2 */ + nsubxacts_max = 1 << my_log2(nsubxacts); + + /* + * Allocate subxact information in the logical streaming context. We need + * this information during the complete stream so that we can add the sub + * transaction info to this. On stream stop we will flush this + * information to the subxact file and reset the logical streaming + * context. 
+ */ + oldctx = MemoryContextSwitchTo(LogicalStreamingContext); + subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo)); + MemoryContextSwitchTo(oldctx); + + if ((len > 0) && ((BufFileRead(fd, subxacts, len)) != len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's subxact file \"%s\": %m", + path))); + + BufFileClose(fd); +} + +/* + * subxact_info_add + * Add information about a subxact (offset in the main file). + */ +static void +subxact_info_add(TransactionId xid) +{ + int64 i; + + /* We must have a valid top level stream xid and a stream fd. */ + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + /* + * If the XID matches the toplevel transaction, we don't want to add it. + */ + if (stream_xid == xid) + return; + + /* + * In most cases we're checking the same subxact as we've already seen in + * the last call, so make sure to ignore it (this change comes later). + */ + if (subxact_last == xid) + return; + + /* OK, remember we're processing this XID. */ + subxact_last = xid; + + /* + * Check if the transaction is already present in the array of subxact. We + * intentionally scan the array from the tail, because we're likely adding + * a change for the most recent subtransactions. + * + * XXX Can we rely on the subxact XIDs arriving in sorted order? That + * would allow us to use binary search here. + */ + for (i = nsubxacts; i > 0; i--) + { + /* found, so we're done */ + if (subxacts[i - 1].xid == xid) + return; + } + + /* This is a new subxact, so we need to add it to the array. */ + if (nsubxacts == 0) + { + MemoryContext oldctx; + + nsubxacts_max = 128; + + /* Allocate this in per-stream context */ + oldctx = MemoryContextSwitchTo(LogicalStreamingContext); + subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo)); + MemoryContextSwitchTo(oldctx); + } + else if (nsubxacts == nsubxacts_max) + { + nsubxacts_max *= 2; + subxacts = repalloc(subxacts, nsubxacts_max * sizeof(SubXactInfo)); + } + + subxacts[nsubxacts].xid = xid; + + /* + * Get the current offset of the stream file and store it as offset of + * this subxact. + */ + BufFileTell(stream_fd, &subxacts[nsubxacts].fileno, + &subxacts[nsubxacts].offset); + + nsubxacts++; +} + +/* format filename for file containing the info about subxacts */ +static void +subxact_filename(char *path, Oid subid, TransactionId xid) +{ + snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid); +} + +/* format filename for file containing serialized changes */ +static inline void +changes_filename(char *path, Oid subid, TransactionId xid) +{ + snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); +} + +/* + * stream_cleanup_files + * Cleanup files for a subscription / toplevel transaction. + * + * Remove files with serialized changes and subxact info for a particular + * toplevel transaction. Each subscription has a separate set of files. + * + * Note: The files may not exists, so handle ENOENT as non-error. + * + * missing_ok - don't report error for missing file is the flag is passed true. 
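The filename helpers just above derive both spool files from the subscription OID and the toplevel XID, and stream_cleanup_files, described here, removes them once the toplevel transaction finishes. A quick illustration of the names produced (the OID and XID values are made up):

#include <stdio.h>

/*
 * Mirrors changes_filename() and subxact_filename(): one pair of files per
 * (subscription, toplevel transaction).
 */
int
main(void)
{
	unsigned int subid = 16394;		/* hypothetical subscription OID */
	unsigned int xid = 741;			/* hypothetical toplevel XID */
	char		path[1024];

	snprintf(path, sizeof(path), "%u-%u.changes", subid, xid);
	printf("serialized changes: %s\n", path);	/* 16394-741.changes */

	snprintf(path, sizeof(path), "%u-%u.subxacts", subid, xid);
	printf("subxact info:       %s\n", path);	/* 16394-741.subxacts */

	return 0;
}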
+ */ +static void +stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok) +{ + char path[MAXPGPATH]; + StreamXidHash *ent; + + /* Remove the xid entry from the stream xid hash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_REMOVE, + NULL); + /* By this time we must have created the transaction entry */ + Assert(ent != NULL); + + /* Delete the change file and release the stream fileset memory */ + changes_filename(path, subid, xid); + BufFileDeleteShared(ent->stream_fileset, path); + pfree(ent->stream_fileset); + + /* Delete the subxact file and release the memory, if it exist */ + if (ent->subxact_fileset) + { + subxact_filename(path, subid, xid); + BufFileDeleteShared(ent->subxact_fileset, path); + pfree(ent->subxact_fileset); + } +} + +/* + * stream_open_file + * Open file we'll use to serialize changes for a toplevel transaction. + * + * Open a file for streamed changes from a toplevel transaction identified + * by stream_xid (global variable). If it's the first chunk of streamed + * changes for this transaction, initialize the shared fileset and create the + * buf file, otherwise open the previously created file. + * + * This can only be called at the beginning of a "streaming" block, i.e. + * between stream_start/stream_stop messages from the upstream. + */ +static void +stream_open_file(Oid subid, TransactionId xid, bool first_segment) +{ + char path[MAXPGPATH]; + bool found; + MemoryContext oldcxt; + StreamXidHash *ent; + + Assert(in_streamed_transaction); + Assert(OidIsValid(subid)); + Assert(TransactionIdIsValid(xid)); + Assert(stream_fd == NULL); + + /* create or find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_ENTER | HASH_FIND, + &found); + Assert(first_segment || found); + changes_filename(path, subid, xid); + elog(DEBUG1, "opening file '%s' for streamed changes", path); + + /* + * Create/open the buffiles under the logical streaming context so that we + * have those files until stream stop. + */ + oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); + + /* + * If this is the first streamed segment, the file must not exist, so make + * sure we're the ones creating it. Otherwise just open the file for + * writing, in append mode. + */ + if (first_segment) + { + MemoryContext savectx; + SharedFileSet *fileset; + + /* + * We need to maintain shared fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. + */ + savectx = MemoryContextSwitchTo(ApplyContext); + fileset = palloc(sizeof(SharedFileSet)); + + SharedFileSetInit(fileset, NULL); + MemoryContextSwitchTo(savectx); + + stream_fd = BufFileCreateShared(fileset, path); + + /* Remember the fileset for the next stream of the same transaction */ + ent->xid = xid; + ent->stream_fileset = fileset; + ent->subxact_fileset = NULL; + } + else + { + /* + * Open the file and seek to the end of the file because we always + * append the changes file. + */ + stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + BufFileSeek(stream_fd, 0, 0, SEEK_END); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * stream_close_file + * Close the currently open file with streamed changes. + * + * This can only be called at the end of a streaming block, i.e. at stream_stop + * message from the upstream. 
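stream_write_change, defined just after stream_close_file below, and the replay loop in apply_handle_stream_commit, seen earlier, agree on a very small framing for spooled records: a length, the action byte, then the message payload with the leading subxact XID already stripped. A sketch of that framing with plain stdio standing in for BufFile (the function names here are illustrative):

#include <stdio.h>

/*
 * One spooled record, as written by stream_write_change and consumed by the
 * replay loop in apply_handle_stream_commit:
 *
 *     int  len        payload size + 1 (the action byte); len itself excluded
 *     char action     message type ('R', 'Y', 'I', 'U', 'D', 'T', ...)
 *     char payload[]  remainder of the message, leading subxact XID stripped
 */
void
spool_record(FILE *f, char action, const char *payload, int payload_len)
{
	int			len = payload_len + 1;	/* count the action byte, not len itself */

	fwrite(&len, sizeof(len), 1, f);
	fwrite(&action, 1, 1, f);
	fwrite(payload, 1, payload_len, f);
}

/* Returns the record length, or -1 at end of file.  buf[0] is the action byte. */
int
replay_record(FILE *f, char *buf, int bufsize)
{
	int			len;

	if (fread(&len, sizeof(len), 1, f) != 1)
		return -1;
	if (len > bufsize || fread(buf, 1, len, f) != (size_t) len)
		return -1;
	return len;
}

Note that len counts the action byte but not itself, which is why the replay loop can read len bytes in one go and hand the buffer straight to apply_dispatch.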
+ */ +static void +stream_close_file(void) +{ + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + BufFileClose(stream_fd); + + stream_xid = InvalidTransactionId; + stream_fd = NULL; +} + +/* + * stream_write_change + * Serialize a change to a file for the current toplevel transaction. + * + * The change is serialized in a simple format, with length (not including + * the length), action code (identifying the message type) and message + * contents (without the subxact TransactionId value). + * + * XXX The subxact file includes CRC32C of the contents. Maybe we should + * include something like that here too, but doing so will not be as + * straighforward, because we write the file in chunks. + */ +static void +stream_write_change(char action, StringInfo s) +{ + int len; + + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + /* total on-disk size, including the action type character */ + len = (s->len - s->cursor) + sizeof(char); + + /* first write the size */ + BufFileWrite(stream_fd, &len, sizeof(len)); + + /* then the action */ + BufFileWrite(stream_fd, &action, sizeof(action)); + + /* and finally the remaining part of the buffer (after the XID) */ + len = (s->len - s->cursor); + + BufFileWrite(stream_fd, &s->data[s->cursor], len); +} + +/* + * Cleanup the memory for subxacts and reset the related variables. + */ +static inline void +cleanup_subxact_info() +{ + if (subxacts) + pfree(subxacts); + + subxacts = NULL; + subxact_last = InvalidTransactionId; + nsubxacts = 0; + nsubxacts_max = 0; +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -2145,6 +3080,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; + options.proto.logical.streaming = MySubscription->stream; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 81ef7dc..3360bd5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,29 +47,57 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); static bool publications_valid; +static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); +static void send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. 
* + * The schema_sent flag determines if the current schema record was already + * sent to the subscriber (in which case we don't need to send it again). + * + * The schema cache on downstream is however updated only at commit time, + * and with streamed transactions the commit order may be different from + * the order the transactions are sent in. Also, the (sub) transactions + * might get aborted so we need to send the schema for each (sub) transaction + * so that we don't loose the schema information on abort. For handling this, + * we maintain the list of xids (streamed_txns) for those we have already sent + * the schema. + * * For partitions, 'pubactions' considers not only the table's own * publications, but also those of all of its ancestors. */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - + TransactionId xid; /* transaction that created the record */ /* * Did we send the schema? If ancestor relid is set, its schema must also * have been sent for this to be true. */ bool schema_sent; + List *streamed_txns; /* streamed toplevel transactions with this + * schema */ bool replicate_valid; PublicationActions pubactions; @@ -95,11 +123,17 @@ typedef struct RelationSyncEntry static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); +static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); +static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); +static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); + /* * Specify output plugin callbacks */ @@ -115,16 +149,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; + + /* transaction streaming */ + cb->stream_change_cb = pgoutput_change; + cb->stream_truncate_cb = pgoutput_truncate; + cb->stream_abort_cb = pgoutput_stream_abort; + cb->stream_commit_cb = pgoutput_stream_commit; + cb->stream_start_cb = pgoutput_stream_start; + cb->stream_stop_cb = pgoutput_stream_stop; } static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names, bool *binary) + List **publication_names, bool *binary, + bool *enable_streaming) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool streaming_given = false; *binary = false; @@ -182,6 +226,23 @@ parse_output_parameters(List *options, uint32 *protocol_version, *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "streaming") == 0) + { + if (streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + streaming_given = true; + + /* the value must be on/off */ + if (strcmp(strVal(defel->arg), "on") && strcmp(strVal(defel->arg), "off")) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid streaming value"))); + + /* enable streaming if it's 'on' */ + *enable_streaming = (strcmp(strVal(defel->arg), "on") == 0); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -194,6 +255,7 @@ static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { + bool 
enable_streaming = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -217,7 +279,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, &data->publication_names, - &data->binary); + &data->binary, + &enable_streaming); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -237,6 +300,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("publication_names parameter missing"))); + /* + * Decide whether to enable streaming. It is disabled by default, in + * which case we just update the flag in decoding context. Otherwise + * we only allow it with sufficient version of the protocol, and when + * the output plugin supports it. + */ + if (!enable_streaming) + ctx->streaming = false; + else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support streaming, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM))); + else if (!ctx->streaming) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("streaming requested, but not supported by output plugin"))); + + /* Also remember we're currently not streaming any transaction. */ + in_streaming = false; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -247,6 +331,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Initialize relation schema cache. */ init_rel_sync_cache(CacheMemoryContext); } + else + { + /* Disable the streaming during the slot initialization mode. */ + ctx->streaming = false; + } } /* @@ -305,9 +394,41 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void maybe_send_schema(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { - if (relentry->schema_sent) + bool schema_sent; + TransactionId xid = InvalidTransactionId; + TransactionId topxid = InvalidTransactionId; + + /* + * Remember XID of the (sub)transaction for the change. We don't care if + * it's top-level transaction or not (we have already sent that XID in + * start the current streaming block). + * + * If we're not in a streaming block, just use InvalidTransactionId and + * the write methods will not include it. + */ + if (in_streaming) + xid = change->txn->xid; + + if (change->txn->toptxn) + topxid = change->txn->toptxn->xid; + else + topxid = xid; + + /* + * Do we need to send the schema? We do track streamed transactions + * separately, because those may not be applied later (and the regular + * transactions won't see their effects until then) and in an order + * that we don't know at this point. + */ + if (in_streaming) + schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); + else + schema_sent = relentry->schema_sent; + + if (schema_sent) return; /* If needed, send the ancestor's schema first. 
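The checks in pgoutput_startup above tie the new streaming option to protocol version 2 and to output plugin support. From a client's point of view the option simply travels in the START_REPLICATION option list; a minimal libpq sketch, assuming a pre-existing pgoutput slot "mysub" and publication "mypub" (both names and the connection string are placeholders):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn;
	PGresult   *res;

	/* a logical replication client needs replication=database */
	conn = PQconnectdb("dbname=postgres replication=database");
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* proto_version '2' is required once streaming 'on' is requested */
	res = PQexec(conn,
				 "START_REPLICATION SLOT \"mysub\" LOGICAL 0/0 "
				 "(proto_version '2', publication_names '\"mypub\"', streaming 'on')");
	if (PQresultStatus(res) != PGRES_COPY_BOTH)
		fprintf(stderr, "START_REPLICATION failed: %s", PQerrorMessage(conn));

	/* ... consume CopyData messages with PQgetCopyData() here ... */

	PQclear(res);
	PQfinish(conn);
	return 0;
}

The built-in apply worker ends up doing the same thing indirectly: ApplyWorkerMain, shown earlier, copies MySubscription->stream into options.proto.logical.streaming and lets the walreceiver layer build the option list.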
*/ @@ -323,19 +444,25 @@ maybe_send_schema(LogicalDecodingContext *ctx, relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc), CreateTupleDescCopy(outdesc)); MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, ctx); + send_relation_and_attrs(ancestor, xid, ctx); RelationClose(ancestor); } - send_relation_and_attrs(relation, ctx); - relentry->schema_sent = true; + send_relation_and_attrs(relation, xid, ctx); + relentry->xid = change->txn->xid; + + if (in_streaming) + set_schema_sent_in_streamed_txn(relentry, topxid); + else + relentry->schema_sent = true; } /* * Sends a relation */ static void -send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -359,17 +486,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); + logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_rel(ctx->out, xid, relation); OutputPluginWrite(ctx, false); } /* * Sends the decoded DML over wire. + * + * XXX May be called both in streaming and non-streaming modes. */ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -378,6 +507,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; + TransactionId xid = InvalidTransactionId; + + if (in_streaming) + xid = change->txn->xid; if (!is_publishable_relation(relation)) return; @@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, relation, relentry); + maybe_send_schema(ctx, txn, change, relation, relentry); /* Send the data */ switch (change->action) @@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple, + logicalrep_write_insert(ctx->out, xid, relation, tuple, data->binary); OutputPluginWrite(ctx, true); break; @@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, - data->binary); + logicalrep_write_update(ctx->out, xid, relation, oldtuple, + newtuple, data->binary); OutputPluginWrite(ctx, true); break; } @@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple, + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, data->binary); OutputPluginWrite(ctx, true); } @@ -498,6 +631,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int i; int nrelids; Oid *relids; + TransactionId xid = InvalidTransactionId; + + if (in_streaming) + xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -526,13 +663,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; - maybe_send_schema(ctx, relation, relentry); + maybe_send_schema(ctx, txn, 
change, relation, relentry); } if (nrelids > 0) { OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, + xid, nrelids, relids, change->data.truncate.cascade, @@ -606,6 +744,113 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) } /* + * Notify downstream to discard the streamed transaction (along with all + * it's subtransactions, if it's a toplevel transaction). + */ +static void +pgoutput_stream_abort(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + ReorderBufferTXN *toptxn; + + /* + * The abort should happen outside streaming block, even for streamed + * transactions. The transaction has to be marked as streamed, though. + */ + Assert(!in_streaming); + + /* determine the toplevel transaction */ + toptxn = (txn->toptxn) ? txn->toptxn : txn; + + Assert(rbtxn_is_streamed(toptxn)); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid); + OutputPluginWrite(ctx, true); + + cleanup_rel_sync_cache(toptxn->xid, false); +} + +/* + * Notify downstream to apply the streamed transaction (along with all + * it's subtransactions). + */ +static void +pgoutput_stream_commit(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + /* + * The commit should happen outside streaming block, even for streamed + * transactions. The transaction has to be marked as streamed, though. + */ + Assert(!in_streaming); + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); + + cleanup_rel_sync_cache(txn->xid, true); +} + + +static void +pgoutput_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + /* we can't nest streaming of transactions */ + Assert(!in_streaming); + + /* + * If we already sent the first stream for this transaction then don't + * send the origin id in the subsequent streams. + */ + if (rbtxn_is_streamed(txn)) + send_replication_origin = false; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); + + if (send_replication_origin) + { + char *origin; + + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + + if (replorigin_by_oid(txn->origin_id, true, &origin)) + logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); + } + + OutputPluginWrite(ctx, true); + + /* we're streaming a chunk of transaction now */ + in_streaming = true; +} + +static void +pgoutput_stream_stop(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + /* we should be streaming a trasanction */ + Assert(in_streaming); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_stop(ctx->out); + OutputPluginWrite(ctx, true); + + /* we've stopped streaming a transaction */ + in_streaming = false; +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. While @@ -642,6 +887,38 @@ init_rel_sync_cache(MemoryContext cachectx) } /* + * We expect relatively small number of streamed transactions. 
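Taken together, the four stream callbacks above mean a streamed transaction reaches the subscriber as one or more chunks bracketed by stream start/stop messages, finished by a single stream commit or stream abort. A small self-contained sketch of the single-byte codes involved, matching apply_dispatch in worker.c shown earlier (the sample wire sequence is made up):

#include <stddef.h>
#include <stdio.h>

/*
 * Single-byte codes added for streaming.  Ordinary change messages
 * ('R', 'Y', 'I', 'U', 'D', 'T') keep their existing codes and are simply
 * spooled on the subscriber while a chunk is open.
 */
const char *
stream_message_name(char action)
{
	switch (action)
	{
		case 'S':
			return "stream start";
		case 'E':
			return "stream stop";
		case 'A':
			return "stream abort";
		case 'c':
			return "stream commit";
		default:
			return "regular change, spooled while inside a chunk";
	}
}

int
main(void)
{
	/* made-up wire sequence: two chunks of one transaction, then its commit */
	const char	wire[] = {'S', 'I', 'I', 'E', 'S', 'U', 'E', 'c'};

	for (size_t i = 0; i < sizeof(wire); i++)
		printf("%c -> %s\n", wire[i], stream_message_name(wire[i]));

	return 0;
}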
+ */ +static bool +get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + ListCell *lc; + foreach (lc, entry->streamed_txns) + { + if (xid == lfirst_int(lc)) + return true; + } + + return false; +} + +/* + * Add the xid in the rel sync entry for which we have already sent the schema + * of the relation. + */ +static void +set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->streamed_txns = lappend_int(entry->streamed_txns, xid); + + MemoryContextSwitchTo(oldctx); +} + +/* * Find or create entry in the relation schema cache. * * This looks up publications that the given relation is directly or @@ -771,12 +1048,45 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } if (!found) + { entry->schema_sent = false; + entry->streamed_txns = NULL; + } return entry; } /* + * Cleanup list of streamed transactions and update the schema_sent flag. + * + * When a streamed transaction commits or aborts, we need to remove the + * toplevel XID from the schema cache. If the transaction aborted, the + * subscriber will simply throw away the schema records we streamed, so + * we don't need to do anything else. + * + * If the transaction committed, the subscriber will update the relation + * cache - so tweak the schema_sent flag accordingly. + */ +static void +cleanup_rel_sync_cache(TransactionId xid, bool is_commit) +{ + HASH_SEQ_STATUS hash_seq; + RelationSyncEntry *entry; + + Assert(RelationSyncCache != NULL); + + hash_seq_init(&hash_seq, RelationSyncCache); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (is_commit) + entry->schema_sent = true; + + /* Remove the xid from the schema sent list. */ + entry->streamed_txns = list_delete_int(entry->streamed_txns, xid); + } +} + +/* * Relcache invalidation callback */ static void @@ -811,7 +1121,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * Reset schema sent status as the relation definition may have changed. */ if (entry != NULL) + { entry->schema_sent = false; + list_free(entry->streamed_txns); + entry->streamed_txns = NULL; + } } /* diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 9795c35..1d09154 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -51,6 +51,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subbinary; /* True if the subscription wants the * publisher to send data in binary */ + bool substream; /* Stream in-progress transactions. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -78,6 +80,7 @@ typedef struct Subscription bool enabled; /* Indicates if the subscription is enabled */ bool binary; /* Indicates if the subscription wants data in * binary format */ + bool stream; /* Allow streaming in-progress transactions. 
*/ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 807a9c1..0dfbac4 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -982,7 +982,11 @@ typedef enum WAIT_EVENT_WAL_READ, WAIT_EVENT_WAL_SYNC, WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN, - WAIT_EVENT_WAL_WRITE + WAIT_EVENT_WAL_WRITE, + WAIT_EVENT_LOGICAL_CHANGES_READ, + WAIT_EVENT_LOGICAL_CHANGES_WRITE, + WAIT_EVENT_LOGICAL_SUBXACT_READ, + WAIT_EVENT_LOGICAL_SUBXACT_WRITE } WaitEventIO; /* ---------- diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 60a76bc..655144d 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -23,9 +23,13 @@ * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we * have backwards compatibility for. The client requests protocol version at * connect time. + * + * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with + * support for streaming large transactions. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 -#define LOGICALREP_PROTO_VERSION_NUM 1 +#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_VERSION_NUM 2 /* * This struct stores a tuple received via logical replication. @@ -98,25 +102,49 @@ extern void logicalrep_read_commit(StringInfo in, extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); -extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple, bool binary); +extern void logicalrep_write_insert(StringInfo out, TransactionId xid, + Relation rel, HeapTuple newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); -extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, +extern void logicalrep_write_update(StringInfo out, TransactionId xid, + Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); -extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple, bool binary); +extern void logicalrep_write_delete(StringInfo out, TransactionId xid, + Relation rel, HeapTuple oldtuple, + bool binary); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); -extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], +extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, + int nrelids, Oid relids[], bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); -extern void logicalrep_write_rel(StringInfo out, Relation rel); +extern void logicalrep_write_rel(StringInfo out, TransactionId xid, + Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); -extern void logicalrep_write_typ(StringInfo out, Oid typoid); +extern void logicalrep_write_typ(StringInfo out, TransactionId xid, + Oid typoid); extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp); +extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid, + bool first_segment); +extern TransactionId 
logicalrep_read_stream_start(StringInfo in, + bool *first_segment); + +extern void logicalrep_write_stream_stop(StringInfo out); +extern TransactionId logicalrep_read_stream_stop(StringInfo in); + +extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern TransactionId logicalrep_read_stream_commit(StringInfo out, + LogicalRepCommitData *commit_data); + +extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid); +extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid); + #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c2d5dbe..6c0a4e3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -178,6 +178,7 @@ typedef struct uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ + bool streaming; /* Streaming of large transactions */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/009_stream_simple.pl new file mode 100644 index 0000000..2f01133 --- /dev/null +++ b/src/test/subscription/t/009_stream_simple.pl @@ -0,0 +1,86 @@ +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/010_stream_subxact.pl new file mode 100644 index 0000000..d2ae385 --- /dev/null +++ b/src/test/subscription/t/010_stream_subxact.pl @@ -0,0 +1,102 @@ +# Test streaming of large transaction containing large subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + 
+# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rowsto exceed 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/011_stream_ddl.pl new file mode 100644 index 0000000..0da39a1 --- /dev/null +++ b/src/test/subscription/t/011_stream_ddl.pl @@ -0,0 +1,95 @@ +# Test streaming of large transaction with DDL and subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + 
+# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (3, md5(3::text)); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (4, md5(4::text), -4); +COMMIT; +}); + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); +COMMIT; +}); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(2002|1999|1002|1), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/012_stream_subxact_abort.pl new file mode 100644 index 0000000..402df30 --- /dev/null +++ b/src/test/subscription/t/012_stream_subxact_abort.pl @@ -0,0 +1,82 @@ +# Test streaming of large transaction containing multiple subtransactions and rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE 
application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); +ROLLBACK TO s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(1000|0), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl new file mode 100644 index 0000000..becbdd0 --- /dev/null +++ b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl @@ -0,0 +1,84 @@ +# Test behavior with streaming transaction exceeding logical_decoding_work_mem +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); 
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+ALTER TABLE test_tab DROP COLUMN c;
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+COMMIT;
+});
+
+wait_for_caught_up($node_publisher, $appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(1000|500), 'check rollback to savepoint was reflected on subscriber');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/015_stream_binary.pl b/src/test/subscription/t/015_stream_binary.pl
new file mode 100644
index 0000000..fa2362e
--- /dev/null
+++ b/src/test/subscription/t/015_stream_binary.pl
@@ -0,0 +1,86 @@
+# Test streaming of large transaction with subscription in binary mode
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+sub wait_for_caught_up
+{
+ my ($node, $appname) = @_;
+
+ $node->poll_query_until('postgres',
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
+ ) or die "Timed out while waiting for subscriber to catch up";
+}
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# 
Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, binary = true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; -- 1.8.3.1 v45/v45-0004-Enable-streaming-for-all-subscription-TAP-tests.patch0000664000175000017500000003534213712204536025625 0ustar dilipkumardilipkumarFrom 40dc36eda7c95d901d0a532c482fc90c26d5219e Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 20 Nov 2019 16:41:13 +0530 Subject: [PATCH v45 4/6] Enable streaming for all subscription TAP tests --- src/test/subscription/t/001_rep_changes.pl | 2 +- src/test/subscription/t/002_types.pl | 2 +- src/test/subscription/t/003_constraints.pl | 2 +- src/test/subscription/t/004_sync.pl | 8 ++++---- src/test/subscription/t/005_encoding.pl | 2 +- src/test/subscription/t/006_rewrite.pl | 2 +- src/test/subscription/t/007_ddl.pl | 2 +- src/test/subscription/t/008_diff_schema.pl | 2 +- src/test/subscription/t/009_matviews.pl | 2 +- 
src/test/subscription/t/009_stream_simple.pl | 2 +- src/test/subscription/t/010_stream_subxact.pl | 2 +- src/test/subscription/t/010_truncate.pl | 6 +++--- src/test/subscription/t/011_generated.pl | 2 +- src/test/subscription/t/011_stream_ddl.pl | 2 +- src/test/subscription/t/012_collation.pl | 2 +- src/test/subscription/t/012_stream_subxact_abort.pl | 2 +- src/test/subscription/t/013_stream_subxact_ddl_abort.pl | 2 +- src/test/subscription/t/100_bugs.pl | 2 +- 18 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 3f8318f..6f7bedc 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -78,7 +78,7 @@ $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only WITH (streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index aedcab2..94c71f8 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -108,7 +108,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot, streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 9f140b5..21410fa 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -35,7 +35,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false, streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index e111ab9..a6fae9c 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -33,7 +33,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); @@ -56,7 +56,7 @@ $node_publisher->safe_psql('postgres', # recreate the subscription, it will try to do initial copy $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); # but it will be stuck on data copy as it will fail on constraint @@ -78,7 +78,7 @@ is($result, qq(20), 'initial data synced for second sub'); 
# now check another subscription for the same node pair $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" + "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false, streaming = on)" ); # wait for it to start @@ -100,7 +100,7 @@ $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); # recreate the subscription again $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); # and wait for data sync to finish again diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl index aec7a17..202871a 100644 --- a/src/test/subscription/t/005_encoding.pl +++ b/src/test/subscription/t/005_encoding.pl @@ -26,7 +26,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);" ); $node_publisher->wait_for_catchup('mysub'); diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl index c6cda10..70c86b2 100644 --- a/src/test/subscription/t/006_rewrite.pl +++ b/src/test/subscription/t/006_rewrite.pl @@ -22,7 +22,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);" ); $node_publisher->wait_for_catchup('mysub'); diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl index 7fe6cc6..f9c8d1d 100644 --- a/src/test/subscription/t/007_ddl.pl +++ b/src/test/subscription/t/007_ddl.pl @@ -22,7 +22,7 @@ my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);" ); $node_publisher->wait_for_catchup('mysub'); diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl index 963334e..cdf9b8e 100644 --- a/src/test/subscription/t/008_diff_schema.pl +++ b/src/test/subscription/t/008_diff_schema.pl @@ -32,7 +32,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on)" ); $node_publisher->wait_for_catchup('tap_sub'); diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl index 7afc7bd..21f50c7 100644 --- a/src/test/subscription/t/009_matviews.pl +++ b/src/test/subscription/t/009_matviews.pl @@ -18,7 +18,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR ALL TABLES;"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub WITH (streaming = on);" ); $node_publisher->safe_psql('postgres', diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/009_stream_simple.pl index 2f01133..30561d8 100644 --- a/src/test/subscription/t/009_stream_simple.pl +++ b/src/test/subscription/t/009_stream_simple.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/010_stream_subxact.pl index d2ae385..9a6bac6 100644 --- a/src/test/subscription/t/010_stream_subxact.pl +++ b/src/test/subscription/t/010_stream_subxact.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index be2c0bd..ed56fbf 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -52,13 +52,13 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" + "CREATE SUBSCRIPTION sub1 CONNECTION 
'$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)" ); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2" + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (streaming = on)" ); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3" + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (streaming = on)" ); # Wait for initial sync of all subscriptions diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl index f35d1cb..4df1dde 100644 --- a/src/test/subscription/t/011_generated.pl +++ b/src/test/subscription/t/011_generated.pl @@ -33,7 +33,7 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)" ); # Wait for initial sync of all subscriptions diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/011_stream_ddl.pl index 0da39a1..c3caff6 100644 --- a/src/test/subscription/t/011_stream_ddl.pl +++ b/src/test/subscription/t/011_stream_ddl.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl index 4bfcef7..c62eb52 100644 --- a/src/test/subscription/t/012_collation.pl +++ b/src/test/subscription/t/012_collation.pl @@ -80,7 +80,7 @@ $node_publisher->safe_psql('postgres', q{CREATE PUBLICATION pub1 FOR ALL TABLES}); $node_subscriber->safe_psql('postgres', - qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)} + qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false, streaming = on)} ); $node_publisher->wait_for_catchup('sub1'); diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/012_stream_subxact_abort.pl index 402df30..2be7542 100644 --- a/src/test/subscription/t/012_stream_subxact_abort.pl +++ b/src/test/subscription/t/012_stream_subxact_abort.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl index becbdd0..2da9607 100644 --- a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl @@ -40,7 +40,7 @@ $node_publisher->safe_psql('postgres', "CREATE 
PUBLICATION tap_pub FOR TABLE tes my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', -"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); wait_for_caught_up($node_publisher, $appname); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index 366a7a9..96ffc09 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -53,7 +53,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR ALL TABLES"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (streaming = on)" ); $node_publisher->wait_for_catchup('sub1'); -- 1.8.3.1 v45/v45-0005-Add-TAP-test-for-streaming-vs.-DDL.patch0000664000175000017500000001062113712204536022553 0ustar dilipkumardilipkumarFrom 679779484fcc8ff93c26a27131dd79af6ad4b997 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 19:15:35 +0200 Subject: [PATCH v45 5/6] Add TAP test for streaming vs. DDL --- src/test/subscription/t/014_stream_through_ddl.pl | 98 +++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 src/test/subscription/t/014_stream_through_ddl.pl diff --git a/src/test/subscription/t/014_stream_through_ddl.pl b/src/test/subscription/t/014_stream_through_ddl.pl new file mode 100644 index 0000000..b8d78b1 --- /dev/null +++ b/src/test/subscription/t/014_stream_through_ddl.pl @@ -0,0 +1,98 @@ +# Test streaming of large transaction with DDL, subtransactions and rollbacks. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d text, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 1000) s(i); +SAVEPOINT s2; +ALTER TABLE test_tab ADD COLUMN c INT; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(1001, 2000) s(i); +SAVEPOINT s3; +ALTER TABLE test_tab ADD COLUMN d text; +SAVEPOINT s4; +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text) FROM generate_series(2001, 3000) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(3001, 4000) s(i); +SAVEPOINT s10; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(4001, 5000) s(i); +ALTER TABLE test_tab ADD COLUMN d text; +ROLLBACK TO SAVEPOINT s10; +RELEASE SAVEPOINT s10; +SAVEPOINT s10; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(5001, 6000) s(i); +SAVEPOINT s6; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(6001, 7000) s(i); +SAVEPOINT s7; +ALTER TABLE test_tab ADD COLUMN d text; +INSERT INTO test_tab (a, b, c, d, e) SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(7001, 8000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(a), count(b), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(7000|7000|7000|6000|4000|4000), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; -- 1.8.3.1 v45/v45-0006-Add-streaming-option-in-pg_dump.patch0000664000175000017500000000524713712204536022637 0ustar dilipkumardilipkumarFrom eaa67a5ffc290792e93547395bad72ca90969e90 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 20 Jul 2020 11:09:18 +0530 Subject: [PATCH v45 6/6] Add streaming option in pg_dump --- src/bin/pg_dump/pg_dump.c | 17 +++++++++++++++-- src/bin/pg_dump/pg_dump.h | 1 + 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 94459b3..f69d64c 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4201,6 +4201,7 @@ getSubscriptions(Archive *fout) int i_oid; int i_subname; int i_rolname; + int i_substream; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4240,10 +4241,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 140000) appendPQExpBuffer(query, - " s.subbinary\n"); + " s.subbinary,\n"); else appendPQExpBuffer(query, - " false AS subbinary\n"); + " false AS subbinary,\n"); 
+ + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + " s.substream\n"); + else + appendPQExpBuffer(query, + " false AS substream\n"); appendPQExpBuffer(query, "FROM pg_subscription s\n" @@ -4263,6 +4271,7 @@ getSubscriptions(Archive *fout) i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); + i_substream = PQfnumber(res, "substream"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4286,6 +4295,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subpublications)); subinfo[i].subbinary = pg_strdup(PQgetvalue(res, i, i_subbinary)); + subinfo[i].substream = + pg_strdup(PQgetvalue(res, i, i_substream)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4357,6 +4368,8 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) if (strcmp(subinfo->subbinary, "t") == 0) appendPQExpBuffer(query, ", binary = true"); + if (strcmp(subinfo->substream, "f") != 0) + appendPQExpBuffer(query, ", streaming = on"); if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index da97b73..cc10c7c 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -626,6 +626,7 @@ typedef struct _SubscriptionInfo char *subconninfo; char *subslotname; char *subbinary; + char *substream; char *subsynccommit; char *subpublications; } SubscriptionInfo; -- 1.8.3.1
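A minimal usage sketch of the subscription option exercised by the patches above, assuming a publisher/subscriber pair configured as in the TAP tests (the connection string and object names here are illustrative only, not taken from the patches):

    -- On the subscriber: request streaming of large in-progress transactions.
    CREATE SUBSCRIPTION tap_sub
        CONNECTION 'host=publisher dbname=postgres'   -- illustrative connection string
        PUBLICATION tap_pub
        WITH (streaming = on);

    -- The choice is recorded in pg_subscription.substream ('t' when enabled);
    -- this is the column the pg_dump change reads so it can append
    -- ", streaming = on" when reconstructing the CREATE SUBSCRIPTION command.
    SELECT subname, substream FROM pg_subscription;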