diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 1e22c1eefc..766ced401f 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -416,12 +416,12 @@ CREATE TABLE replication_metadata ( WITH (user_catalog_table = true) ; \d+ replication_metadata - Table "public.replication_metadata" - Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -----------+---------+-----------+----------+--------------------------------------------------+----------+--------------+------------- - id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | - relation | name | | not null | | plain | | - options | text[] | | | | extended | | + Table "public.replication_metadata" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +----------+---------+-----------+----------+--------------------------------------------------+----------+-------------+--------------+------------- + id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | | + relation | name | | not null | | plain | | | + options | text[] | | | | extended | | | Indexes: "replication_metadata_pkey" PRIMARY KEY, btree (id) Options: user_catalog_table=true @@ -430,12 +430,12 @@ INSERT INTO replication_metadata(relation, options) VALUES ('foo', ARRAY['a', 'b']); ALTER TABLE replication_metadata RESET (user_catalog_table); \d+ replication_metadata - Table "public.replication_metadata" - Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -----------+---------+-----------+----------+--------------------------------------------------+----------+--------------+------------- - id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | - relation | name | | not null | | plain | | - options | text[] | | | | extended | | + Table "public.replication_metadata" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +----------+---------+-----------+----------+--------------------------------------------------+----------+-------------+--------------+------------- + id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | | + relation | name | | not null | | plain | | | + options | text[] | | | | extended | | | Indexes: "replication_metadata_pkey" PRIMARY KEY, btree (id) @@ -443,12 +443,12 @@ INSERT INTO replication_metadata(relation, options) VALUES ('bar', ARRAY['a', 'b']); ALTER TABLE replication_metadata SET (user_catalog_table = true); \d+ replication_metadata - Table "public.replication_metadata" - Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -----------+---------+-----------+----------+--------------------------------------------------+----------+--------------+------------- - id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | - relation | name | | not null | | plain | | - options | text[] | | | | extended | | + Table "public.replication_metadata" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +----------+---------+-----------+----------+--------------------------------------------------+----------+-------------+--------------+------------- + id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | | + relation | name | | not null | | plain | | | + options | text[] | | | | extended | | | Indexes: "replication_metadata_pkey" PRIMARY KEY, btree (id) Options: user_catalog_table=true @@ -461,13 +461,13 @@ ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text; ERROR: cannot rewrite table "replication_metadata" used as a catalog table ALTER TABLE replication_metadata SET (user_catalog_table = false); \d+ replication_metadata - Table "public.replication_metadata" - Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -----------------+---------+-----------+----------+--------------------------------------------------+----------+--------------+------------- - id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | - relation | name | | not null | | plain | | - options | text[] | | | | extended | | - rewritemeornot | integer | | | | plain | | + Table "public.replication_metadata" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +----------------+---------+-----------+----------+--------------------------------------------------+----------+-------------+--------------+------------- + id | integer | | not null | nextval('replication_metadata_id_seq'::regclass) | plain | | | + relation | name | | not null | | plain | | | + options | text[] | | | | extended | | | + rewritemeornot | integer | | | | plain | | | Indexes: "replication_metadata_pkey" PRIMARY KEY, btree (id) Options: user_catalog_table=false diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c index 138671410a..9efa5a9648 100644 --- a/src/backend/access/common/indextuple.c +++ b/src/backend/access/common/indextuple.c @@ -92,7 +92,8 @@ index_form_tuple(TupleDesc tupleDescriptor, VARSIZE(DatumGetPointer(untoasted_values[i])) > TOAST_INDEX_TARGET && (att->attstorage == 'x' || att->attstorage == 'm')) { - Datum cvalue = toast_compress_datum(untoasted_values[i]); + Datum cvalue = toast_compress_datum(untoasted_values[i], + att->attcompression); if (DatumGetPointer(cvalue) != NULL) { diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index ec10762529..21e375ad96 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -935,11 +935,48 @@ untransformRelOptions(Datum options) val = (Node *) makeString(pstrdup(p)); } result = lappend(result, makeDefElem(pstrdup(s), val, -1)); + pfree(s); } return result; } +char * +formatRelOptions(List *options) +{ + StringInfoData buf; + ListCell *cell; + + initStringInfo(&buf); + + foreach(cell, options) + { + DefElem *def = (DefElem *) lfirst(cell); + + appendStringInfo(&buf, "%s%s=%s", buf.len > 0 ? ", " : "", + def->defname, defGetString(def)); + } + + return buf.data; +} + +void +freeRelOptions(List *options) +{ + ListCell *cell; + + Assert(options != NIL); + foreach(cell, options) + { + DefElem *def = (DefElem *) lfirst(cell); + + pfree(def->defname); + pfree(defGetString(def)); + pfree(def->arg); + } + list_free_deep(options); +} + /* * Extract and parse reloptions from a pg_class tuple. * diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 4436c86361..8ec03457e0 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -19,8 +19,10 @@ #include "postgres.h" +#include "access/compression.h" #include "access/hash.h" #include "access/htup_details.h" +#include "access/reloptions.h" #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "miscadmin.h" @@ -226,6 +228,7 @@ TupleDescCopyEntry(TupleDesc dst, AttrNumber dstAttno, dstAtt->attnotnull = false; dstAtt->atthasdef = false; dstAtt->attidentity = '\0'; + dstAtt->attcompression = InvalidOid; } /* @@ -380,6 +383,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) return false; if (attr1->attcollation != attr2->attcollation) return false; + if (attr1->attcompression != attr2->attcompression) + return false; /* attacl, attoptions and attfdwoptions are not even present... */ } @@ -442,6 +447,7 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) } else if (tupdesc2->constr != NULL) return false; + return true; } @@ -547,6 +553,7 @@ TupleDescInitEntry(TupleDesc desc, att->attalign = typeForm->typalign; att->attstorage = typeForm->typstorage; att->attcollation = typeForm->typcollation; + att->attcompression = InvalidOid; ReleaseSysCache(tuple); } @@ -659,7 +666,6 @@ TupleDescInitEntryCollation(TupleDesc desc, TupleDescAttr(desc, attributeNumber - 1)->attcollation = collationid; } - /* * BuildDescForRelation * diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c index 5a8f1dab83..83ba21e8c7 100644 --- a/src/backend/access/heap/tuptoaster.c +++ b/src/backend/access/heap/tuptoaster.c @@ -30,8 +30,10 @@ #include #include +#include "access/compression.h" #include "access/genam.h" #include "access/heapam.h" +#include "access/reloptions.h" #include "access/tuptoaster.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -39,6 +41,8 @@ #include "miscadmin.h" #include "utils/expandeddatum.h" #include "utils/fmgroids.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/typcache.h" @@ -53,19 +57,46 @@ typedef struct toast_compress_header { int32 vl_len_; /* varlena header (do not touch directly!) */ - int32 rawsize; + uint32 info; /* flags (2 bits) and rawsize */ } toast_compress_header; +/* + * If the compression method were used, then data also contains + * Oid of compression options + */ +typedef struct toast_compress_header_custom +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + uint32 info; /* flags (2 high bits) and rawsize */ + Oid cmoptoid; /* Oid from pg_compression_opt */ +} toast_compress_header_custom; + +static HTAB *compression_options_htab = NULL; +static MemoryContext compression_options_mcxt = NULL; + +#define RAWSIZEMASK (0x3FFFFFFFU) + /* * Utilities for manipulation of header information for compressed * toast entries. */ -#define TOAST_COMPRESS_HDRSZ ((int32) sizeof(toast_compress_header)) -#define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize) +#define TOAST_COMPRESS_HDRSZ ((int32) sizeof(toast_compress_header)) +#define TOAST_COMPRESS_HDRSZ_CUSTOM ((int32) sizeof(toast_compress_header_custom)) +#define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->info & RAWSIZEMASK) #define TOAST_COMPRESS_RAWDATA(ptr) \ (((char *) (ptr)) + TOAST_COMPRESS_HDRSZ) #define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \ - (((toast_compress_header *) (ptr))->rawsize = (len)) +do { \ + ((toast_compress_header *) (ptr))->info &= 0xC0000000; \ + ((toast_compress_header *) (ptr))->info |= ((uint32)(len) & RAWSIZEMASK); \ +} while (0) +#define TOAST_COMPRESS_SET_CMOPTOID(ptr, oid) \ + (((toast_compress_header_custom *) (ptr))->cmoptoid = (oid)) +#define TOAST_COMPRESS_SET_CUSTOM(ptr) \ +do { \ + (((toast_compress_header *) (ptr))->info |= (1 << 31)); \ + (((toast_compress_header *) (ptr))->info &= ~(1 << 30)); \ +} while (0) static void toast_delete_datum(Relation rel, Datum value, bool is_speculative); static Datum toast_save_datum(Relation rel, Datum value, @@ -83,6 +114,8 @@ static int toast_open_indexes(Relation toastrel, static void toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock); static void init_toast_snapshot(Snapshot toast_snapshot); +static void init_compression_options_htab(void); +static AttributeCompression *get_compression_options_info(Oid cmoptoid); /* ---------- @@ -741,6 +774,8 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, Datum old_value; Datum new_value; + Form_pg_attribute att; + /* * Search for the biggest yet unprocessed internal attribute */ @@ -770,10 +805,11 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, * Attempt to compress it inline, if it has attstorage 'x' */ i = biggest_attno; - if (TupleDescAttr(tupleDesc, i)->attstorage == 'x') + att = TupleDescAttr(tupleDesc, i); + if (att->attstorage == 'x') { old_value = toast_values[i]; - new_value = toast_compress_datum(old_value); + new_value = toast_compress_datum(old_value, att->attcompression); if (DatumGetPointer(new_value) != NULL) { @@ -914,7 +950,8 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, */ i = biggest_attno; old_value = toast_values[i]; - new_value = toast_compress_datum(old_value); + new_value = toast_compress_datum(old_value, + TupleDescAttr(tupleDesc, i)->attcompression); if (DatumGetPointer(new_value) != NULL) { @@ -1229,7 +1266,9 @@ toast_flatten_tuple_to_datum(HeapTupleHeader tup, if (VARATT_IS_EXTERNAL(new_value) || VARATT_IS_COMPRESSED(new_value)) { - new_value = heap_tuple_untoast_attr(new_value); + struct varlena *untoasted_value = heap_tuple_untoast_attr(new_value); + + new_value = untoasted_value; toast_values[i] = PointerGetDatum(new_value); toast_free[i] = true; } @@ -1353,7 +1392,6 @@ toast_build_flattened_tuple(TupleDesc tupleDesc, return new_tuple; } - /* ---------- * toast_compress_datum - * @@ -1368,25 +1406,43 @@ toast_build_flattened_tuple(TupleDesc tupleDesc, * ---------- */ Datum -toast_compress_datum(Datum value) +toast_compress_datum(Datum value, Oid cmoptoid) { - struct varlena *tmp; - int32 valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); - int32 len; + struct varlena *tmp = NULL; + int32 valsize, + len = 0; + AttributeCompression *ac = NULL; Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value))); Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value))); + if (OidIsValid(cmoptoid)) + ac = get_compression_options_info(cmoptoid); + /* * No point in wasting a palloc cycle if value size is out of the allowed * range for compression */ - if (valsize < PGLZ_strategy_default->min_input_size || - valsize > PGLZ_strategy_default->max_input_size) + valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); + if (!ac && (valsize < PGLZ_strategy_default->min_input_size || + valsize > PGLZ_strategy_default->max_input_size)) return PointerGetDatum(NULL); - tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) + - TOAST_COMPRESS_HDRSZ); + if (ac) + { + tmp = ac->routine->compress(ac, (const struct varlena *) value); + if (!tmp) + return PointerGetDatum(NULL); + } + else + { + tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) + + TOAST_COMPRESS_HDRSZ); + len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)), + valsize, + TOAST_COMPRESS_RAWDATA(tmp), + PGLZ_strategy_default); + } /* * We recheck the actual size even if pglz_compress() reports success, @@ -1398,11 +1454,7 @@ toast_compress_datum(Datum value) * only one header byte and no padding if the value is short enough. So * we insist on a savings of more than 2 bytes to ensure we have a gain. */ - len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)), - valsize, - TOAST_COMPRESS_RAWDATA(tmp), - PGLZ_strategy_default); - if (len >= 0 && + if (!ac && len >= 0 && len + TOAST_COMPRESS_HDRSZ < valsize - 2) { TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize); @@ -1410,10 +1462,20 @@ toast_compress_datum(Datum value) /* successful compression */ return PointerGetDatum(tmp); } + else if (ac && VARSIZE(tmp) < valsize - 2) + { + TOAST_COMPRESS_SET_CUSTOM(tmp); + TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize); + TOAST_COMPRESS_SET_CMOPTOID(tmp, ac->cmoptoid); + /* successful compression */ + return PointerGetDatum(tmp); + } else { /* incompressible data */ - pfree(tmp); + if (tmp) + pfree(tmp); + return PointerGetDatum(NULL); } } @@ -2280,15 +2342,26 @@ toast_decompress_datum(struct varlena *attr) Assert(VARATT_IS_COMPRESSED(attr)); - result = (struct varlena *) - palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ); - SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ); + if (VARATT_IS_CUSTOM_COMPRESSED(attr)) + { + AttributeCompression *ac; + toast_compress_header_custom *hdr; - if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr), - VARSIZE(attr) - TOAST_COMPRESS_HDRSZ, - VARDATA(result), - TOAST_COMPRESS_RAWSIZE(attr)) < 0) - elog(ERROR, "compressed data is corrupted"); + hdr = (toast_compress_header_custom *) attr; + ac = get_compression_options_info(hdr->cmoptoid); + result = ac->routine->decompress(ac, attr); + } + else + { + result = (struct varlena *) + palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ); + SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ); + if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr), + VARSIZE(attr) - TOAST_COMPRESS_HDRSZ, + VARDATA(result), + TOAST_COMPRESS_RAWSIZE(attr)) < 0) + elog(ERROR, "compressed data is corrupted"); + } return result; } @@ -2390,3 +2463,44 @@ init_toast_snapshot(Snapshot toast_snapshot) InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken); } + +static void +init_compression_options_htab(void) +{ + HASHCTL ctl; + + compression_options_mcxt = AllocSetContextCreate(TopMemoryContext, + "compression options cache context", + ALLOCSET_DEFAULT_SIZES); + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(AttributeCompression); + ctl.hcxt = compression_options_mcxt; + compression_options_htab = hash_create("compression options cache", 100, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +static AttributeCompression * +get_compression_options_info(Oid cmoptoid) +{ + bool found; + AttributeCompression *result; + + Assert(OidIsValid(cmoptoid)); + if (!compression_options_htab) + init_compression_options_htab(); + + result = hash_search(compression_options_htab, &cmoptoid, HASH_ENTER, &found); + if (!found) + { + MemoryContext oldcxt; + + Assert(compression_options_mcxt); + oldcxt = MemoryContextSwitchTo(compression_options_mcxt); + result->routine = GetCompressionRoutine(cmoptoid); + result->options = GetCompressionOptionsList(cmoptoid); + result->cmoptoid = cmoptoid; + MemoryContextSwitchTo(oldcxt); + } + return result; +} diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 0453fd4ac1..53feb17abc 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -718,6 +718,7 @@ DefineAttr(char *name, char *type, int attnum, int nullness) attrtypes[attnum]->attcacheoff = -1; attrtypes[attnum]->atttypmod = -1; attrtypes[attnum]->attislocal = true; + attrtypes[attnum]->attcompression = InvalidOid; if (nullness == BOOTCOL_NULL_FORCE_NOT_NULL) { diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index fd33426bad..c7cea974b1 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -46,7 +46,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ pg_subscription_rel.h toasting.h indexing.h \ - toasting.h indexing.h \ + pg_compression.h pg_compression_opt.h \ ) # location of Catalog.pm diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index ccde66a7dd..fd733a34a0 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3340,6 +3340,8 @@ static const char *const no_priv_msg[MAX_ACL_KIND] = gettext_noop("permission denied for publication %s"), /* ACL_KIND_SUBSCRIPTION */ gettext_noop("permission denied for subscription %s"), + /* ACL_KIND_COMPRESSION_METHOD */ + gettext_noop("permission denied for compression method %s"), }; static const char *const not_owner_msg[MAX_ACL_KIND] = diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 6fffc290fa..faaf7fb7f1 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -28,6 +28,8 @@ #include "catalog/pg_cast.h" #include "catalog/pg_collation.h" #include "catalog/pg_collation_fn.h" +#include "catalog/pg_compression.h" +#include "catalog/pg_compression_opt.h" #include "catalog/pg_constraint.h" #include "catalog/pg_constraint_fn.h" #include "catalog/pg_conversion.h" @@ -173,7 +175,9 @@ static const Oid object_classes[] = { PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ - TransformRelationId /* OCLASS_TRANSFORM */ + TransformRelationId, /* OCLASS_TRANSFORM */ + CompressionMethodRelationId, /* OCLASS_COMPRESSION_METHOD */ + CompressionOptRelationId, /* OCLASS_COMPRESSION_OPTIONS */ }; @@ -1271,6 +1275,14 @@ doDeletion(const ObjectAddress *object, int flags) DropTransformById(object->objectId); break; + case OCLASS_COMPRESSION_METHOD: + RemoveCompressionMethodById(object->objectId); + break; + + case OCLASS_COMPRESSION_OPTIONS: + RemoveCompressionOptionsById(object->objectId); + break; + /* * These global object types are not supported here. */ @@ -2459,6 +2471,12 @@ getObjectClass(const ObjectAddress *object) case TransformRelationId: return OCLASS_TRANSFORM; + + case CompressionMethodRelationId: + return OCLASS_COMPRESSION_METHOD; + + case CompressionOptRelationId: + return OCLASS_COMPRESSION_OPTIONS; } /* shouldn't get here */ diff --git a/src/backend/catalog/genbki.pl b/src/backend/catalog/genbki.pl index 2eebb061b7..36c7fa0484 100644 --- a/src/backend/catalog/genbki.pl +++ b/src/backend/catalog/genbki.pl @@ -449,6 +449,7 @@ sub emit_pgattr_row attisdropped => 'f', attislocal => 't', attinhcount => '0', + attcompression=> '0', attacl => '_null_', attoptions => '_null_', attfdwoptions => '_null_'); diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 05e70818e7..b08e857e3a 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -29,8 +29,10 @@ */ #include "postgres.h" +#include "access/compression.h" #include "access/htup_details.h" #include "access/multixact.h" +#include "access/reloptions.h" #include "access/sysattr.h" #include "access/transam.h" #include "access/xact.h" @@ -44,6 +46,8 @@ #include "catalog/partition.h" #include "catalog/pg_attrdef.h" #include "catalog/pg_collation.h" +#include "catalog/pg_compression.h" +#include "catalog/pg_compression_opt.h" #include "catalog/pg_constraint.h" #include "catalog/pg_constraint_fn.h" #include "catalog/pg_foreign_table.h" @@ -628,6 +632,7 @@ InsertPgAttributeTuple(Relation pg_attribute_rel, values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal); values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount); values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation); + values[Anum_pg_attribute_attcompression - 1] = ObjectIdGetDatum(new_attribute->attcompression); /* start out with empty permissions and empty options */ nulls[Anum_pg_attribute_attacl - 1] = true; @@ -707,6 +712,13 @@ AddNewAttributeTuples(Oid new_rel_oid, referenced.objectSubId = 0; recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + + if (OidIsValid(attr->attcompression)) + { + ObjectAddressSet(referenced, CompressionOptRelationId, + attr->attcompression); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } } /* @@ -1453,6 +1465,22 @@ DeleteRelationTuple(Oid relid) heap_close(pg_class_desc, RowExclusiveLock); } +static void +DropAttributeCompression(Form_pg_attribute att) +{ + CompressionMethodRoutine *cmr = GetCompressionRoutine(att->attcompression); + + if (cmr->drop) + { + bool attisdropped = att->attisdropped; + List *options = GetCompressionOptionsList(att->attcompression); + + att->attisdropped = true; + cmr->drop(att, options); + att->attisdropped = attisdropped; + } +} + /* * DeleteAttributeTuples * @@ -1483,7 +1511,14 @@ DeleteAttributeTuples(Oid relid) /* Delete all the matching tuples */ while ((atttup = systable_getnext(scan)) != NULL) + { + Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(atttup); + + if (OidIsValid(att->attcompression)) + DropAttributeCompression(att); + CatalogTupleDelete(attrel, &atttup->t_self); + } /* Clean up after the scan */ systable_endscan(scan); @@ -1576,6 +1611,8 @@ RemoveAttributeById(Oid relid, AttrNumber attnum) else { /* Dropping user attributes is lots harder */ + if (OidIsValid(attStruct->attcompression)) + DropAttributeCompression(attStruct); /* Mark the attribute as dropped */ attStruct->attisdropped = true; @@ -1597,6 +1634,8 @@ RemoveAttributeById(Oid relid, AttrNumber attnum) /* We don't want to keep stats for it anymore */ attStruct->attstattarget = 0; + attStruct->attcompression = InvalidOid; + /* * Change the column name to something that isn't likely to conflict */ diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index c7b2f031f0..11dc107ab7 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -393,6 +393,7 @@ ConstructTupleDescriptor(Relation heapRelation, to->atttypmod = exprTypmod(indexkey); to->attislocal = true; to->attcollation = collationObjectId[i]; + to->attcompression = InvalidOid; ReleaseSysCache(tuple); @@ -471,6 +472,7 @@ ConstructTupleDescriptor(Relation heapRelation, to->attbyval = typeTup->typbyval; to->attalign = typeTup->typalign; to->attstorage = typeTup->typstorage; + to->attcompression = InvalidOid; ReleaseSysCache(tuple); } diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 6cac2dfd1d..d2b5285a5a 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -29,6 +29,8 @@ #include "catalog/pg_default_acl.h" #include "catalog/pg_event_trigger.h" #include "catalog/pg_collation.h" +#include "catalog/pg_compression.h" +#include "catalog/pg_compression_opt.h" #include "catalog/pg_constraint.h" #include "catalog/pg_constraint_fn.h" #include "catalog/pg_conversion.h" @@ -490,6 +492,30 @@ static const ObjectPropertyType ObjectProperty[] = InvalidAttrNumber, /* no ACL (same as relation) */ ACL_KIND_STATISTICS, true + }, + { + CompressionMethodRelationId, + CompressionMethodOidIndexId, + COMPRESSIONMETHODOID, + COMPRESSIONMETHODNAME, + Anum_pg_compression_cmname, + InvalidAttrNumber, + InvalidAttrNumber, + InvalidAttrNumber, + -1, + true + }, + { + CompressionOptRelationId, + CompressionOptionsOidIndexId, + COMPRESSIONOPTIONSOID, + -1, + InvalidAttrNumber, + InvalidAttrNumber, + InvalidAttrNumber, + InvalidAttrNumber, + -1, + false, } }; @@ -712,6 +738,10 @@ static const struct object_type_map /* OBJECT_STATISTIC_EXT */ { "statistics object", OBJECT_STATISTIC_EXT + }, + /* OCLASS_COMPRESSION_METHOD */ + { + "compression method", OBJECT_COMPRESSION_METHOD } }; @@ -876,6 +906,7 @@ get_object_address(ObjectType objtype, Node *object, case OBJECT_ACCESS_METHOD: case OBJECT_PUBLICATION: case OBJECT_SUBSCRIPTION: + case OBJECT_COMPRESSION_METHOD: address = get_object_address_unqualified(objtype, (Value *) object, missing_ok); break; @@ -1182,6 +1213,11 @@ get_object_address_unqualified(ObjectType objtype, address.objectId = get_subscription_oid(name, missing_ok); address.objectSubId = 0; break; + case OBJECT_COMPRESSION_METHOD: + address.classId = CompressionMethodRelationId; + address.objectId = get_compression_method_oid(name, missing_ok); + address.objectSubId = 0; + break; default: elog(ERROR, "unrecognized objtype: %d", (int) objtype); /* placate compiler, which doesn't know elog won't return */ @@ -2139,6 +2175,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_SCHEMA: case OBJECT_SUBSCRIPTION: case OBJECT_TABLESPACE: + case OBJECT_COMPRESSION_METHOD: if (list_length(name) != 1) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -2395,12 +2432,14 @@ check_object_ownership(Oid roleid, ObjectType objtype, ObjectAddress address, case OBJECT_TSPARSER: case OBJECT_TSTEMPLATE: case OBJECT_ACCESS_METHOD: + case OBJECT_COMPRESSION_METHOD: /* We treat these object types as being owned by superusers */ if (!superuser_arg(roleid)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser"))); break; + case OBJECT_STATISTIC_EXT: if (!pg_statistics_object_ownercheck(address.objectId, roleid)) aclcheck_error_type(ACLCHECK_NOT_OWNER, address.objectId); @@ -3398,6 +3437,27 @@ getObjectDescription(const ObjectAddress *object) break; } + case OCLASS_COMPRESSION_METHOD: + { + char *name = get_compression_method_name(object->objectId); + + if (!name) + elog(ERROR, "cache lookup failed for compression method %u", + object->objectId); + appendStringInfo(&buffer, _("compression method %s"), name); + pfree(name); + break; + } + + case OCLASS_COMPRESSION_OPTIONS: + { + char *name = get_compression_method_name_for_opt(object->objectId); + + appendStringInfo(&buffer, _("compression options for %s"), name); + pfree(name); + break; + } + case OCLASS_TRANSFORM: { HeapTuple trfTup; @@ -3919,6 +3979,14 @@ getObjectTypeDescription(const ObjectAddress *object) appendStringInfoString(&buffer, "transform"); break; + case OCLASS_COMPRESSION_METHOD: + appendStringInfoString(&buffer, "compression method"); + break; + + case OCLASS_COMPRESSION_OPTIONS: + appendStringInfoString(&buffer, "compression options"); + break; + /* * There's intentionally no default: case here; we want the * compiler to warn if a new OCLASS hasn't been handled above. @@ -4160,6 +4228,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_COMPRESSION_METHOD: + { + char *cmname = get_compression_method_name(object->objectId); + + if (!cmname) + elog(ERROR, "cache lookup failed for compression method %u", + object->objectId); + appendStringInfoString(&buffer, quote_identifier(cmname)); + if (objname) + *objname = list_make1(cmname); + else + pfree(cmname); + break; + } + + case OCLASS_COMPRESSION_OPTIONS: + { + appendStringInfo(&buffer, "%u", + object->objectId); + if (objname) + *objname = list_make1(psprintf("%u", object->objectId)); + break; + } + case OCLASS_CONSTRAINT: { HeapTuple conTup; diff --git a/src/backend/catalog/pg_type.c b/src/backend/catalog/pg_type.c index 59ffd2104d..6e806b3334 100644 --- a/src/backend/catalog/pg_type.c +++ b/src/backend/catalog/pg_type.c @@ -22,6 +22,7 @@ #include "catalog/indexing.h" #include "catalog/objectaccess.h" #include "catalog/pg_collation.h" +#include "catalog/pg_compression.h" #include "catalog/pg_namespace.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" @@ -118,6 +119,7 @@ TypeShellMake(const char *typeName, Oid typeNamespace, Oid ownerId) values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(-1); values[Anum_pg_type_typndims - 1] = Int32GetDatum(0); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_type_typdefaultcm - 1] = ObjectIdGetDatum(InvalidOid); nulls[Anum_pg_type_typdefaultbin - 1] = true; nulls[Anum_pg_type_typdefault - 1] = true; nulls[Anum_pg_type_typacl - 1] = true; @@ -362,6 +364,7 @@ TypeCreate(Oid newTypeOid, values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(typeMod); values[Anum_pg_type_typndims - 1] = Int32GetDatum(typNDims); values[Anum_pg_type_typcollation - 1] = ObjectIdGetDatum(typeCollation); + values[Anum_pg_type_typdefaultcm - 1] = ObjectIdGetDatum(InvalidOid); /* * initialize the default binary value for this type. Check for nulls of diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 4a6c99e090..e23abf64f1 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -13,8 +13,8 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ - collationcmds.o constraint.o conversioncmds.o copy.o createas.o \ - dbcommands.o define.o discard.o dropcmds.o \ + collationcmds.o compressioncmds.o constraint.o conversioncmds.o copy.o \ + createas.o dbcommands.o define.o discard.o dropcmds.o \ event_trigger.o explain.o extension.o foreigncmds.o functioncmds.o \ indexcmds.o lockcmds.o matview.o operatorcmds.o opclasscmds.o \ policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \ diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 4f8147907c..4f18e4083f 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -385,6 +385,7 @@ ExecRenameStmt(RenameStmt *stmt) case OBJECT_TSTEMPLATE: case OBJECT_PUBLICATION: case OBJECT_SUBSCRIPTION: + case OBJECT_COMPRESSION_METHOD: { ObjectAddress address; Relation catalog; @@ -500,6 +501,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt, case OBJECT_TSDICTIONARY: case OBJECT_TSPARSER: case OBJECT_TSTEMPLATE: + case OBJECT_COMPRESSION_METHOD: { Relation catalog; Relation relation; @@ -627,6 +629,8 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: + case OCLASS_COMPRESSION_METHOD: + case OCLASS_COMPRESSION_OPTIONS: /* ignore object types that don't have schema-qualified names */ break; diff --git a/src/backend/commands/compressioncmds.c b/src/backend/commands/compressioncmds.c new file mode 100644 index 0000000000..32cbcc3efe --- /dev/null +++ b/src/backend/commands/compressioncmds.c @@ -0,0 +1,485 @@ +/*------------------------------------------------------------------------- + * + * compressioncmds.c + * Routines for SQL commands that manipulate compression methods. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/compressioncmds.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "miscadmin.h" + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/compression.h" +#include "access/reloptions.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_compression.h" +#include "catalog/pg_compression_opt.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "parser/parse_func.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/syscache.h" + +static CompressionMethodRoutine *get_compression_method_routine(Oid cmhandler, Oid typeid); + +/* + * Convert a handler function name to an Oid. If the return type of the + * function doesn't match the given AM type, an error is raised. + * + * This function either return valid function Oid or throw an error. + */ +static Oid +LookupCompressionHandlerFunc(List *handlerName) +{ + static const Oid funcargtypes[1] = {INTERNALOID}; + Oid handlerOid; + + if (handlerName == NIL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("handler function is not specified"))); + + /* handlers have one argument of type internal */ + handlerOid = LookupFuncName(handlerName, 1, funcargtypes, false); + + /* check that handler has the correct return type */ + if (get_func_rettype(handlerOid) != COMPRESSION_HANDLEROID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must return type %s", + NameListToString(handlerName), + "compression_handler"))); + + return handlerOid; +} + +static ObjectAddress +CreateCompressionMethod(char *cmName, List *handlerName) +{ + Relation rel; + ObjectAddress myself; + ObjectAddress referenced; + Oid cmoid; + Oid cmhandler; + bool nulls[Natts_pg_compression]; + Datum values[Natts_pg_compression]; + HeapTuple tup; + + rel = heap_open(CompressionMethodRelationId, RowExclusiveLock); + + /* Check if name is used */ + cmoid = GetSysCacheOid1(COMPRESSIONMETHODNAME, CStringGetDatum(cmName)); + if (OidIsValid(cmoid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("compression method \"%s\" already exists", + cmName))); + + /* + * Get the handler function oid and compression method routine + */ + cmhandler = LookupCompressionHandlerFunc(handlerName); + + /* + * Insert tuple into pg_compression. + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + values[Anum_pg_compression_cmname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(cmName)); + values[Anum_pg_compression_cmhandler - 1] = ObjectIdGetDatum(cmhandler); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + cmoid = CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, CompressionMethodRelationId, cmoid); + + /* Record dependency on handler function */ + ObjectAddressSet(referenced, ProcedureRelationId, cmhandler); + + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + recordDependencyOnCurrentExtension(&myself, false); + + heap_close(rel, RowExclusiveLock); + + return myself; +} + +ObjectAddress +DefineCompressionMethod(List *names, List *parameters) +{ + char *cmName; + ListCell *pl; + DefElem *handlerEl = NULL; + + if (list_length(names) != 1) + elog(ERROR, "compression method name cannot be qualified"); + + cmName = strVal(linitial(names)); + + /* Must be super user */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create compression method \"%s\"", + cmName), + errhint("Must be superuser to create an compression method."))); + + foreach(pl, parameters) + { + DefElem *defel = (DefElem *) lfirst(pl); + DefElem **defelp; + + if (pg_strcasecmp(defel->defname, "handler") == 0) + defelp = &handlerEl; + else + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("compression method attribute \"%s\" not recognized", + defel->defname))); + break; + } + + *defelp = defel; + } + + if (!handlerEl) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("compression method handler is not specified"))); + + return CreateCompressionMethod(cmName, (List *) handlerEl->arg); +} + +void +RemoveCompressionMethodById(Oid cmOid) +{ + Relation relation; + HeapTuple tup; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to drop compression methods"))); + + relation = heap_open(CompressionMethodRelationId, RowExclusiveLock); + + tup = SearchSysCache1(COMPRESSIONMETHODOID, ObjectIdGetDatum(cmOid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for compression method %u", cmOid); + + CatalogTupleDelete(relation, &tup->t_self); + + ReleaseSysCache(tup); + + heap_close(relation, RowExclusiveLock); +} + +/* + * Create new record in pg_compression_opt + */ +Oid +CreateCompressionOptions(Form_pg_attribute attr, Oid cmid, List *options) +{ + Relation rel; + HeapTuple tuple; + Oid result, + cmhandler; + Datum values[Natts_pg_compression_opt]; + bool nulls[Natts_pg_compression_opt]; + ObjectAddress myself, + ref1, + ref2, + ref3; + CompressionMethodRoutine *routine; + + /* Initialize buffers for new tuple values */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + /* Get handler function OID for the compression method */ + tuple = SearchSysCache1(COMPRESSIONMETHODOID, ObjectIdGetDatum(cmid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for compression method %u", cmid); + cmhandler = ((Form_pg_compression) GETSTRUCT(tuple))->cmhandler; + ReleaseSysCache(tuple); + + routine = get_compression_method_routine(cmhandler, attr->atttypid); + + if (routine->configure && options != NIL) + routine->configure(attr, options); + + values[Anum_pg_compression_opt_cmid - 1] = ObjectIdGetDatum(cmid); + values[Anum_pg_compression_opt_cmhandler - 1] = ObjectIdGetDatum(cmhandler); + + if (options) + values[Anum_pg_compression_opt_cmoptions - 1] = optionListToArray(options); + else + nulls[Anum_pg_compression_opt_cmoptions - 1] = true; + + rel = heap_open(CompressionOptRelationId, RowExclusiveLock); + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + result = CatalogTupleInsert(rel, tuple); + heap_freetuple(tuple); + + ObjectAddressSet(myself, CompressionOptRelationId, result); + ObjectAddressSet(ref1, ProcedureRelationId, cmhandler); + ObjectAddressSubSet(ref2, RelationRelationId, attr->attrelid, attr->attnum); + ObjectAddressSet(ref3, CompressionMethodRelationId, cmid); + + recordDependencyOn(&myself, &ref1, DEPENDENCY_NORMAL); + recordDependencyOn(&ref2, &myself, DEPENDENCY_NORMAL); + recordDependencyOn(&myself, &ref3, DEPENDENCY_NORMAL); + recordDependencyOnCurrentExtension(&myself, false); + heap_close(rel, RowExclusiveLock); + + return result; +} + +void +RemoveCompressionOptionsById(Oid cmoptoid) +{ + Relation relation; + HeapTuple tup; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to drop compression options"))); + + relation = heap_open(CompressionOptRelationId, RowExclusiveLock); + + tup = SearchSysCache1(COMPRESSIONOPTIONSOID, ObjectIdGetDatum(cmoptoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for compression options %u", cmoptoid); + + CatalogTupleDelete(relation, &tup->t_self); + ReleaseSysCache(tup); + heap_close(relation, RowExclusiveLock); +} + +ColumnCompression * +GetColumnCompressionForAttribute(Form_pg_attribute att) +{ + HeapTuple tuple; + Form_pg_compression_opt cmopt; + ColumnCompression *compression = makeNode(ColumnCompression); + + /* Get handler function OID for the compression method */ + tuple = SearchSysCache1(COMPRESSIONOPTIONSOID, + ObjectIdGetDatum(att->attcompression)); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for compression options %u", + att->attcompression); + + cmopt = (Form_pg_compression_opt) GETSTRUCT(tuple); + compression->methodName = NULL; + compression->methodOid = cmopt->cmid; + compression->options = GetCompressionOptionsList(att->attcompression); + ReleaseSysCache(tuple); + + return compression; +} + +void +CheckCompressionMismatch(ColumnCompression * c1, ColumnCompression * c2, + const char *attributeName) +{ + char *cmname1 = c1->methodName ? c1->methodName : + get_compression_method_name(c1->methodOid); + char *cmname2 = c2->methodName ? c2->methodName : + get_compression_method_name(c2->methodOid); + + if (strcmp(cmname1, cmname2)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" has a compression method conflict", + attributeName), + errdetail("%s versus %s", cmname1, cmname2))); + + if (!equal(c1->options, c2->options)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" has a compression options conflict", + attributeName), + errdetail("(%s) versus (%s)", + formatRelOptions(c1->options), + formatRelOptions(c2->options)))); +} + +/* + * get_compression_method_oid + * + * If missing_ok is false, throw an error if compression method not found. + * If missing_ok is true, just return InvalidOid. + */ +Oid +get_compression_method_oid(const char *cmname, bool missing_ok) +{ + HeapTuple tup; + Oid oid = InvalidOid; + + tup = SearchSysCache1(COMPRESSIONMETHODNAME, CStringGetDatum(cmname)); + if (HeapTupleIsValid(tup)) + { + oid = HeapTupleGetOid(tup); + ReleaseSysCache(tup); + } + + if (!OidIsValid(oid) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("compression method \"%s\" does not exist", cmname))); + + return oid; +} + +/* + * get_compression_method_name + * + * given an compression method OID, look up its name. + */ +char * +get_compression_method_name(Oid cmOid) +{ + HeapTuple tup; + char *result = NULL; + + tup = SearchSysCache1(COMPRESSIONMETHODOID, ObjectIdGetDatum(cmOid)); + if (HeapTupleIsValid(tup)) + { + Form_pg_compression cmform = (Form_pg_compression) GETSTRUCT(tup); + + result = pstrdup(NameStr(cmform->cmname)); + ReleaseSysCache(tup); + } + return result; +} + +/* + * get_compression_method_name_for_opt + * + * given an compression options OID, look up a name for used compression method. + */ +char * +get_compression_method_name_for_opt(Oid cmoptoid) +{ + HeapTuple tup; + Oid cmid; + char *result = NULL; + Form_pg_compression cmform; + + tup = SearchSysCache1(COMPRESSIONOPTIONSOID, ObjectIdGetDatum(cmoptoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for compression options %u", cmoptoid); + + cmid = ((Form_pg_compression_opt) GETSTRUCT(tup))->cmid; + ReleaseSysCache(tup); + + /* now we can get the name */ + tup = SearchSysCache1(COMPRESSIONMETHODOID, ObjectIdGetDatum(cmid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for compression method %u", cmid); + + cmform = (Form_pg_compression) GETSTRUCT(tup); + result = pstrdup(NameStr(cmform->cmname)); + ReleaseSysCache(tup); + return result; +} + +/* get_compression_options */ +List * +GetCompressionOptionsList(Oid cmoptoid) +{ + HeapTuple tuple; + List *result = NULL; + bool isnull; + Datum cmoptions; + + /* Get handler function OID for the compression method */ + tuple = SearchSysCache1(COMPRESSIONOPTIONSOID, ObjectIdGetDatum(cmoptoid)); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for compression options %u", cmoptoid); + + cmoptions = SysCacheGetAttr(COMPRESSIONOPTIONSOID, tuple, + Anum_pg_compression_opt_cmoptions, &isnull); + + if (!isnull) + result = untransformRelOptions(cmoptions); + + ReleaseSysCache(tuple); + return result; +} + +CompressionMethodRoutine * +GetCompressionRoutine(Oid cmoptoid) +{ + HeapTuple tuple; + Form_pg_compression_opt cmopt; + regproc cmhandler; + + /* Get handler function OID for the compression method */ + tuple = SearchSysCache1(COMPRESSIONOPTIONSOID, ObjectIdGetDatum(cmoptoid)); + + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for compression options %u", cmoptoid); + + cmopt = (Form_pg_compression_opt) GETSTRUCT(tuple); + cmhandler = cmopt->cmhandler; + + /* Complain if handler OID is invalid */ + if (!RegProcedureIsValid(cmhandler)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find compression method handler for compression options %u", + cmoptoid))); + + ReleaseSysCache(tuple); + + /* And finally, call the handler function to get the API struct. */ + return get_compression_method_routine(cmhandler, InvalidOid); +} + +/* + * Call the specified compression method handler + * routine to get its CompressionMethodRoutine struct, + * which will be palloc'd in the caller's context. + */ +static CompressionMethodRoutine * +get_compression_method_routine(Oid cmhandler, Oid typeid) +{ + Datum datum; + CompressionMethodRoutine *routine; + CompressionMethodOpArgs opargs; + + opargs.typeid = typeid; + opargs.cmhanderid = cmhandler; + + datum = OidFunctionCall1(cmhandler, PointerGetDatum(&opargs)); + routine = (CompressionMethodRoutine *) DatumGetPointer(datum); + + if (routine == NULL || !IsA(routine, CompressionMethodRoutine)) + elog(ERROR, "compression method handler function %u " + "did not return an CompressionMethodRoutine struct", + cmhandler); + + return routine; +} diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfa3f059c2..8cef3be086 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2758,8 +2758,8 @@ CopyFrom(CopyState cstate) List *recheckIndexes = NIL; /* OK, store the tuple and create index entries for it */ - heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, - hi_options, bistate); + heap_insert(resultRelInfo->ri_RelationDesc, tuple, + mycid, hi_options, bistate); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(slot, diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index e60210cb24..010bdb644b 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -112,7 +112,8 @@ create_ctas_internal(List *attrList, IntoClause *into) * Create the relation. (This will error out if there's an existing view, * so we don't need more code to complain if "replace" is false.) */ - intoRelationAddr = DefineRelation(create, relkind, InvalidOid, NULL, NULL); + intoRelationAddr = DefineRelation(create, relkind, InvalidOid, NULL, NULL, + NULL); /* * If necessary, create a TOAST table for the target table. Note that diff --git a/src/backend/commands/dropcmds.c b/src/backend/commands/dropcmds.c index 2b30677d6f..8611db22ec 100644 --- a/src/backend/commands/dropcmds.c +++ b/src/backend/commands/dropcmds.c @@ -275,6 +275,10 @@ does_not_exist_skipping(ObjectType objtype, Node *object) name = NameListToString(castNode(List, object)); } break; + case OBJECT_COMPRESSION_METHOD: + msg = gettext_noop("compression method \"%s\" does not exist, skipping"); + name = strVal((Value *) object); + break; case OBJECT_CONVERSION: if (!schema_does_not_exist_skipping(castNode(List, object), &msg, &name)) { diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 938133bbe4..f520c7fe7b 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -91,6 +91,7 @@ static event_trigger_support_data event_trigger_support[] = { {"CAST", true}, {"CONSTRAINT", true}, {"COLLATION", true}, + {"COMPRESSION METHOD", true}, {"CONVERSION", true}, {"DATABASE", false}, {"DOMAIN", true}, @@ -1085,6 +1086,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_CAST: case OBJECT_COLUMN: case OBJECT_COLLATION: + case OBJECT_COMPRESSION_METHOD: case OBJECT_CONVERSION: case OBJECT_DEFACL: case OBJECT_DEFAULT: @@ -1144,6 +1146,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_DATABASE: case OCLASS_TBLSPACE: case OCLASS_ROLE: + case OCLASS_COMPRESSION_OPTIONS: /* no support for global objects */ return false; case OCLASS_EVENT_TRIGGER: @@ -1183,6 +1186,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: + case OCLASS_COMPRESSION_METHOD: return true; /* diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index 9ad991507f..b840309d03 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -61,7 +61,7 @@ static void import_error_callback(void *arg); * processing, hence any validation should be done before this * conversion. */ -static Datum +Datum optionListToArray(List *options) { ArrayBuildState *astate = NULL; diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 62937124ef..857eb14b64 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -212,7 +212,8 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) stmt->tablespacename = NULL; stmt->if_not_exists = seq->if_not_exists; - address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL); + address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL, + NULL); seqoid = address.objectId; Assert(seqoid != InvalidOid); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 2e389e08da..11c5eea243 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "access/compression.h" #include "access/genam.h" #include "access/heapam.h" #include "access/multixact.h" @@ -33,6 +34,8 @@ #include "catalog/partition.h" #include "catalog/pg_am.h" #include "catalog/pg_collation.h" +#include "catalog/pg_compression.h" +#include "catalog/pg_compression_opt.h" #include "catalog/pg_constraint.h" #include "catalog/pg_constraint_fn.h" #include "catalog/pg_depend.h" @@ -41,6 +44,7 @@ #include "catalog/pg_inherits_fn.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_proc.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" @@ -90,6 +94,7 @@ #include "storage/smgr.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -458,6 +463,8 @@ static void ATExecGenericOptions(Relation rel, List *options); static void ATExecEnableRowSecurity(Relation rel); static void ATExecDisableRowSecurity(Relation rel); static void ATExecForceNoForceRowSecurity(Relation rel, bool force_rls); +static void ATExecAlterColumnCompression(AlteredTableInfo *tab, Relation rel, + const char *column, ColumnCompression * compression, LOCKMODE lockmode); static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, ForkNumber forkNum, char relpersistence); @@ -502,7 +509,8 @@ static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name); */ ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, - ObjectAddress *typaddress, const char *queryString) + ObjectAddress *typaddress, const char *queryString, + Node **pAlterStmt) { char relname[NAMEDATALEN]; Oid namespaceId; @@ -522,6 +530,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, static char *validnsps[] = HEAP_RELOPT_NAMESPACES; Oid ofTypeId; ObjectAddress address; + AlterTableStmt *alterStmt = NULL; /* * Truncate relname to appropriate length (probably a waste of time, as @@ -723,6 +732,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, if (colDef->identity) attr->attidentity = colDef->identity; + + transformColumnCompression(colDef, stmt->relation, &alterStmt); } /* @@ -920,6 +931,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, */ relation_close(rel, NoLock); + if (pAlterStmt) + *pAlterStmt = (Node *) alterStmt; + else + Assert(!alterStmt); + return address; } @@ -1609,6 +1625,7 @@ storage_name(char c) } } + /*---------- * MergeAttributes * Returns new schema given initial schema and superclasses. @@ -1943,6 +1960,19 @@ MergeAttributes(List *schema, List *supers, char relpersistence, storage_name(def->storage), storage_name(attribute->attstorage)))); + if (OidIsValid(attribute->attcompression)) + { + ColumnCompression *compression = + GetColumnCompressionForAttribute(attribute); + + if (!def->compression) + def->compression = compression; + else + CheckCompressionMismatch(def->compression, + compression, + attributeName); + } + def->inhcount++; /* Merge of NOT NULL constraints = OR 'em together */ def->is_not_null |= attribute->attnotnull; @@ -1970,6 +2000,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->collOid = attribute->attcollation; def->constraints = NIL; def->location = -1; + if (OidIsValid(attribute->attcompression)) + def->compression = + GetColumnCompressionForAttribute(attribute); inhSchema = lappend(inhSchema, def); newattno[parent_attno - 1] = ++child_attno; } @@ -2179,6 +2212,13 @@ MergeAttributes(List *schema, List *supers, char relpersistence, storage_name(def->storage), storage_name(newdef->storage)))); + if (!def->compression) + def->compression = newdef->compression; + else if (newdef->compression) + CheckCompressionMismatch(def->compression, + newdef->compression, + attributeName); + /* Mark the column as locally defined */ def->is_local = true; /* Merge of NOT NULL constraints = OR 'em together */ @@ -3278,6 +3318,7 @@ AlterTableGetLockLevel(List *cmds) */ case AT_GenericOptions: case AT_AlterColumnGenericOptions: + case AT_AlterColumnCompression: cmd_lockmode = AccessExclusiveLock; break; @@ -3768,6 +3809,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, /* No command-specific prep needed */ pass = AT_PASS_MISC; break; + case AT_AlterColumnCompression: + ATSimplePermissions(rel, ATT_TABLE); + /* FIXME This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -4107,6 +4154,11 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_DetachPartition: ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name); break; + case AT_AlterColumnCompression: + ATExecAlterColumnCompression(tab, rel, cmd->name, + (ColumnCompression *) cmd->def, + lockmode); + break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -5325,6 +5377,18 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, attribute.attislocal = colDef->is_local; attribute.attinhcount = colDef->inhcount; attribute.attcollation = collOid; + attribute.attcompression = InvalidOid; + + if (!colDef->compression) + { + /* colDef->compression is handled in subsequent ALTER TABLE statement */ + Oid cmid = get_base_typdefaultcm(typeTuple); + + if (OidIsValid(cmid)) + attribute.attcompression = CreateCompressionOptions(&attribute, + cmid, NULL); + } + /* attribute.attacl is handled by InsertPgAttributeTuple */ ReleaseSysCache(typeTuple); @@ -6425,6 +6489,11 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc errmsg("cannot alter system column \"%s\"", colName))); + if (attrtuple->attcompression && newstorage != 'x' && newstorage != 'm') + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("compressed columns can only have storage MAIN or EXTENDED"))); + /* * safety check: do not allow toasted storage modes unless column datatype * is TOAST-aware. @@ -9070,6 +9139,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, SysScanDesc scan; HeapTuple depTup; ObjectAddress address; + Oid cmid; attrelation = heap_open(AttributeRelationId, RowExclusiveLock); @@ -9345,6 +9415,8 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: + case OCLASS_COMPRESSION_METHOD: + case OCLASS_COMPRESSION_OPTIONS: /* * We don't expect any of these sorts of objects to depend on @@ -9394,7 +9466,9 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, if (!(foundDep->refclassid == TypeRelationId && foundDep->refobjid == attTup->atttypid) && !(foundDep->refclassid == CollationRelationId && - foundDep->refobjid == attTup->attcollation)) + foundDep->refobjid == attTup->attcollation) && + !(foundDep->refclassid == CompressionMethodRelationId && + foundDep->refobjid == attTup->attcompression)) elog(ERROR, "found unexpected dependency for column"); CatalogTupleDelete(depRel, &depTup->t_self); @@ -9417,6 +9491,10 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, attTup->attalign = tform->typalign; attTup->attstorage = tform->typstorage; + cmid = get_base_typdefaultcm(typeTuple); + if (OidIsValid(cmid)) + attTup->attcompression = CreateCompressionOptions(attTup, cmid, NULL); + ReleaseSysCache(typeTuple); CatalogTupleUpdate(attrelation, &heapTup->t_self, heapTup); @@ -12413,6 +12491,86 @@ ATExecGenericOptions(Relation rel, List *options) heap_freetuple(tuple); } +static void +ATExecAlterColumnCompression(AlteredTableInfo *tab, + Relation rel, + const char *column, + ColumnCompression * compression, + LOCKMODE lockmode) +{ + Relation attrel; + HeapTuple atttuple; + Form_pg_attribute atttableform; + AttrNumber attnum; + HeapTuple newtuple; + Datum values[Natts_pg_attribute]; + bool nulls[Natts_pg_attribute]; + bool replace[Natts_pg_attribute]; + + attrel = heap_open(AttributeRelationId, RowExclusiveLock); + + atttuple = SearchSysCacheAttName(RelationGetRelid(rel), column); + if (!HeapTupleIsValid(atttuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + column, RelationGetRelationName(rel)))); + + /* Prevent them from altering a system attribute */ + atttableform = (Form_pg_attribute) GETSTRUCT(atttuple); + attnum = atttableform->attnum; + if (attnum <= 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter system column \"%s\"", column))); + + if (atttableform->attstorage != 'x' && atttableform->attstorage != 'm') + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("storage for \"%s\" should be MAIN or EXTENDED", column))); + + /* Initialize buffers for new tuple values */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + + if (compression->methodName || OidIsValid(compression->methodOid)) + { + /* SET COMPRESSED */ + Oid cmid, + cmoptoid; + + cmid = compression->methodName + ? get_compression_method_oid(compression->methodName, false) + : compression->methodOid; + + cmoptoid = CreateCompressionOptions(atttableform, cmid, compression->options); + + values[Anum_pg_attribute_attcompression - 1] = ObjectIdGetDatum(cmoptoid); + replace[Anum_pg_attribute_attcompression - 1] = true; + + } + else + { + /* SET NOT COMPRESSED */ + values[Anum_pg_attribute_attcompression - 1] = ObjectIdGetDatum(InvalidOid); + replace[Anum_pg_attribute_attcompression - 1] = true; + } + + newtuple = heap_modify_tuple(atttuple, RelationGetDescr(attrel), + values, nulls, replace); + CatalogTupleUpdate(attrel, &newtuple->t_self, newtuple); + heap_freetuple(newtuple); + + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), + atttableform->attnum); + + ReleaseSysCache(atttuple); + heap_close(attrel, RowExclusiveLock); +} + + /* * Preparation phase for SET LOGGED/UNLOGGED * diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 7ed16aeff4..38a086b584 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -40,6 +40,7 @@ #include "catalog/pg_am.h" #include "catalog/pg_authid.h" #include "catalog/pg_collation.h" +#include "catalog/pg_compression.h" #include "catalog/pg_constraint.h" #include "catalog/pg_constraint_fn.h" #include "catalog/pg_depend.h" @@ -2110,7 +2111,7 @@ DefineCompositeType(RangeVar *typevar, List *coldeflist) * Finally create the relation. This also creates the type. */ DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE, InvalidOid, &address, - NULL); + NULL, NULL); return address; } @@ -3638,3 +3639,100 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, return oldNspOid; } + + +/* + * Execute ALTER TYPE SET COMPRESSED [WITH (