diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 2f59af25a6f..2ca969ffa92 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -13358,7 +13358,7 @@ NULL baz(3 rows) array of the argument type - No + Yes input values, including nulls, concatenated into an array @@ -13372,7 +13372,7 @@ NULL baz(3 rows) same as argument data type - No + Yes input arrays concatenated into array of one higher dimension (inputs must all have same dimensionality, and cannot be empty or NULL) @@ -13633,7 +13633,7 @@ NULL baz(3 rows) same as argument types - No + Yes input values concatenated into a string, separated by delimiter diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c index cb7a6b6d010..ae698482899 100644 --- a/src/backend/utils/adt/array_userfuncs.c +++ b/src/backend/utils/adt/array_userfuncs.c @@ -13,12 +13,33 @@ #include "postgres.h" #include "catalog/pg_type.h" +#include "libpq/pqformat.h" #include "common/int.h" #include "utils/array.h" +#include "utils/datum.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/typcache.h" +/* + * DeserialIOData + * Used for caching type data in array deserial function + */ +typedef struct DeserialIOData +{ + Oid typreceive; + Oid typioparam; +} DeserialIOData; + +/* + * SerialIOData + * Used for caching type data in array serial function + */ +typedef struct SerialIOData +{ + Oid typsend; + bool typisvarlena; +} SerialIOData; static Datum array_position_common(FunctionCallInfo fcinfo); @@ -498,6 +519,267 @@ array_agg_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +Datum +array_agg_combine(PG_FUNCTION_ARGS) +{ + ArrayBuildState *state1; + ArrayBuildState *state2; + MemoryContext agg_context; + MemoryContext old_context; + int i; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + PG_RETURN_POINTER(state1); + + /* Manually copy all fields from state2 to state1 */ + if (state1 == NULL) + { + old_context = MemoryContextSwitchTo(agg_context); + + state1 = initArrayResultWithSize(state2->element_type, agg_context, + false, state2->alen); + + state1->nelems = state2->nelems; + + for (i = 0; i < state2->nelems; i++) + state1->dvalues[i] = datumCopy(state2->dvalues[i], + state1->typbyval, state1->typlen); + + memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems); + + state1->nelems = state2->nelems; + + MemoryContextSwitchTo(old_context); + + PG_RETURN_POINTER(state1); + } + + /* We only need to combine the two states if state2 has any elements */ + else if (state2->nelems > 0) + { + int reqsize = state1->nelems + state2->nelems; + MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext); + + /* + * If there's not enough space in state1 then we'll need to reallocate + * more. + */ + if (state1->alen < reqsize) + { + /* Use a power of 2 size rather than allocating just reqsize */ + while (state1->alen < reqsize) + state1->alen *= 2; + + state1->dvalues = (Datum *) repalloc(state1->dvalues, state1->alen * sizeof(Datum)); + state1->dnulls = (bool *) repalloc(state1->dnulls, state1->alen * sizeof(bool)); + } + + /* Copy in the state2 elements to the end of the state1 arrays */ + for (i = 0; i < state2->nelems; i++) + state1->dvalues[i + state1->nelems] = datumCopy(state2->dvalues[i], + state1->typbyval, state1->typlen); + + memcpy(&state1->dnulls[state1->nelems], state2->dnulls, sizeof(bool) * state2->nelems); + + state1->nelems = reqsize; + + MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); +} + +/* + * array_agg_serialize + * Serialize ArrayBuildState into bytea. + */ +Datum +array_agg_serialize(PG_FUNCTION_ARGS) +{ + ArrayBuildState *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildState *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + */ + pq_sendint32(&buf, state->element_type); + + /* + * nelems -- send first so we know how large to make the dvalues and + * dnulls array during deserialization. + */ + pq_sendint64(&buf, state->nelems); + + /* alen can be decided during deserialization */ + + /* typlen */ + pq_sendint16(&buf, state->typlen); + + /* typbyval */ + pq_sendbyte(&buf, state->typbyval); + + /* typalign */ + pq_sendbyte(&buf, state->typalign); + + + /* + * dvalues -- this is very simple when the value type is byval, we can + * simply just send the Datums over, however, for non-byval types we must + * work a little harder. + */ + if (state->typbyval) + pq_sendbytes(&buf, (char *) state->dvalues, sizeof(Datum) * state->nelems); + else + { + SerialIOData *iodata; + bytea *outputbytes; + int i; + + /* + * To avoid having to constantly make calls to getTypeBinaryOutputInfo + * we'll cache that information in fn_extra for the next call. Let's + * see if we've done that already... + */ + if (fcinfo->flinfo->fn_extra) + iodata = (SerialIOData *) fcinfo->flinfo->fn_extra; + else + { + fcinfo->flinfo->fn_extra = iodata = + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(SerialIOData)); + getTypeBinaryOutputInfo(state->element_type, &iodata->typsend, + &iodata->typisvarlena); + } + + for (i = 0; i < state->nelems; i++) + { + outputbytes = OidSendFunctionCall(iodata->typsend, state->dvalues[i]); + pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ); + pq_sendbytes(&buf, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + /* dnulls */ + pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +array_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + ArrayBuildState *result; + StringInfoData buf; + Oid element_type; + int64 nelems; + const char *temp; + + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* nelems */ + nelems = pq_getmsgint64(&buf); + + result = initArrayResultWithSize(element_type, CurrentMemoryContext, + false, nelems); + + result->nelems = nelems; + + /* typlen */ + result->typlen = pq_getmsgint(&buf, 2); + + /* typbyval */ + result->typbyval = pq_getmsgbyte(&buf); + + /* typalign */ + result->typalign = pq_getmsgbyte(&buf); + + /* + * dvalues -- this is very simple when the value type is byval, we can + * simply just get all the Datums at once, however, for non-byval types we + * must work a little harder. + */ + if (result->typbyval) + { + temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems); + memcpy(result->dvalues, temp, sizeof(Datum) * nelems); + } + else + { + DeserialIOData *iodata; + int i; + + /* + * To avoid having to constantly make calls to getTypeBinaryInputInfo + * we'll cache that information in fn_extra for the next call. Let's + * see if we've done that already... + */ + if (fcinfo->flinfo->fn_extra) + iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra; + else + { + fcinfo->flinfo->fn_extra = iodata = + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(DeserialIOData)); + getTypeBinaryInputInfo(element_type, &iodata->typreceive, + &iodata->typioparam); + } + + for (i = 0; i < nelems; i++) + { + StringInfoData tmp; + tmp.cursor = 0; + tmp.maxlen = tmp.len = pq_getmsgint(&buf, 4); + tmp.data = (char *) pq_getmsgbytes(&buf, tmp.len); + + result->dvalues[i] = OidReceiveFunctionCall(iodata->typreceive, + &tmp, + iodata->typioparam, + -1); + } + } + + /* dnulls */ + temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems); + memcpy(result->dnulls, temp, sizeof(bool) * nelems); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum array_agg_finalfn(PG_FUNCTION_ARGS) { @@ -577,6 +859,295 @@ array_agg_array_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +Datum +array_agg_array_combine(PG_FUNCTION_ARGS) +{ + ArrayBuildStateArr *state1; + ArrayBuildStateArr *state2; + MemoryContext agg_context; + MemoryContext old_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + PG_RETURN_POINTER(state1); + + /* Manually copy all fields from state2 to state1 */ + if (state1 == NULL) + { + old_context = MemoryContextSwitchTo(agg_context); + + state1 = initArrayResultArr(state2->array_type, InvalidOid, + agg_context, false); + + state1->abytes = state2->abytes; + state1->data = (char *) palloc(state1->abytes); + + if (state2->nullbitmap) + { + int size = (state2->aitems + 7) / 8; + state1->nullbitmap = (bits8 *) palloc(size); + memcpy(state1->nullbitmap, state2->nullbitmap, size); + } + + memcpy(state1->data, state2->data, state2->nbytes); + state1->nbytes = state2->nbytes; + state1->aitems = state2->aitems; + state1->nitems = state2->nitems; + state1->ndims = state2->ndims; + memcpy(state1->dims, state2->dims, sizeof(state2->dims)); + memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs)); + state1->array_type = state2->array_type; + state1->element_type = state2->element_type; + + MemoryContextSwitchTo(old_context); + + PG_RETURN_POINTER(state1); + } + + /* We only need to combine the two states if state2 has any items */ + else if (state2->nitems > 0) + { + MemoryContext oldContext; + int reqsize = state1->nbytes + state2->nbytes; + int i; + + /* + * Check the states are compatible with each other. Ensure we use the + * same error messages that are listed in accumArrayResultArr so that + * the same error is shown as would have been if we'd not used the + * combine function for the aggregation. + */ + if (state1->ndims != state2->ndims) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + + /* Check dimensions match ignoring the first dimension. */ + for (i = 1; i < state1->ndims; i++) + { + if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i]) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + } + + + oldContext = MemoryContextSwitchTo(state1->mcontext); + + /* + * If there's not enough space in state1 then we'll need to reallocate + * more. + */ + if (state1->abytes < reqsize) + { + /* use a power of 2 size rather than allocating just reqsize */ + while (state1->abytes < reqsize) + state1->abytes *= 2; + + state1->data = (char *) repalloc(state1->data, state1->abytes); + } + + if (state2->nullbitmap) + { + int newnitems = state1->nitems + state2->nitems; + + if (state1->nullbitmap == NULL) + { + /* + * First input with nulls; we must retrospectively handle any + * previous inputs by marking all their items non-null. + */ + state1->aitems = 256; + while (state1->aitems <= newnitems) + state1->aitems *= 2; + state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8); + array_bitmap_copy(state1->nullbitmap, 0, + NULL, 0, + state1->nitems); + } + else if (newnitems > state1->aitems) + { + int newaitems = state1->aitems + state2->aitems; + + while (state1->aitems < newaitems) + state1->aitems *= 2; + + state1->nullbitmap = (bits8 *) + repalloc(state1->nullbitmap, (state1->aitems + 7) / 8); + } + array_bitmap_copy(state1->nullbitmap, state1->nitems, + state2->nullbitmap, 0, + state2->nitems); + } + + memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes); + state1->nbytes += state2->nbytes; + state1->nitems += state2->nitems; + + state1->dims[0] += state2->dims[0]; + /* remaing dims already match, per test above */ + + Assert(state1->array_type == state2->array_type); + Assert(state1->element_type = state2->element_type); + + MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); +} + +/* + * array_agg_array_serialize + * Serialize ArrayBuildStateArr into bytea. + */ +Datum +array_agg_array_serialize(PG_FUNCTION_ARGS) +{ + ArrayBuildStateArr *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + * so that we can init the new state sooner. + */ + pq_sendint32(&buf, state->element_type); + + /* array_type */ + pq_sendint32(&buf, state->array_type); + + /* nbytes */ + pq_sendint32(&buf, state->nbytes); + + /* data */ + pq_sendbytes(&buf, state->data, state->nbytes); + + /* abytes */ + pq_sendint32(&buf, state->abytes); + + /* aitems */ + pq_sendint32(&buf, state->aitems); + + /* nullbitmap */ + if (state->nullbitmap) + { + Assert(state->aitems > 0); + pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8); + } + + /* nitems */ + pq_sendint32(&buf, state->nitems); + + /* ndims */ + pq_sendint32(&buf, state->ndims); + + /* dims: XXX or should we just send ndim's worth? */ + pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims)); + + /* lbs */ + pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs)); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +array_agg_array_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + ArrayBuildStateArr *result; + StringInfoData buf; + Oid element_type; + Oid array_type; + int nbytes; + const char *temp; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* array_type */ + array_type = pq_getmsgint(&buf, 4); + + /* nbytes */ + nbytes = pq_getmsgint(&buf, 4); + + result = initArrayResultArr(array_type, element_type, + CurrentMemoryContext, false); + + result->abytes = 1024; + while (result->abytes < nbytes) + result->abytes *= 2; + + result->data = (char *) palloc(result->abytes); + + /* data */ + temp = pq_getmsgbytes(&buf, nbytes); + memcpy(result->data, temp, nbytes); + result->nbytes = nbytes; + + /* abytes */ + result->abytes = pq_getmsgint(&buf, 4); + + /* aitems: might be 0 */ + result->aitems = pq_getmsgint(&buf, 4); + + /* nullbitmap */ + if (result->aitems > 0) + { + int size = (result->aitems + 7) / 8; + result->nullbitmap = (bits8 *) palloc(size); + temp = pq_getmsgbytes(&buf, size); + memcpy(result->nullbitmap, temp, size); + } + else + result->nullbitmap = NULL; + + /* nitems */ + result->nitems = pq_getmsgint(&buf, 4); + + /* ndims */ + result->ndims = pq_getmsgint(&buf, 4); + + /* dims */ + temp = pq_getmsgbytes(&buf, sizeof(result->dims)); + memcpy(result->dims, temp, sizeof(result->dims)); + + /* lbs */ + temp = pq_getmsgbytes(&buf, sizeof(result->lbs)); + memcpy(result->lbs, temp, sizeof(result->lbs)); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum array_agg_array_finalfn(PG_FUNCTION_ARGS) { diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c index 0cbdbe5587e..063182326de 100644 --- a/src/backend/utils/adt/arrayfuncs.c +++ b/src/backend/utils/adt/arrayfuncs.c @@ -5001,6 +5001,19 @@ array_insert_slice(ArrayType *destArray, */ ArrayBuildState * initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) +{ + return initArrayResultWithSize(element_type, rcontext, subcontext, + subcontext ? 64 : 8); +} + +/* + * initArrayResultWithSize + * As initArrayResult, but allow the initial size of the allocated arrays + * to be specified. + */ +ArrayBuildState * +initArrayResultWithSize(Oid element_type, MemoryContext rcontext, + bool subcontext, int initsize) { ArrayBuildState *astate; MemoryContext arr_context = rcontext; @@ -5015,7 +5028,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) MemoryContextAlloc(arr_context, sizeof(ArrayBuildState)); astate->mcontext = arr_context; astate->private_cxt = subcontext; - astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */ + astate->alen = initsize; astate->dvalues = (Datum *) MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum)); astate->dnulls = (bool *) diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 4346410d5a9..e047ba07bbf 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -458,13 +458,30 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS) { bytea *value = PG_GETARG_BYTEA_PP(1); - /* On the first time through, we ignore the delimiter. */ + /* + * It's important that we store the delimiter text for all aggregated + * items, even the first one, which at first thought you might think + * could just be discarded. The reason for this is that if this + * function is being called from a parallel worker, then we'll need + * the first delimiter in order to properly combine the partially + * aggregated state with the states coming from other workers. In the + * final output, the first delimiter will be stripped off of the final + * aggregate state, but in order to know where the actual first data + * is we must store the position of the first data value somewhere. + * Conveniently, StringInfo has a cursor property which can be used + * to serve our needs here. + */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) + + if (!PG_ARGISNULL(2)) + state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_BYTEA_PP(2)); + } + + if (!PG_ARGISNULL(2)) { bytea *delim = PG_GETARG_BYTEA_PP(2); - appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim)); } @@ -478,6 +495,69 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +Datum +bytea_string_agg_serialize(PG_FUNCTION_ARGS) +{ + StringInfo state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (StringInfo) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * cursor -- we must send this position so we can properly skip past the + * delimiter for the first aggregated string + */ + pq_sendint(&buf, state->cursor, 4); + + pq_sendbytes(&buf, state->data, state->len); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +bytea_string_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + StringInfo result; + StringInfoData buf; + char *temp; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + result = makeStringAggState(fcinfo); + + /* cursor */ + result->cursor = pq_getmsgint(&buf, 4); + + /* data */ + temp = (char *) pq_getmsgbytes(&buf, VARSIZE_ANY_EXHDR(sstate) - 4); + appendBinaryStringInfo(result, temp, VARSIZE_ANY_EXHDR(sstate) - 4); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum bytea_string_agg_finalfn(PG_FUNCTION_ARGS) { @@ -492,9 +572,10 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS) { bytea *result; - result = (bytea *) palloc(state->len + VARHDRSZ); - SET_VARSIZE(result, state->len + VARHDRSZ); - memcpy(VARDATA(result), state->data, state->len); + result = (bytea *) palloc(state->len - state->cursor + VARHDRSZ); + SET_VARSIZE(result, state->len - state->cursor + VARHDRSZ); + memcpy(VARDATA(result), &state->data[state->cursor], + state->len - state->cursor); PG_RETURN_BYTEA_P(result); } else @@ -4656,10 +4737,28 @@ string_agg_transfn(PG_FUNCTION_ARGS) /* Append the value unless null. */ if (!PG_ARGISNULL(1)) { - /* On the first time through, we ignore the delimiter. */ + /* + * It's important that we store the delimiter text for all aggregated + * items, even the first one, which at first thought you might think + * could just be discarded. The reason for this is that if this + * function is being called from a parallel worker, then we'll need + * the first delimiter in order to properly combine the partially + * aggregated state with the states coming from other workers. In the + * final output, the first delimiter will be stripped off of the final + * aggregate state, but in order to know where the actual first data + * is we must store the position of the first data value somewhere. + * Conveniently, StringInfo has a cursor property which can be used + * to serve our needs here. + */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) + + if (!PG_ARGISNULL(2)) + state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_TEXT_PP(2)); + } + + if (!PG_ARGISNULL(2)) appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */ appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */ @@ -4672,6 +4771,108 @@ string_agg_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +/* + * string_agg_combine + * Aggregate combine function for string_agg(text) and string_agg(bytea) + */ +Datum +string_agg_combine(PG_FUNCTION_ARGS) +{ + StringInfo state1; + StringInfo state2; + MemoryContext agg_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1); + + if (state2 == NULL) + PG_RETURN_POINTER(state1); + + if (state1 == NULL) + { + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(agg_context); + state1 = makeStringAggState(fcinfo); + + /* The cursor marks the start of the actual data */ + state1->cursor = state2->cursor; + + appendBinaryStringInfo(state1, state2->data, state2->len); + + MemoryContextSwitchTo(old_context); + } + else if (state2->len > 0) + appendBinaryStringInfo(state1, state2->data, state2->len); + + PG_RETURN_POINTER(state1); +} + +Datum +string_agg_serialize(PG_FUNCTION_ARGS) +{ + StringInfo state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (StringInfo) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * cursor -- we must send this position so we can properly skip past the + * delimiter for the first aggregated string + */ + pq_sendint(&buf, state->cursor, 4); + + pq_sendtext(&buf, state->data, state->len); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +string_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + StringInfo result; + StringInfoData buf; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + result = makeStringAggState(fcinfo); + + /* cursor */ + result->cursor = pq_getmsgint(&buf, 4); + + /* data: pq_getmsgtext will allocate memory in the required context */ + result->data = pq_getmsgtext(&buf, VARSIZE_ANY_EXHDR(sstate) - 4, + &result->len); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum string_agg_finalfn(PG_FUNCTION_ARGS) { @@ -4682,8 +4883,13 @@ string_agg_finalfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); + /* + * Return the text removing the first delimiter text as marked by the + * cursor. + */ if (state != NULL) - PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len)); + PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor], + state->len - state->cursor)); else PG_RETURN_NULL(); } diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 125bb5b479d..73f36a0d982 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -300,14 +300,14 @@ DATA(insert ( 2243 n 0 bitor - bitor - - - - - f f r r 0 1560 0 0 DATA(insert ( 2901 n 0 xmlconcat2 - - - - - - - f f r r 0 142 0 0 0 _null_ _null_ )); /* array */ -DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ )); -DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn array_agg_combine array_agg_serialize array_agg_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn array_agg_array_combine array_agg_array_serialize array_agg_array_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ )); /* text */ -DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn string_agg_combine string_agg_serialize string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ )); /* bytea */ -DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn string_agg_combine bytea_string_agg_serialize bytea_string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ )); /* json */ DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 0fdb42f6397..6efd38fe452 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -942,12 +942,24 @@ DATA(insert OID = 3168 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f i s DESCR("replace any occurrences of an element in an array"); DATA(insert OID = 2333 ( array_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ )); DESCR("aggregate transition function"); +DATA(insert OID = 3423 ( array_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_combine _null_ _null_ _null_ )); +DESCR("aggregate combine function"); +DATA(insert OID = 3424 ( array_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_serialize _null_ _null_ _null_ )); +DESCR("aggregate serial function"); +DATA(insert OID = 3425 ( array_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_deserialize _null_ _null_ _null_ )); +DESCR("aggregate deserial function"); DATA(insert OID = 2334 ( array_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ )); DESCR("aggregate final function"); DATA(insert OID = 2335 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2776" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ )); DESCR("concatenate aggregate input into an array"); DATA(insert OID = 4051 ( array_agg_array_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_transfn _null_ _null_ _null_ )); DESCR("aggregate transition function"); +DATA(insert OID = 3426 ( array_agg_array_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_combine _null_ _null_ _null_ )); +DESCR("aggregate combine function"); +DATA(insert OID = 3427 ( array_agg_array_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_serialize _null_ _null_ _null_ )); +DESCR("aggregate serial function"); +DATA(insert OID = 3428 ( array_agg_array_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_deserialize _null_ _null_ _null_ )); +DESCR("aggregate deserial function"); DATA(insert OID = 4052 ( array_agg_array_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_finalfn _null_ _null_ _null_ )); DESCR("aggregate final function"); DATA(insert OID = 4053 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2277" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ )); @@ -2710,12 +2722,23 @@ DESCR("aggregate final function"); DATA(insert OID = 3535 ( string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 25 25" _null_ _null_ _null_ _null_ _null_ string_agg_transfn _null_ _null_ _null_ )); DESCR("aggregate transition function"); +DATA(insert OID = 3429 ( string_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ string_agg_combine _null_ _null_ _null_ )); +DESCR("aggregate combine function"); +DATA(insert OID = 3430 ( string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_serialize _null_ _null_ _null_ )); +DESCR("aggregate serial function"); +DATA(insert OID = 3431 ( string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ string_agg_deserialize _null_ _null_ _null_ )); +DESCR("aggregate deserial function"); + DATA(insert OID = 3536 ( string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 25 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_finalfn _null_ _null_ _null_ )); DESCR("aggregate final function"); DATA(insert OID = 3538 ( string_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 25 "25 25" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ )); DESCR("concatenate aggregate input into a string"); DATA(insert OID = 3543 ( bytea_string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 17 17" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_transfn _null_ _null_ _null_ )); DESCR("aggregate transition function"); +DATA(insert OID = 3432 ( bytea_string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_serialize _null_ _null_ _null_ )); +DESCR("aggregate serial function"); +DATA(insert OID = 3433 ( bytea_string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_deserialize _null_ _null_ _null_ )); +DESCR("aggregate deserial function"); DATA(insert OID = 3544 ( bytea_string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_finalfn _null_ _null_ _null_ )); DESCR("aggregate final function"); DATA(insert OID = 3545 ( string_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 17 "17 17" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ )); diff --git a/src/include/utils/array.h b/src/include/utils/array.h index afbb532e9c2..f99e05a2584 100644 --- a/src/include/utils/array.h +++ b/src/include/utils/array.h @@ -393,6 +393,9 @@ extern bool array_contains_nulls(ArrayType *array); extern ArrayBuildState *initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext); +extern ArrayBuildState *initArrayResultWithSize(Oid element_type, + MemoryContext rcontext, + bool subcontext, int initsize); extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate, Datum dvalue, bool disnull, Oid element_type,