diff --git a/configure b/configure index e147a98..a35e610 100755 --- a/configure +++ b/configure @@ -699,6 +699,7 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +with_zstd with_zlib with_system_tzdata with_libxslt @@ -866,6 +867,7 @@ with_libxml with_libxslt with_system_tzdata with_zlib +with_zstd with_gnu_ld enable_largefile ' @@ -1570,6 +1572,7 @@ Optional Packages: --with-system-tzdata=DIR use system time zone data in DIR --without-zlib do not use Zlib + --with-zstd build with zstd support --with-gnu-ld assume the C compiler uses GNU ld [default=no] Some influential environment variables: @@ -8505,6 +8508,41 @@ fi # +# zstd +# +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with zstd support" >&5 +$as_echo_n "checking whether to build with zstd support... " >&6; } + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + +$as_echo "#define USE_ZSTD 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_zstd" >&5 +$as_echo "$with_zstd" >&6; } + + +# # Assignments # @@ -11995,6 +12033,56 @@ fi fi +if test "$with_zstd" = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. 
*/ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "library 'zstd' is required for zstd support" "$LINENO" 5 +fi + +fi + if test "$enable_spinlocks" = yes; then $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h @@ -13196,6 +13284,36 @@ fi fi +if test "$with_zstd" = yes; then + for ac_header in zstd.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_ZSTD_H 1 +_ACEOF + +else + for ac_header in zstd.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_ZSTD_H 1 +_ACEOF + +else + as_fn_error $? "zstd.h header file is required for zstd" "$LINENO" 5 +fi + +done + +fi + +done + +fi + if test "$with_gssapi" = yes ; then for ac_header in gssapi/gssapi.h do : diff --git a/configure.in b/configure.in index 1a9891c..868ba38 100644 --- a/configure.in +++ b/configure.in @@ -993,6 +993,16 @@ PGAC_ARG_BOOL(with, zlib, yes, AC_SUBST(with_zlib) # +# zstd +# +AC_MSG_CHECKING([whether to build with zstd support]) +PGAC_ARG_BOOL(with, zstd, no, + [build with zstd support], + [AC_DEFINE([USE_ZSTD], 1, [Define to 1 to build with zstd support. 
(--with-zstd)])]) +AC_MSG_RESULT([$with_zstd]) +AC_SUBST(with_zstd) + +# # Assignments # @@ -1179,6 +1189,10 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_LIB(zstd, ZSTD_compress, [], [AC_MSG_ERROR([library 'zstd' is required for zstd support])]) +fi + if test "$enable_spinlocks" = yes; then AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) else @@ -1390,6 +1404,11 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_HEADERS(zstd.h, [], + [AC_CHECK_HEADERS(zstd.h, [], [AC_MSG_ERROR([zstd.h header file is required for zstd])])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 8ccc228..bf152f5 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -382,6 +382,24 @@ static relopt_int intRelOpts[] = }, -1, 0, 1024 }, + { + { + "compress_chunk_size", + "Size of chunk to store compressed page.", + RELOPT_KIND_HEAP|RELOPT_KIND_BTREE, + AccessExclusiveLock + }, + BLCKSZ / 2, BLCKSZ / 8, BLCKSZ / 2 + }, + { + { + "compress_prealloc_chunks", + "Number of prealloced chunks for each block.", + RELOPT_KIND_HEAP|RELOPT_KIND_BTREE, + ShareUpdateExclusiveLock + }, + 0, 0, 7 + }, /* list terminator */ {{NULL}} @@ -492,6 +510,16 @@ relopt_enum_elt_def viewCheckOptValues[] = {(const char *) NULL} /* list terminator */ }; +/* values from compressTypeOption */ +relopt_enum_elt_def compressTypeOptValues[] = +{ + /* no value for NOT_SET */ + {"none", COMPRESS_TYPE_NONE}, + {"pglz", COMPRESS_TYPE_PGLZ}, + {"zstd", COMPRESS_TYPE_ZSTD}, + {(const char *) NULL} 
/* list terminator */ +}; + static relopt_enum enumRelOpts[] = { { @@ -516,6 +544,17 @@ static relopt_enum enumRelOpts[] = VIEW_OPTION_CHECK_OPTION_NOT_SET, gettext_noop("Valid values are \"local\" and \"cascaded\".") }, + { + { + "compress_type", + "compress type (none, pglz or zstd).", + RELOPT_KIND_HEAP|RELOPT_KIND_BTREE, + AccessExclusiveLock + }, + compressTypeOptValues, + COMPRESS_TYPE_NONE, + gettext_noop("Valid values are \"none\", \"pglz\" and \"zstd\".") + }, /* list terminator */ {{NULL}} }; @@ -1859,7 +1898,13 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) {"vacuum_index_cleanup", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, vacuum_index_cleanup)}, {"vacuum_truncate", RELOPT_TYPE_BOOL, - offsetof(StdRdOptions, vacuum_truncate)} + offsetof(StdRdOptions, vacuum_truncate)}, + {"compress_type", RELOPT_TYPE_ENUM, + offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compress_type)}, + {"compress_chunk_size", RELOPT_TYPE_INT, + offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)}, + {"compress_prealloc_chunks", RELOPT_TYPE_INT, + offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)} }; return (bytea *) build_reloptions(reloptions, validate, kind, diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 7c33711..f00dcd9 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -2107,8 +2107,13 @@ btoptions(Datum reloptions, bool validate) {"vacuum_cleanup_index_scale_factor", RELOPT_TYPE_REAL, offsetof(BTOptions, vacuum_cleanup_index_scale_factor)}, {"deduplicate_items", RELOPT_TYPE_BOOL, - offsetof(BTOptions, deduplicate_items)} - + offsetof(BTOptions, deduplicate_items)}, + {"compress_type", RELOPT_TYPE_ENUM, + offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compress_type)}, + {"compress_chunk_size", RELOPT_TYPE_INT, + offsetof(BTOptions, compress) + offsetof(PageCompressOpts, 
compress_chunk_size)}, + {"compress_prealloc_chunks", RELOPT_TYPE_INT, + offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)} }; return (bytea *) build_reloptions(reloptions, validate, diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index 5eaca27..3ce718b 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -229,6 +229,7 @@ Boot_CreateStmt: RELPERSISTENCE_PERMANENT, shared_relation, mapped_relation, + (Datum) 0, true, &relfrozenxid, &relminmxid); diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index e393c93..f239c1d 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -302,6 +302,7 @@ heap_create(const char *relname, char relpersistence, bool shared_relation, bool mapped_relation, + Datum reloptions, bool allow_system_table_mods, TransactionId *relfrozenxid, MultiXactId *relminmxid) @@ -402,7 +403,9 @@ heap_create(const char *relname, shared_relation, mapped_relation, relpersistence, - relkind); + relkind, + reloptions); + /* * Have the storage manager create the relation's disk file, if needed. 
@@ -1259,6 +1262,7 @@ heap_create_with_catalog(const char *relname, relpersistence, shared_relation, mapped_relation, + reloptions, allow_system_table_mods, &relfrozenxid, &relminmxid); diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index cdc01c4..c7bc68d 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -932,6 +932,7 @@ index_create(Relation heapRelation, relpersistence, shared_relation, mapped_relation, + reloptions, allow_system_table_mods, &relfrozenxid, &relminmxid); diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 7dc6dd2..057ffa1 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -96,6 +96,7 @@ #include "storage/ipc.h" #include "utils/guc.h" #include "utils/resowner_private.h" +#include "storage/page_compression.h" /* Define PG_FLUSH_DATA_WORKS if we have an implementation for pg_flush_data */ #if defined(HAVE_SYNC_FILE_RANGE) @@ -195,6 +196,8 @@ typedef struct vfd /* NB: fileName is malloc'd, and must be free'd when closing the VFD */ int fileFlags; /* open(2) flags for (re)opening the file */ mode_t fileMode; /* mode to pass to open(2) */ + bool with_pcmap; + PageCompressHeader *pcmap; /* memory map of page compress file's header and address area */ } Vfd; /* @@ -1154,6 +1157,14 @@ LruDelete(File file) vfdP = &VfdCache[file]; + if (vfdP->with_pcmap && vfdP->pcmap != NULL) + { + if (pc_munmap(vfdP->pcmap)) + elog(LOG, "Failed to unmap file %s: %m", vfdP->fileName); + + vfdP->pcmap = NULL; + } + /* * Close the file. We aren't expecting this to fail; if it does, better * to leak the FD than to mess up our internal state. 
@@ -1834,6 +1845,15 @@ FileClose(File file) if (!FileIsNotOpen(file)) { + /* close the pcmap */ + if (vfdP->with_pcmap && vfdP->pcmap != NULL) + { + if (pc_munmap(vfdP->pcmap)) + elog(LOG, "Failed to unmap file %s: %m", vfdP->fileName); + + vfdP->pcmap = NULL; + } + /* close the file */ if (close(vfdP->fd) != 0) { @@ -2189,6 +2209,88 @@ FileTruncate(File file, off_t offset, uint32 wait_event_info) } /* + * initialize page compress memory map. + * + */ +void +SetupPageCompressMemoryMap(File file, int chunk_size, uint8 algorithm) +{ + int returnCode; + Vfd *vfdP; + PageCompressHeader *map; + + Assert(FileIsValid(file)); + + vfdP = &VfdCache[file]; + + returnCode = FileAccess(file); + if (returnCode < 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("Failed to open file %s: %m", + vfdP->fileName))); + + map = pc_mmap(vfdP->fd, chunk_size); + if(map == MAP_FAILED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("Failed to map page compress file %s: %m", + vfdP->fileName))); + + /* initialize page compress header */ + if(map->chunk_size == 0) + { + map->chunk_size = chunk_size; + map->algorithm = algorithm; + + if(pc_msync(map) != 0) + elog(LOG, "failed to msync page compress map %s",vfdP->fileName); + } + + vfdP->with_pcmap=true; + vfdP->pcmap = map; +} + +/* + * Return the page compress memory map. 
+ * + */ +void * +GetPageCompressMemoryMap(File file, int chunk_size) +{ + int returnCode; + Vfd *vfdP; + PageCompressHeader *map; + + Assert(FileIsValid(file)); + + vfdP = &VfdCache[file]; + + returnCode = FileAccess(file); + if (returnCode < 0) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("Failed to open file %s: %m", + vfdP->fileName))); + + Assert(vfdP->with_pcmap); + + if(vfdP->pcmap == NULL) + { + map = pc_mmap(vfdP->fd, chunk_size); + if(map == MAP_FAILED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("Failed to map page compress file %s: %m", + vfdP->fileName))); + + vfdP->pcmap = map; + } + + return vfdP->pcmap; +} + +/* * Return the pathname associated with an open file. * * The returned string points to an internal buffer, which is valid until diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile index 596b564..ba7a9aa 100644 --- a/src/backend/storage/smgr/Makefile +++ b/src/backend/storage/smgr/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ md.o \ - smgr.o + smgr.o \ + page_compression.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 0eacd46..b3c5271 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -40,6 +40,7 @@ #include "storage/sync.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#include "storage/page_compression.h" /* * The magnetic disk storage manager keeps track of open file @@ -82,6 +83,7 @@ typedef struct _MdfdVec { File mdfd_vfd; /* fd number in fd.c's pool */ + File mdfd_vfd_pc; /* page compress file 's fd number in fd.c's pool */ BlockNumber mdfd_segno; /* segment number, from 0 */ } MdfdVec; @@ -117,6 +119,12 @@ static MemoryContext MdCxt; /* context for all MdfdVec objects */ */ #define EXTENSION_DONT_CHECK_SIZE (1 << 4) +#define IS_COMPRESSED_MAINFORK(reln, forkNum) \ + 
(reln->smgr_rnode.node.compress_algorithm != COMPRESS_TYPE_NONE && forkNum == MAIN_FORKNUM) + +#define PAGE_COMPRESS_ALGORITHM(reln) (reln->smgr_rnode.node.compress_algorithm) +#define PAGE_COMPRESS_CHUNK_SIZE(reln) (reln->smgr_rnode.node.compress_chunk_size) +#define PAGE_COMPRESS_PREALLOC_CHUNKS(reln) (reln->smgr_rnode.node.compress_prealloc_chunks) /* local routines */ static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, @@ -179,7 +187,8 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo) { MdfdVec *mdfd; char *path; - File fd; + char *pcfile_path; + File fd,fd_pc; if (isRedo && reln->md_num_open_segs[forkNum] > 0) return; /* created and opened already... */ @@ -219,11 +228,39 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo) } } + fd_pc = -1; + if(IS_COMPRESSED_MAINFORK(reln,forkNum)) + { + /* open page compress file */ + pcfile_path = psprintf("%s_pc", path); + fd_pc = PathNameOpenFile(pcfile_path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY); + + if (fd_pc < 0) + { + int save_errno = errno; + + if (isRedo) + fd_pc = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); + if (fd_pc < 0) + { + /* be sure to report the error reported by create, not open */ + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", pcfile_path))); + } + } + pfree(pcfile_path); + + SetupPageCompressMemoryMap(fd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + pfree(path); _fdvec_resize(reln, forkNum, 1); mdfd = &reln->md_seg_fds[forkNum][0]; mdfd->mdfd_vfd = fd; + mdfd->mdfd_vfd_pc = fd_pc; mdfd->mdfd_segno = 0; } @@ -309,6 +346,18 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) ereport(WARNING, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); + + if((rnode.node.compress_algorithm != COMPRESS_TYPE_NONE && + forkNum == MAIN_FORKNUM)) + { + char *pcfile_path = psprintf("%s_pc", path); + ret = unlink(pcfile_path); 
+ if (ret < 0 && errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_path))); + pfree(pcfile_path); + } } else { @@ -367,6 +416,22 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) errmsg("could not remove file \"%s\": %m", segpath))); break; } + + if((rnode.node.compress_algorithm != COMPRESS_TYPE_NONE && + forkNum == MAIN_FORKNUM)) + { + char *pcfile_segpath = psprintf("%s_pc", segpath); + if (unlink(pcfile_segpath) < 0) + { + /* ENOENT is expected after the last segment... */ + if (errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_segpath))); + break; + } + pfree(pcfile_segpath); + } } pfree(segpath); } @@ -375,6 +440,185 @@ mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) } /* + * mdextend_pc() -- Add a block to the specified page compressed relation. + * + */ +static void +mdextend_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync) +{ + off_t seekpos; + int nbytes; + MdfdVec *v; + char *work_buffer,*buffer_pos,*zero_buffer; + int i; + int prealloc_chunks,need_chunks,chunk_size,nchunks,range,write_amount; + pc_chunk_number_t chunkno; + PageCompressHeader *pcMap; + PageCompressAddr *pcAddr; + uint8 algorithm; + + /* This assert is too expensive to have on normally ... */ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum >= mdnblocks(reln, forknum)); +#endif + + Assert(IS_COMPRESSED_MAINFORK(reln,forknum)); + + /* + * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any + * more --- we mustn't create a block whose number actually is + * InvalidBlockNumber. 
+ */ + if (blocknum == InvalidBlockNumber) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("cannot extend file \"%s\" beyond %u blocks", + relpath(reln->smgr_rnode, forknum), + InvalidBlockNumber))); + + v = _mdfd_getseg(reln, MAIN_FORKNUM, blocknum, skipFsync, EXTENSION_CREATE); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + algorithm = PAGE_COMPRESS_ALGORITHM(reln); + prealloc_chunks = PAGE_COMPRESS_PREALLOC_CHUNKS(reln); + if(prealloc_chunks > BLCKSZ / chunk_size -1) + prealloc_chunks = BLCKSZ / chunk_size -1; + + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size); + pcAddr = GetPageCompressAddr(pcMap, chunk_size, blocknum); + + work_buffer = NULL; + nchunks = 0; + + /* compress data if not full zero page */ + for(i=0;iallocated_chunks > BLCKSZ / chunk_size) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunks %u of block %u in file \"%s\"", + pcAddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pc)))); + + for(i=0; i< pcAddr->allocated_chunks; i++) + { + if(pcAddr->chunknos[i] <= 0 || pcAddr->chunknos[i] > (BLCKSZ / chunk_size) * RELSEG_SIZE) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcAddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pc)))); + } + + need_chunks = prealloc_chunks > nchunks ? prealloc_chunks : nchunks; + /* TODO automatic read ? 
*/ + if(pcAddr->allocated_chunks < need_chunks) + { + chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&pcMap->allocated_chunks, need_chunks - pcAddr->allocated_chunks) + 1; + for(i = pcAddr->allocated_chunks ;ichunknos[i] = chunkno; + } + pcAddr->allocated_chunks = need_chunks; + } + pcAddr->nchunks = nchunks; + + if(pg_atomic_read_u32(&pcMap->nblocks) < blocknum % RELSEG_SIZE + 1) + pg_atomic_write_u32(&pcMap->nblocks, blocknum % RELSEG_SIZE + 1); + + /*if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc));*/ + + for(i=0; i < nchunks;i++) + { + buffer_pos = work_buffer + chunk_size * i; + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcAddr->chunknos[i]); + range = 1; + while(i < nchunks -1 && pcAddr->chunknos[i+1] == pcAddr->chunknos[i] + 1) + { + range++; + i++; + } + write_amount = chunk_size * range; + + if ((nbytes = FileWrite(v->mdfd_vfd_pc, buffer_pos, write_amount, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != write_amount) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not extend file \"%s\": %m", + FilePathName(v->mdfd_vfd_pc)), + errhint("Check free disk space."))); + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", + FilePathName(v->mdfd_vfd_pc), + nbytes, write_amount, blocknum), + errhint("Check free disk space."))); + } + } + + if(need_chunks > nchunks) + { + zero_buffer = palloc0(chunk_size * (need_chunks - nchunks)); + + for(i=nchunks; i < need_chunks;i++) + { + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcAddr->chunknos[i]); + range = 1; + while(i < nchunks -1 && pcAddr->chunknos[i+1] == pcAddr->chunknos[i] + 1) + { + range++; + i++; + } + write_amount = chunk_size * range; + + if ((nbytes = FileWrite(v->mdfd_vfd_pc, zero_buffer, write_amount, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != write_amount) + { + if (nbytes 
< 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not extend file \"%s\": %m", + FilePathName(v->mdfd_vfd_pc)), + errhint("Check free disk space."))); + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", + FilePathName(v->mdfd_vfd_pc), + nbytes, write_amount, blocknum), + errhint("Check free disk space."))); + } + } + pfree(zero_buffer); + } + + if(work_buffer != NULL && work_buffer != buffer) + pfree(work_buffer); + + if (!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); + + Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); +} + +/* * mdextend() -- Add a block to the specified relation. * * The semantics are nearly the same as mdwrite(): write at the @@ -396,6 +640,9 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(blocknum >= mdnblocks(reln, forknum)); #endif + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + return mdextend_pc(reln, forknum, blocknum, buffer, skipFsync); + /* * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any * more --- we mustn't create a block whose number actually is @@ -419,16 +666,16 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, if (nbytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not extend file \"%s\": %m", + errmsg("could not extend file \"%s\": %m", FilePathName(v->mdfd_vfd)), - errhint("Check free disk space."))); + errhint("Check free disk space."))); /* short write: complain appropriately */ ereport(ERROR, (errcode(ERRCODE_DISK_FULL), - errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", + errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", FilePathName(v->mdfd_vfd), nbytes, BLCKSZ, blocknum), - errhint("Check free disk space."))); + errhint("Check free disk space."))); } if (!skipFsync && !SmgrIsTemp(reln)) 
@@ -452,7 +699,8 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior) { MdfdVec *mdfd; char *path; - File fd; + char *pcfile_path; + File fd,fd_pc; /* No work if already open */ if (reln->md_num_open_segs[forknum] > 0) @@ -475,11 +723,38 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior) errmsg("could not open file \"%s\": %m", path))); } + fd_pc = -1; + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + /* open page compress file */ + pcfile_path = psprintf("%s_pc", path); + + fd_pc = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); + + if (fd_pc < 0) + { + if ((behavior & EXTENSION_RETURN_NULL) && + FILE_POSSIBLY_DELETED(errno)) + { + pfree(path); + pfree(pcfile_path); + return NULL; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pcfile_path))); + } + pfree(pcfile_path); + + SetupPageCompressMemoryMap(fd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + pfree(path); _fdvec_resize(reln, forknum, 1); mdfd = &reln->md_seg_fds[forknum][0]; mdfd->mdfd_vfd = fd; + mdfd->mdfd_vfd_pc = fd_pc; mdfd->mdfd_segno = 0; Assert(_mdnblocks(reln, forknum, mdfd) <= ((BlockNumber) RELSEG_SIZE)); @@ -516,6 +791,10 @@ mdclose(SMgrRelation reln, ForkNumber forknum) MdfdVec *v = &reln->md_seg_fds[forknum][nopensegs - 1]; FileClose(v->mdfd_vfd); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + FileClose(v->mdfd_vfd_pc); + } _fdvec_resize(reln, forknum, nopensegs - 1); nopensegs--; } @@ -536,11 +815,18 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) if (v == NULL) return false; - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + /* TODO not imp */ + } + else + { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, 
WAIT_EVENT_DATA_FILE_PREFETCH); + (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH); + } #endif /* USE_PREFETCH */ return true; @@ -567,6 +853,8 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, MdfdVec *v; int segnum_start, segnum_end; + int chunk_size; + PageCompressHeader *pcMap; v = _mdfd_getseg(reln, forknum, blocknum, true /* not used */ , EXTENSION_RETURN_NULL); @@ -589,9 +877,23 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, Assert(nflush >= 1); Assert(nflush <= nblocks); - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size); + + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc)); + /* TODO may be need to optimize */ + FileSync(v->mdfd_vfd_pc, WAIT_EVENT_DATA_FILE_FLUSH); + } + else + { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - FileWriteback(v->mdfd_vfd, seekpos, (off_t) BLCKSZ * nflush, WAIT_EVENT_DATA_FILE_FLUSH); + FileWriteback(v->mdfd_vfd, seekpos, (off_t) BLCKSZ * nflush, WAIT_EVENT_DATA_FILE_FLUSH); + } nblocks -= nflush; blocknum += nflush; @@ -599,6 +901,159 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, } /* + * mdread_pc() -- Read the specified block from a page compressed relation. 
+ */ +static void +mdread_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer) +{ + off_t seekpos; + int nbytes,chunk_size,i,read_amount,range; + MdfdVec *v; + PageCompressHeader *pcMap; + PageCompressAddr *pcAddr; + char *compress_buffer,*buffer_pos; + + Assert(IS_COMPRESSED_MAINFORK(reln,forkNum)); + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size); + pcAddr = GetPageCompressAddr(pcMap, chunk_size, blocknum); + + /* check chunk number */ + if(pcAddr->nchunks <= 0 || pcAddr->nchunks > BLCKSZ / chunk_size) + { + if (zero_damaged_pages || InRecovery) + { + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunks %u of block %u in file \"%s\"", + pcAddr->nchunks, blocknum, FilePathName(v->mdfd_vfd_pc)))); + } + + for(i=0; i< pcAddr->nchunks; i++) + { + if(pcAddr->chunknos[i] <= 0 || pcAddr->chunknos[i] > (BLCKSZ / chunk_size) * RELSEG_SIZE) + { + if (zero_damaged_pages || InRecovery) + { + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcAddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pc)))); + } + } + + /* read chunk data */ + compress_buffer = palloc(chunk_size * pcAddr->nchunks); + for(i=0; i< pcAddr->nchunks; i++) + { + buffer_pos = compress_buffer + chunk_size * i; + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcAddr->chunknos[i]); + range = 1; + while(inchunks-1 && pcAddr->chunknos[i+1] == pcAddr->chunknos[i]+1) + { + range++; + i++; + } + read_amount = chunk_size * range; + + TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + 
reln->smgr_rnode.backend); + + nbytes = FileRead(v->mdfd_vfd_pc, buffer_pos, read_amount, seekpos, WAIT_EVENT_DATA_FILE_READ); + + TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + reln->smgr_rnode.backend, + nbytes, + read_amount); + + if (nbytes != read_amount) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read block %u in file \"%s\": %m", + blocknum, FilePathName(v->mdfd_vfd_pc)))); + + /* + * Short read: we are at or past EOF, or we read a partial block at + * EOF. Normally this is an error; upper levels should never try to + * read a nonexistent block. However, if zero_damaged_pages is ON or + * we are InRecovery, we should instead return zeroes without + * complaining. This allows, for example, the case of trying to + * update a block that was later truncated away. + */ + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", + blocknum, FilePathName(v->mdfd_vfd_pc), + nbytes, read_amount))); + } + + } + + /* decompress chunk data */ + if(pcAddr->nchunks == BLCKSZ / chunk_size) + { + memcpy(buffer, compress_buffer, BLCKSZ); + } + else + { + nbytes = decompress_page(compress_buffer, buffer, PAGE_COMPRESS_ALGORITHM(reln) ); + if (nbytes != BLCKSZ) + { + /* + * Short read: we are at or past EOF, or we read a partial block at + * EOF. Normally this is an error; upper levels should never try to + * read a nonexistent block. However, if zero_damaged_pages is ON or + * we are InRecovery, we should instead return zeroes without + * complaining. This allows, for example, the case of trying to + * update a block that was later truncated away. 
+ */ + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not decompress block %u in file \"%s\": decompress %d of %d bytes", + blocknum, FilePathName(v->mdfd_vfd_pc), + nbytes, BLCKSZ))); + } + } + + pfree(compress_buffer); +} + +/* * mdread() -- Read the specified block from a relation. */ void @@ -609,6 +1064,9 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nbytes; MdfdVec *v; + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + return mdread_pc(reln, forknum, blocknum, buffer); + TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, @@ -616,7 +1074,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, reln->smgr_rnode.backend); v = _mdfd_getseg(reln, forknum, blocknum, false, - EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); @@ -625,41 +1083,157 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, - reln->smgr_rnode.node.spcNode, - reln->smgr_rnode.node.dbNode, - reln->smgr_rnode.node.relNode, - reln->smgr_rnode.backend, - nbytes, - BLCKSZ); + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + reln->smgr_rnode.backend, + nbytes, + BLCKSZ); if (nbytes != BLCKSZ) { if (nbytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read block %u in file \"%s\": %m", + errmsg("could not read block %u in file \"%s\": %m", blocknum, FilePathName(v->mdfd_vfd)))); /* - * Short read: we are at or past EOF, or we read a partial block at - * EOF. 
Normally this is an error; upper levels should never try to - * read a nonexistent block. However, if zero_damaged_pages is ON or - * we are InRecovery, we should instead return zeroes without - * complaining. This allows, for example, the case of trying to - * update a block that was later truncated away. - */ + * Short read: we are at or past EOF, or we read a partial block at + * EOF. Normally this is an error; upper levels should never try to + * read a nonexistent block. However, if zero_damaged_pages is ON or + * we are InRecovery, we should instead return zeroes without + * complaining. This allows, for example, the case of trying to + * update a block that was later truncated away. + */ if (zero_damaged_pages || InRecovery) MemSet(buffer, 0, BLCKSZ); else ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", + errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", blocknum, FilePathName(v->mdfd_vfd), nbytes, BLCKSZ))); } } /* + * mdwrite_pc() -- Write the supplied block at the appropriate location for page compressed relation. + * + * This is to be used only for updating already-existing blocks of a + * relation (ie, those before the current EOF). To extend a relation, + * use mdextend(). + */ +static void +mdwrite_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync) +{ + off_t seekpos; + int nbytes; + MdfdVec *v; + char *work_buffer,*buffer_pos; + int i; + int chunk_size,nchunks,range,write_amount; + pc_chunk_number_t chunkno; + PageCompressHeader *pcMap; + PageCompressAddr *pcAddr; + uint8 algorithm; + + /* This assert is too expensive to have on normally ... 
 */
+#ifdef CHECK_WRITE_VS_EXTEND
+	Assert(blocknum < mdnblocks(reln, forknum));
+#endif
+
+	Assert(IS_COMPRESSED_MAINFORK(reln,forknum));
+
+	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln);
+	algorithm = PAGE_COMPRESS_ALGORITHM(reln);
+
+	pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size);
+	pcAddr = GetPageCompressAddr(pcMap, chunk_size, blocknum);
+
+	work_buffer = compress_page(buffer, chunk_size, algorithm, &nchunks);
+
+	/* store original page if compress failed */
+	if(work_buffer == NULL)
+	{
+		work_buffer = buffer;
+		nchunks = BLCKSZ / chunk_size;
+	}
+
+	/* check allocated chunk number */
+	if(pcAddr->allocated_chunks > BLCKSZ / chunk_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("invalid chunks %u of block %u in file \"%s\"",
+						pcAddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pc))));
+
+	for(i=0; i< pcAddr->allocated_chunks; i++)
+	{
+		if(pcAddr->chunknos[i] <= 0 || pcAddr->chunknos[i] > (BLCKSZ / chunk_size) * RELSEG_SIZE)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid chunk number %u of block %u in file \"%s\"",
+							pcAddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pc))));
+	}
+
+	/* TODO automatic read ?
 */
+	if(pcAddr->allocated_chunks < nchunks)
+	{
+		chunkno = (pc_chunk_number_t)pg_atomic_fetch_add_u32(&pcMap->allocated_chunks, nchunks - pcAddr->allocated_chunks) + 1;
+		for(i = pcAddr->allocated_chunks; i < nchunks; i++, chunkno++)
+		{
+			pcAddr->chunknos[i] = chunkno;
+		}
+		pcAddr->allocated_chunks = nchunks;
+	}
+	pcAddr->nchunks = nchunks;
+
+	/*if(pc_msync(pcMap) != 0)
+		elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc));
+	*/
+
+	for(i=0; i < nchunks;i++)
+	{
+		buffer_pos = work_buffer + chunk_size * i;
+		seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcAddr->chunknos[i]);
+		range = 1;
+		while(i < nchunks -1 && pcAddr->chunknos[i+1] == pcAddr->chunknos[i] + 1)
+		{
+			range++;
+			i++;
+		}
+		write_amount = chunk_size * range;
+
+		if ((nbytes = FileWrite(v->mdfd_vfd_pc, buffer_pos, write_amount, seekpos, WAIT_EVENT_DATA_FILE_WRITE)) != write_amount)
+		{
+			if (nbytes < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not write block %u in file \"%s\": %m",
+								blocknum, FilePathName(v->mdfd_vfd_pc))));
+			/* short write: complain appropriately */
+			ereport(ERROR,
+					(errcode(ERRCODE_DISK_FULL),
+					 errmsg("could not write block %u in file \"%s\": wrote only %d of %d bytes",
+							blocknum,
+							FilePathName(v->mdfd_vfd_pc),
+							nbytes, write_amount),
+					 errhint("Check free disk space.")));
+		}
+	}
+
+	if(work_buffer != buffer)
+		pfree(work_buffer);
+
+	if (!skipFsync && !SmgrIsTemp(reln))
+		register_dirty_segment(reln, forknum, v);
+}
+
+/*
+ * mdwrite() -- Write the supplied block at the appropriate location.
* * This is to be used only for updating already-existing blocks of a @@ -679,6 +1253,9 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(blocknum < mdnblocks(reln, forknum)); #endif + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + return mdwrite_pc(reln, forknum, blocknum, buffer, skipFsync); + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, @@ -791,7 +1368,10 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) { BlockNumber curnblk; BlockNumber priorblocks; - int curopensegs; + BlockNumber blk; + int curopensegs,chunk_size; + PageCompressHeader *pcMap; + PageCompressAddr *pcAddr; /* * NOTE: mdnblocks makes sure we have opened all active segments, so that @@ -830,11 +1410,30 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) * This segment is no longer active. We truncate the file, but do * not delete it, for reasons explained in the header comments. 
*/ - if (FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not truncate file \"%s\": %m", - FilePathName(v->mdfd_vfd)))); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size); + + memset((char *)pcMap + SizeOfPageCompressHeaderData, 0x00, SizeofPageCompressMemoryMapArea(chunk_size) - SizeOfPageCompressHeaderData); + pg_atomic_write_u32(&pcMap->nblocks, 0); + pg_atomic_write_u32(&pcMap->allocated_chunks, 0); + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc)); + + if (FileTruncate(v->mdfd_vfd_pc, SizeofPageCompressMemoryMapArea(chunk_size), WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(v->mdfd_vfd_pc)))); + } + else{ + if (FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(v->mdfd_vfd)))); + } if (!SmgrIsTemp(reln)) register_dirty_segment(reln, forknum, v); @@ -842,26 +1441,48 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) /* we never drop the 1st segment */ Assert(v != &reln->md_seg_fds[forknum][0]); - FileClose(v->mdfd_vfd); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + FileClose(v->mdfd_vfd_pc); + else + FileClose(v->mdfd_vfd); + _fdvec_resize(reln, forknum, curopensegs - 1); } else if (priorblocks + ((BlockNumber) RELSEG_SIZE) > nblocks) { - /* - * This is the last segment we want to keep. Truncate the file to - * the right length. NOTE: if nblocks is exactly a multiple K of - * RELSEG_SIZE, we will truncate the K+1st segment to 0 length but - * keep it. This adheres to the invariant given in the header - * comments. 
- */ - BlockNumber lastsegblocks = nblocks - priorblocks; - - if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not truncate file \"%s\" to %u blocks: %m", - FilePathName(v->mdfd_vfd), - nblocks))); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, chunk_size); + + for(blk = nblocks - priorblocks; blk < RELSEG_SIZE; blk++) + { + pcAddr = GetPageCompressAddr(pcMap, chunk_size, blk); + pcAddr->nchunks = 0; + } + + pg_atomic_write_u32(&pcMap->nblocks, nblocks - priorblocks); + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc)); + } + else + { + /* + * This is the last segment we want to keep. Truncate the file to + * the right length. NOTE: if nblocks is exactly a multiple K of + * RELSEG_SIZE, we will truncate the K+1st segment to 0 length but + * keep it. This adheres to the invariant given in the header + * comments. 
+ */ + BlockNumber lastsegblocks = nblocks - priorblocks; + + if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\" to %u blocks: %m", + FilePathName(v->mdfd_vfd), + nblocks))); + } if (!SmgrIsTemp(reln)) register_dirty_segment(reln, forknum, v); } @@ -915,16 +1536,34 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum) { MdfdVec *v = &reln->md_seg_fds[forknum][segno - 1]; - if (FileSync(v->mdfd_vfd, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) - ereport(data_sync_elevel(ERROR), - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(v->mdfd_vfd)))); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + PageCompressHeader *pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(v->mdfd_vfd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln)); + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(v->mdfd_vfd_pc)); + + if (FileSync(v->mdfd_vfd_pc, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pc)))); + } + else + { + if (FileSync(v->mdfd_vfd, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(v->mdfd_vfd)))); + } /* Close inactive segments immediately */ if (segno > min_inactive_seg) { - FileClose(v->mdfd_vfd); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + FileClose(v->mdfd_vfd_pc); + else + FileClose(v->mdfd_vfd); _fdvec_resize(reln, forknum, segno - 1); } @@ -956,11 +1595,25 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) ereport(DEBUG1, (errmsg("could not forward fsync request because request queue is full"))); - if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) - ereport(data_sync_elevel(ERROR), - 
(errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(seg->mdfd_vfd)))); + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + PageCompressHeader *pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(seg->mdfd_vfd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln)); + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(seg->mdfd_vfd_pc)); + + if (FileSync(seg->mdfd_vfd_pc, WAIT_EVENT_DATA_FILE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(seg->mdfd_vfd_pc)))); + }else + { + if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(seg->mdfd_vfd)))); + } } } @@ -1114,18 +1767,39 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, int oflags) { MdfdVec *v; - File fd; - char *fullpath; + File fd,fd_pc; + char *fullpath,*pcfile_path; fullpath = _mdfd_segpath(reln, forknum, segno); /* open the file */ fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags); - pfree(fullpath); - if (fd < 0) + { + pfree(fullpath); return NULL; + } + + fd_pc = -1; + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + /* open page compress file */ + pcfile_path = psprintf("%s_pc", fullpath); + fd_pc = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY | oflags); + + pfree(pcfile_path); + + if (fd_pc < 0) + { + pfree(fullpath); + return NULL; + } + + SetupPageCompressMemoryMap(fd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + + pfree(fullpath); /* * Segments are always opened in order from lowest to highest, so we must @@ -1138,6 +1812,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, /* fill the entry */ v = &reln->md_seg_fds[forknum][segno]; v->mdfd_vfd = fd; + v->mdfd_vfd_pc = fd_pc; v->mdfd_segno = segno; Assert(_mdnblocks(reln, forknum, v) <= 
((BlockNumber) RELSEG_SIZE)); @@ -1283,6 +1958,13 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) { off_t len; + PageCompressHeader *pcMap; + + if(IS_COMPRESSED_MAINFORK(reln,forknum)) + { + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(seg->mdfd_vfd_pc, PAGE_COMPRESS_CHUNK_SIZE(reln)); + return (BlockNumber) pg_atomic_read_u32(&pcMap->nblocks); + } len = FileSize(seg->mdfd_vfd); if (len < 0) @@ -1308,11 +1990,15 @@ mdsyncfiletag(const FileTag *ftag, char *path) bool need_to_close; int result, save_errno; + PageCompressHeader *pcMap; /* See if we already have the file open, or need to open it. */ if (ftag->segno < reln->md_num_open_segs[ftag->forknum]) { - file = reln->md_seg_fds[ftag->forknum][ftag->segno].mdfd_vfd; + if(IS_COMPRESSED_MAINFORK(reln,ftag->forknum)) + file = reln->md_seg_fds[ftag->forknum][ftag->segno].mdfd_vfd_pc; + else + file = reln->md_seg_fds[ftag->forknum][ftag->segno].mdfd_vfd; strlcpy(path, FilePathName(file), MAXPGPATH); need_to_close = false; } @@ -1321,13 +2007,26 @@ mdsyncfiletag(const FileTag *ftag, char *path) char *p; p = _mdfd_segpath(reln, ftag->forknum, ftag->segno); - strlcpy(path, p, MAXPGPATH); + if(IS_COMPRESSED_MAINFORK(reln,ftag->forknum)) + snprintf(path, MAXPGPATH, "%s_pc", p); + else + strlcpy(path, p, MAXPGPATH); pfree(p); file = PathNameOpenFile(path, O_RDWR | PG_BINARY); if (file < 0) return -1; need_to_close = true; + + if(IS_COMPRESSED_MAINFORK(reln,ftag->forknum)) + SetupPageCompressMemoryMap(file, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + + if(IS_COMPRESSED_MAINFORK(reln,ftag->forknum)) + { + pcMap = (PageCompressHeader *)GetPageCompressMemoryMap(file, PAGE_COMPRESS_CHUNK_SIZE(reln)); + if(pc_msync(pcMap) != 0) + elog(LOG, "failed to msync page compress map %s",FilePathName(file)); } /* Sync the file. 
*/ @@ -1350,15 +2049,27 @@ mdsyncfiletag(const FileTag *ftag, char *path) int mdunlinkfiletag(const FileTag *ftag, char *path) { + SMgrRelation reln = smgropen(ftag->rnode, InvalidBackendId); char *p; + int ret; /* Compute the path. */ p = relpathperm(ftag->rnode, MAIN_FORKNUM); strlcpy(path, p, MAXPGPATH); - pfree(p); /* Try to unlink the file. */ - return unlink(path); + ret = unlink(path); + + if((ret == 0 || errno == ENOENT) && + IS_COMPRESSED_MAINFORK(reln,ftag->forknum)) + { + snprintf(path, MAXPGPATH, "%s_pc", p); + ret = unlink(path); + } + + pfree(p); + + return ret; } /* diff --git a/src/backend/storage/smgr/page_compression.c b/src/backend/storage/smgr/page_compression.c new file mode 100644 index 0000000..2e3798e --- /dev/null +++ b/src/backend/storage/smgr/page_compression.c @@ -0,0 +1,240 @@ +/* + * page_compression.c + * Routines for page compression + * + * There are two implementations at the moment: zstd, and the Postgres + * pg_lzcompress(). zstd support requires that the server was compiled + * with --with-zstd. + * + * Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/storage/smgr/page_compression.c + */ +#include "postgres.h" +#include "miscadmin.h" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef USE_ZSTD +#include +#endif + +#include "storage/page_compression.h" +#include "common/pg_lzcompress.h" +#include "utils/datum.h" + + +/** + * compress_page() -- Compress one page. + * + * Only the parts other than the page header will be compressed. The + * compressed data is rounded by chunck_size, and the compressed + * data and number of chuncks are returned. Compression needs to be + * able to save at least 1 chunk of space, otherwise it returns NULL. 
+ */ +char * +compress_page(const char *src, int chunck_size, uint8 algorithm, int *nchuncks) +{ + int compressed_size,targetDstSize; + PageCompressData *pcdptr; + char *dst; + + *nchuncks = 0; + + targetDstSize = BLCKSZ - chunck_size; + + if(targetDstSize < chunck_size) + return NULL; + + switch(algorithm) + { + case COMPRESS_TYPE_PGLZ: + dst = palloc(BLCKSZ + 4); + pcdptr = (PageCompressData *)dst; + + compressed_size = pglz_compress(src + SizeOfPageHeaderData, + BLCKSZ - SizeOfPageHeaderData, + pcdptr->data, + PGLZ_strategy_always);//TODO PGLZ_strategy_default? PGLZ_strategy_always + break; + +#ifdef USE_ZSTD + case COMPRESS_TYPE_ZSTD: + { + size_t out_len = ZSTD_compressBound(BLCKSZ - SizeOfPageHeaderData); + dst = palloc(out_len); + pcdptr = (PageCompressData *)dst; + + compressed_size = ZSTD_compress(pcdptr->data, + out_len, + src + SizeOfPageHeaderData, + BLCKSZ - SizeOfPageHeaderData, + 3); + + if (ZSTD_isError(compressed_size)) + { + elog(WARNING, "ZSTD_compress failed: %s", ZSTD_getErrorName(compressed_size)); + pfree(dst); + return NULL; + } + break; + } +#endif + default: + elog(ERROR, "unrecognized compression algorithm %d",algorithm); + break; + + } + + elog(DEBUG1, "compress_page() called: compressed_size=%d", compressed_size); + + if(compressed_size < 0 || + SizeOfPageCompressDataHeaderData + compressed_size > targetDstSize) + { + pfree(dst); + return NULL; + } + + memcpy(pcdptr->page_header, src, SizeOfPageHeaderData); + pcdptr->size = compressed_size; + + *nchuncks = (SizeOfPageCompressDataHeaderData + compressed_size + chunck_size -1 ) / chunck_size; + + if((SizeOfPageCompressDataHeaderData + compressed_size) < chunck_size * (*nchuncks)) + { + memset(pcdptr->data + compressed_size, + 0x00, + chunck_size * (*nchuncks) - SizeOfPageCompressDataHeaderData - compressed_size); + } + + return dst; +} + +/** + * decompress_page() -- Decompress one compressed page. + * + * note:The size of dst must be greater than or equal to BLCKSZ. 
 */
+int
+decompress_page(const char * src, char *dst, uint8 algorithm)
+{
+	int decompressed_size;
+	PageCompressData *pcdptr;
+
+	pcdptr = (PageCompressData *)src;
+
+	memcpy(dst, src, SizeOfPageHeaderData);
+
+	switch(algorithm)
+	{
+		case COMPRESS_TYPE_PGLZ:
+			decompressed_size = pglz_decompress(pcdptr->data,
+												pcdptr->size,
+												dst + SizeOfPageHeaderData,
+												BLCKSZ - SizeOfPageHeaderData,
+												false);
+			break;
+
+#ifdef USE_ZSTD
+		case COMPRESS_TYPE_ZSTD:
+			decompressed_size = ZSTD_decompress(dst + SizeOfPageHeaderData,
+												BLCKSZ - SizeOfPageHeaderData,
+												pcdptr->data,
+												pcdptr->size);
+
+			if (ZSTD_isError(decompressed_size))
+			{
+				elog(WARNING, "ZSTD_decompress failed: %s", ZSTD_getErrorName(decompressed_size));
+				return -1;
+			}
+
+			break;
+#endif
+
+		default:
+			elog(ERROR, "unrecognized compression algorithm %d",algorithm);
+			break;
+
+	}
+
+	return SizeOfPageHeaderData + decompressed_size;
+}
+
+
+/**
+ * pc_mmap() -- create memory map for page compress file's address area.
+ *
+ */
+PageCompressHeader *
+pc_mmap(int fd, int chunk_size)
+{
+	PageCompressHeader *map;
+	off_t file_size;
+	int pc_memory_map_size;
+
+	pc_memory_map_size = SizeofPageCompressMemoryMapArea(chunk_size);
+
+	file_size = lseek(fd, 0, SEEK_END);
+	if(file_size < pc_memory_map_size)
+	{
+		//if (ftruncate(fd, pc_memory_map_size) != 0)
+		if (fallocate(fd, 0, 0, pc_memory_map_size) != 0)
+			return (PageCompressHeader *) MAP_FAILED;
+	}
+
+#ifdef WIN32
+	{
+		HANDLE mh = CreateFileMapping((HANDLE)_get_osfhandle(fd), NULL, PAGE_READWRITE,
+									  0, (DWORD) pc_memory_map_size, NULL);
+
+		if (mh == NULL)
+			return (PageCompressHeader *) MAP_FAILED;
+
+		map = (PageCompressHeader *) MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
+		CloseHandle(mh);
+	}
+	if (map == NULL)
+		return (PageCompressHeader *) MAP_FAILED;
+
+#else
+	map = (PageCompressHeader *) mmap(NULL, pc_memory_map_size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
+#endif
+	return map;
+}
+
+/**
+ * pc_munmap() -- release memory map of page compress
file. + * + */ +int +pc_munmap(PageCompressHeader * map) +{ +#ifdef WIN32 + return UnmapViewOfFile(map) ? 0 : -1; +#else + return munmap(map, SizeofPageCompressMemoryMapArea(map->chunk_size)); +#endif +} + +/** + * pc_msync() -- sync memory map of page compress file. + * + */ +int +pc_msync(PageCompressHeader *map) +{ + if (!enableFsync) + return 0; +#ifdef WIN32 + return FlushViewOfFile(map, SizeofPageCompressMemoryMapArea(map->chunk_size)) ? 0 : -1; +#else + return msync(map, SizeofPageCompressMemoryMapArea(map->chunk_size), MS_SYNC); +#endif +} + diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c index 8406644..d505d81 100644 --- a/src/backend/utils/adt/dbsize.c +++ b/src/backend/utils/adt/dbsize.c @@ -292,6 +292,9 @@ calculate_relation_size(RelFileNode *rfn, BackendId backend, ForkNumber forknum) snprintf(pathname, MAXPGPATH, "%s.%u", relationpath, segcount); + if(rfn->compress_algorithm != COMPRESS_TYPE_NONE) + strlcat(pathname, "_pc", MAXPGPATH); + if (stat(pathname, &fst) < 0) { if (errno == ENOENT) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 0b9eb00..6e95282 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -295,6 +295,7 @@ static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid, StrategyNumber numSupport); static void RelationCacheInitFileRemoveInDir(const char *tblspcpath); static void unlink_initfile(const char *initfilename, int elevel); +static void SetupPageCompressForRelation(Relation relation, PageCompressOpts *compress_options); /* @@ -1329,6 +1330,23 @@ RelationInitPhysicalAddr(Relation relation) elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u", RelationGetRelationName(relation), relation->rd_id); } + + if(relation->rd_options) + { + switch(relation->rd_rel->relam) + { + case HEAP_TABLE_AM_OID: + SetupPageCompressForRelation(relation, &((StdRdOptions *)(relation->rd_options))->compress); + break; + + 
case BTREE_AM_OID: + SetupPageCompressForRelation(relation, &((BTOptions *)(relation->rd_options))->compress); + break; + + default: + break; + } + } } /* @@ -3342,7 +3360,8 @@ RelationBuildLocalRelation(const char *relname, bool shared_relation, bool mapped_relation, char relpersistence, - char relkind) + char relkind, + Datum reloptions) { Relation rel; MemoryContext oldcxt; @@ -3519,6 +3538,15 @@ RelationBuildLocalRelation(const char *relname, RelationInitPhysicalAddr(rel); + /* setup page compress option */ + if (reloptions && + (relkind == RELKIND_RELATION || + relkind == RELKIND_INDEX)) + { + StdRdOptions *options = (StdRdOptions *)default_reloptions(reloptions, false, RELOPT_KIND_HEAP); + SetupPageCompressForRelation(rel, &options->compress); + } + rel->rd_rel->relam = accessmtd; if (relkind == RELKIND_RELATION || @@ -6396,3 +6424,41 @@ unlink_initfile(const char *initfilename, int elevel) initfilename))); } } + +/* setup page compress options for relation */ +static void +SetupPageCompressForRelation(Relation relation, PageCompressOpts *compress_options) +{ + if(compress_options->compress_type == COMPRESS_TYPE_NONE) + { + relation->rd_node.compress_algorithm = COMPRESS_TYPE_NONE; + relation->rd_node.compress_chunk_size = 0; + relation->rd_node.compress_prealloc_chunks = 0; + } + else + { +#ifndef USE_ZSTD + if(compress_options->compress_type == COMPRESS_TYPE_ZSTD) + elog(ERROR, "unsupported compression algorithm %s","zstd"); +#endif + + relation->rd_node.compress_algorithm = compress_options->compress_type; + + if(compress_options->compress_chunk_size != BLCKSZ / 2 && + compress_options->compress_chunk_size != BLCKSZ / 4 && + compress_options->compress_chunk_size != BLCKSZ / 8) + { + elog(ERROR, "invalid compress_chunk_size %d , must be one of %d, %d or %d for %s", + compress_options->compress_chunk_size, + BLCKSZ / 8, BLCKSZ / 4, BLCKSZ / 2, + RelationGetRelationName(relation)); + } + + relation->rd_node.compress_chunk_size = 
compress_options->compress_chunk_size; + + if(compress_options->compress_prealloc_chunks >= BLCKSZ / compress_options->compress_chunk_size) + relation->rd_node.compress_prealloc_chunks = (uint8)(BLCKSZ / compress_options->compress_chunk_size - 1); + else + relation->rd_node.compress_prealloc_chunks = (uint8)(compress_options->compress_prealloc_chunks); + } +} \ No newline at end of file diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 4e2b056b..3f72571 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -23,6 +23,7 @@ #include "lib/stringinfo.h" #include "storage/bufmgr.h" #include "storage/shm_toc.h" +#include "utils/rel.h" /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */ typedef uint16 BTCycleId; @@ -962,6 +963,7 @@ typedef struct BTOptions /* fraction of newly inserted tuples prior to trigger index cleanup */ float8 vacuum_cleanup_index_scale_factor; bool deduplicate_items; /* Try to deduplicate items? */ + PageCompressOpts compress; /* page compress related reloptions. */ } BTOptions; #define BTGetFillFactor(relation) \ diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index cbfdfe2..184366f 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -57,6 +57,7 @@ extern Relation heap_create(const char *relname, char relpersistence, bool shared_relation, bool mapped_relation, + Datum reloptions, bool allow_system_table_mods, TransactionId *relfrozenxid, MultiXactId *relminmxid); diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index c199cd4..d0f78c5 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -346,6 +346,9 @@ /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if you have the `link' function. 
*/ #undef HAVE_LINK @@ -686,6 +689,9 @@ /* Define to 1 if the assembler supports X86_64's POPCNTQ instruction. */ #undef HAVE_X86_64_POPCNTQ +/* Define to 1 if you have the header file. */ +#undef HAVE_ZSTD_H + /* Define to 1 if the system has the type `_Bool'. */ #undef HAVE__BOOL @@ -919,6 +925,9 @@ /* Define to select Win32-style shared memory. */ #undef USE_WIN32_SHARED_MEMORY +/* Define to 1 to build with zstd support. (--with-zstd) */ +#undef USE_ZSTD + /* Define to 1 if `wcstombs_l' requires . */ #undef WCSTOMBS_L_IN_XLOCALE diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 8cd125d..e7b74ee 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -132,6 +132,10 @@ extern void ReleaseExternalFD(void); /* Make a directory with default permissions */ extern int MakePGDirectory(const char *directoryName); +/* Page compression support routines */ +extern void SetupPageCompressMemoryMap(File file, int chunk_size, uint8 algorithm); +extern void *GetPageCompressMemoryMap(File file, int chunk_size); + /* Miscellaneous support routines */ extern void InitFileAccess(void); extern void set_max_safe_fds(void); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h new file mode 100644 index 0000000..40a49d4 --- /dev/null +++ b/src/include/storage/page_compression.h @@ -0,0 +1,84 @@ +/* + * page_compression.h + * internal declarations for page compression + * + * Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/storage/page_compression.h + */ + +#ifndef PAGE_COMPRESSION_H +#define PAGE_COMPRESSION_H + +#include "storage/bufpage.h" +#include "port/atomics.h" +#include "utils/rel.h" + +typedef uint32 pc_chunk_number_t; + +/* + * layout of Page Compress file: + * + * - PageCompressHeader + * - PageCompressAddr[] + * - chuncks of PageCompressData + * + */ +typedef struct PageCompressHeader +{ + pg_atomic_uint32 nblocks; /* number of total blocks in this 
segment */ + pg_atomic_uint32 allocated_chunks; /* number of total allocated chunks in data area */ + uint16 chunk_size; /* size of each chunk, must be 1/2 1/4 or 1/8 of BLCKSZ */ + uint8 algorithm; /* compress algorithm, 1=pglz, 2=lz4 */ +} PageCompressHeader; + +typedef struct PageCompressAddr +{ + uint8 nchunks; /* number of chunks for this block */ + uint8 allocated_chunks; /* number of allocated chunks for this block */ + + /* variable-length fields, 1 based chunk no array for this block, size of the array must be 2, 4 or 8 */ + pc_chunk_number_t chunknos[FLEXIBLE_ARRAY_MEMBER]; +} PageCompressAddr; + +typedef struct PageCompressData +{ + char page_header[SizeOfPageHeaderData]; /* page header */ + uint32 size; /* size of compressed data */ + char data[FLEXIBLE_ARRAY_MEMBER]; /* compressed page, except for the page header */ +} PageCompressData; + + +#define SizeOfPageCompressHeaderData sizeof(PageCompressHeader) +#define SizeOfPageCompressAddrHeaderData offsetof(PageCompressAddr, chunknos) +#define SizeOfPageCompressDataHeaderData offsetof(PageCompressData, data) + +#define SizeOfPageCompressAddr(chunk_size) \ + (SizeOfPageCompressAddrHeaderData + sizeof(pc_chunk_number_t) * BLCKSZ / (chunk_size)) + +#define OffsetOfPageCompressAddr(chunk_size,blockno) \ + (MAXALIGN(SizeOfPageCompressHeaderData) + SizeOfPageCompressAddr(chunk_size) * (blockno)) + +#define GetPageCompressAddr(pcbuffer,chunk_size,blockno) \ + (PageCompressAddr *)((char *)pcbuffer + OffsetOfPageCompressAddr(chunk_size,(blockno) % RELSEG_SIZE)) + +#define OffsetOfFirstPageCompressChunck(chunk_size) \ + (((OffsetOfPageCompressAddr(chunk_size, RELSEG_SIZE) + BLCKSZ - 1)/ BLCKSZ) * BLCKSZ) + +#define OffsetOfPageCompressChunk(chunk_size, chunkno) \ + (OffsetOfFirstPageCompressChunck(chunk_size) + (chunk_size) * (chunkno - 1)) + +#define SizeofPageCompressMemoryMapArea(chunk_size) OffsetOfFirstPageCompressChunck(chunk_size) + + +/* Compress function */ +extern char *compress_page(const char *src, int 
chunck_size, uint8 algorithm, int *nchuncks); +extern int decompress_page(const char * src, char *dst, uint8 algorithm); + +/* Memory mapping function */ +extern PageCompressHeader * pc_mmap(int fd, int chunk_size); +extern int pc_munmap(PageCompressHeader * map); +extern int pc_msync(PageCompressHeader * map); + +#endif /* PAGE_COMPRESSION_H */ diff --git a/src/include/storage/relfilenode.h b/src/include/storage/relfilenode.h index 4de9fc1..939764a 100644 --- a/src/include/storage/relfilenode.h +++ b/src/include/storage/relfilenode.h @@ -59,6 +59,9 @@ typedef struct RelFileNode Oid spcNode; /* tablespace */ Oid dbNode; /* database */ Oid relNode; /* relation */ + uint32 compress_chunk_size; /* chunk size of compressed data */ + uint8 compress_algorithm; /* compress algorithm */ + uint8 compress_prealloc_chunks; /* prealloced chunks to store compressed data */ } RelFileNode; /* diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 0b5957b..c933e01 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -294,6 +294,22 @@ typedef struct AutoVacOpts float8 analyze_scale_factor; } AutoVacOpts; +/* PageCompressOpts->compress_type values */ +typedef enum compressTypeOption +{ + COMPRESS_TYPE_NONE, + COMPRESS_TYPE_PGLZ, + COMPRESS_TYPE_ZSTD +} compressTypeOption; + + /* page compress related reloptions. */ +typedef struct PageCompressOpts +{ + compressTypeOption compress_type; /* compress algorithm */ + int compress_chunk_size; /* chunk size of compressed data */ + int compress_prealloc_chunks; /* prealloced chunks to store compressed data */ +} PageCompressOpts; + typedef struct StdRdOptions { int32 vl_len_; /* varlena header (do not touch directly!) 
*/ @@ -305,6 +321,7 @@ typedef struct StdRdOptions int parallel_workers; /* max number of parallel workers */ bool vacuum_index_cleanup; /* enables index vacuuming and cleanup */ bool vacuum_truncate; /* enables vacuum to truncate a relation */ + PageCompressOpts compress; /* page compress related reloptions. */ } StdRdOptions; #define HEAP_MIN_FILLFACTOR 10 diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h index 9a85b7d..259ec2a 100644 --- a/src/include/utils/relcache.h +++ b/src/include/utils/relcache.h @@ -106,7 +106,8 @@ extern Relation RelationBuildLocalRelation(const char *relname, bool shared_relation, bool mapped_relation, char relpersistence, - char relkind); + char relkind, + Datum reloptions); /* * Routines to manage assignment of new relfilenode to a relation