*** a/contrib/postgres_fdw/postgres_fdw.c --- b/contrib/postgres_fdw/postgres_fdw.c *************** *** 376,387 **** static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, --- 376,396 ---- static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); + static PgFdwModifyState *create_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + bool has_returning, + List *retrieved_attrs); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); + static void finish_foreign_modify(PgFdwModifyState *fmstate); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); *************** *** 1681,1698 **** postgresBeginForeignModify(ModifyTableState *mtstate, int eflags) { PgFdwModifyState *fmstate; ! EState *estate = mtstate->ps.state; ! CmdType operation = mtstate->operation; ! Relation rel = resultRelInfo->ri_RelationDesc; ! RangeTblEntry *rte; ! Oid userid; ! ForeignTable *table; ! UserMapping *user; ! AttrNumber n_params; ! Oid typefnoid; ! bool isvarlena; ! ListCell *lc; ! TupleDesc tupdesc = RelationGetDescr(rel); /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState --- 1690,1699 ---- int eflags) { PgFdwModifyState *fmstate; ! char *query; ! List *target_attrs; ! bool has_returning; ! List *retrieved_attrs; /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState *************** *** 1701,1782 **** postgresBeginForeignModify(ModifyTableState *mtstate, if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; - /* Begin constructing PgFdwModifyState. */ - fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); - fmstate->rel = rel; - - /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. - */ - rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); - - /* Get info about foreign table. */ - table = GetForeignTable(RelationGetRelid(rel)); - user = GetUserMapping(userid, table->serverid); - - /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); - fmstate->p_name = NULL; /* prepared statement not made yet */ - /* Deconstruct fdw_private data. */ ! fmstate->query = strVal(list_nth(fdw_private, ! FdwModifyPrivateUpdateSql)); ! fmstate->target_attrs = (List *) list_nth(fdw_private, ! FdwModifyPrivateTargetAttnums); ! fmstate->has_returning = intVal(list_nth(fdw_private, ! FdwModifyPrivateHasReturning)); ! fmstate->retrieved_attrs = (List *) list_nth(fdw_private, ! FdwModifyPrivateRetrievedAttrs); ! ! /* Create context for per-tuple temp workspace. */ ! fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, ! "postgres_fdw temporary data", ! ALLOCSET_SMALL_SIZES); ! ! /* Prepare for input conversion of RETURNING results. */ ! if (fmstate->has_returning) ! fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); ! ! /* Prepare for output conversion of parameters used in prepared stmt. */ ! n_params = list_length(fmstate->target_attrs) + 1; ! fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); ! fmstate->p_nums = 0; ! ! if (operation == CMD_UPDATE || operation == CMD_DELETE) ! { ! /* Find the ctid resjunk column in the subplan's result */ ! Plan *subplan = mtstate->mt_plans[subplan_index]->plan; ! ! fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, ! "ctid"); ! if (!AttributeNumberIsValid(fmstate->ctidAttno)) ! elog(ERROR, "could not find junk ctid column"); ! /* First transmittable parameter will be ctid */ ! getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); ! fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); ! fmstate->p_nums++; ! } ! ! if (operation == CMD_INSERT || operation == CMD_UPDATE) ! { ! /* Set up for remaining transmittable parameters */ ! foreach(lc, fmstate->target_attrs) ! { ! int attnum = lfirst_int(lc); ! Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); ! ! Assert(!attr->attisdropped); ! ! getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); ! fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); ! fmstate->p_nums++; ! } ! } ! ! Assert(fmstate->p_nums <= n_params); resultRelInfo->ri_FdwState = fmstate; } --- 1702,1726 ---- if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* Deconstruct fdw_private data. */ ! query = strVal(list_nth(fdw_private, ! FdwModifyPrivateUpdateSql)); ! target_attrs = (List *) list_nth(fdw_private, ! FdwModifyPrivateTargetAttnums); ! has_returning = intVal(list_nth(fdw_private, ! FdwModifyPrivateHasReturning)); ! retrieved_attrs = (List *) list_nth(fdw_private, ! FdwModifyPrivateRetrievedAttrs); ! /* Construct an execution state. */ ! fmstate = create_foreign_modify(mtstate->ps.state, ! resultRelInfo, ! mtstate->operation, ! mtstate->mt_plans[subplan_index]->plan, ! query, ! target_attrs, ! has_returning, ! retrieved_attrs); resultRelInfo->ri_FdwState = fmstate; } *************** *** 2011,2038 **** postgresEndForeignModify(EState *estate, if (fmstate == NULL) return; ! /* If we created a prepared statement, destroy it */ ! if (fmstate->p_name) ! { ! char sql[64]; ! PGresult *res; ! ! snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); ! ! /* ! * We don't use a PG_TRY block here, so be careful not to throw error ! * without releasing the PGresult. ! */ ! res = pgfdw_exec_query(fmstate->conn, sql); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); ! PQclear(res); ! fmstate->p_name = NULL; ! } ! ! /* Release remote connection */ ! ReleaseConnection(fmstate->conn); ! fmstate->conn = NULL; } /* --- 1955,1962 ---- if (fmstate == NULL) return; ! /* Destroy the execution state */ ! finish_foreign_modify(fmstate); } /* *************** *** 3229,3234 **** close_cursor(PGconn *conn, unsigned int cursor_number) --- 3153,3261 ---- } /* + * create_foreign_modify + * Construct an execution state of a foreign insert/update/delete + * operation + */ + static PgFdwModifyState * + create_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + bool has_returning, + List *retrieved_attrs) + { + PgFdwModifyState *fmstate; + Relation rel = resultRelInfo->ri_RelationDesc; + TupleDesc tupdesc = RelationGetDescr(rel); + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + UserMapping *user; + AttrNumber n_params; + Oid typefnoid; + bool isvarlena; + ListCell *lc; + + /* Begin constructing PgFdwModifyState. */ + fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); + fmstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + user = GetUserMapping(userid, table->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + fmstate->conn = GetConnection(user, true); + fmstate->p_name = NULL; /* prepared statement not made yet */ + + /* Set up remote query information. */ + fmstate->query = query; + fmstate->target_attrs = target_attrs; + fmstate->has_returning = has_returning; + fmstate->retrieved_attrs = retrieved_attrs; + + /* Create context for per-tuple temp workspace. */ + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* Prepare for input conversion of RETURNING results. */ + if (fmstate->has_returning) + fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + Assert(subplan != NULL); + + /* Find the ctid resjunk column in the subplan's result */ + fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, + "ctid"); + if (!AttributeNumberIsValid(fmstate->ctidAttno)) + elog(ERROR, "could not find junk ctid column"); + + /* First transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + Assert(!attr->attisdropped); + + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + } + + Assert(fmstate->p_nums <= n_params); + + return fmstate; + } + + /* * prepare_foreign_modify * Establish a prepared statement for execution of INSERT/UPDATE/DELETE */ *************** *** 3371,3376 **** store_returning_result(PgFdwModifyState *fmstate, --- 3398,3436 ---- } /* + * finish_foreign_modify + * Release resources for a foreign insert/update/delete operation + */ + static void + finish_foreign_modify(PgFdwModifyState *fmstate) + { + Assert(fmstate != NULL); + + /* If we created a prepared statement, destroy it */ + if (fmstate->p_name) + { + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; + } + + /* Release remote connection */ + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; + } + + /* * build_remote_returning * Build a RETURNING targetlist of a remote query for performing an * UPDATE/DELETE .. RETURNING on a join directly