diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ab3226287d..7093a41445 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -59,6 +59,7 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -106,7 +107,7 @@ static bool UserMappingPasswordRequired(UserMapping *user); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; bool retry = false; @@ -256,6 +257,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. 
*/ + if (state) + *state = &entry->state; + return entry->conn; } @@ -282,6 +287,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 2d88d06358..274a125b81 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6986,7 +6986,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -7014,7 +7014,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7042,7 +7042,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7070,7 +7070,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7098,7 +7098,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+---- (0 rows) @@ -7140,35 +7140,40 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain 
(verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append -> Seq Scan on public.foo foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7178,35 +7183,40 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) 
-select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append -> Seq Scan on public.foo foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 @@ -7238,7 +7248,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); -> Append -> Seq Scan on public.foo foo_1 Output: foo_1.ctid, foo_1.f1, 
foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -> Hash Join @@ -7256,7 +7266,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); -> Append -> Seq Scan on public.foo foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid - -> Foreign Scan on public.foo2 foo_2 + -> Async Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 (39 rows) @@ -7274,6 +7284,7 @@ select tableoid::regclass, * from bar order by 1,2; (6 rows) -- Check UPDATE with inherited target and an appendrel subquery +SET enable_async_append TO false; explain (verbose, costs off) update bar set f2 = f2 + 100 from @@ -7332,6 +7343,7 @@ update bar set f2 = f2 + 100 from ( select f1 from foo union all select f1+3 from foo ) ss where bar.f1 = ss.f1; +RESET enable_async_append; select tableoid::regclass, * from bar order by 1,2; tableoid | f1 | f2 ----------+----+----- @@ -8571,9 +8583,9 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J Sort Sort Key: t1.a, t3.c -> Append - -> Foreign Scan + -> Async Foreign Scan Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1) - -> Foreign Scan + -> Async Foreign Scan Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2) (7 rows) @@ -8610,19 +8622,19 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10) -- with whole-row reference; partitionwise join does not apply EXPLAIN (COSTS OFF) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; - QUERY PLAN --------------------------------------------------------- + QUERY PLAN 
+-------------------------------------------------------------- Sort Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2) -> Hash Full Join Hash Cond: (t1.a = t2.b) -> Append - -> Foreign Scan on ftprt1_p1 t1_1 - -> Foreign Scan on ftprt1_p2 t1_2 + -> Async Foreign Scan on ftprt1_p1 t1_1 + -> Async Foreign Scan on ftprt1_p2 t1_2 -> Hash -> Append - -> Foreign Scan on ftprt2_p1 t2_1 - -> Foreign Scan on ftprt2_p2 t2_2 + -> Async Foreign Scan on ftprt2_p1 t2_1 + -> Async Foreign Scan on ftprt2_p2 t2_2 (11 rows) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; @@ -8652,9 +8664,9 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t Sort Sort Key: t1.a, t1.b -> Append - -> Foreign Scan + -> Async Foreign Scan Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1) - -> Foreign Scan + -> Async Foreign Scan Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2) (7 rows) @@ -8707,6 +8719,7 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE (14 rows) -- test FOR UPDATE; partitionwise join does not apply +SET enable_async_append TO false; EXPLAIN (COSTS OFF) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; QUERY PLAN @@ -8734,6 +8747,7 @@ SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a 400 | 400 (4 rows) +RESET enable_async_append; RESET enable_partitionwise_join; -- =================================================================== -- test partitionwise aggregates @@ -8758,17 +8772,17 @@ ANALYZE fpagg_tab_p3; SET enable_partitionwise_aggregate TO false; EXPLAIN (COSTS OFF) SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1; - QUERY PLAN ------------------------------------------------------------ + QUERY PLAN 
+----------------------------------------------------------------- Sort Sort Key: pagg_tab.a -> HashAggregate Group Key: pagg_tab.a Filter: (avg(pagg_tab.b) < '22'::numeric) -> Append - -> Foreign Scan on fpagg_tab_p1 pagg_tab_1 - -> Foreign Scan on fpagg_tab_p2 pagg_tab_2 - -> Foreign Scan on fpagg_tab_p3 pagg_tab_3 + -> Async Foreign Scan on fpagg_tab_p1 pagg_tab_1 + -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_2 + -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_3 (9 rows) -- Plan with partitionwise aggregates is enabled @@ -8780,11 +8794,11 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O Sort Sort Key: pagg_tab.a -> Append - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p1 pagg_tab) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2) (9 rows) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 9c5aaacc51..60afa37c76 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -36,6 +37,7 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" @@ -154,6 +156,11 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + bool async_aware; /* engage async-aware logic? 
*/ + PgFdwConnState *conn_state; /* extra per-connection state */ + ForeignScanState *next_node; /* next ForeignScan node to activate */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -391,6 +398,11 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncBegin(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); +static void postgresForeignAsyncRequest(AsyncRequest *areq); /* * Helper functions @@ -419,6 +431,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); +static void fetch_more_data_begin(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, @@ -559,6 +572,13 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncBegin = postgresForeignAsyncBegin; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + PG_RETURN_POINTER(routine); } @@ -1434,7 +1454,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. 
Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1485,6 +1505,12 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Initialize async state */ + fsstate->async_aware = node->ss.ps.plan->async_aware; + fsstate->conn_state->curr_node = NULL; + fsstate->conn_state->async_query_sent = false; + fsstate->next_node = NULL; } /* @@ -1510,6 +1536,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_aware) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1539,6 +1568,14 @@ postgresReScanForeignScan(ForeignScanState *node) char sql[64]; PGresult *res; + /* Reset async state */ + if (fsstate->async_aware) + { + fsstate->conn_state->curr_node = NULL; + fsstate->conn_state->async_query_sent = false; + fsstate->next_node = NULL; + } + /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; @@ -1597,6 +1634,14 @@ postgresEndForeignScan(ForeignScanState *node) if (fsstate == NULL) return; + /* + * If we're ending before we've collected a response from an asynchronous + * query, we have to consume the response. + */ + if (fsstate->conn_state->curr_node == node && + fsstate->conn_state->async_query_sent) + fetch_more_data(node); + /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -2373,7 +2418,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. 
Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, NULL); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2747,7 +2792,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3384,6 +3429,34 @@ create_cursor(ForeignScanState *node) pfree(buf.data); } +/* + * Begin an asynchronous data fetch. + * fetch_more_data must be called to fetch the results.. + */ +static void +fetch_more_data_begin(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGconn *conn = fsstate->conn; + char sql[64]; + + Assert(fsstate->conn_state->curr_node == node); + Assert(!fsstate->conn_state->async_query_sent); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + + fsstate->conn_state->async_query_sent = true; +} + /* * Fetch some more rows from the node's cursor. */ @@ -3406,17 +3479,36 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (fsstate->async_aware) + { + Assert(fsstate->conn_state->curr_node == node); + Assert(fsstate->conn_state->async_query_sent); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. 
*/ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = PQgetResult(conn); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3443,6 +3535,15 @@ fetch_more_data(ForeignScanState *node) /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); + + /* If this was the second part of an async request, we must fetch until NULL. */ + if (fsstate->async_aware) + { + /* call once and raise error if not NULL as expected? */ + while (PQgetResult(conn) != NULL) + ; + fsstate->conn_state->async_query_sent = false; + } } PG_FINALLY(); { @@ -3567,7 +3668,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, NULL); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. 
*/ @@ -4442,7 +4543,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4528,7 +4629,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4756,7 +4857,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -6302,6 +6403,170 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, add_path(final_rel, (Path *) final_path); } +/* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + return true; +} + +/* + * postgresForeignAsyncBegin + * Begin a data fetch from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncBegin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + ForeignScanState *curr_node = fsstate->conn_state->curr_node; + + /* + * If the connection has already been used by another ForeignScan node, + * put this ForeignScan node at the end of the waiting-node chain. + * Otherwise, activate this ForeignScan node now. 
+ */ + if (curr_node) + { + PgFdwScanState *curr_fsstate = (PgFdwScanState *) curr_node->fdw_state; + + /* Scan down the chain ... */ + while (curr_fsstate->next_node) + { + curr_node = curr_fsstate->next_node; + curr_fsstate = (PgFdwScanState *) curr_node->fdw_state; + } + /* Update the chain linking */ + curr_fsstate->next_node = node; + } + else + { + /* Mark the connection as used by the requestee node */ + fsstate->conn_state->curr_node = node; + Assert(!fsstate->conn_state->async_query_sent); + /* Begin a data fetch */ + fetch_more_data_begin(node); + } + + /* Either way mark this ForeignScan node as needing a callback */ + ExecAsyncMarkAsNeedingCallback(areq); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + Assert(areq->callback_pending); + + /* If the ForeignScan node isn't activated, nothing to do */ + if (fsstate->conn_state->curr_node != node) + return; + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch data we have requested asynchronously. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + fetch_more_data(node); +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. 
+ */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + TupleTableSlot *result; + + /* Get some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Begin another fetch */ + fetch_more_data_begin(node); + /* Mark the ForeignScan node as needing a callback */ + ExecAsyncMarkAsNeedingCallback(areq); + return; + } + fsstate->conn_state->curr_node = NULL; + + /* Activate the next ForeignScan node if any */ + if (fsstate->next_node) + { + /* Mark the connection as used by the next ForeignScan node */ + fsstate->conn_state->curr_node = fsstate->next_node; + Assert(!fsstate->conn_state->async_query_sent); + /* Begin a data fetch */ + fetch_more_data_begin(fsstate->next_node); + } + + /* There's nothing more to do; set the result to a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + + if (TupIsNull(result)) + { + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Get some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Begin another fetch */ + fetch_more_data_begin(node); + /* Mark the ForeignScan node as needing a callback */ + ExecAsyncMarkAsNeedingCallback(areq); + return; + } + fsstate->conn_state->curr_node = NULL; + + /* Activate the next ForeignScan node if any */ + if (fsstate->next_node) + { + /* Mark the connection as used by the next ForeignScan node */ + fsstate->conn_state->curr_node = fsstate->next_node; + Assert(!fsstate->conn_state->async_query_sent); + /* Begin a data fetch */ + fetch_more_data_begin(fsstate->next_node); + } + } + + /* Mark the request as complete */ 
+ ExecAsyncRequestDone(areq, result); +} + /* * Create a tuple from the specified row of the PGresult. * diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..ee93262862 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -16,6 +16,7 @@ #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "nodes/execnodes.h" #include "nodes/pathnodes.h" #include "utils/relcache.h" @@ -124,12 +125,22 @@ typedef struct PgFdwRelationInfo int relation_index; } PgFdwRelationInfo; +/* + * Extra control information relating to a connection. + */ +typedef struct PgFdwConnState +{ + ForeignScanState *curr_node; /* currently activated ForeignScan node */ + bool async_query_sent; /* has an asynchronous query been sent? */ +} PgFdwConnState; + /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 7581c5417b..074cdd96ab 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1799,31 +1799,31 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT 
tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1859,12 +1859,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) @@ -1874,6 +1874,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); select tableoid::regclass, * from bar order by 1,2; -- Check UPDATE with inherited target and an appendrel subquery +SET enable_async_append TO false; explain (verbose, costs off) update bar set f2 = f2 + 100 from @@ -1883,6 +1884,7 @@ update bar set f2 = f2 + 100 from ( select f1 from foo union all select f1+3 from foo ) ss where bar.f1 = ss.f1; +RESET enable_async_append; select tableoid::regclass, * from bar order by 1,2; @@ -2492,9 +2494,11 @@ SELECT t1.a, 
t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE a % 25 = 0) t1 FULL JOIN (SELECT 't2_phv' phv, * FROM fprt2 WHERE b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY t1.a, t2.b; -- test FOR UPDATE; partitionwise join does not apply +SET enable_async_append TO false; EXPLAIN (COSTS OFF) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; +RESET enable_async_append; RESET enable_partitionwise_join; diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a632cf98ba..24a9e014da 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4704,6 +4704,20 @@ ANY num_sync ( + enable_async_append (boolean) + + enable_async_append configuration parameter + + + + + Enables or disables the query planner's use of async-aware + append plan types. The default is on. + + + + enable_bitmapscan (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 98e1995453..1328df533a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1557,6 +1557,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + AppendReady + Waiting for a subplan of Append to be ready. 
+ BackupWaitWalArchive Waiting for WAL files required for a backup to be successfully diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 43f9b01e83..d4530bda22 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1374,6 +1374,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (plan->async_aware) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1393,6 +1395,7 @@ ExplainNode(PlanState *planstate, List *ancestors, if (custom_name) ExplainPropertyText("Custom Plan Provider", custom_name, es); ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es); + ExplainPropertyBool("Async Aware", plan->async_aware, es); } switch (nodeTag(plan)) diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f990c6473a..1004647d4f 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index e2154ba86a..1848d58eda 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -521,6 +521,10 @@ ExecSupportsBackwardScan(Plan *node) { ListCell *l; + /* With async, tuples may be interleaved, so can't back up. 
*/ + if (((Append *) node)->nasyncplans != 0) + return false; + foreach(l, ((Append *) node)->appendplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index e69de29bb2..f29d450d27 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,120 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeForeignscan.h" + +/* + * Begin execution of a designed async-aware node. + */ +void +ExecAsyncBegin(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanBegin(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * Give the asynchronous node a chance to configure the file descriptor event + * for which it wishes to wait. We expect the node-type specific callback to + * make a single call of the following form: + * + * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); + */ +void +ExecAsyncConfigureWait(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanConfigureWait(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * Call the asynchronous node back when a relevant event has occurred.
+ */ +void +ExecAsyncNotify(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanNotify(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * Asynchronously request a tuple from the asynchronous node. + */ +void +ExecAsyncRequest(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanRequest(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * A requestee node should call this function to indicate that it needs a + * callback to deliver tuples to its requestor node. The node can call this + * from its ExecAsyncBegin, ExecAsyncNotify, or ExecAsyncRequest callback. + */ +void +ExecAsyncMarkAsNeedingCallback(AsyncRequest *areq) +{ + areq->callback_pending = true; + areq->request_complete = false; + areq->result = NULL; +} + +/* + * A requestee node should call this function to deliver the tuple to its + * requestor node. The node can call this from its ExecAsyncRequest callback + * if the requested tuple is available immediately. + */ +void +ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) +{ + areq->callback_pending = false; + areq->request_complete = true; + areq->result = result; +} diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 88919e62fa..d0969745a4 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,10 +57,13 @@ #include "postgres.h" +#include "executor/execAsync.h" #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" /* Shared state for parallel-aware Append. 
*/ struct ParallelAppendState @@ -78,12 +81,22 @@ struct ParallelAppendState }; #define INVALID_SUBPLAN_INDEX -1 +#define EVENT_BUFFER_SIZE 16 + +#define ExecAppendAsyncDone(node) \ + (bms_is_empty((node)->as_needrequest) && \ + bms_is_empty((node)->as_asyncpending)) static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); static void mark_invalid_subplans_as_finished(AppendState *node); +static void ExecAppendAsyncBegin(AppendState *node); +static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); +static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); +static bool ExecAppendAsyncResponse(AsyncRequest *areq); +static void ExecAppendAsyncEventWait(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -102,7 +115,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags) AppendState *appendstate = makeNode(AppendState); PlanState **appendplanstates; Bitmapset *validsubplans; + Bitmapset *asyncplans; int nplans; + int nasyncplans; int firstvalid; int i, j; @@ -119,6 +134,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_syncdone = false; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -191,12 +207,24 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * While at it, find out the first valid partial plan. */ j = 0; + asyncplans = NULL; + nasyncplans = 0; firstvalid = nplans; i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + /* + * Record async subplans. 
When executing EvalPlanQual, we process + * async subplans synchronously, so don't do this in that case. + */ + if (initNode->async_aware && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + /* * Record the lowest appendplans index which is a valid partial plan. */ @@ -210,6 +238,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* Initialize async state */ + appendstate->as_asyncplans = asyncplans; + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_lastasyncplan = INVALID_SUBPLAN_INDEX; + appendstate->as_needrequest = NULL; + appendstate->as_asyncpending = NULL; + appendstate->as_asyncrequests = NULL; + appendstate->as_eventset = NULL; + + if (nasyncplans > 0) + { + appendstate->as_asyncrequests = (AsyncRequest **) + palloc0(nplans * sizeof(AsyncRequest *)); + + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + AsyncRequest *areq; + + areq = palloc(sizeof(AsyncRequest)); + areq->requestor = (PlanState *) appendstate; + areq->requestee = appendplanstates[i]; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + appendstate->as_asyncrequests[i] = areq; + } + } + /* * Miscellaneous initialization */ @@ -232,31 +291,45 @@ static TupleTableSlot * ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + TupleTableSlot *result; - if (node->as_whichplan < 0) + if (!node->as_syncdone && node->as_whichplan == INVALID_SUBPLAN_INDEX) { /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + /* If there are any async subplans, begin execution of them */ + if (node->as_nasyncplans > 0) + ExecAppendAsyncBegin(node); + /* - * If no subplan has been chosen, we must choose one before + * If no sync subplan has been chosen, we must choose one before 
* proceeding. */ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && - !node->choose_next_subplan(node)) + if (!node->choose_next_subplan(node) && ExecAppendAsyncDone(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } for (;;) { PlanState *subnode; - TupleTableSlot *result; CHECK_FOR_INTERRUPTS(); /* - * figure out which subplan we are currently processing + * try to get a tuple from async subplans + */ + if (!bms_is_empty(node->as_needrequest) || + (node->as_syncdone && !bms_is_empty(node->as_asyncpending))) + { + if (ExecAppendAsyncGetNext(node, &result)) + return result; + Assert(bms_is_empty(node->as_needrequest)); + } + + /* + * figure out which sync subplan we are currently processing */ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; @@ -276,8 +349,16 @@ ExecAppend(PlanState *pstate) return result; } - /* choose new subplan; if none, we're done */ - if (!node->choose_next_subplan(node)) + /* wait or poll async events */ + if (!bms_is_empty(node->as_asyncpending)) + { + Assert(!node->as_syncdone); + Assert(bms_is_empty(node->as_needrequest)); + ExecAppendAsyncEventWait(node); + } + + /* choose new sync subplan; if no sync/async subplans, we're done */ + if (!node->choose_next_subplan(node) && ExecAppendAsyncDone(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } } @@ -313,6 +394,7 @@ ExecEndAppend(AppendState *node) void ExecReScanAppend(AppendState *node) { + int nasyncplans = node->as_nasyncplans; int i; /* @@ -347,8 +429,29 @@ ExecReScanAppend(AppendState *node) ExecReScan(subnode); } + /* Reset async state */ + node->as_lastasyncplan = INVALID_SUBPLAN_INDEX; + bms_free(node->as_needrequest); + node->as_needrequest = NULL; + bms_free(node->as_asyncpending); + node->as_asyncpending = NULL; + + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + 
areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + } + /* Let choose_next_subplan_* function handle setting the first subplan */ node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_syncdone = false; } /* ---------------------------------------------------------------- @@ -429,7 +532,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) /* ---------------------------------------------------------------- * choose_next_subplan_locally * - * Choose next subplan for a non-parallel-aware Append, + * Choose next sync subplan for a non-parallel-aware Append, * returning false if there are no more. * ---------------------------------------------------------------- */ @@ -444,9 +547,9 @@ choose_next_subplan_locally(AppendState *node) /* * If first call then have the bms member function choose the first valid - * subplan by initializing whichplan to -1. If there happen to be no - * valid subplans then the bms member function will handle that by - * returning a negative number which will allow us to exit returning a + * sync subplan by initializing whichplan to -1. If there happen to be + * no valid sync subplans then the bms member function will handle that + * by returning a negative number which will allow us to exit returning a * false value. */ if (whichplan == INVALID_SUBPLAN_INDEX) @@ -467,7 +570,10 @@ choose_next_subplan_locally(AppendState *node) nextplan = bms_prev_member(node->as_valid_subplans, whichplan); if (nextplan < 0) + { + node->as_syncdone = true; return false; + } node->as_whichplan = nextplan; @@ -709,3 +815,292 @@ mark_invalid_subplans_as_finished(AppendState *node) node->as_pstate->pa_finished[i] = true; } } + +/* ---------------------------------------------------------------- + * ExecAppendAsyncBegin + * + * Begin execution of designed async-aware nodes. 
+ * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncBegin(AppendState *node) +{ + Bitmapset *valid_asyncplans; + int i; + + /* We should never be called when there are no async subplans. */ + Assert(node->as_nasyncplans > 0); + + if (node->as_valid_subplans == NULL) + { + node->as_valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + } + + /* Nothing to do if there are no valid async subplans */ + if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) + return; + + /* Get valid async subplans. */ + valid_asyncplans = bms_copy(node->as_asyncplans); + valid_asyncplans = bms_int_members(valid_asyncplans, + node->as_valid_subplans); + + /* Adjust the node's as_valid_subplans to only contain sync subplans. */ + node->as_valid_subplans = bms_del_members(node->as_valid_subplans, + valid_asyncplans); + + /* Allow async-aware nodes to perform additional initialization. */ + i = -1; + while ((i = bms_next_member(valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Perform the actual callback. */ + ExecAsyncBegin(areq); + + /* + * If the callback_pending flag is kept false, the node would be + * ready for a request. Otherwise, it would need a callback. + */ + if (!areq->callback_pending) + node->as_needrequest = bms_add_member(node->as_needrequest, i); + else + node->as_asyncpending = bms_add_member(node->as_asyncpending, i); + } + bms_free(valid_asyncplans); +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncGetNext + * + * Retrieve a tuple from asynchronous subplans. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) +{ + *result = NULL; + + /* Request a tuple asynchronously.
*/ + if (ExecAppendAsyncRequest(node, result)) + return true; + + while (!bms_is_empty(node->as_asyncpending)) + { + CHECK_FOR_INTERRUPTS(); + + /* Wait or poll async events. */ + ExecAppendAsyncEventWait(node); + + /* Request a tuple asynchronously. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + /* Break from loop if there is any sync subplan not complete */ + if (!node->as_syncdone) + break; + } + + /* + * If all sync subplans are complete, we're totally done scanning the + * given node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the sync subplans. + */ + if (node->as_syncdone) + { + Assert(ExecAppendAsyncDone(node)); + *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncRequest + * + * Retrieve a tuple from ready subplans if any. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) +{ + /* Nothing to do if there are no ready subplans. */ + if (bms_is_empty(node->as_needrequest)) + return false; + + /* Asynchronously request a tuple from last ready subplan if any. */ + if (node->as_lastasyncplan != INVALID_SUBPLAN_INDEX) + { + int i = node->as_lastasyncplan; + AsyncRequest *areq = node->as_asyncrequests[i]; + + Assert(bms_is_member(i, node->as_needrequest)); + + /* Perform the actual callback. */ + ExecAsyncRequest(areq); + if (ExecAppendAsyncResponse(areq)) + { + Assert(!TupIsNull(areq->result)); + *result = areq->result; + return true; + } + } + + /* Likewise for the other ready subplans if any. */ + if (!bms_is_empty(node->as_needrequest)) + { + Bitmapset *needrequest = bms_copy(node->as_needrequest); + int i = -1; + + while ((i = bms_next_member(needrequest, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + /* Perform the actual callback. 
*/ + ExecAsyncRequest(areq); + if (ExecAppendAsyncResponse(areq)) + { + Assert(!TupIsNull(areq->result)); + *result = areq->result; + bms_free(needrequest); + return true; + } + } + + Assert(bms_is_empty(node->as_needrequest)); + bms_free(needrequest); + return false; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncResponse + * + * Process a response from an asynchronous request we made. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncResponse(AsyncRequest *areq) +{ + AppendState *node = (AppendState *) areq->requestor; + TupleTableSlot *slot = areq->result; + + /* The result should be a TupleTableSlot or NULL. */ + Assert(slot == NULL || IsA(slot, TupleTableSlot)); + + if (!areq->request_complete) + { + /* The result should be NULL. */ + Assert(slot == NULL); + /* The requestee node would need a callback. */ + Assert(areq->callback_pending); + bms_del_member(node->as_needrequest, areq->request_index); + node->as_asyncpending = bms_add_member(node->as_asyncpending, + areq->request_index); + return false; + } + + /* If the result is NULL or an empty slot, there's nothing more to do. */ + if (TupIsNull(slot)) + { + bms_del_member(node->as_needrequest, areq->request_index); + node->as_lastasyncplan = INVALID_SUBPLAN_INDEX; + return false; + } + + /* + * Remember the subplan so that ExecAppendAsyncRequest will keep trying + * the subplan first until it stops delivering tuples to us. + */ + node->as_lastasyncplan = areq->request_index; + return true; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncEventWait + * + * Wait or poll for file descriptor wait events and fire callbacks. + * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncEventWait(AppendState *node) +{ + long timeout = node->as_syncdone ? 
-1 : 0; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred; + int i; + + /* Nothing to do if there are no pending subplans. */ + if (bms_is_empty(node->as_asyncpending)) + return; + + node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, + node->as_nasyncplans + 1); + AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting node a chance to add a wait event. */ + i = -1; + while ((i = bms_next_member(node->as_asyncpending, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + ExecAsyncConfigureWait(areq); + } + + /* Wait for at least one event to occur. */ + noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, + EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY); + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + if (noccurred == 0) + return; + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting node should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + int request_index = areq->request_index; + + Assert(areq->callback_pending); + Assert(bms_is_member(request_index, node->as_asyncpending)); + + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + areq->callback_pending = false; + bms_del_member(node->as_asyncpending, request_index); + + /* Perform the actual callback. */ + ExecAsyncNotify(areq); + + /* + * If the callback_pending flag is kept false, the node would be + * ready for a request. Otherwise, it would need a callback. 
+ */ + if (!areq->callback_pending) + node->as_needrequest = bms_add_member(node->as_needrequest, + request_index); + else + node->as_asyncpending = bms_add_member(node->as_asyncpending, + request_index); + } + } +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 0b20f94035..4caecdb78a 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -391,3 +391,67 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanBegin + * + * Begin execution of a designed async-aware node + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanBegin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncBegin != NULL); + fdwroutine->ForeignAsyncBegin(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); + fdwroutine->ForeignAsyncConfigureWait(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanNotify + * + * Callback invoked when a relevant event has occurred + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + 
Assert(fdwroutine->ForeignAsyncNotify != NULL); + fdwroutine->ForeignAsyncNotify(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanRequest + * + * Request a tuple asynchronously + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanRequest(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncRequest != NULL); + fdwroutine->ForeignAsyncRequest(areq); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 5a591d0a75..ebc1f013a8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(async_aware); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -241,6 +242,7 @@ _copyAppend(const Append *from) */ COPY_BITMAPSET_FIELD(apprelids); COPY_NODE_FIELD(appendplans); + COPY_SCALAR_FIELD(nasyncplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 4504b1503b..e9ad2a0803 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(async_aware); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_BITMAPSET_FIELD(apprelids); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(nasyncplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); } diff --git 
a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ab7b535caa..5bb77d00aa 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1571,6 +1571,7 @@ ReadCommonPlan(Plan *local_node) READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(async_aware); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); @@ -1667,6 +1668,7 @@ _readAppend(void) READ_BITMAPSET_FIELD(apprelids); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(nasyncplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index f1dfdc1a4a..4eadca0f50 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false; bool enable_parallel_append = true; bool enable_parallel_hash = true; bool enable_partition_pruning = true; +bool enable_async_append = true; typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 40abe6f9f6..bc43d6f14d 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals); static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan, List *gating_quals); static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path); +static bool is_async_capable_path(Path *path); static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags); static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, @@ -1066,6 +1067,30 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) return plan; } +/* + * is_async_capable_path + * Check whether a given Path node is async-capable.
+ */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + default: + break; + } + return false; +} + /* * create_append_plan * Create an Append plan for 'best_path' and (recursively) plans @@ -1083,6 +1108,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; ListCell *subpaths; + int nasyncplans = 0; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; int nodenumsortkeys = 0; @@ -1220,6 +1246,17 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } subplans = lappend(subplans, subplan); + + if (enable_async_append) + { + /* Determine whether the subplan can be executed asynchronously */ + if (pathkeys == NIL && !best_path->path.parallel_safe && + is_async_capable_path(subpath)) + { + subplan->async_aware = true; + ++nasyncplans; + } + } } /* @@ -1254,6 +1291,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } plan->appendplans = subplans; + plan->nasyncplans = nasyncplans; plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e76e627c6b..57d6d933ed 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3898,6 +3898,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) switch (w) { + case WAIT_EVENT_APPEND_READY: + event_name = "AppendReady"; + break; case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE: event_name = "BackupWaitWalArchive"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index bb34630e8e..0347eedd33 100644 --- a/src/backend/utils/misc/guc.c +++ 
b/src/backend/utils/misc/guc.c @@ -1108,6 +1108,16 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of async append plans."), + NULL, + GUC_EXPLAIN + }, + &enable_async_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, gettext_noop("Enables genetic query optimization."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9cb571f7cc..c9de4a1b63 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -371,6 +371,7 @@ #enable_partitionwise_aggregate = off #enable_parallel_hash = on #enable_partition_pruning = on +#enable_async_append = on # - Planner Cost Constants - diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h index e69de29bb2..0831726798 100644 --- a/src/include/executor/execAsync.h +++ b/src/include/executor/execAsync.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * execAsync.h + * Support functions for asynchronous execution + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAsync.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" + +extern void ExecAsyncBegin(AsyncRequest *areq); +extern void ExecAsyncConfigureWait(AsyncRequest *areq); +extern void ExecAsyncNotify(AsyncRequest *areq); +extern void ExecAsyncRequest(AsyncRequest *areq); +extern void ExecAsyncMarkAsNeedingCallback(AsyncRequest *areq); +extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result); + +#endif /* EXECASYNC_H */ diff --git 
a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 326d713ebf..e935a428e3 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -31,4 +31,9 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern void ExecAsyncForeignScanBegin(AsyncRequest *areq); +extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq); +extern void ExecAsyncForeignScanNotify(AsyncRequest *areq); +extern void ExecAsyncForeignScanRequest(AsyncRequest *areq); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..558b9ce30e 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -170,6 +170,16 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); + +typedef void (*ForeignAsyncBegin_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -246,6 +256,13 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. 
*/ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncBegin_function ForeignAsyncBegin; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + ForeignAsyncNotify_function ForeignAsyncNotify; + ForeignAsyncRequest_function ForeignAsyncRequest; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6c0a7d68d6..ac4e459de6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -502,6 +502,22 @@ typedef struct ResultRelInfo struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; } ResultRelInfo; +/* ---------------- + * AsyncRequest + * + * State for an asynchronous tuple request. + * ---------------- + */ +typedef struct AsyncRequest +{ + struct PlanState *requestor; /* Node that wants a tuple */ + struct PlanState *requestee; /* Node from which a tuple is wanted */ + int request_index; /* Scratch space for requestor */ + bool callback_pending; /* Callback is needed */ + bool request_complete; /* Request complete, result valid */ + TupleTableSlot *result; /* Result (NULL if no more tuples) */ +} AsyncRequest; + /* ---------------- * EState information * @@ -1218,6 +1234,15 @@ struct AppendState PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + bool as_syncdone; /* all synchronous plans done? 
*/ + Bitmapset *as_asyncplans; /* asynchronous plans indexes */ + int as_nasyncplans; /* # of asynchronous plans */ + int as_lastasyncplan; /* last async plan delivering a tuple */ + Bitmapset *as_needrequest; /* async plans ready for a request */ + Bitmapset *as_asyncpending; /* async plans needing a callback */ + AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ + struct WaitEventSet *as_eventset; /* WaitEventSet used to configure + * file descriptor wait events */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ ParallelAppendState *as_pstate; /* parallel coordination info */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 7e6b10f86b..9f6ac35551 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -129,6 +129,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_aware; /* engage async-aware logic? */ + /* * Common structural data for all Plan types. */ @@ -245,6 +250,7 @@ typedef struct Append Plan plan; Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ List *appendplans; + int nasyncplans; /* # of async plans, always at start of list */ /* * All 'appendplans' preceding this index are non-partial plans. 
All diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6141654e47..107e57bb7c 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate; extern PGDLLIMPORT bool enable_parallel_append; extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; +extern PGDLLIMPORT bool enable_async_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 257e515bfe..d4a2d580ca 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -917,6 +917,7 @@ typedef enum */ typedef enum { - WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC, + WAIT_EVENT_APPEND_READY = PG_WAIT_IPC, + WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out index dc7ab2ce8b..760847dd2a 100644 --- a/src/test/regress/expected/explain.out +++ b/src/test/regress/expected/explain.out @@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Aware": false, + "Relation Name": "int8_tbl",+ "Alias": "i8", + "Startup Cost": N.N, + @@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8 <Plan> + <Node-Type>Seq Scan</Node-Type> + <Parallel-Aware>false</Parallel-Aware> + + <Async-Aware>false</Async-Aware> + <Relation-Name>int8_tbl</Relation-Name> + <Alias>i8</Alias> + <Startup-Cost>N.N</Startup-Cost> + @@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int - Plan: + Node Type: "Seq Scan" + Parallel Aware: false + + Async Aware: false + Relation Name: "int8_tbl"+ Alias: "i8" + Startup Cost: N.N + @@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8' "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Aware": false, + "Relation Name": "int8_tbl",+
"Alias": "i8", + "Startup Cost": N.N, + @@ -346,6 +350,7 @@ select jsonb_pretty( "Plan Width": 0, + "Total Cost": 0.0, + "Actual Rows": 0, + + "Async Aware": false, + "Actual Loops": 0, + "Startup Cost": 0.0, + "Relation Name": "tenk1", + @@ -391,6 +396,7 @@ select jsonb_pretty( "Plan Width": 0, + "Total Cost": 0.0, + "Actual Rows": 0, + + "Async Aware": false, + "Actual Loops": 0, + "Startup Cost": 0.0, + "Parallel Aware": false, + @@ -433,6 +439,7 @@ select jsonb_pretty( "Plan Width": 0, + "Total Cost": 0.0, + "Actual Rows": 0, + + "Async Aware": false, + "Actual Loops": 0, + "Startup Cost": 0.0, + "Parallel Aware": false, + diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 7cf2eee7c1..c97a7d0b89 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -557,6 +557,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from ], + "Node Type": "Incremental Sort", + "Actual Rows": 55, + + "Async Aware": false, + "Actual Loops": 1, + "Presorted Key": [ + "t.a" + @@ -733,6 +734,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from ], + "Node Type": "Incremental Sort", + "Actual Rows": 70, + + "Async Aware": false, + "Actual Loops": 1, + "Presorted Key": [ + "t.a" + diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out index ff157ceb1c..1b5e1d42aa 100644 --- a/src/test/regress/expected/insert_conflict.out +++ b/src/test/regress/expected/insert_conflict.out @@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + + "Async Aware": false, + "Relation Name": "insertconflicttest", + "Alias": "insertconflicttest", + "Conflict Resolution": "UPDATE", + @@ -213,7 +214,8 @@ explain (costs off, format json) insert into 
insertconflicttest values (0, 'Bilb { + "Node Type": "Result", + "Parent Relationship": "Member", + - "Parallel Aware": false + + "Parallel Aware": false, + + "Async Aware": false + } + ] + } + diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 81bdacf59d..b7818c0637 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal; select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- + enable_async_append | on enable_bitmapscan | on enable_gathermerge | on enable_hashagg | on @@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(18 rows) +(19 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail