From 92a86ca52e33444ef2f559a1da9c8632398892a4 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" Date: Fri, 29 May 2020 10:39:57 +0500 Subject: [PATCH] Fast COPY FROM into the foreign (or sharded) table. --- contrib/postgres_fdw/deparse.c | 25 ++ .../postgres_fdw/expected/postgres_fdw.out | 5 +- contrib/postgres_fdw/postgres_fdw.c | 95 ++++++++ contrib/postgres_fdw/postgres_fdw.h | 1 + src/backend/commands/copy.c | 213 ++++++++++++------ src/include/commands/copy.h | 5 + src/include/foreign/fdwapi.h | 9 + 7 files changed, 286 insertions(+), 67 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index ad37a74221..427402c8eb 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1758,6 +1758,31 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Deparse COPY FROM into given buf. + * We need to use list of parameters at each query. + */ +void +deparseCopyFromSql(StringInfo buf, Relation rel) +{ + int attnum; + + appendStringInfoString(buf, "COPY "); + deparseRelation(buf, rel); + appendStringInfoString(buf, " ( "); + + for(attnum = 0; attnum < rel->rd_att->natts; attnum++) + { + appendStringInfoString(buf, NameStr(rel->rd_att->attrs[attnum].attname)); + + if (attnum != rel->rd_att->natts-1) + appendStringInfoString(buf, ", "); + } + + appendStringInfoString(buf, " ) "); + appendStringInfoString(buf, " FROM STDIN "); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 90db550b92..5ae24fef7c 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8063,8 +8063,9 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). -CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2 ( f1, f2 ) FROM STDIN +COPY rem2, line 2 select * from rem2; f1 | f2 ----+----- diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9fc53cad68..bd2a8f596f 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -18,6 +18,7 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/pg_class.h" +#include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" @@ -190,6 +191,7 @@ typedef struct PgFdwModifyState /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ + CopyState fdwcstate; } PgFdwModifyState; /* @@ -350,12 +352,16 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo, + TupleTableSlot *slot); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static void postgresBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo); static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo); +static void postgresBeginForeignCopy(ResultRelInfo *resultRelInfo); +static void postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status); static int postgresIsForeignRelUpdatable(Relation rel); static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, @@ -530,9 +536,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->ExecForeignInsert = postgresExecForeignInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; + routine->ExecForeignCopy = postgresExecForeignCopy; routine->EndForeignModify = postgresEndForeignModify; routine->BeginForeignInsert = postgresBeginForeignInsert; routine->EndForeignInsert = postgresEndForeignInsert; + routine->BeginForeignCopy = postgresBeginForeignCopy; + routine->EndForeignCopy = postgresEndForeignCopy; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; routine->PlanDirectModify = postgresPlanDirectModify; routine->BeginDirectModify = postgresBeginDirectModify; @@ -1890,6 +1899,27 @@ postgresExecForeignDelete(EState *estate, slot, planSlot); } +/* + * postgresExecForeignCopy + * Copy one row into a foreign table + */ +static void +postgresExecForeignCopy(ResultRelInfo *resultRelInfo, TupleTableSlot *slot) +{ + PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; + char *buf; + + buf = NextForeignCopyRow(fmstate->fdwcstate, slot); + + if (PQputCopyData(fmstate->conn, buf, strlen(buf)) <= 0) + { + PGresult *res; + + res = PQgetResult(fmstate->conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query); + } +} /* * postgresEndForeignModify * Finish an insert/update/delete operation on a foreign table @@ -2051,6 +2081,71 @@ postgresEndForeignInsert(EState *estate, finish_foreign_modify(fmstate); } +/* + * postgresBeginForeignCopy + * Begin an COPY operation on a foreign table + */ +static void +postgresBeginForeignCopy(ResultRelInfo *resultRelInfo) +{ + Relation rel = resultRelInfo->ri_RelationDesc; + PgFdwModifyState *fmstate = (PgFdwModifyState *) (resultRelInfo->ri_FdwState); + StringInfoData sql; + PGresult *res; + + Assert(resultRelInfo->ri_FdwRoutine != NULL); + + fmstate->target_attrs = NULL; + fmstate->has_returning = false; + fmstate->retrieved_attrs = NULL; + + if (fmstate->fdwcstate == NULL) + fmstate->fdwcstate = BeginForeignCopyTo(rel); + + initStringInfo(&sql); + deparseCopyFromSql(&sql, rel); + fmstate->query = sql.data; + + res = PQexec(fmstate->conn, fmstate->query); +} + +/* + * postgresEndForeignCopy + * Finish an COPY operation on a foreign table + */ +static void +postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGresult *res; + + Assert(fmstate != NULL); + + if (!status) + { + PQputCopyEnd(fmstate->conn, (PQprotocolVersion(fmstate->conn) < 3) ? + NULL : + _("aborted foreign copy")); + pfree(fmstate->fdwcstate); + fmstate->fdwcstate = NULL; + EndForeignCopyTo(fmstate->fdwcstate); + return; + } + + while (res = PQgetResult(fmstate->conn), PQresultStatus(res) == PGRES_COPY_IN) + { + /* We can't send an error message if we're using protocol version 2 */ + PQputCopyEnd(fmstate->conn, (status || PQprotocolVersion(fmstate->conn) < 3) ? NULL : + _("aborted foreign copy")); + PQclear(res); + } + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query); + + while (PQgetResult(fmstate->conn) != NULL); +} + /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..8fc5ff018f 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, List **retrieved_attrs); +extern void deparseCopyFromSql(StringInfo buf, Relation rel); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 6d53dc463c..87e0f46846 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -133,6 +133,7 @@ typedef struct CopyStateData char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ copy_data_source_cb data_source_cb; /* function for reading data */ + copy_data_dest_cb data_dest_cb; bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ @@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate); static void ClosePipeToProgram(CopyState cstate); static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); + copy_data_dest_cb data_dest_cb, List *attnamelist, + List *options); static void EndCopyTo(CopyState cstate); +static void CopyToStart(CopyState cstate); +static void CopyToFinish(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot); @@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate) (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; case COPY_CALLBACK: - Assert(false); /* Not yet supported. */ + CopySendChar(cstate, '\n'); + CopySendChar(cstate, '\0'); + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, else { cstate = BeginCopyTo(pstate, rel, query, relid, - stmt->filename, stmt->is_program, + stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); @@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate) pfree(cstate); } +static char *buf = NULL; +static void +data_dest_cb(void *outbuf, int len) +{ + buf = (char *) palloc(len); + memcpy(buf, (char *) outbuf, len); +} + +CopyState +BeginForeignCopyTo(Relation rel) +{ + CopyState cstate; + + cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL); + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + CopyToStart(cstate); + return cstate; +} + +void +EndForeignCopyTo(CopyState cstate) +{ + CopyToFinish(cstate); +} + /* * Setup CopyState to read tuples from a table or a query for COPY TO. */ @@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { @@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate, if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } + else if (data_dest_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } else { cstate->filename = pstrdup(filename); @@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate, return cstate; } +char * +NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot) +{ + CopyOneRowTo(cstate, slot); + return buf; +} + /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". @@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate) if (fe_copy) SendCopyBegin(cstate); + CopyToStart(cstate); processed = CopyTo(cstate); + CopyToFinish(cstate); if (fe_copy) SendCopyEnd(cstate); @@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate) EndCopy(cstate); } -/* - * Copy from relation or query TO file. - */ -static uint64 -CopyTo(CopyState cstate) +static void +CopyToStart(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; ListCell *cur; - uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); @@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate) CopySendEndOfRow(cstate); } } +} + +static void +CopyToFinish(CopyState cstate) +{ + if (cstate->binary) + { + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); + } + + MemoryContextDelete(cstate->rowcontext); +} + +/* + * Copy from relation or query TO file. + */ +static uint64 +CopyTo(CopyState cstate) +{ + uint64 processed; if (cstate->rel) { @@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate) ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - - if (cstate->binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - - MemoryContextDelete(cstate->rowcontext); - return processed; } @@ -2449,53 +2504,82 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE) { - /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) + /* Flush into foreign table or partition */ + int i; + bool status = false; + + Assert(resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwState != NULL); + + PG_TRY(); { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); + resultRelInfo->ri_FdwRoutine->BeginForeignCopy(resultRelInfo); + for (i = 0; i < nused; i++) + resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo, + slots[i]); + status = true; } - + PG_FINALLY(); + { + resultRelInfo->ri_FdwRoutine->EndForeignCopy( + buffer->resultRelInfo, + status); + } + PG_END_TRY(); + } + else + { /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, + NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } - ExecClearTuple(slots[i]); + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, cstate->transition_capture); + } + + ExecClearTuple(slots[i]); + } } /* Mark that all slots are free */ @@ -2868,8 +2952,7 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs) { /* * Can't support multi-inserts to foreign tables or if there are any @@ -3037,8 +3120,7 @@ CopyFrom(CopyState cstate) */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + !has_instead_insert_row_trig; /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) @@ -3048,7 +3130,8 @@ CopyFrom(CopyState cstate) resultRelInfo); } else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) && + resultRelInfo->ri_FdwRoutine == NULL) { /* * Flush pending inserts if this partition can't use diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833565..ef119a761a 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -22,6 +22,7 @@ /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *outbuf, int len); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern CopyState BeginForeignCopyTo(Relation rel); +extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern void EndForeignCopyTo(CopyState cstate); + #endif /* COPY_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..197301c5a5 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -94,6 +94,8 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo, + TupleTableSlot *slot); typedef void (*EndForeignModify_function) (EState *estate, ResultRelInfo *rinfo); @@ -104,6 +106,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate, typedef void (*EndForeignInsert_function) (EState *estate, ResultRelInfo *rinfo); +typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo); + +typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status); + typedef int (*IsForeignRelUpdatable_function) (Relation rel); typedef bool (*PlanDirectModify_function) (PlannerInfo *root, @@ -211,9 +217,12 @@ typedef struct FdwRoutine ExecForeignInsert_function ExecForeignInsert; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; + ExecForeignCopy_function ExecForeignCopy; EndForeignModify_function EndForeignModify; BeginForeignInsert_function BeginForeignInsert; EndForeignInsert_function EndForeignInsert; + BeginForeignCopy_function BeginForeignCopy; + EndForeignCopy_function EndForeignCopy; IsForeignRelUpdatable_function IsForeignRelUpdatable; PlanDirectModify_function PlanDirectModify; BeginDirectModify_function BeginDirectModify; -- 2.17.1