diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c7f1877..8c147bd 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -82,6 +82,8 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; +bool always_stream_logical = true; + /* * CheckXidAlive is a xid value pointing to a possibly ongoing (sub) * transaction.  Currently, it is used in logical decoding.  It's possible diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2ceb192..97aed74 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3105,9 +3105,12 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) } - Assert(largest); - Assert(largest->size > 0); - Assert(largest->size <= rb->size); + if (!always_stream_logical) + { + Assert(largest); + Assert(largest->size > 0); + Assert(largest->size <= rb->size); + } return largest; } @@ -3130,8 +3133,22 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) ReorderBufferTXN *txn; /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) + if (!always_stream_logical && rb->size < logical_decoding_work_mem * 1024L) + return; + + /* If GUC set to always stream, then stream everything */ + if (always_stream_logical) + { + while ((txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + ReorderBufferStreamTXN(rb, txn); + elog(DEBUG2, "initiate stream for changes in XID %u", + txn->xid); + + } return; + } + /* * Loop until we reach under the memory limit. One might think that just @@ -3476,6 +3493,10 @@ ReorderBufferCanStream(ReorderBuffer *rb) { LogicalDecodingContext *ctx = rb->private_data; + /* force streaming on logical replication if guc set */ + if (always_stream_logical) + ctx->streaming = true; + return ctx->streaming; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 3a802d8..8f5144d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2041,6 +2041,15 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"always_stream_logical", PGC_USERSET, REPLICATION_MASTER, + gettext_noop("Always stream during logical replication, do not spill to disk."), + }, + &always_stream_logical, + true, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5f767eb..f99d9c7 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -65,6 +65,9 @@ extern bool xact_is_sampled; extern bool DefaultXactDeferrable; extern bool XactDeferrable; +/* to turn on forced streaming of logical replication */ +extern bool always_stream_logical; + typedef enum { SYNCHRONOUS_COMMIT_OFF, /* asynchronous commit */