From e04f9eb82abddb7d1fbb07cb352f90770cb9d1c4 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 28 Feb 2020 11:07:46 +0530 Subject: [PATCH v20 10/12] Bugfix handling of incomplete toast tuple --- src/backend/access/heap/heapam.c | 3 + src/backend/replication/logical/decode.c | 17 +- .../replication/logical/reorderbuffer.c | 193 +++++++++++------- src/include/access/heapam_xlog.h | 1 + src/include/replication/reorderbuffer.h | 24 ++- 5 files changed, 158 insertions(+), 80 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 8371ec6e81..8028820d0e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1955,6 +1955,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, { xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; + + if (IsToastRelation(relation)) + xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION; } XLogBeginInsert(); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 69c1f45ef6..c841687c66 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -727,7 +727,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, + xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); } /* @@ -794,7 +796,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -851,7 +854,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - 
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } /* @@ -887,7 +891,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(change->data.truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); } /* @@ -987,7 +991,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change); + buf->origptr, change, false); /* move to the next xl_multi_insert_tuple entry */ data += datalen; @@ -1025,7 +1029,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d80ad04363..c7c2aaf0c1 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -178,6 +178,11 @@ typedef struct ReorderBufferDiskChange /* data follows */ } ReorderBufferDiskChange; +#define ChangeIsInsertOrUpdate(action) \ + (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ + ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) + /* * Maximum number of changes kept in memory, per transaction. After that, * changes are spooled to disk. 
@@ -654,11 +659,14 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change, bool toast_insert) { - ReorderBufferTXN *txn; + ReorderBufferTXN *txn, *toptxn; + bool can_stream = false; - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + toptxn = txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + if (txn->toptxn != NULL) + toptxn = txn->toptxn; change->lsn = lsn; change->txn = txn; @@ -668,9 +676,49 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn->nentries++; txn->nentries_mem++; + /* + * If this is a toast insert then set the corresponding bit. Otherwise, if + * we have toast insert bit set and this is insert/update then clear the + * bit. + */ + if (toast_insert) + toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT; + else if (rbtxn_has_toast_insert(txn) && + ChangeIsInsertOrUpdate(change->action)) + { + toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + can_stream = true; + } + + /* + * If this is a speculative insert then set the corresponding bit. + * Otherwise, if we have speculative insert bit set and this is spec + * confirm record then clear the bit. + */ + if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) + toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT; + else if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) + { + toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; + can_stream = true; + } + /* update memory accounting information */ ReorderBufferChangeMemoryUpdate(rb, change, true); + /* + * If streaming is enabled and we serialized this transaction because it + * had an incomplete tuple, then now that the tuple is complete we can + * stream it. 
+ */ + if (ReorderBufferCanStream(rb) && can_stream && rbtxn_is_serialized(toptxn) + && !(rbtxn_has_toast_insert(txn)) && !(rbtxn_has_spec_insert(txn))) + { + ReorderBufferStreamTXN(rb, toptxn); + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); + } + /* check the memory limits and evict something if needed */ ReorderBufferCheckMemoryLimit(rb); } @@ -700,7 +748,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change->data.msg.message = palloc(message_size); memcpy(change->data.msg.message, message, message_size); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -1476,6 +1524,13 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->tuplecid_hash = NULL; } + /* remove entries spilled to disk */ + if (rbtxn_is_serialized(txn)) + { + ReorderBufferRestoreCleanup(rb, txn); + txn->txn_flags &= ~RBTXN_IS_SERIALIZED; + } + /* also reset the number of entries in the transaction */ txn->nentries_mem = 0; txn->nentries = 0; @@ -1865,8 +1920,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * disk. 
*/ dlist_delete(&change->node); - ReorderBufferToastAppendChunk(rb, txn, relation, - change); + ReorderBufferToastAppendChunk(rb, txn, relation, + change); } change_done: @@ -2463,7 +2518,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, change->data.snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2512,7 +2567,7 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, change->data.command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); } /* @@ -2535,6 +2590,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { Size sz; ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn = NULL; Assert(change->txn); @@ -2549,8 +2605,13 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, txn = change->txn; /* if subxact, and streaming supported, use the toplevel instead */ - if (txn->toptxn && ReorderBufferCanStream(rb)) - txn = txn->toptxn; + if (ReorderBufferCanStream(rb)) + { + if (txn->toptxn) + toptxn = txn->toptxn; + else + toptxn = txn; + } sz = ReorderBufferChangeSize(change); @@ -2558,12 +2619,20 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { txn->size += sz; rb->size += sz; + + /* Update the total size in the top transaction. */ + if (toptxn) + toptxn->total_size += sz; } else { Assert((rb->size >= sz) && (txn->size >= sz)); txn->size -= sz; rb->size -= sz; + + /* Update the total size in the top transaction. 
*/ + if (toptxn) + toptxn->total_size -= sz; } Assert(txn->size <= rb->size); @@ -2624,7 +2693,7 @@ ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, memcpy(change->data.inval.invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); - ReorderBufferQueueChange(rb, xid, lsn, change); + ReorderBufferQueueChange(rb, xid, lsn, change, false); MemoryContextSwitchTo(oldcontext); } @@ -2811,15 +2880,16 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) txn = dlist_container(ReorderBufferTXN, node, iter.cur); - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + /* + * if the current transaction is larger and doesn't have incomplete data + * remember it. + */ + if (((!largest) || (txn->total_size > largest->total_size)) && + ((txn->total_size > 0) && !(rbtxn_has_toast_insert(txn)) && + !(rbtxn_has_spec_insert(txn)))) + largest = txn; } - Assert(largest); - Assert(largest->size > 0); - Assert(largest->size <= rb->size); - return largest; } @@ -2837,66 +2907,51 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) - return; - - /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by streaming, if supported. Otherwise spill to disk. - */ - if (ReorderBufferCanStream(rb)) + /* Loop until we reach under the memory limit. */ + while (rb->size >= logical_decoding_work_mem * 1024L) { /* - * Pick the largest toplevel transaction and evict it from memory by - * streaming the already decoded part. + * Pick the largest transaction (or subtransaction) and evict it from + * memory by streaming, if supported. Otherwise spill to disk. 
*/ - txn = ReorderBufferLargestTopTXN(rb); + if (ReorderBufferCanStream(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->total_size > 0); + Assert(rb->size >= txn->total_size); - /* we know there has to be one, because the size is not zero */ - Assert(txn && !txn->toptxn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); - ReorderBufferStreamTXN(rb, txn); - } - else - { - /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by serializing it to disk. - */ - txn = ReorderBufferLargestTXN(rb); + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); - /* we know there has to be one, because the size is not zero */ - Assert(txn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); + ReorderBufferSerializeTXN(rb, txn); + } - ReorderBufferSerializeTXN(rb, txn); + /* + * After eviction, the transaction should have no entries in memory, and + * should use 0 bytes for changes. + * + * XXX Checking the size is fine for both cases - spill to disk and + * streaming. But for streaming we should really check nentries_mem for + * all subtransactions too. + */ + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); } - /* - * After eviction, the transaction should have no entries in memory, and - * should use 0 bytes for changes. - * - * XXX Checking the size is fine for both cases - spill to disk and - * streaming. But for streaming we should really check nentries_mem for - * all subtransactions too. 
- */ - Assert(txn->size == 0); - Assert(txn->nentries_mem == 0); - - /* - * And furthermore, evicting the transaction should get us below the - * memory limit again - it is not possible that we're still exceeding the - * memory limit after evicting the transaction. - * - * This follows from the simple fact that the selected transaction is at - * least as large as the most recent change (which caused us to go over - * the memory limit). So by evicting it we're definitely back below the - * memory limit. - */ Assert(rb->size < logical_decoding_work_mem * 1024L); } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cdb12..aa17f7df84 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,7 @@ #define XLH_INSERT_LAST_IN_MULTI (1<<1) #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) +#define XLH_INSERT_ON_TOAST_RELATION (1<<4) /* * xl_heap_update flag values, 8 bits are available. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 603f325663..ba2ab7185c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -172,6 +172,8 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 #define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -191,6 +193,17 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction has a toast insert without the main table insert. */ +#define rbtxn_has_toast_insert(txn) \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ + +/* + * This transaction has a speculative insert without the speculative + * confirm. 
+ */ +#define rbtxn_has_spec_insert(txn) \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ + /* * Has this transaction been streamed to downstream? * @@ -199,10 +212,6 @@ typedef struct ReorderBufferChange * which case we'd have nentries==0 for the toplevel one, which would say * nothing about the streaming. So we maintain this flag, but only for the * toplevel transaction.) - * - * Note: We never do both stream and serialize a transaction (we only spill - * to disk when streaming is not supported by the plugin), so only one of - * those two flags may be set at any given time. */ #define rbtxn_is_streamed(txn) \ ( \ @@ -355,6 +364,9 @@ typedef struct ReorderBufferTXN * Size of this transaction (changes currently in memory, in bytes). */ Size size; + + /* Size of top-transaction including sub-transactions. */ + Size total_size; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -545,7 +557,9 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool incomplte_data); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); -- 2.23.0