diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c index e7d807c5cb..768cf19700 100644 --- a/contrib/amcheck/verify_nbtree.c +++ b/contrib/amcheck/verify_nbtree.c @@ -722,6 +722,38 @@ bt_target_page_check(BtreeCheckState *state) elog(DEBUG2, "verifying %u items on %s block %u", max, P_ISLEAF(topaque) ? "leaf" : "internal", state->targetblock); + + /* Check the number of attributes in high key if any */ + if (!P_RIGHTMOST(topaque)) + { + if (!_bt_check_natts(state->rel, state->target, P_HIKEY)) + { + ItemId itemid; + IndexTuple itup; + char *itid, + *htid; + + itemid = PageGetItemId(state->target, P_HIKEY); + itup = (IndexTuple) PageGetItem(state->target, itemid); + itid = psprintf("(%u,%u)", state->targetblock, P_HIKEY); + htid = psprintf("(%u,%u)", + ItemPointerGetBlockNumber(&(itup->t_tid)), + ItemPointerGetOffsetNumber(&(itup->t_tid))); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("wrong number of index tuple attributes for index \"%s\"", + RelationGetRelationName(state->rel)), + errdetail_internal("Index tid=%s points to %s tid=%s page lsn=%X/%X.", + itid, + P_ISLEAF(topaque) ? "heap" : "index", + htid, + (uint32) (state->targetlsn >> 32), + (uint32) state->targetlsn))); + } + } + + /* * Loop over page items, starting from first non-highkey item, not high * key (if any). 
Also, immediately skip "negative infinity" real item (if @@ -770,6 +802,29 @@ bt_target_page_check(BtreeCheckState *state) /* Build insertion scankey for current page offset */ skey = _bt_mkscankey(state->rel, itup); + /* Check the number of index tuple attributes */ + if (!_bt_check_natts(state->rel, state->target, offset)) + { + char *itid, + *htid; + + itid = psprintf("(%u,%u)", state->targetblock, offset); + htid = psprintf("(%u,%u)", + ItemPointerGetBlockNumber(&(itup->t_tid)), + ItemPointerGetOffsetNumber(&(itup->t_tid))); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("wrong number of index tuple attributes for index \"%s\"", + RelationGetRelationName(state->rel)), + errdetail_internal("Index tid=%s points to %s tid=%s page lsn=%X/%X.", + itid, + P_ISLEAF(topaque) ? "heap" : "index", + htid, + (uint32) (state->targetlsn >> 32), + (uint32) state->targetlsn))); + } + /* Fingerprint leaf page tuples (those that point to the heap) */ if (state->heapallindexed && P_ISLEAF(topaque) && !ItemIdIsDead(itemid)) bloom_add_element(state->filter, (unsigned char *) itup, tupsize); diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c index a58bd95620..ea6ad941ed 100644 --- a/src/backend/access/common/indextuple.c +++ b/src/backend/access/common/indextuple.c @@ -448,8 +448,8 @@ CopyIndexTuple(IndexTuple source) } /* - * Reform index tuple. Truncate nonkey (INCLUDE) attributes. - * Pass the number of attributes the truncated tuple must contain. + * Truncate trailing attributes from the given index tuple, leaving it with + * new_indnatts attributes. 
*/ IndexTuple index_truncate_tuple(Relation idxrel, IndexTuple olditup, int new_indnatts) diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 3a96824fbe..a534095ee7 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -1194,7 +1194,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, */ if (indnatts != indnkeyatts && P_ISLEAF(lopaque)) { - lefthikey = index_truncate_tuple(rel, item, indnkeyatts); + lefthikey = _bt_truncate_tuple(rel, item); itemsz = IndexTupleSize(lefthikey); itemsz = MAXALIGN(itemsz); } @@ -1816,7 +1816,7 @@ _bt_insert_parent(Relation rel, /* form an index tuple that points at the new right page */ new_item = CopyIndexTuple(ritem); - ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY); + ItemPointerSetBlockNumber(&(new_item->t_tid), rbknum); /* * Find the parent buffer and get the parent page. @@ -2081,7 +2081,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) left_item_sz = sizeof(IndexTupleData); left_item = (IndexTuple) palloc(left_item_sz); left_item->t_info = left_item_sz; - ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY); + ItemPointerSetBlockNumber(&(left_item->t_tid), lbkno); + BTreeTupSetNAtts(left_item, 0); /* * Create downlink item for right page. 
The key for it is obtained from @@ -2091,7 +2092,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) right_item_sz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(lpage, itemid); right_item = CopyIndexTuple(item); - ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY); + ItemPointerSetBlockNumber(&(right_item->t_tid), rbkno); /* NO EREPORT(ERROR) from here till newroot op is logged */ START_CRIT_SECTION(); diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index e6bfb18e7b..6d3637921c 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -985,7 +985,7 @@ _bt_lock_branch_parent(Relation rel, BlockNumber child, BTStack stack, * Locate the downlink of "child" in the parent (updating the stack entry * if needed) */ - ItemPointerSet(&(stack->bts_btentry.t_tid), child, P_HIKEY); + ItemPointerSetBlockNumber(&(stack->bts_btentry.t_tid), child); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); if (pbuf == InvalidBuffer) elog(ERROR, "failed to re-find parent key in index \"%s\" for deletion target page %u", @@ -1425,7 +1425,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) itemid = PageGetItemId(page, topoff); itup = (IndexTuple) PageGetItem(page, itemid); - ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); + ItemPointerSetBlockNumber(&(itup->t_tid), rightsib); nextoffset = OffsetNumberNext(topoff); PageIndexTupleDelete(page, nextoffset); @@ -1444,7 +1444,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (target != leafblkno) - ItemPointerSet(&trunctuple.t_tid, target, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, target); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, @@ -1763,7 +1763,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool 
*rightsib_empty) if (nextchild == InvalidBlockNumber) ItemPointerSetInvalid(leafhikey); else - ItemPointerSet(leafhikey, nextchild, P_HIKEY); + ItemPointerSetBlockNumber(leafhikey, nextchild); } /* diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 51dca64e13..c0f7ba1243 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -443,6 +443,17 @@ _bt_compare(Relation rel, if (!P_ISLEAF(opaque) && offnum == P_FIRSTDATAKEY(opaque)) return 1; + /* + * Check that the tuple has the correct number of attributes. + */ + if (unlikely(!_bt_check_natts(rel, page, offnum))) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("tuple has wrong number of attributes in index \"%s\"", + RelationGetRelationName(rel)))); + } + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); /* @@ -1959,3 +1970,29 @@ _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) so->numKilled = 0; /* just paranoia */ so->markItemIndex = -1; /* ditto */ } + +/* + * Check whether the index tuple at offnum on page has the number of attributes expected for its position; returns true if it does. + */ +bool +_bt_check_natts(Relation index, Page page, OffsetNumber offnum) +{ + int16 natts = IndexRelationGetNumberOfAttributes(index); + int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(index); + ItemId itemid; + IndexTuple itup; + BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + itemid = PageGetItemId(page, offnum); + itup = (IndexTuple) PageGetItem(page, itemid); + + /* + * Pivot tuples stored in non-leaf pages and hikeys of leaf pages should + * have nkeyatts attributes, while regular (data) tuples of leaf pages + * should have natts attributes. 
+ */ + if (P_ISLEAF(opaque) && offnum >= P_FIRSTDATAKEY(opaque)) + return (BtreeTupGetNAtts(itup, index) == natts); + else + return (BtreeTupGetNAtts(itup, index) == nkeyatts); +} diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index d19348a206..91441b467c 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -899,7 +899,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) * it will be that in the future. Now the purpose is just to save * more space on inner pages of btree. */ - keytup = index_truncate_tuple(wstate->index, oitup, indnkeyatts); + keytup = _bt_truncate_tuple(wstate->index, oitup); /* delete "wrong" high key, insert keytup as P_HIKEY. */ PageIndexTupleDelete(opage, P_HIKEY); @@ -918,7 +918,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) state->btps_next = _bt_pagestate(wstate, state->btps_level + 1); Assert(state->btps_minkey != NULL); - ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY); + ItemPointerSetBlockNumber(&(state->btps_minkey->t_tid), oblkno); _bt_buildadd(wstate, state->btps_next, state->btps_minkey); pfree(state->btps_minkey); @@ -972,8 +972,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) * into the parent page as a downlink */ if (indnkeyatts != indnatts && P_ISLEAF(pageop)) - state->btps_minkey = index_truncate_tuple(wstate->index, - itup, indnkeyatts); + state->btps_minkey = _bt_truncate_tuple(wstate->index, itup); else state->btps_minkey = CopyIndexTuple(itup); } @@ -1028,7 +1027,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state) else { Assert(s->btps_minkey != NULL); - ItemPointerSet(&(s->btps_minkey->t_tid), blkno, P_HIKEY); + ItemPointerSetBlockNumber(&(s->btps_minkey->t_tid), blkno); _bt_buildadd(wstate, s->btps_next, s->btps_minkey); pfree(s->btps_minkey); s->btps_minkey = NULL; diff --git a/src/backend/access/nbtree/nbtutils.c 
b/src/backend/access/nbtree/nbtutils.c index 2fc5924bf0..149b52e3ad 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -2078,3 +2078,23 @@ btproperty(Oid index_oid, int attno, return false; /* punt to generic code */ } } + +/* + * _bt_truncate_tuple() -- remove non-key (INCLUDE) attributes from index + * tuple. + * + * Transforms an ordinary B-tree leaf index tuple into a pivot tuple to be used + * as hikey or non-leaf page tuple with downlink. Note that the new tuple's t_tid offset + * will be overwritten in order to represent the number of attributes present in the tuple. + */ +IndexTuple +_bt_truncate_tuple(Relation idxrel, IndexTuple olditup) +{ + IndexTuple newitup; + int nkeyattrs = IndexRelationGetNumberOfKeyAttributes(idxrel); + + newitup = index_truncate_tuple(idxrel, olditup, nkeyattrs); + BTreeTupSetNAtts(newitup, nkeyattrs); + + return newitup; +} diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index bbfe860e36..e09a389181 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -764,7 +764,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) itemid = PageGetItemId(page, poffset); itup = (IndexTuple) PageGetItem(page, itemid); - ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); + ItemPointerSetBlockNumber(&(itup->t_tid), rightsib); nextoffset = OffsetNumberNext(poffset); PageIndexTupleDelete(page, nextoffset); @@ -794,7 +794,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (xlrec->topparent != InvalidBlockNumber) - ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, @@ -904,7 +904,7 @@ btree_xlog_unlink_page(uint8 info, XLogReaderState 
*record) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (xlrec->topparent != InvalidBlockNumber) - ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 053f8aa345..0aea5b171a 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -151,11 +151,8 @@ typedef struct BTMetaPageData * as unique identifier for a given index tuple (logical position * within a level). - vadim 04/09/97 */ -#define BTTidSame(i1, i2) \ - ((ItemPointerGetBlockNumber(&(i1)) == ItemPointerGetBlockNumber(&(i2))) && \ - (ItemPointerGetOffsetNumber(&(i1)) == ItemPointerGetOffsetNumber(&(i2)))) #define BTEntrySame(i1, i2) \ - BTTidSame((i1)->t_tid, (i2)->t_tid) + ((ItemPointerGetBlockNumber(&(i1)->t_tid) == ItemPointerGetBlockNumber(&(i2)->t_tid))) /* @@ -206,6 +203,49 @@ typedef struct BTMetaPageData #define P_FIRSTDATAKEY(opaque) (P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY) +/* + * In a B-tree index with an INCLUDE clause, pivot tuples used in non-leaf pages + * and as hikeys are truncated. So, such tuples don't contain included + * attributes. In order to keep on-disk compatibility with upcoming suffix + * truncation of pivot tuples, we store the number of attributes present inside + * the tuple itself. Thankfully, the offset number is always unused in a pivot tuple. + * So, we use a free bit of the index tuple flags as a sign that the offset has an + * alternative meaning: it stores the number of keys present in the index tuple + * (12 bits are more than enough for that). And we have 4 bits reserved + * for future usage. + * + * It's possible that an index tuple has zero attributes (leftmost item of an + * internal page). And we have an assertion that the offset number is greater than or equal + * to 1. 
This is why we store (number_of_attributes + 1) in offset number. + */ +#define INDEX_ALT_TID_MASK 0x2000 /* flag indicating t_tid offset has + an alternative meaning */ +#define BT_RESERVED_OFFSET_MASK 0xF000 /* mask of bits in t_tid offset + reserved for future usage */ +#define BT_N_KEYS_OFFSET_MASK 0x0FFF /* mask of bits in t_tid offset + holding number of attributes + actually present in index tuple */ + +/* Set number of attributes in a B-tree index tuple: flags t_info with INDEX_ALT_TID_MASK and stores (n + 1) in the t_tid offset */ +#define BTreeTupSetNAtts(itup, n) \ + do { \ + (itup)->t_info |= INDEX_ALT_TID_MASK; \ + ItemPointerSetOffsetNumber(&(itup)->t_tid, (n) + 1); \ + } while(0) + +/* Get number of attributes in a B-tree index tuple: (t_tid offset & BT_N_KEYS_OFFSET_MASK) - 1 when INDEX_ALT_TID_MASK is set (the mask must be applied before the subtraction, hence the parentheses), otherwise the index's attribute count */ +#define BtreeTupGetNAtts(itup, index) \ + ( \ + ((itup)->t_info & INDEX_ALT_TID_MASK) ? \ + ( \ + AssertMacro((ItemPointerGetOffsetNumber(&(itup)->t_tid) & BT_RESERVED_OFFSET_MASK) == 0), \ + (ItemPointerGetOffsetNumber(&(itup)->t_tid) & BT_N_KEYS_OFFSET_MASK) - 1 \ + ) \ + : \ + IndexRelationGetNumberOfAttributes(index) \ + ) + + /* * Operator strategy numbers for B-tree have been moved to access/stratnum.h, * because many places need to use them in ScanKeyInit() calls. @@ -517,6 +557,7 @@ extern bool _bt_first(IndexScanDesc scan, ScanDirection dir); extern bool _bt_next(IndexScanDesc scan, ScanDirection dir); extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost, Snapshot snapshot); +extern bool _bt_check_natts(Relation index, Page page, OffsetNumber offnum); /* * prototypes for functions in nbtutils.c @@ -545,6 +586,7 @@ extern bytea *btoptions(Datum reloptions, bool validate); extern bool btproperty(Oid index_oid, int attno, IndexAMProperty prop, const char *propname, bool *res, bool *isnull); +extern IndexTuple _bt_truncate_tuple(Relation idxrel, IndexTuple olditup); /* * prototypes for functions in nbtvalidate.c