From f328e79faefb9600ee2e57a40c819f32432b4364 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Tue, 20 Oct 2020 17:57:00 +0530 Subject: [PATCH v9 4/5] Create custom compression methods Provide syntax to create custom compression methods. --- .../sgml/ref/create_compression_method.sgml | 87 +++++++++++++++ src/backend/access/common/detoast.c | 53 ++++++++- src/backend/access/common/toast_internals.c | 15 ++- .../access/compression/compressionapi.c | 2 +- src/backend/access/compression/lz4.c | 23 ++-- src/backend/access/compression/pglz.c | 20 ++-- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 2 +- src/backend/catalog/objectaddress.c | 22 ++++ src/backend/commands/compressioncmds.c | 103 ++++++++++++++++++ src/backend/commands/event_trigger.c | 3 + src/backend/commands/seclabel.c | 1 + src/backend/executor/nodeModifyTable.c | 3 +- src/backend/nodes/copyfuncs.c | 14 +++ src/backend/nodes/equalfuncs.c | 12 ++ src/backend/parser/gram.y | 20 +++- src/backend/tcop/utility.c | 16 +++ src/include/access/compressionapi.h | 24 ++-- src/include/access/detoast.h | 8 ++ src/include/access/toast_internals.h | 19 +++- src/include/commands/defrem.h | 2 + src/include/nodes/nodes.h | 3 +- src/include/nodes/parsenodes.h | 12 ++ src/include/postgres.h | 8 ++ src/include/tcop/cmdtaglist.h | 2 + src/test/regress/expected/create_cm.out | 18 +++ src/test/regress/sql/create_cm.sql | 7 ++ 27 files changed, 455 insertions(+), 46 deletions(-) create mode 100644 doc/src/sgml/ref/create_compression_method.sgml diff --git a/doc/src/sgml/ref/create_compression_method.sgml b/doc/src/sgml/ref/create_compression_method.sgml new file mode 100644 index 0000000000..e9d076194b --- /dev/null +++ b/doc/src/sgml/ref/create_compression_method.sgml @@ -0,0 +1,87 @@ + + + + + CREATE COMPRESSION METHOD + + + + CREATE COMPRESSION METHOD + 7 + SQL - Language Statements + + + + CREATE COMPRESSION METHOD + define a new compresion method + + + + +CREATE COMPRESSION METHOD name + HANDLER handler_function + + + + + Description + + + CREATE COMPRESSION METHOD creates a new compression method. + PostgreSQL supports two internal + built-in compression methods (pglz + and zlib), and also allows to add more custom compression + methods through this interface. + + + + + The compression method name must be unique within the database. + + + + Only superusers can define new compression methods. + + + + + Parameters + + + + name + + + The name of the compression method to be created. + + + + + + handler_function + + + handler_function is the + name (possibly schema-qualified) of a previously registered function + that represents the access method. The handler function must be declared + to accept a single argument of type internal and to return + the pseudo-type compression_handler. The argument is a + dummy value that simply serves to prevent handler functions from being + called directly from SQL commands. The result of the function must be a + palloc'd struct of type CompressionRoutine, + which contains everything that the core code needs to know to make use of + the compression access method. + The CompressionRoutine struct, also called the + access method's API struct, contains pointers to + support functions for the compression method. These support functions are + plain C functions and are not visible or callable at the SQL level. + + + + + + + diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c index 9fa4e2da1d..174e6f6d5c 100644 --- a/src/backend/access/common/detoast.c +++ b/src/backend/access/common/detoast.c @@ -438,6 +438,37 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, return result; } +/* ---------- + * toast_get_compression_oid - + * + * Return the Oid of the compresion method stored in the compressed data. + * For built-in methods, we only store the built-in compression method id in + * first 2-bits of the rawsize and that is directly mapped to the compression + * method Oid. And, for the custom compression method we store the Oid of the + * compression method in the custom compression header. + */ +Oid +toast_get_compression_oid(struct varlena *attr) +{ + CompressionId cmid; + + /* + * If it is custom compression id then get the Oid from the custom + * compression header otherwise, directly translate the buil-in compression + * id to compression method Oid. + */ + cmid = TOAST_COMPRESS_METHOD(attr); + if (IsCustomCompression(cmid)) + { + toast_compress_header_custom *hdr; + + hdr = (toast_compress_header_custom *) attr; + return hdr->cmoid; + } + else + return GetCompressionOidFromCompressionId(cmid); +} + /* ---------- * toast_decompress_datum - * @@ -451,12 +482,16 @@ toast_decompress_datum(struct varlena *attr) Assert(VARATT_IS_COMPRESSED(attr)); - cmoid = GetCompressionOidFromCompressionId(TOAST_COMPRESS_METHOD(attr)); + /* get the compression method oid */ + cmoid = toast_get_compression_oid(attr); /* get compression method handler routines */ cmroutine = GetCompressionRoutine(cmoid); - return cmroutine->cmdecompress(attr); + /* call the decompression routine */ + return cmroutine->cmdecompress(attr, + IsCustomCompression(GetCompressionId(cmoid)) ? + TOAST_CUSTOM_COMPRESS_HDRSZ : TOAST_COMPRESS_HDRSZ); } @@ -470,24 +505,30 @@ toast_decompress_datum(struct varlena *attr) static struct varlena * toast_decompress_datum_slice(struct varlena *attr, int32 slicelength) { + Oid cmoid; + int32 header_sz; CompressionRoutine *cmroutine; - Oid cmoid; Assert(VARATT_IS_COMPRESSED(attr)); - cmoid = GetCompressionOidFromCompressionId(TOAST_COMPRESS_METHOD(attr)); + /* get the compression method oid */ + cmoid = toast_get_compression_oid(attr); /* get compression method handler routines */ cmroutine = GetCompressionRoutine(cmoid); + /* get the compression header size */ + header_sz = IsCustomCompression(GetCompressionId(cmoid)) ? + TOAST_CUSTOM_COMPRESS_HDRSZ : TOAST_COMPRESS_HDRSZ; + /* * If the handler supports the slice decompression then decompress the * slice otherwise decompress complete data. */ if (cmroutine->cmdecompress_slice) - return cmroutine->cmdecompress_slice(attr, slicelength); + return cmroutine->cmdecompress_slice(attr, header_sz, slicelength); else - return cmroutine->cmdecompress(attr); + return cmroutine->cmdecompress(attr, header_sz); } /* ---------- diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 7c603dd19a..d69dd907c9 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -38,10 +38,14 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid); * -------- */ void -toast_set_compressed_datum_info(struct varlena *val, CompressionId cmid, int32 rawsize) +toast_set_compressed_datum_info(struct varlena *val, CompressionId cmid, + Oid cmoid, int32 rawsize) { TOAST_COMPRESS_SET_RAWSIZE(val, rawsize); TOAST_COMPRESS_SET_COMPRESSION_METHOD(val, cmid); + + if (IsCustomCompression(cmid)) + TOAST_COMPRESS_SET_CMID(val, cmoid); } /* ---------- @@ -62,6 +66,7 @@ toast_compress_datum(Datum value, Oid cmoid) { struct varlena *tmp = NULL; int32 valsize; + CompressionId cmid; CompressionRoutine *cmroutine; Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value))); @@ -73,9 +78,12 @@ toast_compress_datum(Datum value, Oid cmoid) /* Get the compression routines for the compression method */ cmroutine = GetCompressionRoutine(cmoid); + cmid = GetCompressionId(cmoid); /* Call the actual compression function */ - tmp = cmroutine->cmcompress((const struct varlena *) value); + tmp = cmroutine->cmcompress((const struct varlena *) value, + IsCustomCompression(cmid) ? + TOAST_CUSTOM_COMPRESS_HDRSZ : TOAST_COMPRESS_HDRSZ); if (!tmp) return PointerGetDatum(NULL); @@ -94,8 +102,7 @@ toast_compress_datum(Datum value, Oid cmoid) if (VARSIZE(tmp) < valsize - 2) { /* successful compression */ - toast_set_compressed_datum_info(tmp, GetCompressionId(cmoid), - valsize); + toast_set_compressed_datum_info(tmp, cmid, cmoid, valsize); return PointerGetDatum(tmp); } else diff --git a/src/backend/access/compression/compressionapi.c b/src/backend/access/compression/compressionapi.c index ee57261213..cf91b1c69b 100644 --- a/src/backend/access/compression/compressionapi.c +++ b/src/backend/access/compression/compressionapi.c @@ -129,6 +129,6 @@ GetCompressionId(Oid cmoid) case LZ4_COMPRESSION_OID: return LZ4_COMPRESSION_ID; default: - elog(ERROR, "Invalid compression method %d", cmoid); + return CUSTOM_COMPRESSION_ID; } } diff --git a/src/backend/access/compression/lz4.c b/src/backend/access/compression/lz4.c index edc8f5ed22..52c167a35d 100644 --- a/src/backend/access/compression/lz4.c +++ b/src/backend/access/compression/lz4.c @@ -29,7 +29,7 @@ * compressed varlena, or NULL if compression fails. */ static struct varlena * -lz4_cmcompress(const struct varlena *value) +lz4_cmcompress(const struct varlena *value, int32 header_size) { int32 valsize; int32 len; @@ -39,10 +39,10 @@ lz4_cmcompress(const struct varlena *value) valsize = VARSIZE_ANY_EXHDR(value); max_size = LZ4_compressBound(VARSIZE_ANY_EXHDR(value)); - tmp = (struct varlena *) palloc(max_size + TOAST_COMPRESS_HDRSZ); + tmp = (struct varlena *) palloc(max_size + header_size); len = LZ4_compress_default(VARDATA_ANY(value), - (char *) tmp + TOAST_COMPRESS_HDRSZ, + (char *) tmp + header_size, valsize, max_size); if (len <= 0) { @@ -50,7 +50,7 @@ lz4_cmcompress(const struct varlena *value) elog(ERROR, "lz4: could not compress data"); } - SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ); + SET_VARSIZE_COMPRESSED(tmp, len + header_size); return tmp; } @@ -60,7 +60,7 @@ lz4_cmcompress(const struct varlena *value) * Returns the decompressed varlena. */ static struct varlena * -lz4_cmdecompress(const struct varlena *value) +lz4_cmdecompress(const struct varlena *value, int32 header_size) { int32 rawsize; struct varlena *result; @@ -68,9 +68,9 @@ lz4_cmdecompress(const struct varlena *value) result = (struct varlena *) palloc(TOAST_COMPRESS_RAWSIZE(value) + VARHDRSZ); SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(value) + VARHDRSZ); - rawsize = LZ4_decompress_safe((char *) value + TOAST_COMPRESS_HDRSZ, + rawsize = LZ4_decompress_safe((char *) value + header_size, VARDATA(result), - VARSIZE(value) - TOAST_COMPRESS_HDRSZ, + VARSIZE(value) - header_size, TOAST_COMPRESS_RAWSIZE(value)); if (rawsize < 0) elog(ERROR, "lz4: compressed data is corrupted"); @@ -86,17 +86,18 @@ lz4_cmdecompress(const struct varlena *value) * Decompresses part of the data. Returns the decompressed varlena. */ static struct varlena * -lz4_cmdecompress_slice(const struct varlena *value, int32 slicelength) +lz4_cmdecompress_slice(const struct varlena *value, int32 header_size, + int32 slicelength) { int32 rawsize; struct varlena *result; - result = (struct varlena *)palloc(slicelength + VARHDRSZ); + result = (struct varlena *) palloc(slicelength + VARHDRSZ); SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(value) + VARHDRSZ); - rawsize = LZ4_decompress_safe_partial((char *) value + TOAST_COMPRESS_HDRSZ, + rawsize = LZ4_decompress_safe_partial((char *) value + header_size, VARDATA(result), - VARSIZE(value) - TOAST_COMPRESS_HDRSZ, + VARSIZE(value) - header_size, slicelength, slicelength); if (rawsize < 0) diff --git a/src/backend/access/compression/pglz.c b/src/backend/access/compression/pglz.c index 61dfe42004..931394f779 100644 --- a/src/backend/access/compression/pglz.c +++ b/src/backend/access/compression/pglz.c @@ -27,7 +27,7 @@ * compressed varlena, or NULL if compression fails. */ static struct varlena * -pglz_cmcompress(const struct varlena *value) +pglz_cmcompress(const struct varlena *value, int32 header_size) { int32 valsize, len; @@ -44,16 +44,16 @@ pglz_cmcompress(const struct varlena *value) return NULL; tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) + - TOAST_COMPRESS_HDRSZ); + header_size); len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize, - TOAST_COMPRESS_RAWDATA(tmp), + (char *) tmp + header_size, NULL); if (len >= 0) { - SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ); + SET_VARSIZE_COMPRESSED(tmp, len + header_size); return tmp; } @@ -68,7 +68,7 @@ pglz_cmcompress(const struct varlena *value) * Returns the decompressed varlena. */ static struct varlena * -pglz_cmdecompress(const struct varlena *value) +pglz_cmdecompress(const struct varlena *value, int32 header_size) { struct varlena *result; int32 rawsize; @@ -76,8 +76,8 @@ pglz_cmdecompress(const struct varlena *value) result = (struct varlena *) palloc(TOAST_COMPRESS_RAWSIZE(value) + VARHDRSZ); SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(value) + VARHDRSZ); - rawsize = pglz_decompress(TOAST_COMPRESS_RAWDATA(value), - TOAST_COMPRESS_SIZE(value), + rawsize = pglz_decompress((char *) value + header_size, + VARSIZE(value) - header_size, VARDATA(result), TOAST_COMPRESS_RAWSIZE(value), true); @@ -95,7 +95,7 @@ pglz_cmdecompress(const struct varlena *value) * Decompresses part of the data. Returns the decompressed varlena. */ static struct varlena * -pglz_cmdecompress_slice(const struct varlena *value, +pglz_cmdecompress_slice(const struct varlena *value, int32 header_size, int32 slicelength) { struct varlena *result; @@ -103,8 +103,8 @@ pglz_cmdecompress_slice(const struct varlena *value, result = (struct varlena *) palloc(slicelength + VARHDRSZ); - rawsize = pglz_decompress(TOAST_COMPRESS_RAWDATA(value), - VARSIZE(value) - TOAST_COMPRESS_HDRSZ, + rawsize = pglz_decompress((char *) value + header_size, + VARSIZE(value) - header_size, VARDATA(result), slicelength, false); diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index c626161408..bf47b230b1 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3410,6 +3410,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_AMPROC: case OBJECT_ATTRIBUTE: case OBJECT_CAST: + case OBJECT_COMPRESSION_METHOD: case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: @@ -3549,6 +3550,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_AMPROC: case OBJECT_ATTRIBUTE: case OBJECT_CAST: + case OBJECT_COMPRESSION_METHOD: case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 6e86c66456..6db2cee3ca 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1487,6 +1487,7 @@ doDeletion(const ObjectAddress *object, int flags) case OCLASS_AM: case OCLASS_AMOP: case OCLASS_AMPROC: + case OCLASS_COMPRESSION: case OCLASS_SCHEMA: case OCLASS_TSPARSER: case OCLASS_TSDICT: @@ -1508,7 +1509,6 @@ doDeletion(const ObjectAddress *object, int flags) case OCLASS_DATABASE: case OCLASS_TBLSPACE: case OCLASS_SUBSCRIPTION: - case OCLASS_COMPRESSION: elog(ERROR, "global objects cannot be deleted by doDeletion"); break; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index ed40d78d8d..6014ca3d58 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -190,6 +190,20 @@ static const ObjectPropertyType ObjectProperty[] = OBJECT_COLLATION, true }, + { + "compression method", + CompressionRelationId, + CompressionIndexId, + CMOID, + CMNAME, + Anum_pg_compression_oid, + Anum_pg_compression_cmname, + InvalidAttrNumber, + InvalidAttrNumber, + InvalidAttrNumber, + -1, + true + }, { "constraint", ConstraintRelationId, @@ -1010,6 +1024,7 @@ get_object_address(ObjectType objtype, Node *object, case OBJECT_FOREIGN_SERVER: case OBJECT_EVENT_TRIGGER: case OBJECT_ACCESS_METHOD: + case OBJECT_COMPRESSION_METHOD: case OBJECT_PUBLICATION: case OBJECT_SUBSCRIPTION: address = get_object_address_unqualified(objtype, @@ -1261,6 +1276,11 @@ get_object_address_unqualified(ObjectType objtype, address.objectId = get_am_oid(name, missing_ok); address.objectSubId = 0; break; + case OBJECT_COMPRESSION_METHOD: + address.classId = CompressionRelationId; + address.objectId = get_cm_oid(name, missing_ok); + address.objectSubId = 0; + break; case OBJECT_DATABASE: address.classId = DatabaseRelationId; address.objectId = get_database_oid(name, missing_ok); @@ -2272,6 +2292,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) objnode = (Node *) name; break; case OBJECT_ACCESS_METHOD: + case OBJECT_COMPRESSION_METHOD: case OBJECT_DATABASE: case OBJECT_EVENT_TRIGGER: case OBJECT_EXTENSION: @@ -2565,6 +2586,7 @@ check_object_ownership(Oid roleid, ObjectType objtype, ObjectAddress address, case OBJECT_TSPARSER: case OBJECT_TSTEMPLATE: case OBJECT_ACCESS_METHOD: + case OBJECT_COMPRESSION_METHOD: /* We treat these object types as being owned by superusers */ if (!superuser_arg(roleid)) ereport(ERROR, diff --git a/src/backend/commands/compressioncmds.c b/src/backend/commands/compressioncmds.c index a1be6ef7be..d47d55b119 100644 --- a/src/backend/commands/compressioncmds.c +++ b/src/backend/commands/compressioncmds.c @@ -20,11 +20,114 @@ #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/objectaccess.h" #include "catalog/pg_attribute.h" #include "catalog/pg_depend.h" +#include "catalog/pg_proc.h" #include "commands/defrem.h" +#include "miscadmin.h" #include "nodes/parsenodes.h" +#include "parser/parse_func.h" #include "utils/fmgroids.h" +#include "utils/fmgrprotos.h" +#include "utils/syscache.h" + +/* + * CreateCompressionMethod + * Registers a new compression method. + */ +ObjectAddress +CreateCompressionMethod(CreateCmStmt *stmt) +{ + Relation rel; + ObjectAddress myself; + ObjectAddress referenced; + Oid cmoid; + Oid cmhandler; + bool nulls[Natts_pg_compression]; + Datum values[Natts_pg_compression]; + HeapTuple tup; + Oid funcargtypes[1] = {INTERNALOID}; + + rel = table_open(CompressionRelationId, RowExclusiveLock); + + /* Must be super user */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create compression method \"%s\"", + stmt->cmname), + errhint("Must be superuser to create an access method."))); + + /* Check if name is used */ + cmoid = GetSysCacheOid1(CMNAME, Anum_pg_compression_oid, + CStringGetDatum(stmt->cmname)); + if (OidIsValid(cmoid)) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("compression method \"%s\" already exists", + stmt->cmname))); + } + + if (stmt->handler_name == NIL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("handler function is not specified"))); + + /* Get the handler function oid */ + cmhandler = LookupFuncName(stmt->handler_name, 1, funcargtypes, false); + + /* + * Insert tuple into pg_compression. + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + cmoid = GetNewOidWithIndex(rel, AmOidIndexId, Anum_pg_compression_oid); + values[Anum_pg_compression_oid - 1] = ObjectIdGetDatum(cmoid); + values[Anum_pg_compression_cmname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(stmt->cmname)); + values[Anum_pg_compression_cmhandler - 1] = ObjectIdGetDatum(cmhandler); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + myself.classId = CompressionRelationId; + myself.objectId = cmoid; + myself.objectSubId = 0; + + /* Record dependency on handler function */ + referenced.classId = ProcedureRelationId; + referenced.objectId = cmhandler; + referenced.objectSubId = 0; + + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + recordDependencyOnCurrentExtension(&myself, false); + + InvokeObjectPostCreateHook(CompressionRelationId, cmoid, 0); + + table_close(rel, RowExclusiveLock); + + return myself; +} + +Oid +get_cm_oid(const char *cmname, bool missing_ok) +{ + Oid cmoid; + + cmoid = GetCompressionOid(cmname); + if (!OidIsValid(cmoid) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("compression type \"%s\" not recognized", cmname))); + + return cmoid; +} /* * get list of all supported compression methods for the given attribute. diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 26fe640e8e..57bffc4691 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -953,6 +953,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_CAST: case OBJECT_COLUMN: case OBJECT_COLLATION: + case OBJECT_COMPRESSION_METHOD: case OBJECT_CONVERSION: case OBJECT_DEFACL: case OBJECT_DEFAULT: @@ -2106,6 +2107,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_ATTRIBUTE: case OBJECT_CAST: case OBJECT_COLLATION: + case OBJECT_COMPRESSION_METHOD: case OBJECT_CONVERSION: case OBJECT_DEFAULT: case OBJECT_DEFACL: @@ -2188,6 +2190,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_ATTRIBUTE: case OBJECT_CAST: case OBJECT_COLLATION: + case OBJECT_COMPRESSION_METHOD: case OBJECT_CONVERSION: case OBJECT_DEFAULT: case OBJECT_DEFACL: diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index ee036e9087..083ac78e11 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -67,6 +67,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_ATTRIBUTE: case OBJECT_CAST: case OBJECT_COLLATION: + case OBJECT_COMPRESSION_METHOD: case OBJECT_CONVERSION: case OBJECT_DEFAULT: case OBJECT_DEFACL: diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index efcbb663af..64fc13d4d0 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1970,8 +1970,7 @@ CompareCompressionMethodAndDecompress(TupleTableSlot *slot, * Get the compression method stored in the toast header and * compare with the compression method of the target. */ - cmoid = GetCompressionOidFromCompressionId( - TOAST_COMPRESS_METHOD(new_value)); + cmoid = toast_get_compression_oid(new_value); if (!IsCompressionSupported(&targetTupDesc->attrs[i], cmoid)) { new_value = detoast_attr(new_value); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 2d1f830d9d..53c8ad462e 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4320,6 +4320,17 @@ _copyCreateAmStmt(const CreateAmStmt *from) return newnode; } +static CreateCmStmt * +_copyCreateCmStmt(const CreateCmStmt *from) +{ + CreateCmStmt *newnode = makeNode(CreateCmStmt); + + COPY_STRING_FIELD(cmname); + COPY_NODE_FIELD(handler_name); + + return newnode; +} + static CreateTrigStmt * _copyCreateTrigStmt(const CreateTrigStmt *from) { @@ -5485,6 +5496,9 @@ copyObjectImpl(const void *from) case T_CreateAmStmt: retval = _copyCreateAmStmt(from); break; + case T_CreateCmStmt: + retval = _copyCreateCmStmt(from); + break; case T_CreateTrigStmt: retval = _copyCreateTrigStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 365957cfe7..f3023ae03f 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2016,6 +2016,15 @@ _equalCreateAmStmt(const CreateAmStmt *a, const CreateAmStmt *b) return true; } +static bool +_equalCreateCmStmt(const CreateCmStmt *a, const CreateCmStmt *b) +{ + COMPARE_STRING_FIELD(cmname); + COMPARE_NODE_FIELD(handler_name); + + return true; +} + static bool _equalCreateTrigStmt(const CreateTrigStmt *a, const CreateTrigStmt *b) { @@ -3537,6 +3546,9 @@ equal(const void *a, const void *b) case T_CreateAmStmt: retval = _equalCreateAmStmt(a, b); break; + case T_CreateCmStmt: + retval = _equalCreateCmStmt(a, b); + break; case T_CreateTrigStmt: retval = _equalCreateTrigStmt(a, b); break; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a1755c6fc7..35c69744e2 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -289,7 +289,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt - CreateMatViewStmt RefreshMatViewStmt CreateAmStmt + CreateMatViewStmt RefreshMatViewStmt CreateAmStmt CreateCmStmt CreatePublicationStmt AlterPublicationStmt CreateSubscriptionStmt AlterSubscriptionStmt DropSubscriptionStmt @@ -882,6 +882,7 @@ stmt : | CreateAsStmt | CreateAssertionStmt | CreateCastStmt + | CreateCmStmt | CreateConversionStmt | CreateDomainStmt | CreateExtensionStmt @@ -5256,6 +5257,22 @@ am_type: | TABLE { $$ = AMTYPE_TABLE; } ; +/***************************************************************************** + * + * QUERY: + * CREATE COMPRESSION METHOD name HANDLER handler_name + * + *****************************************************************************/ + +CreateCmStmt: CREATE COMPRESSION METHOD name HANDLER handler_name + { + CreateCmStmt *n = makeNode(CreateCmStmt); + n->cmname = $4; + n->handler_name = $6; + $$ = (Node *) n; + } + ; + /***************************************************************************** * * QUERIES : @@ -6258,6 +6275,7 @@ object_type_name: drop_type_name: ACCESS METHOD { $$ = OBJECT_ACCESS_METHOD; } + | COMPRESSION METHOD { $$ = OBJECT_COMPRESSION_METHOD; } | EVENT TRIGGER { $$ = OBJECT_EVENT_TRIGGER; } | EXTENSION { $$ = OBJECT_EXTENSION; } | FOREIGN DATA_P WRAPPER { $$ = OBJECT_FDW; } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 9a35147b26..7ebbf51778 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -168,6 +168,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree) case T_CompositeTypeStmt: case T_CreateAmStmt: case T_CreateCastStmt: + case T_CreateCmStmt: case T_CreateConversionStmt: case T_CreateDomainStmt: case T_CreateEnumStmt: @@ -1805,6 +1806,10 @@ ProcessUtilitySlow(ParseState *pstate, address = CreateAccessMethod((CreateAmStmt *) parsetree); break; + case T_CreateCmStmt: + address = CreateCompressionMethod((CreateCmStmt *) parsetree); + break; + case T_CreatePublicationStmt: address = CreatePublication((CreatePublicationStmt *) parsetree); break; @@ -2566,6 +2571,9 @@ CreateCommandTag(Node *parsetree) case OBJECT_ACCESS_METHOD: tag = CMDTAG_DROP_ACCESS_METHOD; break; + case OBJECT_COMPRESSION_METHOD: + tag = CMDTAG_DROP_COMPRESSION_METHOD; + break; case OBJECT_PUBLICATION: tag = CMDTAG_DROP_PUBLICATION; break; @@ -2973,6 +2981,10 @@ CreateCommandTag(Node *parsetree) tag = CMDTAG_CREATE_ACCESS_METHOD; break; + case T_CreateCmStmt: + tag = CMDTAG_CREATE_COMPRESSION_METHOD; + break; + case T_CreatePublicationStmt: tag = CMDTAG_CREATE_PUBLICATION; break; @@ -3581,6 +3593,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_DDL; break; + case T_CreateCmStmt: + lev = LOGSTMT_DDL; + break; + case T_CreatePublicationStmt: lev = LOGSTMT_DDL; break; diff --git a/src/include/access/compressionapi.h b/src/include/access/compressionapi.h index 6660285e84..4d4326ff57 100644 --- a/src/include/access/compressionapi.h +++ b/src/include/access/compressionapi.h @@ -23,23 +23,31 @@ * this in the first 2 bits of the raw length. These built-in compression * method-id are directly mapped to the built-in compression method oid. And, * using that oid we can get the compression handler routine by fetching the - * pg_compression catalog row. + * pg_compression catalog row. If it is custome compression id then the + * compressed data will store special custom compression header wherein it will + * directly store the oid of the custom compression method. */ typedef enum CompressionId { - PGLZ_COMPRESSION_ID, - LZ4_COMPRESSION_ID + PGLZ_COMPRESSION_ID = 0, + LZ4_COMPRESSION_ID = 1, + /* one free slot for the future built-in method */ + CUSTOM_COMPRESSION_ID = 3 } CompressionId; -/* Use default compression method if it is not specified. */ -#define DefaultCompressionOid PGLZ_COMPRESSION_OID +/* Use default compression method if it is not specified */ +#define DefaultCompressionOid PGLZ_COMPRESSION_OID +#define IsCustomCompression(cmid) ((cmid) == CUSTOM_COMPRESSION_ID) typedef struct CompressionRoutine CompressionRoutine; /* compresion handler routines */ -typedef struct varlena *(*cmcompress_function) (const struct varlena *value); -typedef struct varlena *(*cmdecompress_slice_function) - (const struct varlena *value, int32 slicelength); +typedef struct varlena *(*cmcompress_function)(const struct varlena *value, + int32 toast_header_size); +typedef struct varlena *(*cmdecompress_slice_function)( + const struct varlena *value, + int32 slicelength, + int32 toast_header_size); /* * API struct for a compression. diff --git a/src/include/access/detoast.h b/src/include/access/detoast.h index 86bad7e78c..e6b3ec90b5 100644 --- a/src/include/access/detoast.h +++ b/src/include/access/detoast.h @@ -89,4 +89,12 @@ extern Size toast_raw_datum_size(Datum value); */ extern Size toast_datum_size(Datum value); +/* ---------- + * toast_get_compression_oid - + * + * Return the compression method oid from the compressed value + * ---------- + */ +extern Oid toast_get_compression_oid(struct varlena *attr); + #endif /* DETOAST_H */ diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h index 8ef477cfe0..48ca172eae 100644 --- a/src/include/access/toast_internals.h +++ b/src/include/access/toast_internals.h @@ -27,6 +27,17 @@ typedef struct toast_compress_header * rawsize */ } toast_compress_header; +/* + * If the compression method were used, then data also contains + * Oid of compression options + */ +typedef struct toast_compress_header_custom +{ + int32 vl_len_; /* varlena header (do not touch directly!) */ + uint32 info; /* 2 bits for compression method + rawsize */ + Oid cmoid; /* Oid from pg_compression */ +} toast_compress_header_custom; + #define RAWSIZEMASK (0x3FFFFFFFU) /* @@ -38,6 +49,7 @@ typedef struct toast_compress_header * two highest bits. */ #define TOAST_COMPRESS_HDRSZ ((int32) sizeof(toast_compress_header)) +#define TOAST_CUSTOM_COMPRESS_HDRSZ ((int32) sizeof(toast_compress_header_custom)) #define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->info & RAWSIZEMASK) #define TOAST_COMPRESS_METHOD(ptr) (((toast_compress_header *) (ptr))->info >> 30) #define TOAST_COMPRESS_SIZE(ptr) ((int32) VARSIZE_ANY(ptr) - TOAST_COMPRESS_HDRSZ) @@ -52,6 +64,9 @@ do { \ #define TOAST_COMPRESS_SET_COMPRESSION_METHOD(ptr, cm_method) \ ((toast_compress_header *) (ptr))->info |= ((cm_method) << 30); +#define TOAST_COMPRESS_SET_CMID(ptr, oid) \ + (((toast_compress_header_custom *) (ptr))->cmoid = (oid)) + extern Datum toast_compress_datum(Datum value, Oid cmoid); extern Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock); @@ -72,7 +87,9 @@ extern void init_toast_snapshot(Snapshot toast_snapshot); * * Save metadata in compressed datum */ -extern void toast_set_compressed_datum_info(struct varlena *val, CompressionId cmid, +extern void toast_set_compressed_datum_info(struct varlena *val, + CompressionId cmid, + Oid cmoid, int32 rawsize); #endif /* TOAST_INTERNALS_H */ diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 936b072c76..8952b2b70d 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -146,6 +146,8 @@ extern Oid get_am_oid(const char *amname, bool missing_ok); extern char *get_am_name(Oid amOid); /* commands/compressioncmds.c */ +extern ObjectAddress CreateCompressionMethod(CreateCmStmt *stmt); +extern Oid get_cm_oid(const char *cmname, bool missing_ok); extern Oid GetAttributeCompression(Form_pg_attribute att, ColumnCompression *compression, bool *need_rewrite); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 65ef987d67..ae297c9116 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -415,6 +415,7 @@ typedef enum NodeTag T_AlterPolicyStmt, T_CreateTransformStmt, T_CreateAmStmt, + T_CreateCmStmt, T_CreatePublicationStmt, T_AlterPublicationStmt, T_CreateSubscriptionStmt, @@ -493,7 +494,7 @@ typedef enum NodeTag T_TimeLineHistoryCmd, T_SQLCmd, - T_CompressionRoutine, /* in access/compressionapi.h */ + T_CompressionRoutine, /* in access/compressionapi.h */ /* * TAGS FOR RANDOM OTHER STUFF * diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3c1ee794b5..f7870c469b 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1717,6 +1717,7 @@ typedef enum ObjectType OBJECT_CAST, OBJECT_COLUMN, OBJECT_COLLATION, + OBJECT_COMPRESSION_METHOD, OBJECT_CONVERSION, OBJECT_DATABASE, OBJECT_DEFAULT, @@ -2439,6 +2440,17 @@ typedef struct CreateAmStmt char amtype; /* type of access method */ } CreateAmStmt; +/*---------------------- + * Create COMPRESSION METHOD Statement + *---------------------- + */ +typedef struct CreateCmStmt +{ + NodeTag type; + char *cmname; /* compression method name */ + List *handler_name; /* handler function name */ +} CreateCmStmt; + /* ---------------------- * Create TRIGGER Statement * ---------------------- diff --git a/src/include/postgres.h b/src/include/postgres.h index 4e7cad3daf..a8893759cf 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -149,6 +149,14 @@ typedef union * flags */ char va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */ } va_compressed; + struct /* Compressed-in-line format */ + { + uint32 va_header; + uint32 va_info; /* Original data size (excludes header) and + * flags */ + Oid va_cmid; /* Oid of compression method */ + char va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */ + } va_custom_compressed; } varattrib_4b; typedef struct diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index be94852bbd..a9bc1c4bae 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -83,6 +83,7 @@ PG_CMDTAG(CMDTAG_COMMIT_PREPARED, "COMMIT PREPARED", false, false, false) PG_CMDTAG(CMDTAG_COPY, "COPY", false, false, true) PG_CMDTAG(CMDTAG_COPY_FROM, "COPY FROM", false, false, false) PG_CMDTAG(CMDTAG_CREATE_ACCESS_METHOD, "CREATE ACCESS METHOD", true, false, false) +PG_CMDTAG(CMDTAG_CREATE_COMPRESSION_METHOD, "CREATE COMPRESSION METHOD", true, false, false) PG_CMDTAG(CMDTAG_CREATE_AGGREGATE, "CREATE AGGREGATE", true, false, false) PG_CMDTAG(CMDTAG_CREATE_CAST, "CREATE CAST", true, false, false) PG_CMDTAG(CMDTAG_CREATE_COLLATION, "CREATE COLLATION", true, false, false) @@ -138,6 +139,7 @@ PG_CMDTAG(CMDTAG_DROP_ACCESS_METHOD, "DROP ACCESS METHOD", true, false, false) PG_CMDTAG(CMDTAG_DROP_AGGREGATE, "DROP AGGREGATE", true, false, false) PG_CMDTAG(CMDTAG_DROP_CAST, "DROP CAST", true, false, false) PG_CMDTAG(CMDTAG_DROP_COLLATION, "DROP COLLATION", true, false, false) +PG_CMDTAG(CMDTAG_DROP_COMPRESSION_METHOD, "DROP COMPRESSIOn METHOD", true, false, false) PG_CMDTAG(CMDTAG_DROP_CONSTRAINT, "DROP CONSTRAINT", true, false, false) PG_CMDTAG(CMDTAG_DROP_CONVERSION, "DROP CONVERSION", true, false, false) PG_CMDTAG(CMDTAG_DROP_DATABASE, "DROP DATABASE", false, false, false) diff --git a/src/test/regress/expected/create_cm.out b/src/test/regress/expected/create_cm.out index 1b7b5bd6e5..897a61689f 100644 --- a/src/test/regress/expected/create_cm.out +++ b/src/test/regress/expected/create_cm.out @@ -103,4 +103,22 @@ INSERT INTO cmmove2 VALUES (repeat('1234567890',1004)); --------+------+-----------+----------+---------+----------+-------------+--------------+------------- f1 | text | | | | extended | pglz | | +-- create compression method +CREATE COMPRESSION METHOD pglz2 HANDLER pglzhandler; +ALTER TABLE cmmove2 ALTER COLUMN f1 SET COMPRESSION pglz2; +INSERT INTO cmmove2 VALUES (repeat('1234567890',1004)); +SELECT length(f1) FROM cmmove2; + length +-------- + 10000 + 10040 + 10040 +(3 rows) + +\d+ cmmove2 + Table "public.cmmove2" + Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description +--------+------+-----------+----------+---------+----------+-------------+--------------+------------- + f1 | text | | | | extended | pglz2 | | + DROP TABLE cmmove1, cmmove2, cmmove3, lz4test; diff --git a/src/test/regress/sql/create_cm.sql b/src/test/regress/sql/create_cm.sql index 3549fb35f0..a0a0aa13f0 100644 --- a/src/test/regress/sql/create_cm.sql +++ b/src/test/regress/sql/create_cm.sql @@ -51,4 +51,11 @@ ALTER TABLE cmmove2 ALTER COLUMN f1 SET COMPRESSION pglz PRESERVE (lz4); INSERT INTO cmmove2 VALUES (repeat('1234567890',1004)); \d+ cmmove2 +-- create compression method +CREATE COMPRESSION METHOD pglz2 HANDLER pglzhandler; +ALTER TABLE cmmove2 ALTER COLUMN f1 SET COMPRESSION pglz2; +INSERT INTO cmmove2 VALUES (repeat('1234567890',1004)); +SELECT length(f1) FROM cmmove2; +\d+ cmmove2 + DROP TABLE cmmove1, cmmove2, cmmove3, lz4test; -- 2.23.0