From 4a1febf53529c4ea29312660c7a7b633f829c342 Mon Sep 17 00:00:00 2001 From: Vignesh C , Bharath Rupireddy Date: Wed, 3 Jun 2020 09:29:58 +0530 Subject: [PATCH] Allow copy from command to process data from file/STDIN contents to a table in parallel. This feature allows the copy from to leverage multiple CPUs in order to copy data from file/STDIN to a table. This adds a PARALLEL option to COPY FROM command where the user can specify the number of workers that can be used to perform the COPY FROM command. Specifying zero as number of workers will disable parallelism. --- doc/src/sgml/ref/copy.sgml | 16 + src/backend/access/heap/heapam.c | 13 - src/backend/access/transam/parallel.c | 4 + src/backend/access/transam/xact.c | 13 + src/backend/commands/copy.c | 2729 ++++++++++++++++++++++----- src/backend/optimizer/util/clauses.c | 2 +- src/backend/replication/logical/tablesync.c | 2 +- src/include/access/xact.h | 1 + src/include/commands/copy.h | 4 + src/tools/pgindent/typedefs.list | 10 + 10 files changed, 2329 insertions(+), 465 deletions(-) diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 18189ab..95d349d 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -37,6 +37,7 @@ COPY { table_name [ ( delimiter_character' NULL 'null_string' HEADER [ boolean ] + PARALLEL integer QUOTE 'quote_character' ESCAPE 'escape_character' FORCE_QUOTE { ( column_name [, ...] ) | * } @@ -275,6 +276,21 @@ COPY { table_name [ ( + PARALLEL + + + Perform COPY FROM in parallel using integer background workers. Please + note that it is not guaranteed that the number of parallel workers + specified in integer will + be used during execution. It is possible for a copy to run with fewer + workers than specified, or even with no workers at all. This option is + allowed only in COPY FROM. + + + + + QUOTE diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 94eb37d..6991b9f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2012,19 +2012,6 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { - /* - * Parallel operations are required to be strictly read-only in a parallel - * worker. Parallel inserts are not safe even in the leader in the - * general case, because group locking means that heavyweight locks for - * relation extension or GIN page locks will not conflict between members - * of a lock group, but we don't prohibit that case here because there are - * useful special cases that we can safely allow, such as CREATE TABLE AS. - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 14a8690..09e7a19 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -25,6 +25,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/copy.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -145,6 +146,9 @@ static const struct }, { "parallel_vacuum_main", parallel_vacuum_main + }, + { + "ParallelCopyMain", ParallelCopyMain } }; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cd30b62..d43902c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -502,6 +502,19 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * AssignFullTransactionIdForWorker + * + * For parallel copy, all the workers must use the same transaction id. + */ +void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId) +{ + TransactionState s = CurrentTransactionState; + + Assert((IsInParallelMode() || IsParallelWorker())); + s->fullTransactionId = fullTransactionId; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc4..2a49255 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -26,6 +26,7 @@ #include "access/xlog.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" +#include "catalog/pg_proc_d.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" @@ -40,11 +41,13 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "optimizer/clauses.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -96,6 +99,154 @@ typedef enum CopyInsertMethod } CopyInsertMethod; /* + * State of the chunk. + */ +typedef enum ChunkState +{ + CHUNK_INIT, /* initial state of chunk */ + CHUNK_LEADER_POPULATING, /* leader processing chunk */ + CHUNK_LEADER_POPULATED, /* leader completed populating chunk */ + CHUNK_WORKER_PROCESSING, /* worker processing chunk */ + CHUNK_WORKER_PROCESSED /* worker completed processing chunk */ +}ChunkState; + +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + +#define DATA_BLOCK_SIZE RAW_BUF_SIZE +#define RINGSIZE (10 * 1000) +#define MAX_BLOCKS_COUNT 1000 +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */ + +#define IsParallelCopy() (cstate->is_parallel) +#define IsLeader() (cstate->pcdata->is_leader) +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) + +/* + * Copy data block information. + */ +typedef struct CopyDataBlock +{ + /* The number of unprocessed chunks in the current block. */ + pg_atomic_uint32 unprocessed_chunk_parts; + + /* + * If the current chunk data is continued into another block, + * following_block will have the position where the remaining data need to + * be read. + */ + uint32 following_block; + + /* + * This flag will be set, when the leader finds out this block can be read + * safely by the worker. This helps the worker to start processing the chunk + * early where the chunk will be spread across many blocks and the worker + * need not wait for the complete chunk to be processed. + */ + bool curr_blk_completed; + char data[DATA_BLOCK_SIZE + 1]; /* data read from file */ +}CopyDataBlock; + +/* + * Individual Chunk information. + */ +typedef struct ChunkBoundary +{ + /* Position of the first block in data_blocks array. */ + uint32 first_block; + uint32 start_offset; /* start offset of the chunk */ + + /* + * Size of the current chunk -1 means chunk is yet to be filled completely, + * 0 means empty chunk, >0 means chunk filled with chunk size data. + */ + pg_atomic_uint32 chunk_size; + pg_atomic_uint32 chunk_state; /* chunk state */ + uint64 cur_lineno; /* line number for error messages */ +}ChunkBoundary; + +/* + * Array of the chunk. + */ +typedef struct ChunkBoundaries +{ + /* Position for the leader to populate a chunk. */ + uint32 leader_pos; + + /* Data read from the file/stdin by the leader process. */ + ChunkBoundary ring[RINGSIZE]; +}ChunkBoundaries; + +/* + * Shared information among parallel copy workers. This will be allocated in the + * DSM segment. + */ +typedef struct ShmCopyInfo +{ + bool is_read_in_progress; /* file read status */ + + /* + * Actual Chunks inserted by worker (some records will be filtered based on + * where condition). + */ + pg_atomic_uint64 processed; + pg_atomic_uint64 total_worker_processed; /* total processed records by the workers */ + uint64 populated; /* Chunks populated by leader */ + uint32 cur_block_pos; /* current data block */ + CopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ + FullTransactionId full_transaction_id; /* xid for copy from statement */ + CommandId mycid; /* command id */ + ChunkBoundaries chunk_boundaries; /* chunk array */ +} ShmCopyInfo; + +/* + * This structure maintains the state of the buffer information. + */ +typedef struct CopyBufferState +{ + char *copy_raw_buf; + int raw_buf_ptr; /* current offset */ + int copy_buf_len; /* total size available */ + + /* For parallel copy */ + CopyDataBlock *data_blk_ptr; + CopyDataBlock *curr_data_blk_ptr; + uint32 chunk_size; + bool block_switched; +}CopyBufferState; + +/* + * Parallel copy line buffer information. + */ +typedef struct ParallelCopyLineBuf +{ + StringInfoData line_buf; + uint64 cur_lineno; /* line number for error messages */ +}ParallelCopyLineBuf; + +/* + * Parallel copy data information. + */ +typedef struct ParallelCopyData +{ + Oid relid; /* relation id of the table */ + ShmCopyInfo *pcshared_info; /* common info in shared memory */ + bool is_leader; + + /* chunk position which worker is processing */ + uint32 worker_processed_pos; + + /* + * Local line_buf array, workers will copy it here and release the chunks + * for the leader to continue. + */ + ParallelCopyLineBuf worker_line_buf[WORKER_CHUNK_COUNT]; + uint32 worker_line_buf_count; /* Number of lines */ + + /* Current position in worker_line_buf */ + uint32 worker_line_buf_pos; +}ParallelCopyData; + +/* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, * even though some fields are used in only some cases. @@ -219,12 +370,61 @@ typedef struct CopyStateData * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + int nworkers; + bool is_parallel; + ParallelCopyData *pcdata; } CopyStateData; +/* + * Common information that need to be copied to shared memory. + */ +typedef struct CopyWorkerCommonData +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + bool csv_mode; /* Comma Separated Value format? */ + bool header_line; /* CSV header line? */ + int null_print_len; /* length of same */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool convert_selectively; /* do selective binary conversion? */ + + /* Working state for COPY FROM */ + AttrNumber num_defaults; + Oid relid; +}CopyWorkerCommonData; + +/* List information */ +typedef struct ListInfo +{ + int count; /* count of attributes */ + + /* string info in the form info followed by info1, info2... infon */ + char info[1]; +} ListInfo; + +/* + * List of internal parallel copy function pointers. + */ +static const struct +{ + char *fn_name; + copy_data_source_cb fn_addr; +} InternalParallelCopyFuncPtrs[] = + +{ + { + "copy_read_data", copy_read_data + }, +}; + /* DestReceiver for COPY (query) TO */ typedef struct { @@ -254,6 +454,23 @@ typedef struct /* Trim the list of buffers back down to this number after flushing */ #define MAX_PARTITION_BUFFERS 32 +/* DSM keys for parallel copy. */ +#define PARALLEL_COPY_KEY_SHARED_INFO 1 +#define PARALLEL_COPY_KEY_CSTATE 2 +#define PARALLEL_COPY_KEY_NULL_PRINT 3 +#define PARALLEL_COPY_KEY_NULL_PRINT_CLIENT 4 +#define PARALLEL_COPY_KEY_DELIM 5 +#define PARALLEL_COPY_KEY_QUOTE 6 +#define PARALLEL_COPY_KEY_ESCAPE 7 +#define PARALLEL_COPY_KEY_ATTNAME_LIST 8 +#define PARALLEL_COPY_KEY_FORCE_QUOTE_LIST 9 +#define PARALLEL_COPY_KEY_NOT_NULL_LIST 10 +#define PARALLEL_COPY_KEY_NULL_LIST 11 +#define PARALLEL_COPY_KEY_CONVERT_LIST 12 +#define PARALLEL_COPY_KEY_DATASOURCE_CB 13 +#define PARALLEL_COPY_KEY_WHERE_CLAUSE_STR 14 +#define PARALLEL_COPY_KEY_RANGE_TABLE 15 + /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { @@ -295,104 +512,1484 @@ typedef struct CopyMultiInsertInfo */ /* - * This keeps the character read at the top of the loop in the buffer - * even if there is more than one read-ahead. + * This keeps the character read at the top of the loop in the buffer + * even if there is more than one read-ahead. + */ +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \ +if (1) \ +{ \ + if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \ + { \ + if (IsParallelCopy()) \ + { \ + copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \ + if (copy_buff_state.block_switched) \ + { \ + pg_atomic_sub_fetch_u32(©_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \ + copy_buff_state.copy_buf_len = prev_copy_buf_len; \ + } \ + } \ + copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \ + need_data = true; \ + continue; \ + } \ +} else ((void) 0) + +/* This consumes the remainder of the buffer and breaks */ +#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \ +if (1) \ +{ \ + if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && hit_eof) \ + { \ + if (extralen) \ + copy_buff_state.raw_buf_ptr = copy_buff_state.copy_buf_len; /* consume the partial character */ \ + /* backslash just before EOF, treat as data char */ \ + result = true; \ + break; \ + } \ +} else ((void) 0) + +/* + * Transfer any approved data to line_buf; must do this to be sure + * there is some room in raw_buf. + */ +#define REFILL_LINEBUF \ +if (1) \ +{ \ + if (copy_buff_state.raw_buf_ptr > cstate->raw_buf_index && !IsParallelCopy()) \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->raw_buf + cstate->raw_buf_index, \ + copy_buff_state.raw_buf_ptr - cstate->raw_buf_index); \ + cstate->raw_buf_index = copy_buff_state.raw_buf_ptr; \ +} else ((void) 0) + +/* Undo any read-ahead and jump out of the block. */ +#define NO_END_OF_COPY_GOTO \ +if (1) \ +{ \ + if (!IsParallelCopy()) \ + copy_buff_state.raw_buf_ptr = prev_raw_ptr + 1; \ + else \ + { \ + copy_buff_state.chunk_size = prev_chunk_size + 1; \ + if (copy_buff_state.block_switched) \ + { \ + pg_atomic_sub_fetch_u32(©_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \ + cstate->raw_buf = copy_buff_state.data_blk_ptr->data; \ + copy_buff_state.copy_buf_len = prev_copy_buf_len; \ + } \ + copy_buff_state.raw_buf_ptr = (prev_raw_ptr + 1) % DATA_BLOCK_SIZE; \ + } \ + goto not_end_of_copy; \ +} else ((void) 0) + +/* + * SEEK_COPY_BUFF_POS - Seek the buffer and set the buffer state information. + */ +#define SEEK_COPY_BUFF_POS(cstate, add_size, copy_buff_state) \ +{ \ + if (IsParallelCopy()) \ + { \ + copy_buff_state.chunk_size += add_size; \ + if (copy_buff_state.raw_buf_ptr + add_size >= DATA_BLOCK_SIZE) \ + { \ + /* Increment the unprocessed chunks for the block which we are working */ \ + if (copy_buff_state.copy_raw_buf == copy_buff_state.data_blk_ptr->data) \ + pg_atomic_add_fetch_u32(©_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \ + else \ + pg_atomic_add_fetch_u32(©_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \ + cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \ + copy_buff_state.copy_buf_len -= DATA_BLOCK_SIZE; \ + copy_buff_state.raw_buf_ptr = 0; \ + copy_buff_state.block_switched = true; \ + } \ + else \ + copy_buff_state.raw_buf_ptr += add_size; \ + } \ + else \ + copy_buff_state.raw_buf_ptr += add_size; \ +} + +/* + * BEGIN_READ_LINE - Initializes the buff state for read line. + */ +#define BEGIN_READ_LINE(cstate, chunk_first_block) \ +{ \ + copy_buff_state.copy_raw_buf = cstate->raw_buf; \ + copy_buff_state.raw_buf_ptr = cstate->raw_buf_index; \ + copy_buff_state.copy_buf_len = cstate->raw_buf_len; \ + /* \ + * There is some data that was read earlier, which need to be \ + * processed. \ + */ \ + if (IsParallelCopy()) \ + { \ + copy_buff_state.chunk_size = 0; \ + if ((copy_buff_state.copy_buf_len - copy_buff_state.raw_buf_ptr) > 0) \ + { \ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \ + uint32 cur_block_pos = pcshared_info->cur_block_pos; \ + chunk_first_block = pcshared_info->cur_block_pos; \ + copy_buff_state.data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \ + copy_buff_state.curr_data_blk_ptr = copy_buff_state.data_blk_ptr; \ + } \ + } \ +} + +/* + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must + * be read. + */ +#define SET_RAWBUF_FOR_LOAD() \ +{ \ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \ + uint32 cur_block_pos; \ + /* \ + * Mark the previous block as completed, worker can start copying this data. \ + */ \ + if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \ + copy_buff_state.data_blk_ptr->curr_blk_completed == false) \ + copy_buff_state.data_blk_ptr->curr_blk_completed = true; \ + \ + copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \ + cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \ + copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \ + \ + if (!copy_buff_state.data_blk_ptr) \ + { \ + copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \ + chunk_first_block = cur_block_pos; \ + } \ + else if (need_data == false) \ + copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \ + \ + cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \ + copy_buff_state.copy_raw_buf = cstate->raw_buf; \ +} + +/* + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory. + */ +#define END_CHUNK_PARALLEL_COPY() \ +{ \ + if (!IsHeaderLine()) \ + { \ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \ + ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \ + if (copy_buff_state.chunk_size) \ + { \ + ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \ + /* \ + * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \ + * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \ + * chunk finishes at the end of the current block. If the \ + * new_line_size > raw_buf_ptr, then the new block has only new line \ + * char content. The unprocessed count should not be increased in \ + * this case. \ + */ \ + if (copy_buff_state.raw_buf_ptr != 0 && \ + copy_buff_state.raw_buf_ptr > new_line_size) \ + pg_atomic_add_fetch_u32(©_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \ + \ + /* Update chunk size. */ \ + pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \ + pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \ + elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \ + chunk_pos, copy_buff_state.chunk_size); \ + pcshared_info->populated++; \ + } \ + else if (new_line_size) \ + { \ + /* \ + * This means only new line char, empty record should be \ + * inserted. \ + */ \ + ChunkBoundary *chunkInfo; \ + chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \ + CHUNK_LEADER_POPULATED); \ + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \ + elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \ + chunkInfo->start_offset, chunk_pos, \ + pg_atomic_read_u32(&chunkInfo->chunk_size)); \ + pcshared_info->populated++; \ + } \ + }\ + \ + /*\ + * All of the read data is processed, reset index & len. In the\ + * subsequent read, we will get a new block and copy data in to the\ + * new block.\ + */\ + if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\ + {\ + cstate->raw_buf_index = 0;\ + cstate->raw_buf_len = 0;\ + }\ + else\ + cstate->raw_buf_len = copy_buff_state.copy_buf_len;\ +} + +/* + * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding. + */ +#define CONVERT_TO_SERVER_ENCODING(cstate) \ +{ \ + /* Done reading the line. Convert it to server encoding. */ \ + if (cstate->need_transcoding && \ + (!IsParallelCopy() || \ + (IsParallelCopy() && !IsLeader()))) \ + { \ + char *cvt; \ + cvt = pg_any_to_server(cstate->line_buf.data, \ + cstate->line_buf.len, \ + cstate->file_encoding); \ + if (cvt != cstate->line_buf.data) \ + { \ + /* transfer converted data back to line_buf */ \ + resetStringInfo(&cstate->line_buf); \ + appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); \ + pfree(cvt); \ + } \ + } \ + /* Now it's safe to use the buffer in error messages */ \ + cstate->line_buf_converted = true; \ +} + +/* + * CLEAR_EOL_CHUNK_NON_PARALLEL - Clear EOL from the copied data. + */ +#define CLEAR_EOL_CHUNK_NON_PARALLEL(cstate) \ +{ \ + /* \ + * If we didn't hit EOF, then we must have transferred the EOL marker \ + * to line_buf along with the data. Get rid of it. \ + */ \ + switch (cstate->eol_type) \ + { \ + case EOL_NL: \ + Assert(cstate->line_buf.len >= 1); \ + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); \ + cstate->line_buf.len--; \ + cstate->line_buf.data[cstate->line_buf.len] = '\0'; \ + break; \ + case EOL_CR: \ + Assert(cstate->line_buf.len >= 1); \ + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); \ + cstate->line_buf.len--; \ + cstate->line_buf.data[cstate->line_buf.len] = '\0'; \ + break; \ + case EOL_CRNL: \ + Assert(cstate->line_buf.len >= 2); \ + Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); \ + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); \ + cstate->line_buf.len -= 2; \ + cstate->line_buf.data[cstate->line_buf.len] = '\0'; \ + break; \ + case EOL_UNKNOWN: \ + /* shouldn't get here */ \ + Assert(false); \ + break; \ + } \ +} + +/* + * CLEAR_EOL_LINE - Wrapper for clearing EOL. + */ +#define CLEAR_EOL_LINE() \ +{ \ + if (!result && !IsHeaderLine()) \ + { \ + if (IsParallelCopy()) \ + new_line_size = ClearEOLFromParallelChunk(cstate, ©_buff_state); \ + else \ + CLEAR_EOL_CHUNK_NON_PARALLEL(cstate) \ + } \ +} + +/* + * COPY_WAIT_TO_PROCESS - Wait before continuing to process. + */ +#define COPY_WAIT_TO_PROCESS() \ +{ \ + CHECK_FOR_INTERRUPTS(); \ + (void) WaitLatch(MyLatch, \ + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, \ + 1L, WAIT_EVENT_PG_SLEEP); \ + ResetLatch(MyLatch); \ +} + +/* + * INCREMENTPROCESSED - Increment the lines processed. + */ +#define INCREMENTPROCESSED(processed) \ +{ \ + if (!IsParallelCopy()) \ + processed++; \ + else \ + pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \ +} + +/* + * GETPROCESSED - Get the lines processed. + */ +#define GETPROCESSED(processed) \ +if (!IsParallelCopy()) \ + return processed; \ +else \ + return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + +static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; + +/* non-export function prototypes */ +static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, + RawStmt *raw_query, Oid queryRelId, List *attnamelist, + List *options); +static void EndCopy(CopyState cstate); +static void ClosePipeToProgram(CopyState cstate); +static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, + Oid queryRelId, const char *filename, bool is_program, + List *attnamelist, List *options); +static void EndCopyTo(CopyState cstate); +static uint64 DoCopyTo(CopyState cstate); +static uint64 CopyTo(CopyState cstate); +static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); +static bool CopyReadLine(CopyState cstate); +static bool CopyReadLineText(CopyState cstate); +static int CopyReadAttributesText(CopyState cstate); +static int CopyReadAttributesCSV(CopyState cstate); +static Datum CopyReadBinaryAttribute(CopyState cstate, + int column_no, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, + bool *isnull); +static void CopyAttributeOutText(CopyState cstate, char *string); +static void CopyAttributeOutCSV(CopyState cstate, char *string, + bool use_quote, bool single_attr); +static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, + List *attnamelist); +static char *limit_printout_length(const char *str); + +/* Low-level communications functions */ +static void SendCopyBegin(CopyState cstate); +static void ReceiveCopyBegin(CopyState cstate); +static void SendCopyEnd(CopyState cstate); +static void CopySendData(CopyState cstate, const void *databuf, int datasize); +static void CopySendString(CopyState cstate, const char *str); +static void CopySendChar(CopyState cstate, char c); +static void CopySendEndOfRow(CopyState cstate); +static int CopyGetData(CopyState cstate, void *databuf, + int minread, int maxread); +static void CopySendInt32(CopyState cstate, int32 val); +static bool CopyGetInt32(CopyState cstate, int32 *val); +static void CopySendInt16(CopyState cstate, int16 val); +static bool CopyGetInt16(CopyState cstate, int16 *val); + +static ParallelContext *BeginParallelCopy(int nworkers, CopyState cstate, + List *attlist, Oid relid); +static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt); +static void ExecBeforeStmtTrigger(CopyState cstate); +static pg_attribute_always_inline bool IsTriggerFunctionParallelSafe(TriggerDesc *trigdesc); +static pg_attribute_always_inline bool IsParallelCopyAllowed(CopyState cstate); +static void CheckCopyFromValidity(CopyState cstate); +static pg_attribute_always_inline bool CheckExprParallelSafety(CopyState cstate); +extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); +static void ParallelCopyLeader(CopyState cstate); +static void ParallelWorkerInitialization(CopyWorkerCommonData *shared_cstate, + CopyState cstate, List *attnamelist); +static bool CacheChunkInfo(CopyState cstate, uint32 buff_count); +static void PopulateAttributes(CopyState cstate, TupleDesc tup_desc, + List *attnamelist); +static void PopulateCatalogInformation(CopyState cstate); +static pg_attribute_always_inline uint32 GetChunkPosition(CopyState cstate); + +static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); +static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); + +/* + * CopyCommonInfoForWorker - Copy shared_cstate using cstate information. + */ +static pg_attribute_always_inline void +CopyCommonInfoForWorker(CopyState cstate, CopyWorkerCommonData *shared_cstate) +{ + shared_cstate->copy_dest = cstate->copy_dest; + shared_cstate->file_encoding = cstate->file_encoding; + shared_cstate->need_transcoding = cstate->need_transcoding; + shared_cstate->encoding_embeds_ascii = cstate->encoding_embeds_ascii; + shared_cstate->csv_mode = cstate->csv_mode; + shared_cstate->header_line = cstate->header_line; + shared_cstate->null_print_len = cstate->null_print_len; + shared_cstate->force_quote_all = cstate->force_quote_all; + shared_cstate->convert_selectively = cstate->convert_selectively; + shared_cstate->num_defaults = cstate->num_defaults; + shared_cstate->relid = cstate->pcdata->relid; +} + +/* + * RetrieveSharedString - Retrieve the string from shared memory. + */ +static void +RetrieveSharedString(shm_toc *toc, int sharedkey, char **copystr) +{ + char *shared_str_val = (char *)shm_toc_lookup(toc, sharedkey, true); + if (shared_str_val) + *copystr = pstrdup(shared_str_val); +} + +/* + * RetrieveSharedList - Retrieve the list from shared memory. + */ +static void +RetrieveSharedList(shm_toc *toc, int sharedkey, List **copylist) +{ + ListInfo *listinformation = (ListInfo *)shm_toc_lookup(toc, sharedkey, + true); + if (listinformation) + { + int length = 0; + int count; + + for (count = 0; count < listinformation->count; count++) + { + char *attname = (char *)(listinformation->info + length); + length += strlen(attname) + 1; + *copylist = lappend(*copylist, makeString(attname)); + } + } +} + +/* + * CopyListSharedMemory - Copy the list into shared memory. + */ +static void +CopyListSharedMemory(List *inputlist, Size memsize, ListInfo *sharedlistinfo) +{ + ListCell *l; + int length = 0; + + MemSet(sharedlistinfo, 0, memsize); + foreach(l, inputlist) + { + char *name = strVal(lfirst(l)); + memcpy((char *)(sharedlistinfo->info + length), name, strlen(name)); + sharedlistinfo->count++; + length += strlen(name) + 1; + } +} + +/* + * ComputeListSize - compute the list size. + */ +static int +ComputeListSize(List *inputlist) +{ + int est_size = sizeof(int); + if (inputlist != NIL) + { + ListCell *l; + foreach(l, inputlist) + est_size += strlen(strVal(lfirst(l))) + 1; + } + + return est_size; +} + +/* + * EstimateChunkKeysStr - Estimate the size required in shared memory for the + * input string. + */ +static void +EstimateChunkKeysStr(ParallelContext *pcxt, char *inputstr) +{ + if (inputstr) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(inputstr) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * EstimateChunkKeysList - Estimate the size required in shared memory for the + * input list. + */ +static void +EstimateChunkKeysList(ParallelContext *pcxt, List *inputlist, + Size *est_list_size) +{ + if (inputlist != NIL) + { + *est_list_size = ComputeListSize(inputlist); + shm_toc_estimate_chunk(&pcxt->estimator, *est_list_size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * InsertStringShm - Insert a string into shared memory. + */ +static void +InsertStringShm(ParallelContext *pcxt, int key, char *inputstr) +{ + if (inputstr) + { + char *shmptr = (char *)shm_toc_allocate(pcxt->toc, + strlen(inputstr) + 1); + strcpy(shmptr, inputstr); + shm_toc_insert(pcxt->toc, key, shmptr); + } +} + +/* + * InsertListShm - Insert a list into shared memory. + */ +static void +InsertListShm(ParallelContext *pcxt, int key, List *inputlist, + Size est_list_size) +{ + if (inputlist != NIL) + { + ListInfo *sharedlistinfo = (ListInfo *)shm_toc_allocate(pcxt->toc, + est_list_size); + CopyListSharedMemory(inputlist, est_list_size, sharedlistinfo); + shm_toc_insert(pcxt->toc, key, sharedlistinfo); + } +} + +/* + * IsTriggerFunctionParallelSafe - Check if the trigger function is parallel + * safe for the triggers. Return false if any one of the trigger has parallel + * unsafe function. + */ +static pg_attribute_always_inline bool +IsTriggerFunctionParallelSafe(TriggerDesc *trigdesc) +{ + int i; + for (i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + int trigtype = RI_TRIGGER_NONE; + + if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE) + return false; + + /* If the trigger is parallel safe, also look for RI_TRIGGER. */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_PK || trigtype == RI_TRIGGER_FK) + return false; + } + + return true; +} + +/* + * CheckExprParallelSafety - determine parallel safety of volatile expressions + * in default clause of column definition or in where clause and return true if + * they are parallel safe. + */ +static pg_attribute_always_inline bool +CheckExprParallelSafety(CopyState cstate) +{ + if (contain_volatile_functions(cstate->whereClause)) + { + if (!is_parallel_safe(NULL, (Node *)cstate->whereClause)) + return false; + } + + if (cstate->volatile_defexprs && cstate->defexprs != NULL && + cstate->num_defaults != 0) + { + int i; + for (i = 0; i < cstate->num_defaults; i++) + { + if (!is_parallel_safe(NULL, (Node *) cstate->defexprs[i]->expr)) + return false; + } + } + + return true; +} + +/* + * FindInsertMethod - determine insert mode single, multi, or multi conditional. + */ +static pg_attribute_always_inline CopyInsertMethod +FindInsertMethod(CopyState cstate) +{ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_before_row || + cstate->rel->trigdesc->trig_insert_instead_row)) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + cstate->rel->trigdesc != NULL && + cstate->rel->trigdesc->trig_insert_new_table) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + return CIM_SINGLE; + + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return CIM_MULTI_CONDITIONAL; + + return CIM_MULTI; +} + +/* + * IsParallelCopyAllowed - check for the cases where parallel copy is not + * applicable. + */ +static pg_attribute_always_inline bool +IsParallelCopyAllowed(CopyState cstate) +{ + /* Parallel copy not allowed for freeze & binary option. */ + if (cstate->freeze || cstate->binary) + return false; + + /* Check if copy is into foreign table. */ + if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + return false; + + /* Check if copy is into a temporary table. */ + if (RELATION_IS_LOCAL(cstate->rel) || RELATION_IS_OTHER_TEMP(cstate->rel)) + return false; + + /* Check if trigger function is parallel safe. */ + if (cstate->rel->trigdesc != NULL && + !IsTriggerFunctionParallelSafe(cstate->rel->trigdesc)) + return false; + + /* + * Check if there is after statement or instead of trigger or transition + * table triggers. + */ + if (cstate->rel->trigdesc != NULL && + (cstate->rel->trigdesc->trig_insert_after_statement || + cstate->rel->trigdesc->trig_insert_instead_row || + cstate->rel->trigdesc->trig_insert_new_table)) + return false; + + /* Check if the volatile expressions are parallel safe, if present any. */ + if (!CheckExprParallelSafety(cstate)) + return false; + + /* Check if the insertion mode is single. */ + if (FindInsertMethod(cstate) == CIM_SINGLE) + return false; + + return true; +} + +/* + * BeginParallelCopy - start parallel copy tasks. + * + * Get the number of workers required to perform the parallel copy. The data + * structures that are required by the parallel workers will be initialized, the + * size required in DSM will be calculated and the necessary keys will be loaded + * in the DSM. The specified number of workers will then be launched. + * + */ +static ParallelContext* +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) +{ + ParallelContext *pcxt; + ShmCopyInfo *shared_info_ptr; + CopyWorkerCommonData *shared_cstate; + FullTransactionId full_transaction_id; + Size est_shared_info; + Size est_cstateshared; + Size est_att_list_size; + Size est_quote_list_size; + Size est_notnull_list_size; + Size est_null_list_size; + Size est_convert_list_size; + Size est_datasource_cb_size; + int count = 0; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + int parallel_workers = 0; + ParallelCopyData *pcdata; + + CheckCopyFromValidity(cstate); + + parallel_workers = Min(nworkers, max_worker_processes); + + /* Can't perform copy in parallel */ + if (parallel_workers <= 0) + return NULL; + + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + + /* + * User chosen parallel copy. Determine if the parallel copy is actually + * allowed. If not, go with the non-parallel mode. + */ + if (!IsParallelCopyAllowed(cstate)) + return NULL; + + full_transaction_id = GetCurrentFullTransactionId(); + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "ParallelCopyMain", + parallel_workers); + Assert(pcxt->nworkers > 0); + + /* + * Estimate size for shared information for PARALLEL_COPY_KEY_SHARED_INFO + */ + est_shared_info = sizeof(ShmCopyInfo); + shm_toc_estimate_chunk(&pcxt->estimator, est_shared_info); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_CSTATE */ + est_cstateshared = MAXALIGN(sizeof(CopyWorkerCommonData)); + shm_toc_estimate_chunk(&pcxt->estimator, est_cstateshared); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + EstimateChunkKeysStr(pcxt, cstate->null_print); + EstimateChunkKeysStr(pcxt, cstate->null_print_client); + EstimateChunkKeysStr(pcxt, cstate->delim); + EstimateChunkKeysStr(pcxt, cstate->quote); + EstimateChunkKeysStr(pcxt, cstate->escape); + + if (cstate->whereClause != NULL) + { + whereClauseStr = nodeToString(cstate->whereClause); + EstimateChunkKeysStr(pcxt, whereClauseStr); + } + + if (cstate->range_table != NULL) + { + rangeTableStr = nodeToString(cstate->range_table); + EstimateChunkKeysStr(pcxt, rangeTableStr); + } + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_XID. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FullTransactionId)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_ATTNAME_LIST. + */ + EstimateChunkKeysList(pcxt, attnamelist, &est_att_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_FORCE_QUOTE_LIST. + */ + EstimateChunkKeysList(pcxt, cstate->force_quote, &est_quote_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NOT_NULL_LIST. + */ + EstimateChunkKeysList(pcxt, cstate->force_notnull, + &est_notnull_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NULL_LIST. + */ + EstimateChunkKeysList(pcxt, cstate->force_null, &est_null_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_CONVERT_LIST. + */ + EstimateChunkKeysList(pcxt, cstate->convert_select, + &est_convert_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_DATASOURCE_CB. + */ + if (cstate->data_source_cb) + { + char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb); + est_datasource_cb_size = strlen(functionname) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, est_datasource_cb_size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial copy) */ + if (pcxt->seg == NULL) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* Allocate shared memory for PARALLEL_COPY_KEY_SHARED_INFO */ + shared_info_ptr = (ShmCopyInfo *) shm_toc_allocate(pcxt->toc, + est_shared_info); + MemSet(shared_info_ptr, 0, est_shared_info); + shared_info_ptr->is_read_in_progress = true; + shared_info_ptr->cur_block_pos = -1; + shared_info_ptr->full_transaction_id = full_transaction_id; + shared_info_ptr->mycid = GetCurrentCommandId(true); + for (count = 0; count < RINGSIZE; count++) + { + ChunkBoundary *chunkInfo = &shared_info_ptr->chunk_boundaries.ring[count]; + pg_atomic_init_u32(&(chunkInfo->chunk_size), -1); + } + + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_SHARED_INFO, shared_info_ptr); + pcdata->pcshared_info = shared_info_ptr; + pcdata->relid = relid; + + /* Store shared build state, for which we reserved space. */ + shared_cstate = (CopyWorkerCommonData *)shm_toc_allocate(pcxt->toc, + est_cstateshared); + + /* copy cstate variables. */ + CopyCommonInfoForWorker(cstate, shared_cstate); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_CSTATE, shared_cstate); + + InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT, cstate->null_print); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT, + cstate->null_print_client); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_DELIM, cstate->delim); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_QUOTE, cstate->quote); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_ESCAPE, cstate->escape); + + InsertListShm(pcxt, PARALLEL_COPY_KEY_ATTNAME_LIST, + attnamelist, est_att_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST, + cstate->force_quote, est_quote_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_NOT_NULL_LIST, + cstate->force_notnull, est_notnull_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_NULL_LIST, cstate->force_null, + est_null_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_CONVERT_LIST, + cstate->convert_select, est_convert_list_size); + + if (cstate->data_source_cb) + { + char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb); + char *data_source_cb = (char *) shm_toc_allocate(pcxt->toc, + est_datasource_cb_size); + strcpy(data_source_cb, functionname); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_DATASOURCE_CB, + data_source_cb); + } + + if (cstate->whereClause) + InsertStringShm(pcxt, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, + whereClauseStr); + + if(cstate->range_table) + InsertStringShm(pcxt, PARALLEL_COPY_KEY_RANGE_TABLE, rangeTableStr); + + LaunchParallelWorkers(pcxt); + if (pcxt->nworkers_launched == 0) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* + * Caller needs to wait for all launched workers when we return. Make sure + * that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); + + pcdata->is_leader = true; + cstate->is_parallel = true; + return pcxt; +} + +/* + * EndParallelCopy - end the parallel copy tasks. + */ +static pg_attribute_always_inline void +EndParallelCopy(ParallelContext *pcxt) +{ + Assert(!IsParallelWorker()); + + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * ParallelWorkerInitialization - Initialize parallel worker. + */ +static void +ParallelWorkerInitialization(CopyWorkerCommonData *shared_cstate, + CopyState cstate, List *attnamelist) +{ + uint32 count; + ParallelCopyData *pcdata = cstate->pcdata; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + cstate->copy_dest = shared_cstate->copy_dest; + cstate->file_encoding = shared_cstate->file_encoding; + cstate->need_transcoding = shared_cstate->need_transcoding; + cstate->encoding_embeds_ascii = shared_cstate->encoding_embeds_ascii; + cstate->csv_mode = shared_cstate->csv_mode; + cstate->header_line = shared_cstate->header_line; + cstate->null_print_len = shared_cstate->null_print_len; + cstate->force_quote_all = shared_cstate->force_quote_all; + cstate->convert_selectively = shared_cstate->convert_selectively; + cstate->num_defaults = shared_cstate->num_defaults; + pcdata->relid = shared_cstate->relid; + + PopulateAttributes(cstate, tup_desc, attnamelist); + + /* Initialize state variables. */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + + initStringInfo(&cstate->line_buf); + for (count = 0; count < WORKER_CHUNK_COUNT;count++) + initStringInfo(&pcdata->worker_line_buf[count].line_buf); + + cstate->line_buf_converted = false; + cstate->raw_buf = NULL; + cstate->raw_buf_index = cstate->raw_buf_len = 0; + + PopulateCatalogInformation(cstate); + + /* Create workspace for CopyReadAttributes results. */ + if (!cstate->binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **)palloc(attr_count * sizeof(char *)); + } +} + +/* + * CacheChunkInfo - Cache the chunk information to local memory. + */ +static bool +CacheChunkInfo(CopyState cstate, uint32 buff_count) +{ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; + ParallelCopyData *pcdata = cstate->pcdata; + uint32 write_pos; + CopyDataBlock *data_blk_ptr; + ChunkBoundary *chunkInfo; + uint32 offset; + int dataSize; + int copiedSize = 0; + + resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf); + write_pos = GetChunkPosition(cstate); + if (-1 == write_pos) + return true; + + /* Get the current chunk information. */ + chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos]; + if (pg_atomic_read_u32(&chunkInfo->chunk_size) == 0) + goto empty_data_chunk_update; + + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block]; + + /* Get the offset information from where the data must be copied. */ + offset = chunkInfo->start_offset; + pcdata->worker_line_buf[buff_count].cur_lineno = chunkInfo->cur_lineno; + + elog(DEBUG1, "[Worker] Processing - chunk position:%d, block:%d, unprocessed chunks:%d, offset:%d, chunk size:%d", + write_pos, chunkInfo->first_block, + pg_atomic_read_u32(&data_blk_ptr->unprocessed_chunk_parts), + offset, pg_atomic_read_u32(&chunkInfo->chunk_size)); + + for (;;) + { + /* + * There is a possibility that the above loop has come out because + * data_blk_ptr->curr_blk_completed is set, but dataSize read might + * be an old value, if data_blk_ptr->curr_blk_completed and the chunk is + * completed, chunk_size will be set. Read the chunk_size again to be + * sure if it is complete or partial block. + */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + if (dataSize) + { + int remainingSize = dataSize - copiedSize; + if (!remainingSize) + break; + + /* Whole chunk is in current block. */ + if (remainingSize + offset < DATA_BLOCK_SIZE) + { + appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf, &data_blk_ptr->data[offset], + remainingSize); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1); + break; + } + else + { + /* Chunk is spread across the blocks. */ + int chunkInCurrentBlock = DATA_BLOCK_SIZE - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + chunkInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1); + copiedSize += chunkInCurrentBlock; + while (copiedSize < dataSize) + { + CopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + + /* + * If complete data is present in current block use + * dataSize - copiedSize, or copy the whole block from + * current block. + */ + int currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE); + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &currBlkPtr->data[0], + currentBlockCopySize); + pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_chunk_parts, 1); + copiedSize += currentBlockCopySize; + data_blk_ptr = currBlkPtr; + } + + break; + } + } + else + { + /* Copy this complete block from the current offset. */ + int chunkInCurrentBlock = DATA_BLOCK_SIZE - offset; + appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf, + &data_blk_ptr->data[offset], + chunkInCurrentBlock); + pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1); + copiedSize += chunkInCurrentBlock; + + /* + * Reset the offset. For the first copy, copy from the offset. For + * the subsequent copy the complete block. + */ + offset = 0; + + /* Set data_blk_ptr to the following block. */ + data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block]; + } + + for (;;) + { + /* Get the size of this chunk */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + + /* + * If the data is present in current block chunkInfo.chunk_size + * will be updated. If the data is spread across the blocks either + * of chunkInfo.chunk_size or data_blk_ptr->curr_blk_completed can + * be updated. chunkInfo.chunk_size will be updated if the complete + * read is finished. data_blk_ptr->curr_blk_completed will be + * updated if processing of current block is finished and data + * processing is not finished. + */ + if (data_blk_ptr->curr_blk_completed || (dataSize != -1)) + break; + + COPY_WAIT_TO_PROCESS() + } + } + +empty_data_chunk_update: + elog(DEBUG1, "[Worker] Completed processing chunk:%d", write_pos); + pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_WORKER_PROCESSED); + pg_atomic_write_u32(&chunkInfo->chunk_size, -1); + pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1); + return false; +} + +/* + * GetWorkerChunk - Returns a chunk for worker to process. + */ +static bool +GetWorkerChunk(CopyState cstate) +{ + uint32 buff_count; + ParallelCopyData *pcdata = cstate->pcdata; + + /* + * Copy the chunk data to line_buf and release the chunk position so that the + * worker can continue loading data. + */ + if (pcdata->worker_line_buf_pos < pcdata->worker_line_buf_count) + goto return_chunk; + + pcdata->worker_line_buf_pos = 0; + pcdata->worker_line_buf_count = 0; + + for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++) + { + bool result = CacheChunkInfo(cstate, buff_count); + if (result) + break; + + pcdata->worker_line_buf_count++; + } + + if (pcdata->worker_line_buf_count) + goto return_chunk; + else + resetStringInfo(&cstate->line_buf); + + return true; + +return_chunk: + cstate->line_buf = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].line_buf; + cstate->cur_lineno = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].cur_lineno; + cstate->line_buf_valid = true; + + /* Mark that encoding conversion hasn't occurred yet. */ + cstate->line_buf_converted = false; + CONVERT_TO_SERVER_ENCODING(cstate) + pcdata->worker_line_buf_pos++; + return false; +} + +/* + * ParallelCopyMain - parallel copy worker's code. + * + * Where clause handling, convert tuple to columns, add default null values for + * the missing columns that are not present in that record. Find the partition + * if it is partitioned table, invoke before row insert Triggers, handle + * constraints and insert the tuples. + */ +void +ParallelCopyMain(dsm_segment *seg, shm_toc *toc) +{ + CopyState cstate; + ParallelCopyData *pcdata; + ShmCopyInfo *pcshared_info; + CopyWorkerCommonData *shared_cstate; + Relation rel = NULL; + MemoryContext oldcontext; + List *attlist = NIL; + char *data_source_cb; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + + /* Allocate workspace and zero all fields. */ + cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + pcdata->is_leader = false; + pcdata->worker_processed_pos = -1; + cstate->is_parallel = true; + + /* + * We allocate everything used by a cstate in a new memory context. This + * avoids memory leaks during repeated use of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + pcshared_info = (ShmCopyInfo *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_SHARED_INFO, + false); + + ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); + + pcdata->pcshared_info = pcshared_info; + AssignFullTransactionIdForWorker(pcshared_info->full_transaction_id); + + shared_cstate = (CopyWorkerCommonData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, + false); + + cstate->null_print = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT, + true); + cstate->null_print_client = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT, + true); + + RetrieveSharedString(toc, PARALLEL_COPY_KEY_DELIM, &cstate->delim); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_QUOTE, &cstate->quote); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_ESCAPE, &cstate->escape); + + RetrieveSharedList(toc, PARALLEL_COPY_KEY_ATTNAME_LIST, &attlist); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST, &cstate->force_quote); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_NOT_NULL_LIST, &cstate->force_notnull); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_NULL_LIST, &cstate->force_null); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_CONVERT_LIST, &cstate->convert_select); + data_source_cb = (char *)shm_toc_lookup(toc, + PARALLEL_COPY_KEY_DATASOURCE_CB, + true); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, &whereClauseStr); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_RANGE_TABLE, &rangeTableStr); + + if (data_source_cb) + cstate->data_source_cb = LookupParallelCopyFnPtr(data_source_cb); + + if (whereClauseStr) + { + Node *whereClauseCnvrtdFrmStr = (Node *) stringToNode(whereClauseStr); + cstate->whereClause = whereClauseCnvrtdFrmStr; + } + + if (rangeTableStr) + { + List *rangeTableCnvrtdFrmStr = (List *) stringToNode(rangeTableStr); + cstate->range_table = rangeTableCnvrtdFrmStr; + } + + /* Open and lock the relation, using the appropriate lock type. */ + rel = table_open(shared_cstate->relid, RowExclusiveLock); + cstate->rel = rel; + ParallelWorkerInitialization(shared_cstate, cstate, attlist); + + CopyFrom(cstate); + + if (rel != NULL) + table_close(rel, RowExclusiveLock); + + MemoryContextSwitchTo(oldcontext); + return; +} + +/* + * UpdateBlockInChunkInfo - Update the chunk information. + */ +static pg_attribute_always_inline int +UpdateBlockInChunkInfo(CopyState cstate, uint32 blk_pos, + uint32 offset, uint32 chunk_size, uint32 chunk_state) +{ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; + ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; + ChunkBoundary *chunkInfo; + int chunk_pos = chunkBoundaryPtr->leader_pos; + + /* Update the chunk information for the worker to pick and process. */ + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; + while (pg_atomic_read_u32(&chunkInfo->chunk_size) != -1) + COPY_WAIT_TO_PROCESS() + + chunkInfo->first_block = blk_pos; + chunkInfo->start_offset = offset; + chunkInfo->cur_lineno = cstate->cur_lineno; + pg_atomic_write_u32(&chunkInfo->chunk_size, chunk_size); + pg_atomic_write_u32(&chunkInfo->chunk_state, chunk_state); + chunkBoundaryPtr->leader_pos = (chunkBoundaryPtr->leader_pos + 1) % RINGSIZE; + + return chunk_pos; +} + +/* + * ParallelCopyLeader - parallel copy leader's functionality. + * + * Leader will populate the shared queue and share it across the workers. Leader + * will read the table data from the file and copy the contents to block. Leader + * will then read the input contents and identify the data based on line beaks. + * This information is called chunk. The chunk will be populate in + * ChunkBoundary. Workers will then pick up this information and insert + * in to table. Leader will do this till it completes processing the file. + * Leader executes the before statement if before statement trigger is present. + * Leader read the data from input file. Leader then loads data to data blocks + * as and when required block by block. Leader traverses through the data block + * to identify one chunk. It gets a free chunk to copy the information, if there + * is no free chunk it will wait till there is one free chunk. + * Server copies the identified chunks information into chunks. This process is + * repeated till the complete file is processed. + * Leader will wait till all the chunks populated are processed by the workers + * and exits. + */ +static void +ParallelCopyLeader(CopyState cstate) +{ + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; + ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + + /* Execute the before statement triggers from the leader */ + ExecBeforeStmtTrigger(cstate); + + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } + } + + for (;;) + { + bool done; + cstate->cur_lineno++; + + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + + pcshared_info->is_read_in_progress = false; + cstate->cur_lineno = 0; +} + +/* + * GetChunkPosition - return the chunk position that worker should process. + */ +static uint32 +GetChunkPosition(CopyState cstate) +{ + ParallelCopyData *pcdata = cstate->pcdata; + ShmCopyInfo *pcshared_info = pcdata->pcshared_info; + uint32 previous_pos = pcdata->worker_processed_pos; + uint32 write_pos = (previous_pos == -1) ? 0 : (previous_pos + 1) % RINGSIZE; + for (;;) + { + int dataSize; + bool is_read_in_progress = pcshared_info->is_read_in_progress; + ChunkBoundary *chunkInfo; + CopyDataBlock *data_blk_ptr; + ChunkState chunk_state = CHUNK_LEADER_POPULATED; + ChunkState curr_chunk_state; + CHECK_FOR_INTERRUPTS(); + + /* File read completed & no elements to process. */ + if (!is_read_in_progress && + (pcshared_info->populated == + pg_atomic_read_u64(&pcshared_info->total_worker_processed))) + { + write_pos = -1; + break; + } + + /* Get the current chunk information. */ + chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos]; + curr_chunk_state = pg_atomic_read_u32(&chunkInfo->chunk_state); + if ((write_pos % WORKER_CHUNK_COUNT == 0) && + (curr_chunk_state == CHUNK_WORKER_PROCESSED || + curr_chunk_state == CHUNK_WORKER_PROCESSING)) + { + pcdata->worker_processed_pos = write_pos; + write_pos = (write_pos + WORKER_CHUNK_COUNT) % RINGSIZE; + continue; + } + + /* Get the size of this chunk. */ + dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size); + + if (dataSize != 0) /* If not an empty chunk. */ + { + /* Get the block information. */ + data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block]; + + if (!data_blk_ptr->curr_blk_completed && (dataSize == -1)) + { + /* Wait till the current chunk or block is added. */ + COPY_WAIT_TO_PROCESS() + continue; + } + } + + /* Make sure that no worker has consumed this element. */ + if (pg_atomic_compare_exchange_u32(&chunkInfo->chunk_state, + &chunk_state, CHUNK_WORKER_PROCESSING)) + break; + } + + pcdata->worker_processed_pos = write_pos; + return write_pos; +} + +/* + * GetFreeCopyBlock - Get a free block for data to be copied. + */ +static pg_attribute_always_inline uint32 +GetFreeCopyBlock(ShmCopyInfo *pcshared_info) +{ + int count = 0; + uint32 last_free_block = pcshared_info->cur_block_pos; + uint32 block_pos = (last_free_block != -1) ? ((last_free_block + 1) % MAX_BLOCKS_COUNT): 0; + + /* Get a new block for copying data. */ + while (count < MAX_BLOCKS_COUNT) + { + CopyDataBlock *inputBlk = &pcshared_info->data_blocks[block_pos]; + uint32 unprocessed_chunk_parts = pg_atomic_read_u32(&inputBlk->unprocessed_chunk_parts); + if (unprocessed_chunk_parts == 0) + { + inputBlk->curr_blk_completed = false; + pcshared_info->cur_block_pos = block_pos; + return block_pos; + } + + block_pos = (block_pos + 1) % MAX_BLOCKS_COUNT; + count++; + } + + return -1; +} + +/* + * WaitGetFreeCopyBlock - If there are no blocks available, wait and get a block + * for copying data. */ -#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \ -if (1) \ -{ \ - if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \ - { \ - raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \ - need_data = true; \ - continue; \ - } \ -} else ((void) 0) +static uint32 +WaitGetFreeCopyBlock(ShmCopyInfo *pcshared_info) +{ + uint32 new_free_pos = -1; + for (;;) + { + new_free_pos = GetFreeCopyBlock(pcshared_info); + if (new_free_pos != -1) /* We have got one block, break now. */ + break; -/* This consumes the remainder of the buffer and breaks */ -#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \ -if (1) \ -{ \ - if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \ - { \ - if (extralen) \ - raw_buf_ptr = copy_buf_len; /* consume the partial character */ \ - /* backslash just before EOF, treat as data char */ \ - result = true; \ - break; \ - } \ -} else ((void) 0) + COPY_WAIT_TO_PROCESS() + } + + return new_free_pos; +} /* - * Transfer any approved data to line_buf; must do this to be sure - * there is some room in raw_buf. + * LookupParallelCopyFnPtr - Look up parallel copy function pointer. */ -#define REFILL_LINEBUF \ -if (1) \ -{ \ - if (raw_buf_ptr > cstate->raw_buf_index) \ - { \ - appendBinaryStringInfo(&cstate->line_buf, \ - cstate->raw_buf + cstate->raw_buf_index, \ - raw_buf_ptr - cstate->raw_buf_index); \ - cstate->raw_buf_index = raw_buf_ptr; \ - } \ -} else ((void) 0) - -/* Undo any read-ahead and jump out of the block. */ -#define NO_END_OF_COPY_GOTO \ -if (1) \ -{ \ - raw_buf_ptr = prev_raw_ptr + 1; \ - goto not_end_of_copy; \ -} else ((void) 0) +static pg_attribute_always_inline copy_data_source_cb +LookupParallelCopyFnPtr(const char *funcname) +{ + int i; -static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; + for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++) + { + if (strcmp(InternalParallelCopyFuncPtrs[i].fn_name, funcname) == 0) + return InternalParallelCopyFuncPtrs[i].fn_addr; + } + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); +} -/* non-export function prototypes */ -static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, - RawStmt *raw_query, Oid queryRelId, List *attnamelist, - List *options); -static void EndCopy(CopyState cstate); -static void ClosePipeToProgram(CopyState cstate); -static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, - Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); -static void EndCopyTo(CopyState cstate); -static uint64 DoCopyTo(CopyState cstate); -static uint64 CopyTo(CopyState cstate); -static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); -static bool CopyReadLine(CopyState cstate); -static bool CopyReadLineText(CopyState cstate); -static int CopyReadAttributesText(CopyState cstate); -static int CopyReadAttributesCSV(CopyState cstate); -static Datum CopyReadBinaryAttribute(CopyState cstate, - int column_no, FmgrInfo *flinfo, - Oid typioparam, int32 typmod, - bool *isnull); -static void CopyAttributeOutText(CopyState cstate, char *string); -static void CopyAttributeOutCSV(CopyState cstate, char *string, - bool use_quote, bool single_attr); -static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, - List *attnamelist); -static char *limit_printout_length(const char *str); +/* + * LookupParallelCopyFnStr - Lookup function string from a function pointer. + */ +static pg_attribute_always_inline char* +LookupParallelCopyFnStr(copy_data_source_cb fn_addr) +{ + int i; -/* Low-level communications functions */ -static void SendCopyBegin(CopyState cstate); -static void ReceiveCopyBegin(CopyState cstate); -static void SendCopyEnd(CopyState cstate); -static void CopySendData(CopyState cstate, const void *databuf, int datasize); -static void CopySendString(CopyState cstate, const char *str); -static void CopySendChar(CopyState cstate, char c); -static void CopySendEndOfRow(CopyState cstate); -static int CopyGetData(CopyState cstate, void *databuf, - int minread, int maxread); -static void CopySendInt32(CopyState cstate, int32 val); -static bool CopyGetInt32(CopyState cstate, int32 *val); -static void CopySendInt16(CopyState cstate, int16 val); -static bool CopyGetInt16(CopyState cstate, int16 *val); + for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++) + { + if (InternalParallelCopyFuncPtrs[i].fn_addr == fn_addr) + return InternalParallelCopyFuncPtrs[i].fn_name; + } + /* We can only reach this by programming error. */ + elog(ERROR, "internal function pointer not found"); +} /* * Send copy start/stop messages for frontend copies. These have changed @@ -611,7 +2208,6 @@ static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) { int bytesread = 0; - switch (cstate->copy_dest) { case COPY_FILE: @@ -790,17 +2386,17 @@ CopyGetInt16(CopyState cstate, int16 *val) * bufferload boundary. */ static bool -CopyLoadRawBuf(CopyState cstate) +CopyLoadRawBuf(CopyState cstate, int raw_buf_len, int *raw_buf_index) { int nbytes; int inbytes; - if (cstate->raw_buf_index < cstate->raw_buf_len) + if (!IsParallelCopy() && *raw_buf_index < raw_buf_len) { - /* Copy down the unprocessed data */ - nbytes = cstate->raw_buf_len - cstate->raw_buf_index; - memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, - nbytes); + /* Copy down the unprocessed data. */ + nbytes = raw_buf_len - *raw_buf_index; + memmove(cstate->raw_buf, cstate->raw_buf + *raw_buf_index, + nbytes); } else nbytes = 0; /* no data need be saved */ @@ -809,12 +2405,16 @@ CopyLoadRawBuf(CopyState cstate) 1, RAW_BUF_SIZE - nbytes); nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; - cstate->raw_buf_index = 0; + if (!IsParallelCopy()) + { + cstate->raw_buf_index = 0; + *raw_buf_index = 0; + } cstate->raw_buf_len = nbytes; + return (inbytes > 0); } - /* * DoCopy executes the SQL COPY statement * @@ -1060,6 +2660,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { + ParallelContext *pcxt = NULL; Assert(rel); /* check read-only transaction and parallel mode */ @@ -1069,7 +2670,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + cstate->is_parallel = false; + + if (cstate->nworkers > 0) + pcxt = BeginParallelCopy(cstate->nworkers, cstate, stmt->attlist, + relid); + + if (pcxt) + { + ParallelCopyLeader(cstate); + + /* Wait for all copy workers to finish */ + WaitForParallelWorkersToFinish(pcxt); + *processed = pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + EndParallelCopy(pcxt); + } + else + *processed = CopyFrom(cstate); /* copy from file to database */ + EndCopyFrom(cstate); } else @@ -1118,6 +2736,7 @@ ProcessCopyOptions(ParseState *pstate, cstate->is_copy_from = is_from; cstate->file_encoding = -1; + cstate->nworkers = -1; /* Extract options from the statement node tree */ foreach(option, options) @@ -1286,6 +2905,26 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "parallel") == 0) + { + if (!cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel option supported only for copy from"), + parser_errposition(pstate, defel->location))); + if (cstate->nworkers >= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->nworkers = atoi(defGetString(defel)); + if (cstate->nworkers < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be a non-negative integer", + defel->defname), + parser_errposition(pstate, defel->location))); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1377,67 +3016,194 @@ ProcessCopyOptions(ParseState *pstate, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote must be a single one-byte character"))); - if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0]) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY delimiter and quote must be different"))); + if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0]) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY delimiter and quote must be different"))); + + /* Check escape */ + if (!cstate->csv_mode && cstate->escape != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY escape available only in CSV mode"))); + + if (cstate->csv_mode && strlen(cstate->escape) != 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY escape must be a single one-byte character"))); + + /* Check force_quote */ + if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force quote available only in CSV mode"))); + if ((cstate->force_quote || cstate->force_quote_all) && is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force quote only available using COPY TO"))); + + /* Check force_notnull */ + if (!cstate->csv_mode && cstate->force_notnull != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force not null available only in CSV mode"))); + if (cstate->force_notnull != NIL && !is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force not null only available using COPY FROM"))); + + /* Check force_null */ + if (!cstate->csv_mode && cstate->force_null != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force null available only in CSV mode"))); + + if (cstate->force_null != NIL && !is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force null only available using COPY FROM"))); + + /* Don't allow the delimiter to appear in the null string. */ + if (strchr(cstate->null_print, cstate->delim[0]) != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY delimiter must not appear in the NULL specification"))); + + /* Don't allow the CSV quote char to appear in the null string. */ + if (cstate->csv_mode && + strchr(cstate->null_print, cstate->quote[0]) != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CSV quote character must not appear in the NULL specification"))); +} + +/* + * PopulateAttributes - Populate the attributes. + */ +void PopulateAttributes(CopyState cstate, TupleDesc tupDesc, List *attnamelist) +{ + int num_phys_attrs; + + /* Generate or convert list of attributes to process */ + cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + + num_phys_attrs = tupDesc->natts; + + /* Convert FORCE_QUOTE name list to per-column flags, check validity */ + cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->force_quote_all) + { + int i; + + for (i = 0; i < num_phys_attrs; i++) + cstate->force_quote_flags[i] = true; + } + else if (cstate->force_quote) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->force_quote_flags[attnum - 1] = true; + } + } + + /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ + cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->force_notnull) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->force_notnull_flags[attnum - 1] = true; + } + } + + /* Convert FORCE_NULL name list to per-column flags, check validity */ + cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->force_null) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null); - /* Check escape */ - if (!cstate->csv_mode && cstate->escape != NULL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY escape available only in CSV mode"))); + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - if (cstate->csv_mode && strlen(cstate->escape) != 1) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY escape must be a single one-byte character"))); + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_NULL column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->force_null_flags[attnum - 1] = true; + } + } - /* Check force_quote */ - if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force quote available only in CSV mode"))); - if ((cstate->force_quote || cstate->force_quote_all) && is_from) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force quote only available using COPY TO"))); + /* Convert convert_selectively name list to per-column flags */ + if (cstate->convert_selectively) + { + List *attnums; + ListCell *cur; - /* Check force_notnull */ - if (!cstate->csv_mode && cstate->force_notnull != NIL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force not null available only in CSV mode"))); - if (cstate->force_notnull != NIL && !is_from) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force not null only available using COPY FROM"))); + cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - /* Check force_null */ - if (!cstate->csv_mode && cstate->force_null != NIL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force null available only in CSV mode"))); + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select); - if (cstate->force_null != NIL && !is_from) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY force null only available using COPY FROM"))); + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - /* Don't allow the delimiter to appear in the null string. */ - if (strchr(cstate->null_print, cstate->delim[0]) != NULL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY delimiter must not appear in the NULL specification"))); + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg_internal("selected column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->convert_select_flags[attnum - 1] = true; + } + } - /* Don't allow the CSV quote char to appear in the null string. */ - if (cstate->csv_mode && - strchr(cstate->null_print, cstate->quote[0]) != NULL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("CSV quote character must not appear in the NULL specification"))); -} + /* Use client encoding when ENCODING option is not specified. */ + if (cstate->file_encoding < 0) + cstate->file_encoding = pg_get_client_encoding(); + /* + * Set up encoding conversion info. Even if the file and server encodings + * are the same, we must apply pg_any_to_server() to validate data in + * multibyte encodings. + */ + cstate->need_transcoding = + (cstate->file_encoding != GetDatabaseEncoding() || + pg_database_encoding_max_length() > 1); + /* See Multibyte encoding comment above */ + cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); +} /* * Common setup routines used by BeginCopyFrom and BeginCopyTo. * @@ -1464,7 +3230,6 @@ BeginCopy(ParseState *pstate, { CopyState cstate; TupleDesc tupDesc; - int num_phys_attrs; MemoryContext oldcontext; /* Allocate workspace and zero all fields */ @@ -1630,126 +3395,7 @@ BeginCopy(ParseState *pstate, tupDesc = cstate->queryDesc->tupDesc; } - /* Generate or convert list of attributes to process */ - cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - - num_phys_attrs = tupDesc->natts; - - /* Convert FORCE_QUOTE name list to per-column flags, check validity */ - cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_quote_all) - { - int i; - - for (i = 0; i < num_phys_attrs; i++) - cstate->force_quote_flags[i] = true; - } - else if (cstate->force_quote) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_quote_flags[attnum - 1] = true; - } - } - - /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ - cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_notnull) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_notnull_flags[attnum - 1] = true; - } - } - - /* Convert FORCE_NULL name list to per-column flags, check validity */ - cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - if (cstate->force_null) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE_NULL column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->force_null_flags[attnum - 1] = true; - } - } - - /* Convert convert_selectively name list to per-column flags */ - if (cstate->convert_selectively) - { - List *attnums; - ListCell *cur; - - cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); - - attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select); - - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (!list_member_int(cstate->attnumlist, attnum)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg_internal("selected column \"%s\" not referenced by COPY", - NameStr(attr->attname)))); - cstate->convert_select_flags[attnum - 1] = true; - } - } - - /* Use client encoding when ENCODING option is not specified. */ - if (cstate->file_encoding < 0) - cstate->file_encoding = pg_get_client_encoding(); - - /* - * Set up encoding conversion info. Even if the file and server encodings - * are the same, we must apply pg_any_to_server() to validate data in - * multibyte encodings. - */ - cstate->need_transcoding = - (cstate->file_encoding != GetDatabaseEncoding() || - pg_database_encoding_max_length() > 1); - /* See Multibyte encoding comment above */ - cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - + PopulateAttributes(cstate, tupDesc, attnamelist); cstate->copy_dest = COPY_FILE; /* default */ MemoryContextSwitchTo(oldcontext); @@ -2638,41 +4284,67 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; - /* Record this slot as being used */ - buffer->nused++; + /* Record this slot as being used */ + buffer->nused++; + + /* Update how many tuples are stored and their size */ + miinfo->bufferedTuples++; + miinfo->bufferedBytes += tuplen; +} + +/* + * ExecBeforeStmtTrigger - Execute the before statement trigger, this will be + * executed for parallel copy by the leader process. + */ +static void +ExecBeforeStmtTrigger(CopyState cstate) +{ + EState *estate = CreateExecutorState(); + ResultRelInfo *resultRelInfo; + + Assert(IsLeader()); + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of code + * here that basically duplicated execUtils.c ...) + */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, + cstate->rel, + 1, /* must match rel's position in range_table */ + NULL, + 0); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(resultRelInfo, CMD_INSERT); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + ExecInitRangeTable(estate, cstate->range_table); + + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + + /* Close any trigger target relations */ + ExecCleanUpTriggerState(estate); - /* Update how many tuples are stored and their size */ - miinfo->bufferedTuples++; - miinfo->bufferedBytes += tuplen; + FreeExecutorState(estate); } /* - * Copy FROM file to relation. + * Check if the relation specified in copy from is valid. */ -uint64 -CopyFrom(CopyState cstate) +static void +CheckCopyFromValidity(CopyState cstate) { - ResultRelInfo *resultRelInfo; - ResultRelInfo *target_resultRelInfo; - ResultRelInfo *prevResultRelInfo = NULL; - EState *estate = CreateExecutorState(); /* for ExecConstraints() */ - ModifyTableState *mtstate; - ExprContext *econtext; - TupleTableSlot *singleslot = NULL; - MemoryContext oldcontext = CurrentMemoryContext; - - PartitionTupleRouting *proute = NULL; - ErrorContextCallback errcallback; - CommandId mycid = GetCurrentCommandId(true); - int ti_options = 0; /* start with default options for insert */ - BulkInsertState bistate = NULL; - CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; - bool has_before_insert_row_trig; - bool has_instead_insert_row_trig; - bool leafpart_use_multi_insert = false; - Assert(cstate->rel); /* @@ -2708,6 +4380,44 @@ CopyFrom(CopyState cstate) errmsg("cannot copy to non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } +} + +/* + * Copy FROM file to relation. + */ +uint64 +CopyFrom(CopyState cstate) +{ + ResultRelInfo *resultRelInfo; + ResultRelInfo *target_resultRelInfo; + ResultRelInfo *prevResultRelInfo = NULL; + EState *estate = CreateExecutorState(); /* for ExecConstraints() */ + ModifyTableState *mtstate; + ExprContext *econtext; + TupleTableSlot *singleslot = NULL; + MemoryContext oldcontext = CurrentMemoryContext; + + PartitionTupleRouting *proute = NULL; + ErrorContextCallback errcallback; + CommandId mycid = IsParallelCopy() ? cstate->pcdata->pcshared_info->mycid : + GetCurrentCommandId(true); + int ti_options = 0; /* start with default options for insert */ + BulkInsertState bistate = NULL; + CopyInsertMethod insertMethod; + CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ + uint64 processed = 0; + bool has_before_insert_row_trig; + bool has_instead_insert_row_trig; + bool leafpart_use_multi_insert = false; + + /* + * Perform this check if it is not parallel copy. In case of parallel + * copy, this check is done by the leader, so that if any invalid case + * exist the copy from command will error out from the leader itself, + * avoiding launching workers, just to throw error. + */ + if (!IsParallelCopy()) + CheckCopyFromValidity(cstate); /* * If the target file is new-in-transaction, we assume that checking FSM @@ -2934,13 +4644,16 @@ CopyFrom(CopyState cstate) has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - /* - * Check BEFORE STATEMENT insertion triggers. It's debatable whether we - * should do this for COPY, since it's not really an "INSERT" statement as - * such. However, executing these triggers maintains consistency with the - * EACH ROW triggers that we already fire on COPY. - */ - ExecBSInsertTriggers(estate, resultRelInfo); + if (!IsParallelCopy()) + { + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + } econtext = GetPerTupleExprContext(estate); @@ -3262,7 +4975,7 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + INCREMENTPROCESSED(processed) } } @@ -3317,30 +5030,14 @@ CopyFrom(CopyState cstate) FreeExecutorState(estate); - return processed; + GETPROCESSED(processed) } /* - * Setup to read tuples from a file for COPY FROM. - * - * 'rel': Used as a template for the tuples - * 'filename': Name of server-local file to read - * 'attnamelist': List of char *, columns to include. NIL selects all cols. - * 'options': List of DefElem. See copy_opt_item in gram.y for selections. - * - * Returns a CopyState, to be passed to NextCopyFrom and related functions. + * PopulateCatalogInformation - populate the catalog information. */ -CopyState -BeginCopyFrom(ParseState *pstate, - Relation rel, - const char *filename, - bool is_program, - copy_data_source_cb data_source_cb, - List *attnamelist, - List *options) +void PopulateCatalogInformation(CopyState cstate) { - CopyState cstate; - bool pipe = (filename == NULL); TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; @@ -3350,31 +5047,8 @@ BeginCopyFrom(ParseState *pstate, Oid in_func_oid; int *defmap; ExprState **defexprs; - MemoryContext oldcontext; bool volatile_defexprs; - cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); - oldcontext = MemoryContextSwitchTo(cstate->copycontext); - - /* Initialize state variables */ - cstate->reached_eof = false; - cstate->eol_type = EOL_UNKNOWN; - cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - - /* Set up variables to avoid per-attribute overhead. */ - initStringInfo(&cstate->attribute_buf); - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - - /* Assign range table, we'll need it in CopyFrom. */ - if (pstate) - cstate->range_table = pstate->p_rtable; - tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; num_defaults = 0; @@ -3452,6 +5126,55 @@ BeginCopyFrom(ParseState *pstate, cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; +} + +/* + * Setup to read tuples from a file for COPY FROM. + * + * 'rel': Used as a template for the tuples + * 'filename': Name of server-local file to read + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. See copy_opt_item in gram.y for selections. + * + * Returns a CopyState, to be passed to NextCopyFrom and related functions. + */ +CopyState +BeginCopyFrom(ParseState *pstate, + Relation rel, + const char *filename, + bool is_program, + copy_data_source_cb data_source_cb, + List *attnamelist, + List *options) +{ + CopyState cstate; + bool pipe = (filename == NULL); + MemoryContext oldcontext; + + cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, + options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Initialize state variables */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + initStringInfo(&cstate->line_buf); + cstate->line_buf_converted = false; + cstate->raw_buf = (IsParallelCopy()) ? NULL : (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + + /* Assign range table, we'll need it in CopyFrom. */ + if (pstate) + cstate->range_table = pstate->p_rtable; + + PopulateCatalogInformation(cstate); cstate->is_program = is_program; if (data_source_cb) @@ -3588,26 +5311,35 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) /* only available for text or csv input */ Assert(!cstate->binary); - /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (IsParallelCopy()) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) - return false; /* done */ + done = GetWorkerChunk(cstate); + if (done && cstate->line_buf.len == 0) + return false; } + else + { + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); - /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. - */ - if (done && cstate->line_buf.len == 0) - return false; + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + } /* Parse the line into de-escaped field values */ if (cstate->csv_mode) @@ -3828,6 +5560,61 @@ EndCopyFrom(CopyState cstate) } /* + * ClearEOLFromParallelChunk - Clear EOL from the copied data. + */ +static int +ClearEOLFromParallelChunk(CopyState cstate, CopyBufferState *copy_buff_state) +{ + /* raw_buf_ptr will be pointing to the next char that need to be read. */ + int cur_pos = (copy_buff_state->raw_buf_ptr == 0) ? RAW_BUF_SIZE - 1: copy_buff_state->raw_buf_ptr - 1; + CopyDataBlock *data_blk_ptr = copy_buff_state->data_blk_ptr; + CopyDataBlock *curr_data_blk_ptr = copy_buff_state->curr_data_blk_ptr; + int new_line_size = 0; + PG_USED_FOR_ASSERTS_ONLY char ch; + + /* + * If we didn't hit EOF, then we must have transferred the EOL marker + * to line_buf along with the data. Get rid of it. + */ + switch (cstate->eol_type) + { + case EOL_NL: + Assert(copy_buff_state->chunk_size >= 1); + Assert(curr_data_blk_ptr->data[cur_pos] == '\n'); + copy_buff_state->chunk_size -= 1; + curr_data_blk_ptr->data[cur_pos] = '\0'; + new_line_size = 1; + break; + case EOL_CR: + Assert(copy_buff_state->chunk_size >= 1); + Assert(curr_data_blk_ptr->data[cur_pos] == '\r'); + copy_buff_state->chunk_size -= 1; + curr_data_blk_ptr->data[cur_pos] = '\0'; + new_line_size = 1; + break; + case EOL_CRNL: + Assert(copy_buff_state->chunk_size >= 2); + + if (cur_pos >= 1) + ch = curr_data_blk_ptr->data[cur_pos - 1]; + else + ch = data_blk_ptr->data[RAW_BUF_SIZE - 1]; + Assert(ch == '\r'); + Assert(curr_data_blk_ptr->data[cur_pos] == '\n'); + copy_buff_state->chunk_size -= 2; + curr_data_blk_ptr->data[cur_pos] = '\0'; + new_line_size = 2; + break; + case EOL_UNKNOWN: + /* shouldn't get here */ + Assert(false); + break; + } + + return new_line_size; +} + +/* * Read the next input line and stash it in line_buf, with conversion to * server encoding. * @@ -3839,7 +5626,6 @@ static bool CopyReadLine(CopyState cstate) { bool result; - resetStringInfo(&cstate->line_buf); cstate->line_buf_valid = true; @@ -3858,66 +5644,40 @@ CopyReadLine(CopyState cstate) */ if (cstate->copy_dest == COPY_NEW_FE) { + bool bIsFirst = true; do { - cstate->raw_buf_index = cstate->raw_buf_len; - } while (CopyLoadRawBuf(cstate)); - } - } - else - { - /* - * If we didn't hit EOF, then we must have transferred the EOL marker - * to line_buf along with the data. Get rid of it. - */ - switch (cstate->eol_type) - { - case EOL_NL: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CR: - Assert(cstate->line_buf.len >= 1); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); - cstate->line_buf.len--; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_CRNL: - Assert(cstate->line_buf.len >= 2); - Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); - Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); - cstate->line_buf.len -= 2; - cstate->line_buf.data[cstate->line_buf.len] = '\0'; - break; - case EOL_UNKNOWN: - /* shouldn't get here */ - Assert(false); - break; - } - } - - /* Done reading the line. Convert it to server encoding. */ - if (cstate->need_transcoding) - { - char *cvt; + if (!IsParallelCopy()) + cstate->raw_buf_index = cstate->raw_buf_len; + else + { + if (cstate->raw_buf_index == RAW_BUF_SIZE) + { + /* Get a new block if it is the first time */ + if (bIsFirst) + { + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint32 block_pos = WaitGetFreeCopyBlock(pcshared_info); + cstate->raw_buf = pcshared_info->data_blocks[block_pos].data; + cstate->raw_buf_index = cstate->raw_buf_len = 0; + bIsFirst = false; + } + else + { + /* + * From the subsequent time, reset the index and + * re-use the same block. + */ + cstate->raw_buf_index = cstate->raw_buf_len = 0; + } + } + } - cvt = pg_any_to_server(cstate->line_buf.data, - cstate->line_buf.len, - cstate->file_encoding); - if (cvt != cstate->line_buf.data) - { - /* transfer converted data back to line_buf */ - resetStringInfo(&cstate->line_buf); - appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); - pfree(cvt); + } while (CopyLoadRawBuf(cstate, cstate->raw_buf_len, &cstate->raw_buf_index)); } } - /* Now it's safe to use the buffer in error messages */ - cstate->line_buf_converted = true; - + CONVERT_TO_SERVER_ENCODING(cstate) return result; } @@ -3927,9 +5687,6 @@ CopyReadLine(CopyState cstate) static bool CopyReadLineText(CopyState cstate) { - char *copy_raw_buf; - int raw_buf_ptr; - int copy_buf_len; bool need_data = false; bool hit_eof = false; bool result = false; @@ -3942,6 +5699,11 @@ CopyReadLineText(CopyState cstate) char quotec = '\0'; char escapec = '\0'; + int chunk_pos = 0; + uint32 chunk_first_block = 0; + uint32 new_line_size = 0; + CopyBufferState copy_buff_state = {0}; + if (cstate->csv_mode) { quotec = cstate->quote[0]; @@ -3974,14 +5736,13 @@ CopyReadLineText(CopyState cstate) * For a little extra speed within the loop, we copy raw_buf and * raw_buf_len into local variables. */ - copy_raw_buf = cstate->raw_buf; - raw_buf_ptr = cstate->raw_buf_index; - copy_buf_len = cstate->raw_buf_len; - + BEGIN_READ_LINE(cstate, chunk_first_block) for (;;) { int prev_raw_ptr; char c; + uint32 prev_chunk_size = copy_buff_state.chunk_size; + int prev_copy_buf_len = copy_buff_state.copy_buf_len; /* * Load more data if needed. Ideally we would just force four bytes @@ -3993,35 +5754,70 @@ CopyReadLineText(CopyState cstate) * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it, * considering the size of the buffer. */ - if (raw_buf_ptr >= copy_buf_len || need_data) + if (copy_buff_state.raw_buf_ptr >= copy_buff_state.copy_buf_len || + need_data) { + uint32 remaining_data; REFILL_LINEBUF; + /* In parallel mode, read as much data as possible to a new block */ + if (IsParallelCopy()) + SET_RAWBUF_FOR_LOAD() + + remaining_data = copy_buff_state.copy_buf_len - copy_buff_state.raw_buf_ptr; + /* * Try to read some more data. This will certainly reset * raw_buf_index to zero, and raw_buf_ptr must go with it. */ - if (!CopyLoadRawBuf(cstate)) + if (!CopyLoadRawBuf(cstate, copy_buff_state.copy_buf_len, + ©_buff_state.raw_buf_ptr)) hit_eof = true; - raw_buf_ptr = 0; - copy_buf_len = cstate->raw_buf_len; + + remaining_data += cstate->raw_buf_len; /* - * If we are completely out of data, break out of the loop, - * reporting EOF. + * If all the data of previous block is not consumed, set raw_buf back to + * previous block. */ - if (copy_buf_len <= 0) + if (IsParallelCopy() && need_data && + (copy_buff_state.curr_data_blk_ptr != copy_buff_state.data_blk_ptr)) + { + copy_buff_state.copy_buf_len += cstate->raw_buf_len; + cstate->raw_buf = copy_buff_state.data_blk_ptr->data; + } + else + copy_buff_state.copy_buf_len = cstate->raw_buf_len; + + need_data = false; + if (remaining_data <= 0) { + /* + * If we are completely out of data, break out of the loop, + * reporting EOF. + */ result = true; break; } - need_data = false; } - /* OK to fetch a character */ - prev_raw_ptr = raw_buf_ptr; - c = copy_raw_buf[raw_buf_ptr++]; + /* + * Store the current information, we might have to reset if we find that + * enough data is not present while reading. If enough data is not + * present, we will reset using the current information and load more + * data. + */ + prev_raw_ptr = copy_buff_state.raw_buf_ptr; + if (IsParallelCopy()) + { + prev_copy_buf_len = copy_buff_state.copy_buf_len; + prev_chunk_size = copy_buff_state.chunk_size; + copy_buff_state.block_switched = false; + } + /* OK to fetch a character */ + c = copy_buff_state.copy_raw_buf[copy_buff_state.raw_buf_ptr]; + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) if (cstate->csv_mode) { /* @@ -4079,11 +5875,10 @@ CopyReadLineText(CopyState cstate) IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); /* get next char */ - c = copy_raw_buf[raw_buf_ptr]; - + c = copy_buff_state.copy_raw_buf[copy_buff_state.raw_buf_ptr]; if (c == '\n') { - raw_buf_ptr++; /* eat newline */ + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) /* eat newline */ cstate->eol_type = EOL_CRNL; /* in case not set yet */ } else @@ -4153,11 +5948,11 @@ CopyReadLineText(CopyState cstate) * through and continue processing for file encoding. * ----- */ - c2 = copy_raw_buf[raw_buf_ptr]; + c2 = copy_buff_state.copy_raw_buf[copy_buff_state.raw_buf_ptr]; if (c2 == '.') { - raw_buf_ptr++; /* consume the '.' */ + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) /* consume the '.' */ /* * Note: if we loop back for more data here, it does not @@ -4169,7 +5964,8 @@ CopyReadLineText(CopyState cstate) /* Get the next character */ IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; + c2 = copy_buff_state.copy_raw_buf[copy_buff_state.raw_buf_ptr]; + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) if (c2 == '\n') { @@ -4194,7 +5990,8 @@ CopyReadLineText(CopyState cstate) /* Get the next character */ IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); /* if hit_eof, c2 will become '\0' */ - c2 = copy_raw_buf[raw_buf_ptr++]; + c2 = copy_buff_state.copy_raw_buf[copy_buff_state.raw_buf_ptr]; + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) if (c2 != '\r' && c2 != '\n') { @@ -4220,14 +6017,21 @@ CopyReadLineText(CopyState cstate) * discard the data and the \. sequence. */ if (prev_raw_ptr > cstate->raw_buf_index) - appendBinaryStringInfo(&cstate->line_buf, + { + if (!IsParallelCopy()) + appendBinaryStringInfo(&cstate->line_buf, cstate->raw_buf + cstate->raw_buf_index, prev_raw_ptr - cstate->raw_buf_index); - cstate->raw_buf_index = raw_buf_ptr; + else + copy_buff_state.chunk_size = prev_chunk_size; + } + + cstate->raw_buf_index = copy_buff_state.raw_buf_ptr; result = true; /* report EOF */ break; } else if (!cstate->csv_mode) + { /* * If we are here, it means we found a backslash followed by @@ -4240,7 +6044,8 @@ CopyReadLineText(CopyState cstate) * character after the backslash just like a normal character, * so we don't increment in those cases. */ - raw_buf_ptr++; + SEEK_COPY_BUFF_POS(cstate, 1, copy_buff_state) + } } /* @@ -4272,8 +6077,27 @@ not_end_of_copy: IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1); IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1); - raw_buf_ptr += mblen - 1; + SEEK_COPY_BUFF_POS(cstate, mblen - 1, copy_buff_state) + } + + /* + * Skip the header line. Update the chunk here, this cannot be done at + * the beginning, as there is a possibility that file contains empty + * lines. + */ + if (IsParallelCopy() && first_char_in_line && !IsHeaderLine()) + { + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; + ChunkBoundary *chunkInfo; + chunk_pos = UpdateBlockInChunkInfo(cstate, + chunk_first_block, + cstate->raw_buf_index, -1, + CHUNK_LEADER_POPULATING); + chunkInfo = &pcshared_info->chunk_boundaries.ring[chunk_pos]; + elog(DEBUG1, "[Leader] Adding - block:%d, offset:%d, chunk position:%d", + chunk_first_block, chunkInfo->start_offset, chunk_pos); } + first_char_in_line = false; } /* end of outer loop */ @@ -4281,6 +6105,11 @@ not_end_of_copy: * Transfer any still-uncopied data to line_buf. */ REFILL_LINEBUF; + CLEAR_EOL_LINE() + + /* Skip the header line */ + if (IsParallelCopy()) + END_CHUNK_PARALLEL_COPY() return result; } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 0c6fe01..3faadb8 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -865,7 +865,7 @@ is_parallel_safe(PlannerInfo *root, Node *node) * planning, because those are parallel-restricted and there might be one * in this expression. But otherwise we don't need to look. */ - if (root->glob->maxParallelHazard == PROPARALLEL_SAFE && + if (root != NULL && root->glob->maxParallelHazard == PROPARALLEL_SAFE && root->glob->paramExecTypes == NIL) return true; /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index c27d970..b3787c1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -557,7 +557,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) * Data source callback for the COPY FROM, which reads from the remote * connection and passes the data back to our local COPY. */ -static int +int copy_read_data(void *outbuf, int minread, int maxread) { int bytesread = 0; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7ee04ba..6933ade 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -381,6 +381,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833..5dc95ac 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,6 +14,7 @@ #ifndef COPY_H #define COPY_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" @@ -41,4 +42,7 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); + +extern int copy_read_data(void *outbuf, int minread, int maxread); #endif /* COPY_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 05c5e9c..ad5fbd0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -337,6 +337,9 @@ CheckpointStatsData CheckpointerRequest CheckpointerShmemStruct Chromosome +ChunkBoundaries +ChunkBoundary +ChunkState CkptSortItem CkptTsStatus ClientAuthentication_hook_type @@ -419,6 +422,8 @@ ConvProcInfo ConversionLocation ConvertRowtypeExpr CookedConstraint +CopyBufferState +CopyDataBlock CopyDest CopyInsertMethod CopyMultiInsertBuffer @@ -426,6 +431,7 @@ CopyMultiInsertInfo CopyState CopyStateData CopyStmt +CopyWorkerCommonData Cost CostSelector Counters @@ -1278,6 +1284,7 @@ LimitStateCond List ListCell ListDictionary +ListInfo ListParsedLex ListenAction ListenActionKind @@ -1699,6 +1706,8 @@ ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelCompletionPtr ParallelContext +ParallelCopyData +ParallelCopyLineBuf ParallelExecutorInfo ParallelHashGrowth ParallelHashJoinBatch @@ -2264,6 +2273,7 @@ Sharedsort ShellTypeInfo ShippableCacheEntry ShippableCacheKey +ShmCopyInfo ShmemIndexEnt ShutdownForeignScan_function ShutdownInformation -- 1.8.3.1