From 108dc421cec88ab5afd092f40da3fa31b8fcfbc5 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" Date: Mon, 22 Jun 2020 10:28:42 +0500 Subject: [PATCH] Fast COPY FROM into the foreign or sharded table. This feature enables bulk COPY into foreign table in the case of multi inserts is possible and foreign table has non-zero number of columns. --- contrib/postgres_fdw/deparse.c | 60 ++++- .../postgres_fdw/expected/postgres_fdw.out | 33 ++- contrib/postgres_fdw/postgres_fdw.c | 87 ++++++++ contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 28 +++ src/backend/commands/copy.c | 206 ++++++++++++------ src/include/commands/copy.h | 5 + src/include/foreign/fdwapi.h | 8 + 8 files changed, 344 insertions(+), 84 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ad37a74221..a37981ff66 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList, static void appendFunctionName(Oid funcid, deparse_expr_cxt *context); static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno, deparse_expr_cxt *context); +static List *deparseRelColumnList(StringInfo buf, Relation rel, + bool enclose_in_parens); /* * Helper functions @@ -1758,6 +1760,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Deparse COPY FROM into given buf. + * We need to use list of parameters at each query. + */ +void +deparseCopyFromSql(StringInfo buf, Relation rel) +{ + appendStringInfoString(buf, "COPY "); + deparseRelation(buf, rel); + (void) deparseRelColumnList(buf, rel, true); + + appendStringInfoString(buf, " FROM STDIN "); +} + /* * deparse remote UPDATE statement * @@ -2061,6 +2077,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel) */ void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) +{ + appendStringInfoString(buf, "SELECT "); + *retrieved_attrs = deparseRelColumnList(buf, rel, false); + + /* Don't generate bad syntax for zero-column relation. */ + if (list_length(*retrieved_attrs) == 0) + appendStringInfoString(buf, "NULL"); + + /* + * Construct FROM clause + */ + appendStringInfoString(buf, " FROM "); + deparseRelation(buf, rel); +} + +/* + * Construct the list of columns of given foreign relation in the order they + * appear in the tuple descriptor of the relation. Ignore any dropped columns. + * Use column names on the foreign server instead of local names. + * + * Optionally enclose the list in parantheses. + */ +static List * +deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens) { Oid relid = RelationGetRelid(rel); TupleDesc tupdesc = RelationGetDescr(rel); @@ -2069,10 +2109,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) List *options; ListCell *lc; bool first = true; + List *retrieved_attrs = NIL; - *retrieved_attrs = NIL; - - appendStringInfoString(buf, "SELECT "); for (i = 0; i < tupdesc->natts; i++) { /* Ignore dropped columns. */ @@ -2081,6 +2119,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) if (!first) appendStringInfoString(buf, ", "); + else if (enclose_in_parens) + appendStringInfoChar(buf, '('); + first = false; /* Use attribute name or column_name option. */ @@ -2100,18 +2141,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) appendStringInfoString(buf, quote_identifier(colname)); - *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); + retrieved_attrs = lappend_int(retrieved_attrs, i + 1); } - /* Don't generate bad syntax for zero-column relation. */ - if (first) - appendStringInfoString(buf, "NULL"); + if (enclose_in_parens && list_length(retrieved_attrs) > 0) + appendStringInfoChar(buf, ')'); - /* - * Construct FROM clause - */ - appendStringInfoString(buf, " FROM "); - deparseRelation(buf, rel); + return retrieved_attrs; } /* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 82fc1290ef..3a3cca5047 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8063,8 +8063,9 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 2 select * from rem2; f1 | f2 ----+----- @@ -8183,6 +8184,34 @@ drop trigger rem2_trig_row_before on rem2; drop trigger rem2_trig_row_after on rem2; drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +ERROR: column "f1" of relation "loc2" does not exist +CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 3 +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(2 rows) + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(4 rows) + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..2b3d7d6dfb 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -18,6 +18,7 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/pg_class.h" +#include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" @@ -190,6 +191,7 @@ typedef struct PgFdwModifyState /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ + CopyState fdwcstate; } PgFdwModifyState; /* @@ -350,6 +352,9 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static void postgresExecForeignBulkInsert(ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + int nslots); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static void postgresBeginForeignInsert(ModifyTableState *mtstate, @@ -530,6 +535,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->ExecForeignInsert = postgresExecForeignInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; + routine->ExecForeignBulkInsert = postgresExecForeignBulkInsert; routine->EndForeignModify = postgresEndForeignModify; routine->BeginForeignInsert = postgresBeginForeignInsert; routine->EndForeignInsert = postgresEndForeignInsert; @@ -1890,6 +1896,87 @@ postgresExecForeignDelete(EState *estate, slot, planSlot); } +/* + * postgresExecForeignBulkInsert + * Copy rows into a foreign table by COPY .. FROM STDIN machinery + */ +static void +postgresExecForeignBulkInsert(ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + int nslots) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; + StringInfoData sql; + PGresult *res; + bool status = false; + PGconn *conn = fmstate->conn; + int i; + + Assert(resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwState != NULL); + + fmstate->target_attrs = NULL; + fmstate->has_returning = false; + fmstate->retrieved_attrs = NULL; + fmstate->fdwcstate = BeginForeignCopyTo(rel); + + initStringInfo(&sql); + deparseCopyFromSql(&sql, rel); + fmstate->query = sql.data; + + res = PQexec(conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, conn, true, fmstate->query); + PQclear(res); + + PG_TRY(); + { + for (i = 0; i < nslots; i++) + { + char *buf = NextForeignCopyRow(fmstate->fdwcstate, slots[i]); + + if (PQputCopyData(conn, buf, strlen(buf)) <= 0) + { + res = PQgetResult(conn); + pgfdw_report_error(ERROR, res, conn, true, fmstate->query); + } + } + + status = true; + } + PG_FINALLY(); + { + /* Finish COPY IN protocol. It is needed to do after successful copy or + * after an error. + */ + if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 || + PQflush(conn)) + ereport(ERROR, + (errmsg("error returned by PQputCopyEnd: %s", + PQerrorMessage(conn)))); + + /* After successfully sending an EOF signal, check command status. */ + res = PQgetResult(conn); + if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) || + (status && PQresultStatus(res) != PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + PQclear(res); + /* Do this to ensure we've pumped libpq back to idle state */ + if (PQgetResult(conn) != NULL) + ereport(ERROR, + (errmsg("unexpected extra results during COPY of table: %s", + PQerrorMessage(conn)))); + + EndForeignCopyTo(fmstate->fdwcstate); + pfree(fmstate->fdwcstate); + + if (!status) + PG_RE_THROW(); + } + PG_END_TRY(); +} /* * postgresEndForeignModify * Finish an insert/update/delete operation on a foreign table diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..8fc5ff018f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs); +extern void deparseCopyFromSql(StringInfo buf, Relation rel); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..73f98a3152 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2293,6 +2293,34 @@ drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +1 foo +2 bar +\. + +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc463c..ddf3c10146 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -133,6 +133,7 @@ typedef struct CopyStateData char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ copy_data_source_cb data_source_cb; /* function for reading data */ + copy_data_dest_cb data_dest_cb; bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ @@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate); static void ClosePipeToProgram(CopyState cstate); static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); + copy_data_dest_cb data_dest_cb, List *attnamelist, + List *options); static void EndCopyTo(CopyState cstate); +static void CopyToStart(CopyState cstate); +static void CopyToFinish(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); @@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate) (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; case COPY_CALLBACK: - Assert(false); /* Not yet supported. */ + CopySendChar(cstate, '\n'); + CopySendChar(cstate, '\0'); + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, else { cstate = BeginCopyTo(pstate, rel, query, relid, - stmt->filename, stmt->is_program, + stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); @@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate) pfree(cstate); } +static char *buf = NULL; +static void +data_dest_cb(void *outbuf, int len) +{ + buf = (char *) palloc(len); + memcpy(buf, (char *) outbuf, len); +} + +CopyState +BeginForeignCopyTo(Relation rel) +{ + CopyState cstate; + + cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL); + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + CopyToStart(cstate); + return cstate; +} + +void +EndForeignCopyTo(CopyState cstate) +{ + CopyToFinish(cstate); +} + /* * Setup CopyState to read tuples from a table or a query for COPY TO. */ @@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { @@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate, if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } + else if (data_dest_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } else { cstate->filename = pstrdup(filename); @@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate, return cstate; } +char * +NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot) +{ + CopyOneRowTo(cstate, slot); + return buf; +} + /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". @@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate) if (fe_copy) SendCopyBegin(cstate); + CopyToStart(cstate); processed = CopyTo(cstate); + CopyToFinish(cstate); if (fe_copy) SendCopyEnd(cstate); @@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate) EndCopy(cstate); } -/* - * Copy from relation or query TO file. - */ -static uint64 -CopyTo(CopyState cstate) +static void +CopyToStart(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; ListCell *cur; - uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); @@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate) CopySendEndOfRow(cstate); } } +} + +static void +CopyToFinish(CopyState cstate) +{ + if (cstate->binary) + { + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); + } + + MemoryContextDelete(cstate->rowcontext); +} + +/* + * Copy from relation or query TO file. + */ +static uint64 +CopyTo(CopyState cstate) +{ + uint64 processed; if (cstate->rel) { @@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate) ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - - if (cstate->binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - - MemoryContextDelete(cstate->rowcontext); - return processed; } @@ -2449,53 +2504,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + { + /* Flush into foreign table or partition */ + resultRelInfo->ri_FdwRoutine->ExecForeignBulkInsert(resultRelInfo, + slots, + nused); + } + else { /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. */ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, + NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, cstate->transition_capture); + } - ExecClearTuple(slots[i]); + ExecClearTuple(slots[i]); + } } /* Mark that all slots are free */ @@ -2868,14 +2934,14 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs || (resultRelInfo->ri_FdwRoutine != NULL && + list_length(cstate->attnumlist) == 0)) { /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. + * Can't support bufferization of copy into foreign tables without any + * defined columns or if there are any volatile default expressions in the + * table. Similarly to the trigger case above, such expressions may query + * the table we're inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the @@ -3037,8 +3103,7 @@ CopyFrom(CopyState cstate) */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + !has_instead_insert_row_trig; /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) @@ -3048,7 +3113,8 @@ CopyFrom(CopyState cstate) resultRelInfo); } else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) && + resultRelInfo->ri_FdwRoutine == NULL) { /* * Flush pending inserts if this partition can't use diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833565..ef119a761a 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -22,6 +22,7 @@ /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *outbuf, int len); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern CopyState BeginForeignCopyTo(Relation rel); +extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern void EndForeignCopyTo(CopyState cstate); + #endif /* COPY_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..0507c2f96f 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -94,6 +94,9 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef void (*ExecForeignBulkInsert_function) (ResultRelInfo *rinfo, + TupleTableSlot **slots, + int nslots); typedef void (*EndForeignModify_function) (EState *estate, ResultRelInfo *rinfo); @@ -104,6 +107,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate, typedef void (*EndForeignInsert_function) (EState *estate, ResultRelInfo *rinfo); +typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo); + +typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status); + typedef int (*IsForeignRelUpdatable_function) (Relation rel); typedef bool (*PlanDirectModify_function) (PlannerInfo *root, @@ -211,6 +218,7 @@ typedef struct FdwRoutine ExecForeignInsert_function ExecForeignInsert; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; + ExecForeignBulkInsert_function ExecForeignBulkInsert; EndForeignModify_function EndForeignModify; BeginForeignInsert_function BeginForeignInsert; EndForeignInsert_function EndForeignInsert; -- 2.17.1