From 47d27d300dea6adf9af6d688044e04770d1d8650 Mon Sep 17 00:00:00 2001 From: Vignesh C , Bharath Date: Mon, 6 Jul 2020 14:33:12 +0530 Subject: [PATCH 3/6] Allow copy from command to process data from file/STDIN contents to a table in parallel. This feature allows the copy from to leverage multiple CPUs in order to copy data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM command where the user can specify the number of workers that can be used to perform the COPY FROM command. Specifying zero as number of workers will disable parallelism. The backend, to which the "COPY FROM" query is submitted acts as leader with the responsibility of reading data from the file/stdin, launching at most n number of workers as specified with PARALLEL 'n' option in the "COPY FROM" query. The leader populates the common data required for the workers execution in the DSM and shares it with the workers. The leader then executes before statement triggers if there exists any. Leader populates DSM lines which includes the start offset and line size, while populating the lines it reads as many blocks as required into the DSM data blocks from the file. Each block is of 64K size. The leader parses the data to identify a line, the existing logic from CopyReadLineText which identifies the lines with some changes was used for this. Leader checks if a free line is available to copy the information, if there is no free line it waits till the required line is freed up by the worker and then copies the identified lines information (offset & line size) into the DSM lines. This process is repeated till the complete file is processed. Simultaneously, the workers cache the lines(50) locally into the local memory and release the lines to the leader for further populating. Each worker processes the lines that it cached and inserts it into the table. The leader does not participate in the insertion of data, leaders only responsibility will be to identify the lines as fast as possible for the workers to do the actual copy operation. The leader waits till all the lines populated are processed by the workers and exits. --- src/backend/access/common/toast_internals.c | 11 +- src/backend/access/heap/heapam.c | 13 - src/backend/access/transam/xact.c | 31 + src/backend/commands/copy.c | 900 ++++++++++++++++++++++++++-- src/include/access/xact.h | 2 + src/tools/pgindent/typedefs.list | 1 + 6 files changed, 906 insertions(+), 52 deletions(-) diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 25a81e5..586d53d 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -17,6 +17,7 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/heaptoast.h" +#include "access/parallel.h" #include "access/table.h" #include "access/toast_internals.h" #include "access/xact.h" @@ -116,7 +117,15 @@ toast_save_datum(Relation rel, Datum value, TupleDesc toasttupDesc; Datum t_values[3]; bool t_isnull[3]; - CommandId mycid = GetCurrentCommandId(true); + + /* + * Parallel copy can insert toast tuples, in case of parallel copy the + * command would have been set already by calling AssignCommandIdForWorker. + * For parallel copy call GetCurrentCommandId to get currentCommandId by + * passing used as false, as this is taken care earlier. + */ + CommandId mycid = IsParallelWorker() ? GetCurrentCommandId(false) : + GetCurrentCommandId(true); struct varlena *result; struct varatt_external toast_pointer; union diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 537913d..28f3a98 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2012,19 +2012,6 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { - /* - * Parallel operations are required to be strictly read-only in a parallel - * worker. Parallel inserts are not safe even in the leader in the - * general case, because group locking means that heavyweight locks for - * relation extension or GIN page locks will not conflict between members - * of a lock group, but we don't prohibit that case here because there are - * useful special cases that we can safely allow, such as CREATE TABLE AS. - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 905dc7d..ed4009e 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -502,6 +502,37 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * AssignFullTransactionIdForWorker + * + * For parallel copy, transaction id of leader will be used by the workers. + */ +void +AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId) +{ + TransactionState s = CurrentTransactionState; + + Assert((IsInParallelMode() || IsParallelWorker())); + s->fullTransactionId = fullTransactionId; +} + +/* + * AssignCommandIdForWorker + * + * For parallel copy, command id of leader will be used by the workers. + */ +void +AssignCommandIdForWorker(CommandId commandId, bool used) +{ + Assert((IsInParallelMode() || IsParallelWorker())); + + /* this is global to a transaction, not subtransaction-local */ + if (used) + currentCommandIdUsed = true; + + currentCommandId = commandId; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 0a4d997..048f2d2 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -26,6 +26,7 @@ #include "access/xlog.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" +#include "catalog/pg_proc_d.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" @@ -40,11 +41,13 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "optimizer/clauses.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -95,6 +98,18 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; +/* + * State of the line. + */ +typedef enum ParallelCopyLineState +{ + LINE_INIT, /* initial state of line */ + LINE_LEADER_POPULATING, /* leader processing line */ + LINE_LEADER_POPULATED, /* leader completed populating line */ + LINE_WORKER_PROCESSING, /* worker processing line */ + LINE_WORKER_PROCESSED /* worker completed processing line */ +}ParallelCopyLineState; + #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ #define DATA_BLOCK_SIZE RAW_BUF_SIZE @@ -538,9 +553,13 @@ if (1) \ { \ if (raw_buf_ptr > cstate->raw_buf_index) \ { \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ + if (!IsParallelCopy()) \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + raw_buf_ptr - cstate->raw_buf_index); \ + else \ + line_size += raw_buf_ptr - cstate->raw_buf_index; \ + \ cstate->raw_buf_index = raw_buf_ptr; \ } \ } else ((void) 0) @@ -553,13 +572,40 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) +/* Begin parallel copy Macros */ +#define SET_NEWLINE_SIZE() \ +{ \ + if (cstate->eol_type == EOL_NL || cstate->eol_type == EOL_CR) \ + new_line_size = 1; \ + else if (cstate->eol_type == EOL_CRNL) \ + new_line_size = 2; \ + else \ + new_line_size = 0; \ +} + +/* + * COPY_WAIT_TO_PROCESS - Wait before continuing to process. + */ +#define COPY_WAIT_TO_PROCESS() \ +{ \ + CHECK_FOR_INTERRUPTS(); \ + (void) WaitLatch(MyLatch, \ + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, \ + 1L, WAIT_EVENT_PG_SLEEP); \ + ResetLatch(MyLatch); \ +} + +/* End parallel copy Macros */ + /* * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding. */ #define CONVERT_TO_SERVER_ENCODING(cstate) \ { \ /* Done reading the line. Convert it to server encoding. */ \ - if (cstate->need_transcoding) \ + if (cstate->need_transcoding && \ + (!IsParallelCopy() || \ + (IsParallelCopy() && !IsLeader()))) \ { \ char *cvt; \ cvt = pg_any_to_server(cstate->line_buf.data, \ @@ -618,22 +664,38 @@ if (1) \ * CLEAR_EOL_LINE - Wrapper for clearing EOL. */ #define CLEAR_EOL_LINE() \ -if (!result && !IsHeaderLine()) \ - CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \ - cstate->line_buf.len, \ - cstate->line_buf.len) \ +{ \ + if (!result && !IsHeaderLine()) \ + { \ + if (IsParallelCopy()) \ + CLEAR_EOL_FROM_COPIED_DATA(cstate->raw_buf, \ + raw_buf_ptr, line_size) \ + else \ + CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \ + cstate->line_buf.len, \ + cstate->line_buf.len) \ + } \ +} /* * INCREMENTPROCESSED - Increment the lines processed. */ -#define INCREMENTPROCESSED(processed) \ -processed++; +#define INCREMENTPROCESSED(processed) \ +{ \ + if (!IsParallelCopy()) \ + processed++; \ + else \ + pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \ +} /* * GETPROCESSED - Get the lines processed. */ #define GETPROCESSED(processed) \ -return processed; +if (!IsParallelCopy()) \ + return processed; \ +else \ + return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -681,8 +743,12 @@ static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt); +static void ExecBeforeStmtTrigger(CopyState cstate); +static void CheckTargetRelValidity(CopyState cstate); static void PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc tup_desc, List *attnamelist); +static void PopulateCatalogInformation(CopyState cstate); +static pg_attribute_always_inline uint32 GetLinePosition(CopyState cstate); static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); @@ -836,6 +902,137 @@ InsertListShm(ParallelContext *pcxt, int key, List *inputlist, } /* + * IsTriggerFunctionParallelSafe - Check if the trigger function is parallel + * safe for the triggers. Return false if any one of the trigger has parallel + * unsafe function. + */ +static pg_attribute_always_inline bool +IsTriggerFunctionParallelSafe(TriggerDesc *trigdesc) +{ + int i; + for (i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + int trigtype = RI_TRIGGER_NONE; + + if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE) + return false; + + /* If the trigger is parallel safe, also look for RI_TRIGGER. */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_PK || trigtype == RI_TRIGGER_FK) + return false; + } + + return true; +} + +/* + * CheckExprParallelSafety - determine parallel safety of volatile expressions + * in default clause of column definition or in where clause and return true if + * they are parallel safe. + */ +static pg_attribute_always_inline bool +CheckExprParallelSafety(CopyState cstate) +{ + if (contain_volatile_functions(cstate->whereClause)) + { + if (max_parallel_hazard((Query *)cstate->whereClause) != PROPARALLEL_SAFE) + return false; + } + + /* + * Check if any of the column has volatile default expression. if yes, and + * they are not parallel safe, then parallelism is not allowed. For + * instance, if there are any serial/bigserial columns for which nextval() + * default expression which is parallel unsafe is associated, parallelism + * should not be allowed. In non parallel copy volatile functions are not + * checked for nextval(). + */ + if (cstate->defexprs != NULL && cstate->num_defaults != 0) + { + int i; + for (i = 0; i < cstate->num_defaults; i++) + { + bool volatile_expr = contain_volatile_functions((Node *)cstate->defexprs[i]->expr); + if (volatile_expr && + (max_parallel_hazard((Query *)cstate->defexprs[i]->expr)) != + PROPARALLEL_SAFE) + return false; + } + } + + return true; +} + +/* + * FindInsertMethod - determine insert mode single, multi, or multi conditional. + */ +static pg_attribute_always_inline CopyInsertMethod +FindInsertMethod(CopyState cstate) +{ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_before_row || + cstate->rel->trigdesc->trig_insert_instead_row)) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + cstate->rel->trigdesc != NULL && + cstate->rel->trigdesc->trig_insert_new_table) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return CIM_MULTI_CONDITIONAL; + + return CIM_MULTI; +} + +/* + * IsParallelCopyAllowed - check for the cases where parallel copy is not + * applicable. + */ +static pg_attribute_always_inline bool +IsParallelCopyAllowed(CopyState cstate) +{ + /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ + if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + return false; + + /* Check if copy is into foreign table or temporary table. */ + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(cstate->rel)) + return false; + + /* Check if trigger function is parallel safe. */ + if (cstate->rel->trigdesc != NULL && + !IsTriggerFunctionParallelSafe(cstate->rel->trigdesc)) + return false; + + /* + * Check if there is after statement or instead of trigger or transition + * table triggers. + */ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_after_statement || + cstate->rel->trigdesc->trig_insert_instead_row || + cstate->rel->trigdesc->trig_insert_new_table)) + return false; + + /* Check if the volatile expressions are parallel safe, if present any. */ + if (!CheckExprParallelSafety(cstate)) + return false; + + /* Check if the insertion mode is single. */ + if (FindInsertMethod(cstate) == CIM_SINGLE) + return false; + + return true; +} + +/* * BeginParallelCopy - start parallel copy tasks. * * Get the number of workers required to perform the parallel copy. The data @@ -865,6 +1062,8 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) int parallel_workers = 0; ParallelCopyData *pcdata; + CheckTargetRelValidity(cstate); + parallel_workers = Min(nworkers, max_worker_processes); /* Can't perform copy in parallel */ @@ -878,6 +1077,19 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); cstate->pcdata = pcdata; + /* + * User chosen parallel copy. Determine if the parallel copy is actually + * allowed. If not, go with the non-parallel mode. + */ + if (!IsParallelCopyAllowed(cstate)) + { + elog(WARNING, + "Parallel copy not supported for specified table, copy will be run in non-parallel mode"); + return NULL; + } + + full_transaction_id = GetCurrentFullTransactionId(); + EnterParallelMode(); pcxt = CreateParallelContext("postgres", "ParallelCopyMain", parallel_workers); @@ -1108,7 +1320,211 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->line_buf_converted = false; cstate->raw_buf = NULL; cstate->raw_buf_index = cstate->raw_buf_len = 0; + + PopulateCatalogInformation(cstate); + + /* Create workspace for CopyReadAttributes results. */ + if (!cstate->binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **)palloc(attr_count * sizeof(char *)); + } +} + +/* + * CacheLineInfo - Cache the line information to local memory. + */ +static bool +CacheLineInfo(CopyState cstate, uint32 buff_count) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyData *pcdata = cstate->pcdata; + uint32 write_pos; + ParallelCopyDataBlock *data_blk_ptr; + ParallelCopyLineBoundary *lineInfo; + uint32 offset; + int dataSize; + int copiedSize = 0; + + resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); + write_pos = GetLinePosition(cstate); + if (-1 == write_pos) + return true; + + /* Get the current line information. */ + lineInfo = &pcshared_info->line_boundaries.ring[write_pos]; + if (pg_atomic_read_u32(&lineInfo->line_size) == 0) + goto empty_data_line_update; + + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block]; + + /* Get the offset information from where the data must be copied. */ + offset = lineInfo->start_offset; + pcdata->worker_line_buf[buff_count].cur_lineno = lineInfo->cur_lineno; + + elog(DEBUG1, "[Worker] Processing - line position:%d, block:%d, unprocessed lines:%d, offset:%d, line size:%d", + write_pos, lineInfo->first_block, + pg_atomic_read_u32(&data_blk_ptr->unprocessed_line_parts), + offset, pg_atomic_read_u32(&lineInfo->line_size)); + + for (;;) + { + uint8 skip_bytes = data_blk_ptr->skip_bytes; + /* + * There is a possibility that the above loop has come out because + * data_blk_ptr->curr_blk_completed is set, but dataSize read might + * be an old value, if data_blk_ptr->curr_blk_completed and the line is + * completed, line_size will be set. Read the line_size again to be + * sure if it is complete or partial block. + */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + if (dataSize) + { + int remainingSize = dataSize - copiedSize; + if (!remainingSize) + break; + + /* Whole line is in current block. */ + if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE) + { + appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + remainingSize); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, + 1); + break; + } + else + { + /* Line is spread across the blocks. */ + uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + lineInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); + copiedSize += lineInCurrentBlock; + while (copiedSize < dataSize) + { + uint32 currentBlockCopySize; + ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + skip_bytes = currBlkPtr->skip_bytes; + + /* + * If complete data is present in current block use + * dataSize - copiedSize, or copy the whole block from + * current block. + */ + currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes); + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &currBlkPtr->data[0], + currentBlockCopySize); + pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_line_parts, 1); + copiedSize += currentBlockCopySize; + data_blk_ptr = currBlkPtr; + } + + break; + } + } + else + { + /* Copy this complete block from the current offset. */ + uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + lineInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); + copiedSize += lineInCurrentBlock; + + /* + * Reset the offset. For the first copy, copy from the offset. For + * the subsequent copy the complete block. + */ + offset = 0; + + /* Set data_blk_ptr to the following block. */ + data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + } + + for (;;) + { + /* Get the size of this line */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + + /* + * If the data is present in current block lineInfo.line_size + * will be updated. If the data is spread across the blocks either + * of lineInfo.line_size or data_blk_ptr->curr_blk_completed can + * be updated. lineInfo.line_size will be updated if the complete + * read is finished. data_blk_ptr->curr_blk_completed will be + * updated if processing of current block is finished and data + * processing is not finished. + */ + if (data_blk_ptr->curr_blk_completed || (dataSize != -1)) + break; + + COPY_WAIT_TO_PROCESS() + } + } + +empty_data_line_update: + elog(DEBUG1, "[Worker] Completed processing line:%d", write_pos); + pg_atomic_write_u32(&lineInfo->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&lineInfo->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + return false; +} + +/* + * GetWorkerLine - Returns a line for worker to process. + */ +static bool +GetWorkerLine(CopyState cstate) +{ + uint32 buff_count; + ParallelCopyData *pcdata = cstate->pcdata; + + /* + * Copy the line data to line_buf and release the line position so that the + * worker can continue loading data. + */ + if (pcdata->worker_line_buf_pos < pcdata->worker_line_buf_count) + goto return_line; + + pcdata->worker_line_buf_pos = 0; + pcdata->worker_line_buf_count = 0; + + for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++) + { + bool result = CacheLineInfo(cstate, buff_count); + if (result) + break; + + pcdata->worker_line_buf_count++; + } + + if (pcdata->worker_line_buf_count) + goto return_line; + else + resetStringInfo(&cstate->line_buf); + + return true; + +return_line: + cstate->line_buf = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].line_buf; + cstate->cur_lineno = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].cur_lineno; + cstate->line_buf_valid = true; + + /* Mark that encoding conversion hasn't occurred yet. */ + cstate->line_buf_converted = false; + CONVERT_TO_SERVER_ENCODING(cstate) + pcdata->worker_line_buf_pos++; + return false; } + /* * ParallelCopyMain - parallel copy worker's code. * @@ -1154,6 +1570,8 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); pcdata->pcshared_info = pcshared_info; + AssignFullTransactionIdForWorker(pcshared_info->full_transaction_id); + AssignCommandIdForWorker(pcshared_info->mycid, true); shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, false); @@ -1206,6 +1624,34 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) MemoryContextSwitchTo(oldcontext); return; } + +/* + * UpdateBlockInLineInfo - Update the line information. + */ +static pg_attribute_always_inline int +UpdateBlockInLineInfo(CopyState cstate, uint32 blk_pos, + uint32 offset, uint32 line_size, uint32 line_state) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyLineBoundaries *lineBoundaryPtr = &pcshared_info->line_boundaries; + ParallelCopyLineBoundary *lineInfo; + int line_pos = lineBoundaryPtr->leader_pos; + + /* Update the line information for the worker to pick and process. */ + lineInfo = &lineBoundaryPtr->ring[line_pos]; + while (pg_atomic_read_u32(&lineInfo->line_size) != -1) + COPY_WAIT_TO_PROCESS() + + lineInfo->first_block = blk_pos; + lineInfo->start_offset = offset; + lineInfo->cur_lineno = cstate->cur_lineno; + pg_atomic_write_u32(&lineInfo->line_size, line_size); + pg_atomic_write_u32(&lineInfo->line_state, line_state); + lineBoundaryPtr->leader_pos = (lineBoundaryPtr->leader_pos + 1) % RINGSIZE; + + return line_pos; +} + /* * ParallelCopyLeader - parallel copy leader's functionality. * @@ -1231,9 +1677,161 @@ ParallelCopyLeader(CopyState cstate) ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + /* Execute the before statement triggers from the leader */ + ExecBeforeStmtTrigger(cstate); + + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } + } + + for (;;) + { + bool done; + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + pcshared_info->is_read_in_progress = false; cstate->cur_lineno = 0; } + +/* + * GetLinePosition - return the line position that worker should process. + */ +static uint32 +GetLinePosition(CopyState cstate) +{ + ParallelCopyData *pcdata = cstate->pcdata; + ParallelCopyShmInfo *pcshared_info = pcdata->pcshared_info; + uint32 previous_pos = pcdata->worker_processed_pos; + uint32 write_pos = (previous_pos == -1) ? 0 : (previous_pos + 1) % RINGSIZE; + for (;;) + { + int dataSize; + bool is_read_in_progress = pcshared_info->is_read_in_progress; + ParallelCopyLineBoundary *lineInfo; + ParallelCopyDataBlock *data_blk_ptr; + ParallelCopyLineState line_state = LINE_LEADER_POPULATED; + ParallelCopyLineState curr_line_state; + CHECK_FOR_INTERRUPTS(); + + /* File read completed & no elements to process. */ + if (!is_read_in_progress && + (pcshared_info->populated == + pg_atomic_read_u64(&pcshared_info->total_worker_processed))) + { + write_pos = -1; + break; + } + + /* Get the current line information. */ + lineInfo = &pcshared_info->line_boundaries.ring[write_pos]; + curr_line_state = pg_atomic_read_u32(&lineInfo->line_state); + if ((write_pos % WORKER_CHUNK_COUNT == 0) && + (curr_line_state == LINE_WORKER_PROCESSED || + curr_line_state == LINE_WORKER_PROCESSING)) + { + pcdata->worker_processed_pos = write_pos; + write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE; + continue; + } + + /* Get the size of this line. */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + + if (dataSize != 0) /* If not an empty line. */ + { + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block]; + + if (!data_blk_ptr->curr_blk_completed && (dataSize == -1)) + { + /* Wait till the current line or block is added. */ + COPY_WAIT_TO_PROCESS() + continue; + } + } + + /* Make sure that no worker has consumed this element. */ + if (pg_atomic_compare_exchange_u32(&lineInfo->line_state, + &line_state, LINE_WORKER_PROCESSING)) + break; + } + + pcdata->worker_processed_pos = write_pos; + return write_pos; +} + +/* + * GetFreeCopyBlock - Get a free block for data to be copied. + */ +static pg_attribute_always_inline uint32 +GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + int count = 0; + uint32 last_free_block = pcshared_info->cur_block_pos; + uint32 block_pos = (last_free_block != -1) ? ((last_free_block + 1) % MAX_BLOCKS_COUNT): 0; + + /* + * Get a new block for copying data, don't check curent block, current + * block will have some unprocessed data. + */ + while (count < (MAX_BLOCKS_COUNT - 1)) + { + ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos]; + uint32 unprocessed_line_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_line_parts); + if (unprocessed_line_parts == 0) + { + dataBlkPtr->curr_blk_completed = false; + dataBlkPtr->skip_bytes = 0; + pcshared_info->cur_block_pos = block_pos; + return block_pos; + } + + block_pos = (block_pos + 1) % MAX_BLOCKS_COUNT; + count++; + } + + return -1; +} + +/* + * WaitGetFreeCopyBlock - If there are no blocks available, wait and get a block + * for copying data. + */ +static uint32 +WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + uint32 new_free_pos = -1; + for (;;) + { + new_free_pos = GetFreeCopyBlock(pcshared_info); + if (new_free_pos != -1) /* We have got one block, break now. */ + break; + + COPY_WAIT_TO_PROCESS() + } + + return new_free_pos; +} + /* * LookupParallelCopyFnPtr - Look up parallel copy function pointer. */ @@ -1269,6 +1867,146 @@ LookupParallelCopyFnStr(copy_data_source_cb fn_addr) /* We can only reach this by programming error. */ elog(ERROR, "internal function pointer not found"); } + +/* + * SetRawBufForLoad - Set raw_buf to the shared memory where the file data must + * be read. + */ +static void +SetRawBufForLoad(CopyState cstate, uint32 line_size, uint32 copy_buf_len, + uint32 raw_buf_ptr, char **copy_raw_buf) +{ + ParallelCopyShmInfo *pcshared_info; + uint32 cur_block_pos; + uint32 next_block_pos; + ParallelCopyDataBlock *cur_data_blk_ptr = NULL; + ParallelCopyDataBlock *next_data_blk_ptr = NULL; + + if (!IsParallelCopy()) + return; + + pcshared_info = cstate->pcdata->pcshared_info; + cur_block_pos = pcshared_info->cur_block_pos; + cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL; + next_block_pos = WaitGetFreeCopyBlock(pcshared_info); + next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos]; + + /* set raw_buf to the data block in shared memory */ + cstate->raw_buf = next_data_blk_ptr->data; + *copy_raw_buf = cstate->raw_buf; + if (cur_data_blk_ptr && line_size) + { + /* + * Mark the previous block as completed, worker can start copying this + * data. + */ + cur_data_blk_ptr->following_block = next_block_pos; + pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_line_parts, 1); + cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr; + cur_data_blk_ptr->curr_blk_completed = true; + } +} + +/* + * EndLineParallelCopy - Update the line information in shared memory. + */ +static void +EndLineParallelCopy(CopyState cstate, uint32 line_pos, uint32 line_size, + uint32 raw_buf_ptr) +{ + uint8 new_line_size; + if (!IsParallelCopy()) + return; + + if (!IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyLineBoundaries *lineBoundaryPtr = &pcshared_info->line_boundaries; + SET_NEWLINE_SIZE() + if (line_size) + { + ParallelCopyLineBoundary *lineInfo = &lineBoundaryPtr->ring[line_pos]; + /* + * If the new_line_size > raw_buf_ptr, then the new block has only + * new line char content. The unprocessed count should not be + * increased in this case. + */ + if (raw_buf_ptr > new_line_size) + { + uint32 cur_block_pos = pcshared_info->cur_block_pos; + ParallelCopyDataBlock *curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; + pg_atomic_add_fetch_u32(&curr_data_blk_ptr->unprocessed_line_parts, 1); + } + + /* Update line size. */ + pg_atomic_write_u32(&lineInfo->line_size, line_size); + pg_atomic_write_u32(&lineInfo->line_state, LINE_LEADER_POPULATED); + elog(DEBUG1, "[Leader] After adding - line position:%d, line_size:%d", + line_pos, line_size); + pcshared_info->populated++; + } + else if (new_line_size) + { + /* This means only new line char, empty record should be inserted.*/ + ParallelCopyLineBoundary *lineInfo; + line_pos = UpdateBlockInLineInfo(cstate, -1, -1, 0, + LINE_LEADER_POPULATED); + lineInfo = &lineBoundaryPtr->ring[line_pos]; + elog(DEBUG1, "[Leader] Added empty line with offset:%d, line position:%d, line size:%d", + lineInfo->start_offset, line_pos, + pg_atomic_read_u32(&lineInfo->line_size)); + pcshared_info->populated++; + } + } +} + +/* + * ExecBeforeStmtTrigger - Execute the before statement trigger, this will be + * executed for parallel copy by the leader process. + */ +static void +ExecBeforeStmtTrigger(CopyState cstate) +{ + EState *estate = CreateExecutorState(); + ResultRelInfo *resultRelInfo; + + Assert(IsLeader()); + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of code + * here that basically duplicated execUtils.c ...) + */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, + cstate->rel, + 1, /* must match rel's position in range_table */ + NULL, + 0); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(resultRelInfo, CMD_INSERT); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + ExecInitRangeTable(estate, cstate->range_table); + + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + + /* Close any trigger target relations */ + ExecCleanUpTriggerState(estate); + + FreeExecutorState(estate); +} + /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -3698,7 +4436,8 @@ CopyFrom(CopyState cstate) PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); + CommandId mycid = IsParallelCopy() ? GetCurrentCommandId(false) : + GetCurrentCommandId(true); int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; @@ -3708,7 +4447,14 @@ CopyFrom(CopyState cstate) bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; - CheckTargetRelValidity(cstate); + /* + * Perform this check if it is not parallel copy. In case of parallel + * copy, this check is done by the leader, so that if any invalid case + * exist the copy from command will error out from the leader itself, + * avoiding launching workers, just to throw error. + */ + if (!IsParallelCopy()) + CheckTargetRelValidity(cstate); /* * If the target file is new-in-transaction, we assume that checking FSM @@ -3897,13 +4643,16 @@ CopyFrom(CopyState cstate) has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - /* - * Check BEFORE STATEMENT insertion triggers. It's debatable whether we - * should do this for COPY, since it's not really an "INSERT" statement as - * such. However, executing these triggers maintains consistency with the - * EACH ROW triggers that we already fire on COPY. - */ - ExecBSInsertTriggers(estate, resultRelInfo); + if (!IsParallelCopy()) + { + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + } econtext = GetPerTupleExprContext(estate); @@ -4003,6 +4752,16 @@ CopyFrom(CopyState cstate) !has_instead_insert_row_trig && resultRelInfo->ri_FdwRoutine == NULL; + /* + * If the table has any partitions that are either foreign or + * has BEFORE/INSTEAD OF triggers, we can't perform copy + * operations with parallel workers. + */ + if (!leafpart_use_multi_insert && IsParallelWorker()) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform PARALLEL COPY if partition has BEFORE/INSTEAD OF triggers, or if the partition is foreign partition"), + errhint("Try COPY without PARALLEL option"))); + /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) { @@ -4417,7 +5176,7 @@ BeginCopyFrom(ParseState *pstate, initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf = (IsParallelCopy()) ? NULL : (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; /* Assign range table, we'll need it in CopyFrom. */ @@ -4561,26 +5320,35 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) /* only available for text or csv input */ Assert(!cstate->binary); - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (IsParallelCopy()) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ + done = GetWorkerLine(cstate); + if (done && cstate->line_buf.len == 0) + return false; } + else + { + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + } /* Parse the line into de-escaped field values */ if (cstate->csv_mode) @@ -4828,9 +5596,31 @@ CopyReadLine(CopyState cstate) */ if (cstate->copy_dest == COPY_NEW_FE) { + bool bIsFirst = true; do { - cstate->raw_buf_index = cstate->raw_buf_len; + if (!IsParallelCopy()) + cstate->raw_buf_index = cstate->raw_buf_len; + else + { + if (cstate->raw_buf_index == RAW_BUF_SIZE) + { + /* Get a new block if it is the first time, From the + * subsequent time, reset the index and re-use the same + * block. + */ + if (bIsFirst) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos = WaitGetFreeCopyBlock(pcshared_info); + cstate->raw_buf = pcshared_info->data_blocks[block_pos].data; + bIsFirst = false; + } + + cstate->raw_buf_index = cstate->raw_buf_len = 0; + } + } + } while (CopyLoadRawBuf(cstate)); } } @@ -4860,6 +5650,11 @@ CopyReadLineText(CopyState cstate) char quotec = '\0'; char escapec = '\0'; + /* For parallel copy */ + uint32 line_size = 0; + int line_pos = 0; + + cstate->eol_type = EOL_UNKNOWN; if (cstate->csv_mode) { quotec = cstate->quote[0]; @@ -4914,6 +5709,8 @@ CopyReadLineText(CopyState cstate) if (raw_buf_ptr >= copy_buf_len || need_data) { REFILL_LINEBUF; + SetRawBufForLoad(cstate, line_size, copy_buf_len, raw_buf_ptr, + ©_raw_buf); /* * Try to read some more data. This will certainly reset @@ -5138,9 +5935,15 @@ CopyReadLineText(CopyState cstate) * discard the data and the \. sequence. */ if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, + { + if (!IsParallelCopy()) + appendBinaryStringInfo(&cstate->line_buf, cstate->raw_buf + cstate->raw_buf_index, prev_raw_ptr - cstate->raw_buf_index); + else + line_size += prev_raw_ptr - cstate->raw_buf_index; + } + cstate->raw_buf_index = raw_buf_ptr; result = true; /* report EOF */ break; @@ -5192,6 +5995,26 @@ not_end_of_copy: IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); raw_buf_ptr += mblen - 1; } + + /* + * Skip the header line. Update the line here, this cannot be done at + * the beginning, as there is a possibility that file contains empty + * lines. + */ + if (IsParallelCopy() && first_char_in_line && !IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyLineBoundary *lineInfo; + uint32 line_first_block = pcshared_info->cur_block_pos; + line_pos = UpdateBlockInLineInfo(cstate, + line_first_block, + cstate->raw_buf_index, -1, + LINE_LEADER_POPULATING); + lineInfo = &pcshared_info->line_boundaries.ring[line_pos]; + elog(DEBUG1, "[Leader] Adding - block:%d, offset:%d, line position:%d", + line_first_block, lineInfo->start_offset, line_pos); + } + first_char_in_line = false; } /* end of outer loop */ @@ -5200,6 +6023,7 @@ not_end_of_copy: */ REFILL_LINEBUF; CLEAR_EOL_LINE() + EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr); return result; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index db19187..20dafb7 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -381,6 +381,8 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId); +extern void AssignCommandIdForWorker(CommandId commandId, bool used); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6a42ac4..86a7620 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1701,6 +1701,7 @@ ParallelCompletionPtr ParallelContext ParallelCopyLineBoundaries ParallelCopyLineBoundary +ParallelCopyLineState ParallelCopyCommonKeyData ParallelCopyData ParallelCopyDataBlock -- 1.8.3.1