From aaae714e3a9c30dffd8bf51fa3b3039c0ba7ab70 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 13 Jul 2020 06:33:56 +0530 Subject: [PATCH v3] Performance Improvement For Copy From Binary Files Read data from binary file in RAW_BUF_SIZE bytes at a time for COPY FROM command, instead of reading everytime from file for field count, size and field data. This avoids fread calls significantly and improves performance of the COPY FROM binary files command. --- src/backend/commands/copy.c | 132 +++++++++++++++++++++--------------- 1 file changed, 78 insertions(+), 54 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 99d1457180..2e34d1f84a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -195,7 +195,7 @@ typedef struct CopyStateData * the buffer on each cycle. * * (In binary COPY FROM, attribute_buf holds the binary data for the - * current field, while the other variables are not used.) + * current field and raw_buf holds the raw data read from file.) */ StringInfoData attribute_buf; @@ -370,6 +370,7 @@ static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); static int CopyReadAttributesCSV(CopyState cstate); +static int CopyReadFromRawBuf(CopyState cstate, char *dest, int nbytes); static Datum CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); @@ -391,9 +392,7 @@ static void CopySendEndOfRow(CopyState cstate); static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); -static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); -static bool CopyGetInt16(CopyState cstate, int16 *val); /* @@ -732,25 +731,6 @@ CopySendInt32(CopyState cstate, int32 val) CopySendData(cstate, &buf, sizeof(buf)); } -/* - * CopyGetInt32 reads an int32 that appears in network byte order - * - * Returns true if OK, false if EOF - */ -static bool -CopyGetInt32(CopyState cstate, int32 *val) -{ - uint32 buf; - - if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int32) pg_ntoh32(buf); - return true; -} - /* * CopySendInt16 sends an int16 in network byte order */ @@ -763,23 +743,6 @@ CopySendInt16(CopyState cstate, int16 val) CopySendData(cstate, &buf, sizeof(buf)); } -/* - * CopyGetInt16 reads an int16 that appears in network byte order - */ -static bool -CopyGetInt16(CopyState cstate, int16 *val) -{ - uint16 buf; - - if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int16) pg_ntoh16(buf); - return true; -} - /* * CopyLoadRawBuf loads some more data into raw_buf @@ -816,6 +779,61 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } +/* + * CopyReadFromRawBuf + * Reads 'nbytes' bytes from cstate->copy_file via cstate->raw_buf and + * writes then to 'dest' + * + * Useful when reading binary data from the file. + */ +#define DRAIN_COPY_RAW_BUF(cstate, dest, nbytes)\ + do {\ + memcpy((dest), (cstate)->raw_buf + (cstate)->raw_buf_index, (nbytes));\ + (cstate)->raw_buf_index += (nbytes);\ + } while(0) + +#define BUF_BYTES (cstate->raw_buf_len - cstate->raw_buf_index) + +static int +CopyReadFromRawBuf(CopyState cstate, char *dest, int nbytes) +{ + int copied_bytes = 0; + + if (BUF_BYTES >= nbytes) + { + /* Enough bytes are present in the buffer. */ + DRAIN_COPY_RAW_BUF(cstate, dest, nbytes); + copied_bytes = nbytes; + } + else + { + /* + * Not enough bytes in the buffer, so must read from the file. Need + * the loop considering that 'nbytes' may be larger than the maximum + * bytes that the buffer can hold. + */ + do + { + int copy_bytes; + + /* + * This tries to read up to RAW_BUF_SIZE bytes into raw_buf, + * returning if no more were read. + */ + if (BUF_BYTES == 0 && !CopyLoadRawBuf(cstate)) + return copied_bytes; + + copy_bytes = Min(nbytes - copied_bytes, BUF_BYTES); + DRAIN_COPY_RAW_BUF(cstate, dest, copy_bytes); + dest += copy_bytes; + copied_bytes += copy_bytes; + } while (copied_bytes < nbytes); + } + + return copied_bytes; +} +#undef DRAIN_COPY_RAW_BUF +#undef BUF_BYTES /* * DoCopy executes the SQL COPY statement @@ -3363,17 +3381,17 @@ BeginCopyFrom(ParseState *pstate, cstate->cur_attval = NULL; /* - * Set up variables to avoid per-attribute overhead. attribute_buf is - * used in both text and binary modes, but we use line_buf and raw_buf - * only in text mode. + * Set up variables to avoid per-attribute overhead. line_buf + * is not used in binary mode. */ initStringInfo(&cstate->attribute_buf); + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + if (!cstate->binary) { initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; } /* Assign range table, we'll need it in CopyFrom. */ @@ -3524,16 +3542,18 @@ BeginCopyFrom(ParseState *pstate, int32 tmp; /* Signature */ - if (CopyGetData(cstate, readSig, 11, 11) != 11 || + if (CopyReadFromRawBuf(cstate, readSig, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ - if (!CopyGetInt32(cstate, &tmp)) + if (CopyReadFromRawBuf(cstate, (char *) &tmp, sizeof(tmp)) != + sizeof(tmp)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); + tmp = (int32) pg_ntoh32(tmp); if ((tmp & (1 << 16)) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -3544,15 +3564,15 @@ BeginCopyFrom(ParseState *pstate, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ - if (!CopyGetInt32(cstate, &tmp) || - tmp < 0) + if (CopyReadFromRawBuf(cstate, (char *) &tmp, sizeof(tmp)) != + sizeof(tmp) || (tmp = (int32) pg_ntoh32(tmp)) < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyGetData(cstate, readSig, 1, 1) != 1) + if (CopyReadFromRawBuf(cstate, readSig, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -3745,12 +3765,14 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, cstate->cur_lineno++; - if (!CopyGetInt16(cstate, &fld_count)) + if (CopyReadFromRawBuf(cstate, (char *) &fld_count, + sizeof(fld_count)) != sizeof(fld_count)) { /* EOF detected (end of file, or protocol-level EOF) */ return false; } + fld_count = (int16) pg_ntoh16(fld_count); if (fld_count == -1) { /* @@ -3768,7 +3790,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, char dummy; if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) + CopyReadFromRawBuf(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); @@ -4723,10 +4745,12 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, int32 fld_size; Datum result; - if (!CopyGetInt32(cstate, &fld_size)) + if (CopyReadFromRawBuf(cstate, (char *) &fld_size, sizeof(fld_size)) != + sizeof(fld_size)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); + fld_size = (int32) pg_ntoh32(fld_size); if (fld_size == -1) { *isnull = true; @@ -4741,8 +4765,8 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, resetStringInfo(&cstate->attribute_buf); enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyGetData(cstate, cstate->attribute_buf.data, - fld_size, fld_size) != fld_size) + if (CopyReadFromRawBuf(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); -- 2.25.1