From 447a954eed01432c170ad94e3ffffa30112f53aa Mon Sep 17 00:00:00 2001 From: Vignesh C ,Bharath Date: Tue, 23 Jun 2020 07:04:46 +0530 Subject: [PATCH 2/6] Framework for leader/worker in parallel copy. This patch has the framework for data structures in parallel copy, leader initialization, worker initialization, shared memory updation, starting workers, wait for workers and workers exiting. --- src/backend/access/transam/parallel.c | 4 + src/backend/commands/copy.c | 838 +++++++++++++++++++++++++++- src/backend/replication/logical/tablesync.c | 2 +- src/include/commands/copy.h | 4 + src/tools/pgindent/typedefs.list | 8 + 5 files changed, 851 insertions(+), 5 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 14a8690..09e7a19 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -25,6 +25,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/copy.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -145,6 +146,9 @@ static const struct }, { "parallel_vacuum_main", parallel_vacuum_main + }, + { + "ParallelCopyMain", ParallelCopyMain } }; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 65a504f..34c657c 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -96,9 +96,138 @@ typedef enum CopyInsertMethod } CopyInsertMethod; #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + +#define DATA_BLOCK_SIZE RAW_BUF_SIZE +#define RINGSIZE (10 * 1000) +#define MAX_BLOCKS_COUNT 1000 +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */ + +#define IsParallelCopy() (cstate->is_parallel) +#define IsLeader() (cstate->pcdata->is_leader) #define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) /* + * Copy data block information. + */ +typedef struct ParallelCopyDataBlock +{ + /* The number of unprocessed lines in the current block. */ + pg_atomic_uint32 unprocessed_line_parts; + + /* + * If the current line data is continued into another block, following_block + * will have the position where the remaining data need to be read. + */ + uint32 following_block; + + /* + * This flag will be set, when the leader finds out this block can be read + * safely by the worker. This helps the worker to start processing the line + * early where the line will be spread across many blocks and the worker + * need not wait for the complete line to be processed. + */ + bool curr_blk_completed; + char data[DATA_BLOCK_SIZE]; /* data read from file */ + uint8 skip_bytes; +}ParallelCopyDataBlock; + +/* + * Individual line information. + * ParallelCopyLineBoundary is common data structure between leader & worker, + * this is protected by the following sequence in the leader & worker. + * Leader should operate in the following order: + * 1) update first_block, start_offset & cur_lineno in any order. + * 2) update line_size. + * 3) update line_state. + * Worker should operate in the following order: + * 1) read line_size. + * 2) only one worker should choose one line for processing, this is handled by + * using pg_atomic_compare_exchange_u32, worker will change the sate to + * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED. + * 3) read first_block, start_offset & cur_lineno in any order. + */ +typedef struct ParallelCopyLineBoundary +{ + /* Position of the first block in data_blocks array. */ + uint32 first_block; + uint32 start_offset; /* start offset of the line */ + + /* + * Size of the current line -1 means line is yet to be filled completely, + * 0 means empty line, >0 means line filled with line size data. + */ + pg_atomic_uint32 line_size; + pg_atomic_uint32 line_state; /* line state */ + uint64 cur_lineno; /* line number for error messages */ +}ParallelCopyLineBoundary; + +/* + * Circular queue used to store the line information. + */ +typedef struct ParallelCopyLineBoundaries +{ + /* Position for the leader to populate a line. */ + uint32 leader_pos; + + /* Data read from the file/stdin by the leader process. */ + ParallelCopyLineBoundary ring[RINGSIZE]; +}ParallelCopyLineBoundaries; + +/* + * Shared information among parallel copy workers. This will be allocated in the + * DSM segment. + */ +typedef struct ParallelCopyShmInfo +{ + bool is_read_in_progress; /* file read status */ + + /* + * Actual lines inserted by worker (some records will be filtered based on + * where condition). + */ + pg_atomic_uint64 processed; + pg_atomic_uint64 total_worker_processed; /* total processed records by the workers */ + uint64 populated; /* lines populated by leader */ + uint32 cur_block_pos; /* current data block */ + ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ + FullTransactionId full_transaction_id; /* xid for copy from statement */ + CommandId mycid; /* command id */ + ParallelCopyLineBoundaries line_boundaries; /* line array */ +} ParallelCopyShmInfo; + +/* + * Parallel copy line buffer information. + */ +typedef struct ParallelCopyLineBuf +{ + StringInfoData line_buf; + uint64 cur_lineno; /* line number for error messages */ +}ParallelCopyLineBuf; + +/* + * Parallel copy data information. + */ +typedef struct ParallelCopyData +{ + Oid relid; /* relation id of the table */ + ParallelCopyShmInfo *pcshared_info; /* common info in shared memory */ + bool is_leader; + + /* line position which worker is processing */ + uint32 worker_processed_pos; + + /* + * Local line_buf array, workers will copy it here and release the lines + * for the leader to continue. + */ + ParallelCopyLineBuf worker_line_buf[WORKER_CHUNK_COUNT]; + uint32 worker_line_buf_count; /* Number of lines */ + + /* Current position in worker_line_buf */ + uint32 worker_line_buf_pos; +}ParallelCopyData; + +/* * This struct contains all the state variables used throughout a COPY * operation. For simplicity, we use the same struct for all variants of COPY, * even though some fields are used in only some cases. @@ -225,8 +354,66 @@ typedef struct CopyStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + int nworkers; + bool is_parallel; + ParallelCopyData *pcdata; } CopyStateData; +/* + * This structure helps in storing the common data from CopyStateData that are + * required by the workers. This information will then be allocated and stored + * into the DSM for the worker to retrieve and copy it to CopyStateData. + */ +typedef struct ParallelCopyCommonKeyData +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + bool csv_mode; /* Comma Separated Value format? */ + bool header_line; /* CSV header line? */ + int null_print_len; /* length of same */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool convert_selectively; /* do selective binary conversion? */ + + /* Working state for COPY FROM */ + AttrNumber num_defaults; + Oid relid; +}ParallelCopyCommonKeyData; + +/* + * This structure will help in converting a List data type into the below + * structure format with the count having the number of elements in the list and + * the info having the List elements appended contigously. This converted + * structure will be allocated in shared memory and stored in DSM for the worker + * to retrieve and later convert it back to List data type. + */ +typedef struct ParallelCopyKeyListInfo +{ + int count; /* count of attributes */ + + /* string info in the form info followed by info1, info2... infon */ + char info[1]; +} ParallelCopyKeyListInfo; + +/* + * List of internal parallel copy function pointers. + */ +static const struct +{ + char *fn_name; + copy_data_source_cb fn_addr; +} InternalParallelCopyFuncPtrs[] = + +{ + { + "copy_read_data", copy_read_data + }, +}; + /* DestReceiver for COPY (query) TO */ typedef struct { @@ -256,6 +443,23 @@ typedef struct /* Trim the list of buffers back down to this number after flushing */ #define MAX_PARTITION_BUFFERS 32 +/* DSM keys for parallel copy. */ +#define PARALLEL_COPY_KEY_SHARED_INFO 1 +#define PARALLEL_COPY_KEY_CSTATE 2 +#define PARALLEL_COPY_KEY_NULL_PRINT 3 +#define PARALLEL_COPY_KEY_NULL_PRINT_CLIENT 4 +#define PARALLEL_COPY_KEY_DELIM 5 +#define PARALLEL_COPY_KEY_QUOTE 6 +#define PARALLEL_COPY_KEY_ESCAPE 7 +#define PARALLEL_COPY_KEY_ATTNAME_LIST 8 +#define PARALLEL_COPY_KEY_FORCE_QUOTE_LIST 9 +#define PARALLEL_COPY_KEY_NOT_NULL_LIST 10 +#define PARALLEL_COPY_KEY_NULL_LIST 11 +#define PARALLEL_COPY_KEY_CONVERT_LIST 12 +#define PARALLEL_COPY_KEY_DATASOURCE_CB 13 +#define PARALLEL_COPY_KEY_WHERE_CLAUSE_STR 14 +#define PARALLEL_COPY_KEY_RANGE_TABLE 15 + /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { @@ -476,10 +680,596 @@ static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); +static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt); static void PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc tup_desc, List *attnamelist); +static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname); +static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr); /* + * CopyCommonInfoForWorker - Copy shared_cstate using cstate information. + */ +static pg_attribute_always_inline void +CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_cstate) +{ + shared_cstate->copy_dest = cstate->copy_dest; + shared_cstate->file_encoding = cstate->file_encoding; + shared_cstate->need_transcoding = cstate->need_transcoding; + shared_cstate->encoding_embeds_ascii = cstate->encoding_embeds_ascii; + shared_cstate->csv_mode = cstate->csv_mode; + shared_cstate->header_line = cstate->header_line; + shared_cstate->null_print_len = cstate->null_print_len; + shared_cstate->force_quote_all = cstate->force_quote_all; + shared_cstate->convert_selectively = cstate->convert_selectively; + shared_cstate->num_defaults = cstate->num_defaults; + shared_cstate->relid = cstate->pcdata->relid; +} + +/* + * RetrieveSharedString - Retrieve the string from shared memory. + */ +static void +RetrieveSharedString(shm_toc *toc, int sharedkey, char **copystr) +{ + char *shared_str_val = (char *)shm_toc_lookup(toc, sharedkey, true); + if (shared_str_val) + *copystr = pstrdup(shared_str_val); +} + +/* + * RetrieveSharedList - Retrieve the list from shared memory. + */ +static void +RetrieveSharedList(shm_toc *toc, int sharedkey, List **copylist) +{ + ParallelCopyKeyListInfo *listinformation = (ParallelCopyKeyListInfo *)shm_toc_lookup(toc, sharedkey, + true); + if (listinformation) + { + int length = 0; + int count; + + for (count = 0; count < listinformation->count; count++) + { + char *attname = (char *)(listinformation->info + length); + length += strlen(attname) + 1; + *copylist = lappend(*copylist, makeString(attname)); + } + } +} + +/* + * CopyListSharedMemory - Copy the list into shared memory. + */ +static void +CopyListSharedMemory(List *inputlist, Size memsize, ParallelCopyKeyListInfo *sharedlistinfo) +{ + ListCell *l; + int length = 0; + + MemSet(sharedlistinfo, 0, memsize); + foreach(l, inputlist) + { + char *name = strVal(lfirst(l)); + memcpy((char *)(sharedlistinfo->info + length), name, strlen(name)); + sharedlistinfo->count++; + length += strlen(name) + 1; + } +} + +/* + * ComputeListSize - compute the list size. + */ +static int +ComputeListSize(List *inputlist) +{ + int est_size = sizeof(int); + if (inputlist != NIL) + { + ListCell *l; + foreach(l, inputlist) + est_size += strlen(strVal(lfirst(l))) + 1; + } + + return est_size; +} + +/* + * EstimateLineKeysStr - Estimate the size required in shared memory for the + * input string. + */ +static void +EstimateLineKeysStr(ParallelContext *pcxt, char *inputstr) +{ + if (inputstr) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(inputstr) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * EstimateLineKeysList - Estimate the size required in shared memory for the + * input list. + */ +static void +EstimateLineKeysList(ParallelContext *pcxt, List *inputlist, + Size *est_list_size) +{ + if (inputlist != NIL) + { + *est_list_size = ComputeListSize(inputlist); + shm_toc_estimate_chunk(&pcxt->estimator, *est_list_size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* + * InsertStringShm - Insert a string into shared memory. + */ +static void +InsertStringShm(ParallelContext *pcxt, int key, char *inputstr) +{ + if (inputstr) + { + char *shmptr = (char *)shm_toc_allocate(pcxt->toc, + strlen(inputstr) + 1); + strcpy(shmptr, inputstr); + shm_toc_insert(pcxt->toc, key, shmptr); + } +} + +/* + * InsertListShm - Insert a list into shared memory. + */ +static void +InsertListShm(ParallelContext *pcxt, int key, List *inputlist, + Size est_list_size) +{ + if (inputlist != NIL) + { + ParallelCopyKeyListInfo *sharedlistinfo = (ParallelCopyKeyListInfo *)shm_toc_allocate(pcxt->toc, + est_list_size); + CopyListSharedMemory(inputlist, est_list_size, sharedlistinfo); + shm_toc_insert(pcxt->toc, key, sharedlistinfo); + } +} + +/* + * BeginParallelCopy - start parallel copy tasks. + * + * Get the number of workers required to perform the parallel copy. The data + * structures that are required by the parallel workers will be initialized, the + * size required in DSM will be calculated and the necessary keys will be loaded + * in the DSM. The specified number of workers will then be launched. + * + */ +static ParallelContext* +BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid) +{ + ParallelContext *pcxt; + ParallelCopyShmInfo *shared_info_ptr; + ParallelCopyCommonKeyData *shared_cstate; + FullTransactionId full_transaction_id; + Size est_shared_info; + Size est_cstateshared; + Size est_att_list_size; + Size est_quote_list_size; + Size est_notnull_list_size; + Size est_null_list_size; + Size est_convert_list_size; + Size est_datasource_cb_size; + int count = 0; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + int parallel_workers = 0; + ParallelCopyData *pcdata; + + parallel_workers = Min(nworkers, max_worker_processes); + + /* Can't perform copy in parallel */ + if (parallel_workers <= 0) + { + elog(WARNING, + "No workers available, copy will be run in non-parallel mode"); + return NULL; + } + + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "ParallelCopyMain", + parallel_workers); + Assert(pcxt->nworkers > 0); + + /* + * Estimate size for shared information for PARALLEL_COPY_KEY_SHARED_INFO + */ + est_shared_info = sizeof(ParallelCopyShmInfo); + shm_toc_estimate_chunk(&pcxt->estimator, est_shared_info); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_CSTATE */ + est_cstateshared = MAXALIGN(sizeof(ParallelCopyCommonKeyData)); + shm_toc_estimate_chunk(&pcxt->estimator, est_cstateshared); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + EstimateLineKeysStr(pcxt, cstate->null_print); + EstimateLineKeysStr(pcxt, cstate->null_print_client); + EstimateLineKeysStr(pcxt, cstate->delim); + EstimateLineKeysStr(pcxt, cstate->quote); + EstimateLineKeysStr(pcxt, cstate->escape); + + if (cstate->whereClause != NULL) + { + whereClauseStr = nodeToString(cstate->whereClause); + EstimateLineKeysStr(pcxt, whereClauseStr); + } + + if (cstate->range_table != NULL) + { + rangeTableStr = nodeToString(cstate->range_table); + EstimateLineKeysStr(pcxt, rangeTableStr); + } + + /* Estimate the size for shared information for PARALLEL_COPY_KEY_XID. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FullTransactionId)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_ATTNAME_LIST. + */ + EstimateLineKeysList(pcxt, attnamelist, &est_att_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_FORCE_QUOTE_LIST. + */ + EstimateLineKeysList(pcxt, cstate->force_quote, &est_quote_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NOT_NULL_LIST. + */ + EstimateLineKeysList(pcxt, cstate->force_notnull, + &est_notnull_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_NULL_LIST. + */ + EstimateLineKeysList(pcxt, cstate->force_null, &est_null_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_CONVERT_LIST. + */ + EstimateLineKeysList(pcxt, cstate->convert_select, + &est_convert_list_size); + + /* + * Estimate the size for shared information for + * PARALLEL_COPY_KEY_DATASOURCE_CB. + */ + if (cstate->data_source_cb) + { + char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb); + est_datasource_cb_size = strlen(functionname) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, est_datasource_cb_size); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial copy) */ + if (pcxt->seg == NULL) + { + EndParallelCopy(pcxt); + elog(WARNING, + "DSM segments not available, copy will be run in non-parallel mode"); + return NULL; + } + + /* Allocate shared memory for PARALLEL_COPY_KEY_SHARED_INFO */ + shared_info_ptr = (ParallelCopyShmInfo *) shm_toc_allocate(pcxt->toc, + est_shared_info); + MemSet(shared_info_ptr, 0, est_shared_info); + shared_info_ptr->is_read_in_progress = true; + shared_info_ptr->cur_block_pos = -1; + shared_info_ptr->full_transaction_id = full_transaction_id; + shared_info_ptr->mycid = GetCurrentCommandId(true); + for (count = 0; count < RINGSIZE; count++) + { + ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count]; + pg_atomic_init_u32(&(lineInfo->line_size), -1); + } + + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_SHARED_INFO, shared_info_ptr); + pcdata->pcshared_info = shared_info_ptr; + pcdata->relid = relid; + + /* Store shared build state, for which we reserved space. */ + shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_allocate(pcxt->toc, + est_cstateshared); + + /* copy cstate variables. */ + CopyCommonInfoForWorker(cstate, shared_cstate); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_CSTATE, shared_cstate); + + InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT, cstate->null_print); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT, + cstate->null_print_client); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_DELIM, cstate->delim); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_QUOTE, cstate->quote); + InsertStringShm(pcxt, PARALLEL_COPY_KEY_ESCAPE, cstate->escape); + + InsertListShm(pcxt, PARALLEL_COPY_KEY_ATTNAME_LIST, + attnamelist, est_att_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST, + cstate->force_quote, est_quote_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_NOT_NULL_LIST, + cstate->force_notnull, est_notnull_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_NULL_LIST, cstate->force_null, + est_null_list_size); + InsertListShm(pcxt, PARALLEL_COPY_KEY_CONVERT_LIST, + cstate->convert_select, est_convert_list_size); + + if (cstate->data_source_cb) + { + char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb); + char *data_source_cb = (char *) shm_toc_allocate(pcxt->toc, + est_datasource_cb_size); + strcpy(data_source_cb, functionname); + shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_DATASOURCE_CB, + data_source_cb); + } + + if (cstate->whereClause) + InsertStringShm(pcxt, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, + whereClauseStr); + + if(cstate->range_table) + InsertStringShm(pcxt, PARALLEL_COPY_KEY_RANGE_TABLE, rangeTableStr); + + LaunchParallelWorkers(pcxt); + if (pcxt->nworkers_launched == 0) + { + EndParallelCopy(pcxt); + elog(WARNING, + "No workers available, copy will be run in non-parallel mode"); + return NULL; + } + + /* + * Caller needs to wait for all launched workers when we return. Make sure + * that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); + + pcdata->is_leader = true; + cstate->is_parallel = true; + return pcxt; +} + +/* + * EndParallelCopy - end the parallel copy tasks. + */ +static pg_attribute_always_inline void +EndParallelCopy(ParallelContext *pcxt) +{ + Assert(!IsParallelWorker()); + + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * ParallelWorkerInitialization - Initialize parallel worker. + */ +static void +ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate, + CopyState cstate, List *attnamelist) +{ + uint32 count; + ParallelCopyData *pcdata = cstate->pcdata; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + cstate->copy_dest = shared_cstate->copy_dest; + cstate->file_encoding = shared_cstate->file_encoding; + cstate->need_transcoding = shared_cstate->need_transcoding; + cstate->encoding_embeds_ascii = shared_cstate->encoding_embeds_ascii; + cstate->csv_mode = shared_cstate->csv_mode; + cstate->header_line = shared_cstate->header_line; + cstate->null_print_len = shared_cstate->null_print_len; + cstate->force_quote_all = shared_cstate->force_quote_all; + cstate->convert_selectively = shared_cstate->convert_selectively; + cstate->num_defaults = shared_cstate->num_defaults; + pcdata->relid = shared_cstate->relid; + + PopulateGlobalsForCopyFrom(cstate, tup_desc, attnamelist); + + /* Initialize state variables. */ + cstate->reached_eof = false; + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* Set up variables to avoid per-attribute overhead. */ + initStringInfo(&cstate->attribute_buf); + + initStringInfo(&cstate->line_buf); + for (count = 0; count < WORKER_CHUNK_COUNT;count++) + initStringInfo(&pcdata->worker_line_buf[count].line_buf); + + cstate->line_buf_converted = false; + cstate->raw_buf = NULL; + cstate->raw_buf_index = cstate->raw_buf_len = 0; +} +/* + * ParallelCopyMain - parallel copy worker's code. + * + * Where clause handling, convert tuple to columns, add default null values for + * the missing columns that are not present in that record. Find the partition + * if it is partitioned table, invoke before row insert Triggers, handle + * constraints and insert the tuples. + */ +void +ParallelCopyMain(dsm_segment *seg, shm_toc *toc) +{ + CopyState cstate; + ParallelCopyData *pcdata; + ParallelCopyShmInfo *pcshared_info; + ParallelCopyCommonKeyData *shared_cstate; + Relation rel = NULL; + MemoryContext oldcontext; + List *attlist = NIL; + char *data_source_cb; + char *whereClauseStr = NULL; + char *rangeTableStr = NULL; + + /* Allocate workspace and zero all fields. */ + cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData)); + cstate->pcdata = pcdata; + pcdata->is_leader = false; + pcdata->worker_processed_pos = -1; + cstate->is_parallel = true; + + /* + * We allocate everything used by a cstate in a new memory context. This + * avoids memory leaks during repeated use of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + pcshared_info = (ParallelCopyShmInfo *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_SHARED_INFO, + false); + + ereport(DEBUG1, (errmsg("Starting parallel copy worker"))); + + pcdata->pcshared_info = pcshared_info; + + shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE, + false); + + cstate->null_print = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT, + true); + cstate->null_print_client = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT, + true); + + RetrieveSharedString(toc, PARALLEL_COPY_KEY_DELIM, &cstate->delim); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_QUOTE, &cstate->quote); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_ESCAPE, &cstate->escape); + + RetrieveSharedList(toc, PARALLEL_COPY_KEY_ATTNAME_LIST, &attlist); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST, &cstate->force_quote); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_NOT_NULL_LIST, &cstate->force_notnull); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_NULL_LIST, &cstate->force_null); + RetrieveSharedList(toc, PARALLEL_COPY_KEY_CONVERT_LIST, &cstate->convert_select); + data_source_cb = (char *)shm_toc_lookup(toc, + PARALLEL_COPY_KEY_DATASOURCE_CB, + true); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, &whereClauseStr); + RetrieveSharedString(toc, PARALLEL_COPY_KEY_RANGE_TABLE, &rangeTableStr); + + if (data_source_cb) + cstate->data_source_cb = LookupParallelCopyFnPtr(data_source_cb); + + if (whereClauseStr) + { + Node *whereClauseCnvrtdFrmStr = (Node *) stringToNode(whereClauseStr); + cstate->whereClause = whereClauseCnvrtdFrmStr; + } + + if (rangeTableStr) + { + List *rangeTableCnvrtdFrmStr = (List *) stringToNode(rangeTableStr); + cstate->range_table = rangeTableCnvrtdFrmStr; + } + + /* Open and lock the relation, using the appropriate lock type. */ + rel = table_open(shared_cstate->relid, RowExclusiveLock); + cstate->rel = rel; + ParallelWorkerInitialization(shared_cstate, cstate, attlist); + + CopyFrom(cstate); + + if (rel != NULL) + table_close(rel, RowExclusiveLock); + + MemoryContextSwitchTo(oldcontext); + return; +} +/* + * ParallelCopyLeader - parallel copy leader's functionality. + * + * Leader will populate the shared memory and share it across the workers. + * Leader will read the table data from the file and copy the contents to block. + * Leader will then read the input contents and identify the data based on line + * beaks. This information is called line. The line will be populate in + * ParallelCopyLineBoundary. Workers will then pick up this information and + * insert in to table. Leader will do this till it completes processing the + * file. Leader executes the before statement if before statement trigger is + * present. Leader read the data from input file. Leader then loads data to data + * blocks as and when required block by block. Leader traverses through the data + * block to identify one line. It gets a free line to copy the information, if + * there is no free line it will wait till there is one free line. + * Server copies the identified lines information into lines. This process is + * repeated till the complete file is processed. + * Leader will wait till all the lines populated are processed by the workers + * and exits. + */ +static void +ParallelCopyLeader(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + ereport(DEBUG1, (errmsg("Running parallel copy leader"))); + + pcshared_info->is_read_in_progress = false; + cstate->cur_lineno = 0; +} +/* + * LookupParallelCopyFnPtr - Look up parallel copy function pointer. + */ +static pg_attribute_always_inline copy_data_source_cb +LookupParallelCopyFnPtr(const char *funcname) +{ + int i; + + for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++) + { + if (strcmp(InternalParallelCopyFuncPtrs[i].fn_name, funcname) == 0) + return InternalParallelCopyFuncPtrs[i].fn_addr; + } + + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); +} + +/* + * LookupParallelCopyFnStr - Lookup function string from a function pointer. + */ +static pg_attribute_always_inline char* +LookupParallelCopyFnStr(copy_data_source_cb fn_addr) +{ + int i; + + for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++) + { + if (InternalParallelCopyFuncPtrs[i].fn_addr == fn_addr) + return InternalParallelCopyFuncPtrs[i].fn_name; + } + + /* We can only reach this by programming error. */ + elog(ERROR, "internal function pointer not found"); +} +/* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. */ @@ -1149,6 +1939,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { + ParallelContext *pcxt = NULL; Assert(rel); /* check read-only transaction and parallel mode */ @@ -1158,7 +1949,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + cstate->is_parallel = false; + + if (cstate->nworkers > 0) + pcxt = BeginParallelCopy(cstate->nworkers, cstate, stmt->attlist, + relid); + + if (pcxt) + { + ParallelCopyLeader(cstate); + + /* Wait for all copy workers to finish */ + WaitForParallelWorkersToFinish(pcxt); + *processed = pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + EndParallelCopy(pcxt); + } + else + *processed = CopyFrom(cstate); /* copy from file to database */ + EndCopyFrom(cstate); } else @@ -1207,6 +2015,7 @@ ProcessCopyOptions(ParseState *pstate, cstate->is_copy_from = is_from; cstate->file_encoding = -1; + cstate->nworkers = -1; /* Extract options from the statement node tree */ foreach(option, options) @@ -1375,6 +2184,26 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "parallel") == 0) + { + if (!cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel option supported only for copy from"), + parser_errposition(pstate, defel->location))); + if (cstate->nworkers >= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->nworkers = atoi(defGetString(defel)); + if (cstate->nworkers <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be a positive integer greater than zero", + defel->defname), + parser_errposition(pstate, defel->location))); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1727,12 +2556,13 @@ BeginCopy(ParseState *pstate, } /* - * PopulateGlobalsForCopyFrom - Populates the common variables required for copy - * from operation. This is a helper function for BeginCopy function. + * PopulateCatalogInformation - Populates the common variables required for copy + * from operation. This is a helper function for BeginCopy & + * ParallelWorkerInitialization function. */ static void PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc tupDesc, - List *attnamelist) + List *attnamelist) { int num_phys_attrs; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index c27d970..b3787c1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -557,7 +557,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) * Data source callback for the COPY FROM, which reads from the remote * connection and passes the data back to our local COPY. */ -static int +int copy_read_data(void *outbuf, int minread, int maxread) { int bytesread = 0; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833..5dc95ac 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,6 +14,7 @@ #ifndef COPY_H #define COPY_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" @@ -41,4 +42,7 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); + +extern int copy_read_data(void *outbuf, int minread, int maxread); #endif /* COPY_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c65a552..8a79794 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1698,6 +1698,14 @@ ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelCompletionPtr ParallelContext +ParallelCopyLineBoundaries +ParallelCopyLineBoundary +ParallelCopyCommonKeyData +ParallelCopyData +ParallelCopyDataBlock +ParallelCopyKeyListInfo +ParallelCopyLineBuf +ParallelCopyShmInfo ParallelExecutorInfo ParallelHashGrowth ParallelHashJoinBatch -- 1.8.3.1