From 8dc40e5d290edd954b7914d3f8abe3de22b1667d Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 22 Sep 2020 13:43:10 +0530 Subject: [PATCH v5 6/6] Parallel Copy For Binary Format Files Leader reads data from the file into the DSM data blocks each of 64K size. It also identifies each tuple data block id, start offset, end offset, tuple size and updates this information in the ring data structure. Workers parallelly read the tuple information from the ring data structure, the actual tuple data from the data blocks and parallelly insert the tuples into the table. --- src/backend/commands/copy.c | 681 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 597 insertions(+), 84 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index ba188d7..5b1884a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -266,6 +266,17 @@ typedef struct ParallelCopyLineBuf } ParallelCopyLineBuf; /* + * Represents the usage mode for CopyReadBinaryGetDataBlock. + */ +typedef enum FieldInfoType +{ + FIELD_NONE = 0, + FIELD_COUNT, + FIELD_SIZE, + FIELD_DATA +} FieldInfoType; + +/* * Parallel copy data information. */ typedef struct ParallelCopyData @@ -286,6 +297,9 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + + /* For binary formatted files */ + ParallelCopyDataBlock *curr_data_block; } ParallelCopyData; /* @@ -450,6 +464,7 @@ typedef struct SerializedParallelCopyState /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; } SerializedParallelCopyState; /* DestReceiver for COPY (query) TO */ @@ -524,7 +539,6 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; - /* * These macros centralize code used to process line_buf and raw_buf buffers. * They are macros because they often do continue/break control and to avoid @@ -652,11 +666,113 @@ if (!IsParallelCopy()) \ else \ return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); +/* + * CHECK_FIELD_COUNT - Handles the error cases for field count + * for binary format files. + */ +#define CHECK_FIELD_COUNT \ +{\ + if (fld_count == -1) \ + { \ + if (IsParallelCopy() && \ + !IsLeader()) \ + return true; \ + else if (IsParallelCopy() && \ + IsLeader()) \ + { \ + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return true; \ + } \ + else \ + { \ + /* \ + * Received EOF marker. In a V3-protocol copy, wait for the \ + * protocol-level EOF, and complain if it doesn't come \ + * immediately. This ensures that we correctly handle CopyFail, \ + * if client chooses to send that now. \ + * \ + * Note that we MUST NOT try to read more data in an old-protocol \ + * copy, since there is no protocol-level EOF marker then. We \ + * could go either way for copy from file, but choose to throw \ + * error if there's data after the EOF marker, for consistency \ + * with the new-protocol case. \ + */ \ + char dummy; \ + if (cstate->copy_dest != COPY_OLD_FE && \ + CopyReadBinaryData(cstate, &dummy, 1) > 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return false; \ + } \ + } \ + if (fld_count != cstate->max_fields) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("row field count is %d, expected %d", \ + (int) fld_count, cstate->max_fields))); \ +} + +/* + * CHECK_FIELD_SIZE - Handles the error case for field size + * for binary format files. + */ +#define CHECK_FIELD_SIZE(fld_size) \ +{ \ + if (fld_size < -1) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("invalid field size")));\ +} + +/* + * EOF_ERROR - Error statement for EOF for binary format + * files. + */ +#define EOF_ERROR \ +{ \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("unexpected EOF in COPY data")));\ +} + +/* + * GET_RAW_BUF_INDEX - Calculates the raw buf index for the cases + * where the data spread is across multiple data blocks. + */ +#define GET_RAW_BUF_INDEX(raw_buf_index, fld_size, required_blks, curr_blk_bytes) \ +{ \ + raw_buf_index = fld_size - (((required_blks - 1) * DATA_BLOCK_SIZE) + curr_blk_bytes); \ +} + +/* + * GET_REQUIRED_BLOCKS - Calculates the number of data + * blocks required for the cases where the data spread + * is across multiple data blocks. + */ +#define GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes) \ +{ \ + /* \ + * field size can spread across multiple data blocks, \ + * calculate the number of required data blocks and try to get \ + * those many data blocks. \ + */ \ + required_blks = (int32)(fld_size - curr_blk_bytes)/(int32)DATA_BLOCK_SIZE; \ + /* \ + * check if we need the data block for the field data \ + * bytes that are not modulus of data block size. \ + */ \ + if ((fld_size - curr_blk_bytes)%DATA_BLOCK_SIZE != 0) \ + required_blks++; \ +} + /* End parallel copy Macros */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; - /* non-export function prototypes */ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, @@ -711,6 +827,13 @@ static void ExecBeforeStmtTrigger(CopyState cstate); static void CheckTargetRelValidity(CopyState cstate); static void PopulateCstateCatalogInfo(CopyState cstate); static pg_attribute_always_inline uint32 GetLinePosition(CopyState cstate); +static uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); +static bool CopyReadBinaryTupleLeader(CopyState cstate); +static void CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size); +static bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +static Datum CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull); +static void CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info); /* * SerializeParallelCopyState - Copy shared_cstate using cstate information. @@ -729,6 +852,7 @@ SerializeParallelCopyState(CopyState cstate, SerializedParallelCopyState *shared shared_cstate->convert_selectively = cstate->convert_selectively; shared_cstate->num_defaults = cstate->num_defaults; shared_cstate->relid = cstate->pcdata->relid; + shared_cstate->binary = cstate->binary; } /* @@ -888,8 +1012,8 @@ FindInsertMethod(CopyState cstate) static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate) { - /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ - if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + /* Parallel copy not allowed for frontend (2.0 protocol). */ + if (cstate->copy_dest == COPY_OLD_FE) return false; /* Check if copy is into foreign table or temporary table. */ @@ -1159,6 +1283,7 @@ InitializeParallelCopyInfo(SerializedParallelCopyState *shared_cstate, cstate->convert_selectively = shared_cstate->convert_selectively; cstate->num_defaults = shared_cstate->num_defaults; pcdata->relid = shared_cstate->relid; + cstate->binary = shared_cstate->binary; PopulateCommonCstateInfo(cstate, tup_desc, attnamelist); @@ -1554,32 +1679,66 @@ ParallelCopyFrom(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) - { - bool done; - cstate->cur_lineno++; + for (;;) + { + bool done; + cstate->cur_lineno++; - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else + { /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. + * Binary Format Files. + * For parallel copy leader, fill in the error + * context information here, in case any failures + * while determining tuple offsets, leader + * would throw the errors with proper context. */ - if (done && cstate->line_buf.len == 0) - break; + ErrorContextCallback errcallback; + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + cstate->pcdata->curr_data_block = NULL; + cstate->raw_buf_index = 0; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; + cstate->max_fields = list_length(cstate->attnumlist); + + for (;;) + { + bool eof = false; + cstate->cur_lineno++; + + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } } pcshared_info->is_read_in_progress = false; @@ -1587,6 +1746,354 @@ ParallelCopyFrom(CopyState cstate) } /* + * CopyReadBinaryGetDataBlock - Gets a new block, updates + * the current offset, calculates the skip bytes. + */ +static void +CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info) +{ + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyDataBlock *curr_data_block = cstate->pcdata->curr_data_block; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint8 move_bytes = 0; + uint32 block_pos; + uint32 prev_block_pos; + int read_bytes = 0; + + prev_block_pos = pcshared_info->cur_block_pos; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + if (field_info == FIELD_SIZE || field_info == FIELD_COUNT) + move_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + + if (curr_data_block != NULL) + curr_data_block->skip_bytes = move_bytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (move_bytes > 0 && curr_data_block != NULL) + memmove(&data_block->data[0], &curr_data_block->data[cstate->raw_buf_index], move_bytes); + + elog(DEBUG1, "LEADER - field info %d is spread across data blocks - moved %d bytes from current block %u to %u block", + field_info, move_bytes, prev_block_pos, block_pos); + + read_bytes = CopyGetData(cstate, &data_block->data[move_bytes], 1, (DATA_BLOCK_SIZE - move_bytes)); + + if (field_info == FIELD_NONE && cstate->reached_eof) + return; + + if (cstate->reached_eof) + EOF_ERROR; + + elog(DEBUG1, "LEADER - bytes read from file %d", read_bytes); + + if(field_info == FIELD_SIZE || field_info == FIELD_DATA) + { + ParallelCopyDataBlock *prev_data_block = NULL; + prev_data_block = curr_data_block; + prev_data_block->following_block = block_pos; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + pg_atomic_add_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + } + + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = 0; +} + +/* + * CopyReadBinaryTupleLeader - Leader reads data from binary formatted file + * to data blocks and identifies tuple boundaries/offsets so that workers + * can work on the data blocks data. + */ +static bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + uint32 line_size = 0; + uint32 start_block_pos; + uint32 start_offset; + + if (cstate->pcdata->curr_data_block == NULL) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_NONE); + + /* + * no data is read from file here. one possibility + * to be here could be that the binary file just + * has a valid signature but nothing else. + */ + if (cstate->reached_eof) + return true; + } + + if ((cstate->raw_buf_index + sizeof(fld_count)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_COUNT); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + CHECK_FIELD_COUNT; + start_offset = cstate->raw_buf_index; + cstate->raw_buf_index += sizeof(fld_count); + line_size += sizeof(fld_count); + start_block_pos = pcshared_info->cur_block_pos; + + CopyReadBinaryFindTupleSize(cstate, &line_size); + + pg_atomic_add_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + + if (line_size > 0) + { + int line_pos = UpdateBlockInLineInfo(cstate, + start_block_pos, + start_offset, + line_size, + LINE_LEADER_POPULATED); + + pcshared_info->populated++; + elog(DEBUG1, "LEADER - adding - block:%u, offset:%u, line size:%u line position:%d", + start_block_pos, start_offset, line_size, line_pos); + } + + return false; +} + +/* + * CopyReadBinaryFindTupleSize - Leader identifies boundaries/ + * offsets for each attribute/column and finally results in the + * tuple/row size. It moves on to next data block if the attribute/ + * column is spread across data blocks. + */ +static void +CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size) +{ + int32 fld_size; + ListCell *cur; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + Form_pg_attribute att = TupleDescAttr(tup_desc, (att_num - 1)); + cstate->cur_attname = NameStr(att->attname); + fld_size = 0; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_SIZE); + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + cstate->raw_buf_index += sizeof(fld_size); + *line_size += sizeof(fld_size); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + *line_size += fld_size; + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + cstate->raw_buf_index += fld_size; + elog(DEBUG1, "LEADER - tuple lies in he same data block"); + } + else + { + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + + while(i > 0) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_DATA); + i--; + } + + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + + /* + * raw_buf_index should never cross data block size, + * as the required number of data blocks would have + * been obtained in the above while loop. + */ + Assert(cstate->raw_buf_index <= DATA_BLOCK_SIZE); + } + cstate->cur_attname = NULL; + } +} + +/* + * CopyReadBinaryTupleWorker - Each worker reads data from data blocks after + * getting leader-identified tuple offsets from ring data structure. + */ +static bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + uint32 line_pos; + ParallelCopyLineBoundary *line_info; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + line_pos = GetLinePosition(cstate); + + if (line_pos == -1) + return true; + + line_info = &pcshared_info->line_boundaries.ring[line_pos]; + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[line_info->first_block]; + cstate->raw_buf_index = line_info->start_offset; + + if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE) + { + /* + * The case where field count spread across datablocks should never occur, + * as the leader would have moved it to next block. this code exists for + * debugging purposes only. + */ + elog(DEBUG1, "WORKER - field count spread across datablocks should never occur"); + } + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + CHECK_FIELD_COUNT; + + cstate->raw_buf_index += sizeof(fld_count); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + int m = att_num - 1; + Form_pg_attribute att = TupleDescAttr(tup_desc, m); + cstate->cur_attname = NameStr(att->attname); + + values[m] = CopyReadBinaryAttributeWorker(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + pg_atomic_sub_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + line_info->start_offset = -1; + pg_atomic_write_u32(&line_info->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&line_info->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + + return false; +} + +/* + * CopyReadBinaryAttributeWorker - Leader identifies boundaries/offsets + * for each attribute/column, it moves on to next data block if the + * attribute/column is spread across data blocks. + */ +static Datum +CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + { + ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block; + elog(DEBUG1, "WORKER - field size is spread across data blocks"); + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + } + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + cstate->raw_buf_index += sizeof(fld_size); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + elog(DEBUG1, "WORKER - tuple lies in single data block"); + memcpy(&cstate->attribute_buf.data[0],&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], fld_size); + cstate->raw_buf_index += fld_size; + } + else + { + uint32 att_buf_idx = 0; + uint32 copy_bytes = 0; + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + ParallelCopyDataBlock *prev_data_block = NULL; + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + prev_data_block = cstate->pcdata->curr_data_block; + elog(DEBUG1, "WORKER - tuple is spread across data blocks"); + memcpy(&cstate->attribute_buf.data[0], &prev_data_block->data[cstate->raw_buf_index], + curr_blk_bytes); + copy_bytes = curr_blk_bytes; + att_buf_idx = curr_blk_bytes; + + while (i>0) + { + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + copy_bytes = fld_size - att_buf_idx; + + /* + * The bytes that are yet to be taken into att buff are more than + * the entire data block size, but only take the data block size + * elements. + */ + if (copy_bytes >= DATA_BLOCK_SIZE) + copy_bytes = DATA_BLOCK_SIZE; + + memcpy(&cstate->attribute_buf.data[att_buf_idx], + &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], copy_bytes); + att_buf_idx += copy_bytes; + i--; + } + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + } + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + +/* * GetLinePosition - Return the line position that worker should process. */ static uint32 @@ -1675,7 +2182,9 @@ GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) { dataBlkPtr->curr_blk_completed = false; dataBlkPtr->skip_bytes = 0; + dataBlkPtr->following_block = -1; pcshared_info->cur_block_pos = block_pos; + MemSet(&dataBlkPtr->data[0], 0, DATA_BLOCK_SIZE); return block_pos; } @@ -2191,10 +2700,26 @@ CopyGetInt32(CopyState cstate, int32 *val) { uint32 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + /* + * For parallel copy, avoid reading data to raw buf, read directly + * from file, later the data will be read to parallel copy data + * buffers. + */ + if (cstate->nworkers > 0) { - *val = 0; /* suppress compiler warning */ - return false; + if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + } + else + { + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } } *val = (int32) pg_ntoh32(buf); return true; @@ -2583,7 +3108,15 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, EndParallelCopy(pcxt); } else + { + /* + * Reset nworkers to -1 here. This is useful in cases where user + * specifies parallel workers, but, no worker is picked up, so go + * back to non parallel mode value of nworkers. + */ + cstate->nworkers = -1; *processed = CopyFrom(cstate); /* copy from file to database */ + } EndCopyFrom(cstate); } @@ -5047,7 +5580,7 @@ BeginCopyFrom(ParseState *pstate, * only in text mode. */ initStringInfo(&cstate->attribute_buf); - cstate->raw_buf = (IsParallelCopy()) ? NULL : (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; if (!cstate->binary) { @@ -5127,7 +5660,7 @@ BeginCopyFrom(ParseState *pstate, int32 tmp; /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + if (CopyGetData(cstate, readSig, 11, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -5155,7 +5688,7 @@ BeginCopyFrom(ParseState *pstate, /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) + if (CopyGetData(cstate, readSig, 1, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -5352,60 +5885,45 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - if (!CopyGetInt16(cstate, &fld_count)) + if (!IsParallelCopy()) { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + int16 fld_count; + ListCell *cur; - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; - - if (cstate->copy_dest != COPY_OLD_FE && - CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + CHECK_FIELD_COUNT; - foreach(cur, cstate->attnumlist) + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + else { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + bool eof = false; - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); + + if (eof) + return false; } } @@ -6405,18 +6923,15 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Datum result; if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; + if (fld_size == -1) { *isnull = true; return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); + + CHECK_FIELD_SIZE(fld_size); /* reset attribute_buf to empty, and load raw data in it */ resetStringInfo(&cstate->attribute_buf); @@ -6424,9 +6939,7 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, enlargeStringInfo(&cstate->attribute_buf, fld_size); if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; cstate->attribute_buf.len = fld_size; cstate->attribute_buf.data[fld_size] = '\0'; -- 1.8.3.1