From 14c0ac1aacfacb3b6b42bf0c5e453ab4e989aa2c Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 10 May 2016 10:34:10 +0800 Subject: [PATCH] Respect client-initiated CopyDone in walsender --- src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 926a247..4e2164e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + /* + * If the client sent CopyDone while we were waiting, + * bail out so we can wind up the decoding session. + */ + if (streamingDoneSending) + return -1; + /* more than one block available */ if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; @@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc) * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. + * + * We'll also exit here if the client sent CopyDone because it wants + * to return to command mode. */ - if (walsender_ready_to_stop) + if (walsender_ready_to_stop || streamingDoneReceiving) break; /* @@ -1789,7 +1799,14 @@ WalSndCheckTimeOut(TimestampTz now) } } -/* Main loop of walsender process that streams the WAL over Copy messages. */ +/* + * Main loop of walsender process that streams the WAL over Copy messages. + * + * The send_data callback must enqueue complete CopyData messages to libpq + * using pq_putmessage_noblock or similar, since the walsender loop may send + * CopyDone then exit and return to command mode in response to a client + * CopyDone between calls to send_data. + */ static void WalSndLoop(WalSndSendDataCallback send_data) { @@ -1852,10 +1869,15 @@ WalSndLoop(WalSndSendDataCallback send_data) * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. + * + * If we're trying to finish sending and exit we shouldn't enqueue more + * data to libpq. We need to finish writing out whatever we already + * have in libpq's send buffer to maintain protocol sync so we still + * need to loop until it's flushed. */ - if (!pq_is_send_pending()) + if (!pq_is_send_pending() && !streamingDoneSending) send_data(); - else + else if (!streamingDoneSending) WalSndCaughtUp = false; /* Try to flush pending output to the client */ @@ -2911,7 +2933,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now) if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response || streamingDoneSending) return; /* -- 2.5.5