From be5da9036b223be38d0df4617781eb02634ecdac Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 24 Jun 2020 13:12:55 +0530 Subject: [PATCH 6/6] Parallel Copy For Binary Format Files The leader reads data from the file into DSM data blocks of 64K each. It also identifies, for each tuple, the data block id, start offset, end offset and tuple size, and records this information in the ring data structure. Workers read the tuple information from the ring data structure and the actual tuple data from the data blocks in parallel, and insert the tuples into the table in parallel. --- src/backend/commands/copy.c | 642 ++++++++++++++++++++++++++++++++---- 1 file changed, 572 insertions(+), 70 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index e8a89a40a0..e1f03241e8 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -219,6 +219,16 @@ typedef struct ParallelCopyLineBuf uint64 cur_lineno; /* line number for error messages */ }ParallelCopyLineBuf; +/* + * Tuple boundary information used for parallel copy + * for binary format files. + */ +typedef struct ParallelCopyTupleInfo +{ + uint32 offset; + uint32 block_id; +}ParallelCopyTupleInfo; + /* * Parallel copy data information. */ @@ -240,6 +250,11 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + + /* + * For binary formatted files + */ + ParallelCopyDataBlock *curr_data_block; }ParallelCopyData; /* @@ -397,6 +412,7 @@ typedef struct ParallelCopyCommonKeyData /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; }ParallelCopyCommonKeyData; /* @@ -697,6 +713,51 @@ if (!IsParallelCopy()) \ else \ return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); +/* + * CHECK_FIELD_COUNT - Handles the error cases for field count + * for binary format files. 
+ */ +#define CHECK_FIELD_COUNT \ +{\ + if (fld_count == -1) \ + { \ + if (IsParallelCopy() && \ + !IsLeader()) \ + return true; \ + else \ + { \ + /* \ + * Received EOF marker. In a V3-protocol copy, wait for the \ + * protocol-level EOF, and complain if it doesn't come \ + * immediately. This ensures that we correctly handle CopyFail, \ + * if client chooses to send that now. \ + * \ + * Note that we MUST NOT try to read more data in an old-protocol \ + * copy, since there is no protocol-level EOF marker then. We \ + * could go either way for copy from file, but choose to throw \ + * error if there's data after the EOF marker, for consistency \ + * with the new-protocol case. \ + */ \ + char dummy; \ + if (cstate->copy_dest != COPY_OLD_FE && \ + CopyGetData(cstate, &dummy, 1, 1) > 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + if (IsParallelCopy() && \ + IsLeader()) \ + return true; \ + else \ + return false; \ + } \ + } \ + if (fld_count != cstate->max_fields) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("row field count is %d, expected %d", \ + (int) fld_count, cstate->max_fields))); \ +} +/* NOTE(review): CHECK_FIELD_COUNT above expands inside its callers: it reads fld_count and cstate from the expansion site, and its return statements leave the enclosing function. On the -1 marker a parallel worker returns true at once; otherwise data after the marker is rejected, then the parallel leader returns true and the non-parallel path returns false. Any other count that differs from cstate->max_fields raises ERRCODE_BAD_COPY_FILE_FORMAT. */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -751,6 +812,16 @@ static void PopulateCatalogInformation(CopyState cstate); static pg_attribute_always_inline uint32 GetLinePosition(CopyState cstate); static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); +static uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); +static pg_attribute_always_inline bool CopyReadBinaryTupleLeader(CopyState cstate); +static pg_attribute_always_inline void CopyReadBinaryAttributeLeader(CopyState cstate, + FmgrInfo *flinfo, Oid typioparam, int32 typmod, + uint32 *new_block_pos, int m, ParallelCopyTupleInfo 
*tuple_start_info_ptr, + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size); +static pg_attribute_always_inline bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +static pg_attribute_always_inline Datum CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull); +static void AdjustFieldInfo(CopyState cstate, uint8 mode); /* * CopyCommonInfoForWorker - Copy shared_cstate using cstate information. @@ -769,6 +840,7 @@ CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_csta shared_cstate->convert_selectively = cstate->convert_selectively; shared_cstate->num_defaults = cstate->num_defaults; shared_cstate->relid = cstate->pcdata->relid; + shared_cstate->binary = cstate->binary; } /* @@ -997,8 +1069,8 @@ FindInsertMethod(CopyState cstate) static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate) { - /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ - if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + /* Parallel copy not allowed for frontend (2.0 protocol). */ + if (cstate->copy_dest == COPY_OLD_FE) return false; /* Check if copy is into foreign table or temporary table. 
*/ @@ -1299,6 +1371,7 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->convert_selectively = shared_cstate->convert_selectively; cstate->num_defaults = shared_cstate->num_defaults; pcdata->relid = shared_cstate->relid; + cstate->binary = shared_cstate->binary; PopulateGlobalsForCopyFrom(cstate, tup_desc, attnamelist); @@ -1314,7 +1387,7 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); - for (count = 0; count < WORKER_CHUNK_COUNT;count++) + for (count = 0; count < WORKER_CHUNK_COUNT; count++) initStringInfo(&pcdata->worker_line_buf[count].line_buf); cstate->line_buf_converted = false; @@ -1679,32 +1752,55 @@ ParallelCopyLeader(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) + for (;;) + { + bool done; + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. 
+ */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else { - bool done; - cstate->cur_lineno++; + /* binary format: reset the leader block cursor and counters, then read tuples until EOF */ + cstate->pcdata->curr_data_block = NULL; + cstate->raw_buf_index = 0; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; + cstate->max_fields = list_length(cstate->attnumlist); - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + for (;;) + { + bool eof = false; + cstate->cur_lineno++; - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - break; + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } } pcshared_info->is_read_in_progress = false; @@ -1712,7 +1808,425 @@ ParallelCopyLeader(CopyState cstate) } /* - * GetLinePosition - return the line position that worker should process. + * AdjustFieldInfo - leader-side helper used when a field count or field size sits too close to the end of the current data block. It gets a free block, + * moves the unread tail of the current block into it, records the tail length as skip_bytes of the old block, refills the rest of the new block from the input (raising an error on EOF) and resets raw_buf_index to 0. 
+ * Works in two modes: 1 for a field count (only switches to the new block), + * 2 for a field size (also links the old block to the new one via following_block and marks the old block completed). + */ +static void +AdjustFieldInfo(CopyState cstate, uint8 mode) +{ + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint8 movebytes = 0; + uint32 block_pos; + uint32 prev_block_pos; + int readbytes = 0; + /* remember the block being left; used only in the debug message below */ + prev_block_pos = pcshared_info->cur_block_pos; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + /* number of unread bytes at the tail of the current block; recorded as its skip_bytes */ + movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index; + + cstate->pcdata->curr_data_block->skip_bytes = movebytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + /* carry the unread tail over to the start of the new block */ + if (movebytes > 0) + memmove(&data_block->data[0], &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], + movebytes); + + elog(DEBUG1, "LEADER - field info is spread across data blocks - moved %d bytes from current block %u to %u block", + movebytes, prev_block_pos, block_pos); + /* fill the remainder of the new block from the input, right after the carried-over bytes */ + readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1, (DATA_BLOCK_SIZE - movebytes)); + + elog(DEBUG1, "LEADER - bytes read from file after field info is moved to next data block %d", readbytes); + + if (cstate->reached_eof) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + /* mode 1 only switches blocks; mode 2 also chains old to new and marks the old block completed; any other mode value changes nothing here */ + if (mode == 1) + { + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = 0; + } + else if(mode == 2) + { + ParallelCopyDataBlock *prev_data_block = NULL; + prev_data_block = cstate->pcdata->curr_data_block; + prev_data_block->following_block = block_pos; + cstate->pcdata->curr_data_block = data_block; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + cstate->raw_buf_index = 0; + } +} + +/* + * CopyReadBinaryTupleLeader - leader reads data from binary formatted file + * to data blocks and identifies tuple boundaries/offsets so that workers + * can work on the data blocks data. 
+ */ +static pg_attribute_always_inline bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + int readbytes = 0; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos; + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + uint32 new_block_pos; + uint32 line_size = -1; + ParallelCopyTupleInfo curr_tuple_start_info; + ParallelCopyTupleInfo curr_tuple_end_info; + /* the fields are uint32, so -1 is stored as the maximum value and acts as a not-yet-set marker */ + curr_tuple_start_info.block_id = -1; + curr_tuple_start_info.offset = -1; + curr_tuple_end_info.block_id = -1; + curr_tuple_end_info.offset = -1; + /* no current block yet (first call from the leader loop): get one and fill it from the input */ + if (cstate->pcdata->curr_data_block == NULL) + { + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[block_pos]; + + cstate->raw_buf_index = 0; + + readbytes = CopyGetData(cstate, &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE); + + elog(DEBUG1, "LEADER - bytes read from file %d", readbytes); + + if (cstate->reached_eof) + return true; + } + /* field count is too close to the block end: carry the unread tail over to a fresh block first */ + if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1)) + AdjustFieldInfo(cstate, 1); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + + fld_count = (int16) pg_ntoh16(fld_count); + /* the macro may return from this function: the leader returns true when fld_count is -1 */ + CHECK_FIELD_COUNT; + + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count); + new_block_pos = pcshared_info->cur_block_pos; + /* step over every attribute; the callee records the tuple start, the tuple end and line_size */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + CopyReadBinaryAttributeLeader(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &new_block_pos, + m, + &curr_tuple_start_info, + &curr_tuple_end_info, + &line_size); + } + /* NOTE(review): line_size is a uint32 that starts at -1, so this test also passes when no attribute call has set it - confirm */ + if (line_size > 0) + { + int line_pos = UpdateBlockInLineInfo(cstate, + curr_tuple_start_info.block_id, + curr_tuple_start_info.offset, + line_size, + 
LINE_LEADER_POPULATED); + + pcshared_info->populated++; + + elog(DEBUG1, "LEADER - adding - block:%u, offset:%u, line size:%u line position:%d", + curr_tuple_start_info.block_id, + curr_tuple_start_info.offset, + line_size, line_pos); + } + + return false; +} + +/* + * CopyReadBinaryAttributeLeader - leader steps over one attribute without converting it: + * it reads the field size, advances raw_buf_index past the field data and switches to a new data block when the + * field is spread across data blocks. For m == 0 it records the tuple start; for m == max_fields - 1 it records the tuple end and computes *line_size. flinfo, typioparam and typmod are not used in the body. + */ +static pg_attribute_always_inline void +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, uint32 *new_block_pos, + int m, ParallelCopyTupleInfo *tuple_start_info_ptr, + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size) +{ + int32 fld_size; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + /* NOTE(review): m is the attribute number minus one, not the position within attnumlist; the m == 0 and m == max_fields - 1 tests below presumably assume all columns in table order - confirm for COPY with a column list */ + if (m == 0) + { + tuple_start_info_ptr->block_id = pcshared_info->cur_block_pos; + /* the caller has already advanced raw_buf_index past the field count, + so step back by that size to get the tuple start offset. 
+ */ + tuple_start_info_ptr->offset = cstate->raw_buf_index - sizeof(int16); + } + /* field size is too close to the block end: carry it over to a fresh block and chain the blocks */ + if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1)) + { + AdjustFieldInfo(cstate, 2); + *new_block_pos = pcshared_info->cur_block_pos; + } + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size); + + fld_size = (int32) pg_ntoh32(fld_size); + /* NOTE(review): a size of 0 is reported as EOF here; presumably a zero-length value is legal in the binary format - confirm */ + if (fld_size == 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + /* NOTE(review): -1 passes this check and is then added to raw_buf_index below, moving it back by one byte; presumably -1 marks a NULL field and needs its own branch - confirm */ + if (fld_size < -1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + /* field data fits in the rest of this block: just step over it */ + if ((DATA_BLOCK_SIZE-cstate->raw_buf_index) >= fld_size) + { + cstate->raw_buf_index = cstate->raw_buf_index + fld_size; + } + else + { + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyDataBlock *prev_data_block = NULL; + uint32 block_pos; + int readbytes; + /* NOTE(review): exactly one further block is fetched, so the rest of the field is assumed to fit in it; a longer field would leave raw_buf_index beyond DATA_BLOCK_SIZE - confirm */ + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + data_block = &pcshared_info->data_blocks[block_pos]; + + readbytes = CopyGetData(cstate, &data_block->data[0], 1, DATA_BLOCK_SIZE); + + elog(DEBUG1, "LEADER - bytes read from file after detecting that tuple is spread across data blocks %d", readbytes); + + if (cstate->reached_eof) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + /* chain the old block to the new one and mark the old block completed */ + prev_data_block = cstate->pcdata->curr_data_block; + prev_data_block->following_block = block_pos; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = fld_size - (DATA_BLOCK_SIZE - cstate->raw_buf_index); + *new_block_pos = block_pos; + } + /* last attribute: record where the tuple ends and total up its size */ + if (m == cstate->max_fields - 1) + { + tuple_end_info_ptr->block_id = *new_block_pos; + tuple_end_info_ptr->offset = cstate->raw_buf_index - 1; + + if (tuple_start_info_ptr->block_id == 
tuple_end_info_ptr->block_id) + { + elog(DEBUG1,"LEADER - tuple lies in a single data block"); + + *line_size = tuple_end_info_ptr->offset - tuple_start_info_ptr->offset + 1; + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts, 1); + } + else + { + uint32 following_block_id = pcshared_info->data_blocks[tuple_start_info_ptr->block_id].following_block; + + elog(DEBUG1,"LEADER - tuple is spread across data blocks"); + /* part of the tuple in its first block: from the start offset to the block end, less the carried-over tail */ + *line_size = DATA_BLOCK_SIZE - tuple_start_info_ptr->offset - + pcshared_info->data_blocks[tuple_start_info_ptr->block_id].skip_bytes; + + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts, 1); + /* walk the following_block chain, adding each whole intermediate block less its skip_bytes */ + while (following_block_id != tuple_end_info_ptr->block_id) + { + *line_size = *line_size + DATA_BLOCK_SIZE - pcshared_info->data_blocks[following_block_id].skip_bytes; + + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1); + + following_block_id = pcshared_info->data_blocks[following_block_id].following_block; + + if (following_block_id == -1) + break; + } + /* count the final block as holding a part of this tuple, then add the bytes used in it */ + if (following_block_id != -1) + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1); + + *line_size = *line_size + tuple_end_info_ptr->offset + 1; + } + } +} + +/* + * CopyReadBinaryTupleWorker - each worker reads data from data blocks after + * getting leader-identified tuple offsets from ring data structure. 
+ */ +static pg_attribute_always_inline bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + uint32 line_pos; + ParallelCopyLineBoundary *line_info; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tupDesc = RelationGetDescr(cstate->rel); + /* get the next ring entry to work on; a result of -1 makes this function report EOF to its caller */ + line_pos = GetLinePosition(cstate); + + if (line_pos == -1) + return true; + /* position the worker at the tuple start recorded in the ring entry */ + line_info = &pcshared_info->line_boundaries.ring[line_pos]; + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[line_info->first_block]; + cstate->raw_buf_index = line_info->start_offset; + + if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1)) + /* A field count spread across data blocks should never occur here, + because the leader moves it to the next block; it is only logged. */ + elog(DEBUG1, "WORKER - field count spread across datablocks should never occur"); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + /* the macro may return from this function: a worker returns true when fld_count is -1 */ + CHECK_FIELD_COUNT; + + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count); + /* convert every attribute of the tuple into values[] and nulls[] */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + cstate->cur_attname = NameStr(att->attname); + + values[m] = CopyReadBinaryAttributeWorker(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + /* tuple done: drop the part count of the last block touched and mark the ring entry as processed */ + pg_atomic_sub_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + line_info->start_offset = -1; + pg_atomic_write_u32(&line_info->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&line_info->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + + return false; +} + +/* + * CopyReadBinaryAttributeWorker - worker reads one attribute: it reads the field size,
copies the field data from the shared data block(s) into attribute_buf, + * moving on to the block named by following_block if the + * attribute/column is spread across data blocks, and then runs the receive function of the column type on it. Always sets *isnull to false. + */ +static pg_attribute_always_inline Datum +CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + /* same threshold as the leader uses before moving a field size to the following block; follow that move here */ + if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1)) + { + ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block; + + elog(DEBUG1, "WORKER - field size is spread across data blocks"); + + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[cstate->pcdata->curr_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + } + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + fld_size = (int32) pg_ntoh32(fld_size); + /* NOTE(review): a size of 0 is reported as EOF here; presumably a zero-length value is legal in the binary format - confirm */ + if (fld_size == 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + /* NOTE(review): -1 passes this check and reaches enlargeStringInfo and memcpy below, and *isnull is always set to false; presumably -1 marks a NULL field and needs its own branch - confirm */ + if (fld_size < -1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + /* field data lies wholly in the current block: one copy is enough */ + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + elog(DEBUG1, "WORKER - tuple lies in single data block"); + + memcpy(&cstate->attribute_buf.data[0],&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], fld_size); + cstate->raw_buf_index = cstate->raw_buf_index + fld_size; + } + else + { + ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block; + /* NOTE(review): only one following block is consulted, so the rest of the field is assumed to fit in it - confirm for fields larger than a data block */ + elog(DEBUG1, "WORKER - tuple is spread across data blocks"); + + memcpy(&cstate->attribute_buf.data[0], 
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], + (DATA_BLOCK_SIZE - cstate->raw_buf_index)); + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[cstate->pcdata->curr_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + memcpy(&cstate->attribute_buf.data[DATA_BLOCK_SIZE - cstate->raw_buf_index], + &cstate->pcdata->curr_data_block->data[0], (fld_size - (DATA_BLOCK_SIZE - cstate->raw_buf_index))); + cstate->raw_buf_index = fld_size - (DATA_BLOCK_SIZE - cstate->raw_buf_index); + } + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + +/* + * GetLinePosition - return the line position that worker should process. */ static uint32 GetLinePosition(CopyState cstate) @@ -1797,6 +2311,7 @@ GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) { dataBlkPtr->curr_blk_completed = false; dataBlkPtr->skip_bytes = 0; + dataBlkPtr->following_block = -1; pcshared_info->cur_block_pos = block_pos; return block_pos; } @@ -5449,60 +5964,47 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) + if (!IsParallelCopy()) { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + int16 fld_count; + ListCell *cur; - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. 
This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; + cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + CHECK_FIELD_COUNT; - foreach(cur, cstate->attnumlist) + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + else { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + bool eof = false; + cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); + + if (eof) + return false; } } -- 2.25.1