*** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 1151,1156 **** static void --- 1151,1158 ---- WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) { + TimestampTz now = GetCurrentTimestamp(); + /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); *************** *** 1160,1235 **** WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); ! pq_sendint64(&tmpbuf, GetCurrentTimestamp()); memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - /* fast path */ - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - WalSndShutdown(); - - if (!pq_is_send_pending()) - return; - for (;;) { int wakeEvents; long sleeptime; TimestampTz now; /* ! * Emergency bailout if postmaster has died. This is to avoid the ! * necessity for manual cleanup of all postmaster children. */ ! if (!PostmasterIsAlive()) ! exit(1); ! ! /* Clear any already-pending wakeups */ ! ResetLatch(MyLatch); ! ! CHECK_FOR_INTERRUPTS(); ! ! /* Process any requests or signals received recently */ ! if (ConfigReloadPending) ! { ! ConfigReloadPending = false; ! ProcessConfigFile(PGC_SIGHUP); ! SyncRepInitConfig(); ! } ! ! /* Check for input from the client */ ! ProcessRepliesIfAny(); ! ! /* Try to flush pending output to the client */ ! if (pq_flush_if_writable() != 0) ! WalSndShutdown(); ! ! /* If we finished clearing the buffered data, we're done here. */ ! if (!pq_is_send_pending()) ! break; ! now = GetCurrentTimestamp(); - - /* die if timeout was reached */ - WalSndCheckTimeOut(now); - - /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(now); - - sleeptime = WalSndComputeSleeptime(now); - - wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; - - /* Sleep until something happens or we time out */ - WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WRITE_DATA); } - - /* reactivate latch so WalSndLoop knows to continue */ - SetLatch(MyLatch); } /* --- 1162,1238 ---- * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); ! pq_sendint64(&tmpbuf, now); memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); for (;;) { int wakeEvents; long sleeptime; TimestampTz now; + if (now <= TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2)) + { + /* fast path */ + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } + else + { + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* If we finished clearing the buffered data, we're done here. */ + if (!pq_is_send_pending()) + break; + + /* die if timeout was reached */ + WalSndCheckTimeOut(now); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(now); + } + + /* return if finished */ + if (!pq_is_send_pending()) + return; + /* ! * Send buffer is filled up. Being different from WalSndLoop, since ! * this function is called at a commit time for all the modifications ! * within the commit, we should continue sending data without waiting ! * for the next WAL record. ! * ! * Not sure how long we should wait for a room to send more data, but ! * 1ms would be sufficient. */ ! pg_usleep(1000); now = GetCurrentTimestamp(); } } /*