From 5f3f9c365e5ba75a293f9685247a1a6c19762c51 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 15 Jun 2020 15:41:06 +0530 Subject: [PATCH v3] Parallel Copy For Binary Format Files Leader reads data from the file into the DSM data blocks each of 64K size. It also identifies each tuple data block id, start offset, end offset, tuple size and updates this information in the ring data structure. Workers parallely read the tuple information from the ring data structure, the actual tuple data from the data blocks and parallely insert the tuples into the table. --- src/backend/commands/copy.c | 667 ++++++++++++++++++++++++++++++++---- 1 file changed, 599 insertions(+), 68 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index b1e2e71a7c..5b9508d27b 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -231,6 +231,16 @@ typedef struct ParallelCopyData uint32 worker_line_buf_pos; }ParallelCopyData; +/* + * Tuple boundary information used for parallel copy + * for binary format files. + */ +typedef struct ParallelCopyTupleInfo +{ + uint32 offset; + uint32 block_id; +}ParallelCopyTupleInfo; + /* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, @@ -361,6 +371,16 @@ typedef struct CopyStateData int nworkers; bool is_parallel; ParallelCopyData *pcdata; + + /* + * Parallel copy for binary formatted files + */ + ParallelCopyDataBlock *curr_data_block; + ParallelCopyDataBlock *prev_data_block; + uint32 curr_data_offset; + uint32 curr_block_pos; + ParallelCopyTupleInfo curr_tuple_start_info; + ParallelCopyTupleInfo curr_tuple_end_info; } CopyStateData; /* @@ -386,6 +406,7 @@ typedef struct ParallelCopyCommonKeyData /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; }ParallelCopyCommonKeyData; /* @@ -741,6 +762,14 @@ static void PopulateCatalogInformation(CopyState cstate); static pg_attribute_always_inline uint32 GetChunkPosition(CopyState cstate); static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); +static uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); +static pg_attribute_always_inline bool CopyReadBinaryTupleLeader(CopyState cstate); +static pg_attribute_always_inline void CopyReadBinaryAttributeLeader(CopyState cstate, + FmgrInfo *flinfo, Oid typioparam, int32 typmod, + uint32 *new_block_pos); +static pg_attribute_always_inline bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +static pg_attribute_always_inline Datum CopyReadBinaryAttributeWorker(CopyState cstate, int column_no, + FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); /* * CopyCommonInfoForWorker - Copy shared_cstate using cstate information. @@ -759,6 +788,7 @@ CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_csta shared_cstate->convert_selectively = cstate->convert_selectively; shared_cstate->num_defaults = cstate->num_defaults; shared_cstate->relid = cstate->pcdata->relid; + shared_cstate->binary = cstate->binary; } /* @@ -977,8 +1007,8 @@ FindInsertMethod(CopyState cstate) static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate) { - /* Parallel copy not allowed for freeze & binary option. */ - if (cstate->freeze || cstate->binary) + /* Parallel copy not allowed for freeze. */ + if (cstate->freeze) return false; /* Check if copy is into foreign table. */ @@ -1270,6 +1300,7 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->convert_selectively = shared_cstate->convert_selectively; cstate->num_defaults = shared_cstate->num_defaults; pcdata->relid = shared_cstate->relid; + cstate->binary = shared_cstate->binary; PopulateAttributes(cstate, tup_desc, attnamelist); @@ -1302,6 +1333,15 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->max_fields = attr_count; cstate->raw_fields = (char **)palloc(attr_count * sizeof(char *)); } + + cstate->curr_data_block = NULL; + cstate->prev_data_block = NULL; + cstate->curr_data_offset = 0; + cstate->curr_block_pos = 0; + cstate->curr_tuple_start_info.block_id = -1; + cstate->curr_tuple_start_info.offset = -1; + cstate->curr_tuple_end_info.block_id = -1; + cstate->curr_tuple_end_info.offset = -1; } /* @@ -1650,38 +1690,515 @@ ParallelCopyLeader(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) + for (;;) + { + bool done; + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else { - bool done; - cstate->cur_lineno++; + cstate->curr_data_block = NULL; + cstate->prev_data_block = NULL; + cstate->curr_data_offset = 0; + cstate->curr_block_pos = 0; + cstate->curr_tuple_start_info.block_id = -1; + cstate->curr_tuple_start_info.offset = -1; + cstate->curr_tuple_end_info.block_id = -1; + cstate->curr_tuple_end_info.offset = -1; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + for (;;) + { + bool eof = false; + cstate->cur_lineno++; - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - break; + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } } pcshared_info->is_read_in_progress = false; cstate->cur_lineno = 0; } +/* + * CopyReadBinaryTupleLeader - leader reads data from binary formatted file + * to data blocks and identies tuple boundaries/offsets so that workers + * can work on the data blocks data. + */ +static pg_attribute_always_inline bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + int readbytes = 0; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos; + int16 fld_count; + AttrNumber attr_count = list_length(cstate->attnumlist); + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + uint32 new_block_pos; + uint32 chunk_size; + + if (cstate->curr_data_block == NULL) + { + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + cstate->curr_block_pos = block_pos; + + cstate->curr_data_block = &pcshared_info->data_blocks[block_pos]; + + cstate->curr_data_offset = 0; + + readbytes = CopyGetData(cstate, &cstate->curr_data_block->data, 1, DATA_BLOCK_SIZE); + + elog(DEBUG1, "LEADER - bytes read from file %d", readbytes); + + if (cstate->reached_eof) + return true; + } + + if (cstate->curr_data_offset + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1)) + { + ParallelCopyDataBlock *data_block = NULL; + uint8 movebytes = 0; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset; + + cstate->curr_data_block->skip_bytes = movebytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (movebytes > 0) + memmove(&data_block->data[0], &cstate->curr_data_block->data[cstate->curr_data_offset], + movebytes); + + elog(DEBUG1, "LEADER - field count is spread across data blocks - moved %d bytes from current block %u to %u block", + movebytes, cstate->curr_block_pos, block_pos); + + readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1, (DATA_BLOCK_SIZE - movebytes)); + + elog(DEBUG1, "LEADER - bytes read from file after field count is moved to next data block %d", readbytes); + + if (cstate->reached_eof) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + cstate->curr_data_block = data_block; + cstate->curr_data_offset = 0; + cstate->curr_block_pos = block_pos; + } + + memcpy(&fld_count, &cstate->curr_data_block->data[cstate->curr_data_offset], sizeof(fld_count)); + + fld_count = (int16) pg_ntoh16(fld_count); + + if (fld_count == -1) + { + /* + * Received EOF marker. In a V3-protocol copy, wait for the + * protocol-level EOF, and complain if it doesn't come + * immediately. This ensures that we correctly handle CopyFail, + * if client chooses to send that now. + * + * Note that we MUST NOT try to read more data in an old-protocol + * copy, since there is no protocol-level EOF marker then. We + * could go either way for copy from file, but choose to throw + * error if there's data after the EOF marker, for consistency + * with the new-protocol case. + */ + char dummy; + + if (cstate->copy_dest != COPY_OLD_FE && + CopyGetData(cstate, &dummy, 1, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return true; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + cstate->curr_tuple_start_info.block_id = cstate->curr_block_pos; + cstate->curr_tuple_start_info.offset = cstate->curr_data_offset; + cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_count); + new_block_pos = cstate->curr_block_pos; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + CopyReadBinaryAttributeLeader(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &new_block_pos); + } + + cstate->curr_tuple_end_info.block_id = new_block_pos; + cstate->curr_tuple_end_info.offset = cstate->curr_data_offset-1;; + + if (cstate->curr_tuple_start_info.block_id == cstate->curr_tuple_end_info.block_id) + { + elog(DEBUG1,"LEADER - tuple lies in a single data block"); + + chunk_size = cstate->curr_tuple_end_info.offset - cstate->curr_tuple_start_info.offset + 1; + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_chunk_parts, 1); + } + else + { + uint32 following_block_id = pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].following_block; + + elog(DEBUG1,"LEADER - tuple is spread across data blocks"); + + chunk_size = DATA_BLOCK_SIZE - cstate->curr_tuple_start_info.offset- + pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].skip_bytes; + + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_chunk_parts, 1); + + while (following_block_id != cstate->curr_tuple_end_info.block_id) + { + chunk_size = chunk_size + DATA_BLOCK_SIZE - pcshared_info->data_blocks[following_block_id].skip_bytes; + + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_chunk_parts, 1); + + following_block_id = pcshared_info->data_blocks[following_block_id].following_block; + + if (following_block_id == -1) + break; + } + + if (following_block_id != -1) + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_chunk_parts, 1); + + chunk_size = chunk_size + cstate->curr_tuple_end_info.offset + 1; + } + + if (chunk_size > 0) + { + int chunk_pos = UpdateBlockInChunkInfo(cstate, + cstate->curr_tuple_start_info.block_id, + cstate->curr_tuple_start_info.offset, + chunk_size, + CHUNK_LEADER_POPULATED); + + pcshared_info->populated++; + + elog(DEBUG1, "LEADER - adding - block:%u, offset:%u, chunk size:%u chunk position:%d", + cstate->curr_tuple_start_info.block_id, + cstate->curr_tuple_start_info.offset, + chunk_size, chunk_pos); + } + + cstate->curr_tuple_start_info.block_id = -1; + cstate->curr_tuple_start_info.offset = -1; + cstate->curr_tuple_end_info.block_id = -1; + cstate->curr_tuple_end_info.offset = -1; + + return false; +} + +/* + * CopyReadBinaryAttributeLeader - leader identifies boundaries/offsets + * for each attribute/column, it moves on to next data block if the + * attribute/column is spread across data blocks. + */ +static pg_attribute_always_inline void +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, uint32 *new_block_pos) +{ + int32 fld_size; + int readbytes; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos; + + if ((cstate->curr_data_offset + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1)) + { + ParallelCopyDataBlock *data_block = NULL; + uint8 movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + cstate->curr_data_block->skip_bytes = movebytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (movebytes > 0) + memmove(&data_block->data[0], &cstate->curr_data_block->data[cstate->curr_data_offset], movebytes); + + elog(DEBUG1, "LEADER - field size is spread across data blocks - moved %d bytes from current block %u to %u block", + movebytes, cstate->curr_block_pos, block_pos); + + readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1, DATA_BLOCK_SIZE-movebytes); + + elog(DEBUG1, "LEADER - bytes read from file after field size is moved to next data block %d", readbytes); + + if (cstate->reached_eof) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + cstate->prev_data_block = cstate->curr_data_block; + cstate->prev_data_block->following_block = block_pos; + cstate->curr_data_block = data_block; + cstate->curr_block_pos = block_pos; + + if (cstate->prev_data_block->curr_blk_completed == false) + cstate->prev_data_block->curr_blk_completed = true; + + cstate->curr_data_offset = 0; + *new_block_pos = block_pos; + } + + memcpy(&fld_size, &cstate->curr_data_block->data[cstate->curr_data_offset], sizeof(fld_size)); + + cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_size); + + fld_size = (int32) pg_ntoh32(fld_size); + + if (fld_size == 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + if (fld_size < -1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + + if ((DATA_BLOCK_SIZE-cstate->curr_data_offset) >= fld_size) + { + cstate->curr_data_offset = cstate->curr_data_offset + fld_size; + } + else + { + ParallelCopyDataBlock *data_block = NULL; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + data_block = &pcshared_info->data_blocks[block_pos]; + + readbytes = CopyGetData(cstate, &data_block->data[0], 1, DATA_BLOCK_SIZE); + + elog(DEBUG1, "LEADER - bytes read from file after detecting that tuple is spread across data blocks %d", readbytes); + + if (cstate->reached_eof) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + + cstate->prev_data_block = cstate->curr_data_block; + cstate->prev_data_block->following_block = block_pos; + + if (cstate->prev_data_block->curr_blk_completed == false) + cstate->prev_data_block->curr_blk_completed = true; + + cstate->curr_data_block = data_block; + cstate->curr_data_offset = fld_size - (DATA_BLOCK_SIZE - cstate->curr_data_offset); + cstate->curr_block_pos = block_pos; + *new_block_pos = block_pos; + } +} + +/* + * CopyReadBinaryTupleWorker - each worker reads data from data blocks after + * getting leader-identified tuple offsets from ring data structure. + */ +static pg_attribute_always_inline bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + uint32 chunk_pos; + ParallelCopyChunkBoundary *chunk_info; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + AttrNumber attr_count = list_length(cstate->attnumlist); + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + int i; + + chunk_pos = GetChunkPosition(cstate); + + if (chunk_pos == -1) + return true; + + chunk_info = &pcshared_info->chunk_boundaries.ring[chunk_pos]; + cstate->curr_data_block = &pcshared_info->data_blocks[chunk_info->first_block]; + cstate->curr_data_offset = chunk_info->start_offset; + + if (cstate->curr_data_offset + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1)) + { + /*the case where field count spread across datablocks should never occur. + as the leader would have moved it to next block*/ + elog(DEBUG1, "WORKER - field count spread across datablocks should never occur"); + } + + memcpy(&fld_count, &cstate->curr_data_block->data[cstate->curr_data_offset], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + if (fld_count == -1) + { + return true; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_count); + i = 0; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + cstate->cur_attname = NameStr(att->attname); + + i++; + values[m] = CopyReadBinaryAttributeWorker(cstate, + i, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + pg_atomic_sub_fetch_u32(&cstate->curr_data_block->unprocessed_chunk_parts, 1); + chunk_info->start_offset = -1; + pg_atomic_write_u32(&chunk_info->chunk_state, CHUNK_WORKER_PROCESSED); + pg_atomic_write_u32(&chunk_info->chunk_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + + return false; +} + +/* + * CopyReadBinaryAttributeWorker - leader identifies boundaries/offsets + * for each attribute/column, it moves on to next data block if the + * attribute/column is spread across data blocks. + */ +static pg_attribute_always_inline Datum +CopyReadBinaryAttributeWorker(CopyState cstate, int column_no, + FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + if ((cstate->curr_data_offset + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1)) + { + ParallelCopyDataBlock *prev_data_block = cstate->curr_data_block; + + elog(DEBUG1, "WORKER - field size is spread across data blocks"); + + cstate->curr_data_block = &pcshared_info->data_blocks[cstate->curr_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_chunk_parts, 1); + cstate->curr_data_offset = 0; + } + + memcpy(&fld_size, &cstate->curr_data_block->data[cstate->curr_data_offset], sizeof(fld_size)); + fld_size = (int32) pg_ntoh32(fld_size); + + if (fld_size == 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + if (fld_size < -1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + + cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_size); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + + if ((DATA_BLOCK_SIZE-cstate->curr_data_offset) >= fld_size) + { + elog(DEBUG1, "WORKER - tuple lies in single data block"); + + memcpy(&cstate->attribute_buf.data[0],&cstate->curr_data_block->data[cstate->curr_data_offset], fld_size); + cstate->curr_data_offset = cstate->curr_data_offset + fld_size; + } + else + { + ParallelCopyDataBlock *prev_data_block = cstate->curr_data_block; + + elog(DEBUG1, "WORKER - tuple is spread across data blocks"); + + memcpy(&cstate->attribute_buf.data[0], &cstate->curr_data_block->data[cstate->curr_data_offset], + (DATA_BLOCK_SIZE - cstate->curr_data_offset)); + cstate->curr_data_block = &pcshared_info->data_blocks[cstate->curr_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_chunk_parts, 1); + memcpy(&cstate->attribute_buf.data[DATA_BLOCK_SIZE - cstate->curr_data_offset], + &cstate->curr_data_block->data[0], (fld_size - (DATA_BLOCK_SIZE - cstate->curr_data_offset))); + cstate->curr_data_offset = fld_size - (DATA_BLOCK_SIZE - cstate->curr_data_offset); + } + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + /* * GetChunkPosition - return the chunk position that worker should process. */ @@ -5402,63 +5919,77 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; + if (!IsParallelCopy()) + { + int16 fld_count; + ListCell *cur; - cstate->cur_lineno++; + cstate->cur_lineno++; - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; + if (fld_count == -1) + { + /* + * Received EOF marker. In a V3-protocol copy, wait for the + * protocol-level EOF, and complain if it doesn't come + * immediately. This ensures that we correctly handle CopyFail, + * if client chooses to send that now. + * + * Note that we MUST NOT try to read more data in an old-protocol + * copy, since there is no protocol-level EOF marker then. We + * could go either way for copy from file, but choose to throw + * error if there's data after the EOF marker, for consistency + * with the new-protocol case. + */ + char dummy; + + if (cstate->copy_dest != COPY_OLD_FE && + CopyGetData(cstate, &dummy, 1, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } - if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) + if (fld_count != attr_count) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + i = 0; + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + i++; + values[m] = CopyReadBinaryAttribute(cstate, + i, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } } + else + { + bool eof = false; - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + cstate->cur_lineno++; - i = 0; - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); - cstate->cur_attname = NameStr(att->attname); - i++; - values[m] = CopyReadBinaryAttribute(cstate, - i, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + if (eof) + return false; } } -- 2.25.1