From 4ff785c888e93a8dd33d4e48cb4f804e204cb739 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 10 Jun 2020 07:18:33 +0530 Subject: [PATCH 3/4] Allow copy from command to process data from file/STDIN contents to a table in parallel. This feature allows the copy from to leverage multiple CPUs in order to copy data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM command where the user can specify the number of workers that can be used to perform the COPY FROM command. Specifying zero as number of workers will disable parallelism. The backend, to which the "COPY FROM" query is submitted acts as leader with the responsibility of reading data from the file/stdin, launching at most n number of workers as specified with PARALLEL 'n' option in the "COPY FROM" query. The leader populates the common data required for the workers execution in the DSM and shares it with the workers. The leader then executes before statement triggers if there exists any. Leader populates DSM chunks which includes the start offset and chunk size, while populating the chunks it reads as many blocks as required into the DSM data blocks from the file. Each block is of 64K size. The leader parses the data to identify a chunk, the existing logic from CopyReadLineText which identifies the chunks with some changes was used for this. Leader checks if a free chunk is available to copy the information, if there is no free chunk it waits till the required chunk is freed up by the worker and then copies the identified chunks information (offset & chunk size) into the DSM chunks. This process is repeated till the complete file is processed. Simultaneously, the workers cache the chunks(50) locally into the local memory and release the chunks to the leader for further populating. Each worker processes the chunk that it cached and inserts it into the table. The leader does not participate in the insertion of data, leaders only responsibility will be to identify the chunks as fast as possible for the workers to do the actual copy operation. The leader waits till all the chunks populated are processed by the workers and exits. --- src/backend/access/heap/heapam.c | 13 - src/backend/access/transam/xact.c | 13 + src/backend/commands/copy.c | 875 +++++++++++++++++++++++++++++++++-- src/backend/optimizer/util/clauses.c | 2 +- src/include/access/xact.h | 1 + src/tools/pgindent/typedefs.list | 1 + 6 files changed, 853 insertions(+), 52 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 94eb37d..6991b9f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2012,19 +2012,6 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { - /* - * Parallel operations are required to be strictly read-only in a parallel - * worker. Parallel inserts are not safe even in the leader in the - * general case, because group locking means that heavyweight locks for - * relation extension or GIN page locks will not conflict between members - * of a lock group, but we don't prohibit that case here because there are - * useful special cases that we can safely allow, such as CREATE TABLE AS. - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cd30b62..d43902c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -502,6 +502,19 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * AssignFullTransactionIdForWorker + * + * For parallel copy, all the workers must use the same transaction id. + */ +void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId) +{ + TransactionState s = CurrentTransactionState; + + Assert((IsInParallelMode() || IsParallelWorker())); + s->fullTransactionId = fullTransactionId; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index d930644..b1e2e71 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -26,6 +26,7 @@ #include "access/xlog.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" +#include "catalog/pg_proc_d.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" @@ -40,11 +41,13 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "optimizer/clauses.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -95,6 +98,18 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; +/* + * State of the chunk. + */ +typedef enum ParallelCopyChunkState +{ + CHUNK_INIT, /* initial state of chunk */ + CHUNK_LEADER_POPULATING, /* leader processing chunk */ + CHUNK_LEADER_POPULATED, /* leader completed populating chunk */ + CHUNK_WORKER_PROCESSING, /* worker processing chunk */ + CHUNK_WORKER_PROCESSED /* worker completed processing chunk */ +}ParallelCopyChunkState; + #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ #define DATA_BLOCK_SIZE RAW_BUF_SIZE @@ -527,9 +542,13 @@ if (1) \ { \ if (raw_buf_ptr > cstate->raw_buf_index) \ { \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ + if (!IsParallelCopy()) \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + raw_buf_ptr - cstate->raw_buf_index); \ + else \ + chunk_size += raw_buf_ptr - cstate->raw_buf_index; \ + \ cstate->raw_buf_index = raw_buf_ptr; \ } \ } else ((void) 0) @@ -542,13 +561,40 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) +/* Begin parallel copy Macros */ +#define SET_NEWLINE_SIZE() \ +{ \ + if (cstate->eol_type == EOL_NL || cstate->eol_type == EOL_CR) \ + new_line_size = 1; \ + else if (cstate->eol_type == EOL_CRNL) \ + new_line_size = 2; \ + else \ + new_line_size = 0; \ +} + +/* + * COPY_WAIT_TO_PROCESS - Wait before continuing to process. + */ +#define COPY_WAIT_TO_PROCESS() \ +{ \ + CHECK_FOR_INTERRUPTS(); \ + (void) WaitLatch(MyLatch, \ + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, \ + 1L, WAIT_EVENT_PG_SLEEP); \ + ResetLatch(MyLatch); \ +} + +/* End parallel copy Macros */ + /* * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding. */ #define CONVERT_TO_SERVER_ENCODING(cstate) \ { \ /* Done reading the line. Convert it to server encoding. */ \ - if (cstate->need_transcoding) \ + if (cstate->need_transcoding && \ + (!IsParallelCopy() || \ + (IsParallelCopy() && !IsLeader()))) \ { \ char *cvt; \ cvt = pg_any_to_server(cstate->line_buf.data, \ @@ -607,22 +653,38 @@ if (1) \ * CLEAR_EOL_LINE - Wrapper for clearing EOL. */ #define CLEAR_EOL_LINE() \ -if (!result && !IsHeaderLine()) \ - CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \ - cstate->line_buf.len, \ - cstate->line_buf.len) \ +{ \ + if (!result && !IsHeaderLine()) \ + { \ + if (IsParallelCopy()) \ + CLEAR_EOL_FROM_COPIED_DATA(cstate->raw_buf, \ + raw_buf_ptr, chunk_size) \ + else \ + CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \ + cstate->line_buf.len, \ + cstate->line_buf.len) \ + } \ +} /* * INCREMENTPROCESSED - Increment the lines processed. */ -#define INCREMENTPROCESSED(processed) \ -processed++; +#define INCREMENTPROCESSED(processed) \ +{ \ + if (!IsParallelCopy()) \ + processed++; \ + else \ + pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \ +} /* * GETPROCESSED - Get the lines processed. */ #define GETPROCESSED(processed) \ -return processed; +if (!IsParallelCopy()) \ + return processed; \ +else \ + return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -671,8 +733,12 @@ static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt); +static void ExecBeforeStmtTrigger(CopyState cstate); +static void CheckCopyFromValidity(CopyState cstate); static void PopulateAttributes(CopyState cstate, TupleDesc tup_desc, List *attnamelist); +static void PopulateCatalogInformation(CopyState cstate); +static pg_attribute_always_inline uint32 GetChunkPosition(CopyState cstate); static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); @@ -826,6 +892,130 @@ InsertListShm(ParallelContext *pcxt, int key, List *inputlist, } /* + * IsTriggerFunctionParallelSafe - Check if the trigger function is parallel + * safe for the triggers. Return false if any one of the trigger has parallel + * unsafe function. + */ +static pg_attribute_always_inline bool +IsTriggerFunctionParallelSafe(TriggerDesc *trigdesc) +{ + int i; + for (i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + int trigtype = RI_TRIGGER_NONE; + + if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE) + return false; + + /* If the trigger is parallel safe, also look for RI_TRIGGER. */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_PK || trigtype == RI_TRIGGER_FK) + return false; + } + + return true; +} + +/* + * CheckExprParallelSafety - determine parallel safety of volatile expressions + * in default clause of column definition or in where clause and return true if + * they are parallel safe. + */ +static pg_attribute_always_inline bool +CheckExprParallelSafety(CopyState cstate) +{ + if (contain_volatile_functions(cstate->whereClause)) + { + if (!is_parallel_safe(NULL, (Node *)cstate->whereClause)) + return false; + } + + if (cstate->volatile_defexprs && cstate->defexprs != NULL && + cstate->num_defaults != 0) + { + int i; + for (i = 0; i < cstate->num_defaults; i++) + { + if (!is_parallel_safe(NULL, (Node *) cstate->defexprs[i]->expr)) + return false; + } + } + + return true; +} + +/* + * FindInsertMethod - determine insert mode single, multi, or multi conditional. + */ +static pg_attribute_always_inline CopyInsertMethod +FindInsertMethod(CopyState cstate) +{ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_before_row || + cstate->rel->trigdesc->trig_insert_instead_row)) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + cstate->rel->trigdesc != NULL && + cstate->rel->trigdesc->trig_insert_new_table) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return CIM_MULTI_CONDITIONAL; + + return CIM_MULTI; +} + +/* + * IsParallelCopyAllowed - check for the cases where parallel copy is not + * applicable. + */ +static pg_attribute_always_inline bool +IsParallelCopyAllowed(CopyState cstate) +{ + /* Parallel copy not allowed for freeze & binary option. */ + if (cstate->freeze || cstate->binary) + return false; + + /* Check if copy is into foreign table. */ + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + return false; + + /* Check if copy is into a temporary table. */ + if (RELATION_IS_LOCAL(cstate->rel) || RELATION_IS_OTHER_TEMP(cstate->rel)) + return false; + + /* Check if trigger function is parallel safe. */ + if (cstate->rel->trigdesc != NULL && + !IsTriggerFunctionParallelSafe(cstate->rel->trigdesc)) + return false; + + /* + * Check if there is after statement or instead of trigger or transition + * table triggers. + */ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_after_statement || + cstate->rel->trigdesc->trig_insert_instead_row || + cstate->rel->trigdesc->trig_insert_new_table)) + return false; + + /* Check if the volatile expressions are parallel safe, if present any. */ + if (!CheckExprParallelSafety(cstate)) + return false; + + /* Check if the insertion mode is single. */ + if (FindInsertMethod(cstate) == CIM_SINGLE) + return false; + + return true; +} + +/* * BeginParallelCopy - start parallel copy tasks. * * Get the number of workers required to perform the parallel copy. The data @@ -855,6 +1045,8 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) int parallel_workers = 0; ParallelCopyData *pcdata; + CheckCopyFromValidity(cstate); + parallel_workers = Min(nworkers, max_worker_processes); /* Can't perform copy in parallel */ @@ -864,6 +1056,15 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); cstate->pcdata = pcdata; + /* + * User chosen parallel copy. Determine if the parallel copy is actually + * allowed. If not, go with the non-parallel mode. + */ + if (!IsParallelCopyAllowed(cstate)) + return NULL; + + full_transaction_id = GetCurrentFullTransactionId(); + EnterParallelMode(); pcxt = CreateParallelContext("postgres", "ParallelCopyMain", parallel_workers); @@ -1090,7 +1291,211 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, cstate->line_buf_converted = false; cstate->raw_buf = NULL; cstate->raw_buf_index = cstate->raw_buf_len = 0; + + PopulateCatalogInformation(cstate); + + /* Create workspace for CopyReadAttributes results. */ + if (!cstate->binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **)palloc(attr_count * sizeof(char *)); + } } + +/* + * CacheChunkInfo - Cache the chunk information to local memory. + */ +static bool +CacheChunkInfo(CopyState cstate, uint32 buff_count) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyData *pcdata = cstate->pcdata; + uint32 write_pos; + ParallelCopyDataBlock *data_blk_ptr; + ParallelCopyChunkBoundary *chunkInfo; + uint32 offset; + int dataSize; + int copiedSize = 0; + + resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); + write_pos = GetChunkPosition(cstate); + if (-1 == write_pos) + return true; + + /* Get the current chunk information. */ + chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos]; + if (pg_atomic_read_u32(&chunkInfo->chunk_size) == 0) + goto empty_data_chunk_update; + + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block]; + + /* Get the offset information from where the data must be copied. */ + offset = chunkInfo->start_offset; + pcdata->worker_line_buf[buff_count].cur_lineno = chunkInfo->cur_lineno; + + elog(DEBUG1, "[Worker] Processing - chunk position:%d, block:%d, unprocessed chunks:%d, offset:%d, chunk size:%d", + write_pos, chunkInfo->first_block, + pg_atomic_read_u32(&data_blk_ptr->unprocessed_chunk_parts), + offset, pg_atomic_read_u32(&chunkInfo->chunk_size)); + + for (;;) + { + uint8 skip_bytes = data_blk_ptr->skip_bytes; + /* + * There is a possibility that the above loop has come out because + * data_blk_ptr->curr_blk_completed is set, but dataSize read might + * be an old value, if data_blk_ptr->curr_blk_completed and the chunk is + * completed, chunk_size will be set. Read the chunk_size again to be + * sure if it is complete or partial block. + */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + if (dataSize) + { + int remainingSize = dataSize - copiedSize; + if (!remainingSize) + break; + + /* Whole chunk is in current block. */ + if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE) + { + appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + remainingSize); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, + 1); + break; + } + else + { + /* Chunk is spread across the blocks. */ + uint32 chunkInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + chunkInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1); + copiedSize += chunkInCurrentBlock; + while (copiedSize < dataSize) + { + uint32 currentBlockCopySize; + ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + skip_bytes = currBlkPtr->skip_bytes; + + /* + * If complete data is present in current block use + * dataSize - copiedSize, or copy the whole block from + * current block. + */ + currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes); + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &currBlkPtr->data[0], + currentBlockCopySize); + pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_chunk_parts, 1); + copiedSize += currentBlockCopySize; + data_blk_ptr = currBlkPtr; + } + + break; + } + } + else + { + /* Copy this complete block from the current offset. */ + uint32 chunkInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + chunkInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1); + copiedSize += chunkInCurrentBlock; + + /* + * Reset the offset. For the first copy, copy from the offset. For + * the subsequent copy the complete block. + */ + offset = 0; + + /* Set data_blk_ptr to the following block. */ + data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + } + + for (;;) + { + /* Get the size of this chunk */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + + /* + * If the data is present in current block chunkInfo.chunk_size + * will be updated. If the data is spread across the blocks either + * of chunkInfo.chunk_size or data_blk_ptr->curr_blk_completed can + * be updated. chunkInfo.chunk_size will be updated if the complete + * read is finished. data_blk_ptr->curr_blk_completed will be + * updated if processing of current block is finished and data + * processing is not finished. + */ + if (data_blk_ptr->curr_blk_completed || (dataSize != -1)) + break; + + COPY_WAIT_TO_PROCESS() + } + } + +empty_data_chunk_update: + elog(DEBUG1, "[Worker] Completed processing chunk:%d", write_pos); + pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_WORKER_PROCESSED); + pg_atomic_write_u32(&chunkInfo->chunk_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + return false; +} + +/* + * GetWorkerChunk - Returns a chunk for worker to process. + */ +static bool +GetWorkerChunk(CopyState cstate) +{ + uint32 buff_count; + ParallelCopyData *pcdata = cstate->pcdata; + + /* + * Copy the chunk data to line_buf and release the chunk position so that the + * worker can continue loading data. + */ + if (pcdata->worker_line_buf_pos < pcdata->worker_line_buf_count) + goto return_chunk; + + pcdata->worker_line_buf_pos = 0; + pcdata->worker_line_buf_count = 0; + + for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++) + { + bool result = CacheChunkInfo(cstate, buff_count); + if (result) + break; + + pcdata->worker_line_buf_count++; + } + + if (pcdata->worker_line_buf_count) + goto return_chunk; + else + resetStringInfo(&cstate->line_buf); + + return true; + +return_chunk: + cstate->line_buf = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].line_buf; + cstate->cur_lineno = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].cur_lineno; + cstate->line_buf_valid = true; + + /* Mark that encoding conversion hasn't occurred yet. */ + cstate->line_buf_converted = false; + CONVERT_TO_SERVER_ENCODING(cstate) + pcdata->worker_line_buf_pos++; + return false; +} + /* * ParallelCopyMain - parallel copy worker's code. * @@ -1136,6 +1541,7 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); pcdata->pcshared_info = pcshared_info; + AssignFullTransactionIdForWorker(pcshared_info->full_transaction_id); shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, false); @@ -1188,6 +1594,34 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc) MemoryContextSwitchTo(oldcontext); return; } + +/* + * UpdateBlockInChunkInfo - Update the chunk information. + */ +static pg_attribute_always_inline int +UpdateBlockInChunkInfo(CopyState cstate, uint32 blk_pos, + uint32 offset, uint32 chunk_size, uint32 chunk_state) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; + ParallelCopyChunkBoundary *chunkInfo; + int chunk_pos = chunkBoundaryPtr->leader_pos; + + /* Update the chunk information for the worker to pick and process. */ + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; + while (pg_atomic_read_u32(&chunkInfo->chunk_size) != -1) + COPY_WAIT_TO_PROCESS() + + chunkInfo->first_block = blk_pos; + chunkInfo->start_offset = offset; + chunkInfo->cur_lineno = cstate->cur_lineno; + pg_atomic_write_u32(&chunkInfo->chunk_size, chunk_size); + pg_atomic_write_u32(&chunkInfo->chunk_state, chunk_state); + chunkBoundaryPtr->leader_pos = (chunkBoundaryPtr->leader_pos + 1) % RINGSIZE; + + return chunk_pos; +} + /* * ParallelCopyLeader - parallel copy leader's functionality. * @@ -1213,9 +1647,158 @@ ParallelCopyLeader(CopyState cstate) ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + /* Execute the before statement triggers from the leader */ + ExecBeforeStmtTrigger(cstate); + + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } + } + + for (;;) + { + bool done; + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + pcshared_info->is_read_in_progress = false; cstate->cur_lineno = 0; } + +/* + * GetChunkPosition - return the chunk position that worker should process. + */ +static uint32 +GetChunkPosition(CopyState cstate) +{ + ParallelCopyData *pcdata = cstate->pcdata; + ParallelCopyShmInfo *pcshared_info = pcdata->pcshared_info; + uint32 previous_pos = pcdata->worker_processed_pos; + uint32 write_pos = (previous_pos == -1) ? 0 : (previous_pos + 1) % RINGSIZE; + for (;;) + { + int dataSize; + bool is_read_in_progress = pcshared_info->is_read_in_progress; + ParallelCopyChunkBoundary *chunkInfo; + ParallelCopyDataBlock *data_blk_ptr; + ParallelCopyChunkState chunk_state = CHUNK_LEADER_POPULATED; + ParallelCopyChunkState curr_chunk_state; + CHECK_FOR_INTERRUPTS(); + + /* File read completed & no elements to process. */ + if (!is_read_in_progress && + (pcshared_info->populated == + pg_atomic_read_u64(&pcshared_info->total_worker_processed))) + { + write_pos = -1; + break; + } + + /* Get the current chunk information. */ + chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos]; + curr_chunk_state = pg_atomic_read_u32(&chunkInfo->chunk_state); + if ((write_pos % WORKER_CHUNK_COUNT == 0) && + (curr_chunk_state == CHUNK_WORKER_PROCESSED || + curr_chunk_state == CHUNK_WORKER_PROCESSING)) + { + pcdata->worker_processed_pos = write_pos; + write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE; + continue; + } + + /* Get the size of this chunk. */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + + if (dataSize != 0) /* If not an empty chunk. */ + { + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block]; + + if (!data_blk_ptr->curr_blk_completed && (dataSize == -1)) + { + /* Wait till the current chunk or block is added. */ + COPY_WAIT_TO_PROCESS() + continue; + } + } + + /* Make sure that no worker has consumed this element. */ + if (pg_atomic_compare_exchange_u32(&chunkInfo->chunk_state, + &chunk_state, CHUNK_WORKER_PROCESSING)) + break; + } + + pcdata->worker_processed_pos = write_pos; + return write_pos; +} + +/* + * GetFreeCopyBlock - Get a free block for data to be copied. + */ +static pg_attribute_always_inline uint32 +GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + int count = 0; + uint32 last_free_block = pcshared_info->cur_block_pos; + uint32 block_pos = (last_free_block != -1) ? ((last_free_block + 1) % MAX_BLOCKS_COUNT): 0; + + /* Get a new block for copying data. */ + while (count < MAX_BLOCKS_COUNT) + { + ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos]; + uint32 unprocessed_chunk_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_chunk_parts); + if (unprocessed_chunk_parts == 0) + { + dataBlkPtr->curr_blk_completed = false; + dataBlkPtr->skip_bytes = 0; + pcshared_info->cur_block_pos = block_pos; + return block_pos; + } + + block_pos = (block_pos + 1) % MAX_BLOCKS_COUNT; + count++; + } + + return -1; +} + +/* + * WaitGetFreeCopyBlock - If there are no blocks available, wait and get a block + * for copying data. + */ +static uint32 +WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + uint32 new_free_pos = -1; + for (;;) + { + new_free_pos = GetFreeCopyBlock(pcshared_info); + if (new_free_pos != -1) /* We have got one block, break now. */ + break; + + COPY_WAIT_TO_PROCESS() + } + + return new_free_pos; +} + /* * LookupParallelCopyFnPtr - Look up parallel copy function pointer. */ @@ -1251,6 +1834,146 @@ LookupParallelCopyFnStr(copy_data_source_cb fn_addr) /* We can only reach this by programming error. */ elog(ERROR, "internal function pointer not found"); } + +/* + * SetRawBufForLoad - Set raw_buf to the shared memory where the file data must + * be read. + */ +static void +SetRawBufForLoad(CopyState cstate, uint32 chunk_size, uint32 copy_buf_len, + uint32 raw_buf_ptr, char **copy_raw_buf) +{ + ParallelCopyShmInfo *pcshared_info; + uint32 cur_block_pos; + uint32 next_block_pos; + ParallelCopyDataBlock *cur_data_blk_ptr = NULL; + ParallelCopyDataBlock *next_data_blk_ptr = NULL; + + if (!IsParallelCopy()) + return; + + pcshared_info = cstate->pcdata->pcshared_info; + cur_block_pos = pcshared_info->cur_block_pos; + cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL; + next_block_pos = WaitGetFreeCopyBlock(pcshared_info); + next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos]; + + /* set raw_buf to the data block in shared memory */ + cstate->raw_buf = next_data_blk_ptr->data; + *copy_raw_buf = cstate->raw_buf; + if (cur_data_blk_ptr && chunk_size) + { + /* + * Mark the previous block as completed, worker can start copying this + * data. + */ + cur_data_blk_ptr->following_block = next_block_pos; + pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_chunk_parts, 1); + cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr; + cur_data_blk_ptr->curr_blk_completed = true; + } +} + +/* + * EndChunkParallelCopy - Update the chunk information in shared memory. + */ +static void +EndChunkParallelCopy(CopyState cstate, uint32 chunk_pos, uint32 chunk_size, + uint32 raw_buf_ptr) +{ + uint8 new_line_size; + if (!IsParallelCopy()) + return; + + if (!IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; + SET_NEWLINE_SIZE() + if (chunk_size) + { + ParallelCopyChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; + /* + * If the new_line_size > raw_buf_ptr, then the new block has only + * new line char content. The unprocessed count should not be + * increased in this case. + */ + if (raw_buf_ptr > new_line_size) + { + uint32 cur_block_pos = pcshared_info->cur_block_pos; + ParallelCopyDataBlock *curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; + pg_atomic_add_fetch_u32(&curr_data_blk_ptr->unprocessed_chunk_parts, 1); + } + + /* Update chunk size. */ + pg_atomic_write_u32(&chunkInfo->chunk_size, chunk_size); + pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); + elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", + chunk_pos, chunk_size); + pcshared_info->populated++; + } + else if (new_line_size) + { + /* This means only new line char, empty record should be inserted.*/ + ParallelCopyChunkBoundary *chunkInfo; + chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, + CHUNK_LEADER_POPULATED); + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; + elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", + chunkInfo->start_offset, chunk_pos, + pg_atomic_read_u32(&chunkInfo->chunk_size)); + pcshared_info->populated++; + } + } +} + +/* + * ExecBeforeStmtTrigger - Execute the before statement trigger, this will be + * executed for parallel copy by the leader process. + */ +static void +ExecBeforeStmtTrigger(CopyState cstate) +{ + EState *estate = CreateExecutorState(); + ResultRelInfo *resultRelInfo; + + Assert(IsLeader()); + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of code + * here that basically duplicated execUtils.c ...) + */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, + cstate->rel, + 1, /* must match rel's position in range_table */ + NULL, + 0); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(resultRelInfo, CMD_INSERT); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + ExecInitRangeTable(estate, cstate->range_table); + + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + + /* Close any trigger target relations */ + ExecCleanUpTriggerState(estate); + + FreeExecutorState(estate); +} + /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -3611,7 +4334,8 @@ CopyFrom(CopyState cstate) PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); + CommandId mycid = IsParallelCopy() ? cstate->pcdata->pcshared_info->mycid : + GetCurrentCommandId(true); int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; @@ -3621,7 +4345,14 @@ CopyFrom(CopyState cstate) bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; - CheckCopyFromValidity(cstate); + /* + * Perform this check if it is not parallel copy. In case of parallel + * copy, this check is done by the leader, so that if any invalid case + * exist the copy from command will error out from the leader itself, + * avoiding launching workers, just to throw error. + */ + if (!IsParallelCopy()) + CheckCopyFromValidity(cstate); /* * If the target file is new-in-transaction, we assume that checking FSM @@ -3848,13 +4579,16 @@ CopyFrom(CopyState cstate) has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - /* - * Check BEFORE STATEMENT insertion triggers. It's debatable whether we - * should do this for COPY, since it's not really an "INSERT" statement as - * such. However, executing these triggers maintains consistency with the - * EACH ROW triggers that we already fire on COPY. - */ - ExecBSInsertTriggers(estate, resultRelInfo); + if (!IsParallelCopy()) + { + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + } econtext = GetPerTupleExprContext(estate); @@ -4368,7 +5102,7 @@ BeginCopyFrom(ParseState *pstate, initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf = (IsParallelCopy()) ? NULL : (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; /* Assign range table, we'll need it in CopyFrom. */ @@ -4512,26 +5246,35 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) /* only available for text or csv input */ Assert(!cstate->binary); - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (IsParallelCopy()) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ + done = GetWorkerChunk(cstate); + if (done && cstate->line_buf.len == 0) + return false; } + else + { + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + } /* Parse the line into de-escaped field values */ if (cstate->csv_mode) @@ -4781,9 +5524,31 @@ CopyReadLine(CopyState cstate) */ if (cstate->copy_dest == COPY_NEW_FE) { + bool bIsFirst = true; do { - cstate->raw_buf_index = cstate->raw_buf_len; + if (!IsParallelCopy()) + cstate->raw_buf_index = cstate->raw_buf_len; + else + { + if (cstate->raw_buf_index == RAW_BUF_SIZE) + { + /* Get a new block if it is the first time, From the + * subsequent time, reset the index and re-use the same + * block. + */ + if (bIsFirst) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos = WaitGetFreeCopyBlock(pcshared_info); + cstate->raw_buf = pcshared_info->data_blocks[block_pos].data; + bIsFirst = false; + } + + cstate->raw_buf_index = cstate->raw_buf_len = 0; + } + } + } while (CopyLoadRawBuf(cstate)); } } @@ -4813,6 +5578,11 @@ CopyReadLineText(CopyState cstate) char quotec = '\0'; char escapec = '\0'; + /* For parallel copy */ + uint32 chunk_size = 0; + int chunk_pos = 0; + + cstate->eol_type = EOL_UNKNOWN; if (cstate->csv_mode) { quotec = cstate->quote[0]; @@ -4867,6 +5637,8 @@ CopyReadLineText(CopyState cstate) if (raw_buf_ptr >= copy_buf_len || need_data) { REFILL_LINEBUF; + SetRawBufForLoad(cstate, chunk_size, copy_buf_len, raw_buf_ptr, + ©_raw_buf); /* * Try to read some more data. This will certainly reset @@ -5091,9 +5863,15 @@ CopyReadLineText(CopyState cstate) * discard the data and the \. sequence. */ if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, + { + if (!IsParallelCopy()) + appendBinaryStringInfo(&cstate->line_buf, cstate->raw_buf + cstate->raw_buf_index, prev_raw_ptr - cstate->raw_buf_index); + else + chunk_size += prev_raw_ptr - cstate->raw_buf_index; + } + cstate->raw_buf_index = raw_buf_ptr; result = true; /* report EOF */ break; @@ -5145,6 +5923,26 @@ not_end_of_copy: IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); raw_buf_ptr += mblen - 1; } + + /* + * Skip the header line. Update the chunk here, this cannot be done at + * the beginning, as there is a possibility that file contains empty + * lines. + */ + if (IsParallelCopy() && first_char_in_line && !IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyChunkBoundary *chunkInfo; + uint32 chunk_first_block = pcshared_info->cur_block_pos; + chunk_pos = UpdateBlockInChunkInfo(cstate, + chunk_first_block, + cstate->raw_buf_index, -1, + CHUNK_LEADER_POPULATING); + chunkInfo = &pcshared_info->chunk_boundaries.ring[chunk_pos]; + elog(DEBUG1, "[Leader] Adding - block:%d, offset:%d, chunk position:%d", + chunk_first_block, chunkInfo->start_offset, chunk_pos); + } + first_char_in_line = false; } /* end of outer loop */ @@ -5153,6 +5951,7 @@ not_end_of_copy: */ REFILL_LINEBUF; CLEAR_EOL_LINE() + EndChunkParallelCopy(cstate, chunk_pos, chunk_size, raw_buf_ptr); return result; } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 0c6fe01..3faadb8 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -865,7 +865,7 @@ is_parallel_safe(PlannerInfo *root, Node *node) * planning, because those are parallel-restricted and there might be one * in this expression. But otherwise we don't need to look. */ - if (root->glob->maxParallelHazard == PROPARALLEL_SAFE && + if (root != NULL && root->glob->maxParallelHazard == PROPARALLEL_SAFE && root->glob->paramExecTypes == NIL) return true; /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 88025b1..f8bdcc3 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -381,6 +381,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3373894..30eb49d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1700,6 +1700,7 @@ ParallelCompletionPtr ParallelContext ParallelCopyChunkBoundaries ParallelCopyChunkBoundary +ParallelCopyChunkState ParallelCopyCommonKeyData ParallelCopyData ParallelCopyDataBlock -- 1.8.3.1