diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 8ec60ded76..74043ff331 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -19,6 +19,7 @@ OBJS = \ basebackup.o \ basebackup_copy.o \ basebackup_gzip.o \ + basebackup_lz4.o \ basebackup_progress.o \ basebackup_server.o \ basebackup_sink.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index d6df3fdeb2..64641903bf 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -64,7 +64,8 @@ typedef enum typedef enum { BACKUP_COMPRESSION_NONE, - BACKUP_COMPRESSION_GZIP + BACKUP_COMPRESSION_GZIP, + BACKUP_COMPRESSION_LZ4 } basebackup_compression_type; typedef struct @@ -303,6 +304,8 @@ perform_base_backup(basebackup_options *opt) /* Set up server-side compression, if client requested it */ if (opt->compression == BACKUP_COMPRESSION_GZIP) sink = bbsink_gzip_new(sink, opt->compression_level); + else if (opt->compression == BACKUP_COMPRESSION_LZ4) + sink = bbsink_lz4_new(sink); /* Set up progress reporting. */ sink = progress_sink = bbsink_progress_new(sink, opt->progress); @@ -936,6 +939,8 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->compression = BACKUP_COMPRESSION_GZIP; opt->compression_level = optval[4] - '0'; } + else if (strcmp(optval, "lz4") == 0) + opt->compression = BACKUP_COMPRESSION_LZ4; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), diff --git a/src/backend/replication/basebackup_lz4.c b/src/backend/replication/basebackup_lz4.c new file mode 100644 index 0000000000..85f51fea4d --- /dev/null +++ b/src/backend/replication/basebackup_lz4.c @@ -0,0 +1,291 @@ +/*------------------------------------------------------------------------- + * + * basebackup_lz4.c + * Basebackup sink implementing lz4 compression. 
+ *
+ * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/basebackup_lz4.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
+/*
+ * NOTE(review): the header name on the following #include was lost before
+ * this patch text reached review and cannot be inferred from the code below;
+ * restore it from the author's copy.
+ */
+#include
+
+#include "replication/basebackup_sink.h"
+
+#ifdef HAVE_LIBLZ4
+
+/*
+ * bbsink_lz4_archive_contents() feeds the input buffer to the lz4 compressor
+ * in pieces of at most CHUNK_SIZE bytes.  It is defined as 8k because the
+ * input buffer is expected to be a multiple of BLCKSZ (8k in a default
+ * build).
+ */
+#define CHUNK_SIZE 8192
+
+typedef struct bbsink_lz4
+{
+	/* Common information for all types of sink. */
+	bbsink		base;
+
+	/* Compression context; created per archive, freed when it ends. */
+	LZ4F_compressionContext_t ctx;
+
+	/* Frame preferences, filled in once by bbsink_lz4_begin_backup(). */
+	LZ4F_preferences_t prefs;
+
+	/* LZ4F_compressBound() for one full input buffer, computed once. */
+	size_t		output_buffer_bound;
+
+	/* Number of bytes staged in the successor sink's output buffer. */
+	size_t		bytes_written;
+} bbsink_lz4;
+
+static void bbsink_lz4_begin_backup(bbsink *sink);
+static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
+static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_lz4_end_archive(bbsink *sink);
+
+/*
+ * Callback table for the lz4 sink.  Manifest begin/end and end_backup are
+ * simply forwarded; everything else is handled by the functions in this
+ * file.
+ */
+const bbsink_ops bbsink_lz4_ops = {
+	.begin_backup = bbsink_lz4_begin_backup,
+	.begin_archive = bbsink_lz4_begin_archive,
+	.archive_contents = bbsink_lz4_archive_contents,
+	.end_archive = bbsink_lz4_end_archive,
+	.begin_manifest = bbsink_forward_begin_manifest,
+	.manifest_contents = bbsink_lz4_manifest_contents,
+	.end_manifest = bbsink_forward_end_manifest,
+	.end_backup = bbsink_forward_end_backup
+};
+#endif
+
+/* Create a new basebackup sink that performs lz4 compression.
+ *
+ * 'next' is the sink that will receive the compressed output; it must not
+ * be NULL.  In a build without HAVE_LIBLZ4 this raises
+ * ERRCODE_FEATURE_NOT_SUPPORTED instead of returning a sink.
+ */
+bbsink *
+bbsink_lz4_new(bbsink *next)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("lz4 compression is not supported by this build")));
+	return NULL;				/* keep compiler quiet */
+#else
+	bbsink_lz4 *sink;
+
+	Assert(next != NULL);
+
+	/* palloc0 leaves ctx, prefs and bytes_written zeroed. */
+	sink = palloc0(sizeof(bbsink_lz4));
+	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
+	sink->base.bbs_next = next;
+
+	return &sink->base;
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+
+/*
+ * Begin backup.
+ *
+ * Sets up the lz4 frame preferences, allocates this sink's own input buffer
+ * and starts the successor sink with an output buffer large enough to hold
+ * the compressed form of one full input buffer.
+ *
+ * NOTE(review): this relies on base.bbs_buffer_length already being set when
+ * the callback runs -- confirm against bbsink_begin_backup().
+ */
+static void
+bbsink_lz4_begin_backup(bbsink *sink)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	size_t		next_buf_len;
+	LZ4F_preferences_t *prefs = &mysink->prefs;
+
+	/*
+	 * Initialize the compressor preferences.  Zero means "default" for every
+	 * field, so clear the whole structure and set only what we want to
+	 * override.  Naming each field individually would fail to compile
+	 * against liblz4 releases whose LZ4F_preferences_t lacks the newer
+	 * members.
+	 */
+	memset(prefs, 0, sizeof(LZ4F_preferences_t));
+	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+
+	/*
+	 * We need our own buffer, because we're going to pass different data to
+	 * the next sink than what gets passed to us.
+	 */
+	mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
+
+	/*
+	 * Remember the compressed buffer bound needed for input buffer to avoid
+	 * recomputation in bbsink_lz4_archive_contents().
+	 */
+	mysink->output_buffer_bound =
+		LZ4F_compressBound(mysink->base.bbs_buffer_length, &mysink->prefs);
+
+	/*
+	 * Since LZ4F_compressUpdate() requires the output buffer of size equal or
+	 * greater than that of LZ4F_compressBound(), make sure we have the next
+	 * sink's bbs_buffer of length that can accommodate the compressed input
+	 * buffer.
+	 */
+	next_buf_len = mysink->base.bbs_buffer_length + mysink->output_buffer_bound;
+
+	/*
+	 * The buffer length is expected to be a multiple of BLCKSZ, so advance
+	 * to the next multiple.  (An already-aligned length grows by one full
+	 * BLCKSZ, which is harmless.)
+	 */
+	next_buf_len = next_buf_len + BLCKSZ - (next_buf_len % BLCKSZ);
+
+	bbsink_begin_backup(sink->bbs_next, sink->bbs_state, next_buf_len);
+}
+
+/*
+ * Prepare to compress the next archive.
+ *
+ * Creates a fresh compression context, writes the lz4 frame header at the
+ * start of the successor's buffer, and announces the archive to the
+ * successor under the name "<archive_name>.lz4".
+ */
+static void
+bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	char	   *lz4_archive_name;
+	LZ4F_errorCode_t ctxError;
+	size_t		headerSize;
+
+	Assert(sink->bbs_next != NULL);
+
+	/*
+	 * The frame header is written at offset zero below, so nothing may be
+	 * staged in the output buffer at this point.
+	 */
+	Assert(mysink->bytes_written == 0);
+
+	ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
+	if (LZ4F_isError(ctxError))
+		elog(ERROR, "could not create lz4 compression context: %s",
+			 LZ4F_getErrorName(ctxError));
+
+	/* First of all write the frame header to destination buffer. */
+	headerSize = LZ4F_compressBegin(mysink->ctx,
+									mysink->base.bbs_next->bbs_buffer,
+									mysink->base.bbs_next->bbs_buffer_length,
+									&mysink->prefs);
+
+	if (LZ4F_isError(headerSize))
+	{
+		/*
+		 * The context is not palloc'd, so error cleanup will not reclaim
+		 * it; release it ourselves before bailing out.
+		 */
+		LZ4F_freeCompressionContext(mysink->ctx);
+		mysink->ctx = NULL;
+		elog(ERROR, "could not write lz4 header: %s",
+			 LZ4F_getErrorName(headerSize));
+	}
+
+	/*
+	 * We need to write the compressed data after the header in the output
+	 * buffer. So, make sure to update the notion of bytes written to output
+	 * buffer.
+	 */
+	mysink->bytes_written = headerSize;
+
+	/* Add ".lz4" to the archive name. */
+	lz4_archive_name = psprintf("%s.lz4", archive_name);
+	bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
+	pfree(lz4_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the free space left in the output buffer falls below the
+ * compression bound for the input buffer, invoke the archive_contents()
+ * method for the next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer.
+ * In that case, the compressed representation of the current input data
+ * won't actually be sent to the next bbsink until a later call to this
+ * function, or perhaps even not until bbsink_lz4_end_archive() is invoked.
+ */
+static void
+bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
+{
+	bbsink_lz4 *lz4sink = (bbsink_lz4 *) sink;
+	bbsink	   *next = sink->bbs_next;
+	uint8	   *src = (uint8 *) sink->bbs_buffer;
+
+	while (avail_in > 0)
+	{
+		/* Feed at most CHUNK_SIZE bytes per call; the tail may be shorter. */
+		size_t		chunk_len = (avail_in < CHUNK_SIZE) ? avail_in : CHUNK_SIZE;
+		size_t		result;
+
+		/* Compress this chunk into the still-unused tail of the output. */
+		result = LZ4F_compressUpdate(lz4sink->ctx,
+									 next->bbs_buffer + lz4sink->bytes_written,
+									 next->bbs_buffer_length - lz4sink->bytes_written,
+									 src,
+									 chunk_len,
+									 NULL);
+
+		if (LZ4F_isError(result))
+			elog(ERROR, "could not compress data: %s",
+				 LZ4F_getErrorName(result));
+
+		/* Account for the output produced and the input consumed. */
+		lz4sink->bytes_written += result;
+		src += chunk_len;
+		avail_in -= chunk_len;
+
+		/*
+		 * Once the free space drops below the bound saved by
+		 * bbsink_lz4_begin_backup(), hand the staged bytes to the next sink
+		 * so the buffer can be reused from the start.
+		 */
+		if (next->bbs_buffer_length - lz4sink->bytes_written <
+			lz4sink->output_buffer_bound)
+		{
+			bbsink_archive_contents(next, lz4sink->bytes_written);
+			lz4sink->bytes_written = 0;
+		}
+	}
+}
+
+/*
+ * Finalize the lz4 frame and then get that forwarded to the successor sink
+ * as archive content. Then, we can end processing for this archive.
+ *
+ * The compression context created for this archive is released here, as
+ * soon as the frame has been finalized and before anything else can raise
+ * an error.
+ */
+static void
+bbsink_lz4_end_archive(bbsink *sink)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	size_t		compressedSize;
+
+	/* Write output data into unused portion of output buffer. */
+	Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
+
+	compressedSize = LZ4F_compressEnd(mysink->ctx,
+									  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
+									  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
+									  NULL);
+
+	/*
+	 * Release the resources.  The context is no longer needed whether or not
+	 * finalization succeeded, and it is not palloc'd, so it would be leaked
+	 * if we (or a successor sink below) raised an error while still holding
+	 * it.
+	 */
+	LZ4F_freeCompressionContext(mysink->ctx);
+	mysink->ctx = NULL;
+
+	if (LZ4F_isError(compressedSize))
+		elog(ERROR, "could not end lz4 compression: %s",
+			 LZ4F_getErrorName(compressedSize));
+
+	/* Update our notion of how many bytes we've written. */
+	mysink->bytes_written = mysink->bytes_written + compressedSize;
+
+	/* Send whatever accumulated output bytes we have. */
+	bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+	mysink->bytes_written = 0;
+
+	/* Pass on the information that this archive has ended. */
+	bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
+{
+	bbsink	   *next = sink->bbs_next;
+
+	/* Stage the first 'len' bytes of our buffer in the successor's buffer. */
+	memcpy(next->bbs_buffer, sink->bbs_buffer, len);
+
+	/* Let the successor process the bytes just copied. */
+	bbsink_manifest_contents(next, len);
+}
+
+#endif
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index f09aecb53b..84dc305d56 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -264,6 +264,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
 extern bbsink *bbsink_copystream_new(bool send_to_client);
 extern bbsink *bbsink_copytblspc_new(void);
 extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel);
+extern bbsink *bbsink_lz4_new(bbsink *next);
 extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_server_new(bbsink *next, char *pathname);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);