From b109ee7d04483b053676755223ba0ccb1b53c511 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Mon, 6 Jul 2020 19:50:21 -0400 Subject: [PATCH 1/2] Add binary protocol for publications and subscriptions for base types add binary column to pg_subscription support create and alter subcription with binary option check that the subscriber is compatible with the publisher Removed the array representing binary types in favour of an array representing the format of each type and use existing codes 't' for text, 'b' for binary and 'u' for unchanged. Dynamically allocate the arrays to avoid allocating max columns sized arrays. Fixed numerous issues with checking for validity --- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 39 ++- .../libpqwalreceiver/libpqwalreceiver.c | 46 ++++ src/backend/replication/logical/proto.c | 122 ++++++---- src/backend/replication/logical/worker.c | 128 +++++++--- src/backend/replication/pgoutput/pgoutput.c | 225 +++++++++++++++++- src/include/catalog/pg_subscription.h | 4 + src/include/pg_config_manual.h | 2 +- src/include/replication/logicalproto.h | 20 +- src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + 12 files changed, 490 insertions(+), 101 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index cb15731115..e6afb3203e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->binary = subform->subbinary; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5314e9348f..9a853fe48d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1125,7 +1125,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187..f2590ff565 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *refresh, bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +90,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + /* not all versions of pgoutput will understand this option default to false */ + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +181,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -324,8 +335,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; - List *publications; + bool binary; + bool binary_given; + List *publications; /* * Parse and check options. * @@ -334,7 +347,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, ©_data, &synchronous_commit, - NULL); + NULL, &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +413,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +683,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + NULL, &synchronous_commit, NULL, + &binary_given, &binary); if (slotname_given) { @@ -697,6 +714,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -708,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -746,7 +771,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, &refresh); + NULL, &refresh, NULL, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -783,7 +808,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, NULL); + NULL, NULL, NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..08313fa2a5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -423,7 +423,53 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); pfree(pubnames_str); + if (options->proto.logical.binary) { + appendStringInfo(&cmd, ", binary 'true'"); + appendStringInfo(&cmd, ", sizeof_datum '%zu'", sizeof(Datum)); + appendStringInfo(&cmd, ", sizeof_int '%zu'", sizeof(int)); + appendStringInfo(&cmd, ", sizeof_long '%zu'", sizeof(long)); + appendStringInfo(&cmd, ", bigendian '%d'", +#ifdef WORDS_BIGENDIAN + true +#else + false +#endif + ); + appendStringInfo(&cmd, ", float4_byval '%d'", +#if PG_VERSION_NUM >= 130000 + true +#else +#ifdef USE_FLOAT4_BYVAL + true +#else + false +#endif +#endif + ); + appendStringInfo(&cmd, ", float8_byval '%d'", +#ifdef USE_FLOAT8_BYVAL + true +#else + false +#endif + ); + appendStringInfo(&cmd, ", integer_datetimes '%d'", + +/* integer date times are always enabled in version 10 and up */ +#if PG_VERSION_NUM >= 100000 + true +#else + +#ifdef USE_INTEGER_DATETIMES + true +#else + false +#endif +#endif + + ); + } appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3c6d0cd171..73148f39f3 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -16,9 +16,11 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" #include "libpq/pqformat.h" +#include "replication/logicalrelation.h" #include "replication/logicalproto.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "utils/syscache.h" /* @@ -31,7 +33,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple, bool binary_basetypes); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -139,7 +141,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary_basetypes) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -147,7 +149,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary_basetypes); } /* @@ -161,16 +163,13 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) char action; LogicalRepRelId relid; - /* read the relation id */ relid = pq_getmsgint(in, 4); - action = pq_getmsgbyte(in); if (action != 'N') elog(ERROR, "expected new tuple but got %d", action); logicalrep_read_tuple(in, newtup); - return relid; } @@ -179,7 +178,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple, bool binary_basetypes) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -196,26 +195,22 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary_basetypes); } /* * Read UPDATE from stream. */ -LogicalRepRelId +void logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -241,14 +236,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, logicalrep_read_tuple(in, newtup); - return relid; } /* * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary_basetypes) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -264,7 +258,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes); } /* @@ -272,14 +266,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) * * Fills the old tuple. */ -LogicalRepRelId +void logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -288,7 +278,6 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) logicalrep_read_tuple(in, oldtup); - return relid; } /* @@ -437,7 +426,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary_basetypes) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -453,6 +442,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) continue; nliveatts++; } + pq_sendint16(out, nliveatts); /* try to allocate enough memory from the get-go */ @@ -488,12 +478,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) elog(ERROR, "cache lookup failed for type %u", att->atttypid); typclass = (Form_pg_type) GETSTRUCT(typtup); - pq_sendbyte(out, 't'); /* 'text' data follows */ - outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); - pq_sendcountedtext(out, outputstr, strlen(outputstr), false); - pfree(outputstr); + if (binary_basetypes && + OidIsValid(typclass->typreceive) && + (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') && + (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid)) + { + bytea *outputbytes; + int len; + pq_sendbyte(out, 'b'); /* binary send/recv data follows */ + + outputbytes = OidSendFunctionCall(typclass->typsend, + values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + pq_sendbyte(out, 't'); /* 'text' data follows */ + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } ReleaseSysCache(typtup); } } @@ -512,7 +521,12 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Get number of attributes */ natts = pq_getmsgint(in, 2); - memset(tuple->changed, 0, sizeof(tuple->changed)); + tuple->values = palloc(natts * sizeof(StringInfoData *)); + + /* default is unchanged */ + tuple->format = palloc(natts * sizeof(char)); + memset(tuple->format, 't', natts * sizeof(char)); + /* Read the data */ for (i = 0; i < natts; i++) @@ -524,25 +538,51 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) switch (kind) { case 'n': /* null */ - tuple->values[i] = NULL; - tuple->changed[i] = true; - break; + { + tuple->values[i] = (StringInfoData *)NULL; + tuple->format[i] = 't'; + break; + } case 'u': /* unchanged column */ - /* we don't receive the value of an unchanged column */ - tuple->values[i] = NULL; - break; - case 't': /* text formatted value */ { - int len; + /* we don't receive the value of an unchanged column */ + tuple->values[i] = (StringInfoData *)NULL; + tuple->format[i] = 'u'; /* be explicit */ + break; + } + case 'b': /* binary formatted value */ + { + int len; + StringInfoData *value = palloc(sizeof(StringInfoData)); + tuple->format[i] = 'b'; + + len = pq_getmsgint(in, 4); /* read length */ - tuple->changed[i] = true; + value->data = palloc(len + 1); + /* and data */ + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->cursor = 0; + value->maxlen = len; + /* not strictly necessary but the docs say it is required */ + value->data[len] = '\0'; + tuple->values[i] = value; + break; + } + case 't': /* text formatted value */ + { + int len; + StringInfoData *value = palloc(sizeof(StringInfoData)); + tuple->format[i] = 't'; len = pq_getmsgint(in, 4); /* read length */ /* and data */ - tuple->values[i] = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i], len); - tuple->values[i][len] = '\0'; + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->data[len] = '\0'; + tuple->values[i] = value; } break; default: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a752a1224d..806e0a8bed 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,13 +319,12 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store data into slot. + * Data can be either text or binary transfer format */ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -351,18 +350,40 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, int remoteattnum = rel->attrmap->attnums[i]; if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + tupleData->values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->format[remoteattnum] == 'b') + { + Oid typreceive; + Oid typioparam; + + int cursor = tupleData->values[remoteattnum]->cursor; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], + typioparam, att->atttypmod); + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into proto.c + */ + tupleData->values[remoteattnum]->cursor = cursor; + + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -396,11 +417,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, * Caution: unreplaced pass-by-ref columns in "slot" will point into the * storage for "srcslot". This is OK for current usage, but someday we may * need to materialize "slot" at the end to make it independent of "srcslot". + * + * TODO: figure out the right comment here + * Modify slot with user data provided. + * This is somewhat similar to heap_modify_tuple but also calls the type + * input function on the user data as the input is either text or binary transfer + * format */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -438,21 +465,42 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) + if (tupleData->format[remoteattnum] =='u') continue; - if (values[remoteattnum] != NULL) + if (tupleData->values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->format[remoteattnum] == 'b') + { + Oid typreceive; + Oid typioparam; + + int cursor = tupleData->values[remoteattnum]->cursor; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], + typioparam, att->atttypmod); + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into proto.c + */ + tupleData->values[remoteattnum]->cursor = cursor; + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -618,8 +666,12 @@ apply_handle_insert(StringInfo s) ensure_transaction(); + /* read the relation id */ relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + + if (!should_apply_changes_for_rel(rel)) { /* @@ -641,7 +693,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -733,9 +785,12 @@ apply_handle_update(StringInfo s) ensure_transaction(); - relid = logicalrep_read_update(s, &has_oldtup, &oldtup, - &newtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_update(s, &has_oldtup, &oldtup, + &newtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -765,7 +820,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.format[i] != 'u') target_rte->updatedCols = bms_add_member(target_rte->updatedCols, i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -776,8 +831,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply update to correct partition. */ @@ -831,8 +886,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, relmapentry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -875,8 +929,11 @@ apply_handle_delete(StringInfo s) ensure_transaction(); - relid = logicalrep_read_delete(s, &oldtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_delete(s, &oldtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -900,7 +957,7 @@ apply_handle_delete(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply delete to correct partition. */ @@ -1096,9 +1153,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, if (found) { /* Apply the update. */ - slot_modify_cstrings(remoteslot_part, localslot, + slot_modify_data(remoteslot_part, localslot, part_entry, - newtup->values, newtup->changed); + newtup); MemoryContextSwitchTo(oldctx); } else @@ -2106,6 +2163,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 15379e3118..65c2e5d658 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -20,6 +20,7 @@ #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -118,11 +119,30 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, bool *binary_basetypes) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool binary_option_given = false; + bool sizeof_int_given = false; + bool sizeof_datum_given = false; + bool sizeof_long_given = false; + bool big_endian_given = false; + bool float4_byval_given = false; + bool float8_byval_given = false; + bool integer_datetimes_given = false; + long datum_size; + long int_size; + long long_size; + bool bigendian; + bool float4_byval; + bool float8_byval; + bool integer_datetimes; + + // default to false + *binary_basetypes = false; + foreach(lc, options) { @@ -138,7 +158,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (protocol_version_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); protocol_version_given = true; if (!scanint8(strVal(defel->arg), true, &parsed)) @@ -159,7 +179,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (publication_names_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options %s already provided", defel->defname))); publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', @@ -168,9 +188,197 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "binary") == 0 ) + { + bool parsed; + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + binary_option_given = true; + + if (!parse_bool(strVal(defel->arg), &parsed)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid binary option"))); + + *binary_basetypes = parsed; + + + } + else if (strcmp(defel->defname, "sizeof_datum") == 0) + { + + if (sizeof_datum_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + sizeof_datum_given = true; + + if (!scanint8(strVal(defel->arg), true, &datum_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_datum option"))); + } + else if (strcmp(defel->defname, "sizeof_int") == 0) + { + + if (sizeof_int_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + sizeof_int_given = true; + + if (!scanint8(strVal(defel->arg), true, &int_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_int option"))); + } + else if (strcmp(defel->defname, "sizeof_long") == 0) + { + + if (sizeof_long_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + sizeof_long_given = true; + + if (!scanint8(strVal(defel->arg), true, &long_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid sizeof_long option"))); + } + else if (strcmp(defel->defname, "bigendian") == 0) + { + + if (big_endian_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + big_endian_given = true; + + if (!parse_bool(strVal(defel->arg), &bigendian)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid bigendian option"))); + } + else if (strcmp(defel->defname, "float4_byval") == 0) + { + + if (float4_byval_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + float4_byval_given = true; + + if (!parse_bool(strVal(defel->arg), &float4_byval)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid float4_byval option"))); + } + else if (strcmp(defel->defname, "float8_byval") == 0) + { + + if (float8_byval_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + float8_byval_given = true; + + if (!parse_bool(strVal(defel->arg), &float8_byval)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid float8_byval option"))); + } + else if (strcmp(defel->defname, "integer_datetimes") == 0) + { + + if (integer_datetimes_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options %s already provided", defel->defname))); + integer_datetimes_given = true; + + if (!parse_bool(strVal(defel->arg), &integer_datetimes)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid integer_date_times option"))); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } + +/* + * after we know that the subscriber is requesting binary check to make sure + * we are compatible with the subscriber. + */ + if ( *binary_basetypes == true ) + { + if (sizeof(Datum) != datum_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible datum size"))); + + if (sizeof(int) != int_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible integer size"))); + + if (sizeof(long) != long_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible long size"))); + if( +#ifdef WORDS_BIGENDIAN + true +#else + false +#endif + != bigendian) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible endianness"))); + if( float4_byval != +#if PG_VERSION_NUM >= 130000 + true +#else +#ifdef USE_FLOAT4_BYVAL + true +#else + false +#endif +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible float4_byval"))); + if( float8_byval != +#ifdef USE_FLOAT8_BYVAL + true +#else + false +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible float8_byval"))); + + if ( integer_datetimes != +#if PG_VERSION_NUM >= 100000 + true +#else +#ifdef USE_INTEGER_DATETIMES + true +#else + false +#endif +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incompatible integer_datetimes"))); + + } } /* @@ -202,7 +410,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &data->binary_basetypes); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -397,6 +606,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: + { HeapTuple tuple = &change->data.tp.newtuple->tuple; @@ -411,7 +621,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple); + logicalrep_write_insert(ctx->out, relation, tuple, data->binary_basetypes); OutputPluginWrite(ctx, true); break; } @@ -435,7 +645,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, data->binary_basetypes); OutputPluginWrite(ctx, true); break; } @@ -455,7 +665,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple); + logicalrep_write_delete(ctx->out, relation, oldtuple, data->binary_basetypes); + OutputPluginWrite(ctx, true); } else diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0a756d42d8..100789a52f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the + * output plugin to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool binary; /* Indicates if the subscription wants data in binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h index 8f3ec6bde1..ec4fa01f30 100644 --- a/src/include/pg_config_manual.h +++ b/src/include/pg_config_manual.h @@ -336,7 +336,7 @@ * Enable debugging print statements for WAL-related operations; see * also the wal_debug GUC var. */ -/* #define WAL_DEBUG */ +#define WAL_DEBUG /* * Enable tracing of resource consumption during sort operations; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4860561be9..b209af4cf2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -30,10 +30,12 @@ /* Tuple coming via logical replication. */ typedef struct LogicalRepTupleData { - /* column values in text format, or NULL for a null value: */ - char *values[MaxTupleAttributeNumber]; - /* markers for changed/unchanged column values: */ - bool changed[MaxTupleAttributeNumber]; + /* column values */ + StringInfoData **values; + + /* markers for changed/unchanged/binary/text */ + char *format; + } LogicalRepTupleData; typedef uint32 LogicalRepRelId; @@ -87,16 +89,16 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple); + HeapTuple newtuple, bool binary_basetypes); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple); -extern LogicalRepRelId logicalrep_read_update(StringInfo in, + HeapTuple newtuple, bool binary_basetypes); +extern void logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple); -extern LogicalRepRelId logicalrep_read_delete(StringInfo in, + HeapTuple oldtuple, bool binary_basetypes); +extern void logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 2e8e9daf44..1252e9132b 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; + bool binary_basetypes; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c75dcebea0..1ee316f6d7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -177,6 +177,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher output plugin to use binary */ } logical; } proto; } WalRcvStreamOptions; -- 2.20.1 (Apple Git-117)