From dbcf9dcfc87d8e5eed4d56b2a079a64258c591c6 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sat, 11 Jul 2020 11:39:50 +0530 Subject: [PATCH v10] Parallel Copy For Binary Format Files Leader reads data from the file into the DSM data blocks each of 64K size. It also identifies each tuple data block id, start offset, end offset, tuple size and updates this information in the ring data structure. Workers parallely read the tuple information from the ring data structure, the actual tuple data from the data blocks and parallely insert the tuples into the table. --- src/backend/commands/copy.c | 655 +++++++++++++++++++++++++++++++----- 1 file changed, 574 insertions(+), 81 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 048f2d2cb6..a4e4163c32 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -219,6 +219,17 @@ typedef struct ParallelCopyLineBuf uint64 cur_lineno; /* line number for error messages */ }ParallelCopyLineBuf; +/* + * Represents the usage mode for CopyReadBinaryGetDataBlock. + */ +typedef enum FieldInfoType +{ + FIELD_NONE = 0, + FIELD_COUNT, + FIELD_SIZE, + FIELD_DATA +} FieldInfoType; + /* * Parallel copy data information. */ @@ -240,6 +251,9 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + + /* For binary formatted files */ + ParallelCopyDataBlock *curr_data_block; }ParallelCopyData; /* @@ -397,6 +411,7 @@ typedef struct ParallelCopyCommonKeyData /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; }ParallelCopyCommonKeyData; /* @@ -502,7 +517,6 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; - /* * These macros centralize code used to process line_buf and raw_buf buffers. * They are macros because they often do continue/break control and to avoid @@ -697,8 +711,110 @@ if (!IsParallelCopy()) \ else \ return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); -static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; +/* + * CHECK_FIELD_COUNT - Handles the error cases for field count + * for binary format files. + */ +#define CHECK_FIELD_COUNT \ +{\ + if (fld_count == -1) \ + { \ + if (IsParallelCopy() && \ + !IsLeader()) \ + return true; \ + else if (IsParallelCopy() && \ + IsLeader()) \ + { \ + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return true; \ + } \ + else \ + { \ + /* \ + * Received EOF marker. In a V3-protocol copy, wait for the \ + * protocol-level EOF, and complain if it doesn't come \ + * immediately. This ensures that we correctly handle CopyFail, \ + * if client chooses to send that now. \ + * \ + * Note that we MUST NOT try to read more data in an old-protocol \ + * copy, since there is no protocol-level EOF marker then. We \ + * could go either way for copy from file, but choose to throw \ + * error if there's data after the EOF marker, for consistency \ + * with the new-protocol case. \ + */ \ + char dummy; \ + if (cstate->copy_dest != COPY_OLD_FE && \ + CopyGetData(cstate, &dummy, 1, 1) > 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return false; \ + } \ + } \ + if (fld_count != cstate->max_fields) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("row field count is %d, expected %d", \ + (int) fld_count, cstate->max_fields))); \ +} + +/* + * CHECK_FIELD_SIZE - Handles the error case for field size + * for binary format files. + */ +#define CHECK_FIELD_SIZE(fld_size) \ +{ \ + if (fld_size < -1) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("invalid field size")));\ +} + +/* + * EOF_ERROR - Error statement for EOF for binary format + * files. + */ +#define EOF_ERROR \ +{ \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("unexpected EOF in COPY data")));\ +} + +/* + * GET_RAW_BUF_INDEX - Caclulates the raw buf index for the cases + * where the data spread is across multiple data blocks. + */ +#define GET_RAW_BUF_INDEX(raw_buf_index, fld_size, required_blks, curr_blk_bytes) \ +{ \ + raw_buf_index = fld_size - (((required_blks - 1) * DATA_BLOCK_SIZE) + curr_blk_bytes); \ +} + +/* + * GET_REQUIRED_BLOCKS - Caclulates the number of data + * blocks required for the cases where the data spread + * is across multiple data blocks. + */ +#define GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes) \ +{ \ + /* \ + * field size can spread across multiple data blocks, \ + * calculate the number of required data blocks and try to get \ + * those many data blocks. \ + */ \ + required_blks = (int32)(fld_size - curr_blk_bytes)/(int32)DATA_BLOCK_SIZE; \ + /* \ + * check if we need the data block for the field data \ + * bytes that are not modulus of data block size. \ + */ \ + if ((fld_size - curr_blk_bytes)%DATA_BLOCK_SIZE != 0) \ + required_blks++; \ +} +static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, @@ -751,6 +867,13 @@ static void PopulateCatalogInformation(CopyState cstate); static pg_attribute_always_inline uint32 GetLinePosition(CopyState cstate); static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); +static uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); +static bool CopyReadBinaryTupleLeader(CopyState cstate); +static void CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size); +static bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +static Datum CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull); +static void CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info); /* * CopyCommonInfoForWorker - Copy shared_cstate using cstate information. @@ -769,6 +892,7 @@ CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_csta shared_cstate->convert_selectively = cstate->convert_selectively; shared_cstate->num_defaults = cstate->num_defaults; shared_cstate->relid = cstate->pcdata->relid; + shared_cstate->binary = cstate->binary; } /* @@ -997,8 +1121,8 @@ FindInsertMethod(CopyState cstate) static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate) { - /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ - if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + /* Parallel copy not allowed for frontend (2.0 protocol). */ + if (cstate->copy_dest == COPY_OLD_FE) return false; /* Check if copy is into foreign table or temporary table. */ @@ -1299,6 +1423,7 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->convert_selectively = shared_cstate->convert_selectively; cstate->num_defaults = shared_cstate->num_defaults; pcdata->relid = shared_cstate->relid; + cstate->binary = shared_cstate->binary; PopulateGlobalsForCopyFrom(cstate, tup_desc, attnamelist); @@ -1314,7 +1439,7 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); - for (count = 0; count < WORKER_CHUNK_COUNT;count++) + for (count = 0; count < WORKER_CHUNK_COUNT; count++) initStringInfo(&pcdata->worker_line_buf[count].line_buf); cstate->line_buf_converted = false; @@ -1680,32 +1805,66 @@ ParallelCopyLeader(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) - { - bool done; - cstate->cur_lineno++; + for (;;) + { + bool done; + cstate->cur_lineno++; - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else + { /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. + * Binary Format Files. + * For parallel copy leader, fill in the error + * context inforation here, in case any failures + * while determining tuple offsets, leader + * would throw the errors with proper context. */ - if (done && cstate->line_buf.len == 0) - break; + ErrorContextCallback errcallback; + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + cstate->pcdata->curr_data_block = NULL; + cstate->raw_buf_index = 0; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; + cstate->max_fields = list_length(cstate->attnumlist); + + for (;;) + { + bool eof = false; + cstate->cur_lineno++; + + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } } pcshared_info->is_read_in_progress = false; @@ -1713,7 +1872,357 @@ ParallelCopyLeader(CopyState cstate) } /* - * GetLinePosition - return the line position that worker should process. + * CopyReadBinaryGetDataBlock - gets a new block, updates + * the current offset, calculates the skip bytes. + */ +static void +CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info) +{ + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyDataBlock *curr_data_block = cstate->pcdata->curr_data_block; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint8 move_bytes = 0; + uint32 block_pos; + uint32 prev_block_pos; + int read_bytes = 0; + + prev_block_pos = pcshared_info->cur_block_pos; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + if (field_info == FIELD_SIZE || field_info == FIELD_COUNT) + move_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + + if (curr_data_block != NULL) + curr_data_block->skip_bytes = move_bytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (move_bytes > 0 && curr_data_block != NULL) + memmove(&data_block->data[0], &curr_data_block->data[cstate->raw_buf_index], move_bytes); + + elog(DEBUG1, "LEADER - field info %d is spread across data blocks - moved %d bytes from current block %u to %u block", + field_info, move_bytes, prev_block_pos, block_pos); + + read_bytes = CopyGetData(cstate, &data_block->data[move_bytes], 1, (DATA_BLOCK_SIZE - move_bytes)); + + if (field_info == FIELD_NONE && cstate->reached_eof) + return; + + if (cstate->reached_eof) + EOF_ERROR; + + elog(DEBUG1, "LEADER - bytes read from file %d", read_bytes); + + if(field_info == FIELD_SIZE || field_info == FIELD_DATA) + { + ParallelCopyDataBlock *prev_data_block = NULL; + prev_data_block = curr_data_block; + prev_data_block->following_block = block_pos; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + pg_atomic_add_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + } + + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = 0; +} + +/* + * CopyReadBinaryTupleLeader - leader reads data from binary formatted file + * to data blocks and identies tuple boundaries/offsets so that workers + * can work on the data blocks data. + */ +static bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + uint32 line_size = 0; + uint32 start_block_pos; + uint32 start_offset; + + if (cstate->pcdata->curr_data_block == NULL) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_NONE); + + /* + * no data is read from file here. one possibility + * to be here could be that the binary file just + * has a valid signature but nothing else. + */ + if (cstate->reached_eof) + return true; + } + + if ((cstate->raw_buf_index + sizeof(fld_count)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_COUNT); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + CHECK_FIELD_COUNT; + start_offset = cstate->raw_buf_index; + cstate->raw_buf_index += sizeof(fld_count); + line_size += sizeof(fld_count); + start_block_pos = pcshared_info->cur_block_pos; + + CopyReadBinaryFindTupleSize(cstate, &line_size); + + pg_atomic_add_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + + if (line_size > 0) + { + int line_pos = UpdateBlockInLineInfo(cstate, + start_block_pos, + start_offset, + line_size, + LINE_LEADER_POPULATED); + + pcshared_info->populated++; + elog(DEBUG1, "LEADER - adding - block:%u, offset:%u, line size:%u line position:%d", + start_block_pos, start_offset, line_size, line_pos); + } + + return false; +} + +/* + * CopyReadBinaryFindTupleSize - leader identifies boundaries/ + * offsets for each attribute/column and finally results in the + * tuple/row size. It moves on to next data block if the attribute/ + * column is spread across data blocks. + */ +static void +CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size) +{ + int32 fld_size; + ListCell *cur; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + Form_pg_attribute att = TupleDescAttr(tup_desc, (att_num - 1)); + cstate->cur_attname = NameStr(att->attname); + fld_size = 0; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_SIZE); + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + cstate->raw_buf_index += sizeof(fld_size); + *line_size += sizeof(fld_size); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + *line_size += fld_size; + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + cstate->raw_buf_index += fld_size; + elog(DEBUG1, "LEADER - tuple lies in he same data block"); + } + else + { + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + + while(i > 0) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_DATA); + i--; + } + + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + + /* + * raw_buf_index should never cross data block size, + * as the required number of data blocks would have + * been obtained in the above while loop. + */ + Assert(cstate->raw_buf_index <= DATA_BLOCK_SIZE); + } + cstate->cur_attname = NULL; + } +} + +/* + * CopyReadBinaryTupleWorker - each worker reads data from data blocks after + * getting leader-identified tuple offsets from ring data structure. + */ +static bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + uint32 line_pos; + ParallelCopyLineBoundary *line_info; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + line_pos = GetLinePosition(cstate); + + if (line_pos == -1) + return true; + + line_info = &pcshared_info->line_boundaries.ring[line_pos]; + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[line_info->first_block]; + cstate->raw_buf_index = line_info->start_offset; + +#if 0 + if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE) + { + /* + * The case where field count spread across datablocks should never occur, + * as the leader would have moved it to next block. enable this if block + * for debugging purposes only. + */ + elog(DEBUG1, "WORKER - field count spread across datablocks should never occur"); + } +#endif + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + CHECK_FIELD_COUNT; + + cstate->raw_buf_index += sizeof(fld_count); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + int m = att_num - 1; + Form_pg_attribute att = TupleDescAttr(tup_desc, m); + cstate->cur_attname = NameStr(att->attname); + + values[m] = CopyReadBinaryAttributeWorker(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + pg_atomic_sub_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + line_info->start_offset = -1; + pg_atomic_write_u32(&line_info->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&line_info->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + + return false; +} + +/* + * CopyReadBinaryAttributeWorker - leader identifies boundaries/offsets + * for each attribute/column, it moves on to next data block if the + * attribute/column is spread across data blocks. + */ +static Datum +CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + { + ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block; + elog(DEBUG1, "WORKER - field size is spread across data blocks"); + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + } + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + cstate->raw_buf_index += sizeof(fld_size); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + elog(DEBUG1, "WORKER - tuple lies in single data block"); + memcpy(&cstate->attribute_buf.data[0],&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], fld_size); + cstate->raw_buf_index += fld_size; + } + else + { + uint32 att_buf_idx = 0; + uint32 copy_bytes = 0; + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + ParallelCopyDataBlock *prev_data_block = NULL; + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + prev_data_block = cstate->pcdata->curr_data_block; + elog(DEBUG1, "WORKER - tuple is spread across data blocks"); + memcpy(&cstate->attribute_buf.data[0], &prev_data_block->data[cstate->raw_buf_index], + curr_blk_bytes); + copy_bytes = curr_blk_bytes; + att_buf_idx = curr_blk_bytes; + + while (i>0) + { + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + copy_bytes = fld_size - att_buf_idx; + + /* + * the bytes that are yet to be taken into att buff are more than + * the entire data block size, but only take the data block size + * elements. + */ + if (copy_bytes >= DATA_BLOCK_SIZE) + copy_bytes = DATA_BLOCK_SIZE; + + memcpy(&cstate->attribute_buf.data[att_buf_idx], + &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], copy_bytes); + att_buf_idx += copy_bytes; + i--; + } + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + } + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + +/* + * GetLinePosition - return the line position that worker should pcdata->process. */ static uint32 GetLinePosition(CopyState cstate) @@ -1801,7 +2310,9 @@ GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) { dataBlkPtr->curr_blk_completed = false; dataBlkPtr->skip_bytes = 0; + dataBlkPtr->following_block = -1; pcshared_info->cur_block_pos = block_pos; + MemSet(&dataBlkPtr->data[0], 0, DATA_BLOCK_SIZE); return block_pos; } @@ -5476,60 +5987,47 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) + if (!IsParallelCopy()) { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + int16 fld_count; + ListCell *cur; - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; + cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + CHECK_FIELD_COUNT; - foreach(cur, cstate->attnumlist) + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + else { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + bool eof = false; + cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); + + if (eof) + return false; } } @@ -6465,18 +6963,15 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Datum result; if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; + if (fld_size == -1) { *isnull = true; return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); + + CHECK_FIELD_SIZE(fld_size); /* reset attribute_buf to empty, and load raw data in it */ resetStringInfo(&cstate->attribute_buf); @@ -6484,9 +6979,7 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, enlargeStringInfo(&cstate->attribute_buf, fld_size); if (CopyGetData(cstate, cstate->attribute_buf.data, fld_size, fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; cstate->attribute_buf.len = fld_size; cstate->attribute_buf.data[fld_size] = '\0'; -- 2.25.1