From 5dba42e231986d4c0f0f826ab8ddbcd6861524a6 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 8 Jul 2020 17:24:03 +0530 Subject: [PATCH v2] Retry Cached Remote Connections For postgres_fdw Remote connections are cached for postgres_fdw in the local backend. There are high chances that the remote backend can no logner be avaiable i.e. it can get killed, the subsequent foreign queries from local backend/session that uses cached connection fails as the remote backend woule have become unavailable/killed. So, this patch, solves this problem, 1. local backend/session uses cached connection, but recieves error 2. upon receiving the first error, it deletes the cached entry 3. try to get new connection at the start of begin remote xact --- contrib/postgres_fdw/connection.c | 125 +++++++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 4 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 52d1fe3563..e2d4fe0569 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -45,6 +45,19 @@ */ typedef Oid ConnCacheKey; +typedef enum ConnectionRetryType +{ + /* initial value for all cache entries */ + CONN_RETRY_NONE, + /* indicates that the caller is ready to retry connection */ + CONN_RETRY_READY, + /* + * indicates to the caller that the connection may have + * broken, it's okay to retry to get a new connection. + */ + CONN_RETRY_DO +}ConnectionRetryType; + typedef struct ConnCacheEntry { ConnCacheKey key; /* hash key (must be first) */ @@ -58,8 +71,15 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + /* conn retry status, default is CONN_RETRY_NONE for all entries */ + uint8 conn_retry; } ConnCacheEntry; +#define IsRemoteXactBegin(sql) \ + ((strcmp(sql, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE") == 0 || \ + strcmp(sql, "START TRANSACTION ISOLATION LEVEL REPEATABLE READ") == 0) ? \ + 1 : 0) + /* * Connection cache (initialized on first use) */ @@ -109,6 +129,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) bool found; ConnCacheEntry *entry; ConnCacheKey key; + bool dobeginxact = true; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -170,10 +191,53 @@ GetConnection(UserMapping *user, bool will_prep_stmt) } /* - * We don't check the health of cached connection here, because it would - * require some overhead. Broken connection will be detected when the - * connection is actually used. + * We check the health of cached connection here, while the remote xact is + * goting to begin. Broken connection will be detected and if so, clear the + * existing conn, get a new connection and resubmit the remote xact begin. + * We retry the broken connections only once during each begin remote xact + * call, still the broken connection exist after that then, the query fails. */ + if (entry->conn != NULL && + entry->xact_depth <= 0 && + entry->conn_retry == CONN_RETRY_NONE) + { + /* + * This is before begin remote xact, so safe to do retry, hence indicate + * that in the cached entry. + */ + entry->conn_retry = CONN_RETRY_READY; + begin_remote_xact(entry); + + /* + * The previously set RETRY_READY status gets changed to RETRY_DO, if there + * exists a broken connection, if so, then clear the entry. + */ + if (entry->conn_retry == CONN_RETRY_DO) + { + elog(DEBUG3, "cached connection %p is broken, hence closing it", entry->conn); + /* + * It is okay to call disconnect here thought the connection is broken. + * No termninate message will actually be sent to remote backend by + * sendTerminateConn() as the connection status will be CONNECTION_BAD. + * So disconnect will just do the clearing and resetting of parameters + * in entry data structure. + */ + disconnect_pg_server(entry); + } + else if (entry->conn_retry == CONN_RETRY_READY) + { + elog(DEBUG3, "cached connection %p is healthy, hence using it", entry->conn); + /* + * Connection is not broken, so remote xact being command is executed + * successfully. Mark that here, to avoid further begin remote xact call + * in this funciton. + */ + dobeginxact = false; + } + + /* Reset the conn_retry to default value. */ + entry->conn_retry = CONN_RETRY_NONE; + } /* * If cache entry doesn't have a connection, we have to establish a new @@ -199,6 +263,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); + entry->conn_retry = CONN_RETRY_NONE; elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); @@ -207,7 +272,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Start a new transaction or subtransaction if needed. */ - begin_remote_xact(entry); + if (dobeginxact) + begin_remote_xact(entry); /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; @@ -457,6 +523,17 @@ do_sql_command(PGconn *conn, const char *sql) if (!PQsendQuery(conn, sql)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); + + /* + * If this function is called from begin_remote_xact and returned res is null + * from pgfdw_get_result, which means that the remote connection is broken, so + * just return from here, the cached entry's retry status would have been set + * appropriately to indicate the broken connection. + */ + if (res == NULL && + IsRemoteXactBegin(sql)) + return; + if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -591,6 +668,7 @@ PGresult * pgfdw_get_result(PGconn *conn, const char *query) { PGresult *volatile last_res = NULL; + bool retry = false; /* In what follows, do not leak any PGresults on an error. */ PG_TRY(); @@ -617,10 +695,49 @@ pgfdw_get_result(PGconn *conn, const char *query) if (wc & WL_SOCKET_READABLE) { if (!PQconsumeInput(conn)) + { + /* + * Do this on error and for only begin remote xact + * commands. + */ + if (IsRemoteXactBegin(query) && + ConnectionHash != NULL) + { + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == conn && + entry->conn_retry == CONN_RETRY_READY) + { + /* got the required connection from the cache. */ + entry->conn_retry = CONN_RETRY_DO; + hash_seq_term(&scan); + retry = true; + break; + } + } + } + + /* if retry to be done, don't report error. */ + if (retry == true) + break; + pgfdw_report_error(ERROR, NULL, conn, false, query); + } } } + if (retry == true) + { + /* clear if there is any previous res. */ + PQclear(last_res); + last_res = NULL; + break; + } + res = PQgetResult(conn); if (res == NULL) break; /* query is complete */ -- 2.25.1