diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 2485e6190dc..9bf46f0ba13 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2905,6 +2905,36 @@ include_dir 'conf.d' across the cluster without problems if that is required. + + All Servers + + These parameters can be set on the primary or any standby. + + + + synchronous_replay (boolean) + + synchronous_replay configuration parameter + + + + + Enables causal consistency between transactions run on different + servers. A transaction that is run on a standby + with synchronous_replay set to on is + guaranteed either to see the effects of all completed transactions + run on the primary with the setting on, or to receive an error + "standby is not available for synchronous replay". Note that both + transactions involved in a causal dependency (a write on the primary + followed by a read on any server which must see the write) must be + run with the setting on. See for + more details. + + + + + + Sending Server(s) @@ -3206,6 +3236,66 @@ ANY num_sync ( . + + + + + + synchronous_replay_lease_time (integer) + + synchronous_replay_lease_time configuration + parameter + + + + + Specifies the duration of 'leases' sent by the primary server to + standbys granting them the right to run synchronous replay queries for + a limited time. This affects the rate at which replacement leases + must be sent and the wait time if contact is lost with a standby, as + described in . + + + + + + synchronous_replay_standby_names (string) + + synchronous_replay_standby_names configuration parameter + + + + + Specifies a comma-separated list of standby names that can support + synchronous replay, as described in + . Follows the same convention + as synchronous_standby_name. + The default is *, matching all standbys. + + + This setting has no effect if synchronous_replay_max_lag + is not set. + + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index e41df791b76..b8ff329e1ea 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1115,7 +1115,7 @@ primary_slot_name = 'node_a_slot' cause each commit to wait until the current synchronous standbys report that they have replayed the transaction, making it visible to user queries. In simple cases, this allows for load balancing with causal - consistency. + consistency. See also . @@ -1313,6 +1313,119 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' + + Synchronous replay + + synchronous replay + in standby + + + + The synchronous replay feature allows read-only queries to run on hot + standby servers without exposing stale data to the client, providing a + form of causal consistency. Transactions can run on any standby with the + following guarantee about the visibility of preceding transactions: If you + set synchronous_replay to on in any pair of + consecutive transactions tx1, tx2 where tx2 begins after tx1 successfully + returns, then tx2 will either see tx1 or fail with a new error "standby is + not available for synchronous replay", no matter which server it runs on. + Although the guarantee is expressed in terms of two individual + transactions, the GUC can also be set at session, role or system level to + make the guarantee generally, allowing for load balancing of applications + that were not designed with load balancing in mind. + + + + In order to enable the feature, synchronous_replay_max_lag + must be set to a non-zero value on the primary server. The + GUC synchronous_replay_standby_names can be used to limit the + set of standbys that can join the dynamic set of synchronous replay + standbys by providing a comma-separated list of application names. By + default, all standbys are candidates, if the feature is enabled. + + + + The current set of servers that the primary considers to be available for + synchronous replay can be seen in + the pg_stat_replication + view. Administrators, applications and load balancing middleware can use + this view to discover standbys that can currently handle synchronous + replay transactions without raising the error. Since that information is + only an instantantaneous snapshot, clients should still be prepared for + the error to be raised at any time, and consider redirecting transactions + to another standby. + + + + The advantages of the synchronous replay feature over simply + setting synchronous_commit to remote_apply are: + + + + It provides certainty about exactly which standbys can see a + transaction. + + + + + It places a configurable limit on how much replay lag (and therefore + delay at commit time) the primary tolerates from standbys before it + drops them from the dynamic set of standbys it waits for. + + + + + It upholds the synchronous replay guarantee during the transitions that + occur when new standbys are added or removed from the set of standbys, + including scenarios where contact has been lost between the primary + and standbys but the standby is still alive and running client + queries. + + + + + + + The protocol used to uphold the guarantee even in the case of network + failure depends on the system clocks of the primary and standby servers + being synchronized, with an allowance for a difference up to one quarter + of synchronous_replay_lease_time. For example, + if synchronous_replay_lease_time is set to 5s, + then the clocks must not be more than 1.25 second apart for the guarantee + to be upheld reliably during transitions. The ubiquity of the Network + Time Protocol (NTP) on modern operating systems and availability of high + quality time servers makes it possible to choose a tolerance significantly + higher than the maximum expected clock difference. An effort is + nevertheless made to detect and report misconfigured and faulty systems + with clock differences greater than the configured tolerance. + + + + + Current hardware clocks, NTP implementations and public time servers are + unlikely to allow the system clocks to differ more than tens or hundreds + of milliseconds, and systems synchronized with dedicated local time + servers may be considerably more accurate, but you should only consider + setting synchronous_replay_lease_time below the default of 5 + seconds (allowing up to 1.25 second of clock difference) after + researching your time synchronization infrastructure thoroughly. + + + + + + While similar to synchronous commit in the sense that both involve the + primary server waiting for responses from standby servers, the + synchronous replay feature is not concerned with avoiding data loss. A + primary configured for synchronous replay will drop all standbys that + stop responding or replay too slowly from the dynamic set that it waits + for, so you should consider configuring both synchronous replication and + synchronous replay if you need data loss avoidance guarantees and causal + consistency guarantees for load balancing. + + + + Continuous archiving in standby @@ -1661,7 +1774,16 @@ if (!triggered) so there will be a measurable delay between primary and standby. Running the same query nearly simultaneously on both primary and standby might therefore return differing results. We say that data on the standby is - eventually consistent with the primary. Once the + eventually consistent with the primary by default. + The data visible to a transaction running on a standby can be + made causally consistent with respect to a transaction that + has completed on the primary by setting synchronous_replay + to on in both transactions. For more details, + see . + + + + Once the commit record for a transaction is replayed on the standby, the changes made by that transaction will be visible to any new snapshots taken on the standby. Snapshots may be taken at the start of each query or at the diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index be3dc672bcc..c48243362c2 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1790,6 +1790,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + sync_replay + text + Synchronous replay state of this standby server. This field will be + non-null only if synchronous_replay_max_lag is set. If a standby is + in available state, then it can currently serve synchronous replay + queries. If it is not replaying fast enough or not responding to + keepalive messages, it will be in unavailable state, and if + it is currently transitioning to availability it will be + in joining state for a short time. + diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b0aa69fe4b4..deb14e346a5 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5149,7 +5149,7 @@ XactLogCommitRecord(TimestampTz commit_time, * Check if the caller would like to ask standbys for immediate feedback * once this commit is applied. */ - if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY || synchronous_replay) xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK; /* diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 0fdad0c1197..cc8b565386f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -732,7 +732,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.sync_replay FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index a0b0eecbd5e..b3074a6578e 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3606,6 +3606,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_SYNC_REPLAY: + event_name = "SyncReplay"; + break; case WAIT_EVENT_LOGICAL_SYNC_DATA: event_name = "LogicalSyncData"; break; @@ -3640,6 +3643,9 @@ pgstat_get_wait_timeout(WaitEventTimeout w) case WAIT_EVENT_RECOVERY_APPLY_DELAY: event_name = "RecoveryApplyDelay"; break; + case WAIT_EVENT_SYNC_REPLAY_LEASE_REVOKE: + event_name = "SyncReplayLeaseRevoke"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 898c497d12c..3eb79a0fd2b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1295,6 +1295,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) pq_sendint64(reply_message, writepos); /* apply */ pq_sendint64(reply_message, now); /* sendTime */ pq_sendbyte(reply_message, requestReply); /* replyRequested */ + pq_sendint64(reply_message, -1); /* replyTo */ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X", force, diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 5fd47689dd2..d794bef1d54 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -85,6 +85,13 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/ps_status.h" +#include "utils/varlena.h" + +/* GUC variables */ +int synchronous_replay_max_lag; +int synchronous_replay_lease_time; +bool synchronous_replay; +char *synchronous_replay_standby_names; /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; @@ -99,7 +106,9 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn); + +static bool SyncRepCheckForEarlyExit(void); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, @@ -129,6 +138,229 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); */ /* + * Check if we can stop waiting for synchronous replay. We can stop waiting + * when the following conditions are met: + * + * 1. All walsenders currently in 'joining' or 'available' state have + * applied the target LSN. + * + * 2. All revoked leases have been acknowledged by the relevant standby or + * expired, so we know that the standby has started rejecting synchronous + * replay transactions. + * + * The output parameter 'waitingFor' is set to the number of nodes we are + * currently waiting for. The output parameters 'stallTimeMillis' is set to + * the number of milliseconds we need to wait for because a lease has been + * revoked. + * + * Returns true if commit can return control, because every standby has either + * applied the LSN or started rejecting synchronous replay transactions. + */ +static bool +SyncReplayCommitCanReturn(XLogRecPtr XactCommitLSN, + int *waitingFor, + long *stallTimeMillis) +{ + TimestampTz now = GetCurrentTimestamp(); + TimestampTz stallTime = 0; + int i; + + /* Count how many joining/available nodes we are waiting for. */ + *waitingFor = 0; + + for (i = 0; i < max_wal_senders; ++i) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid != 0) + { + /* + * We need to hold the spinlock to read LSNs, because we can't be + * sure they can be read atomically. + */ + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid != 0) + { + switch (walsnd->syncReplayState) + { + case SYNC_REPLAY_UNAVAILABLE: + /* Nothing to wait for. */ + break; + case SYNC_REPLAY_JOINING: + case SYNC_REPLAY_AVAILABLE: + /* + * We have to wait until this standby tells us that is has + * replayed the commit record. + */ + if (walsnd->apply < XactCommitLSN) + ++*waitingFor; + break; + case SYNC_REPLAY_REVOKING: + /* + * We have to hold up commits until this standby + * acknowledges that its lease was revoked, or we know the + * most recently sent lease has expired anyway, whichever + * comes first. One way or the other, we don't release + * until this standby has started raising an error for + * synchronous replay transactions. + */ + if (walsnd->revokingUntil > now) + { + ++*waitingFor; + stallTime = Max(stallTime, walsnd->revokingUntil); + } + break; + } + } + SpinLockRelease(&walsnd->mutex); + } + } + + /* + * If a walsender has exitted uncleanly, then it writes itsrevoking wait + * time into a shared space before it gives up its WalSnd slot. So we + * have to wait for that too. + */ + LWLockAcquire(SyncRepLock, LW_SHARED); + if (WalSndCtl->revokingUntil > now) + { + long seconds; + int usecs; + + /* Compute how long we have to wait, rounded up to nearest ms. */ + TimestampDifference(now, WalSndCtl->revokingUntil, + &seconds, &usecs); + *stallTimeMillis = seconds * 1000 + (usecs + 999) / 1000; + } + else + *stallTimeMillis = 0; + LWLockRelease(SyncRepLock); + + /* We are done if we are not waiting for any nodes or stalls. */ + return *waitingFor == 0 && *stallTimeMillis == 0; +} + +/* + * Wait for all standbys in "available" and "joining" standbys to replay + * XactCommitLSN, and all "revoking" standbys' leases to be revoked. By the + * time we return, every standby will either have replayed XactCommitLSN or + * will have no lease, so an error would be raised if anyone tries to obtain a + * snapshot with synchronous_replay = on. + */ +static void +SyncReplayWaitForLSN(XLogRecPtr XactCommitLSN) +{ + long stallTimeMillis; + int waitingFor; + char *ps_display_buffer = NULL; + + for (;;) + { + /* Reset latch before checking state. */ + ResetLatch(MyLatch); + + /* + * Join the queue to be woken up if any synchronous replay + * joining/available standby applies XactCommitLSN or the set of + * synchronous replay standbys changes (if we aren't already in the + * queue). We don't actually know if we need to wait for any peers to + * reach the target LSN yet, but we have to register just in case + * before checking the walsenders' state to avoid a race condition + * that could occur if we did it after calling + * SynchronousReplayCommitCanReturn. (SyncRepWaitForLSN doesn't have + * to do this because it can check the highest-seen LSN in + * walsndctl->lsn[mode] which is protected by SyncRepLock, the same + * lock as the queues. We can't do that here, because there is no + * single highest-seen LSN that is useful. We must check + * walsnd->apply for all relevant walsenders. Therefore we must + * register for notifications first, so that we can be notified via + * our latch of any standby applying the LSN we're interested in after + * we check but before we start waiting, or we could wait forever for + * something that has already happened.) + */ + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + if (MyProc->syncRepState != SYNC_REP_WAITING) + { + MyProc->waitLSN = XactCommitLSN; + MyProc->syncRepState = SYNC_REP_WAITING; + SyncRepQueueInsert(SYNC_REP_WAIT_SYNC_REPLAY); + Assert(SyncRepQueueIsOrderedByLSN(SYNC_REP_WAIT_SYNC_REPLAY)); + } + LWLockRelease(SyncRepLock); + + /* Check if we're done. */ + if (SyncReplayCommitCanReturn(XactCommitLSN, &waitingFor, + &stallTimeMillis)) + { + SyncRepCancelWait(); + break; + } + + Assert(waitingFor > 0 || stallTimeMillis > 0); + + /* If we aren't actually waiting for any standbys, leave the queue. */ + if (waitingFor == 0) + SyncRepCancelWait(); + + /* Update the ps title. */ + if (update_process_title) + { + char buffer[80]; + + /* Remember the old value if this is our first update. */ + if (ps_display_buffer == NULL) + { + int len; + const char *ps_display = get_ps_display(&len); + + ps_display_buffer = palloc(len + 1); + memcpy(ps_display_buffer, ps_display, len); + ps_display_buffer[len] = '\0'; + } + + snprintf(buffer, sizeof(buffer), + "waiting for %d peer(s) to apply %X/%X%s", + waitingFor, + (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN, + stallTimeMillis > 0 ? " (revoking)" : ""); + set_ps_display(buffer, false); + } + + /* Check if we need to exit early due to postmaster death etc. */ + if (SyncRepCheckForEarlyExit()) /* Calls SyncRepCancelWait() if true. */ + break; + + /* + * If are still waiting for peers, then we wait for any joining or + * available peer to reach the LSN (or possibly stop being in one of + * those states or go away). + * + * If not, there must be a non-zero stall time, so we wait for that to + * elapse. + */ + if (waitingFor > 0) + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, + WAIT_EVENT_SYNC_REPLAY); + else + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, + stallTimeMillis, + WAIT_EVENT_SYNC_REPLAY_LEASE_REVOKE); + } + + /* There is no way out of the loop that could leave us in the queue. */ + Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + MyProc->waitLSN = 0; + + /* Restore the ps display. */ + if (ps_display_buffer != NULL) + { + set_ps_display(ps_display_buffer, false); + pfree(ps_display_buffer); + } +} + +/* * Wait for synchronous replication, if requested by user. * * Initially backends start in state SYNC_REP_NOT_WAITING and then @@ -149,11 +381,9 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) const char *old_status; int mode; - /* Cap the level for anything other than commit to remote flush only. */ - if (commit) - mode = SyncRepWaitMode; - else - mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); + /* Wait for synchronous replay, if configured. */ + if (synchronous_replay) + SyncReplayWaitForLSN(lsn); /* * Fast exit if user has not requested sync replication, or there are no @@ -169,6 +399,12 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); + /* Cap the level for anything other than commit to remote flush only. */ + if (commit) + mode = SyncRepWaitMode; + else + mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); + /* * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not * set. See SyncRepUpdateSyncStandbysDefined. @@ -229,57 +465,9 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE) break; - /* - * If a wait for synchronous replication is pending, we can neither - * acknowledge the commit nor raise ERROR or FATAL. The latter would - * lead the client to believe that the transaction aborted, which is - * not true: it's already committed locally. The former is no good - * either: the client has requested synchronous replication, and is - * entitled to assume that an acknowledged commit is also replicated, - * which might not be true. So in this case we issue a WARNING (which - * some clients may be able to interpret) and shut off further output. - * We do NOT reset ProcDiePending, so that the process will die after - * the commit is cleaned up. - */ - if (ProcDiePending) - { - ereport(WARNING, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - whereToSendOutput = DestNone; - SyncRepCancelWait(); + /* Check if we need to break early due to cancel/shutdown/death. */ + if (SyncRepCheckForEarlyExit()) break; - } - - /* - * It's unclear what to do if a query cancel interrupt arrives. We - * can't actually abort at this point, but ignoring the interrupt - * altogether is not helpful, so we just terminate the wait with a - * suitable warning. - */ - if (QueryCancelPending) - { - QueryCancelPending = false; - ereport(WARNING, - (errmsg("canceling wait for synchronous replication due to user request"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); - SyncRepCancelWait(); - break; - } - - /* - * If the postmaster dies, we'll probably never get an - * acknowledgement, because all the wal sender processes will exit. So - * just bail out. - */ - if (!PostmasterIsAlive()) - { - ProcDiePending = true; - whereToSendOutput = DestNone; - SyncRepCancelWait(); - break; - } /* * Wait on latch. Any condition that should wake us up will set the @@ -399,6 +587,53 @@ SyncRepInitConfig(void) } /* + * Check if the current WALSender process's application_name matches a name in + * synchronous_replay_standby_names (including '*' for wildcard). + */ +bool +SyncReplayPotentialStandby(void) +{ + char *rawstring; + List *elemlist; + ListCell *l; + bool found = false; + + /* If the feature is disable, then no. */ + if (synchronous_replay_max_lag == 0) + return false; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(synchronous_replay_standby_names); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + /* GUC machinery will have already complained - no need to do again */ + return false; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, application_name) == 0 || + pg_strcasecmp(standby_name, "*") == 0) + { + found = true; + break; + } + } + + pfree(rawstring); + list_free(elemlist); + + return found; +} + +/* * Update the LSNs on each queue based upon our latest state. This * implements a simple policy of first-valid-sync-standby-releases-waiter. * @@ -406,7 +641,7 @@ SyncRepInitConfig(void) * perhaps also which information we store as well. */ void -SyncRepReleaseWaiters(void) +SyncRepReleaseWaiters(bool walsender_sr_blocker) { volatile WalSndCtlData *walsndctl = WalSndCtl; XLogRecPtr writePtr; @@ -420,13 +655,15 @@ SyncRepReleaseWaiters(void) /* * If this WALSender is serving a standby that is not on the list of - * potential sync standbys then we have nothing to do. If we are still - * starting up, still running base backup or the current flush position is - * still invalid, then leave quickly also. + * potential sync standbys and not in a state that synchronous_replay waits + * for, then we have nothing to do. If we are still starting up, still + * running base backup or the current flush position is still invalid, + * then leave quickly also. */ - if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + if (!walsender_sr_blocker && + (MyWalSnd->sync_standby_priority == 0 || + MyWalSnd->state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(MyWalSnd->flush))) { announce_next_takeover = true; return; @@ -464,9 +701,10 @@ SyncRepReleaseWaiters(void) /* * If the number of sync standbys is less than requested or we aren't - * managing a sync standby then just leave. + * managing a sync standby or a standby in synchronous replay state that + * blocks then just leave. */ - if (!got_recptr || !am_sync) + if ((!got_recptr || !am_sync) && !walsender_sr_blocker) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; @@ -475,24 +713,36 @@ SyncRepReleaseWaiters(void) /* * Set the lsn first so that when we wake backends they will release up to - * this location. + * this location, for backends waiting for synchronous commit. */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) + if (got_recptr && am_sync) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); - } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) - { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); - } - if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) - { - walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; - numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) + { + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, writePtr); + } + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) + { + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, flushPtr); + } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, applyPtr); + } } + /* + * Wake backends that are waiting for synchronous_replay, if this walsender + * manages a standby that is in synchronous replay 'available' or 'joining' + * state. + */ + if (walsender_sr_blocker) + SyncRepWakeQueue(false, SYNC_REP_WAIT_SYNC_REPLAY, + MyWalSnd->apply); + LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", @@ -970,9 +1220,8 @@ SyncRepGetStandbyPriority(void) * Must hold SyncRepLock. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, XLogRecPtr lsn) { - volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; @@ -989,7 +1238,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Assume the queue is ordered by LSN */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) + if (!all && lsn < proc->waitLSN) return numprocs; /* @@ -1049,7 +1298,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, InvalidXLogRecPtr); } /* @@ -1100,6 +1349,64 @@ SyncRepQueueIsOrderedByLSN(int mode) } #endif +static bool +SyncRepCheckForEarlyExit(void) +{ + /* + * If a wait for synchronous replication is pending, we can neither + * acknowledge the commit nor raise ERROR or FATAL. The latter would + * lead the client to believe that the transaction aborted, which is + * not true: it's already committed locally. The former is no good + * either: the client has requested synchronous replication, and is + * entitled to assume that an acknowledged commit is also replicated, + * which might not be true. So in this case we issue a WARNING (which + * some clients may be able to interpret) and shut off further output. + * We do NOT reset ProcDiePending, so that the process will die after + * the commit is cleaned up. + */ + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + /* + * It's unclear what to do if a query cancel interrupt arrives. We + * can't actually abort at this point, but ignoring the interrupt + * altogether is not helpful, so we just terminate the wait with a + * suitable warning. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for synchronous replication due to user request"), + errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + SyncRepCancelWait(); + return true; + } + + /* + * If the postmaster dies, we'll probably never get an + * acknowledgement, because all the wal sender processes will exit. So + * just bail out. + */ + if (!PostmasterIsAlive()) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + SyncRepCancelWait(); + return true; + } + + return false; +} + /* * =========================================================== * Synchronous Replication functions executed by any process @@ -1169,6 +1476,31 @@ assign_synchronous_standby_names(const char *newval, void *extra) SyncRepConfig = (SyncRepConfigData *) extra; } +bool +check_synchronous_replay_standby_names(char **newval, void **extra, GucSource source) +{ + char *rawstring; + List *elemlist; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawstring); + list_free(elemlist); + return false; + } + + pfree(rawstring); + list_free(elemlist); + + return true; +} + void assign_synchronous_commit(int newval, void *extra) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8a249e22b9f..c467a32d306 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -139,9 +140,10 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, int64 replyTo); static void XLogWalRcvSendHSFeedback(bool immed); -static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *syncReplayLease); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -466,7 +468,7 @@ WalReceiverMain(void) } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, -1); /* * If we've written some records, flush them to disk and @@ -511,7 +513,7 @@ WalReceiverMain(void) */ walrcv->force_reply = false; pg_memory_barrier(); - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, -1); } } if (rc & WL_POSTMASTER_DEATH) @@ -569,7 +571,7 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendReply(requestReply, requestReply, -1); XLogWalRcvSendHSFeedback(false); } } @@ -874,6 +876,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) XLogRecPtr walEnd; TimestampTz sendTime; bool replyRequested; + TimestampTz syncReplayLease; + int64 messageNumber; resetStringInfo(&incoming_message); @@ -893,7 +897,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) dataStart = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, NULL); buf += hdrlen; len -= hdrlen; @@ -903,7 +907,8 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) case 'k': /* Keepalive */ { /* copy message to StringInfo */ - hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64) + + sizeof(char) + sizeof(int64); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -911,15 +916,17 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) appendBinaryStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ + messageNumber = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); replyRequested = pq_getmsgbyte(&incoming_message); + syncReplayLease = pq_getmsgint64(&incoming_message); - ProcessWalSndrMessage(walEnd, sendTime); + ProcessWalSndrMessage(walEnd, sendTime, &syncReplayLease); /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, messageNumber); break; } default: @@ -1082,7 +1089,7 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, -1); XLogWalRcvSendHSFeedback(false); } } @@ -1100,9 +1107,12 @@ XLogWalRcvFlush(bool dying) * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbearts, when approaching * wal_receiver_timeout. + * + * If this is a reply to a specific message from the upstream server, then + * 'replyTo' should include the message number, otherwise -1. */ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, int64 replyTo) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; @@ -1149,6 +1159,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentTimestamp()); pq_sendbyte(&reply_message, requestReply ? 1 : 0); + pq_sendint64(&reply_message, replyTo); /* Send it */ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", @@ -1281,10 +1292,13 @@ XLogWalRcvSendHSFeedback(bool immed) * Update shared memory status upon receiving a message from primary. * * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest - * message, reported by primary. + * message, reported by primary. 'syncReplayLease' is a pointer to the time + * the primary promises that this standby can safely claim to be causally + * consistent, to 0 if it cannot, or a NULL pointer for no change. */ static void -ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime, + TimestampTz *syncReplayLease) { WalRcvData *walrcv = WalRcv; @@ -1297,6 +1311,8 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + if (syncReplayLease != NULL) + walrcv->syncReplayLease = *syncReplayLease; SpinLockRelease(&walrcv->mutex); if (log_min_messages <= DEBUG2) @@ -1334,7 +1350,7 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) * This is called by the startup process whenever interesting xlog records * are applied, so that walreceiver can check if it needs to send an apply * notification back to the master which may be waiting in a COMMIT with - * synchronous_commit = remote_apply. + * synchronous_commit = remote_apply or synchronous_relay = on. */ void WalRcvForceReply(void) diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 8ed7254b5c6..dec98eb48c8 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -27,6 +27,7 @@ #include "replication/walreceiver.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/guc.h" #include "utils/timestamp.h" WalRcvData *WalRcv = NULL; @@ -373,3 +374,21 @@ GetReplicationTransferLatency(void) return ms; } + +/* + * Used by snapmgr to check if this standby has a valid lease, granting it the + * right to consider itself available for synchronous replay. + */ +bool +WalRcvSyncReplayAvailable(void) +{ + WalRcvData *walrcv = WalRcv; + TimestampTz now = GetCurrentTimestamp(); + bool result; + + SpinLockAcquire(&walrcv->mutex); + result = walrcv->syncReplayLease != 0 && now <= walrcv->syncReplayLease; + SpinLockRelease(&walrcv->mutex); + + return result; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f845180873e..9563a87b08d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -167,9 +167,23 @@ static StringInfoData tmpbuf; */ static TimestampTz last_reply_timestamp = 0; +static TimestampTz last_keepalive_timestamp = 0; + /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; +/* At what point in the WAL can we progress from JOINING state? */ +static XLogRecPtr synchronous_replay_joining_until = 0; + +/* The last synchronous replay lease sent to the standby. */ +static TimestampTz synchronous_replay_last_lease = 0; + +/* The last synchronous replay lease revocation message's number. */ +static int64 synchronous_replay_revoke_msgno = 0; + +/* Is this WALSender listed in synchronous_replay_standby_names? */ +static bool am_potential_synchronous_replay_standby = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -239,7 +253,7 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); -static void WalSndKeepalive(bool requestReply); +static int64 WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(TimestampTz now); static void WalSndCheckTimeOut(TimestampTz now); static long WalSndComputeSleeptime(TimestampTz now); @@ -281,6 +295,61 @@ InitWalSender(void) } /* + * If we are exiting unexpectedly, we may need to hold up concurrent + * synchronous_replay commits to make sure any lease that was granted has + * expired. + */ +static void +PrepareUncleanExit(void) +{ + if (MyWalSnd->syncReplayState == SYNC_REPLAY_AVAILABLE) + { + /* + * We've lost contact with the standby, but it may still be alive. We + * can't let any committing synchronous_replay transactions return + * control until we've stalled for long enough for a zombie standby to + * start raising errors because its lease has expired. Because our + * WalSnd slot is going away, we need to use the shared + * WalSndCtl->revokingUntil variable. + */ + elog(LOG, + "contact lost with standby \"%s\", revoking synchronous replay lease by stalling", + application_name); + + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + WalSndCtl->revokingUntil = Max(WalSndCtl->revokingUntil, + synchronous_replay_last_lease); + LWLockRelease(SyncRepLock); + + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->syncReplayState = SYNC_REPLAY_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* + * We are shutting down because we received a goodbye message from the + * walreceiver. + */ +static void +PrepareCleanExit(void) +{ + if (MyWalSnd->syncReplayState == SYNC_REPLAY_AVAILABLE) + { + /* + * The standby is shutting down, so it won't be running any more + * transactions. It is therefore safe to stop waiting for it without + * any kind of lease revocation protocol. + */ + elog(LOG, "standby \"%s\" is leaving synchronous replay set", application_name); + + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->syncReplayState = SYNC_REPLAY_UNAVAILABLE; + SpinLockRelease(&MyWalSnd->mutex); + } +} + +/* * Clean up after an error. * * WAL sender processes don't use transactions like regular backends do. @@ -308,7 +377,10 @@ WalSndErrorCleanup(void) replication_active = false; if (got_STOPPING || got_SIGUSR2) + { + PrepareUncleanExit(); proc_exit(0); + } /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); @@ -320,6 +392,8 @@ WalSndErrorCleanup(void) static void WalSndShutdown(void) { + PrepareUncleanExit(); + /* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. @@ -1578,6 +1652,7 @@ ProcessRepliesIfAny(void) if (r < 0) { /* unexpected error or EOF */ + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1594,6 +1669,7 @@ ProcessRepliesIfAny(void) resetStringInfo(&reply_message); if (pq_getmessage(&reply_message, 0)) { + PrepareUncleanExit(); ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); @@ -1643,6 +1719,7 @@ ProcessRepliesIfAny(void) * 'X' means that the standby is closing down the socket. */ case 'X': + PrepareCleanExit(); proc_exit(0); default: @@ -1740,9 +1817,11 @@ ProcessStandbyReplyMessage(void) flushLag, applyLag; bool clearLagTimes; + int64 replyTo; TimestampTz now; static bool fullyAppliedLastTime = false; + static TimestampTz fullyAppliedSince = 0; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); @@ -1750,6 +1829,7 @@ ProcessStandbyReplyMessage(void) applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ replyRequested = pq_getmsgbyte(&reply_message); + replyTo = pq_getmsgint64(&reply_message); elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, @@ -1764,17 +1844,17 @@ ProcessStandbyReplyMessage(void) applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now); /* - * If the standby reports that it has fully replayed the WAL in two - * consecutive reply messages, then the second such message must result - * from wal_receiver_status_interval expiring on the standby. This is a - * convenient time to forget the lag times measured when it last - * wrote/flushed/applied a WAL record, to avoid displaying stale lag data - * until more WAL traffic arrives. + * If the standby reports that it has fully replayed the WAL for at least + * 10 seconds, then let's clear the lag times that were measured when it + * last wrote/flushed/applied a WAL record. This way we avoid displaying + * stale lag data until more WAL traffic arrives. */ clearLagTimes = false; if (applyPtr == sentPtr) { - if (fullyAppliedLastTime) + if (!fullyAppliedLastTime) + fullyAppliedSince = now; + else if (now - fullyAppliedSince >= 10000000) /* 10 seconds */ clearLagTimes = true; fullyAppliedLastTime = true; } @@ -1790,8 +1870,53 @@ ProcessStandbyReplyMessage(void) * standby. */ { + int next_sr_state = -1; WalSnd *walsnd = MyWalSnd; + /* Handle synchronous replay state machine. */ + if (am_potential_synchronous_replay_standby && !am_cascading_walsender) + { + bool replay_lag_acceptable; + + /* Check if the lag is acceptable (includes -1 for caught up). */ + if (applyLag < synchronous_replay_max_lag * 1000) + replay_lag_acceptable = true; + else + replay_lag_acceptable = false; + + /* Figure out next if the state needs to change. */ + switch (walsnd->syncReplayState) + { + case SYNC_REPLAY_UNAVAILABLE: + /* Can we join? */ + if (replay_lag_acceptable) + next_sr_state = SYNC_REPLAY_JOINING; + break; + case SYNC_REPLAY_JOINING: + /* Are we still applying fast enough? */ + if (replay_lag_acceptable) + { + /* Have we reached the join point yet? */ + if (applyPtr >= synchronous_replay_joining_until) + next_sr_state = SYNC_REPLAY_AVAILABLE; + } + else + next_sr_state = SYNC_REPLAY_UNAVAILABLE; + break; + case SYNC_REPLAY_AVAILABLE: + /* Are we still applying fast enough? */ + if (!replay_lag_acceptable) + next_sr_state = SYNC_REPLAY_REVOKING; + break; + case SYNC_REPLAY_REVOKING: + /* Has the revocation been acknowledged or timed out? */ + if (replyTo == synchronous_replay_revoke_msgno || + now >= walsnd->revokingUntil) + next_sr_state = SYNC_REPLAY_UNAVAILABLE; + break; + } + } + SpinLockAcquire(&walsnd->mutex); walsnd->write = writePtr; walsnd->flush = flushPtr; @@ -1802,11 +1927,55 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + if (next_sr_state != -1) + walsnd->syncReplayState = next_sr_state; + if (next_sr_state == SYNC_REPLAY_REVOKING) + walsnd->revokingUntil = synchronous_replay_last_lease; SpinLockRelease(&walsnd->mutex); + + /* + * Post shmem-update actions for synchronous replay state transitions. + */ + switch (next_sr_state) + { + case SYNC_REPLAY_JOINING: + /* + * Now that we've started waiting for this standby, we need to + * make sure that everything flushed before now has been applied + * before we move to available and issue a lease. + */ + synchronous_replay_joining_until = GetFlushRecPtr(); + ereport(LOG, + (errmsg("standby \"%s\" joining synchronous replay set...", + application_name))); + break; + case SYNC_REPLAY_AVAILABLE: + /* Issue a new lease to the standby. */ + WalSndKeepalive(false); + ereport(LOG, + (errmsg("standby \"%s\" is available for synchronous replay", + application_name))); + break; + case SYNC_REPLAY_REVOKING: + /* Revoke the standby's lease, and note the message number. */ + synchronous_replay_revoke_msgno = WalSndKeepalive(true); + ereport(LOG, + (errmsg("revoking synchronous replay lease for standby \"%s\"...", + application_name))); + break; + case SYNC_REPLAY_UNAVAILABLE: + ereport(LOG, + (errmsg("standby \"%s\" is no longer available for synchronous replay", + application_name))); + break; + default: + /* No change. */ + break; + } } if (!am_cascading_walsender) - SyncRepReleaseWaiters(); + SyncRepReleaseWaiters(MyWalSnd->syncReplayState >= SYNC_REPLAY_JOINING); /* * Advance our local xmin horizon when the client confirmed a flush. @@ -1996,33 +2165,52 @@ ProcessStandbyHSFeedbackMessage(void) * If wal_sender_timeout is enabled we want to wake up in time to send * keepalives and to abort the connection if wal_sender_timeout has been * reached. + * + * But if syncronous_replay_max_lag is enabled, we override that and send + * keepalives at a constant rate to replace expiring leases. */ static long WalSndComputeSleeptime(TimestampTz now) { long sleeptime = 10000; /* 10 s */ - if (wal_sender_timeout > 0 && last_reply_timestamp > 0) + if ((wal_sender_timeout > 0 && last_reply_timestamp > 0) || + am_potential_synchronous_replay_standby) { TimestampTz wakeup_time; long sec_to_timeout; int microsec_to_timeout; - /* - * At the latest stop sleeping once wal_sender_timeout has been - * reached. - */ - wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); - - /* - * If no ping has been sent yet, wakeup when it's time to do so. - * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of - * the timeout passed without a response. - */ - if (!waiting_for_ping_response) + if (am_potential_synchronous_replay_standby) + { + /* + * We need to keep replacing leases before they expire. We'll do + * that halfway through the lease time according to our clock, to + * allow for the standby's clock to be ahead of the primary's by + * 25% of synchronous_replay_lease_time. + */ + wakeup_time = + TimestampTzPlusMilliseconds(last_keepalive_timestamp, + synchronous_replay_lease_time / 2); + } + else + { + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once + * half of the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + } /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, @@ -2038,20 +2226,33 @@ WalSndComputeSleeptime(TimestampTz now) /* * Check whether there have been responses by the client within * wal_sender_timeout and shutdown if not. + * + * If synchronous replay is configured we override that so that unresponsive + * standbys are detected sooner. */ static void WalSndCheckTimeOut(TimestampTz now) { TimestampTz timeout; + int allowed_time; /* don't bail out if we're doing something that doesn't require timeouts */ if (last_reply_timestamp <= 0) return; - timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); + /* + * If a synchronous replay support is configured, we use + * synchronous_replay_lease_time instead of wal_sender_timeout, to limit + * the time before an unresponsive synchronous replay standby is dropped. + */ + if (am_potential_synchronous_replay_standby) + allowed_time = synchronous_replay_lease_time; + else + allowed_time = wal_sender_timeout; - if (wal_sender_timeout > 0 && now >= timeout) + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + allowed_time); + if (allowed_time > 0 && now >= timeout) { /* * Since typically expiration of replication timeout means @@ -2079,6 +2280,9 @@ WalSndLoop(WalSndSendDataCallback send_data) /* Report to pgstat that this process is running */ pgstat_report_activity(STATE_RUNNING, NULL); + /* Check if we are managing a potential synchronous replay standby. */ + am_potential_synchronous_replay_standby = SyncReplayPotentialStandby(); + /* * Loop until we reach the end of this timeline or the client requests to * stop streaming. @@ -2243,6 +2447,7 @@ InitWalSenderSlot(void) walsnd->flushLag = -1; walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->syncReplayState = SYNC_REPLAY_UNAVAILABLE; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ @@ -3125,6 +3330,27 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +/* + * Return a string constant representing the synchronous replay state. This is + * used in system views, and should *not* be translated. + */ +static const char * +WalSndGetSyncReplayStateString(SyncReplayState state) +{ + switch (state) + { + case SYNC_REPLAY_UNAVAILABLE: + return "unavailable"; + case SYNC_REPLAY_JOINING: + return "joining"; + case SYNC_REPLAY_AVAILABLE: + return "available"; + case SYNC_REPLAY_REVOKING: + return "revoking"; + } + return "UNKNOWN"; +} + static Interval * offset_to_interval(TimeOffset offset) { @@ -3144,7 +3370,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3197,6 +3423,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset applyLag; int priority; WalSndState state; + SyncReplayState syncReplayState; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3206,6 +3433,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockAcquire(&walsnd->mutex); sentPtr = walsnd->sentPtr; state = walsnd->state; + syncReplayState = walsnd->syncReplayState; write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; @@ -3288,6 +3516,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + values[11] = + CStringGetTextDatum(WalSndGetSyncReplayStateString(syncReplayState)); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -3303,21 +3534,69 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * This function is used to send a keepalive message to standby. * If requestReply is set, sets a flag in the message requesting the standby * to send a message back to us, for heartbeat purposes. + * Return the serial number of the message that was sent. */ -static void +static int64 WalSndKeepalive(bool requestReply) { + TimestampTz synchronous_replay_lease; + TimestampTz now; + + static int64 message_number = 0; + elog(DEBUG2, "sending replication keepalive"); + /* Grant a synchronous replay lease if appropriate. */ + now = GetCurrentTimestamp(); + if (MyWalSnd->syncReplayState != SYNC_REPLAY_AVAILABLE) + { + /* No lease granted, and any earlier lease is revoked. */ + synchronous_replay_lease = 0; + } + else + { + /* + * Since this timestamp is being sent to the standby where it will be + * compared against a time generated by the standby's system clock, we + * must consider clock skew. We use 25% of the lease time as max + * clock skew, and we subtract that from the time we send with the + * following reasoning: + * + * 1. If the standby's clock is slow (ie behind the primary's) by up + * to that much, then by subtracting this amount will make sure the + * lease doesn't survive past that time according to the primary's + * clock. + * + * 2. If the standby's clock is fast (ie ahead of the primary's) by + * up to that much, then by subtracting this amount there won't be any + * gaps between leases, since leases are reissued every time 50% of + * the lease time elapses (see WalSndKeepaliveIfNecessary and + * WalSndComputeSleepTime). + */ + int max_clock_skew = synchronous_replay_lease_time / 4; + + /* Compute and remember the expiry time of the lease we're granting. */ + synchronous_replay_last_lease = + TimestampTzPlusMilliseconds(now, synchronous_replay_lease_time); + /* Adjust the version we send for clock skew. */ + synchronous_replay_lease = + TimestampTzPlusMilliseconds(synchronous_replay_last_lease, + -max_clock_skew); + } + /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); + pq_sendint64(&output_message, ++message_number); pq_sendint64(&output_message, sentPtr); - pq_sendint64(&output_message, GetCurrentTimestamp()); + pq_sendint64(&output_message, now); pq_sendbyte(&output_message, requestReply ? 1 : 0); + pq_sendint64(&output_message, synchronous_replay_lease); /* ... and send it wrapped in CopyData */ pq_putmessage_noblock('d', output_message.data, output_message.len); + + return message_number; } /* @@ -3332,23 +3611,35 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts. */ - if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) - return; - - if (waiting_for_ping_response) - return; + if (!am_potential_synchronous_replay_standby) + { + if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) + return; + if (waiting_for_ping_response) + return; + } /* * If half of wal_sender_timeout has lapsed without receiving any reply * from the standby, send a keep-alive message to the standby requesting * an immediate reply. + * + * If synchronous replay has been configured, use + * synchronous_replay_lease_time to control keepalive intervals rather + * than wal_sender_timeout, so that we can keep replacing leases at the + * right frequency. */ - ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + if (am_potential_synchronous_replay_standby) + ping_time = TimestampTzPlusMilliseconds(last_keepalive_timestamp, + synchronous_replay_lease_time / 2); + else + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); if (now >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true; + last_keepalive_timestamp = now; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) @@ -3388,7 +3679,7 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) */ new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE; buffer_full = false; - for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) + for (i = 0; i < SYNC_REP_WAIT_SYNC_REPLAY; ++i) { if (new_write_head == LagTracker.read_heads[i]) buffer_full = true; diff --git a/src/backend/utils/errcodes.txt b/src/backend/utils/errcodes.txt index 4f354717628..d1751f6e0c0 100644 --- a/src/backend/utils/errcodes.txt +++ b/src/backend/utils/errcodes.txt @@ -307,6 +307,7 @@ Section: Class 40 - Transaction Rollback 40001 E ERRCODE_T_R_SERIALIZATION_FAILURE serialization_failure 40003 E ERRCODE_T_R_STATEMENT_COMPLETION_UNKNOWN statement_completion_unknown 40P01 E ERRCODE_T_R_DEADLOCK_DETECTED deadlock_detected +40P02 E ERRCODE_T_R_SYNCHRONOUS_REPLAY_NOT_AVAILABLE synchronous_replay_not_available Section: Class 42 - Syntax Error or Access Rule Violation diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 82e54c084b8..1832bdf4de0 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1647,6 +1647,16 @@ static struct config_bool ConfigureNamesBool[] = }, { + {"synchronous_replay", PGC_USERSET, REPLICATION_STANDBY, + gettext_noop("Enables synchronous replay."), + NULL + }, + &synchronous_replay, + false, + NULL, NULL, NULL + }, + + { {"syslog_sequence_numbers", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Add sequence number to syslog messages to avoid duplicate suppression."), NULL @@ -2885,6 +2895,28 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"synchronous_replay_max_lag", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Sets the maximum allowed replay lag before standbys are removed from the synchronous replay set."), + NULL, + GUC_UNIT_MS + }, + &synchronous_replay_max_lag, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"synchronous_replay_lease_time", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Sets the duration of read leases granted to synchronous replay standbys."), + NULL, + GUC_UNIT_MS + }, + &synchronous_replay_lease_time, + 5000, 0, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -3567,6 +3599,17 @@ static struct config_string ConfigureNamesString[] = }, { + {"synchronous_replay_standby_names", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("List of names of potential synchronous replay standbys."), + NULL, + GUC_LIST_INPUT + }, + &synchronous_replay_standby_names, + "*", + check_synchronous_replay_standby_names, NULL, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 2b1ebb797ec..e6dbcb58bbd 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -250,6 +250,17 @@ # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#synchronous_replay_max_lag = 0s # maximum replication delay to tolerate from + # standbys before dropping them from the synchronous + # replay set; 0 to disable synchronous replay + +#synchronous_replay_lease_time = 5s # how long individual leases granted to + # synchronous replay standbys should last; should be 4 times + # the max possible clock skew + +#synchronous_replay_standby_names = '*' # standby servers that can join the + # synchronous replay set; '*' = all + # - Standby Servers - # These settings are ignored on a master server. @@ -279,6 +290,14 @@ #max_logical_replication_workers = 4 # taken from max_worker_processes #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers +# - All Servers - + +#synchronous_replay = off # "on" in any pair of consecutive + # transactions guarantees that the second + # can see the first (even if the second + # is run on a standby), or will raise an + # error to report that the standby is + # unavailable for synchronous replay #------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 08a08c8e8fc..55aef58fcd2 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -54,6 +54,8 @@ #include "catalog/catalog.h" #include "lib/pairingheap.h" #include "miscadmin.h" +#include "replication/syncrep.h" +#include "replication/walreceiver.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -332,6 +334,17 @@ GetTransactionSnapshot(void) "cannot take query snapshot during a parallel operation"); /* + * In synchronous_replay mode on a standby, check if we have definitely + * applied WAL for any COMMIT that returned successfully on the + * primary. + */ + if (synchronous_replay && RecoveryInProgress() && + !WalRcvSyncReplayAvailable()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SYNCHRONOUS_REPLAY_NOT_AVAILABLE), + errmsg("standby is not available for synchronous replay"))); + + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must * make a copy of it rather than returning CurrentSnapshotData diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 6811a55e764..02eaf97247f 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -117,7 +117,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 1 + 8]; int len = 0; /* @@ -150,6 +150,8 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; + fe_sendint64(-1, &replybuf[len]); /* replyTo */ + len += 8; startpos = output_written_lsn; last_written_lsn = output_written_lsn; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 15932c60b5a..501ecc849d1 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -325,7 +325,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested) { - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 1 + 8]; int len = 0; replybuf[len] = 'r'; @@ -343,6 +343,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; + fe_sendint64(-1, &replybuf[len]); /* replyTo */ + len += 8; if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 8b33b4e0ea7..106f87989fb 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2832,7 +2832,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ )); DESCR("statistics: information about progress of backends running maintenance command"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,sync_replay}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 6bffe63ad6b..69e8c5bbc1b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -811,6 +811,7 @@ typedef enum WAIT_EVENT_PROCARRAY_GROUP_UPDATE, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, + WAIT_EVENT_SYNC_REPLAY, WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE } WaitEventIPC; @@ -825,7 +826,8 @@ typedef enum { WAIT_EVENT_BASE_BACKUP_THROTTLE = PG_WAIT_TIMEOUT, WAIT_EVENT_PG_SLEEP, - WAIT_EVENT_RECOVERY_APPLY_DELAY + WAIT_EVENT_RECOVERY_APPLY_DELAY, + WAIT_EVENT_SYNC_REPLAY_LEASE_REVOKE } WaitEventTimeout; /* ---------- diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index ceafe2cbea1..e2bc88f7c23 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -15,6 +15,7 @@ #include "access/xlogdefs.h" #include "utils/guc.h" +#include "utils/timestamp.h" #define SyncRepRequested() \ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) @@ -24,8 +25,9 @@ #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 #define SYNC_REP_WAIT_APPLY 2 +#define SYNC_REP_WAIT_SYNC_REPLAY 3 -#define NUM_SYNC_REP_WAIT_MODE 3 +#define NUM_SYNC_REP_WAIT_MODE 4 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 @@ -36,6 +38,12 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* GUC variables */ +extern int synchronous_replay_max_lag; +extern int synchronous_replay_lease_time; +extern bool synchronous_replay; +extern char *synchronous_replay_standby_names; + /* * Struct for the configuration of synchronous replication. * @@ -71,7 +79,7 @@ extern void SyncRepCleanupAtProcExit(void); /* called by wal sender */ extern void SyncRepInitConfig(void); -extern void SyncRepReleaseWaiters(void); +extern void SyncRepReleaseWaiters(bool walsender_cr_available_or_joining); /* called by wal sender and user backend */ extern List *SyncRepGetSyncStandbys(bool *am_sync); @@ -79,8 +87,12 @@ extern List *SyncRepGetSyncStandbys(bool *am_sync); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); +/* called by wal sender */ +extern bool SyncReplayPotentialStandby(void); + /* GUC infrastructure */ extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); +extern bool check_synchronous_replay_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_standby_names(const char *newval, void *extra); extern void assign_synchronous_commit(int newval, void *extra); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c8652dbd489..0e396def022 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -83,6 +83,13 @@ typedef struct TimeLineID receivedTLI; /* + * syncReplayLease is the time until which the primary has authorized this + * standby to consider itself available for synchronous_replay mode, or 0 + * for not authorized. + */ + TimestampTz syncReplayLease; + + /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of * receivedUpto before the last flush to disk. Startup process can use @@ -298,4 +305,6 @@ extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); +extern bool WalRcvSyncReplayAvailable(void); + #endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 0aa80d5c3e2..ac025ad535b 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -28,6 +28,14 @@ typedef enum WalSndState WALSNDSTATE_STOPPING } WalSndState; +typedef enum SyncReplayState +{ + SYNC_REPLAY_UNAVAILABLE = 0, + SYNC_REPLAY_JOINING, + SYNC_REPLAY_AVAILABLE, + SYNC_REPLAY_REVOKING +} SyncReplayState; + /* * Each walsender has a WalSnd struct in shared memory. */ @@ -53,6 +61,10 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* Synchronous replay state for this walsender. */ + SyncReplayState syncReplayState; + TimestampTz revokingUntil; + /* Protects shared variables shown above. */ slock_t mutex; @@ -94,6 +106,14 @@ typedef struct */ bool sync_standbys_defined; + /* + * Until when must commits in synchronous replay stall? This is used to + * wait for synchronous replay leases to expire when a walsender exists + * uncleanly, and we must stall synchronous replay commits until we're + * sure that the remote server's lease has expired. + */ + TimestampTz revokingUntil; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2e42b9ec05f..9df755a72ad 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1859,9 +1859,10 @@ pg_stat_replication| SELECT s.pid, w.flush_lag, w.replay_lag, w.sync_priority, - w.sync_state + w.sync_state, + w.sync_replay FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, sync_replay) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,