From 1fc2ed184dd218809dd8bf3d2399bc05eddeb9f2 Mon Sep 17 00:00:00 2001 From: Andrey Lepikhov Date: Tue, 8 Sep 2020 14:30:03 +0500 Subject: [PATCH 3/4] Add separated connections into the postgres_fdw. Foreign Copy and someone other may want to use FDW connection that hasn't shared with anyone else. --- contrib/postgres_fdw/connection.c | 26 +++++++++++++------ contrib/postgres_fdw/postgres_fdw.c | 39 ++++++++++++++++------------- contrib/postgres_fdw/postgres_fdw.h | 3 ++- 3 files changed, 43 insertions(+), 25 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 08daf26fdf..048c641e85 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -44,7 +44,11 @@ * ourselves, so that rolling back a subtransaction will kill the right * queries and not the wrong ones. */ -typedef Oid ConnCacheKey; +typedef struct ConnCacheKey +{ + Oid user; + int cid; +} ConnCacheKey; typedef struct ConnCacheEntry { @@ -65,6 +69,7 @@ typedef struct ConnCacheEntry * Connection cache (initialized on first use) */ static HTAB *ConnectionHash = NULL; +static int SeparatedConnNum = 0; /* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; @@ -105,9 +110,9 @@ static bool UserMappingPasswordRequired(UserMapping *user); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, bool separate) { - bool found; + bool found = false; ConnCacheEntry *entry; ConnCacheKey key; @@ -141,7 +146,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt) xact_got_connection = true; /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; + key.user = user->umid; + key.cid = separate ? ++SeparatedConnNum : 0; /* * Find or create cached entry for requested connection. @@ -870,10 +876,16 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (PQstatus(entry->conn) != CONNECTION_OK || PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state) + entry->changing_xact_state || entry->key.cid > 0) { elog(DEBUG3, "discarding connection %p", entry->conn); disconnect_pg_server(entry); + + if (entry->key.cid > 0) + { + hash_search(ConnectionHash, &entry->key, HASH_REMOVE, NULL); + SeparatedConnNum--; + } } } @@ -1057,9 +1069,9 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) /* find server name to be shown in the message below */ tup = SearchSysCache1(USERMAPPINGOID, - ObjectIdGetDatum(entry->key)); + ObjectIdGetDatum(entry->key.user)); if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for user mapping %u", entry->key); + elog(ERROR, "cache lookup failed for user mapping %u", entry->key.user); umform = (Form_pg_user_mapping) GETSTRUCT(tup); server = GetForeignServer(umform->umserver); ReleaseSysCache(tup); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9685e731e0..8bca71e3f5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -438,7 +438,8 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, char *query, List *target_attrs, bool has_returning, - List *retrieved_attrs); + List *retrieved_attrs, + bool separate_conn); static TupleTableSlot *execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, @@ -450,7 +451,7 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); -static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void finish_foreign_modify(PgFdwModifyState *fmstate, bool separate_conn); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -1445,7 +1446,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1840,7 +1841,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, query, target_attrs, has_returning, - retrieved_attrs); + retrieved_attrs, + false); resultRelInfo->ri_FdwState = fmstate; } @@ -1916,7 +1918,7 @@ postgresEndForeignModify(EState *estate, return; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, false); } /* @@ -2022,7 +2024,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, sql.data, targetAttrs, retrieved_attrs != NIL, - retrieved_attrs); + retrieved_attrs, + false); /* * If the given resultRelInfo already has PgFdwModifyState set, it means @@ -2059,7 +2062,7 @@ postgresEndForeignInsert(EState *estate, fmstate = fmstate->aux_fmstate; /* Destroy the execution state */ - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, false); } static PgFdwModifyState *copy_fmstate = NULL; @@ -2103,7 +2106,8 @@ postgresBeginForeignCopy(ModifyTableState *mtstate, sql.data, NIL, false, - NIL); + NIL, + true); fmstate->cstate = BeginCopyTo(NULL, NULL, RelationGetDescr(rel), NULL, InvalidOid, NULL, false, pgfdw_copy_dest_cb, @@ -2126,7 +2130,7 @@ postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo) CopyToFinish(fmstate->cstate); pfree(fmstate->cstate); fmstate->cstate = NULL; - finish_foreign_modify(fmstate); + finish_foreign_modify(fmstate, true); } /* @@ -2514,7 +2518,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, false); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2888,7 +2892,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, false); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3680,7 +3684,8 @@ create_foreign_modify(EState *estate, char *query, List *target_attrs, bool has_returning, - List *retrieved_attrs) + List *retrieved_attrs, + bool separate_conn) { PgFdwModifyState *fmstate; Relation rel = resultRelInfo->ri_RelationDesc; @@ -3708,7 +3713,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, separate_conn); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -4014,7 +4019,7 @@ store_returning_result(PgFdwModifyState *fmstate, * Release resources for a foreign insert/update/delete operation */ static void -finish_foreign_modify(PgFdwModifyState *fmstate) +finish_foreign_modify(PgFdwModifyState *fmstate, bool separate_conn) { Assert(fmstate != NULL); @@ -4583,7 +4588,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, false); /* * Construct command to get page count for relation. @@ -4669,7 +4674,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, false); /* * Construct cursor that retrieves whole rows from remote. @@ -4897,7 +4902,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, false); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 8fc5ff018f..95cf6487a2 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -129,7 +129,8 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + bool separate); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); -- 2.25.1