From 9ad3dda95b96137232aef4be7632389921830892 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Thu, 2 Jul 2020 12:15:17 -0400 Subject: [PATCH 2/8] add binary column to pg_subscription support create and alter subcription with binary option --- src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 39 +++++++++++++++---- .../libpqwalreceiver/libpqwalreceiver.c | 3 ++ src/backend/replication/logical/worker.c | 1 + src/include/catalog/pg_subscription.h | 4 ++ src/include/replication/walreceiver.h | 1 + 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5314e9348f..9a853fe48d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1125,7 +1125,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187..f2590ff565 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *refresh, bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +90,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + /* not all versions of pgoutput will understand this option default to false */ + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +181,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -324,8 +335,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; - List *publications; + bool binary; + bool binary_given; + List *publications; /* * Parse and check options. * @@ -334,7 +347,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, ©_data, &synchronous_commit, - NULL); + NULL, &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +413,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +683,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + bool binary_given; + bool binary; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + NULL, &synchronous_commit, NULL, + &binary_given, &binary); if (slotname_given) { @@ -697,6 +714,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -708,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -746,7 +771,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, &refresh); + NULL, &refresh, NULL, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -783,7 +808,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, NULL); + NULL, NULL, NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..2a9d966ec1 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -402,6 +402,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, char *pubnames_str; List *pubnames; char *pubnames_literal; + bool binary; appendStringInfoString(&cmd, " ("); @@ -423,6 +424,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); PQfreemem(pubnames_literal); pfree(pubnames_str); + if (options->proto.logical.binary) + appendStringInfo(&cmd, ", binary 'true'"); appendStringInfoChar(&cmd, ')'); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c958c4d8a7..b46e8991cd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2164,6 +2164,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.binary = MySubscription->binary; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0a756d42d8..100789a52f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool subbinary; /* True if the subscription wants the + * output plugin to send data in binary */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +76,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool binary; /* Indicates if the subscription wants data in binary format */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c75dcebea0..1ee316f6d7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -177,6 +177,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher output plugin to use binary */ } logical; } proto; } WalRcvStreamOptions; -- 2.20.1 (Apple Git-117)