diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 934d65b89f..4c133b7106 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -22,7 +22,7 @@ #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/rel.h" - +#include "access/htup_details.h" /* * SPPageDesc tracks all info about a page we are inserting into. In some @@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SpGistBlockIsRoot(current->blkno)) { /* Tuple is not part of a chain */ - leafTuple->nextOffset = InvalidOffsetNumber; + SGLT_SET_OFFSET(leafTuple->nextOffset, InvalidOffsetNumber); current->offnum = SpGistPageAddNewItem(state, current->page, (Item) leafTuple, leafTuple->size, NULL, false); @@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, PageGetItemId(current->page, current->offnum)); if (head->tupstate == SPGIST_LIVE) { - leafTuple->nextOffset = head->nextOffset; + SGLT_SET_OFFSET(leafTuple->nextOffset, SGLT_GET_OFFSET(head->nextOffset)); offnum = SpGistPageAddNewItem(state, current->page, (Item) leafTuple, leafTuple->size, NULL, false); @@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, */ head = (SpGistLeafTuple) PageGetItem(current->page, PageGetItemId(current->page, current->offnum)); - head->nextOffset = offnum; + SGLT_SET_OFFSET(head->nextOffset, offnum); xlrec.offnumLeaf = offnum; xlrec.offnumHeadLeaf = current->offnum; } else if (head->tupstate == SPGIST_DEAD) { - leafTuple->nextOffset = InvalidOffsetNumber; + SGLT_SET_OFFSET(leafTuple->nextOffset, InvalidOffsetNumber); PageIndexTupleDelete(current->page, current->offnum); if (PageAddItem(current->page, (Item) leafTuple, leafTuple->size, @@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == 
current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber); /* Don't count it in result, because it won't go to other page */ } else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_OFFSET(it->nextOffset); } *nToSplit = n; @@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber); /* We don't want to move it, so don't count it in size */ toDelete[nDelete] = i; nDelete++; @@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state, else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_OFFSET(it->nextOffset); } /* Find a leaf page that will hold them */ @@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state, * don't care). We're modifying the tuple on the source page * here, but it's okay since we're about to delete it. 
*/ - it->nextOffset = r; + SGLT_SET_OFFSET(it->nextOffset, r); r = SpGistPageAddNewItem(state, npage, (Item) it, it->size, &startOffset, false); @@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state, } /* add the new tuple as well */ - newLeafTuple->nextOffset = r; + SGLT_SET_OFFSET(newLeafTuple->nextOffset, r); r = SpGistPageAddNewItem(state, npage, (Item) newLeafTuple, newLeafTuple->size, &startOffset, false); @@ -709,6 +709,9 @@ doPickSplit(Relation index, SpGistState *state, int nToDelete, nToInsert, maxToInclude; + Datum *leafChainDatums; + bool *leafChainIsnulls; + const int natts = IndexRelationGetNumberOfAttributes(index); in.level = level; @@ -723,14 +726,16 @@ doPickSplit(Relation index, SpGistState *state, toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n); newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); - STORE_STATE(state, xlrec.stateSrc); + leafChainDatums = (Datum *) palloc(n * natts * sizeof(Datum)); + leafChainIsnulls = (bool *) palloc(n * natts * sizeof(bool)); + /* - * Form list of leaf tuples which will be distributed as split result; - * also, count up the amount of space that will be freed from current. - * (Note that in the non-root case, we won't actually delete the old - * tuples, only replace them with redirects or placeholders.) + * Collect leaf tuples which will be distributed as split result; also, + * count up the amount of space that will be freed from current. (Note + * that in the non-root case, we won't actually delete the old tuples, + * only replace them with redirects or placeholders.) * * Note: the SGLTDATUM calls here are safe even when dealing with a nulls * page. For a pass-by-value data type we will fetch a word that must @@ -738,7 +743,15 @@ doPickSplit(Relation index, SpGistState *state, * tuples must have size at least SGDTSIZE). 
For a pass-by-reference type * we are just computing a pointer that isn't going to get dereferenced. * So it's not worth guarding the calls with isNulls checks. + * + * Datums and isnulls of all leaf tuple attributes in a chain are + * collected into 2-d arrays: (number of tuples in chain) x (number of + * attributes) First attribute is key, the other - included attributes (if + * any). After picksplit we need to form new leaf tuples as key attribute + * length can change which can affect alignment of every include + * attribute. */ + nToInsert = 0; nToDelete = 0; spaceToDelete = 0; @@ -759,6 +772,8 @@ doPickSplit(Relation index, SpGistState *state, { in.datums[nToInsert] = SGLTDATUM(it, state); heapPtrs[nToInsert] = it->heapPtr; + SpGistDeformLeafTuple(it, state, leafChainDatums + nToInsert * natts, + leafChainIsnulls + nToInsert * natts, isNulls); nToInsert++; toDelete[nToDelete] = i; nToDelete++; @@ -784,6 +799,9 @@ doPickSplit(Relation index, SpGistState *state, { in.datums[nToInsert] = SGLTDATUM(it, state); heapPtrs[nToInsert] = it->heapPtr; + + SpGistDeformLeafTuple(it, state, leafChainDatums + nToInsert * natts, + leafChainIsnulls + nToInsert * natts, isNulls); nToInsert++; toDelete[nToDelete] = i; nToDelete++; @@ -795,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state, { /* We could see a DEAD tuple as first/only chain item */ Assert(i == current->offnum); - Assert(it->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber); toDelete[nToDelete] = i; nToDelete++; /* replacing it with redirect will save no space */ @@ -803,7 +821,7 @@ doPickSplit(Relation index, SpGistState *state, else elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); - i = it->nextOffset; + i = SGLT_GET_OFFSET(it->nextOffset); } } in.nTuples = nToInsert; @@ -816,10 +834,17 @@ doPickSplit(Relation index, SpGistState *state, */ in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state); heapPtrs[in.nTuples] = 
newLeafTuple->heapPtr; + + SpGistDeformLeafTuple(newLeafTuple, state, leafChainDatums + (in.nTuples) * natts, + leafChainIsnulls + (in.nTuples) * natts, isNulls); in.nTuples++; memset(&out, 0, sizeof(out)); + /* + * Process collected key values of tuples from the chain. Included values + * are used to build fresh leaf tuples unchanged. + */ if (!isNulls) { /* @@ -837,9 +862,11 @@ doPickSplit(Relation index, SpGistState *state, totalLeafSizes = 0; for (i = 0; i < in.nTuples; i++) { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - out.leafTupleDatums[i], - false); + *(leafChainDatums + i * natts) = (Datum) out.leafTupleDatums[i]; + *(leafChainIsnulls + i * natts) = false; + + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, leafChainDatums + i * natts, + leafChainIsnulls + i * natts); totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); } } @@ -860,9 +887,14 @@ doPickSplit(Relation index, SpGistState *state, totalLeafSizes = 0; for (i = 0; i < in.nTuples; i++) { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - (Datum) 0, - true); + /* + * Nulls tree can contain only null key values. 
+ */ + *(leafChainDatums + i * natts) = (Datum) 0; + *(leafChainIsnulls + i * natts) = true; + + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, leafChainDatums + i * natts, + leafChainIsnulls + i * natts); totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); } } @@ -1196,10 +1228,10 @@ doPickSplit(Relation index, SpGistState *state, if (ItemPointerIsValid(&nodes[n]->t_tid)) { Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock); - it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid); + SGLT_SET_OFFSET(it->nextOffset, ItemPointerGetOffsetNumber(&nodes[n]->t_tid)); } else - it->nextOffset = InvalidOffsetNumber; + SGLT_SET_OFFSET(it->nextOffset, InvalidOffsetNumber); /* Insert it on page */ newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer), @@ -1889,67 +1921,83 @@ spgSplitNodeAction(Relation index, SpGistState *state, */ bool spgdoinsert(Relation index, SpGistState *state, - ItemPointer heapPtr, Datum datum, bool isnull) + ItemPointer heapPtr, Datum *datum, bool *isnull) { int level = 0; - Datum leafDatum; + Datum *leafDatum; int leafSize; SPPageDesc current, parent; FmgrInfo *procinfo = NULL; + int i; /* * Look up FmgrInfo of the user-defined choose function once, to save * cycles in the loop below. */ - if (!isnull) + if (!isnull[0]) procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); /* * Prepare the leaf datum to insert. - * + */ + + leafDatum = (Datum *) palloc0(sizeof(Datum) * (IndexRelationGetNumberOfAttributes(index))); + + /* * If an optional "compress" method is provided, then call it to form the - * leaf datum from the input datum. Otherwise store the input datum as - * is. Since we don't use index_form_tuple in this AM, we have to make - * sure value to be inserted is not toasted; FormIndexDatum doesn't - * guarantee that. But we assume the "compress" method to return an - * untoasted value. + * key datum from the input datum. Otherwise store the input datum as is. 
+ * Since we don't use index_form_tuple in this AM, we have to make sure + * value to be inserted is not toasted; FormIndexDatum doesn't guarantee + * that. But we assume the "compress" method to return an untoasted + * value. */ - if (!isnull) + if (!isnull[0]) { if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC))) { FmgrInfo *compressProcinfo = NULL; compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC); - leafDatum = FunctionCall1Coll(compressProcinfo, - index->rd_indcollation[0], - datum); + leafDatum[0] = FunctionCall1Coll(compressProcinfo, + index->rd_indcollation[0], + datum[0]); } else { Assert(state->attLeafType.type == state->attType.type); if (state->attType.attlen == -1) - leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum)); + leafDatum[0] = PointerGetDatum(PG_DETOAST_DATUM(datum[0])); else - leafDatum = datum; + leafDatum[0] = datum[0]; } } else - leafDatum = (Datum) 0; + leafDatum[0] = (Datum) 0; + + for (i = 1; i < IndexRelationGetNumberOfAttributes(index); i++) + { + if (!isnull[i]) + { + if (TupleDescAttr(state->includeTupdesc, i - 1)->attlen == -1) + leafDatum[i] = PointerGetDatum(PG_DETOAST_DATUM(datum[i])); + else + leafDatum[i] = datum[i]; + } + else + leafDatum[i] = (Datum) 0; + } + /* - * Compute space needed for a leaf tuple containing the given datum. + * Compute space needed on a page for a leaf tuple containing the given + * datum. * * If it isn't gonna fit, and the opclass can't reduce the datum size by * suffixing, bail out now rather than getting into an endless loop. 
*/ - if (!isnull) - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attLeafType, leafDatum); - else - leafSize = SGDTSIZE + sizeof(ItemIdData); + leafSize = SpgLeafSize(state, leafDatum, isnull) + sizeof(ItemIdData); if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK) ereport(ERROR, @@ -1961,7 +2009,7 @@ spgdoinsert(Relation index, SpGistState *state, errhint("Values larger than a buffer page cannot be indexed."))); /* Initialize "current" to the appropriate root page */ - current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO; + current.blkno = isnull[0] ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO; current.buffer = InvalidBuffer; current.page = NULL; current.offnum = FirstOffsetNumber; @@ -1995,7 +2043,7 @@ spgdoinsert(Relation index, SpGistState *state, */ current.buffer = SpGistGetBuffer(index, - GBUF_LEAF | (isnull ? GBUF_NULLS : 0), + GBUF_LEAF | (isnull[0] ? GBUF_NULLS : 0), Min(leafSize, SPGIST_PAGE_CAPACITY), &isNew); current.blkno = BufferGetBlockNumber(current.buffer); @@ -2037,7 +2085,7 @@ spgdoinsert(Relation index, SpGistState *state, current.page = BufferGetPage(current.buffer); /* should not arrive at a page of the wrong type */ - if (isnull ? !SpGistPageStoresNulls(current.page) : + if (isnull[0] ? !SpGistPageStoresNulls(current.page) : SpGistPageStoresNulls(current.page)) elog(ERROR, "SPGiST index page %u has wrong nulls flag", current.blkno); @@ -2054,7 +2102,7 @@ spgdoinsert(Relation index, SpGistState *state, { /* it fits on page, so insert it and we're done */ addLeafTuple(index, state, leafTuple, - &current, &parent, isnull, isNew); + &current, &parent, isnull[0], isNew); break; } else if ((sizeToSplit = @@ -2068,14 +2116,14 @@ spgdoinsert(Relation index, SpGistState *state, * chain to another leaf page rather than splitting it. 
*/ Assert(!isNew); - moveLeafs(index, state, &current, &parent, leafTuple, isnull); + moveLeafs(index, state, &current, &parent, leafTuple, isnull[0]); break; /* we're done */ } else { /* picksplit */ if (doPickSplit(index, state, &current, &parent, - leafTuple, level, isnull, isNew)) + leafTuple, level, isnull[0], isNew)) break; /* doPickSplit installed new tuples */ /* leaf tuple will not be inserted yet */ @@ -2110,8 +2158,8 @@ spgdoinsert(Relation index, SpGistState *state, innerTuple = (SpGistInnerTuple) PageGetItem(current.page, PageGetItemId(current.page, current.offnum)); - in.datum = datum; - in.leafDatum = leafDatum; + in.datum = datum[0]; + in.leafDatum = leafDatum[0]; in.level = level; in.allTheSame = innerTuple->allTheSame; in.hasPrefix = (innerTuple->prefixSize > 0); @@ -2121,7 +2169,7 @@ spgdoinsert(Relation index, SpGistState *state, memset(&out, 0, sizeof(out)); - if (!isnull) + if (!isnull[0]) { /* use user-defined choose method */ FunctionCall2Coll(procinfo, @@ -2158,11 +2206,11 @@ spgdoinsert(Relation index, SpGistState *state, /* Adjust level as per opclass request */ level += out.result.matchNode.levelAdd; /* Replace leafDatum and recompute leafSize */ - if (!isnull) + if (!isnull[0]) { - leafDatum = out.result.matchNode.restDatum; - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attLeafType, leafDatum); + leafDatum[0] = out.result.matchNode.restDatum; + leafSize = SpgLeafSize(state, leafDatum, isnull) + + sizeof(ItemIdData); } /* @@ -2227,6 +2275,6 @@ spgdoinsert(Relation index, SpGistState *state, SpGistSetLastUsedPage(index, parent.buffer); UnlockReleaseBuffer(parent.buffer); } - + pfree(leafDatum); return true; } diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c index e4508a2b92..b54ae85f6e 100644 --- a/src/backend/access/spgist/spginsert.c +++ b/src/backend/access/spgist/spginsert.c @@ -55,8 +55,7 @@ spgistBuildCallback(Relation index, ItemPointer tid, Datum *values, * lock on some buffer. 
So we need to be willing to retry. We can flush * any temp data when retrying. */ - while (!spgdoinsert(index, &buildstate->spgstate, tid, - *values, *isnull)) + while (!spgdoinsert(index, &buildstate->spgstate, tid, values, isnull)) { MemoryContextReset(buildstate->tmpCtx); } @@ -226,7 +225,7 @@ spginsert(Relation index, Datum *values, bool *isnull, * to avoid cumulative memory consumption. That means we also have to * redo initSpGistState(), but it's cheap enough not to matter. */ - while (!spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull)) + while (!spgdoinsert(index, &spgstate, ht_ctid, values, isnull)) { MemoryContextReset(insertCtx); initSpGistState(&spgstate, index); diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 4d506bfb9a..5a3c7c50cf 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -28,7 +28,8 @@ typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr, Datum leafValue, bool isNull, bool recheck, - bool recheckDistances, double *distances); + bool recheckDistances, double *distances, + SpGistLeafTuple leafTuple); /* * Pairing heap comparison function for the SpGistSearchItem queue. 
@@ -88,6 +89,9 @@ spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item) if (item->traversalValue) pfree(item->traversalValue); + if (item->isLeaf && item->leafTuple) + pfree(item->leafTuple); + pfree(item); } @@ -134,6 +138,8 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull) startEntry->recheck = false; startEntry->recheckDistances = false; + startEntry->leafTuple = NULL; + spgAddSearchItemToQueue(so, startEntry); } @@ -438,14 +444,30 @@ spgendscan(IndexScanDesc scan) * Leaf SpGistSearchItem constructor, called in queue context */ static SpGistSearchItem * -spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, +spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple, Datum leafValue, bool recheck, bool recheckDistances, bool isnull, double *distances) { SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); + /* + * If there are include attributes search item in the queue should contain + * them. + */ + if (so->state.includeTupdesc) + { + Assert(so->state.includeTupdesc->natts); + + item->leafTuple = palloc(leafTuple->size); + memcpy(item->leafTuple, leafTuple, leafTuple->size); + } + else + { + item->leafTuple = NULL; + } + item->level = level; - item->heapPtr = *heapPtr; + item->heapPtr = leafTuple->heapPtr; /* copy value to queue cxt out of tmp cxt */ item->value = isnull ? 
(Datum) 0 : datumCopy(leafValue, so->state.attLeafType.attbyval, @@ -503,6 +525,8 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, in.returnData = so->want_itup; in.leafDatum = SGLTDATUM(leafTuple, &so->state); + + out.leafValue = (Datum) 0; out.recheck = false; out.distances = NULL; @@ -528,7 +552,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, /* the scan is ordered -> add the item to the queue */ MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt); SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level, - &leafTuple->heapPtr, + leafTuple, leafValue, recheck, recheckDistances, @@ -543,8 +567,10 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, { /* non-ordered scan, so report the item right away */ Assert(!recheckDistances); + storeRes(so, &leafTuple->heapPtr, leafValue, isnull, - recheck, false, NULL); + recheck, false, NULL, leafTuple); + *reportedSome = true; } } @@ -736,7 +762,7 @@ spgTestLeafTuple(SpGistScanOpaque so, /* dead tuple should be first in chain */ Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); /* No live entries on this page */ - Assert(leafTuple->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_OFFSET(leafTuple->nextOffset) == InvalidOffsetNumber); return SpGistBreakOffsetNumber; } } @@ -750,7 +776,7 @@ spgTestLeafTuple(SpGistScanOpaque so, spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes); - return leafTuple->nextOffset; + return SGLT_GET_OFFSET(leafTuple->nextOffset); } /* @@ -782,8 +808,8 @@ redirect: { /* We store heap items in the queue only in case of ordered search */ Assert(so->numberOfNonNullOrderBys > 0); - storeRes(so, &item->heapPtr, item->value, item->isNull, - item->recheck, item->recheckDistances, item->distances); + storeRes(so, &item->heapPtr, item->value, item->isNull, item->recheck, + item->recheckDistances, item->distances, item->leafTuple); reportedSome = true; } else @@ -877,7 +903,7 @@ redirect: static void storeBitmap(SpGistScanOpaque 
so, ItemPointer heapPtr, Datum leafValue, bool isnull, bool recheck, bool recheckDistances, - double *distances) + double *distances, SpGistLeafTuple leafTuple) { Assert(!recheckDistances && !distances); tbm_add_tuples(so->tbm, heapPtr, 1, recheck); @@ -904,7 +930,7 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm) static void storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, Datum leafValue, bool isnull, bool recheck, bool recheckDistances, - double *nonNullDistances) + double *nonNullDistances, SpGistLeafTuple leafTuple) { Assert(so->nPtrs < MaxIndexTuplesPerPage); so->heapPtrs[so->nPtrs] = *heapPtr; @@ -949,9 +975,38 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, * Reconstruct index data. We have to copy the datum out of the temp * context anyway, so we may as well create the tuple here. */ - so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc, - &leafValue, - &isnull); + if (so->state.includeTupdesc) + { + /* Add included attributes */ + Datum *leafDatums; + bool *leafIsnulls; + + Assert(so->state.includeTupdesc->natts); + + leafDatums = (Datum *) palloc(sizeof(Datum) * (so->state.includeTupdesc->natts + 1)); + leafIsnulls = (bool *) palloc(sizeof(bool) * (so->state.includeTupdesc->natts + 1)); + + SpGistDeformLeafTuple(leafTuple, &so->state, leafDatums, leafIsnulls, isnull); + + /* + * override key value extracted from LeafTuple in case we've + * reconstructed it already + */ + leafDatums[0] = leafValue; + leafIsnulls[0] = isnull; + + so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc, + leafDatums, + leafIsnulls); + pfree(leafDatums); + pfree(leafIsnulls); + } + else + { + so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc, + &leafValue, + &isnull); + } } so->nPtrs++; } @@ -1019,6 +1074,10 @@ spgcanreturn(Relation index, int attno) { SpGistCache *cache; + /* Included attributes always can be fetched for index-only scans */ + if (attno > 1) + return true; + /* We can do it if the opclass config function says so */ 
cache = spgGetCache(index); diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 0efe05e552..3ca47ff53d 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -31,7 +31,18 @@ #include "utils/index_selfuncs.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "access/itup.h" +#include "access/detoast.h" +#include "access/toast_internals.h" +#include "access/heaptoast.h" +#include "utils/expandeddatum.h" +/* Does att's datatype allow packing into the 1-byte-header varlena format? */ +#define ATT_IS_PACKABLE(att) \ + ((att)->attlen == -1 && (att)->attstorage != TYPSTORAGE_PLAIN) + +Size spgIncludedDataSize(TupleDesc tupleDesc, Datum *values, + bool *isnull, Size start); /* * SP-GiST handler function: return IndexAmRoutine with access method parameters @@ -49,7 +60,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amcanorderbyop = true; amroutine->amcanbackward = false; amroutine->amcanunique = false; - amroutine->amcanmulticol = false; + amroutine->amcanmulticol = true; amroutine->amoptionalkey = true; amroutine->amsearcharray = false; amroutine->amsearchnulls = true; @@ -57,7 +68,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; - amroutine->amcaninclude = false; + amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP; @@ -116,14 +127,21 @@ spgGetCache(Relation index) cache = MemoryContextAllocZero(index->rd_indexcxt, sizeof(SpGistCache)); - /* SPGiST doesn't support multi-column indexes */ - Assert(index->rd_att->natts == 1); + /* + * SPGiST should have one key column and can also have included + * columns + */ + if (IndexRelationGetNumberOfKeyAttributes(index) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("SPGiST index can have only 
one key column"))); /* - * Get the actual data type of the indexed column from the index - * tupdesc. We pass this to the opclass config function so that - * polymorphic opclasses are possible. + * Get the actual data type of the key column from the index tupdesc. + * We pass this to the opclass config function so that polymorphic + * opclasses are possible. */ + atttype = TupleDescAttr(index->rd_att, 0)->atttypid; /* Call the config function to get config info for the opclass */ @@ -156,6 +174,7 @@ spgGetCache(Relation index) fillTypeDesc(&cache->attPrefixType, cache->config.prefixType); fillTypeDesc(&cache->attLabelType, cache->config.labelType); + /* Last, get the lastUsedPages data from the metapage */ metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO); LockBuffer(metabuffer, BUFFER_LOCK_SHARE); @@ -177,7 +196,23 @@ spgGetCache(Relation index) /* assume it's up to date */ cache = (SpGistCache *) index->rd_amcache; } + /* Form descriptor for included columns if any */ + if (IndexRelationGetNumberOfAttributes(index) > 1) + { + int i; + + cache->includeTupdesc = CreateTemplateTupleDesc( + IndexRelationGetNumberOfAttributes(index) - 1); + for (i = 0; i < IndexRelationGetNumberOfAttributes(index) - 1; i++) + { + TupleDescInitEntry(cache->includeTupdesc, i + 1, NULL, + TupleDescAttr(index->rd_att, i + 1)->atttypid, + -1, 0); + } + } + else + cache->includeTupdesc = NULL; return cache; } @@ -190,6 +225,7 @@ initSpGistState(SpGistState *state, Relation index) /* Get cached static information about index */ cache = spgGetCache(index); + state->includeTupdesc = cache->includeTupdesc; state->config = cache->config; state->attType = cache->attType; state->attLeafType = cache->attLeafType; @@ -603,7 +639,7 @@ spgoptions(Datum reloptions, bool validate) /* * Get the space needed to store a non-null datum of the indicated type. - * Note the result is already rounded up to a MAXALIGN boundary. 
+ * Note the result is not maxaligned and this should be done by caller if needed. * Also, we follow the SPGiST convention that pass-by-val types are * just stored in their Datum representation (compare memcpyDatum). */ @@ -619,7 +655,7 @@ SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum) else size = VARSIZE_ANY(datum); - return MAXALIGN(size); + return size; } /* @@ -642,36 +678,202 @@ memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum) } /* - * Construct a leaf tuple containing the given heap TID and datum value + * Private version of heap_compute_data_size with start address not + * necessarily MAXALIGNed. The reason is that start address (and alignment) + * influence alignment of each of next values and overall size of included + * data area in SpGiST leaf tuple. + */ +Size +spgIncludedDataSize(TupleDesc tupleDesc, + Datum *values, + bool *isnull, Size start) +{ + Size data_length = 0; + int i; + int numberOfAttributes = tupleDesc->natts; + + data_length = start; + for (i = 0; i < numberOfAttributes; i++) + { + Datum val; + Form_pg_attribute atti; + + if (isnull[i]) + continue; + + val = values[i]; + atti = TupleDescAttr(tupleDesc, i); + + if (ATT_IS_PACKABLE(atti) && + VARATT_CAN_MAKE_SHORT(DatumGetPointer(val))) + { + /* + * we're anticipating converting to a short varlena header, so + * adjust length and don't count any alignment + */ + data_length += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(val)); + } + else if (atti->attlen == -1 && + VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val))) + { + /* + * we want to flatten the expanded value so that the constructed + * tuple doesn't depend on it + */ + data_length = att_align_nominal(data_length, atti->attalign); + data_length += EOH_get_flat_size(DatumGetEOHP(val)); + } + else + { + data_length = att_align_datum(data_length, atti->attalign, + atti->attlen, val); + data_length = att_addlength_datum(data_length, atti->attlen, + val); + } + } + return data_length - start; +} + +/* Calculate overall 
leaf tuple size. SGLTHDRSZ is MAXALIGNed only for backward + * compatibility and there might be a gap between header and key data. After key + * data there are no gaps beyond what is necessary for each value's + * alignment. Overall result is MAXALIGNed.*/ +unsigned int +SpgLeafSize(SpGistState *state, Datum *datum, bool *isnull) +{ + /* compute space needed, nullmask size and offset for include attributes */ + unsigned int size = SGLTHDRSZ; + unsigned int i; + + if (!isnull[0]) + /* key attribute size (not maxaligned) */ + size += SpGistGetTypeSize(&state->attLeafType, datum[0]); + + if (state->includeTupdesc) + { + Assert(state->includeTupdesc->natts); + if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of index columns (%d) exceeds limit (%d)", + state->includeTupdesc->natts, INDEX_MAX_KEYS))); + /* nullmask size */ + for (i = 1; i <= state->includeTupdesc->natts; i++) + { + if (isnull[i]) + { + size += (state->includeTupdesc->natts / 8) + 1; + break; + } + } + /* overall included attributes size each with added proper alignment. */ + size += spgIncludedDataSize(state->includeTupdesc, datum + 1, isnull + 1, size); + } + return MAXALIGN(size); +} + +/* + * Construct a leaf tuple containing the given heap TID, key data and included + * columns data. Key data starts from MAXALIGN boundary for backward compatibility. + * Nullmask applies only to included attributes and is placed just after key data if + * there is at least one NULL among included attributes. It doesn't need alignment. + * Then all included columns data follow aligned by their typealign's. 
*/ SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, - Datum datum, bool isnull) + Datum *datum, bool *isnull) { SpGistLeafTuple tup; - unsigned int size; + unsigned int size = SGLTHDRSZ; + unsigned int include_offset = 0; + unsigned int nullmask_size = 0; + unsigned int data_offset = 0; + unsigned int data_size = 0; + uint16 tupmask = 0; + int i; - /* compute space needed (note result is already maxaligned) */ - size = SGLTHDRSZ; - if (!isnull) - size += SpGistGetTypeSize(&state->attLeafType, datum); + /* + * Calculate space needed. If there are include attributes also calculate + * sizes and offsets needed for heap_fill_tuple + */ + if (!isnull[0]) + /* key attribute size (not maxaligned) */ + size += SpGistGetTypeSize(&state->attLeafType, datum[0]); + + if (state->includeTupdesc) + { + Assert(state->includeTupdesc->natts); + if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of index columns (%d) exceeds limit (%d)", + state->includeTupdesc->natts, INDEX_MAX_KEYS))); + + include_offset = size; + + for (i = 1; i <= state->includeTupdesc->natts; i++) + { + if (isnull[i]) + { + nullmask_size = (state->includeTupdesc->natts / 8) + 1; + size += nullmask_size; + break; + } + } + + /* + * Alignment of all included attributes is counted inside data_size. + * data_offset itself is not aligned. + */ + data_size = spgIncludedDataSize(state->includeTupdesc, datum + 1, isnull + 1, size); + data_offset = size; + + size += data_size; + } /* * Ensure that we can replace the tuple with a dead tuple later. This - * test is unnecessary when !isnull, but let's be safe. + * test is unnecessary when !isnull[0], but let's be safe. 
*/ if (size < SGDTSIZE) size = SGDTSIZE; /* OK, form the tuple */ - tup = (SpGistLeafTuple) palloc0(size); + tup = (SpGistLeafTuple) palloc0(MAXALIGN(size)); - tup->size = size; - tup->nextOffset = InvalidOffsetNumber; + tup->size = MAXALIGN(size); + SGLT_SET_OFFSET(tup->nextOffset, InvalidOffsetNumber); tup->heapPtr = *heapPtr; - if (!isnull) - memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum); + if (!isnull[0]) + memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum[0]); + + /* Add included columns data to leaf tuple if any. */ + if (state->includeTupdesc) + { + /* + * The start of include attributes tuple is not aligned by default. + * All values alignment should be done by heap_fill_tuple + * automatically. If there is a nulls mask it is included just after + * key attribute data and it should not be aligned. + */ + heap_fill_tuple(state->includeTupdesc, datum + 1, isnull + 1, + (char *) tup + data_offset, + data_size, &tupmask, + (nullmask_size ? (bits8 *) tup + include_offset : NULL)); + + if (nullmask_size) + SGLT_SET_CONTAINSNULLMASK(tup->nextOffset, 1); + + /* + * We do this because heap_fill_tuple wants to initialize a "tupmask" + * which is used for HeapTuples, but the only relevant info is the + * "has variable attributes" field. We have already set the hasnull + * bit above. 
+ */ + if (tupmask & HEAP_HASVARWIDTH) + SGLT_SET_CONTAINSVARATT(tup->nextOffset, 1); + } return tup; } @@ -688,10 +890,10 @@ spgFormNodeTuple(SpGistState *state, Datum label, bool isnull) unsigned int size; unsigned short infomask = 0; - /* compute space needed (note result is already maxaligned) */ + /* compute space needed */ size = SGNTHDRSZ; if (!isnull) - size += SpGistGetTypeSize(&state->attLabelType, label); + size += MAXALIGN(SpGistGetTypeSize(&state->attLabelType, label)); /* * Here we make sure that the size will fit in the field reserved for it @@ -735,7 +937,7 @@ spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, /* Compute size needed */ if (hasPrefix) - prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix); + prefixSize = MAXALIGN(SpGistGetTypeSize(&state->attPrefixType, prefix)); else prefixSize = 0; @@ -1046,3 +1248,133 @@ spgproperty(Oid index_oid, int attno, return true; } + +/* + * Convert an SpGist tuple into palloc'd Datum/isnull arrays. + * + */ +void +SpGistDeformLeafTuple(SpGistLeafTuple tup, SpGistState *state, Datum *datum, bool *isnull, + bool key_isnull) +{ + unsigned int include_offset; /* offset of include data */ + int off; + bits8 *nullmask_ptr = NULL; /* ptr to null bitmap in tuple */ + char *tp; + bool slow = false; /* can we use/set attcacheoff? */ + int i; + + if (key_isnull) + { + datum[0] = (Datum) 0; + isnull[0] = true; + } + else + { + datum[0] = SGLTDATUM(tup, state); + isnull[0] = false; + } + + if (state->includeTupdesc) + { + Assert(state->includeTupdesc->natts); + if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of index columns (%d) exceeds limit (%d)", + state->includeTupdesc->natts, INDEX_MAX_KEYS))); + + include_offset = key_isnull ? 
SGLTHDRSZ : SGLTHDRSZ + SpGistGetTypeSize(&state->attLeafType, datum[0]); + + tp = (char *) tup; + off = include_offset; + + if (SGLT_GET_CONTAINSNULLMASK(tup->nextOffset)) + { + nullmask_ptr = (bits8 *) tp + include_offset; + off += (state->includeTupdesc->natts) / 8 + 1; + } + + if (state->attLeafType.attlen > 0 && !SGLT_GET_CONTAINSVARATT(tup->nextOffset) && + !SGLT_GET_CONTAINSNULLMASK(tup->nextOffset)) + /* can use attcacheoff for all attributes */ + { + for (i = 1; i <= state->includeTupdesc->natts; i++) + { + Form_pg_attribute thisatt = TupleDescAttr(state->includeTupdesc, i - 1); + + isnull[i] = false; + if (thisatt->attcacheoff >= 0) + off = thisatt->attcacheoff; + else + { + off = att_align_nominal(off, thisatt->attalign); + thisatt->attcacheoff = off; + } + datum[i] = fetchatt(thisatt, tp + off); + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + } + } + else + + /* + * general case: can use cache until first null or varlen + * attribute + */ + { + if (state->attLeafType.attlen <= 0) + slow = true; /* can't use attcacheoff at all */ + + for (i = 1; i <= state->includeTupdesc->natts; i++) + { + Form_pg_attribute thisatt = TupleDescAttr(state->includeTupdesc, i - 1); + + if (SGLT_GET_CONTAINSNULLMASK(tup->nextOffset)) + { + if (att_isnull(i - 1, nullmask_ptr)) + { + datum[i] = (Datum) 0; + isnull[i] = true; + slow = true; /* can't use attcacheoff anymore */ + continue; + } + } + + isnull[i] = false; + + if (!slow && thisatt->attcacheoff >= 0) + off = thisatt->attcacheoff; + else if (thisatt->attlen == -1) + { + /* + * We can only cache the offset for a varlena attribute if + * the offset is already suitably aligned, so that there + * would be no pad bytes in any case: then the offset will + * be valid for either an aligned or unaligned value. 
+ */ + if (!slow && off == att_align_nominal(off, thisatt->attalign)) + thisatt->attcacheoff = off; + else + { + off = att_align_pointer(off, thisatt->attalign, -1, tp + off); + slow = true; + } + } + else + { + /* not varlena, so safe to use att_align_nominal */ + off = att_align_nominal(off, thisatt->attalign); + + if (!slow) + thisatt->attcacheoff = off; + } + + datum[i] = fetchatt(thisatt, tp + off); + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + + if (thisatt->attlen <= 0) + slow = true; /* can't use attcacheoff anymore */ + } + } + } +} diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index bd98707f3c..a0d76901fc 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -168,23 +168,28 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, } /* Form predecessor map, too */ - if (lt->nextOffset != InvalidOffsetNumber) + if (SGLT_GET_OFFSET(lt->nextOffset) != InvalidOffsetNumber) { /* paranoia about corrupted chain links */ - if (lt->nextOffset < FirstOffsetNumber || - lt->nextOffset > max || - predecessor[lt->nextOffset] != InvalidOffsetNumber) + if (SGLT_GET_OFFSET(lt->nextOffset) < FirstOffsetNumber || + SGLT_GET_OFFSET(lt->nextOffset) > max || + predecessor[SGLT_GET_OFFSET(lt->nextOffset)] != InvalidOffsetNumber) elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"", BufferGetBlockNumber(buffer), RelationGetRelationName(index)); - predecessor[lt->nextOffset] = i; + predecessor[SGLT_GET_OFFSET(lt->nextOffset)] = i; } } else if (lt->tupstate == SPGIST_REDIRECT) { SpGistDeadTuple dt = (SpGistDeadTuple) lt; - Assert(dt->nextOffset == InvalidOffsetNumber); + /* + * Dead tuple nextOffset is allowed to have any values of two + * highest bits in case it is inherited from SpGistLeafTuple where + * these bits has their own meaning. 
+ */ + Assert(SGLT_GET_OFFSET(dt->nextOffset) == InvalidOffsetNumber); Assert(ItemPointerIsValid(&dt->pointer)); /* @@ -201,7 +206,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, } else { - Assert(lt->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_OFFSET(lt->nextOffset) == InvalidOffsetNumber); } } @@ -250,7 +255,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, prevLive = deletable[i] ? InvalidOffsetNumber : i; /* scan down the chain ... */ - j = head->nextOffset; + j = SGLT_GET_OFFSET(head->nextOffset); while (j != InvalidOffsetNumber) { SpGistLeafTuple lt; @@ -301,7 +306,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, interveningDeletable = false; } - j = lt->nextOffset; + j = SGLT_GET_OFFSET(lt->nextOffset); } if (prevLive == InvalidOffsetNumber) @@ -366,7 +371,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, lt = (SpGistLeafTuple) PageGetItem(page, PageGetItemId(page, chainSrc[i])); Assert(lt->tupstate == SPGIST_LIVE); - lt->nextOffset = chainDest[i]; + SGLT_SET_OFFSET(lt->nextOffset, chainDest[i]); } MarkBufferDirty(buffer); diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 7be2291d07..4022e3af07 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -122,8 +122,8 @@ spgRedoAddLeaf(XLogReaderState *record) head = (SpGistLeafTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumHeadLeaf)); - Assert(head->nextOffset == leafTupleHdr.nextOffset); - head->nextOffset = xldata->offnumLeaf; + Assert(SGLT_GET_OFFSET(head->nextOffset) == SGLT_GET_OFFSET(leafTupleHdr.nextOffset)); + SGLT_SET_OFFSET(head->nextOffset, xldata->offnumLeaf); } } else @@ -822,7 +822,7 @@ spgRedoVacuumLeaf(XLogReaderState *record) lt = (SpGistLeafTuple) PageGetItem(page, PageGetItemId(page, chainSrc[i])); Assert(lt->tupstate == SPGIST_LIVE); - lt->nextOffset = chainDest[i]; + 
SGLT_SET_OFFSET(lt->nextOffset, chainDest[i]); } PageSetLSN(page, lsn); diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h index 00b98ec6a0..8d03adb8f5 100644 --- a/src/include/access/spgist_private.h +++ b/src/include/access/spgist_private.h @@ -141,6 +141,7 @@ typedef struct SpGistState SpGistTypeDesc attLeafType; /* type of leaf-tuple values */ SpGistTypeDesc attPrefixType; /* type of inner-tuple prefix values */ SpGistTypeDesc attLabelType; /* type of node label values */ + TupleDesc includeTupdesc; /* tuple descriptor of included columns */ char *deadTupleStorage; /* workspace for spgFormDeadTuple */ @@ -148,6 +149,98 @@ typedef struct SpGistState bool isBuild; /* true if doing index build */ } SpGistState; +/* + * SPGiST leaf tuple: carries a datum and a heap tuple TID + * + * In the simplest case, the datum is the same as the indexed value; but + * it could also be a suffix or some other sort of delta that permits + * reconstruction given knowledge of the prefix path traversed to get here. + * + * The size field is wider than could possibly be needed for an on-disk leaf + * tuple, but this allows us to form leaf tuples even when the datum is too + * wide to be stored immediately, and it costs nothing because of alignment + * considerations. + * + * Normally, nextOffset links to the next tuple belonging to the same parent + * node (which must be on the same page). But when the root page is a leaf + * page, we don't chain its tuples, so nextOffset is always 0 on the root. + * + * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE + * so that the tuple can be converted to REDIRECT status later. (This + * restriction only adds bytes for the null-datum case, otherwise alignment + * restrictions force it anyway.) 
+ * + * In a leaf tuple for a NULL indexed value, there's no useful datum value; + * however, the SGDTSIZE limit ensures that there's a Datum word there + * anyway, so SGLTDATUM can be applied safely as long as you don't do + * anything with the result. + * + * The minimum space a leaf tuple occupies on a page is a 12-byte tuple header + * plus a 4-byte ItemIdData, so the 14 low bits of nextOffset (accessed with + * SGLT_GET/SET_OFFSET) are enough to store any tuple number on a page, even + * if the page size is 64kB. The two high bits store per-tuple flags: + * whether a nulls mask is present, and whether any included attribute + * is of a variable-length type. + */ + +typedef struct SpGistLeafTupleData +{ + unsigned int tupstate:2, /* LIVE/REDIRECT/DEAD/PLACEHOLDER */ + size:30; /* large enough for any palloc'able value */ + OffsetNumber nextOffset; /* bit 15 (highest) = 1 if included values have + * nulls, bit 14 = 1 if included values contain + * variable length values, lower 14 bits are the + * "actual" nextOffset i.e. number of next + * tuple in chain on a page, or + * InvalidOffsetNumber. They SHOULD NOT be + * set/read directly, + * SGLT_SET_XXX/SGLT_GET_XXX macros must be + * used instead. */ + ItemPointerData heapPtr; /* TID of represented heap tuple */ + /* leaf datum follows */ + + /* + * if SGLT_GET_CONTAINSNULLMASK is set, a nulls mask follows. Its size is + * (number of included columns/8)+1 bytes + */ + /* include attributes follow if any */ +} SpGistLeafTupleData; + +typedef SpGistLeafTupleData *SpGistLeafTuple; + +#define SGLTHDRSZ MAXALIGN(sizeof(SpGistLeafTupleData)) +#define SGLTDATAPTR(x) (((char *) (x)) + SGLTHDRSZ) +#define SGLTDATUM(x, s) ((s)->attLeafType.attbyval ? \ + *(Datum *) SGLTDATAPTR(x) : \ + PointerGetDatum(SGLTDATAPTR(x))) +/* + * Accessor macros to get and set the actual 14-bit offset and the two flag bits + * from/to the nextOffset value. 
+ */ +#define SGLT_GET_OFFSET(x) ( (x) & 0x3FFF ) +#define SGLT_GET_CONTAINSNULLMASK(x) ( ( (x) & 0x8000 ) >> 15 ) +#define SGLT_GET_CONTAINSVARATT(x) ( ( (x) & 0x4000 ) >> 14 ) +#define SGLT_SET_OFFSET(x,o) ( (x) = ( (x) & 0xC000 ) | ( (o) & 0x3FFF) ) +#define SGLT_SET_CONTAINSNULLMASK(x,n) ( (x) = ( (n) << 15 ) | ( (x) & 0x7FFF ) ) +#define SGLT_SET_CONTAINSVARATT(x,v) ( (x) = ( (v) << 14 ) | ( (x) & 0xBFFF ) ) + +#define SGLT_GET_INCLUDE_TUPSIZE(x) SGLT_GET_OFFSET(x) +#define SGLT_SET_INCLUDE_TUPSIZE(x,o) SGLT_SET_OFFSET(x,o) + +extern char *SpGistFormIncludeTuple(TupleDesc tupleDescriptor, Datum *values, + bool *isnull, uint16 *tupdata); + +/* + * SPGiST dead tuple: declaration for examining non-live tuples + * + * The tupstate field of this struct must match those of regular inner and + * leaf tuples, and its size field must match a leaf tuple's. + * Also, the pointer field must be in the same place as a leaf tuple's heapPtr + * field, to satisfy some Asserts that we make when replacing a leaf tuple + * with a dead tuple. + * We don't use nextOffset, but it's needed to align the pointer field. 
+ */ + typedef struct SpGistSearchItem { pairingheap_node phNode; /* pairing heap node */ @@ -160,14 +253,14 @@ typedef struct SpGistSearchItem bool isLeaf; /* SearchItem is heap item */ bool recheck; /* qual recheck is needed */ bool recheckDistances; /* distance recheck is needed */ - + SpGistLeafTuple leafTuple; /* array with numberOfOrderBys entries */ double distances[FLEXIBLE_ARRAY_MEMBER]; + /* if there are include columns SpGistLeafTupleData follow */ } SpGistSearchItem; #define SizeOfSpGistSearchItem(n_distances) \ (offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances)) - /* * Private state of an index scan */ @@ -241,6 +334,7 @@ typedef struct SpGistCache SpGistTypeDesc attLeafType; /* type of leaf-tuple values */ SpGistTypeDesc attPrefixType; /* type of inner-tuple prefix values */ SpGistTypeDesc attLabelType; /* type of node label values */ + TupleDesc includeTupdesc; SpGistLUPCache lastUsedPages; /* local storage of last-used info */ } SpGistCache; @@ -321,60 +415,6 @@ typedef SpGistNodeTupleData *SpGistNodeTuple; *(Datum *) SGNTDATAPTR(x) : \ PointerGetDatum(SGNTDATAPTR(x))) -/* - * SPGiST leaf tuple: carries a datum and a heap tuple TID - * - * In the simplest case, the datum is the same as the indexed value; but - * it could also be a suffix or some other sort of delta that permits - * reconstruction given knowledge of the prefix path traversed to get here. - * - * The size field is wider than could possibly be needed for an on-disk leaf - * tuple, but this allows us to form leaf tuples even when the datum is too - * wide to be stored immediately, and it costs nothing because of alignment - * considerations. - * - * Normally, nextOffset links to the next tuple belonging to the same parent - * node (which must be on the same page). But when the root page is a leaf - * page, we don't chain its tuples, so nextOffset is always 0 on the root. 
- * - * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE - * so that the tuple can be converted to REDIRECT status later. (This - * restriction only adds bytes for the null-datum case, otherwise alignment - * restrictions force it anyway.) - * - * In a leaf tuple for a NULL indexed value, there's no useful datum value; - * however, the SGDTSIZE limit ensures that's there's a Datum word there - * anyway, so SGLTDATUM can be applied safely as long as you don't do - * anything with the result. - */ -typedef struct SpGistLeafTupleData -{ - unsigned int tupstate:2, /* LIVE/REDIRECT/DEAD/PLACEHOLDER */ - size:30; /* large enough for any palloc'able value */ - OffsetNumber nextOffset; /* next tuple in chain, or InvalidOffsetNumber */ - ItemPointerData heapPtr; /* TID of represented heap tuple */ - /* leaf datum follows */ -} SpGistLeafTupleData; - -typedef SpGistLeafTupleData *SpGistLeafTuple; - -#define SGLTHDRSZ MAXALIGN(sizeof(SpGistLeafTupleData)) -#define SGLTDATAPTR(x) (((char *) (x)) + SGLTHDRSZ) -#define SGLTDATUM(x, s) ((s)->attLeafType.attbyval ? \ - *(Datum *) SGLTDATAPTR(x) : \ - PointerGetDatum(SGLTDATAPTR(x))) - -/* - * SPGiST dead tuple: declaration for examining non-live tuples - * - * The tupstate field of this struct must match those of regular inner and - * leaf tuples, and its size field must match a leaf tuple's. - * Also, the pointer field must be in the same place as a leaf tuple's heapPtr - * field, to satisfy some Asserts that we make when replacing a leaf tuple - * with a dead tuple. - * We don't use nextOffset, but it's needed to align the pointer field. - * pointer and xid are only valid when tupstate = REDIRECT. - */ typedef struct SpGistDeadTupleData { unsigned int tupstate:2, /* LIVE/REDIRECT/DEAD/PLACEHOLDER */ @@ -394,7 +434,6 @@ typedef SpGistDeadTupleData *SpGistDeadTuple; * size plus sizeof(ItemIdData) (for the line pointer). This works correctly * so long as tuple sizes are always maxaligned. 
*/ - /* Page capacity after allowing for fixed header and special space */ #define SPGIST_PAGE_CAPACITY \ MAXALIGN_DOWN(BLCKSZ - \ @@ -456,9 +495,10 @@ extern void SpGistInitPage(Page page, uint16 f); extern void SpGistInitBuffer(Buffer b, uint16 f); extern void SpGistInitMetapage(Page page); extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum); +extern unsigned int SpgLeafSize(SpGistState *state, Datum *datum, bool *isnull); extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, - Datum datum, bool isnull); + Datum *datum, bool *isnull); extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull); extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state, @@ -466,6 +506,8 @@ extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state, int nNodes, SpGistNodeTuple *nodes); extern SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum); +extern void SpGistDeformLeafTuple(SpGistLeafTuple tup, SpGistState *state, + Datum *datum, bool *isnull, bool key_value_isnull); extern Datum *spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple); extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, @@ -484,7 +526,7 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum); extern bool spgdoinsert(Relation index, SpGistState *state, - ItemPointer heapPtr, Datum datum, bool isnull); + ItemPointer heapPtr, Datum *datum, bool *isnull); /* spgproc.c */ extern double *spg_key_orderbys_distances(Datum key, bool isLeaf, diff --git a/src/test/regress/expected/amutils.out b/src/test/regress/expected/amutils.out index d92a6d12c6..93e6a43b6d 100644 --- a/src/test/regress/expected/amutils.out +++ b/src/test/regress/expected/amutils.out @@ -169,9 +169,9 @@ select amname, prop, pg_indexam_has_property(a.oid, prop) as p hash | bogus | spgist | can_order | f 
spgist | can_unique | f - spgist | can_multi_col | f + spgist | can_multi_col | t spgist | can_exclude | t - spgist | can_include | f + spgist | can_include | t spgist | bogus | (36 rows) diff --git a/src/test/regress/expected/index_including.out b/src/test/regress/expected/index_including.out index 8e5d53e712..4fd2b7e878 100644 --- a/src/test/regress/expected/index_including.out +++ b/src/test/regress/expected/index_including.out @@ -356,7 +356,6 @@ CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4); ERROR: access method "brin" does not support included columns CREATE INDEX on tbl USING gist(c3) INCLUDE (c1, c4); CREATE INDEX on tbl USING spgist(c3) INCLUDE (c4); -ERROR: access method "spgist" does not support included columns CREATE INDEX on tbl USING gin(c1, c2) INCLUDE (c3, c4); ERROR: access method "gin" does not support included columns CREATE INDEX on tbl USING hash(c1, c2) INCLUDE (c3, c4); diff --git a/src/test/regress/expected/index_including_spgist.out b/src/test/regress/expected/index_including_spgist.out new file mode 100644 index 0000000000..fa64766fb7 --- /dev/null +++ b/src/test/regress/expected/index_including_spgist.out @@ -0,0 +1,139 @@ +/* + * 1.1. 
test CREATE INDEX with buffered build + */ +-- Regular index with included columns +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +-- size is chosen to exceed page size and trigger actual truncation +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname; + pg_get_indexdef +----------------------------------------------------------------------------------------- + CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3) +(1 row) + +SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); + c1 | c2 | c3 | c4 +----+----+----+------------- + 1 | 2 | 3 | (2,3),(1,2) + 2 | 4 | 6 | (4,5),(2,3) + 3 | 6 | 9 | (6,7),(3,4) + 4 | 8 | 12 | (8,9),(4,5) +(4 rows) + +SET enable_bitmapscan TO off; +EXPLAIN (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); + QUERY PLAN +---------------------------------------------------- + Index Only Scan using tbl_spgist_idx on tbl_spgist + Index Cond: (c4 <@ '(10,10),(1,1)'::box) +(2 rows) + +SET enable_bitmapscan TO default; +DROP TABLE tbl_spgist; +/* + * 1.2. 
test CREATE INDEX with inserts + */ +-- Regular index with included columns +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +-- size is chosen to exceed page size and trigger actual truncation +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x; +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname; + pg_get_indexdef +----------------------------------------------------------------------------------------- + CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3) +(1 row) + +SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); + c1 | c2 | c3 | c4 +----+----+----+------------- + 1 | 2 | 3 | (2,3),(1,2) + 2 | 4 | 6 | (4,5),(2,3) + 3 | 6 | 9 | (6,7),(3,4) + 4 | 8 | 12 | (8,9),(4,5) +(4 rows) + +SET enable_bitmapscan TO off; +EXPLAIN (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); + QUERY PLAN +---------------------------------------------------- + Index Only Scan using tbl_spgist_idx on tbl_spgist + Index Cond: (c4 <@ '(10,10),(1,1)'::box) +(2 rows) + +SET enable_bitmapscan TO default; +DROP TABLE tbl_spgist; +/* + * 2. CREATE INDEX CONCURRENTLY + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX CONCURRENTLY tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; + indexdef +----------------------------------------------------------------------------------------- + CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3) +(1 row) + +DROP TABLE tbl_spgist; +/* + * 3. 
REINDEX + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; + indexdef +------------------------------------------------------------------------------------- + CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c3) +(1 row) + +REINDEX INDEX tbl_spgist_idx; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; + indexdef +------------------------------------------------------------------------------------- + CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c3) +(1 row) + +ALTER TABLE tbl_spgist DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; + indexdef +---------- +(0 rows) + +DROP TABLE tbl_spgist; +/* + * 4. Update, delete values in indexed table. + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +UPDATE tbl_spgist SET c1 = 100 WHERE c1 = 2; +UPDATE tbl_spgist SET c1 = 1 WHERE c1 = 3; +DELETE FROM tbl_spgist WHERE c1 = 5 OR c3 = 12; +DROP TABLE tbl_spgist; +/* + * 5. Alter column type. 
+ */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +ALTER TABLE tbl_spgist ALTER c1 TYPE bigint; +ALTER TABLE tbl_spgist ALTER c3 TYPE bigint; +\d tbl_spgist + Table "public.tbl_spgist" + Column | Type | Collation | Nullable | Default +--------+---------+-----------+----------+--------- + c1 | bigint | | | + c2 | integer | | | + c3 | bigint | | | + c4 | box | | | +Indexes: + "tbl_spgist_idx" spgist (c4) INCLUDE (c1, c3) + +DROP TABLE tbl_spgist; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 026ea880cd..985458a1a8 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -50,7 +50,7 @@ test: copy copyselect copydml insert insert_conflict # ---------- test: create_misc create_operator create_procedure # These depend on create_misc and create_operator -test: create_index create_index_spgist create_view index_including index_including_gist +test: create_index create_index_spgist create_view index_including index_including_gist index_including_spgist # ---------- # Another group of parallel tests diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 979d926119..f3df961535 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -68,6 +68,7 @@ test: create_index_spgist test: create_view test: index_including test: index_including_gist +test: index_including_spgist test: create_aggregate test: create_function_3 test: create_cast diff --git a/src/test/regress/sql/index_including_spgist.sql b/src/test/regress/sql/index_including_spgist.sql new file mode 100644 index 0000000000..a59e73aa22 --- /dev/null +++ b/src/test/regress/sql/index_including_spgist.sql @@ -0,0 +1,81 @@ +/* + * 1.1. 
test CREATE INDEX with buffered build + */ + +-- Regular index with included columns +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +-- size is chosen to exceed page size and trigger actual truncation +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname; +SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); +SET enable_bitmapscan TO off; +EXPLAIN (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); +SET enable_bitmapscan TO default; +DROP TABLE tbl_spgist; + +/* + * 1.2. test CREATE INDEX with inserts + */ + +-- Regular index with included columns +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +-- size is chosen to exceed page size and trigger actual truncation +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x; +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname; +SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); +SET enable_bitmapscan TO off; +EXPLAIN (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10)); +SET enable_bitmapscan TO default; +DROP TABLE tbl_spgist; + +/* + * 2. 
CREATE INDEX CONCURRENTLY + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX CONCURRENTLY tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; +DROP TABLE tbl_spgist; + + +/* + * 3. REINDEX + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; +REINDEX INDEX tbl_spgist_idx; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; +ALTER TABLE tbl_spgist DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname; +DROP TABLE tbl_spgist; + +/* + * 4. Update, delete values in indexed table. + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +UPDATE tbl_spgist SET c1 = 100 WHERE c1 = 2; +UPDATE tbl_spgist SET c1 = 1 WHERE c1 = 3; +DELETE FROM tbl_spgist WHERE c1 = 5 OR c3 = 12; +DROP TABLE tbl_spgist; + +/* + * 5. Alter column type. + */ +CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x; +CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3); +ALTER TABLE tbl_spgist ALTER c1 TYPE bigint; +ALTER TABLE tbl_spgist ALTER c3 TYPE bigint; +\d tbl_spgist +DROP TABLE tbl_spgist; +