From f3631c2b668c5d17423d2d205ef08daf179e8e51 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Thu, 9 Mar 2017 14:20:28 +0100 Subject: [PATCH 1/2] Add option to control snapshot export to CREATE_REPLICATION_SLOT We used to export snapshots unconditionally in CREATE_REPLICATION_SLOT, but several upcoming patches want more control over what happens with the slot. --- doc/src/sgml/protocol.sgml | 16 +++++- src/backend/commands/subscriptioncmds.c | 6 ++- .../libpqwalreceiver/libpqwalreceiver.c | 15 ++++-- src/backend/replication/repl_gram.y | 43 ++++++++++++---- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/walsender.c | 58 ++++++++++++++++++++-- src/include/nodes/replnodes.h | 2 +- src/include/replication/walreceiver.h | 6 +-- 8 files changed, 125 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3d6e8ee..95603d3 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1487,7 +1487,7 @@ The commands accepted in walsender mode are: - CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT ] } CREATE_REPLICATION_SLOT @@ -1538,6 +1538,20 @@ The commands accepted in walsender mode are: + + EXPORT_SNAPSHOT + NOEXPORT_SNAPSHOT + + + Decides what to do with the snapshot created during logical slot + initialization. EXPORT_SNAPSHOT, which is the + default, will export the snapshot for use in other sessions. This + option can't be used inside a transaction. The + NOEXPORT_SNAPSHOT will just use the snapshot for logical + decoding as normal but won't do anything else with it. + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1868bf5..e300245 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - walrcv_create_slot(wrconn, slotname, false, &lsn); + /* + * Create permanent slot for the subscription, we won't use + * the initial snapshot for anything so no need to export it. + */ + walrcv_create_slot(wrconn, slotname, false, false, &lsn); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ebadf36..cd2e578 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, + bool export_snapshot, XLogRecPtr *lsn); static bool libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err); @@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, XLogRecPtr *lsn) + bool temporary, bool export_snapshot, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, initStringInfo(&cmd); - appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname); + appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname); if (temporary) - appendStringInfo(&cmd, "TEMPORARY "); + appendStringInfo(&cmd, " TEMPORARY"); if (conn->logical) - appendStringInfo(&cmd, "LOGICAL pgoutput"); + { + appendStringInfo(&cmd, " LOGICAL pgoutput"); + if (export_snapshot) + appendStringInfo(&cmd, " EXPORT_SNAPSHOT"); + else + appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT"); + } res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index b35d0f0..f1e43bc 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -79,6 +79,8 @@ Node *replication_parse_result; %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_EXPORT_SNAPSHOT +%token K_NOEXPORT_SNAPSHOT %type command %type base_backup start_replication start_logical_replication @@ -91,7 +93,9 @@ Node *replication_parse_result; %type plugin_opt_elem %type plugin_opt_arg %type opt_slot var_name -%type opt_reserve_wal opt_temporary +%type opt_temporary +%type create_slot_opt_list +%type create_slot_opt %% @@ -202,18 +206,18 @@ base_backup_opt: create_replication_slot: /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->slotname = $2; cmd->temporary = $3; - cmd->reserve_wal = $5; + cmd->options = $5; $$ = (Node *) cmd; } /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -221,10 +225,36 @@ create_replication_slot: cmd->slotname = $2; cmd->temporary = $3; cmd->plugin = $5; + cmd->options = $6; $$ = (Node *) cmd; } ; +create_slot_opt_list: + create_slot_opt_list create_slot_opt + { $$ = lappend($1, $2); } + | /* EMPTY */ + { $$ = NIL; } + ; + +create_slot_opt: + K_EXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(TRUE), -1); + } + | K_NOEXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(FALSE), -1); + } + | K_RESERVE_WAL + { + $$ = makeDefElem("reserve_wal", + (Node *)makeInteger(TRUE), -1); + } + ; + /* DROP_REPLICATION_SLOT slot */ drop_replication_slot: K_DROP_REPLICATION_SLOT IDENT @@ -291,11 +321,6 @@ opt_physical: | /* EMPTY */ ; -opt_reserve_wal: - K_RESERVE_WAL { $$ = true; } - | /* EMPTY */ { $$ = false; } - ; - opt_temporary: K_TEMPORARY { $$ = true; } | /* EMPTY */ { $$ = false; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 37f8579..f56d41d 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -100,6 +100,8 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } +NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } "," { return ','; } ";" { return ';'; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index dd3a936..127efec 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -51,6 +51,7 @@ #include "catalog/pg_type.h" #include "commands/dbcommands.h" +#include "commands/defrem.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -738,6 +739,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req } /* + * Process extra options given to CREATE_REPLICATION_SLOT. + */ +static void +parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, + bool *reserve_wal, + bool *export_snapshot) +{ + ListCell *lc; + bool snapshot_action_given = false; + bool reserve_wal_given = false; + + /* Parse options */ + foreach (lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "export_snapshot") == 0) + { + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + snapshot_action_given = true; + *export_snapshot = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "reserve_wal") == 0) + { + if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + reserve_wal_given = true; + *reserve_wal = true; + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* * Create a new replication slot. */ static void @@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) const char *snapshot_name = NULL; char xpos[MAXFNAMELEN]; char *slot_name; + bool reserve_wal = false; + bool export_snapshot = true; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; @@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); + parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot); + /* setup state for XLogReadPage */ sendTimeLineIsHistoric = false; sendTimeLine = ThisTimeLineID; @@ -799,10 +846,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) DecodingContextFindStartpoint(ctx); /* - * Export a plain (not of the snapbuild.c type) snapshot to the user - * that can be imported into another session. + * Export the snapshot if we've been asked to do so. + * + * NB. We will convert the snapbuild.c kind of snapshot to normal + * snapshot when doing this. */ - snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + if (export_snapshot) + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (!cmd->temporary) ReplicationSlotPersist(); } - else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal) + else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) { ReplicationSlotReserveWal(); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index f27354f..996da3c 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd ReplicationKind kind; char *plugin; bool temporary; - bool reserve_wal; + List *options; } CreateReplicationSlotCmd; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0857bdc..78e577c 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, int nbytes); typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, - XLogRecPtr *lsn); + bool export_snapshot, XLogRecPtr *lsn); typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, char **err); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); @@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn) +#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) #define walrcv_command(conn, cmd, err) \ WalReceiverFunctions->walrcv_command(conn, cmd, err) #define walrcv_disconnect(conn) \ -- 2.7.4