From ff8f0686abb2e37468d6ce71968a51ada9919674 Mon Sep 17 00:00:00 2001 From: Andrey Lepikhov Date: Thu, 10 Sep 2020 14:37:18 +0500 Subject: [PATCH 4/4] Optimized version of the 'Fast COPY FROM' feature. Execute remote query 'COPY .. FROM STDIN' once for each foreign partition (table) in the BeginForeignCopy() routine. TODO: 1. reporting on errors need to remake. Here is differences from the way of INSERT query on each row: we can find out error event after sending END of copy command. 2. It is necessary to examine all possible ways in which an error may occur during the COPY FROM operation. --- contrib/postgres_fdw/connection.c | 15 ++++ .../postgres_fdw/expected/postgres_fdw.out | 4 +- contrib/postgres_fdw/postgres_fdw.c | 81 +++++++++---------- src/backend/commands/copy.c | 51 ++++++------ src/include/foreign/fdwapi.h | 8 +- 5 files changed, 86 insertions(+), 73 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 048c641e85..8409cea40b 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -827,6 +827,21 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Assume we might have lost track of prepared statements */ entry->have_error = true; + if (entry->conn && PQstatus(entry->conn) == CONNECTION_OK) + { + /* Process special case of the unfinished COPY command */ + res = PQgetResult(entry->conn); + if (PQresultStatus(res) == PGRES_COPY_IN && + (PQputCopyEnd(entry->conn, _("canceled by server")) <= 0 || + PQflush(entry->conn))) + { + ereport(ERROR, + (errmsg("error returned by PQputCopyEnd: %s", + PQerrorMessage(entry->conn)))); + } + PQclear(res); + } + /* * If a command has been submitted to the remote server by * using an asynchronous execution function, the command diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 5206814f10..ef9b903b58 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8086,7 +8086,7 @@ ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). CONTEXT: COPY loc2, line 1: "-1 xyzzy" remote SQL command: COPY public.loc2(f1, f2) FROM STDIN -COPY rem2, line 2 +COPY rem2, line 2: "" select * from rem2; f1 | f2 ----+----- @@ -8223,7 +8223,7 @@ alter table loc2 drop column f2; copy rem2 from stdin; ERROR: column "f1" of relation "loc2" does not exist CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN -COPY rem2, line 3 +COPY rem2, line 0 alter table loc2 add column f1 int; alter table loc2 add column f2 int; select * from rem2; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 8bca71e3f5..6006c359f9 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -359,12 +359,12 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate, static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo); static void postgresBeginForeignCopy(ModifyTableState *mtstate, - ResultRelInfo *resultRelInfo); -static void postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo); +static void postgresEndForeignCopy(EState *estate, + ResultRelInfo *resultRelInfo); static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo, - TupleTableSlot **slots, - int nslots); + TupleTableSlot **slots, + int nslots); static int postgresIsForeignRelUpdatable(Relation rel); static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, @@ -2093,6 +2093,7 @@ postgresBeginForeignCopy(ModifyTableState *mtstate, StringInfoData sql; RangeTblEntry *rte; Relation rel = resultRelInfo->ri_RelationDesc; + PGresult *res; rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state); initStringInfo(&sql); @@ -2114,6 +2115,14 @@ postgresBeginForeignCopy(ModifyTableState *mtstate, NIL, NIL); CopyToStart(fmstate->cstate); resultRelInfo->ri_FdwState = fmstate; + + /* + * Start COPY operation. We may do so because we got a separate connection. + */ + res = PQexec(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + PQclear(res); } /* @@ -2124,6 +2133,28 @@ static void postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGresult *res; + PGconn *conn = fmstate->conn; + + /* Finish COPY IN protocol. It is needed to do after successful copy or + * after an error. + */ + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + ereport(ERROR, + (errmsg("error returned by PQputCopyEnd: %s", + PQerrorMessage(conn)))); + + /* After successfully sending an EOF signal, check command status. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + PQclear(res); + /* Do this to ensure we've pumped libpq back to idle state */ + if (PQgetResult(conn) != NULL) + ereport(ERROR, + (errmsg("unexpected extra results during COPY of table: %s", + PQerrorMessage(conn)))); /* Check correct use of CopyIn FDW API. */ Assert(fmstate->cstate != NULL); @@ -2143,58 +2174,26 @@ postgresExecForeignCopy(ResultRelInfo *resultRelInfo, TupleTableSlot **slots, int nslots) { PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; - PGresult *res; - PGconn *conn = fmstate->conn; - bool status = false; int i; - /* Check correct use of CopyIn FDW API. */ + /* Check correct use of Copy FDW API. */ Assert(fmstate->cstate != NULL); Assert(copy_fmstate == NULL); - res = PQexec(conn, fmstate->query); - if (PQresultStatus(res) != PGRES_COPY_IN) - pgfdw_report_error(ERROR, res, conn, true, fmstate->query); - PQclear(res); - PG_TRY(); { copy_fmstate = fmstate; for (i = 0; i < nslots; i++) CopyOneRowTo(fmstate->cstate, slots[i]); - - status = true; } - PG_FINALLY(); + PG_CATCH(); { copy_fmstate = NULL; /* Detect problems */ - - /* Finish COPY IN protocol. It is needed to do after successful copy or - * after an error. - */ - if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 || - PQflush(conn)) - ereport(ERROR, - (errmsg("error returned by PQputCopyEnd: %s", - PQerrorMessage(conn)))); - - /* After successfully sending an EOF signal, check command status. */ - res = PQgetResult(conn); - if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) || - (status && PQresultStatus(res) != PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - - PQclear(res); - /* Do this to ensure we've pumped libpq back to idle state */ - if (PQgetResult(conn) != NULL) - ereport(ERROR, - (errmsg("unexpected extra results during COPY of table: %s", - PQerrorMessage(conn)))); - - if (!status) - PG_RE_THROW(); + PG_RE_THROW(); } PG_END_TRY(); + + copy_fmstate = NULL; } /* diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 02a034fb37..02487c9742 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2941,20 +2941,6 @@ CopyFrom(CopyState cstate) mtstate->operation = CMD_INSERT; mtstate->resultRelInfo = estate->es_result_relations; - /* - * Init COPY into foreign table. - */ - if (target_resultRelInfo->ri_FdwRoutine != NULL) - { - if (target_resultRelInfo->ri_usesMultiInsert) - target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate, - resultRelInfo); - else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) - target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, - resultRelInfo); - } - - /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -3021,6 +3007,19 @@ CopyFrom(CopyState cstate) errcallback.previous = error_context_stack; error_context_stack = &errcallback; + /* + * Init COPY into foreign table. + */ + if (target_resultRelInfo->ri_FdwRoutine != NULL) + { + if (target_resultRelInfo->ri_usesMultiInsert) + target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate, + resultRelInfo); + else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, + resultRelInfo); + } + for (;;) { TupleTableSlot *myslot; @@ -3327,6 +3326,18 @@ CopyFrom(CopyState cstate) if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); + /* Allow the FDW to shut down */ + if (target_resultRelInfo->ri_FdwRoutine != NULL) + { + if (target_resultRelInfo->ri_usesMultiInsert && + target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL) + target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate, + target_resultRelInfo); + else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) + target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, + target_resultRelInfo); + } + /* Done, clean up */ error_context_stack = errcallback.previous; @@ -3350,18 +3361,6 @@ CopyFrom(CopyState cstate) ExecResetTupleTable(estate->es_tupleTable, false); - /* Allow the FDW to shut down */ - if (target_resultRelInfo->ri_FdwRoutine != NULL) - { - if (target_resultRelInfo->ri_usesMultiInsert && - target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL) - target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate, - target_resultRelInfo); - else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) - target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, - target_resultRelInfo); - } - /* Tear down the multi-insert buffer data */ CopyMultiInsertInfoCleanup(&multiInsertInfo); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index e932bdf2f4..d807f872ba 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -105,14 +105,14 @@ typedef void (*EndForeignInsert_function) (EState *estate, ResultRelInfo *rinfo); typedef void (*BeginForeignCopy_function) (ModifyTableState *mtstate, - ResultRelInfo *rinfo); + ResultRelInfo *rinfo); typedef void (*EndForeignCopy_function) (EState *estate, - ResultRelInfo *rinfo); + ResultRelInfo *rinfo); typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo, - TupleTableSlot **slots, - int nslots); + TupleTableSlot **slots, + int nslots); typedef int (*IsForeignRelUpdatable_function) (Relation rel); -- 2.25.1