From 343c747ac083bab9a3582b043af1c8615d2eeee7 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 30 Aug 2017 21:16:56 -0700 Subject: [PATCH 15/16] WIP: Expression based agg transition. Todo: - Split EEOP_AGG_PLAIN_TRANS into lifetime caring/not variant - Fix memory lifetime for JITed --- src/backend/executor/execExpr.c | 289 ++++++++++++++++++ src/backend/executor/execExprCompile.c | 357 ++++++++++++++++++++++ src/backend/executor/execExprInterp.c | 223 ++++++++++++++ src/backend/executor/nodeAgg.c | 534 ++++++--------------------------- src/backend/lib/llvmjit.c | 13 + src/include/executor/execExpr.h | 69 +++++ src/include/executor/executor.h | 2 + src/include/executor/nodeAgg.h | 308 +++++++++++++++++++ src/include/lib/llvmjit.h | 1 + src/include/nodes/execnodes.h | 5 + 10 files changed, 1351 insertions(+), 450 deletions(-) diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index e6ffe6e062..29aa8718fc 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -43,6 +43,7 @@ #include "optimizer/planner.h" #include "pgstat.h" #include "utils/builtins.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/typcache.h" @@ -2532,6 +2533,294 @@ ExecInitArrayRef(ExprEvalStep *scratch, ArrayRef *aref, PlanState *parent, } } +static void +ExecInitAggTransTrans(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, + AggStatePerTrans pertrans, int transno, int setno, int setoff, bool ishash) +{ + int adjust_init_jumpnull = -1; + int adjust_strict_jumpnull = -1; + ExprContext *aggcontext; + + if (ishash) + aggcontext = aggstate->hashcontext; + else + aggcontext = aggstate->aggcontexts[setno]; + + /* + * If the initial value for the transition state doesn't exist in the + * pg_aggregate table then we will let the first non-NULL value + * returned from the outer procNode become the initial value. (This is + * useful for aggregates like max() and min().) 
The noTransValue flag + * signals that we still need to do this. + */ + if (pertrans->numSortCols == 0 && + fcinfo->flinfo->fn_strict && + pertrans->initValueIsNull) + { + scratch->opcode = EEOP_AGG_INIT_TRANS; + scratch->d.agg_init_trans.aggstate = aggstate; + scratch->d.agg_init_trans.pertrans = pertrans; + scratch->d.agg_init_trans.setno = setno; + scratch->d.agg_init_trans.setoff = setoff; + scratch->d.agg_init_trans.transno = transno; + scratch->d.agg_init_trans.aggcontext = aggcontext; + scratch->d.agg_init_trans.jumpnull = -1; /* adjust later */ + ExprEvalPushStep(state, scratch); + + adjust_init_jumpnull = state->steps_len - 1; + } + + if (pertrans->numSortCols == 0 && + fcinfo->flinfo->fn_strict) + { + scratch->opcode = EEOP_AGG_STRICT_TRANS_CHECK; + scratch->d.agg_strict_trans_check.aggstate = aggstate; + scratch->d.agg_strict_trans_check.setno = setno; + scratch->d.agg_strict_trans_check.setoff = setoff; + scratch->d.agg_strict_trans_check.transno = transno; + scratch->d.agg_strict_trans_check.jumpnull = -1; /* adjust later */ + ExprEvalPushStep(state, scratch); + + /* + * Note, we don't push into adjust_bailout here - those jump + * to the end of all transition value computations. 
+ */ + adjust_strict_jumpnull = state->steps_len - 1; + } + + if (pertrans->numSortCols == 0) + { + scratch->opcode = EEOP_AGG_PLAIN_TRANS; + scratch->d.agg_plain_trans.aggstate = aggstate; + scratch->d.agg_plain_trans.pertrans = pertrans; + scratch->d.agg_plain_trans.setno = setno; + scratch->d.agg_plain_trans.setoff = setoff; + scratch->d.agg_plain_trans.transno = transno; + scratch->d.agg_plain_trans.aggcontext = aggcontext; + ExprEvalPushStep(state, scratch); + } + else if (pertrans->numInputs == 1) + { + scratch->opcode = EEOP_AGG_ORDERED_TRANS_DATUM; + scratch->d.agg_ordered_trans.aggstate = aggstate; + scratch->d.agg_ordered_trans.pertrans = pertrans; + scratch->d.agg_ordered_trans.setno = setno; + scratch->d.agg_ordered_trans.setoff = setoff; + scratch->d.agg_ordered_trans.aggcontext = aggcontext; + ExprEvalPushStep(state, scratch); + } + else + { + scratch->opcode = EEOP_AGG_ORDERED_TRANS_TUPLE; + scratch->d.agg_ordered_trans.aggstate = aggstate; + scratch->d.agg_ordered_trans.pertrans = pertrans; + scratch->d.agg_ordered_trans.setno = setno; + scratch->d.agg_ordered_trans.setoff = setoff; + scratch->d.agg_ordered_trans.aggcontext = aggcontext; + ExprEvalPushStep(state, scratch); + } + + if (adjust_init_jumpnull != -1 ) + { + ExprEvalStep *as = &state->steps[adjust_init_jumpnull]; + Assert(as->d.agg_init_trans.jumpnull == -1); + as->d.agg_init_trans.jumpnull = state->steps_len; + } + + if (adjust_strict_jumpnull != -1 ) + { + ExprEvalStep *as = &state->steps[adjust_strict_jumpnull]; + Assert(as->d.agg_strict_trans_check.jumpnull == -1); + as->d.agg_strict_trans_check.jumpnull = state->steps_len; + } +} + +ExprState * +ExecInitAggTrans(AggState *aggstate, AggStatePerPhase phase, + PlanState *parent, bool doSort, bool doHash) +{ + ExprState *state = makeNode(ExprState); + List *exprList = NIL; + ExprEvalStep scratch; + int transno = 0; + int setoff = 0; + + state->expr = (Expr *) aggstate; + + scratch.resvalue = &state->resvalue; + scratch.resnull = 
&state->resnull; + + /* + * First figure out which slots we're going to need. Out of expediency + * build one list for all expressions and then use existing code :( + */ + for (transno = 0; transno < aggstate->numtrans; transno++) + { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + + exprList = lappend(exprList, pertrans->aggref->aggdirectargs); + exprList = lappend(exprList, pertrans->aggref->args); + exprList = lappend(exprList, pertrans->aggref->aggorder); + exprList = lappend(exprList, pertrans->aggref->aggdistinct); + exprList = lappend(exprList, pertrans->aggref->aggfilter); + } + ExecInitExprSlots(state, (Node *) exprList); + + /* + * Emit instructions for each transition value / grouping set combination. + */ + for (transno = 0; transno < aggstate->numtrans; transno++) + { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + int numInputs = pertrans->numInputs; + int argno; + int setno; + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + ListCell *arg, *bail; + List *adjust_bailout = NIL; + bool *strictnulls = NULL; + + /* + * If filter present, emit. Do so before evaluating the input, to + * avoid potentially unneeded computations. + */ + if (pertrans->aggref->aggfilter) + { + /* evaluate filter expression */ + ExecInitExprRec(pertrans->aggref->aggfilter, parent, state, + &state->resvalue, &state->resnull); + /* and jump out if false */ + scratch.opcode = EEOP_AGG_FILTER; + scratch.d.agg_filter.jumpfalse = -1; /* adjust later */ + ExprEvalPushStep(state, &scratch); + adjust_bailout = lappend_int(adjust_bailout, + state->steps_len - 1); + } + + /* + * Evaluate aggregate input into the user of that information. 
+ */ + argno = 0; + if (pertrans->numSortCols == 0) + { + strictnulls = fcinfo->argnull + 1; + + foreach (arg, pertrans->aggref->args) + { + TargetEntry *source_tle = (TargetEntry *) lfirst(arg); + + /* Start from 1, since the 0th arg will be the transition value */ + ExecInitExprRec(source_tle->expr, parent, state, + &fcinfo->arg[argno + 1], + &fcinfo->argnull[argno + 1]); + argno++; + } + } + else if (pertrans->numInputs == 1) + { + TargetEntry *source_tle = + (TargetEntry *) linitial(pertrans->aggref->args); + Assert(list_length(pertrans->aggref->args) == 1); + + ExecInitExprRec(source_tle->expr, parent, state, + &state->resvalue, + &state->resnull); + strictnulls = &state->resnull; + argno++; + } + else + { + Datum *values = pertrans->sortslot->tts_values; + bool *nulls = pertrans->sortslot->tts_isnull; + + strictnulls = nulls; + + foreach (arg, pertrans->aggref->args) + { + TargetEntry *source_tle = (TargetEntry *) lfirst(arg); + + ExecInitExprRec(source_tle->expr, parent, state, + &values[argno], &nulls[argno]); + argno++; + } + } + Assert(numInputs == argno); + + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. This is true for both plain and + * sorted/distinct aggregates. 
+ */ + if (fcinfo->flinfo->fn_strict && numInputs > 0) + { + scratch.opcode = EEOP_AGG_STRICT_INPUT_CHECK; + scratch.d.agg_strict_input_check.nulls = strictnulls; + scratch.d.agg_strict_input_check.jumpnull = -1; /* adjust later */ + scratch.d.agg_strict_input_check.nargs = numInputs; + ExprEvalPushStep(state, &scratch); + adjust_bailout = lappend_int(adjust_bailout, + state->steps_len - 1); + } + + + /* and call transition function (once for each grouping set) */ + + setoff = 0; + if (doSort) + { + int processGroupingSets = Max(phase->numsets, 1); + + for (setno = 0; setno < processGroupingSets; setno++) + { + ExecInitAggTransTrans(state, aggstate, &scratch, fcinfo, pertrans, transno, setno, setoff, false); + setoff++; + } + } + + if (doHash) + { + int numHashes = aggstate->num_hashes; + + if (aggstate->aggstrategy != AGG_HASHED) + setoff = aggstate->maxsets; + else + setoff = 0; + + for (setno = 0; setno < numHashes; setno++) + { + ExecInitAggTransTrans(state, aggstate, &scratch, fcinfo, pertrans, transno, setno, setoff, true); + setoff++; + } + } + + /* adjust early bail out jump target(s) */ + foreach (bail, adjust_bailout) + { + ExprEvalStep *as = &state->steps[lfirst_int(bail)]; + if (as->opcode == EEOP_AGG_FILTER) + { + Assert(as->d.agg_filter.jumpfalse == -1); + as->d.agg_filter.jumpfalse = state->steps_len; + } + else if (as->opcode == EEOP_AGG_STRICT_INPUT_CHECK) + { + Assert(as->d.agg_strict_input_check.jumpnull == -1); + as->d.agg_strict_input_check.jumpnull = state->steps_len; + } + } + + } + + scratch.resvalue = NULL; + scratch.resnull = NULL; + scratch.opcode = EEOP_DONE; + ExprEvalPushStep(state, &scratch); + + ExecReadyExpr(state, parent); + + return state; +} + /* * Helper for preparing ArrayRef expressions for evaluation: is expr a nested * FieldStore or ArrayRef that needs the old element value passed down? 
diff --git a/src/backend/executor/execExprCompile.c b/src/backend/executor/execExprCompile.c index 79b3ebd6c4..d0b943530c 100644 --- a/src/backend/executor/execExprCompile.c +++ b/src/backend/executor/execExprCompile.c @@ -23,6 +23,7 @@ #include "catalog/objectaccess.h" #include "catalog/pg_type.h" #include "executor/execdebug.h" +#include "executor/nodeAgg.h" #include "executor/nodeSubplan.h" #include "executor/execExpr.h" #include "funcapi.h" @@ -273,6 +274,28 @@ BuildFunctionCall(LLVMJitContext *context, LLVMBuilderRef builder, return v_retval; } +static LLVMValueRef +create_ExecAggInitGroup(LLVMModuleRef mod) +{ + LLVMTypeRef sig; + LLVMValueRef fn; + LLVMTypeRef param_types[3]; + const char *nm = "ExecAggInitGroup"; + + fn = LLVMGetNamedFunction(mod, nm); + if (fn) + return fn; + + param_types[0] = LLVMPointerType(TypeSizeT, 0); + param_types[1] = LLVMPointerType(TypeSizeT, 0); + param_types[2] = LLVMPointerType(StructAggStatePerGroupData, 0); + + sig = LLVMFunctionType(LLVMVoidType(), param_types, lengthof(param_types), 0); + fn = LLVMAddFunction(mod, nm, sig); + + return fn; +} + static Datum ExecRunCompiledExpr(ExprState *state, ExprContext *econtext, bool *isNull) { @@ -1446,6 +1469,8 @@ ExecReadyCompiledExpr(ExprState *state, PlanState *parent) case EEOP_NULLTEST_ROWISNULL: case EEOP_NULLTEST_ROWISNOTNULL: case EEOP_WHOLEROW: + case EEOP_AGG_ORDERED_TRANS_DATUM: + case EEOP_AGG_ORDERED_TRANS_TUPLE: { LLVMValueRef v_params[3]; const char *funcname; @@ -1502,6 +1527,10 @@ ExecReadyCompiledExpr(ExprState *state, PlanState *parent) funcname = "ExecEvalAlternativeSubPlan"; else if (op->opcode == EEOP_WHOLEROW) funcname = "ExecEvalWholeRowVar"; + else if (op->opcode == EEOP_AGG_ORDERED_TRANS_DATUM) + funcname = "ExecEvalAggOrderedTransDatum"; + else if (op->opcode == EEOP_AGG_ORDERED_TRANS_TUPLE) + funcname = "ExecEvalAggOrderedTransTuple"; else { Assert(false); @@ -2346,6 +2375,334 @@ ExecReadyCompiledExpr(ExprState *state, PlanState *parent) 
LLVMBuildBr(builder, opblocks[i + 1]); + break; + } + case EEOP_AGG_FILTER: + { + LLVMValueRef v_resnull, v_resvalue; + LLVMValueRef v_filtered; + + v_resnull = LLVMBuildLoad(builder, v_resnullp, ""); + v_resvalue = LLVMBuildLoad(builder, v_resvaluep, ""); + + v_filtered = LLVMBuildOr( + builder, + LLVMBuildICmp( + builder, LLVMIntEQ, v_resnull, + LLVMConstInt(LLVMInt8Type(), 1, false), ""), + LLVMBuildICmp( + builder, LLVMIntEQ, v_resvalue, + LLVMConstInt(TypeSizeT, 0, false), ""), + ""); + + LLVMBuildCondBr( + builder, + v_filtered, + opblocks[op->d.agg_filter.jumpfalse], + opblocks[i + 1]); + + break; + } + + case EEOP_AGG_STRICT_INPUT_CHECK: + { + int nargs = op->d.agg_strict_input_check.nargs; + bool *nulls = op->d.agg_strict_input_check.nulls; + int argno; + + LLVMValueRef v_nullp; + LLVMBasicBlockRef *b_checknulls; + + v_nullp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) nulls, false), + LLVMPointerType(LLVMInt8Type(), 0), + "v_nullp"); + + /* create one block per argument; size the array by element type, not by pointer-to-element */ + b_checknulls = palloc(sizeof(LLVMBasicBlockRef) * nargs); + for (argno = 0; argno < nargs; argno++) + { + b_checknulls[argno] = LLVMInsertBasicBlock(opblocks[i + 1], "check-null"); + } + + LLVMBuildBr(builder, b_checknulls[0]); + + /* strict function, check for NULL args */ + for (argno = 0; argno < nargs; argno++) + { + LLVMValueRef v_argno = LLVMConstInt(LLVMInt32Type(), argno, false); + LLVMValueRef v_argisnull; + LLVMBasicBlockRef b_argnotnull; + + LLVMPositionBuilderAtEnd(builder, b_checknulls[argno]); + + if (argno + 1 == nargs) + b_argnotnull = opblocks[i + 1]; + else + b_argnotnull = b_checknulls[argno + 1]; + + v_argisnull = LLVMBuildLoad( + builder, + LLVMBuildGEP( + builder, v_nullp, &v_argno, 1, ""), + ""); + + LLVMBuildCondBr( + builder, + LLVMBuildICmp(builder, LLVMIntEQ, v_argisnull, + LLVMConstInt(LLVMInt8Type(), 1, false), ""), + opblocks[op->d.agg_strict_input_check.jumpnull], + b_argnotnull); + } + + break; + } + + case
EEOP_AGG_INIT_TRANS: + { + AggState *aggstate; + AggStatePerTrans pertrans; + + LLVMValueRef v_aggstatep; + LLVMValueRef v_pertransp; + + LLVMValueRef v_allpergroupspp; + + LLVMValueRef v_pergroupp; + + LLVMValueRef v_setoff, v_transno; + + LLVMValueRef v_notransvalue; + + LLVMBasicBlockRef b_init; + + aggstate = op->d.agg_init_trans.aggstate; + pertrans = op->d.agg_init_trans.pertrans; + + v_aggstatep = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) aggstate, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) pertrans, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_allpergroupspp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) &aggstate->all_pergroups, false), + LLVMPointerType(LLVMPointerType(LLVMPointerType(StructAggStatePerGroupData, 0), 0), 0), + "aggstate.all_pergroups"); + + v_setoff = LLVMConstInt(LLVMInt32Type(), op->d.agg_init_trans.setoff, 0); + v_transno = LLVMConstInt(LLVMInt32Type(), op->d.agg_init_trans.transno, 0); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_allpergroupspp, + ""), + &v_setoff, 1, ""); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_pergroupp, + ""), + &v_transno, 1, ""); + + v_notransvalue = LLVMBuildLoad( + builder, + LLVMBuildStructGEP( + builder, v_pergroupp, 2, "notransvalue"), + "" + ); + + b_init = LLVMInsertBasicBlock(opblocks[i + 1], "inittrans"); + + LLVMBuildCondBr( + builder, + LLVMBuildICmp(builder, LLVMIntEQ, v_notransvalue, + LLVMConstInt(LLVMInt8Type(), 1, false), ""), + b_init, + opblocks[i + 1]); + + LLVMPositionBuilderAtEnd(builder, b_init); + + { + LLVMValueRef params[3]; + + params[0] = v_aggstatep; + params[1] = v_pertransp; + params[2] = v_pergroupp; + + LLVMBuildCall( + builder, + create_ExecAggInitGroup(mod), + params, lengthof(params), + ""); + } + LLVMBuildBr(builder, opblocks[op->d.agg_init_trans.jumpnull]); + + break; + 
} + + case EEOP_AGG_STRICT_TRANS_CHECK: + { + LLVMBuildBr( + builder, + opblocks[i + 1]); + break; + } + + case EEOP_AGG_PLAIN_TRANS: + { + AggState *aggstate; + AggStatePerTrans pertrans; + FunctionCallInfo fcinfo; + + LLVMValueRef v_fcinfo_isnull; + LLVMValueRef v_argp, v_argnullp; + + LLVMValueRef v_arg0p; + LLVMValueRef v_argnull0p; + + LLVMValueRef v_transvaluep; + LLVMValueRef v_transnullp; + + LLVMValueRef v_setno, v_setoff, v_transno; + LLVMValueRef v_aggcontext; + + LLVMValueRef v_allpergroupsp; + LLVMValueRef v_current_setp; + LLVMValueRef v_current_pertransp; + LLVMValueRef v_curaggcontext; + + LLVMValueRef v_pertransp; + + LLVMValueRef v_pergroupp; + LLVMValueRef v_argno; + + + LLVMValueRef v_retval; + + aggstate = op->d.agg_plain_trans.aggstate; + pertrans = op->d.agg_plain_trans.pertrans; + + fcinfo = &pertrans->transfn_fcinfo; + + v_argnullp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) fcinfo->argnull, false), + LLVMPointerType(LLVMInt8Type(), 0), + "v_argnullp"); + + v_argp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) fcinfo->arg, false), + LLVMPointerType(TypeSizeT, 0), + "v_arg"); + + v_setno = LLVMConstInt(LLVMInt32Type(), op->d.agg_plain_trans.setno, 0); + v_setoff = LLVMConstInt(LLVMInt32Type(), op->d.agg_plain_trans.setoff, 0); + v_transno = LLVMConstInt(LLVMInt32Type(), op->d.agg_plain_trans.transno, 0); + v_aggcontext = LLVMConstInt(LLVMInt64Type(), (uintptr_t)op->d.agg_plain_trans.aggcontext, 0); + + v_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) pertrans, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_current_setp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) &aggstate->current_set, false), + LLVMPointerType(LLVMInt32Type(), 0), + "aggstate.current_set"); + v_curaggcontext = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) &aggstate->curaggcontext, false), + LLVMPointerType(TypeSizeT, 0), + ""); + 
v_current_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) &aggstate->curpertrans, false), + LLVMPointerType(LLVMPointerType(TypeSizeT, 0), 0), + "aggstate.curpertrans"); + + v_allpergroupsp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) &aggstate->all_pergroups, false), + LLVMPointerType(LLVMPointerType(LLVMPointerType(StructAggStatePerGroupData, 0), 0), 0), + "aggstate.all_pergroups"); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_allpergroupsp, + ""), + &v_setoff, 1, "setoff"); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_pergroupp, + ""), + &v_transno, 1, "transno"); + + /* set aggstate globals */ + LLVMBuildStore(builder, v_setno, v_current_setp); + LLVMBuildStore(builder, v_pertransp, v_current_pertransp); + LLVMBuildStore(builder, v_aggcontext, v_curaggcontext); + + /* store transvalue in fcinfo->arg/argnull[0] */ + v_argno = LLVMConstInt(LLVMInt32Type(), 0, false); + v_arg0p = LLVMBuildGEP(builder, v_argp, &v_argno, 1, ""); + v_argnull0p = LLVMBuildGEP(builder, v_argnullp, &v_argno, 1, ""); + + v_transvaluep = LLVMBuildStructGEP( + builder, v_pergroupp, 0, "transvaluep"); + v_transnullp = LLVMBuildStructGEP( + builder, v_pergroupp, 1, "transnullp"); + + LLVMBuildStore( + builder, + LLVMBuildLoad( + builder, + v_transvaluep, + "transvalue"), + v_arg0p); + + LLVMBuildStore( + builder, + LLVMBuildLoad( + builder, + v_transnullp, + "transnull"), + v_argnull0p); + + v_retval = BuildFunctionCall(context, builder, mod, fcinfo, &v_fcinfo_isnull); + + /* retrieve trans value */ + LLVMBuildStore( + builder, + v_retval, + v_transvaluep); + LLVMBuildStore( + builder, + v_fcinfo_isnull, + v_transnullp); + + LLVMBuildBr(builder, opblocks[i + 1]); + break; } diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index df453b2ab4..a8e56f6f3a 100644 --- a/src/backend/executor/execExprInterp.c +++ 
b/src/backend/executor/execExprInterp.c @@ -64,12 +64,14 @@ #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" +#include "utils/memutils.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "parser/parsetree.h" #include "pgstat.h" #include "utils/builtins.h" #include "utils/date.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/timestamp.h" #include "utils/typcache.h" @@ -358,6 +360,13 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_WINDOW_FUNC, &&CASE_EEOP_SUBPLAN, &&CASE_EEOP_ALTERNATIVE_SUBPLAN, + &&CASE_EEOP_AGG_FILTER, + &&CASE_EEOP_AGG_STRICT_INPUT_CHECK, + &&CASE_EEOP_AGG_INIT_TRANS, + &&CASE_EEOP_AGG_STRICT_TRANS_CHECK, + &&CASE_EEOP_AGG_PLAIN_TRANS, + &&CASE_EEOP_AGG_ORDERED_TRANS_DATUM, + &&CASE_EEOP_AGG_ORDERED_TRANS_TUPLE, &&CASE_EEOP_LAST }; @@ -1461,6 +1470,171 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_AGG_FILTER) + { + if (*op->resnull || !DatumGetBool(*op->resvalue)) + { + Assert(op->d.agg_filter.jumpfalse != -1); + EEO_JUMP(op->d.agg_filter.jumpfalse); + } + else + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_STRICT_INPUT_CHECK) + { + int argno; + bool *nulls = op->d.agg_strict_input_check.nulls; + + Assert(op->d.agg_strict_input_check.jumpnull != -1); + + for (argno = 0; argno < op->d.agg_strict_input_check.nargs; argno++) + { + if (nulls[argno]) + { + EEO_JUMP(op->d.agg_strict_input_check.jumpnull); + } + } + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_INIT_TRANS) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_init_trans.aggstate; + pergroup = &aggstate->all_pergroups + [op->d.agg_init_trans.setoff] + [op->d.agg_init_trans.transno]; + + if (pergroup->noTransValue) + { + AggStatePerTrans pertrans = op->d.agg_init_trans.pertrans; + + aggstate->curaggcontext = op->d.agg_init_trans.aggcontext; + aggstate->current_set = op->d.agg_init_trans.setno; + + 
ExecAggInitGroup(aggstate, pertrans, pergroup); + + EEO_JUMP(op->d.agg_init_trans.jumpnull); + } + + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_STRICT_TRANS_CHECK) + { + AggState *aggstate; + AggStatePerGroup pergroup; + + aggstate = op->d.agg_strict_trans_check.aggstate; + pergroup = &aggstate->all_pergroups + [op->d.agg_strict_trans_check.setoff] + [op->d.agg_strict_trans_check.transno]; + + Assert(op->d.agg_strict_trans_check.jumpnull != -1); + + if (unlikely(pergroup->transValueIsNull)) + { + /* strict transfn must not be called with a NULL transValue: keep it and skip the call */ + EEO_JUMP(op->d.agg_strict_trans_check.jumpnull); + } + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_PLAIN_TRANS) + { + AggState *aggstate; + AggStatePerTrans pertrans; + AggStatePerGroup pergroup; + FunctionCallInfo fcinfo; + MemoryContext oldContext; + Datum newVal; + + aggstate = op->d.agg_plain_trans.aggstate; + pertrans = op->d.agg_plain_trans.pertrans; + + pergroup = &aggstate->all_pergroups + [op->d.agg_plain_trans.setoff] + [op->d.agg_plain_trans.transno]; + + fcinfo = &pertrans->transfn_fcinfo; + + /* cf. select_current_set() */ + aggstate->curaggcontext = op->d.agg_plain_trans.aggcontext; + aggstate->current_set = op->d.agg_plain_trans.setno; + + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + fcinfo->arg[0] = pergroup->transValue; + fcinfo->argnull[0] = pergroup->transValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * free the prior transValue. But if transfn returned a pointer to its + * first input, we don't need to do anything. Also, if transfn returned a + * pointer to a R/W expanded object that is already a child of the + * aggcontext, assume we can adopt that value without copying it.
+ */ + if (!pertrans->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); + if (DatumIsReadWriteExpandedObject(newVal, + false, + pertrans->transtypeLen) && + MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) + /* do nothing */ ; + else + newVal = datumCopy(newVal, + pertrans->transtypeByVal, + pertrans->transtypeLen); + } + if (!pergroup->transValueIsNull) + { + if (DatumIsReadWriteExpandedObject(pergroup->transValue, + false, + pertrans->transtypeLen)) + DeleteExpandedObject(pergroup->transValue); + else + pfree(DatumGetPointer(pergroup->transValue)); + } + } + + + pergroup->transValue = newVal; + pergroup->transValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); + + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_ORDERED_TRANS_DATUM) + { + /* too complex for an inline implementation */ + ExecEvalAggOrderedTransDatum(state, op, econtext); + + EEO_NEXT(); + } + + EEO_CASE(EEOP_AGG_ORDERED_TRANS_TUPLE) + { + /* too complex for an inline implementation */ + ExecEvalAggOrderedTransTuple(state, op, econtext); + + EEO_NEXT(); + } + EEO_CASE(EEOP_LAST) { /* unreachable */ @@ -3539,3 +3713,52 @@ ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext) *op->resvalue = PointerGetDatum(dtuple); *op->resnull = false; } + +void +ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup) +{ + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + MemoryContext oldContext; + + /* + * transValue has not been initialized. This is the first non-NULL + * input value. We use it as the initial value for transValue. (We + * already checked that the agg's input type is binary-compatible + * with its transtype, so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. 
We + * do not need to pfree the old transValue, since it's NULL. + */ + oldContext = MemoryContextSwitchTo( + aggstate->curaggcontext->ecxt_per_tuple_memory); + pergroup->transValue = datumCopy(fcinfo->arg[1], + pertrans->transtypeByVal, + pertrans->transtypeLen); + pergroup->transValueIsNull = false; + pergroup->noTransValue = false; + MemoryContextSwitchTo(oldContext); +} + + +void +ExecEvalAggOrderedTransDatum(ExprState *state, ExprEvalStep *op, + ExprContext *econtext) +{ + AggStatePerTrans pertrans = op->d.agg_plain_trans.pertrans; + int setno = op->d.agg_plain_trans.setno; + + tuplesort_putdatum(pertrans->sortstates[setno], + *op->resvalue, *op->resnull); +} + +void ExecEvalAggOrderedTransTuple(ExprState *state, ExprEvalStep *op, + ExprContext *econtext) +{ + AggStatePerTrans pertrans = op->d.agg_plain_trans.pertrans; + int setno = op->d.agg_plain_trans.setno; + + ExecClearTuple(pertrans->sortslot); + pertrans->sortslot->tts_nvalid = pertrans->numInputs; + ExecStoreVirtualTuple(pertrans->sortslot); + tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot); +} diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a63c05cb68..3f3dadd2da 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -229,295 +229,6 @@ #include "utils/datum.h" -/* - * AggStatePerTransData - per aggregate state value information - * - * Working state for updating the aggregate's state value, by calling the - * transition function with an input row. This struct does not store the - * information needed to produce the final aggregate result from the transition - * state, that's stored in AggStatePerAggData instead. This separation allows - * multiple aggregate results to be produced from a single state value. - */ -typedef struct AggStatePerTransData -{ - /* - * These values are set up during ExecInitAgg() and do not change - * thereafter: - */ - - /* - * Link to an Aggref expr this state value is for. 
- * - * There can be multiple Aggref's sharing the same state value, as long as - * the inputs and transition function are identical. This points to the - * first one of them. - */ - Aggref *aggref; - - /* - * Nominal number of arguments for aggregate function. For plain aggs, - * this excludes any ORDER BY expressions. For ordered-set aggs, this - * counts both the direct and aggregated (ORDER BY) arguments. - */ - int numArguments; - - /* - * Number of aggregated input columns. This includes ORDER BY expressions - * in both the plain-agg and ordered-set cases. Ordered-set direct args - * are not counted, though. - */ - int numInputs; - - /* offset of input columns in AggState->evalslot */ - int inputoff; - - /* - * Number of aggregated input columns to pass to the transfn. This - * includes the ORDER BY columns for ordered-set aggs, but not for plain - * aggs. (This doesn't count the transition state value!) - */ - int numTransInputs; - - /* Oid of the state transition or combine function */ - Oid transfn_oid; - - /* Oid of the serialization function or InvalidOid */ - Oid serialfn_oid; - - /* Oid of the deserialization function or InvalidOid */ - Oid deserialfn_oid; - - /* Oid of state value's datatype */ - Oid aggtranstype; - - /* ExprStates of the FILTER and argument expressions. */ - ExprState *aggfilter; /* state of FILTER expression, if any */ - List *aggdirectargs; /* states of direct-argument expressions */ - - /* - * fmgr lookup data for transition function or combine function. Note in - * particular that the fn_strict flag is kept here. 
- */ - FmgrInfo transfn; - - /* fmgr lookup data for serialization function */ - FmgrInfo serialfn; - - /* fmgr lookup data for deserialization function */ - FmgrInfo deserialfn; - - /* Input collation derived for aggregate */ - Oid aggCollation; - - /* number of sorting columns */ - int numSortCols; - - /* number of sorting columns to consider in DISTINCT comparisons */ - /* (this is either zero or the same as numSortCols) */ - int numDistinctCols; - - /* deconstructed sorting information (arrays of length numSortCols) */ - AttrNumber *sortColIdx; - Oid *sortOperators; - Oid *sortCollations; - bool *sortNullsFirst; - - /* - * fmgr lookup data for input columns' equality operators --- only - * set/used when aggregate has DISTINCT flag. Note that these are in - * order of sort column index, not parameter index. - */ - FmgrInfo *equalfns; /* array of length numDistinctCols */ - - /* - * initial value from pg_aggregate entry - */ - Datum initValue; - bool initValueIsNull; - - /* - * We need the len and byval info for the agg's input and transition data - * types in order to know how to copy/delete values. - * - * Note that the info for the input type is used only when handling - * DISTINCT aggs with just one argument, so there is only one input type. - */ - int16 inputtypeLen, - transtypeLen; - bool inputtypeByVal, - transtypeByVal; - - /* - * Stuff for evaluation of aggregate inputs in cases where the aggregate - * requires sorted input. The arguments themselves will be evaluated via - * AggState->evalslot/evalproj for all aggregates at once, but we only - * want to sort the relevant columns for individual aggregates. - */ - TupleDesc sortdesc; /* descriptor of input tuples */ - - /* - * Slots for holding the evaluated input arguments. These are set up - * during ExecInitAgg() and then used for each input row requiring - * processing besides what's done in AggState->evalproj. 
- */ - TupleTableSlot *sortslot; /* current input tuple */ - TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ - - /* - * These values are working state that is initialized at the start of an - * input tuple group and updated for each input tuple. - * - * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input - * values straight to the transition function. If it's DISTINCT or - * requires ORDER BY, we pass the input values into a Tuplesort object; - * then at completion of the input tuple group, we scan the sorted values, - * eliminate duplicates if needed, and run the transition function on the - * rest. - * - * We need a separate tuplesort for each grouping set. - */ - - Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ - - /* - * This field is a pre-initialized FunctionCallInfo struct used for - * calling this aggregate's transfn. We save a few cycles per row by not - * re-initializing the unchanging fields; which isn't much, but it seems - * worth the extra space consumption. - */ - FunctionCallInfoData transfn_fcinfo; - - /* Likewise for serialization and deserialization functions */ - FunctionCallInfoData serialfn_fcinfo; - - FunctionCallInfoData deserialfn_fcinfo; -} AggStatePerTransData; - -/* - * AggStatePerAggData - per-aggregate information - * - * This contains the information needed to call the final function, to produce - * a final aggregate result from the state value. If there are multiple - * identical Aggrefs in the query, they can all share the same per-agg data. - * - * These values are set up during ExecInitAgg() and do not change thereafter. - */ -typedef struct AggStatePerAggData -{ - /* - * Link to an Aggref expr this state value is for. - * - * There can be multiple identical Aggref's sharing the same per-agg. This - * points to the first one of them. 
- */ - Aggref *aggref; - - /* index to the state value which this agg should use */ - int transno; - - /* Optional Oid of final function (may be InvalidOid) */ - Oid finalfn_oid; - - /* - * fmgr lookup data for final function --- only valid when finalfn_oid oid - * is not InvalidOid. - */ - FmgrInfo finalfn; - - /* - * Number of arguments to pass to the finalfn. This is always at least 1 - * (the transition state value) plus any ordered-set direct args. If the - * finalfn wants extra args then we pass nulls corresponding to the - * aggregated input columns. - */ - int numFinalArgs; - - /* - * We need the len and byval info for the agg's result data type in order - * to know how to copy/delete values. - */ - int16 resulttypeLen; - bool resulttypeByVal; - -} AggStatePerAggData; - -/* - * AggStatePerGroupData - per-aggregate-per-group working state - * - * These values are working state that is initialized at the start of - * an input tuple group and updated for each input tuple. - * - * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these - * structs (pointed to by aggstate->pergroup); we re-use the array for - * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the - * hash table contains an array of these structs for each tuple group. - * - * Logically, the sortstate field belongs in this struct, but we do not - * keep it here for space reasons: we don't support DISTINCT aggregates - * in AGG_HASHED mode, so there's no reason to use up a pointer field - * in every entry of the hashtable. - */ -typedef struct AggStatePerGroupData -{ - Datum transValue; /* current transition value */ - bool transValueIsNull; - - bool noTransValue; /* true if transValue not set yet */ - - /* - * Note: noTransValue initially has the same value as transValueIsNull, - * and if true both are cleared to false at the same time. 
They are not - * the same though: if transfn later returns a NULL, we want to keep that - * NULL and not auto-replace it with a later input value. Only the first - * non-NULL input will be auto-substituted. - */ -} AggStatePerGroupData; - -/* - * AggStatePerPhaseData - per-grouping-set-phase state - * - * Grouping sets are divided into "phases", where a single phase can be - * processed in one pass over the input. If there is more than one phase, then - * at the end of input from the current phase, state is reset and another pass - * taken over the data which has been re-sorted in the mean time. - * - * Accordingly, each phase specifies a list of grouping sets and group clause - * information, plus each phase after the first also has a sort order. - */ -typedef struct AggStatePerPhaseData -{ - AggStrategy aggstrategy; /* strategy for this phase */ - int numsets; /* number of grouping sets (or 0) */ - int *gset_lengths; /* lengths of grouping sets */ - Bitmapset **grouped_cols; /* column groupings for rollup */ - FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ - Agg *aggnode; /* Agg node for phase data */ - Sort *sortnode; /* Sort node for input ordering for phase */ -} AggStatePerPhaseData; - -/* - * AggStatePerHashData - per-hashtable state - * - * When doing grouping sets with hashing, we have one of these for each - * grouping set. (When doing hashing without grouping sets, we have just one of - * them.) 
- */ -typedef struct AggStatePerHashData -{ - TupleHashTable hashtable; /* hash table with one entry per group */ - TupleHashIterator hashiter; /* for iterating through hash table */ - TupleTableSlot *hashslot; /* slot for loading hash table */ - FmgrInfo *hashfunctions; /* per-grouping-field hash fns */ - FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ - int numCols; /* number of hash key columns */ - int numhashGrpCols; /* number of columns in hash table */ - int largestGrpColIdx; /* largest col required for hashing */ - AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ - AttrNumber *hashGrpColIdxHash; /* indices in hashtbl tuples */ - Agg *aggnode; /* original Agg node, for numGroups etc. */ -} AggStatePerHashData; - - static void select_current_set(AggState *aggstate, int setno, bool is_hash); static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); @@ -578,21 +289,6 @@ static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, List *transnos); -/* - * Select the current grouping set; affects current_set and - * curaggcontext. - */ -static void -select_current_set(AggState *aggstate, int setno, bool is_hash) -{ - if (is_hash) - aggstate->curaggcontext = aggstate->hashcontext; - else - aggstate->curaggcontext = aggstate->aggcontexts[setno]; - - aggstate->current_set = setno; -} - /* * Switch to phase "newphase", which must either be 0 or 1 (to reset) or * current_phase + 1. Juggle the tuplesorts accordingly. 
@@ -954,137 +650,12 @@ advance_transition_function(AggState *aggstate, static void advance_aggregates(AggState *aggstate, AggStatePerGroup *sort_pergroups, AggStatePerGroup *hash_pergroups) { - int transno; - int setno = 0; - int numGroupingSets = Max(aggstate->phase->numsets, 1); - int numHashes = aggstate->num_hashes; - int numTrans = aggstate->numtrans; - TupleTableSlot *slot = aggstate->evalslot; - Datum *values = slot->tts_values; - bool *nulls = slot->tts_isnull; - AggStatePerTrans pertrans; + bool isnull; - /* compute input for all aggregates */ - if (aggstate->evalproj) - aggstate->evalslot = ExecProject(aggstate->evalproj); - - for (transno = 0, pertrans = &aggstate->pertrans[0]; - transno < numTrans; transno++, pertrans++) - { - ExprState *filter = pertrans->aggfilter; - int numTransInputs = pertrans->numTransInputs; - int i; - int inputoff = pertrans->inputoff; - - /* Skip anything FILTERed out */ - if (filter) - { - Datum res; - bool isnull; - - res = ExecEvalExprSwitchContext(filter, aggstate->tmpcontext, - &isnull); - if (isnull || !DatumGetBool(res)) - continue; - } - - if (pertrans->numSortCols > 0) - { - /* DISTINCT and/or ORDER BY case */ - Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff)); - Assert(!hash_pergroups); - - /* - * If the transfn is strict, we want to check for nullity before - * storing the row in the sorter, to save space if there are a lot - * of nulls. Note that we must only check numTransInputs columns, - * not numInputs, since nullity in columns used only for sorting - * is not relevant here. 
- */ - if (pertrans->transfn.fn_strict) - { - for (i = 0; i < numTransInputs; i++) - { - if (slot->tts_isnull[i + inputoff]) - break; - } - if (i < numTransInputs) - continue; - } - - for (setno = 0; setno < numGroupingSets; setno++) - { - /* OK, put the tuple into the tuplesort object */ - if (pertrans->numInputs == 1) - tuplesort_putdatum(pertrans->sortstates[setno], - values[inputoff], nulls[inputoff]); - else - { - /* - * Copy slot contents, starting from inputoff, into sort - * slot. - */ - ExecClearTuple(pertrans->sortslot); - memcpy(pertrans->sortslot->tts_values, - &values[inputoff], - pertrans->numInputs * sizeof(Datum)); - memcpy(pertrans->sortslot->tts_isnull, - &nulls[inputoff], - pertrans->numInputs * sizeof(bool)); - pertrans->sortslot->tts_nvalid = pertrans->numInputs; - ExecStoreVirtualTuple(pertrans->sortslot); - tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot); - } - } - } - else - { - /* We can apply the transition function immediately */ - FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; - - /* Load values into fcinfo */ - /* Start from 1, since the 0th arg will be the transition value */ - Assert(slot->tts_nvalid >= (numTransInputs + inputoff)); - - for (i = 0; i < numTransInputs; i++) - { - fcinfo->arg[i + 1] = values[i + inputoff]; - fcinfo->argnull[i + 1] = nulls[i + inputoff]; - } - - if (sort_pergroups) - { - /* advance transition states for ordered grouping */ - - for (setno = 0; setno < numGroupingSets; setno++) - { - AggStatePerGroup pergroupstate; - - select_current_set(aggstate, setno, false); - - pergroupstate = &sort_pergroups[setno][transno]; - - advance_transition_function(aggstate, pertrans, pergroupstate); - } - } - - if (hash_pergroups) - { - /* advance transition states for hashed grouping */ - - for (setno = 0; setno < numHashes; setno++) - { - AggStatePerGroup pergroupstate; - - select_current_set(aggstate, setno, true); - - pergroupstate = &hash_pergroups[setno][transno]; - - 
advance_transition_function(aggstate, pertrans, pergroupstate); - } - } - } - } + ExecEvalExprSwitchContext(aggstate->phase->evaltrans, + aggstate->tmpcontext, + &isnull); + return; } /* @@ -2663,6 +2234,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) AggStatePerAgg peraggs; AggStatePerTrans pertransstates; AggStatePerTrans pertrans; + AggStatePerGroup *pergroups; + ExprContext **contexts; Plan *outerPlan; ExprContext *econtext; int numaggs, @@ -2985,6 +2558,30 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; + + aggstate->all_pergroups = + (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) + * (numGroupingSets + numHashes)); + aggstate->all_contexts = + (ExprContext **) palloc0(sizeof(ExprContext *) + * (numGroupingSets + numHashes)); + pergroups = aggstate->all_pergroups; + contexts = aggstate->all_contexts; + + if (node->aggstrategy != AGG_HASHED) + { + for (i = 0; i < numGroupingSets; i++) + { + pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) + * numaggs); + contexts[i] = aggstate->aggcontexts[i]; + } + + aggstate->pergroups = pergroups; + pergroups += numGroupingSets; + contexts += numGroupingSets; + } + /* * Hashing can only appear in the initial phase. 
*/ @@ -3001,28 +2598,13 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* this is an array of pointers, not structures */ - aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes); + aggstate->hash_pergroup = pergroups; find_hash_columns(aggstate); build_hash_table(aggstate); aggstate->table_filled = false; } - if (node->aggstrategy != AGG_HASHED) - { - AggStatePerGroup *pergroups = - (AggStatePerGroup*) palloc0(sizeof(AggStatePerGroup) - * numGroupingSets); - - for (i = 0; i < numGroupingSets; i++) - { - pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) - * numaggs); - } - - aggstate->pergroups = pergroups; - } - /* * Initialize current phase-dependent values to initial phase. The initial * phase is 1 (first sort pass) for all strategies that use sorting (if @@ -3388,6 +2970,58 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) NULL); ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); + /* + * Build expressions doing all the transition work at once. We build a + * different one for each phase, as the number of transition function + * invocations differs between phases.
+ */ + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) + { + AggStatePerPhase phase = &aggstate->phases[phaseidx]; + bool dohash, dosort; /* handed to ExecInitAggTrans() as doHash/doSort */ + + if (!phase->aggnode) /* nothing to build for a phase without an Agg node */ + continue; + + if (aggstate->aggstrategy == AGG_MIXED && + phaseidx == 1) /* mixed, phase 1: both hashed and sorted sets */ + { + dohash = true; + dosort = true; + } + else if (aggstate->aggstrategy == AGG_MIXED && + phaseidx == 0) /* mixed, phase 0: hashed sets only */ + { + dohash = true; + dosort = false; + } + else if (phase->aggstrategy == AGG_PLAIN || + phase->aggstrategy == AGG_SORTED) + { + dohash = false; + dosort = true; + } + else if (phase->aggstrategy == AGG_HASHED) + { + dohash = true; + dosort = false; + } + else if (phase->aggstrategy == AGG_MIXED) + { + dohash = true; + dosort = true; + } + else + { + elog(ERROR, "unrecognized aggregate strategy: %d", (int) phase->aggstrategy); + } + + phase->evaltrans = ExecInitAggTrans(aggstate, phase, + &aggstate->ss.ps, + dosort, dohash); + + } + return aggstate; } diff --git a/src/backend/lib/llvmjit.c b/src/backend/lib/llvmjit.c index e05fe2dd72..57d0663410 100644 --- a/src/backend/lib/llvmjit.c +++ b/src/backend/lib/llvmjit.c @@ -63,6 +63,7 @@ LLVMTypeRef StructFmgrInfo; LLVMTypeRef StructFunctionCallInfoData; LLVMTypeRef StructExprState; LLVMTypeRef StructExprContext; +LLVMTypeRef StructAggStatePerGroupData; static LLVMTargetRef llvm_targetref; @@ -381,6 +382,18 @@ llvm_create_types(void) params[0] = LLVMPointerType(StructFunctionCallInfoData, 0); TypePGFunction = LLVMFunctionType(TypeSizeT, params, lengthof(params), 0); } + + { + LLVMTypeRef members[3]; + + members[0] = TypeSizeT; + members[1] = LLVMInt8Type(); + members[2] = LLVMInt8Type(); + + StructAggStatePerGroupData = LLVMStructCreateNamed(LLVMGetGlobalContext(), + "struct.AggStatePerGroupData"); + LLVMStructSetBody(StructAggStatePerGroupData, members, lengthof(members), false); + } } static uint64_t diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 3919ac5598..661c18fcbe 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -15,6 +15,7 @@ #define
EXEC_EXPR_H #include "nodes/execnodes.h" +#include "executor/nodeAgg.h" /* forward reference to avoid circularity */ struct ArrayRefState; @@ -207,6 +208,14 @@ typedef enum ExprEvalOp EEOP_SUBPLAN, EEOP_ALTERNATIVE_SUBPLAN, + EEOP_AGG_FILTER, + EEOP_AGG_STRICT_INPUT_CHECK, + EEOP_AGG_INIT_TRANS, + EEOP_AGG_STRICT_TRANS_CHECK, + EEOP_AGG_PLAIN_TRANS, + EEOP_AGG_ORDERED_TRANS_DATUM, + EEOP_AGG_ORDERED_TRANS_TUPLE, + /* non-existent operation, used e.g. to check array lengths */ EEOP_LAST } ExprEvalOp; @@ -555,6 +564,58 @@ typedef struct ExprEvalStep /* out-of-line state, created by nodeSubplan.c */ AlternativeSubPlanState *asstate; } alternative_subplan; + + struct + { + int jumpfalse; + } agg_filter; + + struct + { + bool *nulls; + int nargs; + int jumpnull; + } agg_strict_input_check; + + struct + { + AggState *aggstate; + AggStatePerTrans pertrans; + ExprContext *aggcontext; + int setno; + int transno; + int setoff; + int jumpnull; + } agg_init_trans; + + struct + { + AggState *aggstate; + int setno; + int transno; + int setoff; + int jumpnull; + } agg_strict_trans_check; + + struct + { + AggState *aggstate; + AggStatePerTrans pertrans; + ExprContext *aggcontext; + int setno; + int transno; + int setoff; + } agg_plain_trans; + + struct + { + AggState *aggstate; + AggStatePerTrans pertrans; + ExprContext *aggcontext; + int setno; + int transno; + int setoff; + } agg_ordered_trans; } d; } ExprEvalStep; @@ -648,4 +709,12 @@ extern void ExecEvalAlternativeSubPlan(ExprState *state, ExprEvalStep *op, extern void ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalAggOrderedTransDatum(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); +extern void ExecEvalAggOrderedTransTuple(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); + + +extern void ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup); + #endif /* EXEC_EXPR_H */ diff --git 
a/src/include/executor/executor.h b/src/include/executor/executor.h index ab2df96ca0..af2dfcc287 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -264,6 +264,8 @@ extern ExprState *ExecInitExpr(Expr *node, PlanState *parent); extern ExprState *ExecInitQual(List *qual, PlanState *parent); extern ExprState *ExecInitCheck(List *qual, PlanState *parent); extern List *ExecInitExprList(List *nodes, PlanState *parent); +extern ExprState *ExecInitAggTrans(AggState *aggstate, struct AggStatePerPhaseData *phase, + PlanState *parent, bool doSort, bool doHash); extern ProjectionInfo *ExecBuildProjectionInfo(List *targetList, ExprContext *econtext, TupleTableSlot *slot, diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index eff5af9c2a..3932bd8270 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -16,6 +16,297 @@ #include "nodes/execnodes.h" + +/* + * AggStatePerTransData - per aggregate state value information + * + * Working state for updating the aggregate's state value, by calling the + * transition function with an input row. This struct does not store the + * information needed to produce the final aggregate result from the transition + * state, that's stored in AggStatePerAggData instead. This separation allows + * multiple aggregate results to be produced from a single state value. + */ +typedef struct AggStatePerTransData +{ + /* + * These values are set up during ExecInitAgg() and do not change + * thereafter: + */ + + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple Aggref's sharing the same state value, as long as + * the inputs and transition function are identical. This points to the + * first one of them. + */ + Aggref *aggref; + + /* + * Nominal number of arguments for aggregate function. For plain aggs, + * this excludes any ORDER BY expressions. 
For ordered-set aggs, this + * counts both the direct and aggregated (ORDER BY) arguments. + */ + int numArguments; + + /* + * Number of aggregated input columns. This includes ORDER BY expressions + * in both the plain-agg and ordered-set cases. Ordered-set direct args + * are not counted, though. + */ + int numInputs; + + /* offset of input columns in AggState->evalslot */ + int inputoff; + + /* + * Number of aggregated input columns to pass to the transfn. This + * includes the ORDER BY columns for ordered-set aggs, but not for plain + * aggs. (This doesn't count the transition state value!) + */ + int numTransInputs; + + /* Oid of the state transition or combine function */ + Oid transfn_oid; + + /* Oid of the serialization function or InvalidOid */ + Oid serialfn_oid; + + /* Oid of the deserialization function or InvalidOid */ + Oid deserialfn_oid; + + /* Oid of state value's datatype */ + Oid aggtranstype; + + /* ExprStates of the FILTER and argument expressions. */ + ExprState *aggfilter; /* state of FILTER expression, if any */ + List *aggdirectargs; /* states of direct-argument expressions */ + + /* + * fmgr lookup data for transition function or combine function. Note in + * particular that the fn_strict flag is kept here. + */ + FmgrInfo transfn; + + /* fmgr lookup data for serialization function */ + FmgrInfo serialfn; + + /* fmgr lookup data for deserialization function */ + FmgrInfo deserialfn; + + /* Input collation derived for aggregate */ + Oid aggCollation; + + /* number of sorting columns */ + int numSortCols; + + /* number of sorting columns to consider in DISTINCT comparisons */ + /* (this is either zero or the same as numSortCols) */ + int numDistinctCols; + + /* deconstructed sorting information (arrays of length numSortCols) */ + AttrNumber *sortColIdx; + Oid *sortOperators; + Oid *sortCollations; + bool *sortNullsFirst; + + /* + * fmgr lookup data for input columns' equality operators --- only + * set/used when aggregate has DISTINCT flag. 
Note that these are in + * order of sort column index, not parameter index. + */ + FmgrInfo *equalfns; /* array of length numDistinctCols */ + + /* + * initial value from pg_aggregate entry + */ + Datum initValue; + bool initValueIsNull; + + /* + * We need the len and byval info for the agg's input and transition data + * types in order to know how to copy/delete values. + * + * Note that the info for the input type is used only when handling + * DISTINCT aggs with just one argument, so there is only one input type. + */ + int16 inputtypeLen, + transtypeLen; + bool inputtypeByVal, + transtypeByVal; + + /* + * Stuff for evaluation of aggregate inputs in cases where the aggregate + * requires sorted input. The arguments themselves will be evaluated via + * AggState->evalslot/evalproj for all aggregates at once, but we only + * want to sort the relevant columns for individual aggregates. + */ + TupleDesc sortdesc; /* descriptor of input tuples */ + + /* + * Slots for holding the evaluated input arguments. These are set up + * during ExecInitAgg() and then used for each input row requiring + * processing besides what's done in AggState->evalproj. + */ + TupleTableSlot *sortslot; /* current input tuple */ + TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ + + /* + * These values are working state that is initialized at the start of an + * input tuple group and updated for each input tuple. + * + * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input + * values straight to the transition function. If it's DISTINCT or + * requires ORDER BY, we pass the input values into a Tuplesort object; + * then at completion of the input tuple group, we scan the sorted values, + * eliminate duplicates if needed, and run the transition function on the + * rest. + * + * We need a separate tuplesort for each grouping set. 
+ */ + + Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ + + /* + * This field is a pre-initialized FunctionCallInfo struct used for + * calling this aggregate's transfn. We save a few cycles per row by not + * re-initializing the unchanging fields; which isn't much, but it seems + * worth the extra space consumption. + */ + FunctionCallInfoData transfn_fcinfo; + + /* Likewise for serialization and deserialization functions */ + FunctionCallInfoData serialfn_fcinfo; + + FunctionCallInfoData deserialfn_fcinfo; +} AggStatePerTransData; + +/* + * AggStatePerAggData - per-aggregate information + * + * This contains the information needed to call the final function, to produce + * a final aggregate result from the state value. If there are multiple + * identical Aggrefs in the query, they can all share the same per-agg data. + * + * These values are set up during ExecInitAgg() and do not change thereafter. + */ +typedef struct AggStatePerAggData +{ + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple identical Aggref's sharing the same per-agg. This + * points to the first one of them. + */ + Aggref *aggref; + + /* index to the state value which this agg should use */ + int transno; + + /* Optional Oid of final function (may be InvalidOid) */ + Oid finalfn_oid; + + /* + * fmgr lookup data for final function --- only valid when finalfn_oid oid + * is not InvalidOid. + */ + FmgrInfo finalfn; + + /* + * Number of arguments to pass to the finalfn. This is always at least 1 + * (the transition state value) plus any ordered-set direct args. If the + * finalfn wants extra args then we pass nulls corresponding to the + * aggregated input columns. + */ + int numFinalArgs; + + /* + * We need the len and byval info for the agg's result data type in order + * to know how to copy/delete values. 
+ */ + int16 resulttypeLen; + bool resulttypeByVal; + +} AggStatePerAggData; + +/* + * AggStatePerGroupData - per-aggregate-per-group working state + * + * These values are working state that is initialized at the start of + * an input tuple group and updated for each input tuple. + * + * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these + * structs (pointed to by aggstate->pergroup); we re-use the array for + * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the + * hash table contains an array of these structs for each tuple group. + * + * Logically, the sortstate field belongs in this struct, but we do not + * keep it here for space reasons: we don't support DISTINCT aggregates + * in AGG_HASHED mode, so there's no reason to use up a pointer field + * in every entry of the hashtable. + */ +typedef struct AggStatePerGroupData +{ + Datum transValue; /* current transition value */ + bool transValueIsNull; + + bool noTransValue; /* true if transValue not set yet */ + + /* + * Note: noTransValue initially has the same value as transValueIsNull, + * and if true both are cleared to false at the same time. They are not + * the same though: if transfn later returns a NULL, we want to keep that + * NULL and not auto-replace it with a later input value. Only the first + * non-NULL input will be auto-substituted. + */ +} AggStatePerGroupData; + +/* + * AggStatePerPhaseData - per-grouping-set-phase state + * + * Grouping sets are divided into "phases", where a single phase can be + * processed in one pass over the input. If there is more than one phase, then + * at the end of input from the current phase, state is reset and another pass + * taken over the data which has been re-sorted in the mean time. + * + * Accordingly, each phase specifies a list of grouping sets and group clause + * information, plus each phase after the first also has a sort order. 
+ */ +typedef struct AggStatePerPhaseData +{ + AggStrategy aggstrategy; /* strategy for this phase */ + int numsets; /* number of grouping sets (or 0) */ + int *gset_lengths; /* lengths of grouping sets */ + Bitmapset **grouped_cols; /* column groupings for rollup */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + Agg *aggnode; /* Agg node for phase data */ + Sort *sortnode; /* Sort node for input ordering for phase */ + + ExprState *evaltrans; /* evaluation of transition functions */ +} AggStatePerPhaseData; + +/* + * AggStatePerHashData - per-hashtable state + * + * When doing grouping sets with hashing, we have one of these for each + * grouping set. (When doing hashing without grouping sets, we have just one of + * them.) + */ +typedef struct AggStatePerHashData +{ + TupleHashTable hashtable; /* hash table with one entry per group */ + TupleHashIterator hashiter; /* for iterating through hash table */ + TupleTableSlot *hashslot; /* slot for loading hash table */ + FmgrInfo *hashfunctions; /* per-grouping-field hash fns */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + int numCols; /* number of hash key columns */ + int numhashGrpCols; /* number of columns in hash table */ + int largestGrpColIdx; /* largest col required for hashing */ + AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ + AttrNumber *hashGrpColIdxHash; /* indices in hashtbl tuples */ + Agg *aggnode; /* original Agg node, for numGroups etc. */ +} AggStatePerHashData; + extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags); extern void ExecEndAgg(AggState *node); extern void ExecReScanAgg(AggState *node); @@ -24,4 +315,21 @@ extern Size hash_agg_entry_size(int numAggs); extern Datum aggregate_dummy(PG_FUNCTION_ARGS); +/* + * Select the current grouping set; affects current_set and + * curaggcontext. 
+ */ +static inline void +select_current_set(AggState *aggstate, int setno, bool is_hash) +{ + /* when changing this, also adapt ExecInterpExpr() and friends */ + if (is_hash) + aggstate->curaggcontext = aggstate->hashcontext; /* one context for every hashed set, setno not consulted */ + else + aggstate->curaggcontext = aggstate->aggcontexts[setno]; /* per-set context */ + + aggstate->current_set = setno; +} + + #endif /* NODEAGG_H */ diff --git a/src/include/lib/llvmjit.h b/src/include/lib/llvmjit.h index 61d7c67d6f..47f9b6d64c 100644 --- a/src/include/lib/llvmjit.h +++ b/src/include/lib/llvmjit.h @@ -59,6 +59,7 @@ extern LLVMTypeRef StructFmgrInfo; extern LLVMTypeRef StructFunctionCallInfoData; extern LLVMTypeRef StructExprState; extern LLVMTypeRef StructExprContext; +extern LLVMTypeRef StructAggStatePerGroupData; extern void llvm_initialize(void); extern void llvm_dispose_module(LLVMModuleRef mod, const char *funcname); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b0c4856392..68352b431c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1835,6 +1835,11 @@ typedef struct AggState AggStatePerHash perhash; AggStatePerGroup *hash_pergroup; /* grouping set indexed array of * per-group pointers */ + + AggStatePerGroup *all_pergroups; /* numGroupingSets + numHashes slots: sorted-set pergroups (if any) first, then the hash_pergroup slots */ + ExprContext **all_contexts; /* same length as all_pergroups; sorted-set slots point at aggcontexts[] */ + + /* support for evaluation of agg inputs */ TupleTableSlot *evalslot; /* slot for agg inputs */ ProjectionInfo *evalproj; /* projection machinery */ -- 2.14.1.2.g4274c698f4.dirty