From e577260dce26ac501c9ad00b899469a1cc028e1a Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 11 Aug 2016 17:11:32 -0400 Subject: [PATCH 2/4] Condition variable code. Reviewed in an earlier version by Rahila Syed. --- src/backend/access/transam/xact.c | 4 + src/backend/bootstrap/bootstrap.c | 2 + src/backend/postmaster/bgwriter.c | 2 + src/backend/postmaster/checkpointer.c | 2 + src/backend/postmaster/walwriter.c | 2 + src/backend/replication/walsender.c | 2 + src/backend/storage/lmgr/Makefile | 2 +- src/backend/storage/lmgr/condition_variable.c | 157 ++++++++++++++++++++++++++ src/backend/storage/lmgr/proc.c | 7 ++ src/include/storage/condition_variable.h | 58 ++++++++++ src/include/storage/proc.h | 4 + src/include/storage/proclist.h | 16 +++ 12 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 src/backend/storage/lmgr/condition_variable.c create mode 100644 src/include/storage/condition_variable.h diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 23f36ea..b40b2e0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -45,6 +45,7 @@ #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -2476,6 +2477,9 @@ AbortTransaction(void) /* Reset WAL record construction state */ XLogResetInsertion(); + /* Cancel condition variable sleep */ + ConditionVariableCancelSleep(); + /* * Also clean up any open wait for lock, since the lock manager will choke * if we try to wait for another lock before doing this. 
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index e518e17..9eeb49c 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -33,6 +33,7 @@ #include "replication/walreceiver.h" #include "storage/bufmgr.h" #include "storage/bufpage.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/proc.h" #include "tcop/tcopprot.h" @@ -535,6 +536,7 @@ static void ShutdownAuxiliaryProcess(int code, Datum arg) { LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); } diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 00f03d8..40f3f80 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -46,6 +46,7 @@ #include "postmaster/bgwriter.h" #include "storage/bufmgr.h" #include "storage/buf_internals.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -189,6 +190,7 @@ BackgroundWriterMain(void) * about in bgwriter, but we do have LWLocks, buffers, and temp files. */ LWLockReleaseAll(); + ConditionVariableCancelSleep(); AbortBufferIO(); UnlockBuffers(); /* buffer pins are released here: */ diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 8d4b353..0c072f3 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -49,6 +49,7 @@ #include "postmaster/bgwriter.h" #include "replication/syncrep.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -273,6 +274,7 @@ CheckpointerMain(void) * files. 
*/ LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); AbortBufferIO(); UnlockBuffers(); diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index 228190a..e5de019 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -50,6 +50,7 @@ #include "pgstat.h" #include "postmaster/walwriter.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -169,6 +170,7 @@ WalWriterMain(void) * about in walwriter, but we do have LWLocks, and perhaps buffers? */ LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); AbortBufferIO(); UnlockBuffers(); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a0dba19..44143d7 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -66,6 +66,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" @@ -253,6 +254,7 @@ void WalSndErrorCleanup(void) { LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); if (sendFile >= 0) diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile index cd6ec73..e1b787e 100644 --- a/src/backend/storage/lmgr/Makefile +++ b/src/backend/storage/lmgr/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global
 
 OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
-	s_lock.o predicate.o
+	s_lock.o predicate.o condition_variable.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..0639689
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,177 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ *	  Implementation of condition variables.  Condition variables provide
+ *	  a way for one process to wait until a specific condition occurs,
+ *	  without needing to know the specific identity of the process for
+ *	  which they are waiting.  Waits for condition variables can be
+ *	  interrupted, unlike LWLock waits.  Condition variables are safe
+ *	  to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+	SpinLockInit(&cv->mutex);
+	proclist_init(&cv->wakeup);
+}
+
+/*
+ * Add ourselves to the wait queue for a condition variable and mark
+ * ourselves as sleeping.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+	int			pgprocno = MyProc->pgprocno;
+
+	/*
+	 * It's not legal to prepare a sleep until the previous sleep has been
+	 * completed or cancelled.
+	 */
+	Assert(cv_sleep_target == NULL);
+
+	/* Record the condition variable on which we will sleep. */
+	cv_sleep_target = cv;
+
+	/* Mark myself as sleeping. */
+	MyProc->cvSleeping = true;
+
+	/*
+	 * Add myself to the tail of the wait queue.  ConditionVariableSignal
+	 * pops from the head, so this wakes waiters in arrival order.
+	 */
+	SpinLockAcquire(&cv->mutex);
+	proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+	SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleeping on a condition variable is extremely simple.  We just repeatedly
+ * wait on our latch until someone clears our cvSleeping flag.  This may
+ * even happen immediately, since a signal or broadcast operation could have
+ * happened after we prepared to sleep and before we reach this function.
+ */
+void
+ConditionVariableSleep(void)
+{
+	Assert(cv_sleep_target != NULL);
+
+	while (MyProc->cvSleeping)
+	{
+		CHECK_FOR_INTERRUPTS();
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+		ResetLatch(&MyProc->procLatch);
+	}
+
+	cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation.  We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep, unless a signal or broadcast has already done so.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+	ConditionVariable *cv = cv_sleep_target;
+
+	if (cv == NULL)
+		return;
+
+	/*
+	 * Whoever dequeues us clears cvSleeping while holding cv->mutex, so with
+	 * the mutex held the flag tells us whether we are still on the wait
+	 * list.  If we have already been dequeued, we must not unlink ourselves
+	 * a second time.
+	 */
+	SpinLockAcquire(&cv->mutex);
+	if (MyProc->cvSleeping)
+	{
+		proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+		MyProc->cvSleeping = false;
+	}
+	SpinLockRelease(&cv->mutex);
+
+	cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+	PGPROC	   *proc = NULL;
+
+	/*
+	 * Remove the first process from the wakeup queue (if any).  Clear its
+	 * cvSleeping flag before releasing the mutex: that is what lets
+	 * ConditionVariableSleep return, and it keeps the flag in step with
+	 * wait-list membership for ConditionVariableCancelSleep.
+	 */
+	SpinLockAcquire(&cv->mutex);
+	if (!proclist_is_empty(&cv->wakeup))
+	{
+		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+		proc->cvSleeping = false;
+	}
+	SpinLockRelease(&cv->mutex);
+
+	/* If we found someone sleeping, set their latch to wake them up. */
+	if (proc != NULL)
+	{
+		SetLatch(&proc->procLatch);
+		return true;
+	}
+
+	/* No sleeping processes. */
+	return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+	int			nwoken = 0;
+
+	/*
+	 * Let's just do this the dumbest way possible.  We could try to dequeue
+	 * all the sleepers at once to save spinlock cycles, but it's a bit hard
+	 * to get that right in the face of possible sleep cancellations, and
+	 * we don't want to loop holding the mutex.
+	 */
+	while (ConditionVariableSignal(cv))
+		++nwoken;
+
+	return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9a758bd..ec08091 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -42,6 +42,7 @@
 #include "postmaster/autovacuum.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "storage/condition_variable.h"
 #include "storage/standby.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -805,6 +806,9 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/* Cancel any pending condition variable sleep, too */
+	ConditionVariableCancelSleep();
+
 	/* Make sure active replication slots are released */
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
@@ -910,6 +914,9 @@ AuxiliaryProcKill(int code, Datum arg)
 	/* Release any LW locks I am holding (see notes above) */
 	LWLockReleaseAll();
 
+	/* Cancel any pending condition variable sleep, too */
+	ConditionVariableCancelSleep();
+
 	/*
 	 * Reset MyLatch to the process local one.
This is so that signal
 	 * handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..54b7fba
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ *	  Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true.  Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable.  In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+	slock_t		mutex;			/* held around every access to wakeup */
+	proclist_head wakeup;		/* wait list of processes to be woken */
+} ConditionVariable;
+
+/* Initialize a condition variable's spinlock and wait list. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable.  In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met.  If not, the process should then sleep.  If so,
+ * it should cancel the sleep.  A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again.  If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition.  Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/* Signal wakes one waiter (true if woken); broadcast wakes all (count). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int	ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..812008a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,10 @@ struct PGPROC
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
 	proclist_node lwWaitLink;	/* position in LW lock wait list */
 
+	/* Support for condition variables. */
+	bool		cvSleeping;		/* true if sleeping on a condition variable */
+	proclist_node cvWaitLink;	/* position in CV wait list */
+
 	/* Info about lock the process is currently waiting for, if any. */
 	/* waitLock and waitProcLock are NULL if not currently waiting. */
 	LOCK	   *waitLock;		/* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..0d7935c 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -120,6 +120,20 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
 }
 
 /*
+ * Unlink the list's head node and return its PGPROC; list must be non-empty.
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+	PGPROC	   *proc;
+
+	Assert(!proclist_is_empty(list));
+	proc = GetPGProcByNumber(list->head);
+	proclist_delete_offset(list, list->head, node_offset);
+	return proc;
+}
+
+/*
  * Helper macros to avoid repetition of offsetof(PGPROC, <member>).
  * 'link_member' is the name of a proclist_node member in PGPROC.
  */
@@ -129,6 +143,8 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
 	proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
 #define proclist_push_tail(list, procno, link_member) \
 	proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+	proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
 
 /*
  * Iterate through the list pointed at by 'lhead', storing the current
-- 
2.5.4 (Apple Git-61)