From 84a641370debe73a8e4f32b2628d0cb2d21ab75b Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 22 Sep 2020 13:54:45 +0530 Subject: [PATCH v5 2/6] Framework for leader/worker in parallel copy This patch has the framework for data structures in parallel copy, leader initialization, worker initialization, shared memory updation, starting workers, wait for workers and workers exiting. --- src/backend/access/transam/parallel.c | 4 + src/backend/commands/copy.c | 742 +++++++++++++++++++++++++++++++++- src/include/commands/copy.h | 2 + src/tools/pgindent/typedefs.list | 7 + 4 files changed, 753 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index b042696..a3cff4b 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -25,6 +25,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/copy.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -145,6 +146,9 @@ static const struct }, { "parallel_vacuum_main", parallel_vacuum_main + }, + { + "ParallelCopyMain", ParallelCopyMain } }; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cf7277a..cf16109 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -96,9 +96,183 @@ typedef enum CopyInsertMethod } CopyInsertMethod; #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + +/* + * The macros DATA_BLOCK_SIZE, RINGSIZE & MAX_BLOCKS_COUNT stores the records + * read from the file that need to be inserted into the relation. These values + * help in handover of multiple records with significant size of data to be + * processed by each of the workers to make sure there is no context switch & the + * work is fairly distributed among the workers. This number showed best + * results in the performance tests. + */ +#define DATA_BLOCK_SIZE RAW_BUF_SIZE + +/* It can hold 1000 blocks of 64K data in DSM to be processed by the worker. */ +#define MAX_BLOCKS_COUNT 1000 + +/* + * It can hold upto 10000 record information for worker to process. RINGSIZE + * should be a multiple of WORKER_CHUNK_COUNT, as wrap around cases is currently + * not handled while selecting the WORKER_CHUNK_COUNT by the worker. + */ +#define RINGSIZE (10 * 1000) + +/* + * Each worker will be allocated WORKER_CHUNK_COUNT of records from DSM data + * block to process to avoid lock contention. Read RINGSIZE comments before + * changing this value. + */ +#define WORKER_CHUNK_COUNT 50 + +#define IsParallelCopy() (cstate->is_parallel) +#define IsLeader() (cstate->pcdata->is_leader) #define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) /* + * Copy data block information. + * ParallelCopyDataBlock's will be created in DSM. Data read from file will be + * copied in these DSM data blocks. The leader process identifies the records + * and the record information will be shared to the workers. The workers will + * insert the records into the table. There can be one or more number of records + * in each of the data block based on the record size. + */ +typedef struct ParallelCopyDataBlock +{ + /* The number of unprocessed lines in the current block. */ + pg_atomic_uint32 unprocessed_line_parts; + + /* + * If the current line data is continued into another block, following_block + * will have the position where the remaining data need to be read. 
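+	 * For example, if a record begins in data_blocks[3] and spills over into
+	 * data_blocks[4], the entry for block 3 stores 4 in following_block, so the
+	 * worker knows which block holds the remaining bytes of that line.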
+ */ + uint32 following_block; + + /* + * This flag will be set, when the leader finds out this block can be read + * safely by the worker. This helps the worker to start processing the line + * early where the line will be spread across many blocks and the worker + * need not wait for the complete line to be processed. + */ + bool curr_blk_completed; + char data[DATA_BLOCK_SIZE]; /* data read from file */ + uint8 skip_bytes; +} ParallelCopyDataBlock; + +/* + * Individual line information. + * ParallelCopyLineBoundary is common data structure between leader & worker, + * Leader process will be populating data block, data block offset & the size of + * the record in DSM for the workers to copy the data into the relation. + * This is protected by the following sequence in the leader & worker. If they + * don't follow this order the worker might process wrong line_size and leader + * might populate the information which worker has not yet processed or in the + * process of processing. + * Leader should operate in the following order: + * 1) check if line_size is -1, if not wait, it means worker is still + * processing. + * 2) set line_state to LINE_LEADER_POPULATING. + * 3) update first_block, start_offset & cur_lineno in any order. + * 4) update line_size. + * 5) update line_state to LINE_LEADER_POPULATED. + * Worker should operate in the following order: + * 1) check line_state is LINE_LEADER_POPULATED, if not it means leader is still + * populating the data. + * 2) read line_size. + * 3) only one worker should choose one line for processing, this is handled by + * using pg_atomic_compare_exchange_u32, worker will change the state to + * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED. + * 4) read first_block, start_offset & cur_lineno in any order. + * 5) process line_size data. + * 6) update line_size to -1. + */ +typedef struct ParallelCopyLineBoundary +{ + /* Position of the first block in data_blocks array. */ + uint32 first_block; + uint32 start_offset; /* start offset of the line */ + + /* + * Size of the current line -1 means line is yet to be filled completely, + * 0 means empty line, >0 means line filled with line size data. + */ + pg_atomic_uint32 line_size; + pg_atomic_uint32 line_state; /* line state */ + uint64 cur_lineno; /* line number for error messages */ +} ParallelCopyLineBoundary; + +/* + * Circular queue used to store the line information. + */ +typedef struct ParallelCopyLineBoundaries +{ + /* Position for the leader to populate a line. */ + uint32 pos; + + /* Data read from the file/stdin by the leader process. */ + ParallelCopyLineBoundary ring[RINGSIZE]; +} ParallelCopyLineBoundaries; + +/* + * Shared information among parallel copy workers. This will be allocated in the + * DSM segment. + */ +typedef struct ParallelCopyShmInfo +{ + bool is_read_in_progress; /* file read status */ + + /* + * Actual lines inserted by worker, will not be same as + * total_worker_processed if where condition is specified along with copy. + * This will be the actual records inserted into the relation. + */ + pg_atomic_uint64 processed; + + /* + * The number of records currently processed by the worker, this will also + * include the number of records that was filtered because of where clause. 
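+	 * Hence processed <= total_worker_processed; the difference is the number
+	 * of records that were skipped because of the WHERE clause.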
+ */ + pg_atomic_uint64 total_worker_processed; + uint64 populated; /* lines populated by leader */ + uint32 cur_block_pos; /* current data block */ + ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ + FullTransactionId full_transaction_id; /* xid for copy from statement */ + CommandId mycid; /* command id */ + ParallelCopyLineBoundaries line_boundaries; /* line array */ +} ParallelCopyShmInfo; + +/* + * Parallel copy line buffer information. + */ +typedef struct ParallelCopyLineBuf +{ + StringInfoData line_buf; + uint64 cur_lineno; /* line number for error messages */ +} ParallelCopyLineBuf; + +/* + * Parallel copy data information. + */ +typedef struct ParallelCopyData +{ + Oid relid; /* relation id of the table */ + ParallelCopyShmInfo *pcshared_info; /* common info in shared memory */ + bool is_leader; + + /* line position which worker is processing */ + uint32 worker_processed_pos; + + /* + * Local line_buf array, workers will copy it here and release the lines + * for the leader to continue. + */ + ParallelCopyLineBuf worker_line_buf[WORKER_CHUNK_COUNT]; + uint32 worker_line_buf_count; /* Number of lines */ + + /* Current position in worker_line_buf */ + uint32 worker_line_buf_pos; +} ParallelCopyData; + +/* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, * even though some fields are used in only some cases. @@ -230,10 +404,38 @@ typedef struct CopyStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + int nworkers; + bool is_parallel; + ParallelCopyData *pcdata; /* Shorthand for number of unconsumed bytes available in raw_buf */ #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) } CopyStateData; +/* + * This structure helps in storing the common data from CopyStateData that are + * required by the workers. This information will then be allocated and stored + * into the DSM for the worker to retrieve and copy it to CopyStateData. + */ +typedef struct SerializedParallelCopyState +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + bool csv_mode; /* Comma Separated Value format? */ + bool header_line; /* CSV header line? */ + int null_print_len; /* length of same */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool convert_selectively; /* do selective binary conversion? */ + + /* Working state for COPY FROM */ + AttrNumber num_defaults; + Oid relid; +} SerializedParallelCopyState; + /* DestReceiver for COPY (query) TO */ typedef struct { @@ -263,6 +465,22 @@ typedef struct /* Trim the list of buffers back down to this number after flushing */ #define MAX_PARTITION_BUFFERS 32 +/* DSM keys for parallel copy. 
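+ * These are the shm_toc keys under which BeginParallelCopy stores the shared
+ * state and the serialized option strings, and from which ParallelCopyMain
+ * looks them up in each worker.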
*/ +#define PARALLEL_COPY_KEY_SHARED_INFO 1 +#define PARALLEL_COPY_KEY_CSTATE 2 +#define PARALLEL_COPY_KEY_NULL_PRINT 3 +#define PARALLEL_COPY_KEY_DELIM 4 +#define PARALLEL_COPY_KEY_QUOTE 5 +#define PARALLEL_COPY_KEY_ESCAPE 6 +#define PARALLEL_COPY_KEY_ATTNAME_LIST 7 +#define PARALLEL_COPY_KEY_NOT_NULL_LIST 8 +#define PARALLEL_COPY_KEY_NULL_LIST 9 +#define PARALLEL_COPY_KEY_CONVERT_LIST 10 +#define PARALLEL_COPY_KEY_WHERE_CLAUSE_STR 11 +#define PARALLEL_COPY_KEY_RANGE_TABLE 12 +#define PARALLEL_COPY_WAL_USAGE 13 +#define PARALLEL_COPY_BUFFER_USAGE 14 + /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { @@ -424,11 +642,478 @@ static bool CopyGetInt16(CopyState cstate, int16 *val); static bool CopyLoadRawBuf(CopyState cstate); static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); +static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt); static void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tup_desc, List *attnamelist); static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); static void ConvertToServerEncoding(CopyState cstate); + + +/* + * SerializeParallelCopyState - Copy shared_cstate using cstate information. + */ +static pg_attribute_always_inline void +SerializeParallelCopyState(CopyState cstate, SerializedParallelCopyState *shared_cstate) +{ + shared_cstate->copy_dest = cstate->copy_dest; + shared_cstate->file_encoding = cstate->file_encoding; + shared_cstate->need_transcoding = cstate->need_transcoding; + shared_cstate->encoding_embeds_ascii = cstate->encoding_embeds_ascii; + shared_cstate->csv_mode = cstate->csv_mode; + shared_cstate->header_line = cstate->header_line; + shared_cstate->null_print_len = cstate->null_print_len; + shared_cstate->force_quote_all = cstate->force_quote_all; + shared_cstate->convert_selectively = cstate->convert_selectively; + shared_cstate->num_defaults = cstate->num_defaults; + shared_cstate->relid = cstate->pcdata->relid; +} + +/* + * RestoreString - Retrieve the string from shared memory. + */ +static void +RestoreString(shm_toc *toc, int sharedkey, char **copystr) +{ + char *shared_str_val = (char *) shm_toc_lookup(toc, sharedkey, true); + if (shared_str_val) + *copystr = pstrdup(shared_str_val); +} + +/* + * EstimateLineKeysStr - Estimate the size required in shared memory for the + * input string. + */ +static void +EstimateLineKeysStr(ParallelContext *pcxt, char *inputstr) +{ + if (inputstr) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(inputstr) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * SerializeString - Insert a string into shared memory. + */ +static void +SerializeString(ParallelContext *pcxt, int key, char *inputstr) +{ + if (inputstr) + { + char *shmptr = (char *) shm_toc_allocate(pcxt->toc, + strlen(inputstr) + 1); + strcpy(shmptr, inputstr); + shm_toc_insert(pcxt->toc, key, shmptr); + } +} + +/* + * PopulateParallelCopyShmInfo - Set ParallelCopyShmInfo. 
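+ * Zeroes the structure, marks the file read as in progress, sets the current
+ * block position to an invalid value, records the full transaction id and
+ * command id, and initializes line_size of every element in the line
+ * boundaries ring to -1 (i.e. not yet populated).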
+ */ +static void +PopulateParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr, + FullTransactionId full_transaction_id) +{ + uint32 count; + + MemSet(shared_info_ptr, 0, sizeof(ParallelCopyShmInfo)); + shared_info_ptr->is_read_in_progress = true; + shared_info_ptr->cur_block_pos = -1; + shared_info_ptr->full_transaction_id = full_transaction_id; + shared_info_ptr->mycid = GetCurrentCommandId(true); + for (count = 0; count < RINGSIZE; count++) + { + ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count]; + pg_atomic_init_u32(&(lineInfo->line_size), -1); + } +} + +/* + * BeginParallelCopy - Start parallel copy tasks. + * + * Get the number of workers required to perform the parallel copy. The data + * structures that are required by the parallel workers will be initialized, the + * size required in DSM will be calculated and the necessary keys will be loaded + * in the DSM. The specified number of workers will then be launched. + */ +static ParallelContext* +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) +{ + ParallelContext *pcxt; + ParallelCopyShmInfo *shared_info_ptr; + SerializedParallelCopyState *shared_cstate; + FullTransactionId full_transaction_id; + Size est_cstateshared; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + char *attnameListStr = NULL; + char *notnullListStr = NULL; + char *nullListStr = NULL; + char *convertListStr = NULL; + int parallel_workers = 0; + WalUsage *walusage; + BufferUsage *bufferusage; + ParallelCopyData *pcdata; + MemoryContext oldcontext; + + parallel_workers = Min(nworkers, max_worker_processes); + + /* Can't perform copy in parallel */ + if (parallel_workers <= 0) + return NULL; + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + MemoryContextSwitchTo(oldcontext); + cstate->pcdata = pcdata; + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "ParallelCopyMain", + parallel_workers); + Assert(pcxt->nworkers > 0); + + /* + * Estimate size for shared information for PARALLEL_COPY_KEY_SHARED_INFO + */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCopyShmInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_CSTATE */ + est_cstateshared = MAXALIGN(sizeof(SerializedParallelCopyState)); + shm_toc_estimate_chunk(&pcxt->estimator, est_cstateshared); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + EstimateLineKeysStr(pcxt, cstate->null_print); + EstimateLineKeysStr(pcxt, cstate->null_print_client); + EstimateLineKeysStr(pcxt, cstate->delim); + EstimateLineKeysStr(pcxt, cstate->quote); + EstimateLineKeysStr(pcxt, cstate->escape); + + if (cstate->whereClause != NULL) + { + whereClauseStr = nodeToString(cstate->whereClause); + EstimateLineKeysStr(pcxt, whereClauseStr); + } + + if (cstate->range_table != NULL) + { + rangeTableStr = nodeToString(cstate->range_table); + EstimateLineKeysStr(pcxt, rangeTableStr); + } + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_XID. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FullTransactionId)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_ATTNAME_LIST. + */ + if (attnamelist != NIL) + { + attnameListStr = nodeToString(attnamelist); + EstimateLineKeysStr(pcxt, attnameListStr); + } + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NOT_NULL_LIST. 
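+	 * The force_notnull list is flattened with nodeToString and shared as an
+	 * ordinary string.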
+ */ + if (cstate->force_notnull != NIL) + { + notnullListStr = nodeToString(cstate->force_notnull); + EstimateLineKeysStr(pcxt, notnullListStr); + } + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NULL_LIST. + */ + if (cstate->force_null != NIL) + { + nullListStr = nodeToString(cstate->force_null); + EstimateLineKeysStr(pcxt, nullListStr); + } + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_CONVERT_LIST. + */ + if (cstate->convert_select != NIL) + { + convertListStr = nodeToString(cstate->convert_select); + EstimateLineKeysStr(pcxt, convertListStr); + } + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_COPY_WAL_USAGE + * and PARALLEL_COPY_BUFFER_USAGE. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial copy) */ + if (pcxt->seg == NULL) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* Allocate shared memory for PARALLEL_COPY_KEY_SHARED_INFO */ + shared_info_ptr = (ParallelCopyShmInfo *) shm_toc_allocate(pcxt->toc, sizeof(ParallelCopyShmInfo)); + PopulateParallelCopyShmInfo(shared_info_ptr, full_transaction_id); + + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_SHARED_INFO, shared_info_ptr); + pcdata->pcshared_info = shared_info_ptr; + pcdata->relid = relid; + + /* Store shared build state, for which we reserved space. */ + shared_cstate = (SerializedParallelCopyState *) shm_toc_allocate(pcxt->toc, est_cstateshared); + + /* copy cstate variables. */ + SerializeParallelCopyState(cstate, shared_cstate); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_CSTATE, shared_cstate); + + SerializeString(pcxt, PARALLEL_COPY_KEY_NULL_PRINT, cstate->null_print); + SerializeString(pcxt, PARALLEL_COPY_KEY_DELIM, cstate->delim); + SerializeString(pcxt, PARALLEL_COPY_KEY_QUOTE, cstate->quote); + SerializeString(pcxt, PARALLEL_COPY_KEY_ESCAPE, cstate->escape); + SerializeString(pcxt, PARALLEL_COPY_KEY_ATTNAME_LIST, attnameListStr); + SerializeString(pcxt, PARALLEL_COPY_KEY_NOT_NULL_LIST, notnullListStr); + SerializeString(pcxt, PARALLEL_COPY_KEY_NULL_LIST, nullListStr); + SerializeString(pcxt, PARALLEL_COPY_KEY_CONVERT_LIST, convertListStr); + SerializeString(pcxt, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, whereClauseStr); + SerializeString(pcxt, PARALLEL_COPY_KEY_RANGE_TABLE, rangeTableStr); + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. + */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_BUFFER_USAGE, bufferusage); + + LaunchParallelWorkers(pcxt); + if (pcxt->nworkers_launched == 0) + { + EndParallelCopy(pcxt); + return NULL; + } + + /* + * Caller needs to wait for all launched workers when we return. Make sure + * that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); + + pcdata->is_leader = true; + cstate->is_parallel = true; + return pcxt; +} + +/* + * EndParallelCopy - End the parallel copy tasks. 
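+ * Destroys the parallel context and exits parallel mode; this is done by the
+ * leader, never by a parallel worker.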
+ */ +static pg_attribute_always_inline void +EndParallelCopy(ParallelContext *pcxt) +{ + Assert(!IsParallelWorker()); + + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * InitializeParallelCopyInfo - Initialize parallel worker. + */ +static void +InitializeParallelCopyInfo(SerializedParallelCopyState *shared_cstate, + CopyState cstate, List *attnamelist) +{ + uint32 count; + ParallelCopyData *pcdata = cstate->pcdata; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + cstate->copy_dest = shared_cstate->copy_dest; + cstate->file_encoding = shared_cstate->file_encoding; + cstate->need_transcoding = shared_cstate->need_transcoding; + cstate->encoding_embeds_ascii = shared_cstate->encoding_embeds_ascii; + cstate->csv_mode = shared_cstate->csv_mode; + cstate->header_line = shared_cstate->header_line; + cstate->null_print_len = shared_cstate->null_print_len; + cstate->force_quote_all = shared_cstate->force_quote_all; + cstate->convert_selectively = shared_cstate->convert_selectively; + cstate->num_defaults = shared_cstate->num_defaults; + pcdata->relid = shared_cstate->relid; + + PopulateCommonCstateInfo(cstate, tup_desc, attnamelist); + + /* Initialize state variables. */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + + initStringInfo(&cstate->line_buf); + for (count = 0; count < WORKER_CHUNK_COUNT; count++) + initStringInfo(&pcdata->worker_line_buf[count].line_buf); + + cstate->line_buf_converted = false; + cstate->raw_buf = NULL; + cstate->raw_buf_index = cstate->raw_buf_len = 0; +} + +/* + * ParallelCopyMain - Parallel copy worker's code. + * + * Where clause handling, convert tuple to columns, add default null values for + * the missing columns that are not present in that record. Find the partition + * if it is partitioned table, invoke before row insert Triggers, handle + * constraints and insert the tuples. + */ +void +ParallelCopyMain(dsm_segment *seg, shm_toc *toc) +{ + CopyState cstate; + ParallelCopyData *pcdata; + ParallelCopyShmInfo *pcshared_info; + SerializedParallelCopyState *shared_cstate; + Relation rel = NULL; + MemoryContext oldcontext; + List *attlist = NIL; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + char *attnameListStr = NULL; + char *notnullListStr = NULL; + char *nullListStr = NULL; + char *convertListStr = NULL; + WalUsage *walusage; + BufferUsage *bufferusage; + + /* Allocate workspace and zero all fields. */ + cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + + /* + * We allocate everything used by a cstate in a new memory context. This + * avoids memory leaks during repeated use of COPY in a query. 
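+	 * This mirrors what BeginCopy does for the leader's cstate.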
+ */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + pcdata->is_leader = false; + pcdata->worker_processed_pos = -1; + cstate->is_parallel = true; + pcshared_info = (ParallelCopyShmInfo *) shm_toc_lookup(toc, PARALLEL_COPY_KEY_SHARED_INFO, false); + + ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); + + pcdata->pcshared_info = pcshared_info; + + shared_cstate = (SerializedParallelCopyState *) shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, false); + cstate->null_print = (char *) shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT, true); + + RestoreString(toc, PARALLEL_COPY_KEY_DELIM, &cstate->delim); + RestoreString(toc, PARALLEL_COPY_KEY_QUOTE, &cstate->quote); + RestoreString(toc, PARALLEL_COPY_KEY_ESCAPE, &cstate->escape); + RestoreString(toc, PARALLEL_COPY_KEY_ATTNAME_LIST, &attnameListStr); + if (attnameListStr) + attlist = (List *) stringToNode(attnameListStr); + + RestoreString(toc, PARALLEL_COPY_KEY_NOT_NULL_LIST, ¬nullListStr); + if (notnullListStr) + cstate->force_notnull = (List *) stringToNode(notnullListStr); + + RestoreString(toc, PARALLEL_COPY_KEY_NULL_LIST, &nullListStr); + if (nullListStr) + cstate->force_null = (List *) stringToNode(nullListStr); + + RestoreString(toc, PARALLEL_COPY_KEY_CONVERT_LIST, &convertListStr); + if (convertListStr) + cstate->convert_select = (List *) stringToNode(convertListStr); + + RestoreString(toc, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, &whereClauseStr); + RestoreString(toc, PARALLEL_COPY_KEY_RANGE_TABLE, &rangeTableStr); + + if (whereClauseStr) + { + Node *whereClauseCnvrtdFrmStr = (Node *) stringToNode(whereClauseStr); + cstate->whereClause = whereClauseCnvrtdFrmStr; + } + + if (rangeTableStr) + { + List *rangeTableCnvrtdFrmStr = (List *) stringToNode(rangeTableStr); + cstate->range_table = rangeTableCnvrtdFrmStr; + } + + /* Open and lock the relation, using the appropriate lock type. */ + rel = table_open(shared_cstate->relid, RowExclusiveLock); + cstate->rel = rel; + InitializeParallelCopyInfo(shared_cstate, cstate, attlist); + + CopyFrom(cstate); + + if (rel != NULL) + table_close(rel, RowExclusiveLock); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_COPY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_COPY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + MemoryContextSwitchTo(oldcontext); + pfree(cstate); + return; +} + +/* + * ParallelCopyFrom - parallel copy leader's functionality. + * + * Leader executes the before statement for before statement trigger, if before + * statement trigger is present. It will read the table data from the file and + * copy the contents to DSM data blocks. It will then read the input contents + * from the DSM data block and identify the records based on line breaks. This + * information is called line or a record that need to be inserted into a + * relation. The line information will be stored in ParallelCopyLineBoundary DSM + * data structure. Workers will then process this information and insert the + * data in to table. It will repeat this process until the all data is read from + * the file and all the DSM data blocks are processed. 
While processing if + * leader identifies that DSM Data blocks or DSM ParallelCopyLineBoundary data + * structures is full, leader will wait till the worker frees up some entries + * and repeat the process. It will wait till all the lines populated are + * processed by the workers and exits. + */ +static void +ParallelCopyFrom(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + + pcshared_info->is_read_in_progress = false; + cstate->cur_lineno = 0; +} + /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -1141,6 +1826,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { + ParallelContext *pcxt = NULL; Assert(rel); /* check read-only transaction and parallel mode */ @@ -1150,7 +1836,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + cstate->is_parallel = false; + + if (cstate->nworkers > 0) + pcxt = BeginParallelCopy(cstate->nworkers, cstate, stmt->attlist, + relid); + + if (pcxt) + { + ParallelCopyFrom(cstate); + + /* Wait for all copy workers to finish */ + WaitForParallelWorkersToFinish(pcxt); + *processed = pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + EndParallelCopy(pcxt); + } + else + *processed = CopyFrom(cstate); /* copy from file to database */ + EndCopyFrom(cstate); } else @@ -1199,6 +1902,7 @@ ProcessCopyOptions(ParseState *pstate, cstate->is_copy_from = is_from; cstate->file_encoding = -1; + cstate->nworkers = -1; /* Extract options from the statement node tree */ foreach(option, options) @@ -1367,6 +2071,39 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "parallel") == 0) + { + int val; + bool parsed; + char *strval; + + if (!cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel option is supported only for copy from"), + parser_errposition(pstate, defel->location))); + if (cstate->nworkers >= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + + strval = defGetString(defel); + parsed = parse_int(strval, &val, 0, NULL); + if (!parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for integer option \"%s\": %s", + defel->defname, strval))); + if (val < 1 || val > 1024) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("value %s out of bounds for option \"%s\"", + strval, defel->defname), + errdetail("Valid values are between \"%d\" and \"%d\".", + 1, 1024))); + cstate->nworkers = val; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1720,7 +2457,8 @@ BeginCopy(ParseState *pstate, /* * PopulateCommonCstateInfo - Populates the common variables required for copy - * from operation. This is a helper function for BeginCopy function. + * from operation. This is a helper function for BeginCopy & + * InitializeParallelCopyInfo function. 
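+ * In the parallel case, each worker calls it from InitializeParallelCopyInfo
+ * after restoring the serialized options, so that workers build up the same
+ * copy state as the leader.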
*/ static void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tupDesc, List *attnamelist) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833..82843c6 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,6 +14,7 @@ #ifndef COPY_H #define COPY_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" @@ -41,4 +42,5 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); #endif /* COPY_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b1afb34..509c695 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1702,6 +1702,12 @@ ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelCompletionPtr ParallelContext +ParallelCopyLineBoundaries +ParallelCopyLineBoundary +ParallelCopyData +ParallelCopyDataBlock +ParallelCopyLineBuf +ParallelCopyShmInfo ParallelExecutorInfo ParallelHashGrowth ParallelHashJoinBatch @@ -2219,6 +2225,7 @@ SerCommitSeqNo SerialControl SerializableXactHandle SerializedActiveRelMaps +SerializedParallelCopyState SerializedReindexState SerializedSnapshotData SerializedTransactionState -- 1.8.3.1