From 36e1c2fdf754d89ca817440eab87edeb87cfe96e Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Wed, 1 Aug 2018 11:04:29 -0400 Subject: [PATCH] Ensure pg_create_logical_replication_slot fails if the output plugin does not exist Error early when creating DecodingContext before sending CopyBoth in the event that the output plugin does not exist --- contrib/test_decoding/expected/slot.out | 2 ++ contrib/test_decoding/sql/slot.sql | 2 ++ src/backend/replication/logical/logical.c | 2 +- src/backend/replication/walsender.c | 24 ++++++++++++++---------- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 2737a8a..523621a 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes init (1 row) +SELECT pg_create_logical_replication_slot('foo', 'nonexistent'); +ERROR: could not access file "nonexistent": No such file or directory -- here we want to start a new session and wait till old one is gone select pg_backend_pid() as oldpid \gset \c - diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 24cdf71..c8d08f8 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true); +SELECT pg_create_logical_replication_slot('foo', 'nonexistent'); + -- here we want to start a new session and wait till old one is gone select pg_backend_pid() as oldpid \gset \c - diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3cd4eef..bb83fc9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - need_full_snapshot, true, + need_full_snapshot, false, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d60026d..c0e4acc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1068,6 +1068,20 @@ StartLogicalReplication(StartReplicationCmd *cmd) got_STOPPING = true; } + /* + * Initialize position to the last ack'ed one, then the xlog records begin + * to be shipped from that position. + * This is done before sending CopyBoth in the event there is an error + * the error message will be returned early + */ + logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, + false, + logical_read_xlog_page, + WalSndPrepareWrite, + WalSndWriteData, + WalSndUpdateProgress); + + WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ @@ -1077,16 +1091,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* - * Initialize position to the last ack'ed one, then the xlog records begin - * to be shipped from that position. - */ - logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, - false, - logical_read_xlog_page, - WalSndPrepareWrite, - WalSndWriteData, - WalSndUpdateProgress); /* Start reading WAL from the oldest required WAL. */ logical_startptr = MyReplicationSlot->data.restart_lsn; -- 2.6.4