From d3113e2f43b4a99b53ceceaedffd0940805f66ba Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Thu, 2 Jul 2020 12:18:59 -0400 Subject: [PATCH 8/8] Changed binary and changed to format and use existing codes to get rid of one array Dynamically allocate the arrays to avoid allocating max columns sized arrays Fixed numerous issues with checking for validity --- .../libpqwalreceiver/libpqwalreceiver.c | 39 ++++++++----- src/backend/replication/logical/proto.c | 56 +++++++++++-------- src/backend/replication/logical/worker.c | 30 +++++----- src/backend/replication/pgoutput/pgoutput.c | 35 +++++++----- src/include/pg_config_manual.h | 2 +- src/include/replication/logicalproto.h | 8 +-- 6 files changed, 99 insertions(+), 71 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9130d645b6..08313fa2a5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -425,36 +425,49 @@ libpqrcv_startstreaming(WalReceiverConn *conn, pfree(pubnames_str); if (options->proto.logical.binary) { appendStringInfo(&cmd, ", binary 'true'"); - appendStringInfo(&cmd, ", sizeof_datum %zu", sizeof(Datum)); - appendStringInfo(&cmd, ", sizeof_int %zu", sizeof(int)); - appendStringInfo(&cmd, ", sizeof_long %zu", sizeof(long)); - appendStringInfo(&cmd, ", bigendian %d", + appendStringInfo(&cmd, ", sizeof_datum '%zu'", sizeof(Datum)); + appendStringInfo(&cmd, ", sizeof_int '%zu'", sizeof(int)); + appendStringInfo(&cmd, ", sizeof_long '%zu'", sizeof(long)); + appendStringInfo(&cmd, ", bigendian '%d'", #ifdef WORDS_BIGENDIAN true #else false #endif ); - appendStringInfo(&cmd, ", float4_byval %d", + appendStringInfo(&cmd, ", float4_byval '%d'", +#if PG_VERSION_NUM >= 130000 + true +#else #ifdef USE_FLOAT4_BYVAL - true + true #else - false + false #endif - ); - appendStringInfo(&cmd, ", float8_byval %d", +#endif + ); + appendStringInfo(&cmd, ", float8_byval '%d'", #ifdef USE_FLOAT8_BYVAL - true + true #else - false + false #endif - ); - appendStringInfo(&cmd, ", integer_datetimes %d", + ); + appendStringInfo(&cmd, ", integer_datetimes '%d'", + +/* integer date times are always enabled in version 10 and up */ + +#if PG_VERSION_NUM >= 100000 + true +#else + #ifdef USE_INTEGER_DATETIMES true #else false #endif +#endif + ); } appendStringInfoChar(&cmd, ')'); diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index d4c6283a17..73148f39f3 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -521,10 +521,12 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Get number of attributes */ natts = pq_getmsgint(in, 2); - memset(tuple->changed, 0, sizeof(tuple->changed)); + tuple->values = palloc(natts * sizeof(StringInfoData *)); + + /* default is unchanged */ + tuple->format = palloc(natts * sizeof(char)); + memset(tuple->format, 't', natts * sizeof(char)); - /* default is text */ - memset(tuple->binary, 0, sizeof(tuple->binary)); /* Read the data */ for (i = 0; i < natts; i++) @@ -536,45 +538,51 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) switch (kind) { case 'n': /* null */ - tuple->values[i].len = 0; - tuple->values[i].data = NULL; - tuple->changed[i] = true; - break; + { + tuple->values[i] = (StringInfoData *)NULL; + tuple->format[i] = 't'; + break; + } case 'u': /* unchanged column */ - /* we don't receive the value of an unchanged column */ - tuple->values[i].len = 0; - tuple->values[i].data = NULL; - break; + { + /* we don't receive the value of an unchanged column */ + tuple->values[i] = (StringInfoData *)NULL; + tuple->format[i] = 'u'; /* be explicit */ + break; + } case 'b': /* binary formatted value */ { int len; - tuple->changed[i] = true; - tuple->binary[i] = true; + StringInfoData *value = palloc(sizeof(StringInfoData)); + tuple->format[i] = 'b'; len = pq_getmsgint(in, 4); /* read length */ - tuple->values[i].data = palloc(len + 1); + value->data = palloc(len + 1); /* and data */ - pq_copymsgbytes(in, tuple->values[i].data, len); - tuple->values[i].len = len; - tuple->values[i].cursor = 0; - tuple->values[i].maxlen = len; + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->cursor = 0; + value->maxlen = len; /* not strictly necessary but the docs say it is required */ - tuple->values[i].data[len] = '\0'; + value->data[len] = '\0'; + tuple->values[i] = value; break; } case 't': /* text formatted value */ { int len; - tuple->changed[i] = true; + StringInfoData *value = palloc(sizeof(StringInfoData)); + tuple->format[i] = 't'; len = pq_getmsgint(in, 4); /* read length */ /* and data */ - tuple->values[i].data = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i].data, len); - tuple->values[i].data[len] = '\0'; - tuple->values[i].len = len; + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->data[len] = '\0'; + tuple->values[i] = value; } break; default: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fadfd1dfaf..261d017fc0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -350,28 +350,28 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, int remoteattnum = rel->attrmap->attnums[i]; if (!att->attisdropped && remoteattnum >= 0 && - tupleData->values[remoteattnum].data != NULL) + tupleData->values[remoteattnum] != NULL) { errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - if (tupleData->binary[remoteattnum]) + if (tupleData->format[remoteattnum] == 'b') { Oid typreceive; Oid typioparam; getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); - int cursor = tupleData->values[remoteattnum].cursor; + int cursor = tupleData->values[remoteattnum]->cursor; slot->tts_values[i] = - OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum], + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], typioparam, att->atttypmod); /* * Do not advance the cursor in case we need to re-read this * This saves us from pushing all of this type logic into proto.c */ - tupleData->values[remoteattnum].cursor = cursor; + tupleData->values[remoteattnum]->cursor = cursor; } else @@ -381,7 +381,7 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, getTypeInputInfo(att->atttypid, &typinput, &typioparam); slot->tts_values[i] = - OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data, + OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, typioparam, att->atttypmod); } slot->tts_isnull[i] = false; @@ -465,16 +465,16 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!tupleData->changed[remoteattnum]) + if (tupleData->format[remoteattnum] =='u') continue; - if (tupleData->values[remoteattnum].data != NULL) + if (tupleData->values[remoteattnum] != NULL) { errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - if (tupleData->binary[remoteattnum]) + if (tupleData->format[remoteattnum] == 'b') { Oid typreceive; Oid typioparam; @@ -482,15 +482,15 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); - int cursor = tupleData->values[remoteattnum].cursor; + int cursor = tupleData->values[remoteattnum]->cursor; slot->tts_values[i] = - OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum], + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], typioparam, att->atttypmod); /* * Do not advance the cursor in case we need to re-read this * This saves us from pushing all of this type logic into proto.c */ - tupleData->values[remoteattnum].cursor = cursor; + tupleData->values[remoteattnum]->cursor = cursor; } else { @@ -499,7 +499,7 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, getTypeInputInfo(att->atttypid, &typinput, &typioparam); slot->tts_values[i] = - OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data, + OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, typioparam, att->atttypmod); } slot->tts_isnull[i] = false; @@ -821,7 +821,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.format[i] != 'u') target_rte->updatedCols = bms_add_member(target_rte->updatedCols, i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -887,7 +887,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot, localslot, rel, &newtup); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 70c9008af9..65c2e5d658 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -20,6 +20,7 @@ #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -157,7 +158,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (protocol_version_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); protocol_version_given = true; if (!scanint8(strVal(defel->arg), true, &parsed)) @@ -178,7 +179,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (publication_names_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', @@ -193,7 +194,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (binary_option_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); binary_option_given = true; if (!parse_bool(strVal(defel->arg), &parsed)) @@ -211,7 +212,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (sizeof_datum_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); sizeof_datum_given = true; if (!scanint8(strVal(defel->arg), true, &datum_size)) @@ -225,7 +226,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (sizeof_int_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); sizeof_int_given = true; if (!scanint8(strVal(defel->arg), true, &int_size)) @@ -239,8 +240,8 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (sizeof_long_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); - sizeof_int_given = true; + errmsg("conflicting or redundant options %s already provided", defel->defname))); + sizeof_long_given = true; if (!scanint8(strVal(defel->arg), true, &long_size)) ereport(ERROR, @@ -253,7 +254,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (big_endian_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); big_endian_given = true; if (!parse_bool(strVal(defel->arg), &bigendian)) @@ -267,7 +268,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (float4_byval_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); float4_byval_given = true; if (!parse_bool(strVal(defel->arg), &float4_byval)) @@ -281,7 +282,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (float8_byval_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); float8_byval_given = true; if (!parse_bool(strVal(defel->arg), &float8_byval)) @@ -289,13 +290,13 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid float8_byval option"))); } - else if (strcmp(defel->defname, "integer_date_times") == 0) + else if (strcmp(defel->defname, "integer_datetimes") == 0) { if (integer_datetimes_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); integer_datetimes_given = true; if (!parse_bool(strVal(defel->arg), &integer_datetimes)) @@ -338,17 +339,21 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("incompatible endianness"))); if( float4_byval != +#if PG_VERSION_NUM >= 130000 + true +#else #ifdef USE_FLOAT4_BYVAL true #else false +#endif #endif ) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("incompatible float4_byval"))); if( float8_byval != -#ifdef USE_FLOAT4_BYVAL +#ifdef USE_FLOAT8_BYVAL true #else false @@ -359,10 +364,14 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("incompatible float8_byval"))); if ( integer_datetimes != +#if PG_VERSION_NUM >= 100000 + true +#else #ifdef USE_INTEGER_DATETIMES true #else false +#endif #endif ) ereport(ERROR, diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h index 8f3ec6bde1..ec4fa01f30 100644 --- a/src/include/pg_config_manual.h +++ b/src/include/pg_config_manual.h @@ -336,7 +336,7 @@ * Enable debugging print statements for WAL-related operations; see * also the wal_debug GUC var. */ -/* #define WAL_DEBUG */ +#define WAL_DEBUG /* * Enable tracing of resource consumption during sort operations; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 85351c6093..b209af4cf2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -31,13 +31,11 @@ typedef struct LogicalRepTupleData { /* column values */ - StringInfoData values[MaxTupleAttributeNumber]; + StringInfoData **values; - /* markers for binary */ - bool binary[MaxTupleAttributeNumber]; + /* markers for changed/unchanged/binary/text */ + char *format; - /* markers for changed/unchanged column values: */ - bool changed[MaxTupleAttributeNumber]; } LogicalRepTupleData; typedef uint32 LogicalRepRelId; -- 2.20.1 (Apple Git-117)