diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 2f59af25a6f..2ca969ffa92 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -13358,7 +13358,7 @@ NULL baz(3 rows)
array of the argument type
- No
+ Yes
input values, including nulls, concatenated into an array
@@ -13372,7 +13372,7 @@ NULL baz(3 rows)
same as argument data type
- No
+ Yes
input arrays concatenated into array of one higher dimension
(inputs must all have same dimensionality,
and cannot be empty or NULL)
@@ -13633,7 +13633,7 @@ NULL baz(3 rows)
same as argument types
- No
+ Yes
input values concatenated into a string, separated by delimiter
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index cb7a6b6d010..ae698482899 100644
--- a/src/backend/utils/adt/array_userfuncs.c
+++ b/src/backend/utils/adt/array_userfuncs.c
@@ -13,12 +13,33 @@
#include "postgres.h"
#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
#include "common/int.h"
#include "utils/array.h"
+#include "utils/datum.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
+/*
+ * DeserialIOData
+ * Used for caching type data in array deserial function
+ *
+ * array_agg_deserialize keeps one of these in fn_extra so that
+ * getTypeBinaryInputInfo only has to be called the first time through.
+ */
+typedef struct DeserialIOData
+{
+ Oid typreceive; /* receive function OID, as set by getTypeBinaryInputInfo */
+ Oid typioparam; /* passed to the receive function along with the data */
+} DeserialIOData;
+
+/*
+ * SerialIOData
+ * Used for caching type data in array serial function
+ *
+ * array_agg_serialize keeps one of these in fn_extra so that
+ * getTypeBinaryOutputInfo only has to be called the first time through.
+ */
+typedef struct SerialIOData
+{
+ Oid typsend; /* send function OID, as set by getTypeBinaryOutputInfo */
+ bool typisvarlena; /* second output of getTypeBinaryOutputInfo */
+} SerialIOData;
static Datum array_position_common(FunctionCallInfo fcinfo);
@@ -498,6 +519,267 @@ array_agg_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * array_agg_combine
+ *		Aggregate combine function for the ArrayBuildState-based array_agg.
+ *
+ * Appends the elements held in state2 (argument 1) to state1 (argument 0)
+ * and returns state1.  Either argument may be NULL:
+ *	- both NULL: the result is SQL NULL;
+ *	- only state2 NULL: state1 is returned unchanged;
+ *	- only state1 NULL: a copy of state2 is built in the aggregate context.
+ *
+ * Element values are copied with datumCopy, so the result never points into
+ * state2's storage.  NULL elements are never handed to datumCopy, since their
+ * Datum is not a valid pointer for by-reference types.
+ */
+Datum
+array_agg_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state1;
+	ArrayBuildState *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+	int			i;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * Nothing to add.  Be sure to report a genuine SQL NULL when there
+		 * is no state at all, rather than a non-null Datum holding a NULL
+		 * pointer.
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		/* No state yet: build a copy of state2 in the aggregate context */
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		state1 = initArrayResultWithSize(state2->element_type, agg_context,
+										 false, state2->alen);
+
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (state2->dnulls[i])
+				state1->dvalues[i] = (Datum) 0;
+			else
+				state1->dvalues[i] = datumCopy(state2->dvalues[i],
+											   state1->typbyval,
+											   state1->typlen);
+		}
+
+		memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems);
+
+		state1->nelems = state2->nelems;
+
+		MemoryContextSwitchTo(old_context);
+
+		PG_RETURN_POINTER(state1);
+	}
+
+	/* We only need to combine the two states if state2 has any elements */
+	if (state2->nelems > 0)
+	{
+		int			reqsize = state1->nelems + state2->nelems;
+
+		old_context = MemoryContextSwitchTo(state1->mcontext);
+
+		/*
+		 * If there's not enough space in state1 then we'll need to reallocate
+		 * more.
+		 */
+		if (state1->alen < reqsize)
+		{
+			/*
+			 * Use a power of 2 multiple rather than allocating just reqsize.
+			 * Start from at least 1 so that doubling always makes progress.
+			 */
+			int			newalen = Max(state1->alen, 1);
+
+			while (newalen < reqsize)
+				newalen *= 2;
+			state1->alen = newalen;
+
+			state1->dvalues = (Datum *) repalloc(state1->dvalues,
+												 state1->alen * sizeof(Datum));
+			state1->dnulls = (bool *) repalloc(state1->dnulls,
+											   state1->alen * sizeof(bool));
+		}
+
+		/* Copy in the state2 elements to the end of the state1 arrays */
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (state2->dnulls[i])
+				state1->dvalues[i + state1->nelems] = (Datum) 0;
+			else
+				state1->dvalues[i + state1->nelems] =
+					datumCopy(state2->dvalues[i],
+							  state1->typbyval,
+							  state1->typlen);
+		}
+
+		memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
+			   sizeof(bool) * state2->nelems);
+
+		state1->nelems = reqsize;
+
+		MemoryContextSwitchTo(old_context);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_serialize
+ *		Serialize ArrayBuildState into bytea.
+ *
+ * Layout, in order:
+ *	element_type (int32), nelems (int64), typlen (int16), typbyval (byte),
+ *	typalign (byte), the element values, and finally the dnulls array.
+ *
+ * By-value elements are sent as the raw Datum array.  Any other element is
+ * sent as an int32 length followed by the output of the type's send
+ * function; a NULL element is sent as length -1 with no data, because the
+ * send function must not be called on a NULL Datum.
+ *
+ * array_agg_deserialize must be kept in step with this format.
+ */
+Datum
+array_agg_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (ArrayBuildState *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * element_type. Putting this first is more convenient in deserialization
+	 */
+	pq_sendint32(&buf, state->element_type);
+
+	/*
+	 * nelems -- send first so we know how large to make the dvalues and
+	 * dnulls array during deserialization.
+	 */
+	pq_sendint64(&buf, state->nelems);
+
+	/* alen can be decided during deserialization */
+
+	/* typlen */
+	pq_sendint16(&buf, state->typlen);
+
+	/* typbyval */
+	pq_sendbyte(&buf, state->typbyval);
+
+	/* typalign */
+	pq_sendbyte(&buf, state->typalign);
+
+	/*
+	 * dvalues -- this is very simple when the value type is byval, we can
+	 * simply just send the Datums over, however, for non-byval types we must
+	 * work a little harder.
+	 */
+	if (state->typbyval)
+		pq_sendbytes(&buf, (char *) state->dvalues,
+					 sizeof(Datum) * state->nelems);
+	else
+	{
+		SerialIOData *iodata;
+		int			i;
+
+		/*
+		 * To avoid having to constantly make calls to getTypeBinaryOutputInfo
+		 * we'll cache that information in fn_extra for the next call. Let's
+		 * see if we've done that already...
+		 */
+		if (fcinfo->flinfo->fn_extra)
+			iodata = (SerialIOData *) fcinfo->flinfo->fn_extra;
+		else
+		{
+			fcinfo->flinfo->fn_extra = iodata =
+				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+								   sizeof(SerialIOData));
+			getTypeBinaryOutputInfo(state->element_type, &iodata->typsend,
+									&iodata->typisvarlena);
+		}
+
+		for (i = 0; i < state->nelems; i++)
+		{
+			bytea	   *outputbytes;
+
+			if (state->dnulls[i])
+			{
+				/* -1 length means there's a NULL; no data follows */
+				pq_sendint32(&buf, -1);
+				continue;
+			}
+
+			outputbytes = OidSendFunctionCall(iodata->typsend,
+											  state->dvalues[i]);
+			pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
+			pq_sendbytes(&buf, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+		}
+	}
+
+	/* dnulls */
+	pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems);
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * array_agg_deserialize
+ *		Rebuild an ArrayBuildState from the bytea produced by
+ *		array_agg_serialize.
+ *
+ * The new state is allocated in CurrentMemoryContext with room for exactly
+ * the number of elements received.  See array_agg_serialize for the format.
+ */
+Datum
+array_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildState *result;
+	StringInfoData buf;
+	Oid			element_type;
+	int64		nelems;
+	const char *temp;
+
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* nelems */
+	nelems = pq_getmsgint64(&buf);
+
+	result = initArrayResultWithSize(element_type, CurrentMemoryContext,
+									 false, nelems);
+
+	result->nelems = nelems;
+
+	/* typlen */
+	result->typlen = pq_getmsgint(&buf, 2);
+
+	/* typbyval */
+	result->typbyval = pq_getmsgbyte(&buf);
+
+	/* typalign */
+	result->typalign = pq_getmsgbyte(&buf);
+
+	/*
+	 * dvalues -- this is very simple when the value type is byval, we can
+	 * simply just get all the Datums at once, however, for non-byval types we
+	 * must work a little harder.
+	 */
+	if (result->typbyval)
+	{
+		temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
+		memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
+	}
+	else
+	{
+		DeserialIOData *iodata;
+		int			i;
+
+		/*
+		 * To avoid having to constantly make calls to getTypeBinaryInputInfo
+		 * we'll cache that information in fn_extra for the next call. Let's
+		 * see if we've done that already...
+		 */
+		if (fcinfo->flinfo->fn_extra)
+			iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra;
+		else
+		{
+			fcinfo->flinfo->fn_extra = iodata =
+				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+								   sizeof(DeserialIOData));
+			getTypeBinaryInputInfo(element_type, &iodata->typreceive,
+								   &iodata->typioparam);
+		}
+
+		for (i = 0; i < nelems; i++)
+		{
+			int			itemlen;
+			StringInfoData elem_buf;
+			char		csave;
+
+			itemlen = pq_getmsgint(&buf, 4);
+			if (itemlen < -1 || itemlen > (buf.len - buf.cursor))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+						 errmsg("insufficient data left in message")));
+
+			if (itemlen == -1)
+			{
+				/* -1 length means NULL; the flag itself arrives in dnulls */
+				result->dvalues[i] = (Datum) 0;
+				continue;
+			}
+
+			/*
+			 * Rather than copying data around, set up a StringInfo pointing
+			 * at the right portion of our private copy of the input.  We
+			 * temporarily overwrite the byte following the item so that the
+			 * usual "StringInfo has a trailing null" convention holds while
+			 * the receive function runs.
+			 */
+			elem_buf.data = &buf.data[buf.cursor];
+			elem_buf.maxlen = itemlen + 1;
+			elem_buf.len = itemlen;
+			elem_buf.cursor = 0;
+
+			buf.cursor += itemlen;
+
+			csave = buf.data[buf.cursor];
+			buf.data[buf.cursor] = '\0';
+
+			result->dvalues[i] = OidReceiveFunctionCall(iodata->typreceive,
+														&elem_buf,
+														iodata->typioparam,
+														-1);
+
+			buf.data[buf.cursor] = csave;
+		}
+	}
+
+	/* dnulls */
+	temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
+	memcpy(result->dnulls, temp, sizeof(bool) * nelems);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
Datum
array_agg_finalfn(PG_FUNCTION_ARGS)
{
@@ -577,6 +859,295 @@ array_agg_array_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * array_agg_array_combine
+ *		Aggregate combine function for the ArrayBuildStateArr-based array_agg.
+ *
+ * Appends the sub-arrays held in state2 (argument 1) to state1 (argument 0)
+ * and returns state1.  Either argument may be NULL:
+ *	- both NULL: the result is SQL NULL;
+ *	- only state2 NULL: state1 is returned unchanged;
+ *	- only state1 NULL: a copy of state2 is built in the aggregate context.
+ *
+ * Raises ERRCODE_ARRAY_SUBSCRIPT_ERROR if the two states do not agree on the
+ * number of dimensions, or on the size/lower bound of any dimension after
+ * the first.
+ */
+Datum
+array_agg_array_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *state1;
+	ArrayBuildStateArr *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * Nothing to add.  Be sure to report a genuine SQL NULL when there
+		 * is no state at all, rather than a non-null Datum holding a NULL
+		 * pointer.
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	/* Manually copy all fields from state2 to state1 */
+	if (state1 == NULL)
+	{
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		state1 = initArrayResultArr(state2->array_type, InvalidOid,
+									agg_context, false);
+
+		state1->abytes = state2->abytes;
+		state1->data = (char *) palloc(state1->abytes);
+
+		if (state2->nullbitmap)
+		{
+			int			size = (state2->aitems + 7) / 8;
+
+			state1->nullbitmap = (bits8 *) palloc(size);
+			memcpy(state1->nullbitmap, state2->nullbitmap, size);
+		}
+
+		memcpy(state1->data, state2->data, state2->nbytes);
+		state1->nbytes = state2->nbytes;
+		state1->aitems = state2->aitems;
+		state1->nitems = state2->nitems;
+		state1->ndims = state2->ndims;
+		memcpy(state1->dims, state2->dims, sizeof(state2->dims));
+		memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
+		state1->array_type = state2->array_type;
+		state1->element_type = state2->element_type;
+
+		MemoryContextSwitchTo(old_context);
+
+		PG_RETURN_POINTER(state1);
+	}
+
+	/* We only need to combine the two states if state2 has any items */
+	if (state2->nitems > 0)
+	{
+		int			reqsize = state1->nbytes + state2->nbytes;
+		int			i;
+
+		/*
+		 * Check the states are compatible with each other. Ensure we use the
+		 * same error messages that are listed in accumArrayResultArr so that
+		 * the same error is shown as would have been if we'd not used the
+		 * combine function for the aggregation.
+		 */
+		if (state1->ndims != state2->ndims)
+			ereport(ERROR,
+					(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+					 errmsg("cannot accumulate arrays of different dimensionality")));
+
+		/* Check dimensions match ignoring the first dimension. */
+		for (i = 1; i < state1->ndims; i++)
+		{
+			if (state1->dims[i] != state2->dims[i] ||
+				state1->lbs[i] != state2->lbs[i])
+				ereport(ERROR,
+						(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+						 errmsg("cannot accumulate arrays of different dimensionality")));
+		}
+
+		old_context = MemoryContextSwitchTo(state1->mcontext);
+
+		/*
+		 * If there's not enough space in state1 then we'll need to reallocate
+		 * more.
+		 */
+		if (state1->abytes < reqsize)
+		{
+			/* use a power of 2 size rather than allocating just reqsize */
+			while (state1->abytes < reqsize)
+				state1->abytes *= 2;
+
+			state1->data = (char *) repalloc(state1->data, state1->abytes);
+		}
+
+		/*
+		 * The null bitmap needs attention if either side has one.  In
+		 * particular, when state1 already tracks nulls but state2 does not,
+		 * state2's items must still be recorded as non-null, or the bitmap
+		 * would be left short of nitems.
+		 */
+		if (state1->nullbitmap || state2->nullbitmap)
+		{
+			int			newnitems = state1->nitems + state2->nitems;
+
+			if (state1->nullbitmap == NULL)
+			{
+				/*
+				 * First input with nulls; we must retrospectively handle any
+				 * previous inputs by marking all their items non-null.
+				 */
+				state1->aitems = 256;
+				while (state1->aitems <= newnitems)
+					state1->aitems *= 2;
+				state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8);
+				array_bitmap_copy(state1->nullbitmap, 0,
+								  NULL, 0,
+								  state1->nitems);
+			}
+			else if (newnitems > state1->aitems)
+			{
+				/* size by the item count; state2->aitems may well be zero */
+				while (state1->aitems <= newnitems)
+					state1->aitems *= 2;
+
+				state1->nullbitmap = (bits8 *)
+					repalloc(state1->nullbitmap, (state1->aitems + 7) / 8);
+			}
+
+			/* a NULL source bitmap marks every copied item as non-null */
+			array_bitmap_copy(state1->nullbitmap, state1->nitems,
+							  state2->nullbitmap, 0,
+							  state2->nitems);
+		}
+
+		memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes);
+		state1->nbytes += state2->nbytes;
+		state1->nitems += state2->nitems;
+
+		state1->dims[0] += state2->dims[0];
+		/* remaining dims already match, per test above */
+
+		Assert(state1->array_type == state2->array_type);
+		Assert(state1->element_type == state2->element_type);
+
+		MemoryContextSwitchTo(old_context);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_array_serialize
+ *		Flatten an ArrayBuildStateArr into a bytea.
+ *
+ * Fields go out in this order: element_type, array_type, nbytes, the data
+ * bytes, abytes, aitems, the null bitmap (only when the state has one),
+ * nitems, ndims, and then the dims and lbs arrays in full.
+ */
+Datum
+array_agg_array_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *astate;
+	StringInfoData out;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	astate = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&out);
+
+	/* the two type OIDs lead, so the receiver can set up its state early */
+	pq_sendint32(&out, astate->element_type);
+	pq_sendint32(&out, astate->array_type);
+
+	/* length-prefixed element data */
+	pq_sendint32(&out, astate->nbytes);
+	pq_sendbytes(&out, astate->data, astate->nbytes);
+
+	/* allocation bookkeeping */
+	pq_sendint32(&out, astate->abytes);
+	pq_sendint32(&out, astate->aitems);
+
+	/* the bitmap is present on the wire only if the state carries one */
+	if (astate->nullbitmap != NULL)
+	{
+		Assert(astate->aitems > 0);
+		pq_sendbytes(&out, (char *) astate->nullbitmap,
+					 (astate->aitems + 7) / 8);
+	}
+
+	/* item and dimension counts */
+	pq_sendint32(&out, astate->nitems);
+	pq_sendint32(&out, astate->ndims);
+
+	/* whole dims[] and lbs[] arrays, not just the first ndims entries */
+	pq_sendbytes(&out, (char *) astate->dims, sizeof(astate->dims));
+	pq_sendbytes(&out, (char *) astate->lbs, sizeof(astate->lbs));
+
+	PG_RETURN_BYTEA_P(pq_endtypsend(&out));
+}
+
+/*
+ * array_agg_array_deserialize
+ *		Rebuild an ArrayBuildStateArr from the bytea produced by
+ *		array_agg_array_serialize.
+ *
+ * The new state is allocated in CurrentMemoryContext.  Its data buffer is
+ * sized locally from nbytes; the sender's abytes value is read only to stay
+ * in step with the wire format and is otherwise ignored, so that abytes
+ * always describes the buffer actually allocated here.
+ */
+Datum
+array_agg_array_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildStateArr *result;
+	StringInfoData buf;
+	Oid			element_type;
+	Oid			array_type;
+	int			nbytes;
+	const char *temp;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* array_type */
+	array_type = pq_getmsgint(&buf, 4);
+
+	/* nbytes */
+	nbytes = pq_getmsgint(&buf, 4);
+
+	result = initArrayResultArr(array_type, element_type,
+								CurrentMemoryContext, false);
+
+	/* size the data buffer as a power of 2 that can hold nbytes */
+	result->abytes = 1024;
+	while (result->abytes < nbytes)
+		result->abytes *= 2;
+
+	result->data = (char *) palloc(result->abytes);
+
+	/* data */
+	temp = pq_getmsgbytes(&buf, nbytes);
+	memcpy(result->data, temp, nbytes);
+	result->nbytes = nbytes;
+
+	/*
+	 * abytes: this is the size of the sender's buffer, which can be larger
+	 * than the one allocated above.  Consume it, but keep our own value.
+	 */
+	(void) pq_getmsgint(&buf, 4);
+
+	/* aitems: might be 0 */
+	result->aitems = pq_getmsgint(&buf, 4);
+
+	/* nullbitmap: only present on the wire when aitems is nonzero */
+	if (result->aitems > 0)
+	{
+		int			size = (result->aitems + 7) / 8;
+
+		result->nullbitmap = (bits8 *) palloc(size);
+		temp = pq_getmsgbytes(&buf, size);
+		memcpy(result->nullbitmap, temp, size);
+	}
+	else
+		result->nullbitmap = NULL;
+
+	/* nitems */
+	result->nitems = pq_getmsgint(&buf, 4);
+
+	/* ndims */
+	result->ndims = pq_getmsgint(&buf, 4);
+
+	/* dims */
+	temp = pq_getmsgbytes(&buf, sizeof(result->dims));
+	memcpy(result->dims, temp, sizeof(result->dims));
+
+	/* lbs */
+	temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
+	memcpy(result->lbs, temp, sizeof(result->lbs));
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
Datum
array_agg_array_finalfn(PG_FUNCTION_ARGS)
{
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index 0cbdbe5587e..063182326de 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -5001,6 +5001,19 @@ array_insert_slice(ArrayType *destArray,
*/
ArrayBuildState *
initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
+{
+ return initArrayResultWithSize(element_type, rcontext, subcontext,
+ subcontext ? 64 : 8); /* arbitrary starting array size */
+}
+
+/*
+ * initArrayResultWithSize
+ * As initArrayResult, but allow the initial size of the allocated arrays
+ * to be specified.
+ */
+ArrayBuildState *
+initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
+ bool subcontext, int initsize)
{
ArrayBuildState *astate;
MemoryContext arr_context = rcontext;
@@ -5015,7 +5028,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
astate->mcontext = arr_context;
astate->private_cxt = subcontext;
- astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */
+ astate->alen = initsize;
astate->dvalues = (Datum *)
MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
astate->dnulls = (bool *)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 4346410d5a9..e047ba07bbf 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -458,13 +458,30 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
{
bytea *value = PG_GETARG_BYTEA_PP(1);
- /* On the first time through, we ignore the delimiter. */
+ /*
+ * It's important that we store the delimiter text for all aggregated
+ * items, even the first one, which at first thought you might think
+ * could just be discarded. The reason for this is that if this
+ * function is being called from a parallel worker, then we'll need
+ * the first delimiter in order to properly combine the partially
+ * aggregated state with the states coming from other workers. In the
+ * final output, the first delimiter will be stripped off of the final
+ * aggregate state, but in order to know where the actual first data
+ * is we must store the position of the first data value somewhere.
+ * Conveniently, StringInfo has a cursor property which can be used
+ * to serve our needs here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
+
+ if (!PG_ARGISNULL(2))
+ state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_BYTEA_PP(2));
+ }
+
+ if (!PG_ARGISNULL(2))
{
bytea *delim = PG_GETARG_BYTEA_PP(2);
-
appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
}
@@ -478,6 +495,69 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * bytea_string_agg_serialize
+ *		Pack a string_agg(bytea) transition state into a bytea.
+ *
+ * The output is the state's cursor as a 4-byte integer, followed by every
+ * byte accumulated so far.
+ */
+Datum
+bytea_string_agg_serialize(PG_FUNCTION_ARGS)
+{
+	StringInfo	aggstate;
+	StringInfoData out;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	aggstate = (StringInfo) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&out);
+
+	/*
+	 * The cursor records where the first value starts, i.e. how much leading
+	 * delimiter there is to skip, so it has to travel with the data.
+	 */
+	pq_sendint(&out, aggstate->cursor, 4);
+
+	/* everything accumulated so far, leading delimiter included */
+	pq_sendbytes(&out, aggstate->data, aggstate->len);
+
+	PG_RETURN_BYTEA_P(pq_endtypsend(&out));
+}
+
+/*
+ * bytea_string_agg_deserialize
+ *		Rebuild a string_agg(bytea) transition state from its serialized
+ *		form: a 4-byte cursor followed by the accumulated bytes.
+ */
+Datum
+bytea_string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *packed;
+	StringInfo	aggstate;
+	StringInfoData in;
+	const char *payload;
+	int			payload_len;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	packed = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Load the bytea into a scratch StringInfo so the pq_getmsg* routines
+	 * can be used to pick it apart.
+	 */
+	initStringInfo(&in);
+	appendBinaryStringInfo(&in,
+						   VARDATA_ANY(packed), VARSIZE_ANY_EXHDR(packed));
+
+	aggstate = makeStringAggState(fcinfo);
+
+	/* the cursor comes first */
+	aggstate->cursor = pq_getmsgint(&in, 4);
+
+	/* everything after the 4-byte cursor is the accumulated data */
+	payload_len = VARSIZE_ANY_EXHDR(packed) - 4;
+	payload = pq_getmsgbytes(&in, payload_len);
+	appendBinaryStringInfo(aggstate, payload, payload_len);
+
+	pq_getmsgend(&in);
+	pfree(in.data);
+
+	PG_RETURN_POINTER(aggstate);
+}
+
Datum
bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
{
@@ -492,9 +572,10 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
{
bytea *result;
- result = (bytea *) palloc(state->len + VARHDRSZ);
- SET_VARSIZE(result, state->len + VARHDRSZ);
- memcpy(VARDATA(result), state->data, state->len);
+ result = (bytea *) palloc(state->len - state->cursor + VARHDRSZ);
+ SET_VARSIZE(result, state->len - state->cursor + VARHDRSZ);
+ memcpy(VARDATA(result), &state->data[state->cursor],
+ state->len - state->cursor);
PG_RETURN_BYTEA_P(result);
}
else
@@ -4656,10 +4737,28 @@ string_agg_transfn(PG_FUNCTION_ARGS)
/* Append the value unless null. */
if (!PG_ARGISNULL(1))
{
- /* On the first time through, we ignore the delimiter. */
+ /*
+ * It's important that we store the delimiter text for all aggregated
+ * items, even the first one, which at first thought you might think
+ * could just be discarded. The reason for this is that if this
+ * function is being called from a parallel worker, then we'll need
+ * the first delimiter in order to properly combine the partially
+ * aggregated state with the states coming from other workers. In the
+ * final output, the first delimiter will be stripped off of the final
+ * aggregate state, but in order to know where the actual first data
+ * is we must store the position of the first data value somewhere.
+ * Conveniently, StringInfo has a cursor property which can be used
+ * to serve our needs here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
+
+ if (!PG_ARGISNULL(2))
+ state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_TEXT_PP(2));
+ }
+
+ if (!PG_ARGISNULL(2))
appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */
appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
@@ -4672,6 +4771,108 @@ string_agg_transfn(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * string_agg_combine
+ *		Aggregate combine function for string_agg(text) and string_agg(bytea)
+ *
+ * Appends the contents of state2 (argument 1) to state1 (argument 0) and
+ * returns state1.  Either argument may be NULL:
+ *	- both NULL: the result is SQL NULL;
+ *	- only state2 NULL: state1 is returned unchanged;
+ *	- only state1 NULL: a new state is made holding a copy of state2's data
+ *	  and cursor.
+ *
+ * state2's data is appended whole, so whatever delimiter it begins with ends
+ * up between the two pieces.  Only state1's cursor is kept, because only the
+ * very first delimiter of the combined string is to be skipped.
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+	StringInfo	state1;
+	StringInfo	state2;
+	MemoryContext agg_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * Nothing to add.  Be sure to report a genuine SQL NULL when there
+		 * is no state at all, rather than a non-null Datum holding a NULL
+		 * pointer.
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		MemoryContext old_context;
+
+		old_context = MemoryContextSwitchTo(agg_context);
+		state1 = makeStringAggState(fcinfo);
+
+		/* The cursor marks the start of the actual data */
+		state1->cursor = state2->cursor;
+
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+
+		MemoryContextSwitchTo(old_context);
+	}
+	else if (state2->len > 0)
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ *		Pack a string_agg(text) transition state into a bytea.
+ *
+ * The output is the state's cursor as a 4-byte integer followed by the raw
+ * bytes accumulated so far.  This is an internal format that is only ever
+ * read back by string_agg_deserialize, so the data is sent as-is: running it
+ * through the client-encoding conversion done by pq_sendtext would be wasted
+ * work, could fail for characters the client encoding cannot represent, and
+ * could change byte lengths so that the cursor no longer lines up.
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+	StringInfo	state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (StringInfo) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * cursor -- we must send this position so we can properly skip past the
+	 * delimiter for the first aggregated string
+	 */
+	pq_sendint(&buf, state->cursor, 4);
+
+	/* data, with no encoding conversion */
+	pq_sendbytes(&buf, state->data, state->len);
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ *		Rebuild a string_agg(text) transition state from the bytea produced
+ *		by string_agg_serialize.
+ *
+ * The data is appended to the freshly made state rather than being swapped
+ * in as a new buffer, so the state's data, len and maxlen stay consistent
+ * and nothing allocated by makeStringAggState is leaked.
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	StringInfo	result;
+	StringInfoData buf;
+	const char *data;
+	int			datalen;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	result = makeStringAggState(fcinfo);
+
+	/* cursor */
+	result->cursor = pq_getmsgint(&buf, 4);
+
+	/* data: everything left in the message after the cursor */
+	datalen = buf.len - buf.cursor;
+	data = pq_getmsgbytes(&buf, datalen);
+	appendBinaryStringInfo(result, data, datalen);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
Datum
string_agg_finalfn(PG_FUNCTION_ARGS)
{
@@ -4682,8 +4883,13 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+ /*
+ * Return the text removing the first delimiter text as marked by the
+ * cursor.
+ */
if (state != NULL)
- PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
+ PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+ state->len - state->cursor));
else
PG_RETURN_NULL();
}
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 125bb5b479d..73f36a0d982 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -300,14 +300,14 @@ DATA(insert ( 2243 n 0 bitor - bitor - - - - - f f r r 0 1560 0 0
DATA(insert ( 2901 n 0 xmlconcat2 - - - - - - - f f r r 0 142 0 0 0 _null_ _null_ ));
/* array */
-DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn array_agg_combine array_agg_serialize array_agg_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn array_agg_array_combine array_agg_array_serialize array_agg_array_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
/* text */
-DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn string_agg_combine string_agg_serialize string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
/* bytea */
-DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn string_agg_combine bytea_string_agg_serialize bytea_string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
/* json */
DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 0fdb42f6397..6efd38fe452 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -942,12 +942,24 @@ DATA(insert OID = 3168 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f i s
DESCR("replace any occurrences of an element in an array");
DATA(insert OID = 2333 ( array_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ ));
DESCR("aggregate transition function");
+DATA(insert OID = 3423 ( array_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3424 ( array_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3425 ( array_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
DATA(insert OID = 2334 ( array_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ ));
DESCR("aggregate final function");
DATA(insert OID = 2335 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2776" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
DESCR("concatenate aggregate input into an array");
DATA(insert OID = 4051 ( array_agg_array_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_transfn _null_ _null_ _null_ ));
DESCR("aggregate transition function");
+DATA(insert OID = 3426 ( array_agg_array_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3427 ( array_agg_array_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3428 ( array_agg_array_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
DATA(insert OID = 4052 ( array_agg_array_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_finalfn _null_ _null_ _null_ ));
DESCR("aggregate final function");
DATA(insert OID = 4053 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2277" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
@@ -2710,12 +2722,23 @@ DESCR("aggregate final function");
DATA(insert OID = 3535 ( string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 25 25" _null_ _null_ _null_ _null_ _null_ string_agg_transfn _null_ _null_ _null_ ));
DESCR("aggregate transition function");
+DATA(insert OID = 3429 ( string_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ string_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3430 ( string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3431 ( string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ string_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
+
DATA(insert OID = 3536 ( string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 25 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_finalfn _null_ _null_ _null_ ));
DESCR("aggregate final function");
DATA(insert OID = 3538 ( string_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 25 "25 25" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
DESCR("concatenate aggregate input into a string");
DATA(insert OID = 3543 ( bytea_string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 17 17" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_transfn _null_ _null_ _null_ ));
DESCR("aggregate transition function");
+DATA(insert OID = 3432 ( bytea_string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3433 ( bytea_string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
DATA(insert OID = 3544 ( bytea_string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_finalfn _null_ _null_ _null_ ));
DESCR("aggregate final function");
DATA(insert OID = 3545 ( string_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 17 "17 17" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
diff --git a/src/include/utils/array.h b/src/include/utils/array.h
index afbb532e9c2..f99e05a2584 100644
--- a/src/include/utils/array.h
+++ b/src/include/utils/array.h
@@ -393,6 +393,9 @@ extern bool array_contains_nulls(ArrayType *array);
extern ArrayBuildState *initArrayResult(Oid element_type,
MemoryContext rcontext, bool subcontext);
+extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+ MemoryContext rcontext,
+ bool subcontext, int initsize);
extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
Datum dvalue, bool disnull,
Oid element_type,