From d8216ac470cb0722c536f1094791ce532dda2e4d Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Fri, 20 Jan 2017 21:20:56 +0100 Subject: [PATCH 1/3] Use asynchronous connect API in libpqwalreceiver This makes the connection attempt from CREATE SUBSCRIPTION and from WalReceiver interruptable by user in case the libpq connection is hanging. The previous coding required immediate shutdown (SIGQUIT) of PostgreSQL in that situation. --- src/backend/postmaster/pgstat.c | 4 +- .../libpqwalreceiver/libpqwalreceiver.c | 51 +++++++++++++++++++++- src/include/pgstat.h | 2 +- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index ada374c..2fb9a8b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; - case WAIT_EVENT_LIBPQWALRECEIVER_READ: - event_name = "LibPQWalReceiverRead"; + case WAIT_EVENT_LIBPQWALRECEIVER: + event_name = "LibPQWalReceiver"; break; case WAIT_EVENT_WAL_SENDER_WAIT_WAL: event_name = "WalSenderWaitForWAL"; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 44a89c7..9366421 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -112,6 +112,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err) { WalReceiverConn *conn; + PostgresPollingStatusType status; const char *keys[5]; const char *vals[5]; int i = 0; @@ -138,7 +139,53 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, vals[i] = NULL; conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); + conn->streamConn = PQconnectStartParams(keys, vals, + /* expand_dbname = */ true); + /* Check for conn status. */ + if (PQstatus(conn->streamConn) == CONNECTION_BAD) + { + *err = pstrdup(PQerrorMessage(conn->streamConn)); + return NULL; + } + + /* Poll connection. */ + do + { + int rc; + + /* Determine current state of the connection. */ + status = PQconnectPoll(conn->streamConn); + + /* Sleep a bit if waiting for socket. */ + if (status == PGRES_POLLING_READING || + status == PGRES_POLLING_WRITING) + { + int extra_flag; + + extra_flag = status == PGRES_POLLING_READING ? WL_SOCKET_READABLE : + WL_SOCKET_WRITEABLE; + + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | + WL_LATCH_SET | extra_flag, + PQsocket(conn->streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout. */ + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* Interrupted. */ + if (rc & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + } + + /* Otherwise loop until we have OK or FAILED status. */ + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + + /* Check the status. */ if (PQstatus(conn->streamConn) != CONNECTION_OK) { *err = pstrdup(PQerrorMessage(conn->streamConn)); @@ -521,7 +568,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) WL_LATCH_SET, PQsocket(streamConn), 0, - WAIT_EVENT_LIBPQWALRECEIVER_READ); + WAIT_EVENT_LIBPQWALRECEIVER); if (rc & WL_POSTMASTER_DEATH) exit(1); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 8b710ec..0062fb8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -764,7 +764,7 @@ typedef enum WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_WAL_RECEIVER_WAIT_START, - WAIT_EVENT_LIBPQWALRECEIVER_READ, + WAIT_EVENT_LIBPQWALRECEIVER, WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WRITE_DATA } WaitEventClient; -- 2.7.4