From 67e5240af5ebe803473acebaf0e8796fd2a05cdd Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 7 Oct 2020 17:18:17 +0530 Subject: [PATCH v6 2/6] Framework for leader/worker in parallel copy This patch has the framework for data structures in parallel copy, leader initialization, worker initialization, shared memory updation, starting workers, wait for workers and workers exiting. --- src/backend/access/transam/parallel.c | 4 + src/backend/commands/Makefile | 1 + src/backend/commands/copy.c | 235 ++++++-------------- src/include/commands/copy.h | 389 +++++++++++++++++++++++++++++++++- src/tools/pgindent/typedefs.list | 7 + 5 files changed, 469 insertions(+), 167 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index b042696..a3cff4b 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -25,6 +25,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/copy.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -145,6 +146,9 @@ static const struct }, { "parallel_vacuum_main", parallel_vacuum_main + }, + { + "ParallelCopyMain", ParallelCopyMain } }; diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index d4815d3..a224aac 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -24,6 +24,7 @@ OBJS = \ constraint.o \ conversioncmds.o \ copy.o \ + copyparallel.o \ createas.o \ dbcommands.o \ define.o \ diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f2848a1..1e55a30 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -29,7 +29,6 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" -#include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" @@ -63,29 +62,6 @@ #define OCTVALUE(c) ((c) - '0') /* - * Represents the different source/dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to/from file (or a piped program) */ - COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ - COPY_CALLBACK /* to/from callback function */ -} CopyDest; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL -} EolType; - -/* * Represents the heap insert method to be used during COPY FROM. */ typedef enum CopyInsertMethod @@ -95,145 +71,10 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ +#define IsParallelCopy() (cstate->is_parallel) +#define IsLeader() (cstate->pcdata->is_leader) #define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1) -/* - * This struct contains all the state variables used throughout a COPY - * operation. For simplicity, we use the same struct for all variants of COPY, - * even though some fields are used in only some cases. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyStateData -{ - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ - bool is_copy_from; /* COPY TO, or COPY FROM? */ - bool reached_eof; /* true if we read to end of copy data (not - * all copy_dest types maintain this) */ - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to or from */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - bool header_line; /* CSV header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? */ - List *convert_select; /* list of column names (can be NIL) */ - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - - /* - * Working state for COPY TO/FROM - */ - MemoryContext copycontext; /* per-copy execution context */ - - /* - * Working state for COPY TO - */ - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - - /* - * Working state for COPY FROM - */ - AttrNumber num_defaults; - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - int *defmap; /* array of default att numbers */ - ExprState **defexprs; /* array of default att expressions */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, convert it - * to server encoding there, and then extract the individual attribute - * fields into attribute_buf. line_buf is preserved unmodified so that we - * can display it in error messages if appropriate. (In binary mode, - * line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_converted; /* converted to server encoding? */ - bool line_buf_valid; /* contains the row being processed? */ - - /* - * Finally, raw_buf holds raw data read from the data source (file or - * client connection). In text mode, CopyReadLine parses this data - * sufficiently to locate line boundaries, then transfers the data to - * line_buf and converts it. In binary mode, CopyReadBinaryData fetches - * appropriate amounts of data from this buffer. In both modes, we - * guarantee that there is a \0 at raw_buf[raw_buf_len]. - */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) -} CopyStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -415,8 +256,6 @@ static bool CopyGetInt16(CopyState cstate, int16 *val); static bool CopyLoadRawBuf(CopyState cstate); static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); -static void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tup_desc, - List *attnamelist); static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); static void ConvertToServerEncoding(CopyState cstate); @@ -1134,6 +973,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, if (is_from) { + ParallelContext *pcxt = NULL; + Assert(rel); /* check read-only transaction and parallel mode */ @@ -1143,7 +984,35 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + cstate->is_parallel = false; + + if (cstate->nworkers > 0) + pcxt = BeginParallelCopy(cstate->nworkers, cstate, stmt->attlist, + relid); + + if (pcxt) + { + int i; + + ParallelCopyFrom(cstate); + + /* Wait for all copy workers to finish */ + WaitForParallelWorkersToFinish(pcxt); + + /* + * Next, accumulate WAL usage. (This must wait for the workers to + * finish, or we might get incomplete data.) + */ + for (i = 0; i < pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&cstate->pcdata->bufferusage[i], + &cstate->pcdata->walusage[i]); + + *processed = pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed); + EndParallelCopy(pcxt); + } + else + *processed = CopyFrom(cstate); /* copy from file to database */ + EndCopyFrom(cstate); } else @@ -1192,6 +1061,7 @@ ProcessCopyOptions(ParseState *pstate, cstate->is_copy_from = is_from; cstate->file_encoding = -1; + cstate->nworkers = -1; /* Extract options from the statement node tree */ foreach(option, options) @@ -1360,6 +1230,39 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "parallel") == 0) + { + int val; + bool parsed; + char *strval; + + if (!cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parallel option is supported only for copy from"), + parser_errposition(pstate, defel->location))); + if (cstate->nworkers >= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + + strval = defGetString(defel); + parsed = parse_int(strval, &val, 0, NULL); + if (!parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for integer option \"%s\": %s", + defel->defname, strval))); + if (val < 1 || val > 1024) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("value %s out of bounds for option \"%s\"", + strval, defel->defname), + errdetail("Valid values are between \"%d\" and \"%d\".", + 1, 1024))); + cstate->nworkers = val; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1715,9 +1618,9 @@ BeginCopy(ParseState *pstate, * PopulateCommonCstateInfo * * Populates the common variables required for copy from operation. This is a - * helper function for BeginCopy function. + * helper function for BeginCopy & InitializeParallelCopyInfo function. */ -static void +void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tupDesc, List *attnamelist) { int num_phys_attrs; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833..cd2d56e 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,14 +14,394 @@ #ifndef COPY_H #define COPY_H +#include "access/parallel.h" +#include "commands/trigger.h" +#include "executor/executor.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + +/* + * The macros DATA_BLOCK_SIZE, RINGSIZE & MAX_BLOCKS_COUNT stores the records + * read from the file that need to be inserted into the relation. These values + * help in the handover of multiple records with the significant size of data to + * be processed by each of the workers. This also ensures there is no context + * switch and the work is fairly distributed among the workers. This number + * showed best results in the performance tests. + */ +#define DATA_BLOCK_SIZE RAW_BUF_SIZE + +/* It can hold 1023 blocks of 64K data in DSM to be processed by the worker. */ +#define MAX_BLOCKS_COUNT 1024 + +/* + * It can hold upto 10240 record information for worker to process. RINGSIZE + * should be a multiple of WORKER_CHUNK_COUNT, as wrap around cases is currently + * not handled while selecting the WORKER_CHUNK_COUNT by the worker. + */ +#define RINGSIZE (10 * 1024) + +/* + * Each worker will be allocated WORKER_CHUNK_COUNT of records from DSM data + * block to process to avoid lock contention. Read RINGSIZE comments before + * changing this value. + */ +#define WORKER_CHUNK_COUNT 64 + +/* + * Represents the different source/dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_FILE, /* to/from file (or a piped program) */ + COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_CALLBACK /* to/from callback function */ +} CopyDest; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL +} EolType; + +/* + * Copy data block information. + * + * These data blocks are created in DSM. Data read from file will be copied in + * these DSM data blocks. The leader process identifies the records and the + * record information will be shared to the workers. The workers will insert the + * records into the table. There can be one or more number of records in each of + * the data block based on the record size. + */ +typedef struct ParallelCopyDataBlock +{ + /* The number of unprocessed lines in the current block. */ + pg_atomic_uint32 unprocessed_line_parts; + + /* + * If the current line data is continued into another block, + * following_block will have the position where the remaining data need to + * be read. + */ + uint32 following_block; + + /* + * This flag will be set, when the leader finds out this block can be read + * safely by the worker. This helps the worker to start processing the + * line early where the line will be spread across many blocks and the + * worker need not wait for the complete line to be processed. + */ + bool curr_blk_completed; + + /* + * Few bytes need to be skipped from this block, this will be set when a + * sequence of characters like \r\n is expected, but end of our block + * contained only \r. In this case we copy the data from \r into the new + * block as they have to be processed together to identify end of line. + * Worker will use skip_bytes to know that this data must be skipped from + * this data block. + */ + uint8 skip_bytes; + char data[DATA_BLOCK_SIZE]; /* data read from file */ +} ParallelCopyDataBlock; + +/* + * Individual line information. + * + * ParallelCopyLineBoundary is common data structure between leader & worker. + * Leader process will be populating data block, data block offset & the size of + * the record in DSM for the workers to copy the data into the relation. + * The leader & worker process access the shared line information by following + * the below steps to avoid any data corruption or hang: + * Leader should operate in the following order: + * 1) check if line_size is -1, if line_size is not -1 wait until line_size is + * set to -1 by the worker. If line_size is -1 it means worker is still + * processing. + * 2) set line_state to LINE_LEADER_POPULATING, so that the worker knows that + * leader is populating this line. + * 3) update first_block, start_offset & cur_lineno in any order. + * 4) update line_size. + * 5) update line_state to LINE_LEADER_POPULATED. + * Worker should operate in the following order: + * 1) check line_state is LINE_LEADER_POPULATED, if not it means leader is still + * populating the data. + * 2) read line_size to know the size of the data. + * 3) only one worker should choose one line for processing, this is handled by + * using pg_atomic_compare_exchange_u32, worker will change the state to + * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED. + * 4) read first_block, start_offset & cur_lineno in any order. + * 5) process line_size data. + * 6) update line_size to -1. + */ +typedef struct ParallelCopyLineBoundary +{ + /* Position of the first block in data_blocks array. */ + uint32 first_block; + uint32 start_offset; /* start offset of the line */ + + /* + * Size of the current line -1 means line is yet to be filled completely, + * 0 means empty line, >0 means line filled with line size data. + */ + pg_atomic_uint32 line_size; + pg_atomic_uint32 line_state; /* line state */ + uint64 cur_lineno; /* line number for error messages */ +} ParallelCopyLineBoundary; + +/* + * Circular queue used to store the line information. + */ +typedef struct ParallelCopyLineBoundaries +{ + /* Position for the leader to populate a line. */ + uint32 pos; + + /* Data read from the file/stdin by the leader process. */ + ParallelCopyLineBoundary ring[RINGSIZE]; +} ParallelCopyLineBoundaries; + +/* + * Shared information among parallel copy workers. This will be allocated in the + * DSM segment. + */ +typedef struct ParallelCopyShmInfo +{ + bool is_read_in_progress; /* file read status */ + + /* + * Actual lines inserted by worker, will not be same as + * total_worker_processed if where condition is specified along with copy. + * This will be the actual records inserted into the relation. + */ + pg_atomic_uint64 processed; + + /* + * The number of records currently processed by the worker, this will also + * include the number of records that was filtered because of where + * clause. + */ + pg_atomic_uint64 total_worker_processed; + uint64 populated; /* lines populated by leader */ + uint32 cur_block_pos; /* current data block */ + ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT]; /* data block array */ + FullTransactionId full_transaction_id; /* xid for copy from statement */ + CommandId mycid; /* command id */ + ParallelCopyLineBoundaries line_boundaries; /* line array */ +} ParallelCopyShmInfo; + +/* + * Parallel copy line buffer information. + */ +typedef struct ParallelCopyLineBuf +{ + StringInfoData line_buf; + uint64 cur_lineno; /* line number for error messages */ +} ParallelCopyLineBuf; + +/* + * This structure helps in storing the common data from CopyStateData that are + * required by the workers. This information will then be allocated and stored + * into the DSM for the worker to retrieve and copy it to CopyStateData. + */ +typedef struct SerializedParallelCopyState +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + bool csv_mode; /* Comma Separated Value format? */ + bool header_line; /* CSV header line? */ + int null_print_len; /* length of same */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool convert_selectively; /* do selective binary conversion? */ + + /* Working state for COPY FROM */ + AttrNumber num_defaults; + Oid relid; +} SerializedParallelCopyState; + +/* + * Parallel copy data information. + */ +typedef struct ParallelCopyData +{ + Oid relid; /* relation id of the table */ + ParallelCopyShmInfo *pcshared_info; /* common info in shared memory */ + bool is_leader; + + WalUsage *walusage; + BufferUsage *bufferusage; + + /* line position which worker is processing */ + uint32 worker_processed_pos; + + /* + * Local line_buf array, workers will copy it here and release the lines + * for the leader to continue. + */ + ParallelCopyLineBuf worker_line_buf[WORKER_CHUNK_COUNT]; + uint32 worker_line_buf_count; /* Number of lines */ + + /* Current position in worker_line_buf */ + uint32 worker_line_buf_pos; +} ParallelCopyData; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY + * operation. For simplicity, we use the same struct for all variants of COPY, + * even though some fields are used in only some cases. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. + */ +typedef struct CopyStateData +{ + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for + * dest == COPY_NEW_FE in COPY FROM */ + bool is_copy_from; /* COPY TO, or COPY FROM? */ + bool reached_eof; /* true if we read to end of copy data (not + * all copy_dest types maintain this) */ + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to or from */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN/STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + bool header_line; /* CSV header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? */ + List *convert_select; /* list of column names (can be NIL) */ + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + + /* + * Working state for COPY TO/FROM + */ + MemoryContext copycontext; /* per-copy execution context */ + + /* + * Working state for COPY TO + */ + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + + /* + * Working state for COPY FROM + */ + AttrNumber num_defaults; + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + int *defmap; /* array of default att numbers */ + ExprState **defexprs; /* array of default att expressions */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, convert it + * to server encoding there, and then extract the individual attribute + * fields into attribute_buf. line_buf is preserved unmodified so that we + * can display it in error messages if appropriate. (In binary mode, + * line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_converted; /* converted to server encoding? */ + bool line_buf_valid; /* contains the row being processed? */ + + /* + * Finally, raw_buf holds raw data read from the data source (file or + * client connection). In text mode, CopyReadLine parses this data + * sufficiently to locate line boundaries, then transfers the data to + * line_buf and converts it. In binary mode, CopyReadBinaryData fetches + * appropriate amounts of data from this buffer. In both modes, we + * guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + int nworkers; + bool is_parallel; + ParallelCopyData *pcdata; + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) +} CopyStateData; + /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -41,4 +421,11 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern void PopulateCommonCstateInfo(CopyState cstate, TupleDesc tup_desc, + List *attnamelist); + +extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc); +extern ParallelContext *BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid); +extern void ParallelCopyFrom(CopyState cstate); +extern void EndParallelCopy(ParallelContext *pcxt); #endif /* COPY_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9cd1179..f5b818b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1702,6 +1702,12 @@ ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelCompletionPtr ParallelContext +ParallelCopyLineBoundaries +ParallelCopyLineBoundary +ParallelCopyData +ParallelCopyDataBlock +ParallelCopyLineBuf +ParallelCopyShmInfo ParallelExecutorInfo ParallelHashGrowth ParallelHashJoinBatch @@ -2219,6 +2225,7 @@ SerCommitSeqNo SerialControl SerializableXactHandle SerializedActiveRelMaps +SerializedParallelCopyState SerializedReindexState SerializedSnapshotData SerializedTransactionState -- 1.8.3.1