From 64bcaf49bcb3a435692e95031ed4c86849f31820 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Thu, 2 Jul 2020 15:21:24 +0530 Subject: [PATCH v1] Retry cached remote connections if the remote backend is killed Remote connections are cached for postgres_fdw in the local backend. There is a high chance that the remote backend is no longer available, e.g. because it was killed; subsequent foreign queries from the local backend/session that use the cached connection then fail because the remote backend has become unavailable. This patch solves the problem as follows: 1. the local backend/session uses the cached connection, but receives an error 2. upon receiving the first error, it deletes the cached entry 3. it tries to get a new connection at the start of begin remote xact --- contrib/postgres_fdw/connection.c | 100 ++++++++++++++++++++++-------- src/interfaces/libpq/fe-connect.c | 1 + src/interfaces/libpq/libpq-int.h | 2 + 3 files changed, 77 insertions(+), 26 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 52d1fe3563..1298eb3a89 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -16,6 +16,7 @@ #include "access/xact.h" #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" +#include "libpq-int.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -60,6 +61,14 @@ typedef struct ConnCacheEntry uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; +/* Used for retrying remote connections in postgres_fdw. 
*/ +typedef enum +{ + REMOTE_CONNECTION_RETRY_INIT, + CAN_RETRY_REMOTE_CONNECTION, + DO_RETRY_REMOTE_CONNECTION +}PGRemoteRetryConnectionType; + /* * Connection cache (initialized on first use) */ @@ -109,6 +118,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) bool found; ConnCacheEntry *entry; ConnCacheKey key; + uint8 retrycount = 0; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -180,35 +190,54 @@ GetConnection(UserMapping *user, bool will_prep_stmt) * connection. (If connect_pg_server throws an error, the cache entry * will remain in a valid empty state, ie conn == NULL.) */ - if (entry->conn == NULL) + while(true) { - ForeignServer *server = GetForeignServer(user->serverid); + if (entry->conn == NULL) + { + ForeignServer *server = GetForeignServer(user->serverid); - /* Reset all transient state fields, to be sure all are clean */ - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; - entry->invalidated = false; - entry->server_hashvalue = - GetSysCacheHashValue1(FOREIGNSERVEROID, - ObjectIdGetDatum(server->serverid)); - entry->mapping_hashvalue = - GetSysCacheHashValue1(USERMAPPINGOID, - ObjectIdGetDatum(user->umid)); - - /* Now try to make the connection */ - entry->conn = connect_pg_server(server, user); - - elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", - entry->conn, server->servername, user->umid, user->userid); - } + /* Reset all transient state fields, to be sure all are clean */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + entry->changing_xact_state = false; + entry->invalidated = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, + ObjectIdGetDatum(user->umid)); + + /* Now try to make the connection 
*/ + entry->conn = connect_pg_server(server, user); + + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", + entry->conn, server->servername, user->umid, user->userid); + } - /* - * Start a new transaction or subtransaction if needed. - */ - begin_remote_xact(entry); + if (retrycount == 0) + entry->conn->remote_retry_conn = CAN_RETRY_REMOTE_CONNECTION; + else + entry->conn->remote_retry_conn = REMOTE_CONNECTION_RETRY_INIT; + /* + * Start a new transaction or subtransaction if needed. + */ + begin_remote_xact(entry); + + retrycount++; + if (entry->conn->remote_retry_conn == DO_RETRY_REMOTE_CONNECTION) + { + disconnect_pg_server(entry); + continue; + } + else + { + entry->conn->remote_retry_conn = REMOTE_CONNECTION_RETRY_INIT; + break; + } + } /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; @@ -457,6 +486,11 @@ do_sql_command(PGconn *conn, const char *sql) if (!PQsendQuery(conn, sql)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); + + if (conn->remote_retry_conn == DO_RETRY_REMOTE_CONNECTION && + res == NULL) + return; + if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -490,9 +524,15 @@ begin_remote_xact(ConnCacheEntry *entry) else sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; entry->changing_xact_state = true; + do_sql_command(entry->conn, sql); - entry->xact_depth = 1; + entry->changing_xact_state = false; + + if (entry->conn->remote_retry_conn == DO_RETRY_REMOTE_CONNECTION) + return; + + entry->xact_depth = 1; } /* @@ -617,7 +657,15 @@ pgfdw_get_result(PGconn *conn, const char *query) if (wc & WL_SOCKET_READABLE) { if (!PQconsumeInput(conn)) + { + if (conn->remote_retry_conn == CAN_RETRY_REMOTE_CONNECTION) + { + conn->remote_retry_conn = DO_RETRY_REMOTE_CONNECTION; + return NULL; + } + pgfdw_report_error(ERROR, NULL, conn, false, query); + } } } diff --git 
a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 27c9bb46ee..0385489a37 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -3929,6 +3929,7 @@ makeEmptyPGconn(void) conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue)); initPQExpBuffer(&conn->errorMessage); initPQExpBuffer(&conn->workBuffer); + conn->remote_retry_conn = 0; if (conn->inBuffer == NULL || conn->outBuffer == NULL || diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 1de91ae295..1a09cea499 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -522,6 +522,8 @@ struct pg_conn * connection */ #endif + uint8 remote_retry_conn; + /* Buffer for current error message */ PQExpBufferData errorMessage; /* expansible string */ -- 2.25.1