From 4209ad4e9d3c46d143de07549061f55f23c50e9d Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Mon, 9 May 2016 11:48:11 -0400
Subject: [PATCH 3/3] Lightweight framework for waiting for events.

---
 src/backend/executor/Makefile       |   4 +-
 src/backend/executor/execAsync.c    | 274 ++++++++++++++++++++++++++++++++++++
 src/backend/executor/execProcnode.c |  82 ++++++++----
 src/include/executor/execAsync.h    |  23 ++++
 src/include/executor/executor.h     |   2 +
 src/include/nodes/execnodes.h       |  10 ++
 6 files changed, 370 insertions(+), 25 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 51edd4c..0675b01 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,8 +12,8 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
-       execMain.o execParallel.o execProcnode.o execQual.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execGrouping.o execIndexing.o \
+       execJunk.o execMain.o execParallel.o execProcnode.o execQual.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..20601fa
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,274 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * This file contains routines that are intended to support asynchronous
+ * execution; that is, suspending an executor node until some external
+ * event occurs, or until one of its child nodes produces a tuple.
+ * This allows the executor to avoid blocking on a single external event,
+ * such as a file descriptor waiting on I/O, or a parallel worker which
+ * must complete work elsewhere in the plan tree, when there might at the
+ * same time be useful computation that could be accomplished in some
+ * other part of the plan tree.
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "storage/latch.h"
+
+#define EVENT_BUFFER_SIZE		16
+
+static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);
+
+/*
+ * Wait until the given node has a result ready.
+ *
+ * Each pass creates the EState's WaitEventSet if there is none, gives every
+ * waiting node a chance to configure its events, waits for at least one
+ * event, and then dispatches the nodes whose events fired, propagating
+ * results upward through their parents.
+ *
+ * NOTE(review): propagation does not stop at planstate itself; confirm that
+ * dispatching its ancestors from here is intended.
+ */
+void
+ExecAsyncWaitForNode(PlanState *planstate)
+{
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	PlanState  *callbacks[EVENT_BUFFER_SIZE];
+	int			ncallbacks = 0;
+	EState	   *estate = planstate->state;
+
+	while (!planstate->result_ready)
+	{
+		bool		reinit = (estate->es_wait_event_set == NULL);
+		int			n;
+		int			noccurred;
+
+		if (reinit)
+		{
+			/*
+			 * Allow for a few extra events without reinitializing. It
+			 * doesn't seem worth the complexity of doing anything very
+			 * aggressive here, because plans that depend on massive numbers
+			 * of external FDs are likely to run afoul of kernel limits anyway.
+			 */
+			estate->es_max_async_events = estate->es_total_async_events + 16;
+			estate->es_wait_event_set =
+				CreateWaitEventSet(estate->es_query_cxt,
+								   estate->es_max_async_events);
+		}
+
+		/* Give each waiting node a chance to add or modify events. */
+		for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+			ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);
+
+		/* Wait for at least one event to occur. */
+		noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
+									 occurred_event, EVENT_BUFFER_SIZE);
+		Assert(noccurred > 0);
+
+		/*
+		 * Loop over the occurred events and make a list of nodes that need
+		 * a callback. The waiting nodes should have registered their wait
+		 * events with user_data pointing back to the node.
+		 */
+		for (n = 0; n < noccurred; ++n)
+		{
+			WaitEvent  *w = &occurred_event[n];
+			PlanState  *ps = w->user_data;
+
+			callbacks[ncallbacks++] = ps;
+		}
+
+		/*
+		 * Initially, this loop will call the node-type-specific function for
+		 * each node for which an event occurred. If any of those nodes
+		 * produce a result, its parent enters the set of nodes that are
+		 * pending for a callback. In this way, when a result becomes
+		 * available in a leaf of the plan tree, it can bubble upwards towards
+		 * the root as far as necessary.
+		 */
+		while (ncallbacks > 0)
+		{
+			int			i,
+						j;
+
+			/* Loop over all callbacks. */
+			for (i = 0; i < ncallbacks; ++i)
+			{
+				/* Skip if NULL. */
+				if (callbacks[i] == NULL)
+					continue;
+
+				/*
+				 * Remove any duplicates. O(n^2) overall may not seem good,
+				 * but it should hopefully be OK as long as
+				 * EVENT_BUFFER_SIZE is not too large.
+				 */
+				for (j = i + 1; j < ncallbacks; ++j)
+					if (callbacks[i] == callbacks[j])
+						callbacks[j] = NULL;
+
+				/* Dispatch to node-type-specific code. */
+				ExecDispatchNode(callbacks[i]);
+
+				/*
+				 * If there's now a tuple ready, we must dispatch to the
+				 * parent node; otherwise, there's nothing more to do.
+				 */
+				if (callbacks[i]->result_ready)
+					callbacks[i] = callbacks[i]->parent;
+				else
+					callbacks[i] = NULL;
+			}
+
+			/* Squeeze out NULLs. */
+			for (i = 0, j = 0; j < ncallbacks; ++j)
+				if (callbacks[j] != NULL)
+					callbacks[i++] = callbacks[j];
+			ncallbacks = i;
+		}
+	}
+}
+
+/*
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more events that can be registered on a WaitEventSet. nevents
+ * should be the maximum number of events that it will wish to register.
+ * reinit should be true if the node can't reuse the WaitEventSet it most
+ * recently initialized, for example because it needs to drop a wait event
+ * from the set.
+ */
+void
+ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
+{
+	EState	   *estate = planstate->state;
+
+	Assert(nevents > 0);	/* otherwise, use ExecAsyncDoesNotNeedWait */
+
+	/*
+	 * If this node is not already present in the array of waiting nodes,
+	 * then add it. If that array hasn't been allocated or is full, this may
+	 * require (re)allocating it.
+	 */
+	if (planstate->n_async_events == 0)
+	{
+		if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
+		{
+			int			newmax;
+
+			if (estate->es_max_waiting_nodes == 0)
+			{
+				newmax = 16;
+				estate->es_waiting_nodes =
+					MemoryContextAlloc(estate->es_query_cxt,
+									   newmax * sizeof(PlanState *));
+			}
+			else
+			{
+				newmax = estate->es_max_waiting_nodes * 2;
+				estate->es_waiting_nodes =
+					repalloc(estate->es_waiting_nodes,
+							 newmax * sizeof(PlanState *));
+			}
+			estate->es_max_waiting_nodes = newmax;
+		}
+		estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
+	}
+
+	/* Adjust per-node and per-estate totals. */
+	estate->es_total_async_events -= planstate->n_async_events;
+	planstate->n_async_events = nevents;
+	estate->es_total_async_events += planstate->n_async_events;
+
+	/*
+	 * If a WaitEventSet has already been created, we need to discard it and
+	 * start again if the user passed reinit = true, or if the total number of
+	 * required events exceeds the supported number.
+	 */
+	if (estate->es_wait_event_set != NULL && (reinit ||
+		estate->es_total_async_events > estate->es_max_async_events))
+	{
+		FreeWaitEventSet(estate->es_wait_event_set);
+		estate->es_wait_event_set = NULL;
+	}
+}
+
+/*
+ * If an executor node no longer needs to wait, it should call this function
+ * to report that fact.
+ */
+void
+ExecAsyncDoesNotNeedWait(PlanState *planstate)
+{
+	int			n;
+	EState	   *estate = planstate->state;
+	bool		found PG_USED_FOR_ASSERTS_ONLY = false;
+
+	if (planstate->n_async_events <= 0)
+		return;
+
+	/*
+	 * Remove the node from the list of waiting nodes. (Is a linear search
+	 * going to be a problem here? I think probably not.)
+	 */
+	for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+	{
+		if (estate->es_waiting_nodes[n] == planstate)
+		{
+			estate->es_waiting_nodes[n] =
+				estate->es_waiting_nodes[--estate->es_num_waiting_nodes];
+			found = true;
+			break;
+		}
+	}
+
+	/*
+	 * We should always find ourselves in the array. Check a flag rather
+	 * than the loop index: the removal above decrements the count, so an
+	 * index test would fail whenever we were the last entry.
+	 */
+	Assert(found);
+
+	/* We no longer need any asynchronous events. */
+	estate->es_total_async_events -= planstate->n_async_events;
+	planstate->n_async_events = 0;
+
+	/*
+	 * The next wait will need to rebuild the WaitEventSet, because whatever
+	 * events we registered are gone now. It's probably OK that this code
+	 * assumes we actually did register some events at one point, because we
+	 * needed to wait at some point and we don't any more.
+	 */
+	if (estate->es_wait_event_set != NULL)
+	{
+		FreeWaitEventSet(estate->es_wait_event_set);
+		estate->es_wait_event_set = NULL;
+	}
+}
+
+/*
+ * Give per-nodetype function a chance to register wait events.
+ */
+static void
+ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
+{
+	switch (nodeTag(planstate))
+	{
+		/* XXX: Add calls to per-nodetype handlers here. */
+		default:
+			elog(ERROR, "unexpected node type: %d", nodeTag(planstate));
+	}
+}
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 3f2ebff..b7ac08e 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -77,6 +77,7 @@
  */
 #include "postgres.h"
 
+#include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
@@ -368,24 +369,14 @@ ExecInitNode(Plan *node, EState *estate, PlanState *parent, int eflags)
 
 
 /* ----------------------------------------------------------------
- *		ExecProcNode
+ *		ExecDispatchNode
  *
- *		Execute the given node to return a(nother) tuple.
+ *		Invoke the given node's dispatch function.
  * ----------------------------------------------------------------
  */
-TupleTableSlot *
-ExecProcNode(PlanState *node)
+void
+ExecDispatchNode(PlanState *node)
 {
-	TupleTableSlot *result;
-
-	CHECK_FOR_INTERRUPTS();
-
-	/* mark any previous result as having been consumed */
-	node->result_ready = false;
-
-	if (node->chgParam != NULL) /* something changed */
-		ExecReScan(node);		/* let ReScan handle this */
-
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
@@ -539,22 +530,67 @@ ExecProcNode(PlanState *node)
 
 		default:
 			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
-			result = NULL;
 			break;
 	}
 
-	/* We don't support asynchronous execution yet. */
-	Assert(node->result_ready);
+	if (node->instrument)
+	{
+		double		nTuples = 0.0;
 
-	/* Result should be a TupleTableSlot, unless it's NULL. */
-	Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+		if (node->result_ready && node->result != NULL &&
+			IsA(node->result, TupleTableSlot))
+			nTuples = 1.0;
 
-	result = (TupleTableSlot *) node->result;
+		InstrStopNode(node->instrument, nTuples);
+	}
+}
 
-	if (node->instrument)
-		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
-	return result;
+/* ----------------------------------------------------------------
+ *		ExecExecuteNode
+ *
+ *		Request the next tuple from the given node. Note that
+ *		if the node supports asynchrony, result_ready may not be
+ *		set on return (use ExecProcNode if you need that, or call
+ *		ExecAsyncWaitForNode).
+ * ----------------------------------------------------------------
+ */
+void
+ExecExecuteNode(PlanState *node)
+{
+	node->result_ready = false;
+	ExecDispatchNode(node);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecProcNode
+ *
+ *		Get the next tuple from the given node. If the node is
+ *		asynchronous, wait for a tuple to be ready before
+ *		returning.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecProcNode(PlanState *node)
+{
+	CHECK_FOR_INTERRUPTS();
+
+	/* mark any previous result as having been consumed */
+	node->result_ready = false;
+
+	if (node->chgParam != NULL) /* something changed */
+		ExecReScan(node);		/* let ReScan handle this */
+
+	ExecDispatchNode(node);
+
+	if (!node->result_ready)
+		ExecAsyncWaitForNode(node);
+
+	/* Result should be a TupleTableSlot, unless it's NULL. */
+	Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+
+	return (TupleTableSlot *) node->result;
 }
 
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000..38b37a1
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncWaitForNode(PlanState *planstate);
+extern void ExecAsyncNeedsWait(PlanState *planstate, int nevents,
+	bool reinit);
+extern void ExecAsyncDoesNotNeedWait(PlanState *planstate);
+
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 087735a..979dea3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,8 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
  */
 extern PlanState *ExecInitNode(Plan *node, EState *estate,
 			 PlanState *parent, int eflags);
+extern void ExecDispatchNode(PlanState *node);
+extern void ExecExecuteNode(PlanState *node);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a0bc8af..3dba03c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -382,6 +382,14 @@ typedef struct EState
 	ParamListInfo es_param_list_info;	/* values of external params */
 	ParamExecData *es_param_exec_vals;	/* values of internal params */
 
+	/* Asynchronous execution support */
+	struct PlanState **es_waiting_nodes;	/* array of waiting nodes */
+	int			es_num_waiting_nodes;	/* # of waiters in array */
+	int			es_max_waiting_nodes;	/* # of allocated entries */
+	int			es_total_async_events;	/* total of per-node n_async_events */
+	int			es_max_async_events;	/* # supported by event set */
+	struct WaitEventSet *es_wait_event_set;
+
 	/* Other working state: */
 	MemoryContext es_query_cxt; /* per-query context in which EState lives */
 
@@ -1034,6 +1042,8 @@ typedef struct PlanState
 	bool		result_ready;	/* true if result is ready */
 	Node	   *result;			/* result, most often TupleTableSlot */
 
+	int			n_async_events;	/* # of async events we want to register */
+
 	Instrumentation *instrument;	/* Optional runtime stats for this node */
 	WorkerInstrumentation *worker_instrument;	/* per-worker instrumentation */
 
-- 
2.5.4 (Apple Git-61)