From abe4db0a5391735f7663daac81df579644a70fc3 Mon Sep 17 00:00:00 2001 From: Andrey Lepikhov Date: Wed, 17 Jun 2020 11:07:54 +0500 Subject: [PATCH] Fast COPY FROM into the foreign or sharded table. This feature enables bulk COPY into foreign table in the case of multi inserts is possible and foreign table has non-zero number of columns. --- contrib/postgres_fdw/deparse.c | 60 ++++- .../postgres_fdw/expected/postgres_fdw.out | 33 ++- contrib/postgres_fdw/postgres_fdw.c | 98 ++++++++ contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 28 +++ src/backend/commands/copy.c | 223 ++++++++++++------ src/include/commands/copy.h | 5 + src/include/foreign/fdwapi.h | 9 + 8 files changed, 374 insertions(+), 83 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ad37a74221..a37981ff66 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList, static void appendFunctionName(Oid funcid, deparse_expr_cxt *context); static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno, deparse_expr_cxt *context); +static List *deparseRelColumnList(StringInfo buf, Relation rel, + bool enclose_in_parens); /* * Helper functions @@ -1758,6 +1760,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Deparse COPY FROM into given buf. + * We need to use list of parameters at each query. + */ +void +deparseCopyFromSql(StringInfo buf, Relation rel) +{ + appendStringInfoString(buf, "COPY "); + deparseRelation(buf, rel); + (void) deparseRelColumnList(buf, rel, true); + + appendStringInfoString(buf, " FROM STDIN "); +} + /* * deparse remote UPDATE statement * @@ -2061,6 +2077,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel) */ void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) +{ + appendStringInfoString(buf, "SELECT "); + *retrieved_attrs = deparseRelColumnList(buf, rel, false); + + /* Don't generate bad syntax for zero-column relation. */ + if (list_length(*retrieved_attrs) == 0) + appendStringInfoString(buf, "NULL"); + + /* + * Construct FROM clause + */ + appendStringInfoString(buf, " FROM "); + deparseRelation(buf, rel); +} + +/* + * Construct the list of columns of given foreign relation in the order they + * appear in the tuple descriptor of the relation. Ignore any dropped columns. + * Use column names on the foreign server instead of local names. + * + * Optionally enclose the list in parantheses. + */ +static List * +deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens) { Oid relid = RelationGetRelid(rel); TupleDesc tupdesc = RelationGetDescr(rel); @@ -2069,10 +2109,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) List *options; ListCell *lc; bool first = true; + List *retrieved_attrs = NIL; - *retrieved_attrs = NIL; - - appendStringInfoString(buf, "SELECT "); for (i = 0; i < tupdesc->natts; i++) { /* Ignore dropped columns. */ @@ -2081,6 +2119,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) if (!first) appendStringInfoString(buf, ", "); + else if (enclose_in_parens) + appendStringInfoChar(buf, '('); + first = false; /* Use attribute name or column_name option. */ @@ -2100,18 +2141,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) appendStringInfoString(buf, quote_identifier(colname)); - *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); + retrieved_attrs = lappend_int(retrieved_attrs, i + 1); } - /* Don't generate bad syntax for zero-column relation. */ - if (first) - appendStringInfoString(buf, "NULL"); + if (enclose_in_parens && list_length(retrieved_attrs) > 0) + appendStringInfoChar(buf, ')'); - /* - * Construct FROM clause - */ - appendStringInfoString(buf, " FROM "); - deparseRelation(buf, rel); + return retrieved_attrs; } /* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 82fc1290ef..3a3cca5047 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8063,8 +8063,9 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 2 select * from rem2; f1 | f2 ----+----- @@ -8183,6 +8184,34 @@ drop trigger rem2_trig_row_before on rem2; drop trigger rem2_trig_row_after on rem2; drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +ERROR: column "f1" of relation "loc2" does not exist +CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 3 +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(2 rows) + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(4 rows) + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..45441f3441 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -18,6 +18,7 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/pg_class.h" +#include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" @@ -190,6 +191,7 @@ typedef struct PgFdwModifyState /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ + CopyState fdwcstate; } PgFdwModifyState; /* @@ -350,12 +352,16 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo, + TupleTableSlot *slot); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static void postgresBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo); static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo); +static void postgresBeginForeignCopy(ResultRelInfo *resultRelInfo); +static void postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status); static int postgresIsForeignRelUpdatable(Relation rel); static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, @@ -530,9 +536,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->ExecForeignInsert = postgresExecForeignInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; + routine->ExecForeignCopy = postgresExecForeignCopy; routine->EndForeignModify = postgresEndForeignModify; routine->BeginForeignInsert = postgresBeginForeignInsert; routine->EndForeignInsert = postgresEndForeignInsert; + routine->BeginForeignCopy = postgresBeginForeignCopy; + routine->EndForeignCopy = postgresEndForeignCopy; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; routine->PlanDirectModify = postgresPlanDirectModify; routine->BeginDirectModify = postgresBeginDirectModify; @@ -1890,6 +1899,27 @@ postgresExecForeignDelete(EState *estate, slot, planSlot); } +/* + * postgresExecForeignCopy + * Copy one row into a foreign table + */ +static void +postgresExecForeignCopy(ResultRelInfo *resultRelInfo, TupleTableSlot *slot) +{ + PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; + char *buf; + + buf = NextForeignCopyRow(fmstate->fdwcstate, slot); + + if (PQputCopyData(fmstate->conn, buf, strlen(buf)) <= 0) + { + PGresult *res; + + res = PQgetResult(fmstate->conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query); + } +} /* * postgresEndForeignModify * Finish an insert/update/delete operation on a foreign table @@ -2051,6 +2081,74 @@ postgresEndForeignInsert(EState *estate, finish_foreign_modify(fmstate); } +/* + * postgresBeginForeignCopy + * Begin an COPY operation on a foreign table + */ +static void +postgresBeginForeignCopy(ResultRelInfo *resultRelInfo) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + PgFdwModifyState *fmstate = (PgFdwModifyState *) (resultRelInfo->ri_FdwState); + StringInfoData sql; + PGresult *res; + + Assert(resultRelInfo->ri_FdwRoutine != NULL); + + fmstate->target_attrs = NULL; + fmstate->has_returning = false; + fmstate->retrieved_attrs = NULL; + + if (fmstate->fdwcstate == NULL) + fmstate->fdwcstate = BeginForeignCopyTo(rel); + + initStringInfo(&sql); + deparseCopyFromSql(&sql, rel); + fmstate->query = sql.data; + + res = PQexec(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query); + PQclear(res); +} + +/* + * postgresEndForeignCopy + * Finish an COPY operation on a foreign table + */ +static void +postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGresult *res; + + Assert(fmstate != NULL); + + if (!status) + { + PQputCopyEnd(fmstate->conn, (PQprotocolVersion(fmstate->conn) < 3) ? + NULL : + _("aborted foreign copy")); + pfree(fmstate->fdwcstate); + fmstate->fdwcstate = NULL; + EndForeignCopyTo(fmstate->fdwcstate); + return; + } + + while (res = PQgetResult(fmstate->conn), PQresultStatus(res) == PGRES_COPY_IN) + { + /* We can't send an error message if we're using protocol version 2 */ + PQputCopyEnd(fmstate->conn, (status || PQprotocolVersion(fmstate->conn) < 3) ? NULL : + _("aborted foreign copy")); + PQclear(res); + } + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query); + + while (PQgetResult(fmstate->conn) != NULL); +} + /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..8fc5ff018f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs); +extern void deparseCopyFromSql(StringInfo buf, Relation rel); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..73f98a3152 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2293,6 +2293,34 @@ drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +1 foo +2 bar +\. + +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc463c..1be164b3ca 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -133,6 +133,7 @@ typedef struct CopyStateData char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ copy_data_source_cb data_source_cb; /* function for reading data */ + copy_data_dest_cb data_dest_cb; bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ @@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate); static void ClosePipeToProgram(CopyState cstate); static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); + copy_data_dest_cb data_dest_cb, List *attnamelist, + List *options); static void EndCopyTo(CopyState cstate); +static void CopyToStart(CopyState cstate); +static void CopyToFinish(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); @@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate) (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; case COPY_CALLBACK: - Assert(false); /* Not yet supported. */ + CopySendChar(cstate, '\n'); + CopySendChar(cstate, '\0'); + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, else { cstate = BeginCopyTo(pstate, rel, query, relid, - stmt->filename, stmt->is_program, + stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); @@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate) pfree(cstate); } +static char *buf = NULL; +static void +data_dest_cb(void *outbuf, int len) +{ + buf = (char *) palloc(len); + memcpy(buf, (char *) outbuf, len); +} + +CopyState +BeginForeignCopyTo(Relation rel) +{ + CopyState cstate; + + cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL); + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + CopyToStart(cstate); + return cstate; +} + +void +EndForeignCopyTo(CopyState cstate) +{ + CopyToFinish(cstate); +} + /* * Setup CopyState to read tuples from a table or a query for COPY TO. */ @@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { @@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate, if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } + else if (data_dest_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } else { cstate->filename = pstrdup(filename); @@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate, return cstate; } +char * +NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot) +{ + CopyOneRowTo(cstate, slot); + return buf; +} + /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". @@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate) if (fe_copy) SendCopyBegin(cstate); + CopyToStart(cstate); processed = CopyTo(cstate); + CopyToFinish(cstate); if (fe_copy) SendCopyEnd(cstate); @@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate) EndCopy(cstate); } -/* - * Copy from relation or query TO file. - */ -static uint64 -CopyTo(CopyState cstate) +static void +CopyToStart(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; ListCell *cur; - uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); @@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate) CopySendEndOfRow(cstate); } } +} + +static void +CopyToFinish(CopyState cstate) +{ + if (cstate->binary) + { + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); + } + + MemoryContextDelete(cstate->rowcontext); +} + +/* + * Copy from relation or query TO file. + */ +static uint64 +CopyTo(CopyState cstate) +{ + uint64 processed; if (cstate->rel) { @@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate) ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - - if (cstate->binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - - MemoryContextDelete(cstate->rowcontext); - return processed; } @@ -2449,53 +2504,83 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE) { - /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) + /* Flush into foreign table or partition */ + int i; + bool status = false; + + Assert(resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwState != NULL); + + resultRelInfo->ri_FdwRoutine->BeginForeignCopy(resultRelInfo); + + PG_TRY(); { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); + for (i = 0; i < nused; i++) + resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo, + slots[i]); + status = true; } - + PG_FINALLY(); + { + resultRelInfo->ri_FdwRoutine->EndForeignCopy( + buffer->resultRelInfo, + status); + } + PG_END_TRY(); + } + else + { /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, + NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } - ExecClearTuple(slots[i]); + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, cstate->transition_capture); + } + + ExecClearTuple(slots[i]); + } } /* Mark that all slots are free */ @@ -2868,14 +2953,14 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs || (resultRelInfo->ri_FdwRoutine != NULL && + list_length(cstate->attnumlist) == 0)) { /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. + * Can't support bufferization of copy into foreign tables without any + * defined columns or if there are any volatile default expressions in the + * table. Similarly to the trigger case above, such expressions may query + * the table we're inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the @@ -3037,8 +3122,7 @@ CopyFrom(CopyState cstate) */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + !has_instead_insert_row_trig; /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) @@ -3048,7 +3132,8 @@ CopyFrom(CopyState cstate) resultRelInfo); } else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) && + resultRelInfo->ri_FdwRoutine == NULL) { /* * Flush pending inserts if this partition can't use diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833565..ef119a761a 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -22,6 +22,7 @@ /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *outbuf, int len); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern CopyState BeginForeignCopyTo(Relation rel); +extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern void EndForeignCopyTo(CopyState cstate); + #endif /* COPY_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..197301c5a5 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -94,6 +94,8 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo, + TupleTableSlot *slot); typedef void (*EndForeignModify_function) (EState *estate, ResultRelInfo *rinfo); @@ -104,6 +106,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate, typedef void (*EndForeignInsert_function) (EState *estate, ResultRelInfo *rinfo); +typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo); + +typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status); + typedef int (*IsForeignRelUpdatable_function) (Relation rel); typedef bool (*PlanDirectModify_function) (PlannerInfo *root, @@ -211,9 +217,12 @@ typedef struct FdwRoutine ExecForeignInsert_function ExecForeignInsert; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; + ExecForeignCopy_function ExecForeignCopy; EndForeignModify_function EndForeignModify; BeginForeignInsert_function BeginForeignInsert; EndForeignInsert_function EndForeignInsert; + BeginForeignCopy_function BeginForeignCopy; + EndForeignCopy_function EndForeignCopy; IsForeignRelUpdatable_function IsForeignRelUpdatable; PlanDirectModify_function PlanDirectModify; BeginDirectModify_function BeginDirectModify; -- 2.25.1