diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index 8a55392..9dbdc26 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
+OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
 	procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \
 	sinvaladt.o standby.o
 
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
new file mode 100644
index 0000000..8bcae8c
--- /dev/null
+++ b/src/backend/storage/ipc/barrier.c
@@ -0,0 +1,209 @@
+/*-------------------------------------------------------------------------
+ *
+ * barrier.c
+ *	  Barriers for synchronizing cooperating processes.
+ *
+ * Portions Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * This simple mechanism allows for simple cases of phase-based
+ * fork/join-style cooperation by a static set of participants.
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/ipc/barrier.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "storage/barrier.h"
+
+/*
+ * Initialize this barrier, setting a static number of participants that we
+ * will wait for at each computation phase.  To use a dynamic number of
+ * participants, this number should be zero, and BarrierAttach and
+ * BarrierDetach should be used to register and deregister participants.
+ */
+void
+BarrierInit(Barrier *barrier, int participants)
+{
+	SpinLockInit(&barrier->mutex);
+	barrier->participants = participants;
+	barrier->arrived = 0;
+	barrier->phase = 0;
+	ConditionVariableInit(&barrier->condition_variable);
+}
+
+/*
+ * Wait for all participants to arrive at this barrier, and then return in all
+ * participants.  Sets the phase to the given phase, which must not be equal
+ * to the current phase.  The caller must be attached to this barrier.
+ *
+ * While waiting, pg_stat_activity shows a wait_event_class and wait_event
+ * controlled by the wait_event_info passed in, which should be a value
+ * from one of the WaitEventXXX enums defined in pgstat.h.
+ *
+ * Returns true in one arbitrarily selected participant (currently the first
+ * one to arrive).  Returns false in all others.  The differing return code
+ * can be used to coordinate a phase of work that must be done in only one
+ * participant while the others wait.
+ *
+ * TODO: How should conditions like postmaster death, or being killed by a
+ * parallel context owner/leader (if there is one), be handled?  Should this
+ * component be usable in contexts other than parallel workers?
+ */
+bool
+BarrierWaitSet(Barrier *barrier, int new_phase, uint32 wait_event_info)
+{
+	bool		first;
+	bool		release;
+	int			phase;
+
+	SpinLockAcquire(&barrier->mutex);
+	Assert(barrier->phase != new_phase);
+	++barrier->arrived;
+	first = barrier->arrived == 1;
+	if (barrier->arrived == barrier->participants)
+	{
+		/* We are the last to arrive: reset the count and advance the phase. */
+		release = true;
+		barrier->arrived = 0;
+		barrier->phase = new_phase;
+	}
+	else
+	{
+		/* Remember the phase we saw, so we can detect when it changes. */
+		release = false;
+		phase = barrier->phase;
+	}
+	SpinLockRelease(&barrier->mutex);
+
+	/* Check if we can release our peers and return. */
+	if (release)
+	{
+		ConditionVariableBroadcast(&barrier->condition_variable);
+		return first;
+	}
+
+	/* Wait for phase to change. */
+	for (;;)
+	{
+		ConditionVariablePrepareToSleep(&barrier->condition_variable);
+		SpinLockAcquire(&barrier->mutex);
+		release = barrier->phase != phase;
+		SpinLockRelease(&barrier->mutex);
+		if (release)
+			break;
+		ConditionVariableSleep(wait_event_info);
+	}
+	ConditionVariableCancelSleep();
+
+	/* The callers should all agree on the new phase. */
+	Assert(barrier->phase == new_phase);
+
+	return first;
+}
+
+/*
+ * Wait for all participants to arrive at this barrier, and then return in all
+ * participants.  Advances the phase by one.  The caller must be attached to
+ * this barrier.
+ *
+ * While waiting, pg_stat_activity shows a wait_event_class and wait_event
+ * controlled by the wait_event_info passed in, which should be a value
+ * from one of the WaitEventXXX enums defined in pgstat.h.
+ *
+ * Returns true in one arbitrarily selected participant (currently the first
+ * one to arrive).  Returns false in all others.  The differing return code
+ * can be used to coordinate a phase of work that must be done in only one
+ * worker while the others wait.
+ */
+bool
+BarrierWait(Barrier *barrier, uint32 wait_event_info)
+{
+	/* See BarrierPhase for why it is safe to read the phase without lock. */
+	return BarrierWaitSet(barrier, barrier->phase + 1, wait_event_info);
+}
+
+/*
+ * Attach to a barrier.  All waiting participants will now wait for this
+ * participant to call BarrierWait or BarrierDetach.  Returns the current
+ * phase.
+ */
+int
+BarrierAttach(Barrier *barrier)
+{
+	int			phase;
+
+	SpinLockAcquire(&barrier->mutex);
+	++barrier->participants;
+	phase = barrier->phase;
+	SpinLockRelease(&barrier->mutex);
+
+	return phase;
+}
+
+/*
+ * Detach from a barrier.  This may release other waiters from BarrierWait and
+ * advance the phase, if they were only waiting for this backend.  Returns
+ * true if this participant was the last to detach.
+ */
+bool
+BarrierDetach(Barrier *barrier)
+{
+	bool		release;
+	bool		last;
+
+	SpinLockAcquire(&barrier->mutex);
+	Assert(barrier->participants > 0);
+	--barrier->participants;
+
+	/*
+	 * If any other participants are waiting and we were the last participant
+	 * waited for, release them.
+	 */
+	if (barrier->participants > 0 &&
+		barrier->arrived == barrier->participants)
+	{
+		release = true;
+		barrier->arrived = 0;
+		barrier->phase++;
+	}
+	else
+		release = false;
+
+	last = barrier->participants == 0;
+	SpinLockRelease(&barrier->mutex);
+
+	/* Broadcast outside the spinlock, to keep the critical section short. */
+	if (release)
+		ConditionVariableBroadcast(&barrier->condition_variable);
+
+	return last;
+}
+
+/*
+ * Return the current phase of a barrier.  The caller must be attached.
+ */
+int
+BarrierPhase(Barrier *barrier)
+{
+	/*
+	 * It is OK to read barrier->phase without locking, because it can't
+	 * change without us (we are attached to it), and we executed a memory
+	 * barrier when we either attached or participated in changing it last
+	 * time.
+	 */
+	return barrier->phase;
+}
+
+/*
+ * Return an instantaneous snapshot of the number of participants currently
+ * attached to this barrier.  For debugging purposes only.
+ */
+int
+BarrierParticipants(Barrier *barrier)
+{
+	int			participants;
+
+	SpinLockAcquire(&barrier->mutex);
+	participants = barrier->participants;
+	SpinLockRelease(&barrier->mutex);
+
+	return participants;
+}
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
new file mode 100644
index 0000000..45895a7
--- /dev/null
+++ b/src/include/storage/barrier.h
@@ -0,0 +1,45 @@
+/*-------------------------------------------------------------------------
+ *
+ * barrier.h
+ *	  Barriers for synchronizing workers.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/barrier.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BARRIER_H
+#define BARRIER_H
+
+/*
+ * For the header previously known as "barrier.h", please include
+ * "port/atomics.h", which deals with atomics, compiler barriers and memory
+ * barriers.
+ */
+
+#include "postgres.h"
+
+#include "storage/condition_variable.h"
+#include "storage/spin.h"
+
+typedef struct Barrier
+{
+	slock_t		mutex;
+	int			phase;
+	int			participants;
+	int			arrived;
+	ConditionVariable condition_variable;
+} Barrier;
+
+extern void BarrierInit(Barrier *barrier, int num_workers);
+extern bool BarrierWait(Barrier *barrier, uint32 wait_event_info);
+extern bool BarrierWaitSet(Barrier *barrier, int new_phase,
+			   uint32 wait_event_info);
+extern int	BarrierAttach(Barrier *barrier);
+extern bool BarrierDetach(Barrier *barrier);
+extern int	BarrierPhase(Barrier *barrier);
+extern int	BarrierParticipants(Barrier *barrier);
+
+#endif	 /* BARRIER_H */