From fd470b5454555af0f633371f3e7ab99104e36f2c Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 9 Oct 2020 12:58:20 +0530 Subject: [PATCH v6 6/6] Parallel Copy For Binary Format Files Leader reads data from the file into the DSM data blocks each of 64K size. It also identifies each tuple data block id, start offset, end offset, tuple size and updates this information in the ring data structure. Workers parallelly read the tuple information from the ring data structure, the actual tuple data from the data blocks and parallelly insert the tuples into the table. --- src/backend/commands/copy.c | 134 +++++---- src/backend/commands/copyparallel.c | 426 ++++++++++++++++++++++++++-- src/include/commands/copy.h | 126 ++++++++ 3 files changed, 599 insertions(+), 87 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5bed12896f..69119d8513 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -223,19 +223,14 @@ static void CopySendData(CopyState cstate, const void *databuf, int datasize); static void CopySendString(CopyState cstate, const char *str); static void CopySendChar(CopyState cstate, char c); static void CopySendEndOfRow(CopyState cstate); -static int CopyGetData(CopyState cstate, void *databuf, - int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); static bool CopyLoadRawBuf(CopyState cstate); -static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); - static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); - /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -449,7 +444,7 @@ CopySendEndOfRow(CopyState cstate) * * NB: no data conversion is applied here. */ -static int +int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) { int bytesread = 0; @@ -582,10 +577,25 @@ CopyGetInt32(CopyState cstate, int32 *val) { uint32 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + /* + * For parallel copy, avoid reading data to raw buf, read directly from + * file, later the data will be read to parallel copy data buffers. + */ + if (cstate->nworkers > 0) { - *val = 0; /* suppress compiler warning */ - return false; + if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + } + else + { + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } } *val = (int32) pg_ntoh32(buf); return true; @@ -661,7 +671,7 @@ CopyLoadRawBuf(CopyState cstate) * and writes them to 'dest'. Returns the number of bytes read (which * would be less than 'nbytes' only if we reach EOF). */ -static int +int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes) { int copied_bytes = 0; @@ -986,7 +996,15 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, EndParallelCopy(pcxt); } else + { + /* + * Reset nworkers to -1 here. This is useful in cases where user + * specifies parallel workers, but, no worker is picked up, so go + * back to non parallel mode value of nworkers. + */ + cstate->nworkers = -1; *processed = CopyFrom(cstate); /* copy from file to database */ + } EndCopyFrom(cstate); } @@ -3556,7 +3574,7 @@ BeginCopyFrom(ParseState *pstate, int32 tmp; /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + if (CopyGetData(cstate, readSig, 11, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -3584,7 +3602,7 @@ BeginCopyFrom(ParseState *pstate, /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) + if (CopyGetData(cstate, readSig, 1, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -3781,60 +3799,45 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - if (!CopyGetInt16(cstate, &fld_count)) + if (!IsParallelCopy()) { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + int16 fld_count; + ListCell *cur; - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; - - if (cstate->copy_dest != COPY_OLD_FE && - CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + CHECK_FIELD_COUNT; - foreach(cur, cstate->attnumlist) + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + else { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + bool eof = false; - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); + + if (eof) + return false; } } @@ -4846,18 +4849,15 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Datum result; if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; + if (fld_size == -1) { *isnull = true; return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); + + CHECK_FIELD_SIZE(fld_size); /* reset attribute_buf to empty, and load raw data in it */ resetStringInfo(&cstate->attribute_buf); @@ -4865,9 +4865,7 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, enlargeStringInfo(&cstate->attribute_buf, fld_size); if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; cstate->attribute_buf.len = fld_size; cstate->attribute_buf.data[fld_size] = '\0'; diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c index 6a44a01e47..ccfe38363c 100644 --- a/src/backend/commands/copyparallel.c +++ b/src/backend/commands/copyparallel.c @@ -94,6 +94,7 @@ SerializeParallelCopyState(ParallelContext *pcxt, CopyState cstate, shared_cstate.convert_selectively = cstate->convert_selectively; shared_cstate.num_defaults = cstate->num_defaults; shared_cstate.relid = cstate->pcdata->relid; + shared_cstate.binary = cstate->binary; memcpy(shmptr, (char *) &shared_cstate, sizeof(SerializedParallelCopyState)); copiedsize = sizeof(SerializedParallelCopyState); @@ -191,6 +192,7 @@ RestoreParallelCopyState(shm_toc *toc, CopyState cstate, List **attlist) cstate->convert_selectively = shared_cstate.convert_selectively; cstate->num_defaults = shared_cstate.num_defaults; cstate->pcdata->relid = shared_cstate.relid; + cstate->binary = shared_cstate.binary; cstate->null_print = CopyStringFromSharedMemory(shared_str_val + copiedsize, &copiedsize); @@ -380,7 +382,7 @@ static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate) { /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ - if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + if (cstate->copy_dest == COPY_OLD_FE) return false; /* @@ -976,39 +978,425 @@ ParallelCopyFrom(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) - { - bool done; + for (;;) + { + bool done; - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + /* + * EOF at start of line means we're done. If we see EOF after + * some characters, we act as though it was newline followed by + * EOF, ie, process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else + { /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. + * Binary Format Files. For parallel copy leader, fill in the error + * context information here, in case any failures while determining + * tuple offsets, leader would throw the errors with proper context. */ - if (done && cstate->line_buf.len == 0) - break; + ErrorContextCallback errcallback; + + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + cstate->pcdata->curr_data_block = NULL; + cstate->raw_buf_index = 0; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; + cstate->max_fields = list_length(cstate->attnumlist); + + for (;;) + { + bool eof = false; + + cstate->cur_lineno++; + + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } + + /* Done, clean up */ + error_context_stack = errcallback.previous; } pcshared_info->is_read_in_progress = false; cstate->cur_lineno = 0; } +/* + * CopyReadBinaryGetDataBlock + * + * Gets a new block, updates the current offset, calculates the skip bytes. + */ +void +CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info) +{ + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyDataBlock *curr_data_block = cstate->pcdata->curr_data_block; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint8 move_bytes = 0; + uint32 block_pos; + uint32 prev_block_pos; + int read_bytes = 0; + + prev_block_pos = pcshared_info->cur_block_pos; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + if (field_info == FIELD_SIZE || field_info == FIELD_COUNT) + move_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + + if (curr_data_block != NULL) + curr_data_block->skip_bytes = move_bytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (move_bytes > 0 && curr_data_block != NULL) + memmove(&data_block->data[0], &curr_data_block->data[cstate->raw_buf_index], move_bytes); + + elog(DEBUG1, "LEADER - field info %d is spread across data blocks - moved %d bytes from current block %u to %u block", + field_info, move_bytes, prev_block_pos, block_pos); + + read_bytes = CopyGetData(cstate, &data_block->data[move_bytes], 1, (DATA_BLOCK_SIZE - move_bytes)); + + if (field_info == FIELD_NONE && cstate->reached_eof) + return; + + if (cstate->reached_eof) + EOF_ERROR; + + elog(DEBUG1, "LEADER - bytes read from file %d", read_bytes); + + if (field_info == FIELD_SIZE || field_info == FIELD_DATA) + { + ParallelCopyDataBlock *prev_data_block = NULL; + + prev_data_block = curr_data_block; + prev_data_block->following_block = block_pos; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + pg_atomic_add_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + } + + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = 0; +} + +/* + * CopyReadBinaryTupleLeader + * + * Leader reads data from binary formatted file to data blocks and identifies + * tuple boundaries/offsets so that workers can work on the data blocks data. + */ +bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + uint32 line_size = 0; + uint32 start_block_pos; + uint32 start_offset; + + if (cstate->pcdata->curr_data_block == NULL) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_NONE); + + /* + * no data is read from file here. one possibility to be here could be + * that the binary file just has a valid signature but nothing else. + */ + if (cstate->reached_eof) + return true; + } + + if ((cstate->raw_buf_index + sizeof(fld_count)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_COUNT); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + CHECK_FIELD_COUNT; + start_offset = cstate->raw_buf_index; + cstate->raw_buf_index += sizeof(fld_count); + line_size += sizeof(fld_count); + start_block_pos = pcshared_info->cur_block_pos; + + CopyReadBinaryFindTupleSize(cstate, &line_size); + + pg_atomic_add_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + + if (line_size > 0) + (void) UpdateSharedLineInfo(cstate, start_block_pos, start_offset, + line_size, LINE_LEADER_POPULATED, -1); + + return false; +} + +/* + * CopyReadBinaryFindTupleSize + * + * Leader identifies boundaries/offsets for each attribute/column and finally + * results in the tuple/row size. It moves on to next data block if the + * attribute/column is spread across data blocks. + */ +void +CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size) +{ + int32 fld_size; + ListCell *cur; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + Form_pg_attribute att = TupleDescAttr(tup_desc, (att_num - 1)); + + cstate->cur_attname = NameStr(att->attname); + fld_size = 0; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_SIZE); + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + cstate->raw_buf_index += sizeof(fld_size); + *line_size += sizeof(fld_size); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + *line_size += fld_size; + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + cstate->raw_buf_index += fld_size; + elog(DEBUG1, "LEADER - tuple lies in he same data block"); + } + else + { + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + + while (i > 0) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_DATA); + i--; + } + + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + + /* + * raw_buf_index should never cross data block size, as the + * required number of data blocks would have been obtained in the + * above while loop. + */ + Assert(cstate->raw_buf_index <= DATA_BLOCK_SIZE); + } + cstate->cur_attname = NULL; + } +} + +/* + * CopyReadBinaryTupleWorker + * + * Each worker reads data from data blocks after getting leader-identified tuple + * offsets from ring data structure. + */ +bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + uint32 line_pos; + ParallelCopyLineBoundary *line_info; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + + line_pos = GetLinePosition(cstate); + if (line_pos == -1) + return true; + + line_info = &pcshared_info->line_boundaries.ring[line_pos]; + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[line_info->first_block]; + cstate->raw_buf_index = line_info->start_offset; + + if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE) + { + /* + * The case where field count spread across datablocks should never + * occur, as the leader would have moved it to next block. this code + * exists for debugging purposes only. + */ + elog(DEBUG1, "WORKER - field count spread across datablocks should never occur"); + } + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + CHECK_FIELD_COUNT; + + cstate->raw_buf_index += sizeof(fld_count); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + int m = att_num - 1; + Form_pg_attribute att = TupleDescAttr(tup_desc, m); + + cstate->cur_attname = NameStr(att->attname); + + values[m] = CopyReadBinaryAttributeWorker(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + pg_atomic_sub_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + line_info->start_offset = -1; + pg_atomic_write_u32(&line_info->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&line_info->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + + return false; +} + +/* + * CopyReadBinaryAttributeWorker + * + * Leader identifies boundaries/offsets for each attribute/column, it moves on + * to next data block if the attribute/column is spread across data blocks. + */ +Datum +CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + { + ParallelCopyDataBlock *prev_data_block = cstate->pcdata->curr_data_block; + + elog(DEBUG1, "WORKER - field size is spread across data blocks"); + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + } + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + fld_size = (int32) pg_ntoh32(fld_size); + + CHECK_FIELD_SIZE(fld_size); + + cstate->raw_buf_index += sizeof(fld_size); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + elog(DEBUG1, "WORKER - tuple lies in single data block"); + memcpy(&cstate->attribute_buf.data[0], &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], fld_size); + cstate->raw_buf_index += fld_size; + } + else + { + uint32 att_buf_idx = 0; + uint32 copy_bytes = 0; + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + ParallelCopyDataBlock *prev_data_block = NULL; + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + prev_data_block = cstate->pcdata->curr_data_block; + elog(DEBUG1, "WORKER - tuple is spread across data blocks"); + memcpy(&cstate->attribute_buf.data[0], &prev_data_block->data[cstate->raw_buf_index], + curr_blk_bytes); + copy_bytes = curr_blk_bytes; + att_buf_idx = curr_blk_bytes; + + while (i > 0) + { + cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[prev_data_block->following_block]; + pg_atomic_sub_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + cstate->raw_buf_index = 0; + copy_bytes = fld_size - att_buf_idx; + + /* + * The bytes that are yet to be taken into att buff are more than + * the entire data block size, but only take the data block size + * elements. + */ + if (copy_bytes >= DATA_BLOCK_SIZE) + copy_bytes = DATA_BLOCK_SIZE; + + memcpy(&cstate->attribute_buf.data[att_buf_idx], + &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], copy_bytes); + att_buf_idx += copy_bytes; + prev_data_block = cstate->pcdata->curr_data_block; + i--; + } + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + } + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + /* * GetLinePosition - Return the line position once the leader has populated the * data. diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index a9fe950e75..746c139e94 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -76,6 +76,109 @@ if (!IsParallelCopy()) \ else \ return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); +/* + * CHECK_FIELD_COUNT - Handles the error cases for field count + * for binary format files. + */ +#define CHECK_FIELD_COUNT \ +{\ + if (fld_count == -1) \ + { \ + if (IsParallelCopy() && \ + !IsLeader()) \ + return true; \ + else if (IsParallelCopy() && \ + IsLeader()) \ + { \ + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return true; \ + } \ + else \ + { \ + /* \ + * Received EOF marker. In a V3-protocol copy, wait for the \ + * protocol-level EOF, and complain if it doesn't come \ + * immediately. This ensures that we correctly handle CopyFail, \ + * if client chooses to send that now. \ + * \ + * Note that we MUST NOT try to read more data in an old-protocol \ + * copy, since there is no protocol-level EOF marker then. We \ + * could go either way for copy from file, but choose to throw \ + * error if there's data after the EOF marker, for consistency \ + * with the new-protocol case. \ + */ \ + char dummy; \ + if (cstate->copy_dest != COPY_OLD_FE && \ + CopyReadBinaryData(cstate, &dummy, 1) > 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return false; \ + } \ + } \ + if (fld_count != cstate->max_fields) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("row field count is %d, expected %d", \ + (int) fld_count, cstate->max_fields))); \ +} + +/* + * CHECK_FIELD_SIZE - Handles the error case for field size + * for binary format files. + */ +#define CHECK_FIELD_SIZE(fld_size) \ +{ \ + if (fld_size < -1) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("invalid field size")));\ +} + +/* + * EOF_ERROR - Error statement for EOF for binary format + * files. + */ +#define EOF_ERROR \ +{ \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("unexpected EOF in COPY data")));\ +} + +/* + * GET_RAW_BUF_INDEX - Calculates the raw buf index for the cases + * where the data spread is across multiple data blocks. + */ +#define GET_RAW_BUF_INDEX(raw_buf_index, fld_size, required_blks, curr_blk_bytes) \ +{ \ + raw_buf_index = fld_size - (((required_blks - 1) * DATA_BLOCK_SIZE) + curr_blk_bytes); \ +} + +/* + * GET_REQUIRED_BLOCKS - Calculates the number of data + * blocks required for the cases where the data spread + * is across multiple data blocks. + */ +#define GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes) \ +{ \ + /* \ + * field size can spread across multiple data blocks, \ + * calculate the number of required data blocks and try to get \ + * those many data blocks. \ + */ \ + required_blks = (int32)(fld_size - curr_blk_bytes)/(int32)DATA_BLOCK_SIZE; \ + /* \ + * check if we need the data block for the field data \ + * bytes that are not modulus of data block size. \ + */ \ + if ((fld_size - curr_blk_bytes)%DATA_BLOCK_SIZE != 0) \ + required_blks++; \ +} + /* * Represents the different source/dest cases we need to worry about at * the bottom level @@ -253,6 +356,17 @@ typedef struct ParallelCopyLineBuf uint64 cur_lineno; /* line number for error messages */ } ParallelCopyLineBuf; +/* + * Represents the usage mode for CopyReadBinaryGetDataBlock. + */ +typedef enum FieldInfoType +{ + FIELD_NONE = 0, + FIELD_COUNT, + FIELD_SIZE, + FIELD_DATA +} FieldInfoType; + /* * This structure helps in storing the common data from CopyStateData that are * required by the workers. This information will then be allocated and stored @@ -276,6 +390,7 @@ typedef struct SerializedParallelCopyState /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; } SerializedParallelCopyState; /* @@ -302,6 +417,9 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + + /* For binary formatted files */ + ParallelCopyDataBlock *curr_data_block; } ParallelCopyData; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); @@ -487,4 +605,12 @@ extern uint32 UpdateSharedLineInfo(CopyState cstate, uint32 blk_pos, uint32 offs uint32 line_size, uint32 line_state, uint32 blk_line_pos); extern void EndLineParallelCopy(CopyState cstate, uint32 line_pos, uint32 line_size, uint32 raw_buf_ptr); +extern int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); +extern int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); +extern bool CopyReadBinaryTupleLeader(CopyState cstate); +extern bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +extern void CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size); +extern Datum CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull); +extern void CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info); #endif /* COPY_H */ -- 2.25.1