From 97204eb6abafe891a654b34ff84cf9812e6c1fef Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 10 Jun 2020 06:07:17 +0530 Subject: [PATCH 1/4] Copy code readjustment to support parallel copy. This patch slightly readjusts the copy code so that the common code is separated into functions/macros; these functions/macros will be used by the workers in the parallel copy code of the upcoming patches. EOL removal is moved from CopyReadLine to CopyReadLineText. This change is required because, in the case of parallel copy, chunk identification and chunk update are done in CopyReadLineText, and the newline characters must be removed before the chunk information is updated in shared memory. --- src/backend/commands/copy.c | 320 ++++++++++++++++++++++++++------------------ 1 file changed, 191 insertions(+), 129 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc4..eaf0f78 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -95,6 +95,9 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) + /* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, @@ -219,7 +222,6 @@ typedef struct CopyStateData * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ @@ -347,6 +349,88 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) +/* + * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding. + */ +#define CONVERT_TO_SERVER_ENCODING(cstate) \ +{ \ + /* Done reading the line. Convert it to server encoding. 
*/ \ + if ((cstate)->need_transcoding) \ + { \ + char *cvt; \ + cvt = pg_any_to_server((cstate)->line_buf.data, \ + (cstate)->line_buf.len, \ + (cstate)->file_encoding); \ + if (cvt != (cstate)->line_buf.data) \ + { \ + /* transfer converted data back to line_buf */ \ + resetStringInfo(&(cstate)->line_buf); \ + appendBinaryStringInfo(&(cstate)->line_buf, cvt, strlen(cvt)); \ + pfree(cvt); \ + } \ + } \ + /* Now it's safe to use the buffer in error messages */ \ + (cstate)->line_buf_converted = true; \ +} + +/* + * CLEAR_EOL_FROM_COPIED_DATA - overwrite the trailing EOL marker (selected by the caller-scope cstate->eol_type) with '\0' and shrink copy_line_size; copy_line_pos is read before copy_line_size changes, so both may name the same lvalue. + */ +#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos, copy_line_size) \ +{ \ + /* \ + * If we didn't hit EOF, then we must have transferred the EOL marker \ + * to line_buf along with the data. Get rid of it. \ + */ \ + switch (cstate->eol_type) \ + { \ + case EOL_NL: \ + Assert((copy_line_size) >= 1); \ + Assert((copy_line_data)[(copy_line_pos) - 1] == '\n'); \ + (copy_line_data)[(copy_line_pos) - 1] = '\0'; \ + (copy_line_size)--; \ + break; \ + case EOL_CR: \ + Assert((copy_line_size) >= 1); \ + Assert((copy_line_data)[(copy_line_pos) - 1] == '\r'); \ + (copy_line_data)[(copy_line_pos) - 1] = '\0'; \ + (copy_line_size)--; \ + break; \ + case EOL_CRNL: \ + Assert((copy_line_size) >= 2); \ + Assert((copy_line_data)[(copy_line_pos) - 2] == '\r'); \ + Assert((copy_line_data)[(copy_line_pos) - 1] == '\n'); \ + (copy_line_data)[(copy_line_pos) - 2] = '\0'; \ + (copy_line_size) -= 2; \ + break; \ + case EOL_UNKNOWN: \ + /* shouldn't get here */ \ + Assert(false); \ + break; \ + } \ +} + +/* + * CLEAR_EOL_LINE - strip the EOL from cstate->line_buf unless result is set or this is the header line; uses result and cstate from the caller's scope, and the "if ... {} else" form keeps a caller's else from binding to this macro. + */ +#define CLEAR_EOL_LINE() \ +if (result || IsHeaderLine()) {} else \ + CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \ + cstate->line_buf.len, \ + cstate->line_buf.len) + +/* + * INCREMENTPROCESSED - Increment the lines processed (expands to a complete statement, semicolon included). + */ +#define INCREMENTPROCESSED(processed) \ +(processed)++; + +/* + * GETPROCESSED - Get the lines processed. 
+ */ +#define GETPROCESSED(processed) \ +return (processed); + static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -393,6 +477,8 @@ static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); +static void PopulateAttributes(CopyState cstate, TupleDesc tupDesc, + List *attnamelist); /* * Send copy start/stop messages for frontend copies. These have changed @@ -1464,7 +1550,6 @@ BeginCopy(ParseState *pstate, { CopyState cstate; TupleDesc tupDesc; - int num_phys_attrs; MemoryContext oldcontext; /* Allocate workspace and zero all fields */ @@ -1630,6 +1715,22 @@ BeginCopy(ParseState *pstate, tupDesc = cstate->queryDesc->tupDesc; } + PopulateAttributes(cstate, tupDesc, attnamelist); + cstate->copy_dest = COPY_FILE; /* default */ + + MemoryContextSwitchTo(oldcontext); + + return cstate; +} + +/* + * PopulateAttributes - build cstate->attnumlist and finish the attribute/encoding setup that BeginCopy previously did inline. + */ +static void +PopulateAttributes(CopyState cstate, TupleDesc tupDesc, List *attnamelist) +{ + int num_phys_attrs; + /* Generate or convert list of attributes to process */ cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); @@ -1749,12 +1850,6 @@ BeginCopy(ParseState *pstate, pg_database_encoding_max_length() > 1); /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - - cstate->copy_dest = COPY_FILE; /* default */ - - MemoryContextSwitchTo(oldcontext); - - return cstate; } /* @@ -2647,32 +2742,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, } /* - * Copy FROM file to relation. + * Check if the relation specified in copy from is valid. 
*/ -uint64 -CopyFrom(CopyState cstate) +static void +CheckCopyFromValidity(CopyState cstate) { - ResultRelInfo *resultRelInfo; - ResultRelInfo *target_resultRelInfo; - ResultRelInfo *prevResultRelInfo = NULL; - EState *estate = CreateExecutorState(); /* for ExecConstraints() */ - ModifyTableState *mtstate; - ExprContext *econtext; - TupleTableSlot *singleslot = NULL; - MemoryContext oldcontext = CurrentMemoryContext; - - PartitionTupleRouting *proute = NULL; - ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); - int ti_options = 0; /* start with default options for insert */ - BulkInsertState bistate = NULL; - CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; - bool has_before_insert_row_trig; - bool has_instead_insert_row_trig; - bool leafpart_use_multi_insert = false; - Assert(cstate->rel); /* @@ -2708,6 +2782,36 @@ CopyFrom(CopyState cstate) errmsg("cannot copy to non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } +} + +/* + * Copy FROM file to relation. 
+ */ +uint64 +CopyFrom(CopyState cstate) +{ + ResultRelInfo *resultRelInfo; + ResultRelInfo *target_resultRelInfo; + ResultRelInfo *prevResultRelInfo = NULL; + EState *estate = CreateExecutorState(); /* for ExecConstraints() */ + ModifyTableState *mtstate; + ExprContext *econtext; + TupleTableSlot *singleslot = NULL; + MemoryContext oldcontext = CurrentMemoryContext; + + PartitionTupleRouting *proute = NULL; + ErrorContextCallback errcallback; + CommandId mycid = GetCurrentCommandId(true); + int ti_options = 0; /* start with default options for insert */ + BulkInsertState bistate = NULL; + CopyInsertMethod insertMethod; + CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ + uint64 processed = 0; + bool has_before_insert_row_trig; + bool has_instead_insert_row_trig; + bool leafpart_use_multi_insert = false; + + CheckCopyFromValidity(cstate); /* * If the target file is new-in-transaction, we assume that checking FSM @@ -3262,7 +3366,7 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + INCREMENTPROCESSED(processed) } } @@ -3317,30 +3421,15 @@ CopyFrom(CopyState cstate) FreeExecutorState(estate); - return processed; + GETPROCESSED(processed) } /* - * Setup to read tuples from a file for COPY FROM. - * - * 'rel': Used as a template for the tuples - * 'filename': Name of server-local file to read - * 'attnamelist': List of char *, columns to include. NIL selects all cols. - * 'options': List of DefElem. See copy_opt_item in gram.y for selections. - * - * Returns a CopyState, to be passed to NextCopyFrom and related functions. + * PopulateCatalogInformation - populate the catalog information. 
*/ -CopyState -BeginCopyFrom(ParseState *pstate, - Relation rel, - const char *filename, - bool is_program, - copy_data_source_cb data_source_cb, - List *attnamelist, - List *options) +static void +PopulateCatalogInformation(CopyState cstate) { - CopyState cstate; - bool pipe = (filename == NULL); TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; @@ -3350,31 +3439,8 @@ BeginCopyFrom(ParseState *pstate, Oid in_func_oid; int *defmap; ExprState **defexprs; - MemoryContext oldcontext; bool volatile_defexprs; - cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - /* Initialize state variables */ - cstate->reached_eof = false; - cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - - /* Set up variables to avoid per-attribute overhead. */ - initStringInfo(&cstate->attribute_buf); - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - - /* Assign range table, we'll need it in CopyFrom. */ - if (pstate) - cstate->range_table = pstate->p_rtable; - tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; num_defaults = 0; @@ -3452,6 +3518,54 @@ BeginCopyFrom(ParseState *pstate, cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; +} + +/* + * Setup to read tuples from a file for COPY FROM. + * + * 'rel': Used as a template for the tuples + * 'filename': Name of server-local file to read + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. See copy_opt_item in gram.y for selections. + * + * Returns a CopyState, to be passed to NextCopyFrom and related functions. 
+ */ +CopyState +BeginCopyFrom(ParseState *pstate, + Relation rel, + const char *filename, + bool is_program, + copy_data_source_cb data_source_cb, + List *attnamelist, + List *options) +{ + CopyState cstate; + bool pipe = (filename == NULL); + MemoryContext oldcontext; + + cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Initialize state variables */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + initStringInfo(&cstate->line_buf); + cstate->line_buf_converted = false; + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + + /* Assign range table, we'll need it in CopyFrom. */ + if (pstate) + cstate->range_table = pstate->p_rtable; + + PopulateCatalogInformation(cstate); cstate->is_program = is_program; if (data_source_cb) @@ -3839,7 +3953,6 @@ static bool CopyReadLine(CopyState cstate) { bool result; - resetStringInfo(&cstate->line_buf); cstate->line_buf_valid = true; @@ -3864,60 +3977,8 @@ CopyReadLine(CopyState cstate) } while (CopyLoadRawBuf(cstate)); } } - else - { - /* - * If we didn't hit EOF, then we must have transferred the EOL marker - * to line_buf along with the data. Get rid of it. 
- */ - switch (cstate->eol_type) - { - case EOL_NL: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CR: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CRNL: - Assert(cstate->line_buf.len >= 2); - Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len -= 2; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_UNKNOWN: - /* shouldn't get here */ - Assert(false); - break; - } - } - - /* Done reading the line. Convert it to server encoding. */ - if (cstate->need_transcoding) - { - char *cvt; - - cvt = pg_any_to_server(cstate->line_buf.data, - cstate->line_buf.len, - cstate->file_encoding); - if (cvt != cstate->line_buf.data) - { - /* transfer converted data back to line_buf */ - resetStringInfo(&cstate->line_buf); - appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); - pfree(cvt); - } - } - - /* Now it's safe to use the buffer in error messages */ - cstate->line_buf_converted = true; + CONVERT_TO_SERVER_ENCODING(cstate) return result; } @@ -4281,6 +4342,7 @@ not_end_of_copy: * Transfer any still-uncopied data to line_buf. */ REFILL_LINEBUF; + CLEAR_EOL_LINE() return result; } -- 1.8.3.1