From 443297e9b1f842a78cdd37e6f273dbfa7a706897 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Sat, 1 Aug 2020 08:52:48 +0530 Subject: [PATCH v5 1/6] Copy code readjustment to support parallel copy. This patch slightly readjusts the copy code so that the common code is separated into functions/macros; these functions/macros will be used by the workers in the parallel copy code of the upcoming patches. EOL removal is moved from CopyReadLine to CopyReadLineText. This change was required because, in the case of parallel copy, record identification and record update are done in CopyReadLineText, and the newline characters must be removed before the record information is updated in shared memory. --- src/backend/commands/copy.c | 360 ++++++++++++++++++++++++++------------------ 1 file changed, 217 insertions(+), 143 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 2047557..cf7277a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -95,6 +95,9 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) + /* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, @@ -224,7 +227,6 @@ typedef struct CopyStateData * appropriate amounts of data from this buffer. In both modes, we * guarantee that there is a \0 at raw_buf[raw_buf_len]. */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ @@ -354,6 +356,27 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) +/* + * CLEAR_EOL_LINE - Wrapper for clearing EOL. 
+ */ +#define CLEAR_EOL_LINE() \ +if (!result && !IsHeaderLine()) \ + ClearEOLFromCopiedData(cstate, cstate->line_buf.data, \ + cstate->line_buf.len, \ + &cstate->line_buf.len) \ + +/* + * INCREMENTPROCESSED - Increment the count of lines processed. + */ +#define INCREMENTPROCESSED(processed) \ +processed++; + +/* + * RETURNPROCESSED - Return the count of lines processed from the calling function. + */ +#define RETURNPROCESSED(processed) \ +return processed; + static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -401,7 +424,11 @@ static bool CopyGetInt16(CopyState cstate, int16 *val); static bool CopyLoadRawBuf(CopyState cstate); static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); - +static void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tup_desc, + List *attnamelist); +static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, + int copy_line_pos, int *copy_line_size); +static void ConvertToServerEncoding(CopyState cstate); /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -801,14 +828,18 @@ CopyLoadRawBuf(CopyState cstate) { int nbytes = RAW_BUF_BYTES(cstate); int inbytes; + int minread = 1; /* Copy down the unprocessed data if any. 
*/ if (nbytes > 0) memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes); + if (cstate->copy_dest == COPY_NEW_FE) + minread = RAW_BUF_SIZE - nbytes; + inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, - 1, RAW_BUF_SIZE - nbytes); + minread, RAW_BUF_SIZE - nbytes); nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; @@ -1514,7 +1545,6 @@ BeginCopy(ParseState *pstate, { CopyState cstate; TupleDesc tupDesc; - int num_phys_attrs; MemoryContext oldcontext; /* Allocate workspace and zero all fields */ @@ -1680,6 +1710,23 @@ BeginCopy(ParseState *pstate, tupDesc = cstate->queryDesc->tupDesc; } + PopulateCommonCstateInfo(cstate, tupDesc, attnamelist); + cstate->copy_dest = COPY_FILE; /* default */ + + MemoryContextSwitchTo(oldcontext); + + return cstate; +} + +/* + * PopulateCommonCstateInfo - Populates the common variables required for copy + * from operation. This is a helper function for BeginCopy function. + */ +static void +PopulateCommonCstateInfo(CopyState cstate, TupleDesc tupDesc, List *attnamelist) +{ + int num_phys_attrs; + /* Generate or convert list of attributes to process */ cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); @@ -1799,12 +1846,6 @@ BeginCopy(ParseState *pstate, pg_database_encoding_max_length() > 1); /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - - cstate->copy_dest = COPY_FILE; /* default */ - - MemoryContextSwitchTo(oldcontext); - - return cstate; } /* @@ -2696,32 +2737,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, } /* - * Copy FROM file to relation. + * Check if the relation specified in copy from is valid. 
*/ -uint64 -CopyFrom(CopyState cstate) +static void +CheckTargetRelValidity(CopyState cstate) { - ResultRelInfo *resultRelInfo; - ResultRelInfo *target_resultRelInfo; - ResultRelInfo *prevResultRelInfo = NULL; - EState *estate = CreateExecutorState(); /* for ExecConstraints() */ - ModifyTableState *mtstate; - ExprContext *econtext; - TupleTableSlot *singleslot = NULL; - MemoryContext oldcontext = CurrentMemoryContext; - - PartitionTupleRouting *proute = NULL; - ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); - int ti_options = 0; /* start with default options for insert */ - BulkInsertState bistate = NULL; - CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; - bool has_before_insert_row_trig; - bool has_instead_insert_row_trig; - bool leafpart_use_multi_insert = false; - Assert(cstate->rel); /* @@ -2758,27 +2778,6 @@ CopyFrom(CopyState cstate) RelationGetRelationName(cstate->rel)))); } - /* - * If the target file is new-in-transaction, we assume that checking FSM - * for free space is a waste of time. This could possibly be wrong, but - * it's unlikely. - */ - if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && - (cstate->rel->rd_createSubid != InvalidSubTransactionId || - cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId)) - ti_options |= TABLE_INSERT_SKIP_FSM; - - /* - * Optimize if new relfilenode was created in this subxact or one of its - * committed children and we won't see those rows later as part of an - * earlier scan or command. The subxact test ensures that if this subxact - * aborts then the frozen rows won't be visible after xact cleanup. Note - * that the stronger test of exactly which subtransaction created it is - * crucial for correctness of this optimization. The test for an earlier - * scan or command tolerates false negatives. 
FREEZE causes other sessions - * to see rows they would not see under MVCC, and a false negative merely - * spreads that anomaly to the current session. - */ if (cstate->freeze) { /* @@ -2816,9 +2815,61 @@ CopyFrom(CopyState cstate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); + } +} + +/* + * Copy FROM file to relation. + */ +uint64 +CopyFrom(CopyState cstate) +{ + ResultRelInfo *resultRelInfo; + ResultRelInfo *target_resultRelInfo; + ResultRelInfo *prevResultRelInfo = NULL; + EState *estate = CreateExecutorState(); /* for ExecConstraints() */ + ModifyTableState *mtstate; + ExprContext *econtext; + TupleTableSlot *singleslot = NULL; + MemoryContext oldcontext = CurrentMemoryContext; + + PartitionTupleRouting *proute = NULL; + ErrorContextCallback errcallback; + CommandId mycid = GetCurrentCommandId(true); + int ti_options = 0; /* start with default options for insert */ + BulkInsertState bistate = NULL; + CopyInsertMethod insertMethod; + CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ + uint64 processed = 0; + bool has_before_insert_row_trig; + bool has_instead_insert_row_trig; + bool leafpart_use_multi_insert = false; + + CheckTargetRelValidity(cstate); + /* + * If the target file is new-in-transaction, we assume that checking FSM + * for free space is a waste of time. This could possibly be wrong, but + * it's unlikely. + */ + if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && + (cstate->rel->rd_createSubid != InvalidSubTransactionId || + cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId)) + ti_options |= TABLE_INSERT_SKIP_FSM; + + /* + * Optimize if new relfilenode was created in this subxact or one of its + * committed children and we won't see those rows later as part of an + * earlier scan or command. 
The subxact test ensures that if this subxact + * aborts then the frozen rows won't be visible after xact cleanup. Note + * that the stronger test of exactly which subtransaction created it is + * crucial for correctness of this optimization. The test for an earlier + * scan or command tolerates false negatives. FREEZE causes other sessions + * to see rows they would not see under MVCC, and a false negative merely + * spreads that anomaly to the current session. + */ + if (cstate->freeze) ti_options |= TABLE_INSERT_FROZEN; - } /* * We need a ResultRelInfo so we can use the regular executor's @@ -3311,7 +3362,7 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + INCREMENTPROCESSED(processed) } } @@ -3366,30 +3417,15 @@ CopyFrom(CopyState cstate) FreeExecutorState(estate); - return processed; + RETURNPROCESSED(processed) } /* - * Setup to read tuples from a file for COPY FROM. - * - * 'rel': Used as a template for the tuples - * 'filename': Name of server-local file to read - * 'attnamelist': List of char *, columns to include. NIL selects all cols. - * 'options': List of DefElem. See copy_opt_item in gram.y for selections. - * - * Returns a CopyState, to be passed to NextCopyFrom and related functions. + * PopulateCstateCatalogInfo - Populate the catalog information. 
*/ -CopyState -BeginCopyFrom(ParseState *pstate, - Relation rel, - const char *filename, - bool is_program, - copy_data_source_cb data_source_cb, - List *attnamelist, - List *options) +static void +PopulateCstateCatalogInfo(CopyState cstate) { - CopyState cstate; - bool pipe = (filename == NULL); TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; @@ -3399,38 +3435,8 @@ BeginCopyFrom(ParseState *pstate, Oid in_func_oid; int *defmap; ExprState **defexprs; - MemoryContext oldcontext; bool volatile_defexprs; - cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - /* Initialize state variables */ - cstate->reached_eof = false; - cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - - /* - * Set up variables to avoid per-attribute overhead. attribute_buf and - * raw_buf are used in both text and binary modes, but we use line_buf - * only in text mode. - */ - initStringInfo(&cstate->attribute_buf); - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - if (!cstate->binary) - { - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - } - - /* Assign range table, we'll need it in CopyFrom. */ - if (pstate) - cstate->range_table = pstate->p_rtable; - tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; num_defaults = 0; @@ -3508,6 +3514,61 @@ BeginCopyFrom(ParseState *pstate, cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; +} + +/* + * Setup to read tuples from a file for COPY FROM. + * + * 'rel': Used as a template for the tuples + * 'filename': Name of server-local file to read + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. 
See copy_opt_item in gram.y for selections. + * + * Returns a CopyState, to be passed to NextCopyFrom and related functions. + */ +CopyState +BeginCopyFrom(ParseState *pstate, + Relation rel, + const char *filename, + bool is_program, + copy_data_source_cb data_source_cb, + List *attnamelist, + List *options) +{ + CopyState cstate; + bool pipe = (filename == NULL); + MemoryContext oldcontext; + + cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Initialize state variables */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* + * Set up variables to avoid per-attribute overhead. attribute_buf and + * raw_buf are used in both text and binary modes, but we use line_buf + * only in text mode. + */ + initStringInfo(&cstate->attribute_buf); + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + if (!cstate->binary) + { + initStringInfo(&cstate->line_buf); + cstate->line_buf_converted = false; + } + + /* Assign range table, we'll need it in CopyFrom. */ + if (pstate) + cstate->range_table = pstate->p_rtable; + + PopulateCstateCatalogInfo(cstate); cstate->is_program = is_program; if (data_source_cb) @@ -3917,45 +3978,60 @@ CopyReadLine(CopyState cstate) } while (CopyLoadRawBuf(cstate)); } } - else - { - /* - * If we didn't hit EOF, then we must have transferred the EOL marker - * to line_buf along with the data. Get rid of it. 
- */ - switch (cstate->eol_type) - { - case EOL_NL: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CR: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CRNL: - Assert(cstate->line_buf.len >= 2); - Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len -= 2; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_UNKNOWN: - /* shouldn't get here */ - Assert(false); - break; - } - } + ConvertToServerEncoding(cstate); + return result; +} + +/* + * ClearEOLFromCopiedData - Clear EOL from the copied data. + */ +static void +ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, + int copy_line_pos, int *copy_line_size) +{ + /* + * If we didn't hit EOF, then we must have transferred the EOL marker + * to line_buf along with the data. Get rid of it. + */ + switch (cstate->eol_type) + { + case EOL_NL: + Assert(*copy_line_size >= 1); + Assert(copy_line_data[copy_line_pos - 1] == '\n'); + copy_line_data[copy_line_pos - 1] = '\0'; + (*copy_line_size)--; + break; + case EOL_CR: + Assert(*copy_line_size >= 1); + Assert(copy_line_data[copy_line_pos - 1] == '\r'); + copy_line_data[copy_line_pos - 1] = '\0'; + (*copy_line_size)--; + break; + case EOL_CRNL: + Assert(*copy_line_size >= 2); + Assert(copy_line_data[copy_line_pos - 2] == '\r'); + Assert(copy_line_data[copy_line_pos - 1] == '\n'); + copy_line_data[copy_line_pos - 2] = '\0'; + *copy_line_size -= 2; + break; + case EOL_UNKNOWN: + /* shouldn't get here */ + Assert(false); + break; + } +} + +/* + * ConvertToServerEncoding - Convert contents to server encoding. 
+ */ +static void +ConvertToServerEncoding(CopyState cstate) +{ /* Done reading the line. Convert it to server encoding. */ if (cstate->need_transcoding) { char *cvt; - cvt = pg_any_to_server(cstate->line_buf.data, cstate->line_buf.len, cstate->file_encoding); @@ -3967,11 +4043,8 @@ CopyReadLine(CopyState cstate) pfree(cvt); } } - /* Now it's safe to use the buffer in error messages */ cstate->line_buf_converted = true; - - return result; } /* @@ -4334,6 +4407,7 @@ not_end_of_copy: * Transfer any still-uncopied data to line_buf. */ REFILL_LINEBUF; + CLEAR_EOL_LINE(); return result; } -- 1.8.3.1