From eb1a33276d1f907d14e7e1962b1cd254b81e1587 Mon Sep 17 00:00:00 2001
From: Vignesh C
Date: Wed, 7 Oct 2020 17:24:44 +0530
Subject: [PATCH v6 3/6] Allow COPY FROM command to process data from
 file/STDIN to a table in parallel.

This feature allows COPY FROM to leverage multiple CPUs while copying
data from a file or STDIN to a table. It adds a PARALLEL option to the
COPY FROM command, with which the user can specify the number of
workers to use for the copy.

The backend to which the COPY FROM query is submitted acts as the
leader, with the responsibility of reading data from the file/STDIN and
launching at most n workers, as specified by the PARALLEL 'n' option.
The leader populates the common data required for the workers'
execution in the DSM and shares it with the workers. The leader then
executes BEFORE STATEMENT triggers, if any exist.

The leader populates the DSM line entries, each of which holds a start
offset and a line size; while populating the lines it reads as many
blocks as required from the file into the DSM data blocks. Each block
is 64KB in size. The leader parses the data to identify lines, reusing
the existing logic from CopyReadLineText with some changes. The leader
checks whether a free line entry is available to copy the information
into; if none is free, it waits until the required entry is freed up by
a worker and then copies the identified line's information (offset and
line size) into the DSM line entries. This process is repeated until
the complete file is processed.

Meanwhile, the workers cache the lines (50 at a time) in local memory
and release the line entries back to the leader for further population.
Each worker processes the lines it has cached and inserts them into the
table. The leader does not participate in inserting the data; its only
responsibility is to identify the lines as fast as possible for the
workers to do the actual copy operation. The leader waits until all the
populated lines have been processed by the workers and then exits.

We chose this design because everything stalls if the leader does not
accept further input data, as well as when there are no split chunks
available to process, so it does not seem like a good idea to have the
leader do other work. This is backed by performance data showing that
with one worker there is only a 5-10% performance difference.
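To make the leader/worker handshake described above concrete, below is a
minimal, self-contained sketch in plain C11 (stdatomic), not code from the
patch itself: the fields loosely mirror ParallelCopyLineBoundary, but
RING_SIZE, LineState, leader_publish() and worker_consume() are illustrative
stand-ins; the real patch uses pg_atomic_* operations on DSM and waits on
the process latch instead of returning failure when a slot is busy.

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define RING_SIZE 8             /* the patch's RINGSIZE is much larger */

    typedef enum { LINE_INIT, LINE_POPULATED, LINE_PROCESSED } LineState;

    typedef struct
    {
        uint32_t    first_block;    /* data block holding the line start */
        uint32_t    start_offset;   /* offset of the line in that block */
        atomic_int  line_size;      /* -1 = slot free, >= 0 = populated */
        atomic_int  line_state;     /* LineState */
    } LineBoundary;

    static LineBoundary ring[RING_SIZE];

    /* Leader side: claim the next free slot and publish one line. */
    static int
    leader_publish(uint32_t pos, uint32_t block, uint32_t offset, int size)
    {
        LineBoundary *lb = &ring[pos % RING_SIZE];

        /* The real leader waits here until a worker frees the slot. */
        if (atomic_load(&lb->line_size) != -1)
            return -1;
        lb->first_block = block;
        lb->start_offset = offset;
        atomic_store(&lb->line_size, size);
        atomic_store(&lb->line_state, LINE_POPULATED);
        return 0;
    }

    /* Worker side: consume a populated slot and hand it back to the leader. */
    static int
    worker_consume(uint32_t pos)
    {
        LineBoundary *lb = &ring[pos % RING_SIZE];
        int         expected = LINE_POPULATED;

        /* The CAS guarantees that only one worker picks up a given line. */
        if (!atomic_compare_exchange_strong(&lb->line_state, &expected,
                                            LINE_PROCESSED))
            return -1;
        printf("consumed line: block=%u offset=%u size=%d\n",
               (unsigned) lb->first_block, (unsigned) lb->start_offset,
               atomic_load(&lb->line_size));
        atomic_store(&lb->line_size, -1);   /* release the slot */
        return 0;
    }

    int
    main(void)
    {
        /* All slots start free; line_size doubles as the free/used flag. */
        for (int i = 0; i < RING_SIZE; i++)
            atomic_store(&ring[i].line_size, -1);

        /* Single-threaded simulation: two batches flow through the ring. */
        for (uint32_t batch = 0; batch < 2; batch++)
        {
            for (uint32_t pos = 0; pos < RING_SIZE; pos++)
                (void) leader_publish(batch * RING_SIZE + pos, batch,
                                      pos * 100, 42);
            for (uint32_t pos = 0; pos < RING_SIZE; pos++)
                (void) worker_consume(batch * RING_SIZE + pos);
        }
        return 0;
    }

In the patch, line_size is likewise initialized to -1 and reset to -1 by the
worker once a line has been processed, so the slot's size field doubles as
the free/used flag and the ring needs no separate lock.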
--- src/backend/access/common/toast_internals.c | 12 +- src/backend/access/heap/heapam.c | 11 - src/backend/access/transam/xact.c | 15 + src/backend/commands/copy.c | 220 +++-- src/backend/commands/copyparallel.c | 1269 +++++++++++++++++++++++++++ src/bin/psql/tab-complete.c | 2 +- src/include/access/xact.h | 1 + src/include/commands/copy.h | 69 +- src/tools/pgindent/typedefs.list | 1 + 9 files changed, 1514 insertions(+), 86 deletions(-) create mode 100644 src/backend/commands/copyparallel.c diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 25a81e5..70c070e 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -17,6 +17,7 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/heaptoast.h" +#include "access/parallel.h" #include "access/table.h" #include "access/toast_internals.h" #include "access/xact.h" @@ -116,7 +117,16 @@ toast_save_datum(Relation rel, Datum value, TupleDesc toasttupDesc; Datum t_values[3]; bool t_isnull[3]; - CommandId mycid = GetCurrentCommandId(true); + + /* + * Parallel copy can insert toast tuples, in case of parallel copy the + * command would have been set already by calling + * AssignCommandIdForWorker. For parallel copy call GetCurrentCommandId to + * get currentCommandId by passing used as false, as this is taken care + * earlier. + */ + CommandId mycid = IsParallelWorker() ? GetCurrentCommandId(false) : + GetCurrentCommandId(true); struct varlena *result; struct varatt_external toast_pointer; union diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1585861..1602525 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2043,17 +2043,6 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { - /* - * To allow parallel inserts, we need to ensure that they are safe to be - * performed in workers. We have the infrastructure to allow parallel - * inserts in general except for the cases where inserts generate a new - * CommandId (eg. inserts into a table having a foreign key column). - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index af6afce..0b3337c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -776,6 +776,21 @@ GetCurrentCommandId(bool used) } /* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. + * This must only be called at the start of a parallel operation. 
+ */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && + (currentCommandId != InvalidCommandId)); + + currentCommandIdUsed = true; +} + +/* * SetParallelStartTimestamps * * In a parallel worker, we should inherit the parent transaction's diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 1e55a30..dc006a5 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -61,20 +61,6 @@ #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') -/* - * Represents the heap insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or fdw routine */ - CIM_MULTI, /* always use table_multi_insert */ - CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ -} CopyInsertMethod; - -#define IsParallelCopy() (cstate->is_parallel) -#define IsLeader() (cstate->pcdata->is_leader) -#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -131,7 +117,6 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; - /* * These macros centralize code used to process line_buf and raw_buf buffers. * They are macros because they often do continue/break control and to avoid @@ -182,9 +167,13 @@ if (1) \ { \ if (raw_buf_ptr > cstate->raw_buf_index) \ { \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ + if (!IsParallelCopy()) \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + raw_buf_ptr - cstate->raw_buf_index); \ + else \ + line_size += raw_buf_ptr - cstate->raw_buf_index; \ + \ cstate->raw_buf_index = raw_buf_ptr; \ } \ } else ((void) 0) @@ -197,18 +186,6 @@ if (1) \ goto not_end_of_copy; \ } else ((void) 0) -/* - * Increment the lines processed. - */ -#define INCREMENTPROCESSED(processed) \ -processed++; - -/* - * Get the lines processed. - */ -#define RETURNPROCESSED(processed) \ -return processed; - static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -225,7 +202,6 @@ static void EndCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); -static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); static int CopyReadAttributesCSV(CopyState cstate); @@ -258,7 +234,6 @@ static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); -static void ConvertToServerEncoding(CopyState cstate); /* @@ -2639,7 +2614,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * * Check if the relation specified in copy from is valid. 
*/ -static void +void CheckTargetRelValidity(CopyState cstate) { Assert(cstate->rel); @@ -2735,7 +2710,7 @@ CopyFrom(CopyState cstate) PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); + CommandId mycid; int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; @@ -2745,7 +2720,18 @@ CopyFrom(CopyState cstate) bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; - CheckTargetRelValidity(cstate); + /* + * Perform this check if it is not parallel copy. In case of parallel + * copy, this check is done by the leader, so that if any invalid case + * exist the copy from command will error out from the leader itself, + * avoiding launching workers, just to throw error. + */ + if (!IsParallelCopy()) + CheckTargetRelValidity(cstate); + else + SetCurrentCommandIdUsedForWorker(); + + mycid = GetCurrentCommandId(!IsParallelCopy()); /* * If the target file is new-in-transaction, we assume that checking FSM @@ -2785,7 +2771,8 @@ CopyFrom(CopyState cstate) target_resultRelInfo = resultRelInfo; /* Verify the named relation is a valid target for INSERT */ - CheckValidResultRel(resultRelInfo, CMD_INSERT); + if (!IsParallelCopy()) + CheckValidResultRel(resultRelInfo, CMD_INSERT); ExecOpenIndices(resultRelInfo, false); @@ -2934,13 +2921,17 @@ CopyFrom(CopyState cstate) has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - /* - * Check BEFORE STATEMENT insertion triggers. It's debatable whether we - * should do this for COPY, since it's not really an "INSERT" statement as - * such. However, executing these triggers maintains consistency with the - * EACH ROW triggers that we already fire on COPY. - */ - ExecBSInsertTriggers(estate, resultRelInfo); + if (!IsParallelCopy()) + { + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether + * we should do this for COPY, since it's not really an "INSERT" + * statement as such. However, executing these triggers maintains + * consistency with the EACH ROW triggers that we already fire on + * COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + } econtext = GetPerTupleExprContext(estate); @@ -3040,6 +3031,29 @@ CopyFrom(CopyState cstate) !has_instead_insert_row_trig && resultRelInfo->ri_FdwRoutine == NULL; + /* + * We may still be able to perform parallel inserts for + * partitioned tables. However, the possibility of this + * depends on which types of triggers exist on the partition. + * We must not do parallel inserts if the partition is a + * foreign table or it has any BEFORE/INSTEAD OF row triggers. + * Since the partition's resultRelInfo are initialized only + * when we actually insert the first tuple into them, we may + * not know this info easily in the leader while deciding for + * the parallelism. We would have gone ahead and allowed + * parallelism. Now it's the time to throw an error and also + * provide a hint to the user to not use parallelism. Throwing + * an error seemed a simple approach than to look for all the + * partitions in the leader while deciding for the + * parallelism. Note that this error is thrown early, exactly + * on the first tuple being inserted into the partition, so + * not much work, that has been done so far, is wasted. 
+ */ + if (!leafpart_use_multi_insert && IsParallelWorker()) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform PARALLEL COPY if partition has BEFORE/INSTEAD OF triggers, or if the partition is foreign partition"), + errhint("Try COPY without PARALLEL option"))); + /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) { @@ -3325,7 +3339,7 @@ CopyFrom(CopyState cstate) * * Populate the cstate catalog information. */ -static void +void PopulateCstateCatalogInfo(CopyState cstate) { TupleDesc tupDesc; @@ -3607,26 +3621,35 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) /* only available for text or csv input */ Assert(!cstate->binary); - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (IsParallelCopy()) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ + done = GetWorkerLine(cstate); + if (done && cstate->line_buf.len == 0) + return false; } + else + { + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + } /* Parse the line into de-escaped field values */ if (cstate->csv_mode) @@ -3851,7 +3874,7 @@ EndCopyFrom(CopyState cstate) * by newline. The terminating newline or EOF marker is not included * in the final value of line_buf. */ -static bool +bool CopyReadLine(CopyState cstate) { bool result; @@ -3874,9 +3897,34 @@ CopyReadLine(CopyState cstate) */ if (cstate->copy_dest == COPY_NEW_FE) { + bool bIsFirst = true; + do { - cstate->raw_buf_index = cstate->raw_buf_len; + if (!IsParallelCopy()) + cstate->raw_buf_index = cstate->raw_buf_len; + else + { + if (cstate->raw_buf_index == RAW_BUF_SIZE) + { + /* + * Get a new block if it is the first time, From the + * subsequent time, reset the index and re-use the + * same block. + */ + if (bIsFirst) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos = WaitGetFreeCopyBlock(pcshared_info); + + cstate->raw_buf = pcshared_info->data_blocks[block_pos].data; + bIsFirst = false; + } + + cstate->raw_buf_index = cstate->raw_buf_len = 0; + } + } + } while (CopyLoadRawBuf(cstate)); } } @@ -3931,11 +3979,11 @@ ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, * * Convert contents to server encoding. */ -static void +void ConvertToServerEncoding(CopyState cstate) { /* Done reading the line. Convert it to server encoding. 
*/ - if (cstate->need_transcoding) + if (cstate->need_transcoding && (!IsParallelCopy() || IsWorker())) { char *cvt; @@ -3975,6 +4023,11 @@ CopyReadLineText(CopyState cstate) char quotec = '\0'; char escapec = '\0'; + /* For parallel copy */ + int line_size = 0; + uint32 line_pos = 0; + + cstate->eol_type = EOL_UNKNOWN; if (cstate->csv_mode) { quotec = cstate->quote[0]; @@ -4029,6 +4082,8 @@ CopyReadLineText(CopyState cstate) if (raw_buf_ptr >= copy_buf_len || need_data) { REFILL_LINEBUF; + SetRawBufForLoad(cstate, line_size, copy_buf_len, raw_buf_ptr, + ©_raw_buf); /* * Try to read some more data. This will certainly reset @@ -4253,9 +4308,15 @@ CopyReadLineText(CopyState cstate) * discard the data and the \. sequence. */ if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, - cstate->raw_buf + cstate->raw_buf_index, - prev_raw_ptr - cstate->raw_buf_index); + { + if (!IsParallelCopy()) + appendBinaryStringInfo(&cstate->line_buf, + cstate->raw_buf + cstate->raw_buf_index, + prev_raw_ptr - cstate->raw_buf_index); + else + line_size += prev_raw_ptr - cstate->raw_buf_index; + } + cstate->raw_buf_index = raw_buf_ptr; result = true; /* report EOF */ break; @@ -4307,6 +4368,22 @@ not_end_of_copy: IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); raw_buf_ptr += mblen - 1; } + + /* + * Skip the header line. Update the line here, this cannot be done at + * the beginning, as there is a possibility that file contains empty + * lines. + */ + if (IsParallelCopy() && first_char_in_line && !IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + line_pos = UpdateSharedLineInfo(cstate, + pcshared_info->cur_block_pos, + cstate->raw_buf_index, -1, + LINE_LEADER_POPULATING, -1); + } + first_char_in_line = false; } /* end of outer loop */ @@ -4315,9 +4392,16 @@ not_end_of_copy: */ REFILL_LINEBUF; if (!result && !IsHeaderLine()) - ClearEOLFromCopiedData(cstate, cstate->line_buf.data, - cstate->line_buf.len, &cstate->line_buf.len); + { + if (IsParallelCopy()) + ClearEOLFromCopiedData(cstate, cstate->raw_buf, raw_buf_ptr, + &line_size); + else + ClearEOLFromCopiedData(cstate, cstate->line_buf.data, + cstate->line_buf.len, &cstate->line_buf.len); + } + EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr); return result; } diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c new file mode 100644 index 0000000..6a44a01 --- /dev/null +++ b/src/backend/commands/copyparallel.c @@ -0,0 +1,1269 @@ +/*------------------------------------------------------------------------- + * + * copyparallel.c + * Implements the Parallel COPY utility command + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/copyparallel.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "catalog/pg_proc_d.h" +#include "commands/copy.h" +#include "optimizer/clauses.h" +#include "optimizer/optimizer.h" +#include "pgstat.h" +#include "utils/lsyscache.h" + +/* DSM keys for parallel copy. 
*/ +#define PARALLEL_COPY_KEY_SHARED_INFO 1 +#define PARALLEL_COPY_KEY_CSTATE 2 +#define PARALLEL_COPY_WAL_USAGE 3 +#define PARALLEL_COPY_BUFFER_USAGE 4 + +/* Begin parallel copy Macros */ +#define SET_NEWLINE_SIZE() \ +{ \ + if (cstate->eol_type == EOL_NL || cstate->eol_type == EOL_CR) \ + new_line_size = 1; \ + else if (cstate->eol_type == EOL_CRNL) \ + new_line_size = 2; \ + else \ + new_line_size = 0; \ +} + +/* + * COPY_WAIT_TO_PROCESS - Wait before continuing to process. + */ +#define COPY_WAIT_TO_PROCESS() \ +{ \ + CHECK_FOR_INTERRUPTS(); \ + (void) WaitLatch(MyLatch, \ + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, \ + 1L, WAIT_EVENT_PG_SLEEP); \ + ResetLatch(MyLatch); \ +} + +/* + * CopyStringToSharedMemory - Copy the string to shared memory. + */ +static void +CopyStringToSharedMemory(CopyState cstate, char *srcPtr, char *destptr, + uint32 *copiedsize) +{ + uint32 len = srcPtr ? strlen(srcPtr) + 1 : 0; + + memcpy(destptr, (uint16 *) &len, sizeof(uint16)); + *copiedsize += sizeof(uint16); + if (len) + { + memcpy(destptr + sizeof(uint16), srcPtr, len); + *copiedsize += len; + } +} + +/* + * SerializeParallelCopyState - Serialize the data into shared memory. + */ +static void +SerializeParallelCopyState(ParallelContext *pcxt, CopyState cstate, + uint32 estimatedSize, char *whereClauseStr, + char *rangeTableStr, char *attnameListStr, + char *notnullListStr, char *nullListStr, + char *convertListStr) +{ + SerializedParallelCopyState shared_cstate; + char *shmptr = (char *) shm_toc_allocate(pcxt->toc, estimatedSize + 1); + uint32 copiedsize = 0; + + shared_cstate.copy_dest = cstate->copy_dest; + shared_cstate.file_encoding = cstate->file_encoding; + shared_cstate.need_transcoding = cstate->need_transcoding; + shared_cstate.encoding_embeds_ascii = cstate->encoding_embeds_ascii; + shared_cstate.csv_mode = cstate->csv_mode; + shared_cstate.header_line = cstate->header_line; + shared_cstate.null_print_len = cstate->null_print_len; + shared_cstate.force_quote_all = cstate->force_quote_all; + shared_cstate.convert_selectively = cstate->convert_selectively; + shared_cstate.num_defaults = cstate->num_defaults; + shared_cstate.relid = cstate->pcdata->relid; + + memcpy(shmptr, (char *) &shared_cstate, sizeof(SerializedParallelCopyState)); + copiedsize = sizeof(SerializedParallelCopyState); + + CopyStringToSharedMemory(cstate, cstate->null_print, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, cstate->delim, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, cstate->quote, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, cstate->escape, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, attnameListStr, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, notnullListStr, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, nullListStr, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, convertListStr, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, whereClauseStr, shmptr + copiedsize, + &copiedsize); + CopyStringToSharedMemory(cstate, rangeTableStr, shmptr + copiedsize, + &copiedsize); + + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_CSTATE, shmptr); +} + +/* + * CopyNodeFromSharedMemory - Copy the shared memory & return the ptr. 
+ */ +static char * +CopyStringFromSharedMemory(char *srcPtr, uint32 *copiedsize) +{ + char *destptr = NULL; + uint16 len = 0; + + memcpy((uint16 *) (&len), srcPtr, sizeof(uint16)); + *copiedsize += sizeof(uint16); + if (len) + { + destptr = (char *) palloc0(len); + memcpy(destptr, srcPtr + sizeof(uint16), len); + *copiedsize += len; + } + + return destptr; +} + +/* + * CopyNodeFromSharedMemory - Copy the shared memory & convert it into node + * type. + */ +static void * +CopyNodeFromSharedMemory(char *srcPtr, uint32 *copiedsize) +{ + char *destptr = NULL; + List *destList = NIL; + uint16 len = 0; + + memcpy((uint16 *) (&len), srcPtr, sizeof(uint16)); + *copiedsize += sizeof(uint16); + if (len) + { + destptr = (char *) palloc0(len); + memcpy(destptr, srcPtr + sizeof(uint16), len); + *copiedsize += len; + destList = (List *) stringToNode(destptr); + pfree(destptr); + } + + return destList; +} + +/* + * RestoreParallelCopyState - Retrieve the cstate from shared memory. + */ +static void +RestoreParallelCopyState(shm_toc *toc, CopyState cstate, List **attlist) +{ + char *shared_str_val = (char *) shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, true); + SerializedParallelCopyState shared_cstate = {0}; + uint32 copiedsize = 0; + + memcpy(&shared_cstate, (char *) shared_str_val, sizeof(SerializedParallelCopyState)); + copiedsize = sizeof(SerializedParallelCopyState); + + cstate->file_encoding = shared_cstate.file_encoding; + cstate->need_transcoding = shared_cstate.need_transcoding; + cstate->encoding_embeds_ascii = shared_cstate.encoding_embeds_ascii; + cstate->csv_mode = shared_cstate.csv_mode; + cstate->header_line = shared_cstate.header_line; + cstate->null_print_len = shared_cstate.null_print_len; + cstate->force_quote_all = shared_cstate.force_quote_all; + cstate->convert_selectively = shared_cstate.convert_selectively; + cstate->num_defaults = shared_cstate.num_defaults; + cstate->pcdata->relid = shared_cstate.relid; + + cstate->null_print = CopyStringFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->delim = CopyStringFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->quote = CopyStringFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->escape = CopyStringFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + + *attlist = (List *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->force_notnull = (List *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->force_null = (List *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->convert_select = (List *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->whereClause = (Node *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); + cstate->range_table = (List *) CopyNodeFromSharedMemory(shared_str_val + copiedsize, + &copiedsize); +} + +/* + * EstimateStringSize - Estimate the size required for the string in shared + * memory. + */ +static uint32 +EstimateStringSize(char *str) +{ + uint32 strsize = sizeof(uint16); + + if (str) + strsize += strlen(str) + 1; + + return strsize; +} + +/* + * EstimateNodeSize - Convert the list to string & estimate the size required + * in shared memory. 
+ */ +static uint32 +EstimateNodeSize(void *list, char **listStr) +{ + uint32 strsize = sizeof(uint16); + + if (list != NIL) + { + *listStr = nodeToString(list); + strsize += strlen(*listStr) + 1; + } + + return strsize; +} + +/* + * EstimateCstateSize - Estimate the size required in shared memory for cstate + * variables. + */ +static uint32 +EstimateCstateSize(ParallelContext *pcxt, CopyState cstate, List *attnamelist, + char **whereClauseStr, char **rangeTableStr, + char **attnameListStr, char **notnullListStr, + char **nullListStr, char **convertListStr) +{ + uint32 strsize = MAXALIGN(sizeof(SerializedParallelCopyState)); + + strsize += EstimateStringSize(cstate->null_print); + strsize += EstimateStringSize(cstate->delim); + strsize += EstimateStringSize(cstate->quote); + strsize += EstimateStringSize(cstate->escape); + strsize += EstimateNodeSize(attnamelist, attnameListStr); + strsize += EstimateNodeSize(cstate->force_notnull, notnullListStr); + strsize += EstimateNodeSize(cstate->force_null, nullListStr); + strsize += EstimateNodeSize(cstate->convert_select, convertListStr); + strsize += EstimateNodeSize(cstate->whereClause, whereClauseStr); + strsize += EstimateNodeSize(cstate->range_table, rangeTableStr); + + strsize++; + shm_toc_estimate_chunk(&pcxt->estimator, strsize); + shm_toc_estimate_keys(&pcxt->estimator, 1); + return strsize; +} + +/* + * PopulateParallelCopyShmInfo - Set ParallelCopyShmInfo. + */ +static void +PopulateParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr) +{ + uint32 count; + + MemSet(shared_info_ptr, 0, sizeof(ParallelCopyShmInfo)); + shared_info_ptr->is_read_in_progress = true; + shared_info_ptr->cur_block_pos = -1; + for (count = 0; count < RINGSIZE; count++) + { + ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count]; + + pg_atomic_init_u32(&(lineInfo->line_size), -1); + } +} + +/* + * CheckTrigFunParallelSafety - For all triggers, check if the associated + * trigger functions are parallel safe. If at least one trigger function is + * parallel unsafe, we do not allow parallelism. + */ +static pg_attribute_always_inline bool +CheckTrigFunParallelSafety(TriggerDesc *trigdesc) +{ + int i; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + int trigtype = RI_TRIGGER_NONE; + + if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE) + return false; + + /* If the trigger is parallel safe, also look for RI_TRIGGER. */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + + /* + * No parallelism if foreign key check trigger is present. This is + * because, while performing foreign key checks, we take KEY SHARE + * lock on primary key table rows which inturn will increment the + * command counter and updates the snapshot. Since we share the + * snapshots at the beginning of the command, we can't allow it to be + * changed later. So, unless we do something special for it, we can't + * allow parallelism in such cases. + */ + if (trigtype == RI_TRIGGER_FK) + return false; + } + + return true; +} + +/* + * CheckExprParallelSafety - Determine parallel safety of volatile expressions + * in default clause of column definition or in where clause and return true if + * they are parallel safe. 
+ */ +static pg_attribute_always_inline bool +CheckExprParallelSafety(CopyState cstate) +{ + if (contain_volatile_functions(cstate->whereClause)) + { + if (max_parallel_hazard((Query *) cstate->whereClause) != PROPARALLEL_SAFE) + return false; + } + + /* + * Check if any of the column has volatile default expression. if yes, and + * they are not parallel safe, then parallelism is not allowed. For + * instance, if there are any serial/bigserial columns for which nextval() + * default expression which is parallel unsafe is associated, parallelism + * should not be allowed. + */ + if (cstate->defexprs != NULL && cstate->num_defaults != 0) + { + int i; + + for (i = 0; i < cstate->num_defaults; i++) + { + bool volatile_expr = contain_volatile_functions((Node *) cstate->defexprs[i]->expr); + + if (volatile_expr && + (max_parallel_hazard((Query *) cstate->defexprs[i]->expr)) != + PROPARALLEL_SAFE) + return false; + } + } + + return true; +} + +/* + * IsParallelCopyAllowed - Check for the cases where parallel copy is not + * applicable. + */ +static pg_attribute_always_inline bool +IsParallelCopyAllowed(CopyState cstate) +{ + /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ + if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + return false; + + /* + * Check if copy is into foreign table. We can not allow parallelism in + * this case because each worker needs to establish FDW connection and + * operate in a separate transaction. Unless we have a capability to + * provide two-phase commit protocol, we can not allow parallelism. + * + * Also check if copy is into temporary table. Since parallel workers can + * not access temporary table, parallelism is not allowed. + */ + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(cstate->rel)) + return false; + + /* + * If there are volatile default expressions or where clause contain + * volatile expressions, allow parallelism if they are parallel safe, + * otherwise not. + */ + if (!CheckExprParallelSafety(cstate)) + return false; + + /* Check parallel safety of the trigger functions. */ + if (cstate->rel->trigdesc != NULL && + !CheckTrigFunParallelSafety(cstate->rel->trigdesc)) + return false; + + /* + * When transition tables are involved (if after statement triggers are + * present), we collect minimal tuples in the tuple store after processing + * them so that later after statement triggers can access them. Now, if + * we want to enable parallelism for such cases, we instead need to store + * and access tuples from shared tuple store. However, it does not have + * the facility to store tuples in-memory, so we always need to store and + * access from a file which could be costly unless we also have an + * additional way to store minimal tuples in shared memory till work_mem + * and then in shared tuple store. It is possible to do all this to enable + * parallel copy for such cases. Currently, we can disallow parallelism + * for such cases and later allow if required. + * + * When there are BEFORE/AFTER/INSTEAD OF row triggers on the table. We do + * not allow parallelism in such cases because such triggers might query + * the table we are inserting into and act differently if the tuples that + * have already been processed and prepared for insertion are not there. + * Now, if we allow parallelism with such triggers the behaviour would + * depend on if the parallel worker has already inserted or not that + * particular tuples. 
+ */ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_after_statement || + cstate->rel->trigdesc->trig_insert_new_table || + cstate->rel->trigdesc->trig_insert_before_row || + cstate->rel->trigdesc->trig_insert_after_row || + cstate->rel->trigdesc->trig_insert_instead_row)) + return false; + + return true; +} + +/* + * BeginParallelCopy - Start parallel copy tasks. + * + * Get the number of workers required to perform the parallel copy. The data + * structures that are required by the parallel workers will be initialized, the + * size required in DSM will be calculated and the necessary keys will be loaded + * in the DSM. The specified number of workers will then be launched. + */ +ParallelContext * +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) +{ + ParallelContext *pcxt; + ParallelCopyShmInfo *shared_info_ptr; + Size est_cstateshared; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + char *attnameListStr = NULL; + char *notnullListStr = NULL; + char *nullListStr = NULL; + char *convertListStr = NULL; + int parallel_workers = 0; + WalUsage *walusage; + BufferUsage *bufferusage; + ParallelCopyData *pcdata; + MemoryContext oldcontext; + uint32 strsize; + + CheckTargetRelValidity(cstate); + parallel_workers = Min(nworkers, max_worker_processes); + + /* Can't perform copy in parallel */ + if (parallel_workers <= 0) + return NULL; + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + MemoryContextSwitchTo(oldcontext); + cstate->pcdata = pcdata; + + /* + * User chosen parallel copy. Determine if the parallel copy is actually + * allowed. If not, go with the non-parallel mode. + */ + if (!IsParallelCopyAllowed(cstate)) + return NULL; + + (void) GetCurrentFullTransactionId(); + (void) GetCurrentCommandId(true); + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "ParallelCopyMain", + parallel_workers); + Assert(pcxt->nworkers > 0); + + /* + * Estimate size for shared information for PARALLEL_COPY_KEY_SHARED_INFO + */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCopyShmInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_CSTATE */ + est_cstateshared = MAXALIGN(sizeof(SerializedParallelCopyState)); + shm_toc_estimate_chunk(&pcxt->estimator, est_cstateshared); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + strsize = EstimateCstateSize(pcxt, cstate, attnamelist, &whereClauseStr, + &rangeTableStr, &attnameListStr, + ¬nullListStr, &nullListStr, + &convertListStr); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_COPY_WAL_USAGE + * and PARALLEL_COPY_BUFFER_USAGE. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial copy) */ + if (pcxt->seg == NULL) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* Allocate shared memory for PARALLEL_COPY_KEY_SHARED_INFO */ + shared_info_ptr = (ParallelCopyShmInfo *) shm_toc_allocate(pcxt->toc, sizeof(ParallelCopyShmInfo)); + PopulateParallelCopyShmInfo(shared_info_ptr); + + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_SHARED_INFO, shared_info_ptr); + pcdata->pcshared_info = shared_info_ptr; + pcdata->relid = relid; + + SerializeParallelCopyState(pcxt, cstate, strsize, whereClauseStr, + rangeTableStr, attnameListStr, notnullListStr, + nullListStr, convertListStr); + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. + */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_WAL_USAGE, walusage); + pcdata->walusage = walusage; + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_BUFFER_USAGE, bufferusage); + pcdata->bufferusage = bufferusage; + + LaunchParallelWorkers(pcxt); + if (pcxt->nworkers_launched == 0) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); + + pcdata->is_leader = true; + cstate->is_parallel = true; + return pcxt; +} + +/* + * EndParallelCopy - End the parallel copy tasks. + */ +pg_attribute_always_inline void +EndParallelCopy(ParallelContext *pcxt) +{ + Assert(!IsParallelWorker()); + + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * InitializeParallelCopyInfo - Initialize parallel worker. + */ +static void +InitializeParallelCopyInfo(CopyState cstate, List *attnamelist) +{ + uint32 count; + ParallelCopyData *pcdata = cstate->pcdata; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + PopulateCommonCstateInfo(cstate, tup_desc, attnamelist); + + /* Initialize state variables. */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + + initStringInfo(&cstate->line_buf); + for (count = 0; count < WORKER_CHUNK_COUNT; count++) + initStringInfo(&pcdata->worker_line_buf[count].line_buf); + + cstate->line_buf_converted = false; + cstate->raw_buf = NULL; + cstate->raw_buf_index = cstate->raw_buf_len = 0; + + PopulateCstateCatalogInfo(cstate); + + /* Create workspace for CopyReadAttributes results. */ + if (!cstate->binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); + } +} + +/* + * CacheLineInfo - Cache the line information to local memory. 
+ */ +static bool +CacheLineInfo(CopyState cstate, uint32 buff_count) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyData *pcdata = cstate->pcdata; + uint32 write_pos; + ParallelCopyDataBlock *data_blk_ptr; + ParallelCopyLineBoundary *lineInfo; + uint32 offset; + int dataSize; + int copiedSize = 0; + + resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); + write_pos = GetLinePosition(cstate); + if (-1 == write_pos) + return true; + + /* Get the current line information. */ + lineInfo = &pcshared_info->line_boundaries.ring[write_pos]; + if (pg_atomic_read_u32(&lineInfo->line_size) == 0) + goto empty_data_line_update; + + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block]; + + /* Get the offset information from where the data must be copied. */ + offset = lineInfo->start_offset; + pcdata->worker_line_buf[buff_count].cur_lineno = lineInfo->cur_lineno; + + elog(DEBUG1, "[Worker] Processing - line position:%d, block:%d, unprocessed lines:%d, offset:%d, line size:%d", + write_pos, lineInfo->first_block, + pg_atomic_read_u32(&data_blk_ptr->unprocessed_line_parts), + offset, pg_atomic_read_u32(&lineInfo->line_size)); + + for (;;) + { + uint8 skip_bytes = data_blk_ptr->skip_bytes; + + /* + * There is a possibility that the loop embedded at the bottom of the + * current loop has come out because data_blk_ptr->curr_blk_completed + * is set, but dataSize read might be an old value, if + * data_blk_ptr->curr_blk_completed and the line is completed, + * line_size will be set. Read the line_size again to be sure if it is + * completed or partial block. + */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + if (dataSize) + { + int remainingSize = dataSize - copiedSize; + + if (!remainingSize) + break; + + /* Whole line is in current block. */ + if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE) + { + appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + remainingSize); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, + 1); + break; + } + else + { + /* Line is spread across the blocks. */ + uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + lineInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); + copiedSize += lineInCurrentBlock; + while (copiedSize < dataSize) + { + uint32 currentBlockCopySize; + ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + + skip_bytes = currBlkPtr->skip_bytes; + + /* + * If complete data is present in current block use + * dataSize - copiedSize, or copy the whole block from + * current block. + */ + currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes); + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &currBlkPtr->data[0], + currentBlockCopySize); + pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_line_parts, 1); + copiedSize += currentBlockCopySize; + data_blk_ptr = currBlkPtr; + } + + break; + } + } + else + { + /* Copy this complete block from the current offset. 
*/ + uint32 lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset; + + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + lineInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1); + copiedSize += lineInCurrentBlock; + + /* + * Reset the offset. For the first copy, copy from the offset. For + * the subsequent copy the complete block. + */ + offset = 0; + + /* Set data_blk_ptr to the following block. */ + data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + } + + for (;;) + { + /* Get the size of this line */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + + /* + * If the data is present in current block lineInfo->line_size + * will be updated. If the data is spread across the blocks either + * of lineInfo->line_size or data_blk_ptr->curr_blk_completed can + * be updated. lineInfo->line_size will be updated if the complete + * read is finished. data_blk_ptr->curr_blk_completed will be + * updated if processing of current block is finished and data + * processing is not finished. + */ + if (data_blk_ptr->curr_blk_completed || (dataSize != -1)) + break; + + COPY_WAIT_TO_PROCESS() + } + } + +empty_data_line_update: + elog(DEBUG1, "[Worker] Completed processing line:%d", write_pos); + pg_atomic_write_u32(&lineInfo->line_state, LINE_WORKER_PROCESSED); + pg_atomic_write_u32(&lineInfo->line_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + return false; +} + +/* + * GetWorkerLine - Returns a line for worker to process. + */ +bool +GetWorkerLine(CopyState cstate) +{ + uint32 buff_count; + ParallelCopyData *pcdata = cstate->pcdata; + + /* + * Copy the line data to line_buf and release the line position so that + * the worker can continue loading data. + */ + if (pcdata->worker_line_buf_pos < pcdata->worker_line_buf_count) + goto return_line; + + pcdata->worker_line_buf_pos = 0; + pcdata->worker_line_buf_count = 0; + + for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++) + { + bool result = CacheLineInfo(cstate, buff_count); + if (result) + break; + + pcdata->worker_line_buf_count++; + } + + if (pcdata->worker_line_buf_count) + goto return_line; + else + resetStringInfo(&cstate->line_buf); + + return true; + +return_line: + cstate->line_buf = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].line_buf; + cstate->cur_lineno = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].cur_lineno; + cstate->line_buf_valid = true; + + /* Mark that encoding conversion hasn't occurred yet. */ + cstate->line_buf_converted = false; + ConvertToServerEncoding(cstate); + pcdata->worker_line_buf_pos++; + return false; +} + +/* + * ParallelCopyMain - Parallel copy worker's code. + * + * Where clause handling, convert tuple to columns, add default null values for + * the missing columns that are not present in that record. Find the partition + * if it is partitioned table, invoke before row insert Triggers, handle + * constraints and insert the tuples. + */ +void +ParallelCopyMain(dsm_segment *seg, shm_toc *toc) +{ + CopyState cstate; + ParallelCopyData *pcdata; + ParallelCopyShmInfo *pcshared_info; + Relation rel = NULL; + MemoryContext oldcontext; + List *attlist = NIL; + WalUsage *walusage; + BufferUsage *bufferusage; + + /* Allocate workspace and zero all fields. */ + cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + + /* + * We allocate everything used by a cstate in a new memory context. 
This + * avoids memory leaks during repeated use of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + pcdata->is_leader = false; + pcdata->worker_processed_pos = -1; + cstate->is_parallel = true; + pcshared_info = (ParallelCopyShmInfo *) shm_toc_lookup(toc, PARALLEL_COPY_KEY_SHARED_INFO, false); + + ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); + + pcdata->pcshared_info = pcshared_info; + RestoreParallelCopyState(toc, cstate, &attlist); + + /* Open and lock the relation, using the appropriate lock type. */ + rel = table_open(cstate->pcdata->relid, RowExclusiveLock); + cstate->rel = rel; + InitializeParallelCopyInfo(cstate, attlist); + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + CopyFrom(cstate); + + if (rel != NULL) + table_close(rel, RowExclusiveLock); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_COPY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_COPY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + MemoryContextSwitchTo(oldcontext); + pfree(cstate); + return; +} + +/* + * UpdateSharedLineInfo - Update the line information. + */ +uint32 +UpdateSharedLineInfo(CopyState cstate, uint32 blk_pos, uint32 offset, + uint32 line_size, uint32 line_state, uint32 blk_line_pos) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyLineBoundaries *lineBoundaryPtr = &pcshared_info->line_boundaries; + ParallelCopyLineBoundary *lineInfo; + uint32 line_pos; + + /* blk_line_pos will be valid in case line_pos was blocked earlier. */ + if (blk_line_pos == -1) + { + line_pos = lineBoundaryPtr->pos; + + /* Update the line information for the worker to pick and process. */ + lineInfo = &lineBoundaryPtr->ring[line_pos]; + while (pg_atomic_read_u32(&lineInfo->line_size) != -1) + COPY_WAIT_TO_PROCESS() + + lineInfo->first_block = blk_pos; + lineInfo->start_offset = offset; + lineInfo->cur_lineno = cstate->cur_lineno; + lineBoundaryPtr->pos = (lineBoundaryPtr->pos + 1) % RINGSIZE; + } + else + { + line_pos = blk_line_pos; + lineInfo = &lineBoundaryPtr->ring[line_pos]; + } + + if (line_state == LINE_LEADER_POPULATED) + { + elog(DEBUG1, "[Leader] Added line with block:%d, offset:%d, line position:%d, line size:%d", + lineInfo->first_block, lineInfo->start_offset, line_pos, + pg_atomic_read_u32(&lineInfo->line_size)); + pcshared_info->populated++; + } + else + elog(DEBUG1, "[Leader] Adding - block:%d, offset:%d, line position:%d", + lineInfo->first_block, lineInfo->start_offset, line_pos); + + pg_atomic_write_u32(&lineInfo->line_size, line_size); + pg_atomic_write_u32(&lineInfo->line_state, line_state); + + return line_pos; +} + +/* + * ParallelCopyFrom - parallel copy leader's functionality. + * + * Leader executes the before statement for before statement trigger, if before + * statement trigger is present. It will read the table data from the file and + * copy the contents to DSM data blocks. It will then read the input contents + * from the DSM data block and identify the records based on line breaks. This + * information is called line or a record that need to be inserted into a + * relation. 
The line information will be stored in ParallelCopyLineBoundary DSM + * data structure. Workers will then process this information and insert the + * data in to table. It will repeat this process until the all data is read from + * the file and all the DSM data blocks are processed. While processing if + * leader identifies that DSM Data blocks or DSM ParallelCopyLineBoundary data + * structures is full, leader will wait till the worker frees up some entries + * and repeat the process. It will wait till all the lines populated are + * processed by the workers and exits. + */ +void +ParallelCopyFrom(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + + /* raw_buf is not used in parallel copy, instead data blocks are used. */ + pfree(cstate->raw_buf); + cstate->raw_buf = NULL; + + /* Execute the before statement triggers from the leader */ + ExecBeforeStmtTrigger(cstate); + + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } + } + + for (;;) + { + bool done; + + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + + pcshared_info->is_read_in_progress = false; + cstate->cur_lineno = 0; +} + +/* + * GetLinePosition - Return the line position once the leader has populated the + * data. + */ +uint32 +GetLinePosition(CopyState cstate) +{ + ParallelCopyData *pcdata = cstate->pcdata; + ParallelCopyShmInfo *pcshared_info = pcdata->pcshared_info; + uint32 previous_pos = pcdata->worker_processed_pos; + uint32 write_pos = (previous_pos == -1) ? 0 : (previous_pos + 1) % RINGSIZE; + + for (;;) + { + int dataSize; + bool is_read_in_progress = pcshared_info->is_read_in_progress; + ParallelCopyLineBoundary *lineInfo; + ParallelCopyDataBlock *data_blk_ptr; + uint32 line_state = LINE_LEADER_POPULATED; + ParallelCopyLineState curr_line_state; + + CHECK_FOR_INTERRUPTS(); + + /* File read completed & no elements to process. */ + if (!is_read_in_progress && + (pcshared_info->populated == + pg_atomic_read_u64(&pcshared_info->total_worker_processed))) + { + write_pos = -1; + break; + } + + /* Get the current line information. */ + lineInfo = &pcshared_info->line_boundaries.ring[write_pos]; + curr_line_state = pg_atomic_read_u32(&lineInfo->line_state); + if ((write_pos % WORKER_CHUNK_COUNT == 0) && + (curr_line_state == LINE_WORKER_PROCESSED || + curr_line_state == LINE_WORKER_PROCESSING)) + { + pcdata->worker_processed_pos = write_pos; + write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE; + continue; + } + + /* Get the size of this line. */ + dataSize = pg_atomic_read_u32(&lineInfo->line_size); + + if (dataSize != 0) /* If not an empty line. */ + { + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block]; + + if (!data_blk_ptr->curr_blk_completed && (dataSize == -1)) + { + /* Wait till the current line or block is added. */ + COPY_WAIT_TO_PROCESS() + continue; + } + } + + /* Make sure that no worker has consumed this element. 
*/ + if (pg_atomic_compare_exchange_u32(&lineInfo->line_state, + &line_state, LINE_WORKER_PROCESSING)) + break; + } + + pcdata->worker_processed_pos = write_pos; + return write_pos; +} + +/* + * GetFreeCopyBlock - Get a free block for data to be copied. + */ +static pg_attribute_always_inline uint32 +GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + int count = 0; + uint32 last_free_block = pcshared_info->cur_block_pos; + uint32 block_pos = (last_free_block != -1) ? ((last_free_block + 1) % MAX_BLOCKS_COUNT) : 0; + + /* + * Get a new block for copying data, don't check current block, current + * block will have some unprocessed data. + */ + while (count < (MAX_BLOCKS_COUNT - 1)) + { + ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos]; + uint32 unprocessed_line_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_line_parts); + + if (unprocessed_line_parts == 0) + { + dataBlkPtr->curr_blk_completed = false; + dataBlkPtr->skip_bytes = 0; + dataBlkPtr->following_block = -1; + pcshared_info->cur_block_pos = block_pos; + MemSet(&dataBlkPtr->data[0], 0, DATA_BLOCK_SIZE); + return block_pos; + } + + block_pos = (block_pos + 1) % MAX_BLOCKS_COUNT; + count++; + } + + return -1; +} + +/* + * WaitGetFreeCopyBlock - If there are no blocks available, wait and get a block + * for copying data. + */ +uint32 +WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info) +{ + uint32 new_free_pos = -1; + + for (;;) + { + new_free_pos = GetFreeCopyBlock(pcshared_info); + if (new_free_pos != -1) /* We have got one block, break now. */ + break; + + COPY_WAIT_TO_PROCESS() + } + + return new_free_pos; +} + +/* + * SetRawBufForLoad - Set raw_buf to the shared memory where the file data must + * be read. + */ +void +SetRawBufForLoad(CopyState cstate, uint32 line_size, uint32 copy_buf_len, + uint32 raw_buf_ptr, char **copy_raw_buf) +{ + ParallelCopyShmInfo *pcshared_info; + uint32 cur_block_pos; + uint32 next_block_pos; + ParallelCopyDataBlock *cur_data_blk_ptr = NULL; + ParallelCopyDataBlock *next_data_blk_ptr = NULL; + + if (!IsParallelCopy()) + return; + + pcshared_info = cstate->pcdata->pcshared_info; + cur_block_pos = pcshared_info->cur_block_pos; + cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL; + next_block_pos = WaitGetFreeCopyBlock(pcshared_info); + next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos]; + + /* set raw_buf to the data block in shared memory */ + cstate->raw_buf = next_data_blk_ptr->data; + *copy_raw_buf = cstate->raw_buf; + if (cur_data_blk_ptr && line_size) + { + /* + * Mark the previous block as completed, worker can start copying this + * data. + */ + cur_data_blk_ptr->following_block = next_block_pos; + pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_line_parts, 1); + cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr; + cur_data_blk_ptr->curr_blk_completed = true; + } +} + +/* + * EndLineParallelCopy - Update the line information in shared memory. + */ +void +EndLineParallelCopy(CopyState cstate, uint32 line_pos, uint32 line_size, + uint32 raw_buf_ptr) +{ + uint8 new_line_size; + + if (!IsParallelCopy()) + return; + + if (!IsHeaderLine()) + { + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + + SET_NEWLINE_SIZE() + if (line_size) + { + /* + * If the new_line_size > raw_buf_ptr, then the new block has only + * new line char content. The unprocessed count should not be + * increased in this case. 
+ */ + if (raw_buf_ptr > new_line_size) + { + uint32 cur_block_pos = pcshared_info->cur_block_pos; + ParallelCopyDataBlock *curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; + + pg_atomic_add_fetch_u32(&curr_data_blk_ptr->unprocessed_line_parts, 1); + } + + /* + * Update line size & line state, other members are already + * updated. + */ + (void) UpdateSharedLineInfo(cstate, -1, -1, line_size, + LINE_LEADER_POPULATED, line_pos); + } + else if (new_line_size) + /* This means only new line char, empty record should be inserted. */ + (void) UpdateSharedLineInfo(cstate, -1, -1, 0, + LINE_LEADER_POPULATED, -1); + } +} + +/* + * ExecBeforeStmtTrigger - Execute the before statement trigger, this will be + * executed for parallel copy by the leader process. + */ +void +ExecBeforeStmtTrigger(CopyState cstate) +{ + EState *estate = CreateExecutorState(); + ResultRelInfo *resultRelInfo; + + Assert(IsLeader()); + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of code + * here that basically duplicated execUtils.c ...) + */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, + cstate->rel, + 1, /* must match rel's position in range_table */ + NULL, + 0); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(resultRelInfo, CMD_INSERT); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + ExecInitRangeTable(estate, cstate->range_table); + + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. 
+ */ + ExecBSInsertTriggers(estate, resultRelInfo); + + /* Close any trigger target relations */ + ExecCleanUpTriggerState(estate); + + FreeExecutorState(estate); +} diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 24c7b41..cf00256 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2353,7 +2353,7 @@ psql_completion(const char *text, int start, int end) /* Complete COPY FROM|TO filename WITH ( */ else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(")) COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL", - "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE", + "HEADER", "PARALLEL", "QUOTE", "ESCAPE", "FORCE_QUOTE", "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING"); /* Complete COPY FROM|TO filename WITH (FORMAT */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index df1b43a..96295bc 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -385,6 +385,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void SetCurrentCommandIdUsedForWorker(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index cd2d56e..a9fe950 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -51,6 +51,31 @@ */ #define WORKER_CHUNK_COUNT 64 +#define IsParallelCopy() (cstate->is_parallel) +#define IsLeader() (cstate->pcdata->is_leader) +#define IsWorker() (IsParallelCopy() && !IsLeader()) +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) + +/* + * Increment the lines processed. + */ +#define INCREMENTPROCESSED(processed) \ +{ \ + if (!IsParallelCopy()) \ + processed++; \ + else \ + pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \ +} + +/* + * Get the lines processed. + */ +#define RETURNPROCESSED(processed) \ +if (!IsParallelCopy()) \ + return processed; \ +else \ + return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + /* * Represents the different source/dest cases we need to worry about at * the bottom level @@ -75,6 +100,28 @@ typedef enum EolType } EolType; /* + * State of the line. + */ +typedef enum ParallelCopyLineState +{ + LINE_INIT, /* initial state of line */ + LINE_LEADER_POPULATING, /* leader processing line */ + LINE_LEADER_POPULATED, /* leader completed populating line */ + LINE_WORKER_PROCESSING, /* worker processing line */ + LINE_WORKER_PROCESSED /* worker completed processing line */ +} ParallelCopyLineState; + +/* + * Represents the heap insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or fdw routine */ + CIM_MULTI, /* always use table_multi_insert */ + CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ +} CopyInsertMethod; + +/* * Copy data block information. * * These data blocks are created in DSM. 
Data read from file will be copied in @@ -194,8 +241,6 @@ typedef struct ParallelCopyShmInfo uint64 populated; /* lines populated by leader */ uint32 cur_block_pos; /* current data block */ ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ - FullTransactionId full_transaction_id; /* xid for copy from statement */ - CommandId mycid; /* command id */ ParallelCopyLineBoundaries line_boundaries; /* line array */ } ParallelCopyShmInfo; @@ -242,12 +287,12 @@ typedef struct ParallelCopyData ParallelCopyShmInfo *pcshared_info; /* common info in shared memory */ bool is_leader; + /* line position which worker is processing */ + uint32 worker_processed_pos; + WalUsage *walusage; BufferUsage *bufferusage; - /* line position which worker is processing */ - uint32 worker_processed_pos; - /* * Local line_buf array, workers will copy it here and release the lines * for the leader to continue. @@ -423,9 +468,23 @@ extern DestReceiver *CreateCopyDestReceiver(void); extern void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tup_desc, List *attnamelist); +extern void ConvertToServerEncoding(CopyState cstate); extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); extern ParallelContext *BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid); extern void ParallelCopyFrom(CopyState cstate); extern void EndParallelCopy(ParallelContext *pcxt); +extern void ExecBeforeStmtTrigger(CopyState cstate); +extern void CheckTargetRelValidity(CopyState cstate); +extern void PopulateCstateCatalogInfo(CopyState cstate); +extern uint32 GetLinePosition(CopyState cstate); +extern bool GetWorkerLine(CopyState cstate); +extern bool CopyReadLine(CopyState cstate); +extern uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info); +extern void SetRawBufForLoad(CopyState cstate, uint32 line_size, uint32 copy_buf_len, + uint32 raw_buf_ptr, char **copy_raw_buf); +extern uint32 UpdateSharedLineInfo(CopyState cstate, uint32 blk_pos, uint32 offset, + uint32 line_size, uint32 line_state, uint32 blk_line_pos); +extern void EndLineParallelCopy(CopyState cstate, uint32 line_pos, uint32 line_size, + uint32 raw_buf_ptr); #endif /* COPY_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f5b818b..a198bf0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1707,6 +1707,7 @@ ParallelCopyLineBoundary ParallelCopyData ParallelCopyDataBlock ParallelCopyLineBuf +ParallelCopyLineState ParallelCopyShmInfo ParallelExecutorInfo ParallelHashGrowth -- 1.8.3.1