From ae91c1271ce36263d46f7f97335866b6646dbb0b Mon Sep 17 00:00:00 2001 From: Mark Dilger Date: Sat, 23 Jan 2021 18:26:15 -0800 Subject: [PATCH v33 5/8] Parameterizing parallel slot result handling The function consumeQueryResult was being used to handle all results returned by queries executed through the parallel slot interface, but this hardcodes knowledge about the expectations of reindexdb and vacuumdb such as the expected result status being PGRES_COMMAND_OK (as opposed to, say, PGRES_TUPLES_OK). Reworking the slot interface to optionally include a PGresultHandler and related fields per slot. The idea is that a caller who executes a command or query through the slot can set the handler to be called when the query completes. The old logic of consumeQueryResults is moved into a new callback function, TableCommandSlotHandler(), which gets registered as the slot handler explicitly from vacuumdb and reindexdb. This is defined in fe_utils/parallel_slot.c rather than somewhere in src/bin/scripts where its only callers reside, partly to keep it close to the rest of the shared parallel slot handling code and partly in anticipation that other utility programs will eventually want to use it also. Adding a default handler which is used to handle results for slots which have no handler explicitly registered. The default simply checks the status of the result and makes a judgement about whether the status is ok, similarly to psql's AcceptResult(). I also considered whether to just have a missing handler always be an error, but decided against requiring users of the parallel slot infrastructure to pedantically specify the default handler. Both designs seem reasonable, but the tie-breaker for me is that edge cases that do not come up in testing will be better handled in production with this design than with pedantically erroring out. The expectation of this commit is that pg_amcheck will have handlers for table and index checks which will process the PGresults of calls to the amcheck functions. This commit sets up the infrastructure necessary to support those handlers being different from the one used by vacuumdb and reindexdb. --- src/bin/scripts/reindexdb.c | 2 + src/bin/scripts/vacuumdb.c | 2 + src/fe_utils/parallel_slot.c | 143 +++++++++++++++++++++------ src/include/fe_utils/parallel_slot.h | 44 +++++++++ 4 files changed, 163 insertions(+), 28 deletions(-) diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index b03c94f35f..af0cc2bb00 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -465,6 +465,8 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, goto finish; } + ParallelSlotSetHandler(free_slot, TableCommandSlotHandler, + PGRES_COMMAND_OK, -1, NULL); run_reindex_command(free_slot->connection, process_type, objname, echo, verbose, concurrently, true); diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index a4f5d545a7..10ab894f10 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -712,6 +712,8 @@ vacuum_one_database(const ConnParams *cparams, * Execute the vacuum. All errors are handled in processQueryResult * through ParallelSlotsGetIdle. */ + ParallelSlotSetHandler(free_slot, TableCommandSlotHandler, + PGRES_COMMAND_OK, -1, sql.data); run_vacuum_command(free_slot->connection, sql.data, echo, tabname); diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c index 3987a4702b..f1e78089e9 100644 --- a/src/fe_utils/parallel_slot.c +++ b/src/fe_utils/parallel_slot.c @@ -30,7 +30,7 @@ static void init_slot(ParallelSlot *slot, PGconn *conn); static int select_loop(int maxFd, fd_set *workerset); -static bool processQueryResult(PGconn *conn, PGresult *result); +static bool handleOneQueryResult(ParallelSlot *slot, PGresult *result); static void init_slot(ParallelSlot *slot, PGconn *conn) @@ -38,53 +38,47 @@ init_slot(ParallelSlot *slot, PGconn *conn) slot->connection = conn; /* Initially assume connection is idle */ slot->isFree = true; + ParallelSlotClearHandler(slot); } /* - * Process (and delete) a query result. Returns true if there's no error, - * false otherwise -- but errors about trying to work on a missing relation - * are reported and subsequently ignored. + * Invoke the slot's handler for a single query result, or fall back to the + * default handler if none is defined for the slot. Returns true if the + * handler reports that there's no error, false otherwise. */ static bool -processQueryResult(PGconn *conn, PGresult *result) +handleOneQueryResult(ParallelSlot *slot, PGresult *result) { - /* - * If it's an error, report it. Errors about a missing table are harmless - * so we continue processing; but die for other errors. - */ - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + PGresultHandler handler = slot->handler; - pg_log_error("processing of database \"%s\" failed: %s", - PQdb(conn), PQerrorMessage(conn)); + if (!handler) + handler = DefaultSlotHandler; - if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) - { - PQclear(result); - return false; - } - } + /* On failure, the handler should return NULL after freeing the result. */ + if (!handler(result, slot->connection, slot->expected_status, + slot->expected_ntups, slot->query)) + return false; + /* Ok, we have to free it ourself */ PQclear(result); return true; } /* - * Consume all the results generated for the given connection until + * Handle all the results generated for the given connection until * nothing remains. If at least one error is encountered, return false. * Note that this will block if the connection is busy. */ static bool -consumeQueryResult(PGconn *conn) +handleQueryResults(ParallelSlot *slot) { bool ok = true; PGresult *result; - SetCancelConn(conn); - while ((result = PQgetResult(conn)) != NULL) + SetCancelConn(slot->connection); + while ((result = PQgetResult(slot->connection)) != NULL) { - if (!processQueryResult(conn, result)) + if (!handleOneQueryResult(slot, result)) ok = false; } ResetCancelConn(); @@ -227,14 +221,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) if (result != NULL) { - /* Check and discard the command result */ - if (!processQueryResult(slots[i].connection, result)) + /* Handle and discard the command result */ + if (!handleOneQueryResult(slots + i, result)) return NULL; } else { /* This connection has become idle */ slots[i].isFree = true; + ParallelSlotClearHandler(slots + i); if (firstFree < 0) firstFree = i; break; @@ -329,9 +324,101 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) for (i = 0; i < numslots; i++) { - if (!consumeQueryResult((slots + i)->connection)) + if (!handleQueryResults(slots + i)) return false; } return true; } + +/* + * DefaultSlotHandler + * default handler of query results for slots with no handler registered. + * + * This gets called when the slot's handler is NULL, but it could also be used + * explicitly. Either way, we do not trust 'expected_status', 'expected_ntups' + * or 'query' fields to have been defined, since a user who neglected to set up + * the handler may well also have neglected to set up these other fields. So + * we ignore them and only consider whether the result status looks like a + * success. + */ +PGresult * +DefaultSlotHandler(PGresult *res, PGconn *conn, ExecStatusType expected_status, + int expected_ntups, const char *query) +{ + switch (PQresultStatus(res)) + { + /* Success codes */ + case PGRES_EMPTY_QUERY: + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + case PGRES_SINGLE_TUPLE: + /* Ok */ + return res; + + /* + * Error codes. + * + * There is no default here, as we want the compiler to warn about + * missing cases. + */ + case PGRES_BAD_RESPONSE: + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: + break; + } + + /* + * Handle all error cases here, including anything not matched in the + * switch (though that should not happen.) The 'query' argument may be + * NULL or garbage left over from a prior usage of the lot. Don't include + * it in the error message! + */ + pg_log_error("processing in database \"%s\" failed: %s", PQdb(conn), + PQerrorMessage(conn)); + PQclear(res); + return NULL; +} + +/* + * TableCommandSlotHandler + * handler for results of commands against tables + * + * Requires that the result status is either PGRES_COMMAND_OK or an error about + * a missing table. This is useful for utilities that compile a list of tables + * to process and then run commands (vacuum, reindex, or whatever) against + * those tables, as there is a race condition between the time the list is + * compiled and the time the command attempts to open the table. + * + * For missing tables, logs an error but allows processing to continue. + * + * For all other errors, logs an error and terminates further processing. + */ +PGresult * +TableCommandSlotHandler(PGresult *res, PGconn *conn, + ExecStatusType expected_status, int expected_ntups, + const char *query) +{ + /* + * If it's an error, report it. Errors about a missing table are harmless + * so we continue processing; but die for other errors. + */ + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + pg_log_error("processing of database \"%s\" failed: %s", + PQdb(conn), PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) + { + PQclear(res); + return NULL; + } + } + + return res; +} diff --git a/src/include/fe_utils/parallel_slot.h b/src/include/fe_utils/parallel_slot.h index 99eeb3328d..007c764067 100644 --- a/src/include/fe_utils/parallel_slot.h +++ b/src/include/fe_utils/parallel_slot.h @@ -13,14 +13,50 @@ #define PARALLEL_SLOT_H #include "fe_utils/connect_utils.h" +#include "fe_utils/pgreshandler.h" #include "libpq-fe.h" typedef struct ParallelSlot { PGconn *connection; /* One connection */ bool isFree; /* Is it known to be idle? */ + + /* + * If a command or query has been issued on 'connection', + * the following fields store our expectations about the + * result we should get back. + */ + PGresultHandler handler; + ExecStatusType expected_status; + int expected_ntups; + + /* + * If not null, should contain the query string for the + * currently executing query, for use by the handler. + */ + const char *query; } ParallelSlot; +static inline void +ParallelSlotSetHandler(ParallelSlot *slot, PGresultHandler handler, + ExecStatusType expected_status, int expected_ntups, + const char *query) +{ + slot->handler = handler; + slot->expected_status = expected_status; + slot->expected_ntups = expected_ntups; + slot->query = query; +} + +static inline void +ParallelSlotClearHandler(ParallelSlot *slot) +{ + slot->handler = NULL; + slot->expected_status = -1; + slot->expected_ntups = -1; + slot->query = NULL; +} + extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots); extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams, @@ -31,5 +67,13 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots); extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots); +extern PGresult *DefaultSlotHandler(PGresult *res, PGconn *conn, + ExecStatusType expected_status, + int expected_ntups, const char *query); + +extern PGresult *TableCommandSlotHandler(PGresult *res, PGconn *conn, + ExecStatusType expected_status, + int expected_ntups, + const char *query); #endif /* PARALLEL_SLOT_H */ -- 2.21.1 (Apple Git-122.3)