diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index dbcc6b08db..dfd49b937e 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -54,6 +54,61 @@ SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}'); -- too many pk fields, should fail SELECT dblink_build_sql_delete('foo','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}'); ERROR: invalid attribute number 4 +-- repeat the test for table with primary key index with included columns +CREATE TABLE foo_1(f1 int, f2 text, f3 text[], primary key (f1,f2) include (f3)); +INSERT INTO foo_1 VALUES (0,'a','{"a0","b0","c0"}'); +INSERT INTO foo_1 VALUES (1,'b','{"a1","b1","c1"}'); +INSERT INTO foo_1 VALUES (2,'c','{"a2","b2","c2"}'); +INSERT INTO foo_1 VALUES (3,'d','{"a3","b3","c3"}'); +INSERT INTO foo_1 VALUES (4,'e','{"a4","b4","c4"}'); +INSERT INTO foo_1 VALUES (5,'f','{"a5","b5","c5"}'); +INSERT INTO foo_1 VALUES (6,'g','{"a6","b6","c6"}'); +INSERT INTO foo_1 VALUES (7,'h','{"a7","b7","c7"}'); +INSERT INTO foo_1 VALUES (8,'i','{"a8","b8","c8"}'); +INSERT INTO foo_1 VALUES (9,'j','{"a9","b9","c9"}'); +-- misc utilities +-- list the primary key fields +SELECT * +FROM dblink_get_pkey('foo_1'); + position | colname +----------+--------- + 1 | f1 + 2 | f2 +(2 rows) + +-- build an insert statement based on a local tuple, +-- replacing the primary key values with new ones +SELECT dblink_build_sql_insert('foo_1','1 2',2,'{"0", "a"}','{"99", "xyz"}'); + dblink_build_sql_insert +------------------------------------------------------------- + INSERT INTO foo_1(f1,f2,f3) VALUES('99','xyz','{a0,b0,c0}') +(1 row) + +-- too many pk fields, should fail +SELECT dblink_build_sql_insert('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}'); +ERROR: invalid attribute number 4 +-- build an update statement based on a local tuple, +-- replacing the primary key values with new ones +SELECT dblink_build_sql_update('foo_1','1 2',2,'{"0", "a"}','{"99", "xyz"}'); + dblink_build_sql_update +------------------------------------------------------------------------------------------ + UPDATE foo_1 SET f1 = '99', f2 = 'xyz', f3 = '{a0,b0,c0}' WHERE f1 = '99' AND f2 = 'xyz' +(1 row) + +-- too many pk fields, should fail +SELECT dblink_build_sql_update('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}'); +ERROR: invalid attribute number 4 +-- build a delete statement based on a local tuple, +SELECT dblink_build_sql_delete('foo_1','1 2',2,'{"0", "a"}'); + dblink_build_sql_delete +----------------------------------------------- + DELETE FROM foo_1 WHERE f1 = '0' AND f2 = 'a' +(1 row) + +-- too many pk fields, should fail +SELECT dblink_build_sql_delete('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}'); +ERROR: invalid attribute number 4 +DROP TABLE foo_1; -- retest using a quoted and schema qualified table CREATE SCHEMA "MySchema"; CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2)); diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql index b093fa6722..3e96b98571 100644 --- a/contrib/dblink/sql/dblink.sql +++ b/contrib/dblink/sql/dblink.sql @@ -38,6 +38,44 @@ SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}'); -- too many pk fields, should fail SELECT dblink_build_sql_delete('foo','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}'); +-- repeat the test for table with primary key index with included columns +CREATE TABLE foo_1(f1 int, f2 text, f3 text[], primary key (f1,f2) include (f3)); +INSERT INTO foo_1 VALUES (0,'a','{"a0","b0","c0"}'); +INSERT INTO foo_1 VALUES (1,'b','{"a1","b1","c1"}'); +INSERT INTO foo_1 VALUES (2,'c','{"a2","b2","c2"}'); +INSERT INTO foo_1 VALUES (3,'d','{"a3","b3","c3"}'); +INSERT INTO foo_1 VALUES (4,'e','{"a4","b4","c4"}'); +INSERT INTO foo_1 VALUES (5,'f','{"a5","b5","c5"}'); +INSERT INTO foo_1 VALUES (6,'g','{"a6","b6","c6"}'); +INSERT INTO foo_1 VALUES (7,'h','{"a7","b7","c7"}'); +INSERT INTO foo_1 VALUES (8,'i','{"a8","b8","c8"}'); +INSERT INTO foo_1 VALUES (9,'j','{"a9","b9","c9"}'); + +-- misc utilities + +-- list the primary key fields +SELECT * +FROM dblink_get_pkey('foo_1'); + +-- build an insert statement based on a local tuple, +-- replacing the primary key values with new ones +SELECT dblink_build_sql_insert('foo_1','1 2',2,'{"0", "a"}','{"99", "xyz"}'); +-- too many pk fields, should fail +SELECT dblink_build_sql_insert('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}'); + +-- build an update statement based on a local tuple, +-- replacing the primary key values with new ones +SELECT dblink_build_sql_update('foo_1','1 2',2,'{"0", "a"}','{"99", "xyz"}'); +-- too many pk fields, should fail +SELECT dblink_build_sql_update('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}'); + +-- build a delete statement based on a local tuple, +SELECT dblink_build_sql_delete('foo_1','1 2',2,'{"0", "a"}'); +-- too many pk fields, should fail +SELECT dblink_build_sql_delete('foo_1','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}'); + +DROP TABLE foo_1; + -- retest using a quoted and schema qualified table CREATE SCHEMA "MySchema"; CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2)); diff --git a/doc/src/sgml/btree.sgml b/doc/src/sgml/btree.sgml index 10abf90189..ca81fbbc84 100644 --- a/doc/src/sgml/btree.sgml +++ b/doc/src/sgml/btree.sgml @@ -433,6 +433,23 @@ returns bool + + Included attributes in B-tree indexes + + + As of PostgreSQL 11.0 there is an optional + INCLUDE clause, which allows to add non-key (included) attributes to index. + Those included attributes allow more queries to benefit from index-only scans. + We never use included attributes in ScanKeys for search. That allows us to + include into B-tree any datatypes, even those which don't have suitable + operator classes. Included columns only stored in regular tuples on leaf + pages. All pivot tuples on non-leaf pages and highkey tuples are truncated + to contain only key attributes. That helps to slightly reduce the size of + index. + + + + Implementation diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c index f7103e53bc..a58bd95620 100644 --- a/src/backend/access/common/indextuple.c +++ b/src/backend/access/common/indextuple.c @@ -19,6 +19,7 @@ #include "access/heapam.h" #include "access/itup.h" #include "access/tuptoaster.h" +#include "utils/rel.h" /* ---------------------------------------------------------------- @@ -445,3 +446,32 @@ CopyIndexTuple(IndexTuple source) memcpy(result, source, size); return result; } + +/* + * Reform index tuple. Truncate nonkey (INCLUDE) attributes. + * Pass the number of attributes the truncated tuple must contain. + */ +IndexTuple +index_truncate_tuple(Relation idxrel, IndexTuple olditup, int new_indnatts) +{ + TupleDesc itupdesc = CreateTupleDescCopyConstr(RelationGetDescr(idxrel)); + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + IndexTuple newitup; + int indnatts = IndexRelationGetNumberOfAttributes(idxrel); + + Assert(indnatts <= INDEX_MAX_KEYS); + Assert(new_indnatts > 0); + Assert(new_indnatts < indnatts); + + index_deform_tuple(olditup, itupdesc, values, isnull); + + /* form new tuple that will contain only key attributes */ + itupdesc->natts = new_indnatts; + newitup = index_form_tuple(itupdesc, values, isnull); + newitup->t_tid = olditup->t_tid; + + FreeTupleDesc(itupdesc); + Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup)); + return newitup; +} diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d7279248e7..df9874cd5c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8019,7 +8019,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * TupleDesc desc = RelationGetDescr(relation); Oid replidindex; Relation idx_rel; - TupleDesc idx_desc; char replident = relation->rd_rel->relreplident; HeapTuple key_tuple = NULL; bool nulls[MaxHeapAttributeNumber]; @@ -8062,7 +8061,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * } idx_rel = RelationIdGetRelation(replidindex); - idx_desc = RelationGetDescr(idx_rel); /* deform tuple, so we have fast access to columns */ heap_deform_tuple(tp, desc, values, nulls); @@ -8074,7 +8072,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * * Now set all columns contained in the index to NOT NULL, they cannot * currently be NULL. */ - for (natt = 0; natt < idx_desc->natts; natt++) + for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++) { int attno = idx_rel->rd_index->indkey.values[natt]; diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 40111990c5..3a96824fbe 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -80,8 +80,6 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstoldonright, bool newitemonleft, int dataitemstoleft, Size firstoldonrightsz); -static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, - OffsetNumber itup_off); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel); @@ -109,13 +107,17 @@ _bt_doinsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, Relation heapRel) { bool is_unique = false; - int natts = rel->rd_rel->relnatts; + int indnkeyatts; ScanKey itup_scankey; BTStack stack = NULL; Buffer buf; OffsetNumber offset; bool fastpath; + Assert(IndexRelationGetNumberOfAttributes(rel) != 0); + indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + Assert(indnkeyatts != 0); + /* we need an insertion scan key to do our search, so build one */ itup_scankey = _bt_mkscankey(rel, itup); @@ -177,7 +179,7 @@ top: !P_IGNORE(lpageop) && (PageGetFreeSpace(page) > itemsz) && PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) && - _bt_compare(rel, natts, itup_scankey, page, + _bt_compare(rel, indnkeyatts, itup_scankey, page, P_FIRSTDATAKEY(lpageop)) > 0) { fastpath = true; @@ -209,7 +211,7 @@ top: if (!fastpath) { /* find the first page containing this key */ - stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, + stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE, NULL); /* trade in our read lock for a write lock */ @@ -223,7 +225,7 @@ top: * need to move right in the tree. See Lehman and Yao for an * excruciatingly precise description. */ - buf = _bt_moveright(rel, buf, natts, itup_scankey, false, + buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false, true, stack, BT_WRITE, NULL); } @@ -253,7 +255,7 @@ top: TransactionId xwait; uint32 speculativeToken; - offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); + offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false); xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey, checkUnique, &is_unique, &speculativeToken); @@ -290,7 +292,7 @@ top: */ CheckForSerializableConflictIn(rel, NULL, buf); /* do the insertion */ - _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, + _bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup, stack, heapRel); _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false); } @@ -334,7 +336,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, uint32 *speculativeToken) { TupleDesc itupdesc = RelationGetDescr(rel); - int natts = rel->rd_rel->relnatts; + int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); SnapshotData SnapshotDirty; OffsetNumber maxoff; Page page; @@ -393,7 +395,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, * in real comparison, but only for ordering/finding items on * pages. - vadim 03/24/97 */ - if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) + if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey)) break; /* we're past all the equal tuples */ /* okay, we gotta fetch the heap tuple ... */ @@ -558,7 +560,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, if (P_RIGHTMOST(opaque)) break; if (!_bt_isequal(itupdesc, page, P_HIKEY, - natts, itup_scankey)) + indnkeyatts, itup_scankey)) break; /* Advance to next non-dead page --- there must be one */ for (;;) @@ -1081,6 +1083,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; bool isleaf; + IndexTuple lefthikey; + int indnatts = IndexRelationGetNumberOfAttributes(rel); + int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); @@ -1180,7 +1185,23 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, itemsz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(origpage, itemid); } - if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, + + /* + * We must truncate included attributes of the "high key" item, + * before insert it onto the leaf page. It's the only point in insertion + * process, where we perform truncation. All other functions work with + * this high key and do not change it. + */ + if (indnatts != indnkeyatts && P_ISLEAF(lopaque)) + { + lefthikey = index_truncate_tuple(rel, item, indnkeyatts); + itemsz = IndexTupleSize(lefthikey); + itemsz = MAXALIGN(itemsz); + } + else + lefthikey = item; + + if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff, false, false) == InvalidOffsetNumber) { memset(rightpage, 0, BufferGetPageSize(rbuf)); @@ -1397,20 +1418,18 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, if (newitemonleft) XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz)); - /* Log left page */ - if (!isleaf) - { - /* - * We must also log the left page's high key, because the right - * page's leftmost key is suppressed on non-leaf levels. Show it - * as belonging to the left page buffer, so that it is not stored - * if XLogInsert decides it needs a full-page image of the left - * page. - */ - itemid = PageGetItemId(origpage, P_HIKEY); - item = (IndexTuple) PageGetItem(origpage, itemid); - XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); - } + /* + * We must also log the left page's high key. There are two reasons + * for that: right page's leftmost key is suppressed on non-leaf levels, + * in covering indexes, included columns are truncated from high keys. + * For simplicity, we don't distinguish these cases, but log the high + * key every time. Show it as belonging to the left page buffer, so + * that it is not stored if XLogInsert decides it needs a full-page + * image of the left page. + */ + itemid = PageGetItemId(origpage, P_HIKEY); + item = (IndexTuple) PageGetItem(origpage, itemid); + XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); /* * Log the contents of the right page in the format understood by @@ -1659,6 +1678,11 @@ _bt_checksplitloc(FindSplitData *state, /* * The first item on the right page becomes the high key of the left page; * therefore it counts against left space as well as right space. + * When index has included attribues, then those attributes of left page + * high key will be truncate leaving that page with slightly more free + * space. However, that shouldn't affect our ability to find valid split + * location, because anyway split location should exists even without high + * key truncation. */ leftfree -= firstrightitemsz; @@ -2183,7 +2207,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * we insert the tuples in order, so that the given itup_off does * represent the final position of the tuple! */ -static bool +bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 92afe2de38..e6bfb18e7b 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -1256,8 +1256,9 @@ _bt_pagedel(Relation rel, Buffer buf) /* we need an insertion scan key for the search, so build one */ itup_scankey = _bt_mkscankey(rel, targetkey); /* find the leftmost leaf page containing this key */ - stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, - false, &lbuf, BT_READ, NULL); + stack = _bt_search(rel, + IndexRelationGetNumberOfKeyAttributes(rel), + itup_scankey, false, &lbuf, BT_READ, NULL); /* don't need a pin on the page */ _bt_relbuf(rel, lbuf); diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index d80721802a..2dcef8a63c 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -117,7 +117,7 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = true; - amroutine->amcaninclude = false; + amroutine->amcaninclude = true; amroutine->amkeytype = InvalidOid; amroutine->ambuild = btbuild; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 098e0ce1be..d19348a206 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -802,6 +802,9 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) OffsetNumber last_off; Size pgspc; Size itupsz; + BTPageOpaque pageop; + int indnatts = IndexRelationGetNumberOfAttributes(wstate->index); + int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(wstate->index); /* * This is a handy place to check for cancel interrupts during the btree @@ -856,6 +859,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) ItemId ii; ItemId hii; IndexTuple oitup; + IndexTuple keytup; + BTPageOpaque opageop = (BTPageOpaque) PageGetSpecialPointer(opage); /* Create new page of same level */ npage = _bt_blnewpage(state->btps_level); @@ -883,6 +888,27 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) ItemIdSetUnused(ii); /* redundant */ ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData); + if (indnkeyatts != indnatts && P_ISLEAF(opageop)) + { + /* + * We truncate included attributes of high key here. + * Subsequent insertions assume that hikey is already truncated, + * and so they need not worry about it, when copying the high key + * into the parent page as a downlink. + * NOTE: It is not crucial for reliability in present, but maybe + * it will be that in the future. Now the purpose is just to save + * more space on inner pages of btree. + */ + keytup = index_truncate_tuple(wstate->index, oitup, indnkeyatts); + + /* delete "wrong" high key, insert keytup as P_HIKEY. */ + PageIndexTupleDelete(opage, P_HIKEY); + + if (!_bt_pgaddtup(opage, IndexTupleSize(keytup), keytup, P_HIKEY)) + elog(ERROR, "failed to rewrite compressed item in index \"%s\"", + RelationGetRelationName(wstate->index)); + } + /* * Link the old page into its parent, using its minimum key. If we * don't have a parent, we have to create one; this adds a new btree @@ -900,7 +926,11 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) * Save a copy of the minimum key for the new page. We have to copy * it off the old page, not the new one, in case we are not at leaf * level. + * Despite oitup is already initialized, it's important to get high + * key from the page, since we could have replaced it with truncated + * copy. See comment above. */ + oitup = (IndexTuple) PageGetItem(opage, PageGetItemId(opage, P_HIKEY)); state->btps_minkey = CopyIndexTuple(oitup); /* @@ -927,6 +957,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) last_off = P_FIRSTKEY; } + pageop = (BTPageOpaque) PageGetSpecialPointer(npage); /* * If the new item is the first for its page, stash a copy for later. Note * this will only happen for the first item on a level; on later pages, @@ -936,7 +967,15 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) if (last_off == P_HIKEY) { Assert(state->btps_minkey == NULL); - state->btps_minkey = CopyIndexTuple(itup); + /* + * Truncate included attributes of the tuple that we're going to insert + * into the parent page as a downlink + */ + if (indnkeyatts != indnatts && P_ISLEAF(pageop)) + state->btps_minkey = index_truncate_tuple(wstate->index, + itup, indnkeyatts); + else + state->btps_minkey = CopyIndexTuple(itup); } /* @@ -1029,7 +1068,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) bool load1; TupleDesc tupdes = RelationGetDescr(wstate->index); int i, - keysz = RelationGetNumberOfAttributes(wstate->index); + keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index); ScanKey indexScanKey = NULL; SortSupport sortKeys; diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 752667c885..2fc5924bf0 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -63,17 +63,26 @@ _bt_mkscankey(Relation rel, IndexTuple itup) { ScanKey skey; TupleDesc itupdesc; - int natts; + int indnatts PG_USED_FOR_ASSERTS_ONLY; + int indnkeyatts; int16 *indoption; int i; itupdesc = RelationGetDescr(rel); - natts = RelationGetNumberOfAttributes(rel); + indnatts = IndexRelationGetNumberOfAttributes(rel); + indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); indoption = rel->rd_indoption; - skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); + Assert(indnkeyatts != 0); + Assert(indnkeyatts <= indnatts); - for (i = 0; i < natts; i++) + /* + * We'll execute search using ScanKey constructed on key columns. + * Non key (included) columns must be omitted. + */ + skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData)); + + for (i = 0; i < indnkeyatts; i++) { FmgrInfo *procinfo; Datum arg; @@ -115,16 +124,16 @@ ScanKey _bt_mkscankey_nodata(Relation rel) { ScanKey skey; - int natts; + int indnkeyatts; int16 *indoption; int i; - natts = RelationGetNumberOfAttributes(rel); + indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); indoption = rel->rd_indoption; - skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); + skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData)); - for (i = 0; i < natts; i++) + for (i = 0; i < indnkeyatts; i++) { FmgrInfo *procinfo; int flags; diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 233c3965d9..bbfe860e36 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -294,13 +294,11 @@ btree_xlog_split(bool onleft, XLogReaderState *record) } /* Extract left hikey and its size (assuming 16-bit alignment) */ - if (!isleaf) - { - left_hikey = (IndexTuple) datapos; - left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); - datapos += left_hikeysz; - datalen -= left_hikeysz; - } + left_hikey = (IndexTuple) datapos; + left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); + datapos += left_hikeysz; + datalen -= left_hikeysz; + Assert(datalen == 0); newlpage = PageGetTempPageCopySpecial(lpage); diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 2b0b1da763..053f8aa345 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -476,6 +476,8 @@ extern bool _bt_doinsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, Relation heapRel); extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access); extern void _bt_finish_split(Relation rel, Buffer bbuf, BTStack stack); +extern bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, + OffsetNumber itup_off); /* * prototypes for functions in nbtpage.c diff --git a/src/test/isolation/specs/insert-conflict-do-nothing-2.spec b/src/test/isolation/specs/insert-conflict-do-nothing-2.spec index f1e5bde357..8a8ec94447 100644 --- a/src/test/isolation/specs/insert-conflict-do-nothing-2.spec +++ b/src/test/isolation/specs/insert-conflict-do-nothing-2.spec @@ -3,7 +3,7 @@ setup { - CREATE TABLE ints (key int primary key, val text); + CREATE TABLE ints (key int, val text, PRIMARY KEY (key) INCLUDE (val)); } teardown diff --git a/src/test/isolation/specs/insert-conflict-do-update-2.spec b/src/test/isolation/specs/insert-conflict-do-update-2.spec index cd7e3f42fe..f5b4f601b5 100644 --- a/src/test/isolation/specs/insert-conflict-do-update-2.spec +++ b/src/test/isolation/specs/insert-conflict-do-update-2.spec @@ -7,7 +7,7 @@ setup { CREATE TABLE upsert (key text not null, payload text); - CREATE UNIQUE INDEX ON upsert(lower(key)); + CREATE UNIQUE INDEX ON upsert(lower(key)) INCLUDE (payload); } teardown diff --git a/src/test/isolation/specs/lock-committed-keyupdate.spec b/src/test/isolation/specs/lock-committed-keyupdate.spec index 1630282d0f..3fb424af0e 100644 --- a/src/test/isolation/specs/lock-committed-keyupdate.spec +++ b/src/test/isolation/specs/lock-committed-keyupdate.spec @@ -8,7 +8,7 @@ setup { DROP TABLE IF EXISTS lcku_table; - CREATE TABLE lcku_table (id INTEGER PRIMARY KEY, value TEXT); + CREATE TABLE lcku_table (id INTEGER, value TEXT, PRIMARY KEY (id) INCLUDE (value)); INSERT INTO lcku_table VALUES (1, 'one'); INSERT INTO lcku_table VALUES (3, 'two'); } diff --git a/src/test/isolation/specs/lock-update-traversal.spec b/src/test/isolation/specs/lock-update-traversal.spec index 7042b9399c..2ffe87d152 100644 --- a/src/test/isolation/specs/lock-update-traversal.spec +++ b/src/test/isolation/specs/lock-update-traversal.spec @@ -7,8 +7,9 @@ setup { CREATE TABLE foo ( - key int PRIMARY KEY, - value int + key int, + value int, + PRIMARY KEY (key) INCLUDE (value) ); INSERT INTO foo VALUES (1, 1); diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out index 057faff2e5..024836b335 100644 --- a/src/test/regress/expected/create_index.out +++ b/src/test/regress/expected/create_index.out @@ -2395,6 +2395,25 @@ DETAIL: Key ((f1 || f2))=(ABCDEF) already exists. -- but this shouldn't: INSERT INTO func_index_heap VALUES('QWERTY'); -- +-- Test unique index with included columns +-- +CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text); +CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDE(f3); +INSERT INTO covering_index_heap VALUES(1,1,'AAA'); +INSERT INTO covering_index_heap VALUES(1,2,'AAA'); +-- this should fail because of unique index on f1,f2: +INSERT INTO covering_index_heap VALUES(1,2,'BBB'); +ERROR: duplicate key value violates unique constraint "covering_index_index" +DETAIL: Key (f1, f2)=(1, 2) already exists. +-- and this shouldn't: +INSERT INTO covering_index_heap VALUES(1,4,'AAA'); +-- Try to build index on table that already contains data +CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDE(f3); +-- Try to use existing covering index as primary key +ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX +covering_pkey; +DROP TABLE covering_index_heap; +-- -- Also try building functional, expressional, and partial indexes on -- tables that already contain data. -- diff --git a/src/test/regress/expected/index_including.out b/src/test/regress/expected/index_including.out new file mode 100644 index 0000000000..1d253ee77d --- /dev/null +++ b/src/test/regress/expected/index_including.out @@ -0,0 +1,346 @@ +/* + * 1.test CREATE INDEX + */ +-- Regular index with included columns +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE INDEX tbl_idx ON tbl using btree (c1, c2) INCLUDE (c3,c4); +-- must fail because of intersection of key and included columns +CREATE INDEX tbl_idx ON tbl using btree (c1, c2) INCLUDE (c1,c3); +ERROR: included columns must not intersect with key columns +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; + pg_get_indexdef +-------------------------------------------------------------------------- + CREATE INDEX tbl_idx ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +DROP TABLE tbl; +-- Unique index and unique constraint +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ALTER TABLE tbl add UNIQUE USING INDEX tbl_idx_unique; +ALTER TABLE tbl add UNIQUE (c1, c2) INCLUDE (c3, c4); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; + pg_get_indexdef +--------------------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) + CREATE UNIQUE INDEX tbl_idx_unique ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(2 rows) + +DROP TABLE tbl; +-- Unique index and unique constraint. Both must fail. +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ERROR: could not create unique index "tbl_idx_unique" +DETAIL: Key (c1, c2)=(1, 2) is duplicated. +ALTER TABLE tbl add UNIQUE (c1, c2) INCLUDE (c3, c4); +ERROR: could not create unique index "tbl_c1_c2_c3_c4_key" +DETAIL: Key (c1, c2)=(1, 2) is duplicated. +DROP TABLE tbl; +-- PK constraint +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl add PRIMARY KEY (c1, c2) INCLUDE (c3, c4); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; + pg_get_indexdef +---------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_pkey ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +DROP TABLE tbl; +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ALTER TABLE tbl add PRIMARY KEY USING INDEX tbl_idx_unique; +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; + pg_get_indexdef +---------------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_idx_unique ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +DROP TABLE tbl; +-- PK constraint. Must fail. +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl add PRIMARY KEY (c1, c2) INCLUDE (c3, c4); +ERROR: could not create unique index "tbl_pkey" +DETAIL: Key (c1, c2)=(1, 2) is duplicated. +DROP TABLE tbl; +/* + * 2. Test CREATE TABLE with constraint + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + CONSTRAINT covering UNIQUE(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; + indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass +------------+----------+-------------+-------------+--------------+---------+----------- + covering | 4 | 2 | t | f | 1 2 3 4 | 1978 1978 +(1 row) + +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; + pg_get_constraintdef | conname | conkey | conincluding +----------------------------------+----------+--------+-------------- + UNIQUE (c1, c2) INCLUDE (c3, c4) | covering | {1,2} | {3,4} +(1 row) + +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: duplicate key value violates unique constraint "covering" +DETAIL: Key (c1, c2)=(1, 2) already exists. +DROP TABLE tbl; +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + CONSTRAINT covering PRIMARY KEY(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; + indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass +------------+----------+-------------+-------------+--------------+---------+----------- + covering | 4 | 2 | t | t | 1 2 3 4 | 1978 1978 +(1 row) + +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; + pg_get_constraintdef | conname | conkey | conincluding +---------------------------------------+----------+--------+-------------- + PRIMARY KEY (c1, c2) INCLUDE (c3, c4) | covering | {1,2} | {3,4} +(1 row) + +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: duplicate key value violates unique constraint "covering" +DETAIL: Key (c1, c2)=(1, 2) already exists. +INSERT INTO tbl SELECT 1, NULL, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: null value in column "c2" violates not-null constraint +DETAIL: Failing row contains (1, null, 3, (4,4),(4,4)). +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + UNIQUE(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; + indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass +---------------------+----------+-------------+-------------+--------------+---------+----------- + tbl_c1_c2_c3_c4_key | 4 | 2 | t | f | 1 2 3 4 | 1978 1978 +(1 row) + +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; + pg_get_constraintdef | conname | conkey | conincluding +----------------------------------+---------------------+--------+-------------- + UNIQUE (c1, c2) INCLUDE (c3, c4) | tbl_c1_c2_c3_c4_key | {1,2} | {3,4} +(1 row) + +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: duplicate key value violates unique constraint "tbl_c1_c2_c3_c4_key" +DETAIL: Key (c1, c2)=(1, 2) already exists. +DROP TABLE tbl; +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + PRIMARY KEY(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; + indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass +------------+----------+-------------+-------------+--------------+---------+----------- + tbl_pkey | 4 | 2 | t | t | 1 2 3 4 | 1978 1978 +(1 row) + +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; + pg_get_constraintdef | conname | conkey | conincluding +---------------------------------------+----------+--------+-------------- + PRIMARY KEY (c1, c2) INCLUDE (c3, c4) | tbl_pkey | {1,2} | {3,4} +(1 row) + +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: duplicate key value violates unique constraint "tbl_pkey" +DETAIL: Key (c1, c2)=(1, 2) already exists. +INSERT INTO tbl SELECT 1, NULL, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: null value in column "c2" violates not-null constraint +DETAIL: Failing row contains (1, null, 3, (4,4),(4,4)). +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + EXCLUDE USING btree (c1 WITH =) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; + indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass +-------------------+----------+-------------+-------------+--------------+--------+---------- + tbl_c1_c3_c4_excl | 3 | 1 | f | f | 1 3 4 | 1978 +(1 row) + +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; + pg_get_constraintdef | conname | conkey | conincluding +--------------------------------------------------+-------------------+--------+-------------- + EXCLUDE USING btree (c1 WITH =) INCLUDE (c3, c4) | tbl_c1_c3_c4_excl | {1} | {3,4} +(1 row) + +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ERROR: conflicting key value violates exclusion constraint "tbl_c1_c3_c4_excl" +DETAIL: Key (c1)=(1) conflicts with existing key (c1)=(1). +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; +/* + * 3.0 Test ALTER TABLE DROP COLUMN. + * Any column deletion leads to index deletion. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 int); +CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2, c3, c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +------------------------------------------------------------------------ + CREATE UNIQUE INDEX tbl_idx ON public.tbl USING btree (c1, c2, c3, c4) +(1 row) + +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +DROP TABLE tbl; +/* + * 3.1 Test ALTER TABLE DROP COLUMN. + * Included column deletion leads to the index deletion, + * AS well AS key columns deletion. It's explained in documentation. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box); +CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDE(c3,c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +--------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_idx ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +DROP TABLE tbl; +/* + * 3.2 Test ALTER TABLE DROP COLUMN. + * Included column deletion leads to the index deletion. + * AS well AS key columns deletion. It's explained in documentation. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +--------------------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +ALTER TABLE tbl DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +DROP TABLE tbl; +/* + * 4. CREATE INDEX CONCURRENTLY + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,1000) AS x; +CREATE UNIQUE INDEX CONCURRENTLY on tbl (c1, c2) INCLUDE (c3, c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +--------------------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_idx ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) + CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(2 rows) + +DROP TABLE tbl; +/* + * 5. REINDEX + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +--------------------------------------------------------------------------------------------- + CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON public.tbl USING btree (c1, c2) INCLUDE (c3, c4) +(1 row) + +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +REINDEX INDEX tbl_c1_c2_c3_c4_key; +ERROR: relation "tbl_c1_c2_c3_c4_key" does not exist +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +ALTER TABLE tbl DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; + indexdef +---------- +(0 rows) + +DROP TABLE tbl; +/* + * 7. Check various AMs. All but btree must fail. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box); +CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4); +ERROR: access method "brin" does not support included columns +CREATE INDEX on tbl USING gist(c3) INCLUDE (c4); +ERROR: access method "gist" does not support included columns +CREATE INDEX on tbl USING spgist(c3) INCLUDE (c4); +ERROR: access method "spgist" does not support included columns +CREATE INDEX on tbl USING gin(c1, c2) INCLUDE (c3, c4); +ERROR: access method "gin" does not support included columns +CREATE INDEX on tbl USING hash(c1, c2) INCLUDE (c3, c4); +ERROR: access method "hash" does not support included columns +CREATE INDEX on tbl USING rtree(c1, c2) INCLUDE (c3, c4); +NOTICE: substituting access method "gist" for obsolete method "rtree" +ERROR: access method "gist" does not support included columns +CREATE INDEX on tbl USING btree(c1, c2) INCLUDE (c3, c4); +DROP TABLE tbl; +/* + * 8. Update, delete values in indexed table. + */ +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDE (c3,c4); +UPDATE tbl SET c1 = 100 WHERE c1 = 2; +UPDATE tbl SET c1 = 1 WHERE c1 = 3; +-- should fail +UPDATE tbl SET c2 = 2 WHERE c1 = 1; +ERROR: duplicate key value violates unique constraint "tbl_idx_unique" +DETAIL: Key (c1, c2)=(1, 2) already exists. +UPDATE tbl SET c3 = 1; +DELETE FROM tbl WHERE c1 = 5 OR c3 = 12; +DROP TABLE tbl; +/* + * 9. Alter column type. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl ALTER c1 TYPE bigint; +ALTER TABLE tbl ALTER c3 TYPE bigint; +\d tbl + Table "public.tbl" + Column | Type | Collation | Nullable | Default +--------+---------+-----------+----------+--------- + c1 | bigint | | | + c2 | integer | | | + c3 | bigint | | | + c4 | box | | | +Indexes: + "tbl_c1_c2_c3_c4_key" UNIQUE CONSTRAINT, btree (c1, c2) INCLUDE (c3, c4) + +DROP TABLE tbl; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index d858a0e7db..c07083bd44 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -55,7 +55,7 @@ test: copy copyselect copydml # ---------- test: create_misc create_operator create_procedure # These depend on the above two -test: create_index create_view +test: create_index create_view index_including # ---------- # Another group of parallel tests diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 99f8ca37ba..e6e6a4608b 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -65,6 +65,7 @@ test: create_misc test: create_operator test: create_procedure test: create_index +test: index_including test: create_view test: create_aggregate test: create_function_3 diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql index 7f17588b0d..9d4b8883c9 100644 --- a/src/test/regress/sql/create_index.sql +++ b/src/test/regress/sql/create_index.sql @@ -731,6 +731,26 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF'); -- but this shouldn't: INSERT INTO func_index_heap VALUES('QWERTY'); +-- +-- Test unique index with included columns +-- +CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text); +CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDE(f3); + +INSERT INTO covering_index_heap VALUES(1,1,'AAA'); +INSERT INTO covering_index_heap VALUES(1,2,'AAA'); +-- this should fail because of unique index on f1,f2: +INSERT INTO covering_index_heap VALUES(1,2,'BBB'); +-- and this shouldn't: +INSERT INTO covering_index_heap VALUES(1,4,'AAA'); +-- Try to build index on table that already contains data +CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDE(f3); +-- Try to use existing covering index as primary key +ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX +covering_pkey; +DROP TABLE covering_index_heap; + + -- -- Also try building functional, expressional, and partial indexes on -- tables that already contain data. diff --git a/src/test/regress/sql/index_including.sql b/src/test/regress/sql/index_including.sql new file mode 100644 index 0000000000..caedc9866d --- /dev/null +++ b/src/test/regress/sql/index_including.sql @@ -0,0 +1,203 @@ +/* + * 1.test CREATE INDEX + */ + +-- Regular index with included columns +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE INDEX tbl_idx ON tbl using btree (c1, c2) INCLUDE (c3,c4); +-- must fail because of intersection of key and included columns +CREATE INDEX tbl_idx ON tbl using btree (c1, c2) INCLUDE (c1,c3); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; +DROP TABLE tbl; + +-- Unique index and unique constraint +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ALTER TABLE tbl add UNIQUE USING INDEX tbl_idx_unique; +ALTER TABLE tbl add UNIQUE (c1, c2) INCLUDE (c3, c4); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; +DROP TABLE tbl; + +-- Unique index and unique constraint. Both must fail. +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ALTER TABLE tbl add UNIQUE (c1, c2) INCLUDE (c3, c4); +DROP TABLE tbl; + +-- PK constraint +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl add PRIMARY KEY (c1, c2) INCLUDE (c3, c4); +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; +DROP TABLE tbl; + +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree (c1, c2) INCLUDE (c3, c4); +ALTER TABLE tbl add PRIMARY KEY USING INDEX tbl_idx_unique; +SELECT pg_get_indexdef(i.indexrelid) +FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid +WHERE i.indrelid = 'tbl'::regclass ORDER BY c.relname; +DROP TABLE tbl; + +-- PK constraint. Must fail. +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl add PRIMARY KEY (c1, c2) INCLUDE (c3, c4); +DROP TABLE tbl; + + +/* + * 2. Test CREATE TABLE with constraint + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + CONSTRAINT covering UNIQUE(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +DROP TABLE tbl; + +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + CONSTRAINT covering PRIMARY KEY(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +INSERT INTO tbl SELECT 1, NULL, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; + +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + UNIQUE(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +DROP TABLE tbl; + +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + PRIMARY KEY(c1,c2) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +INSERT INTO tbl SELECT 1, NULL, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; + +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, + EXCLUDE USING btree (c1 WITH =) INCLUDE(c3,c4)); +SELECT indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass FROM pg_index WHERE indrelid = 'tbl'::regclass::oid; +SELECT pg_get_constraintdef(oid), conname, conkey, conincluding FROM pg_constraint WHERE conrelid = 'tbl'::regclass::oid; +-- ensure that constraint works +INSERT INTO tbl SELECT 1, 2, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +INSERT INTO tbl SELECT x, 2*x, NULL, NULL FROM generate_series(1,10) AS x; +DROP TABLE tbl; + +/* + * 3.0 Test ALTER TABLE DROP COLUMN. + * Any column deletion leads to index deletion. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 int); +CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2, c3, c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +DROP TABLE tbl; + +/* + * 3.1 Test ALTER TABLE DROP COLUMN. + * Included column deletion leads to the index deletion, + * AS well AS key columns deletion. It's explained in documentation. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box); +CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDE(c3,c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +DROP TABLE tbl; + +/* + * 3.2 Test ALTER TABLE DROP COLUMN. + * Included column deletion leads to the index deletion. + * AS well AS key columns deletion. It's explained in documentation. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +DROP TABLE tbl; + + +/* + * 4. CREATE INDEX CONCURRENTLY + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,1000) AS x; +CREATE UNIQUE INDEX CONCURRENTLY on tbl (c1, c2) INCLUDE (c3, c4); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +DROP TABLE tbl; + + +/* + * 5. REINDEX + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c3; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +REINDEX INDEX tbl_c1_c2_c3_c4_key; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +ALTER TABLE tbl DROP COLUMN c1; +SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname; +DROP TABLE tbl; + +/* + * 7. Check various AMs. All but btree must fail. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box); +CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4); +CREATE INDEX on tbl USING gist(c3) INCLUDE (c4); +CREATE INDEX on tbl USING spgist(c3) INCLUDE (c4); +CREATE INDEX on tbl USING gin(c1, c2) INCLUDE (c3, c4); +CREATE INDEX on tbl USING hash(c1, c2) INCLUDE (c3, c4); +CREATE INDEX on tbl USING rtree(c1, c2) INCLUDE (c3, c4); +CREATE INDEX on tbl USING btree(c1, c2) INCLUDE (c3, c4); +DROP TABLE tbl; + +/* + * 8. Update, delete values in indexed table. + */ +CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDE (c3,c4); +UPDATE tbl SET c1 = 100 WHERE c1 = 2; +UPDATE tbl SET c1 = 1 WHERE c1 = 3; +-- should fail +UPDATE tbl SET c2 = 2 WHERE c1 = 1; +UPDATE tbl SET c3 = 1; +DELETE FROM tbl WHERE c1 = 5 OR c3 = 12; +DROP TABLE tbl; + +/* + * 9. Alter column type. + */ +CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDE(c3,c4)); +INSERT INTO tbl SELECT x, 2*x, 3*x, box('4,4,4,4') FROM generate_series(1,10) AS x; +ALTER TABLE tbl ALTER c1 TYPE bigint; +ALTER TABLE tbl ALTER c3 TYPE bigint; +\d tbl +DROP TABLE tbl; + diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index e0104cd8d0..4050e82bc9 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 16; +use Test::More tests => 17; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -31,6 +31,8 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab_mixed (a int primary key, b text)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"); # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)"); @@ -44,13 +46,17 @@ $node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_mixed (c text, b text, a int primary key)"); +# replication of the table with included index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"); + # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"); $node_publisher->safe_psql('postgres', -"ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed" +"ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include" ); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); @@ -89,6 +95,11 @@ $node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_mixed VALUES (2, 'bar')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_include SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_include WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a"); + $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', @@ -104,6 +115,10 @@ $result = is( $result, qq(|foo|1 |bar|2), 'check replicated changes with different column order'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_include"); +is($result, qq(20|-20|-1), 'check replicated changes with primary key index with included columns'); + # insert some duplicate rows $node_publisher->safe_psql('postgres', "INSERT INTO tab_full SELECT generate_series(1,10)");