From e791d3c501523e569b5893377a8c7f5af96fb57b Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Thu, 2 Jul 2020 12:18:40 -0400 Subject: [PATCH 7/8] check that the subscriber is compatible with the publisher --- .../libpqwalreceiver/libpqwalreceiver.c | 36 +++- src/backend/replication/pgoutput/pgoutput.c | 179 ++++++++++++++++++ 2 files changed, 212 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2a9d966ec1..9130d645b6 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -402,7 +402,6 @@ libpqrcv_startstreaming(WalReceiverConn *conn, char *pubnames_str; List *pubnames; char *pubnames_literal; - bool binary; appendStringInfoString(&cmd, " ("); @@ -424,9 +423,40 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); pfree(pubnames_str); - if (options->proto.logical.binary) + if (options->proto.logical.binary) { appendStringInfo(&cmd, ", binary 'true'"); - + appendStringInfo(&cmd, ", sizeof_datum %zu", sizeof(Datum)); + appendStringInfo(&cmd, ", sizeof_int %zu", sizeof(int)); + appendStringInfo(&cmd, ", sizeof_long %zu", sizeof(long)); + appendStringInfo(&cmd, ", bigendian %d", +#ifdef WORDS_BIGENDIAN + true +#else + false +#endif + ); + appendStringInfo(&cmd, ", float4_byval %d", +#ifdef USE_FLOAT4_BYVAL + true +#else + false +#endif + ); + appendStringInfo(&cmd, ", float8_byval %d", +#ifdef USE_FLOAT8_BYVAL + true +#else + false +#endif + ); + appendStringInfo(&cmd, ", integer_datetimes %d", +#ifdef USE_INTEGER_DATETIMES + true +#else + false +#endif + ); + } appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5752731374..70c9008af9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -124,10 +124,25 @@ parse_output_parameters(List *options, uint32 *protocol_version, bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool sizeof_int_given = false; + bool sizeof_datum_given = false; + bool sizeof_long_given = false; + bool big_endian_given = false; + bool float4_byval_given = false; + bool float8_byval_given = false; + bool integer_datetimes_given = false; + long datum_size; + long int_size; + long long_size; + bool bigendian; + bool float4_byval; + bool float8_byval; + bool integer_datetimes; // default to false *binary_basetypes = false; + foreach(lc, options) { DefElem *defel = (DefElem *) lfirst(lc); @@ -187,10 +202,174 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("invalid binary option"))); *binary_basetypes = parsed; + + + } + else if (strcmp(defel->defname, "sizeof_datum") == 0) + { + + if (sizeof_datum_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sizeof_datum_given = true; + + if (!scanint8(strVal(defel->arg), true, &datum_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_datum option"))); + } + else if (strcmp(defel->defname, "sizeof_int") == 0) + { + + if (sizeof_int_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sizeof_int_given = true; + + if (!scanint8(strVal(defel->arg), true, &int_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_int option"))); + } + else if (strcmp(defel->defname, "sizeof_long") == 0) + { + + if (sizeof_long_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sizeof_int_given = true; + + if (!scanint8(strVal(defel->arg), true, &long_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_long option"))); + } + else if (strcmp(defel->defname, "bigendian") == 0) + { + + if (big_endian_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + big_endian_given = true; + + if (!parse_bool(strVal(defel->arg), &bigendian)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid bigendian option"))); + } + else if (strcmp(defel->defname, "float4_byval") == 0) + { + + if (float4_byval_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + float4_byval_given = true; + + if (!parse_bool(strVal(defel->arg), &float4_byval)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid float4_byval option"))); + } + else if (strcmp(defel->defname, "float8_byval") == 0) + { + + if (float8_byval_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + float8_byval_given = true; + + if (!parse_bool(strVal(defel->arg), &float8_byval)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid float8_byval option"))); + } + else if (strcmp(defel->defname, "integer_date_times") == 0) + { + + if (integer_datetimes_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + integer_datetimes_given = true; + + if (!parse_bool(strVal(defel->arg), &integer_datetimes)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid integer_date_times option"))); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } + +/* + * after we know that the subscriber is requesting binary check to make sure + * we are compatible with the subscriber. + */ + if ( *binary_basetypes == true ) + { + if (sizeof(Datum) != datum_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible datum size"))); + + if (sizeof(int) != int_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible integer size"))); + + if (sizeof(long) != long_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible long size"))); + if( +#ifdef WORDS_BIGENDIAN + true +#else + false +#endif + != bigendian) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible endianness"))); + if( float4_byval != +#ifdef USE_FLOAT4_BYVAL + true +#else + false +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible float4_byval"))); + if( float8_byval != +#ifdef USE_FLOAT4_BYVAL + true +#else + false +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible float8_byval"))); + + if ( integer_datetimes != +#ifdef USE_INTEGER_DATETIMES + true +#else + false +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible integer_datetimes"))); + + } } /* -- 2.20.1 (Apple Git-117)