From 6a961ebaca47eb54bb20f2cdba1811653274d21b Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Tue, 14 Jul 2020 13:54:37 +0200 Subject: [PATCH] Add binary protocol for publications and subscriptions This adds support for transferring data using binary format between subscriber and publisher. A new column is added to pg_subscription, subbinary, in order to track whether binary is enabled or not. Author: Dave Cramer --- doc/src/sgml/catalogs.sgml | 10 ++ doc/src/sgml/ref/alter_subscription.sgml | 4 +- doc/src/sgml/ref/create_subscription.sgml | 16 +++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 41 +++++- .../libpqwalreceiver/libpqwalreceiver.c | 3 +- src/backend/replication/logical/proto.c | 123 ++++++++++------ src/backend/replication/logical/worker.c | 134 ++++++++++++------ src/backend/replication/pgoutput/pgoutput.c | 34 ++++- src/bin/psql/describe.c | 8 +- src/include/catalog/pg_subscription.h | 5 + src/include/replication/logicalproto.h | 30 ++-- src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 56 +++++--- src/test/regress/sql/subscription.sql | 12 ++ src/test/subscription/t/014_binary.pl | 108 ++++++++++++++ 18 files changed, 458 insertions(+), 131 deletions(-) create mode 100644 src/test/subscription/t/014_binary.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index e9cdff4864..fe317ec37f 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7504,6 +7504,16 @@ SCRAM-SHA-256$<iteration count>:&l + + + subbinary bool + + + If true, the subscription will request that the publisher send data + in binary format. 
+ + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index c24ace14d1..98ca11cb1c 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -163,8 +163,8 @@ ALTER SUBSCRIPTION name RENAME TO < This clause alters parameters originally set by . See there for more - information. The allowed options are slot_name and - synchronous_commit + information. The allowed options are slot_name, + synchronous_commit and binary. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 5bbc165f70..1d71368254 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -128,6 +128,22 @@ CREATE SUBSCRIPTION subscription_name + + binary (boolean) + + + Specifies whether the subscription will request the publisher to + send the data in binary or not. The default + is false. + + + + Note: Only types that have send and receive functions will be + transferred in binary. + + + + slot_name (string) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index cb15731115..e6afb3203e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->binary = subform->subbinary; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b6d35c2d11..7f02d6dbb2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187..d87de2ac2f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *refresh, bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +90,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + + /* + * Not all versions of pgoutput will understand this option, so + * default to false. + */ + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +185,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -325,6 +340,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + bool binary; + bool binary_given; /* * Parse and check options. 
@@ -334,7 +351,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, &copy_data, &synchronous_commit, - NULL); + NULL, &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +417,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +687,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + NULL, &synchronous_commit, NULL, + &binary_given, &binary); if (slotname_given) { @@ -697,6 +718,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -708,7 +736,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -746,7 +775,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, &copy_data, - NULL, &refresh); + NULL, &refresh, NULL, NULL);
values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -783,7 +812,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, &copy_data, - NULL, NULL); + NULL, NULL, NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..7989b58019 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -423,7 +423,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); pfree(pubnames_str); - + if (options->proto.logical.binary) + appendStringInfoString(&cmd, ", binary 'true'"); appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3c6d0cd171..b91ca2714d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -17,8 +17,10 @@ #include "catalog/pg_type.h" #include "libpq/pqformat.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "utils/syscache.h" /* @@ -31,7 +33,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple, bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -139,7 +141,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream.
*/ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -147,7 +149,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* @@ -179,7 +181,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -196,26 +198,22 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary); } /* * Read UPDATE from stream. */ -LogicalRepRelId +void logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -240,15 +238,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, action); logicalrep_read_tuple(in, newtup); - - return relid; } /* * Write DELETE to the output stream. 
*/ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -264,7 +260,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary); } /* @@ -272,14 +268,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) * * Fills the old tuple. */ -LogicalRepRelId +void logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -287,8 +279,6 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) elog(ERROR, "expected action 'O' or 'K', got %c", action); logicalrep_read_tuple(in, oldtup); - - return relid; } /* @@ -437,7 +427,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -453,6 +443,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) continue; nliveatts++; } + pq_sendint16(out, nliveatts); /* try to allocate enough memory from the get-go */ @@ -479,7 +470,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) } else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) { - pq_sendbyte(out, 'u'); /* unchanged toast column */ + pq_sendbyte(out, LOGICALREP_UNCHANGED); continue; } @@ -488,12 +479,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) elog(ERROR, "cache lookup failed for type %u", att->atttypid); typclass = (Form_pg_type) GETSTRUCT(typtup); - pq_sendbyte(out, 't'); /* 'text' data follows */ + if (binary && + OidIsValid(typclass->typreceive) && + (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') && + (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid)) + { + bytea *outputbytes; + int len; + + pq_sendbyte(out, LOGICALREP_BINARY); - outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); - pq_sendcountedtext(out, outputstr, strlen(outputstr), false); - pfree(outputstr); + outputbytes = OidSendFunctionCall(typclass->typsend, + values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + pq_sendbyte(out, LOGICALREP_TEXT); + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } ReleaseSysCache(typtup); } } @@ -512,7 +522,11 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) /* Get number of attributes */ natts = pq_getmsgint(in, 2); - 
memset(tuple->changed, 0, sizeof(tuple->changed)); + tuple->values = palloc(natts * sizeof(StringInfoData *)); + + /* default is unchanged */ + tuple->format = palloc(natts * sizeof(char)); + memset(tuple->format, LOGICALREP_UNCHANGED, natts * sizeof(char)); /* Read the data */ for (i = 0; i < natts; i++) @@ -524,25 +538,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) switch (kind) { case 'n': /* null */ - tuple->values[i] = NULL; - tuple->changed[i] = true; - break; - case 'u': /* unchanged column */ - /* we don't receive the value of an unchanged column */ - tuple->values[i] = NULL; - break; - case 't': /* text formatted value */ + { + tuple->values[i] = (StringInfoData *) NULL; + tuple->format[i] = LOGICALREP_TEXT; + break; + } + case LOGICALREP_UNCHANGED: + { + /* we don't receive the value of an unchanged column */ + tuple->values[i] = (StringInfoData *) NULL; + tuple->format[i] = LOGICALREP_UNCHANGED; /* be explicit */ + break; + } + case LOGICALREP_BINARY: { int len; + StringInfoData *value = palloc(sizeof(StringInfoData)); + + tuple->format[i] = LOGICALREP_BINARY; + + len = pq_getmsgint(in, 4); /* read length */ - tuple->changed[i] = true; + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->cursor = 0; + value->maxlen = len; + /* not strictly necessary but the docs say it is required */ + value->data[len] = '\0'; + tuple->values[i] = value; + break; + } + case LOGICALREP_TEXT: + { + int len; + StringInfoData *value = palloc(sizeof(StringInfoData)); + tuple->format[i] = LOGICALREP_TEXT; len = pq_getmsgint(in, 4); /* read length */ /* and data */ - tuple->values[i] = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i], len); - tuple->values[i][len] = '\0'; + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->len = len; + value->data[len] = '\0'; + tuple->values[i] = value; } break; default: diff --git 
a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f90a896fc3..844845c473 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -319,13 +319,12 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store data into slot. + * Data can be either text or binary transfer format */ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -351,18 +350,42 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, int remoteattnum = rel->attrmap->attnums[i]; if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + tupleData->values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->format[remoteattnum] == LOGICALREP_BINARY) + { + Oid typreceive; + Oid typioparam; + int cursor; + + cursor = tupleData->values[remoteattnum]->cursor; + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], + typioparam, att->atttypmod); + + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into + * proto.c + */ + tupleData->values[remoteattnum]->cursor = cursor; + + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + 
OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -387,9 +410,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, } /* - * Replace selected columns with user data provided as C strings. - * This is somewhat similar to heap_modify_tuple but also calls the type - * input functions on the user data. + * Replace selected columns with user data provided either as C strings or in + * binary format. This is somewhat similar to heap_modify_tuple but also calls + * the type input functions on the user data. * "slot" is filled with a copy of the tuple in "srcslot", with * columns selected by the "replaces" array replaced with data values * from "values". @@ -398,9 +421,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, * need to materialize "slot" at the end to make it independent of "srcslot". */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -438,21 +461,44 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) + if (tupleData->format[remoteattnum] == LOGICALREP_UNCHANGED) continue; - if (values[remoteattnum] != NULL) + if (tupleData->values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; - errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->format[remoteattnum] == LOGICALREP_BINARY) + { + Oid typreceive; + Oid typioparam; + int cursor; + + cursor = 
tupleData->values[remoteattnum]->cursor; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum], + typioparam, att->atttypmod); + + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into + * proto.c + */ + tupleData->values[remoteattnum]->cursor = cursor; + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -618,8 +664,10 @@ apply_handle_insert(StringInfo s) ensure_transaction(); + /* read the relation id */ relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) { /* @@ -641,7 +689,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -733,9 +781,12 @@ apply_handle_update(StringInfo s) ensure_transaction(); - relid = logicalrep_read_update(s, &has_oldtup, &oldtup, - &newtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_update(s, &has_oldtup, &oldtup, + &newtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -765,7 +816,7 @@ apply_handle_update(StringInfo s) target_rte = list_nth(estate->es_range_table, 0); for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) { - if (newtup.changed[i]) + if (newtup.format[i] != LOGICALREP_UNCHANGED) target_rte->updatedCols = bms_add_member(target_rte->updatedCols, 
i + 1 - FirstLowInvalidHeapAttributeNumber); } @@ -776,8 +827,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply update to correct partition. */ @@ -831,8 +882,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo, { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, relmapentry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -875,8 +925,11 @@ apply_handle_delete(StringInfo s) ensure_transaction(); - relid = logicalrep_read_delete(s, &oldtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_delete(s, &oldtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -900,7 +953,7 @@ apply_handle_delete(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* For a partitioned table, apply delete to correct partition. */ @@ -1096,9 +1149,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, if (found) { /* Apply the update. 
*/ - slot_modify_cstrings(remoteslot_part, localslot, - part_entry, - newtup->values, newtup->changed); + slot_modify_data(remoteslot_part, localslot, + part_entry, + newtup); MemoryContextSwitchTo(oldctx); } else @@ -2106,6 +2159,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 15379e3118..e209ad8aa5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -20,6 +20,7 @@ #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, bool *binary) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool binary_option_given = false; + + *binary = false; foreach(lc, options) { @@ -138,7 +142,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (protocol_version_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname))); protocol_version_given = true; if (!scanint8(strVal(defel->arg), true, &parsed)) @@ -159,7 +163,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (publication_names_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - 
errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname))); publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', @@ -168,6 +172,19 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "binary") == 0) + { + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname))); + binary_option_given = true; + + if (!parse_bool(strVal(defel->arg), binary)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid binary option"))); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -202,7 +219,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &data->binary); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -397,6 +415,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: + { HeapTuple tuple = &change->data.tp.newtuple->tuple; @@ -411,7 +430,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, tuple); + logicalrep_write_insert(ctx->out, relation, tuple, data->binary); OutputPluginWrite(ctx, true); break; } @@ -435,7 +454,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); + logicalrep_write_update(ctx->out, 
relation, oldtuple, newtuple, data->binary); OutputPluginWrite(ctx, true); break; } @@ -455,7 +474,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, oldtuple); + logicalrep_write_delete(ctx->out, relation, oldtuple, data->binary); + OutputPluginWrite(ctx, true); } else diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 3b870c3b17..a7fd29085f 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false}; + false, false, false}; if (pset.sversion < 100000) { @@ -5987,6 +5987,12 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Enabled"), gettext_noop("Publication")); + /* Binary mode is only supported in v14 and higher */ + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ", subbinary AS \"%s\"\n", + gettext_noop("Binary")); + if (verbose) { appendPQExpBuffer(&buf, diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0a756d42d8..e8b3712574 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the output + * plugin to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,8 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + 
bool binary; /* Indicates if the subscription wants data in + * binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4860561be9..3971f007ae 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -27,13 +27,19 @@ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 +#define LOGICALREP_UNCHANGED 'u' +#define LOGICALREP_BINARY 'b' +#define LOGICALREP_TEXT 't' + /* Tuple coming via logical replication. */ typedef struct LogicalRepTupleData { - /* column values in text format, or NULL for a null value: */ - char *values[MaxTupleAttributeNumber]; - /* markers for changed/unchanged column values: */ - bool changed[MaxTupleAttributeNumber]; + /* column values */ + StringInfoData **values; + + /* markers for changed/unchanged/binary/text */ + char *format; + } LogicalRepTupleData; typedef uint32 LogicalRepRelId; @@ -87,17 +93,17 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple); + HeapTuple newtuple, bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple); -extern LogicalRepRelId logicalrep_read_update(StringInfo in, - bool *has_oldtuple, LogicalRepTupleData *oldtup, - LogicalRepTupleData *newtup); + HeapTuple newtuple, bool binary); +extern void logicalrep_read_update(StringInfo in, + bool *has_oldtuple, LogicalRepTupleData *oldtup, + LogicalRepTupleData *newtup); extern void 
logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple); -extern LogicalRepRelId logicalrep_read_delete(StringInfo in, - LogicalRepTupleData *oldtup); + HeapTuple oldtuple, bool binary); +extern void logicalrep_read_delete(StringInfo in, + LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 2e8e9daf44..a2d76689ed 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; + bool binary; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c75dcebea0..7cd1458819 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -177,6 +177,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher output plugin to use binary */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index e7add9d2b8..af6ed982ee 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | off | 
dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,27 +91,27 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; ALTER SUBSCRIPTION regress_testsub ENABLE; \dRs - List of subscriptions - Name | Owner | Enabled | Publication ------------------+---------------------------+---------+--------------------- - regress_testsub | regress_subscription_user | t | {testpub2,testpub3} + List of subscriptions + Name | Owner | Enabled | Publication | Binary +-----------------+---------------------------+---------+---------------------+-------- + regress_testsub | regress_subscription_user | t | {testpub2,testpub3} | f (1 row) ALTER SUBSCRIPTION regress_testsub DISABLE; \dRs - List of subscriptions - Name | Owner | Enabled | Publication 
------------------+---------------------------+---------+--------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} + List of subscriptions + Name | Owner | Enabled | Publication | Binary +-----------------+---------------------------+---------+---------------------+-------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f (1 row) COMMIT; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -155,6 +155,22 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub; NOTICE: subscription "regress_testsub" does not exist, skipping DROP SUBSCRIPTION regress_testsub; -- fail ERROR: subscription "regress_testsub" does not exist +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); +ERROR: binary requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); +WARNING: 
tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 9e234ab8b3..835bd05721 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -117,6 +117,18 @@ COMMIT; DROP SUBSCRIPTION IF EXISTS regress_testsub; DROP SUBSCRIPTION regress_testsub; -- fail +-- fail - binary must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (binary = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl new file mode 100644 index 0000000000..a252f470ef --- /dev/null +++ b/src/test/subscription/t/014_binary.pl @@ -0,0 +1,108 @@ +# Binary mode logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests 
=> 3; + +# Create and initialize a publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create and initialize subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = qq( + CREATE TABLE public.test_numerical ( + a INTEGER PRIMARY KEY, + b NUMERIC, + c FLOAT, + d BIGINT + ); + CREATE TABLE public.test_arrays ( + a INTEGER[] PRIMARY KEY, + b NUMERIC[], + c TEXT[] + );); + +# Create the test tables on both the publisher and the subscriber +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Configure logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tpub FOR ALL TABLES"); + +my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " .
+ "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); + +# Ensure nodes are in sync with each other +$node_publisher->wait_for_catchup('tsub'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');") + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert some content and make sure it's replicated across +$node_publisher->safe_psql('postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20), + (3, 3.2, 3.3, 30); + )); + +$node_publisher->wait_for_catchup('tsub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is($result, '1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30', 'check replicated data on subscriber'); + +# Test to reset back to text formatting, and then to binary again +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = false);"); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (4, 4.2, 4.3, 40); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is($result, '1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30 +4|4.2|4.3|40', 'check replicated data on subscriber'); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = true);"); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}'); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is($result,
'{1,2,3}|{1.1,1.2,1.3}|{one,two,three} +{2,3,1}|{1.2,1.3,1.1}|{two,three,one} +{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.21.1 (Apple Git-122.3)