diff --git a/contrib/pg_stat_wait/Makefile b/contrib/pg_stat_wait/Makefile new file mode 100644 index 0000000..16ad170 --- /dev/null +++ b/contrib/pg_stat_wait/Makefile @@ -0,0 +1,54 @@ +# contrib/pg_stat_wait/Makefile + +MODULE_big = pg_stat_wait +OBJS = pg_stat_wait.o collector.o + +EXTENSION = pg_stat_wait +DATA = pg_stat_wait--1.0.sql +PG_CPPFLAGS = -DPG_STAT_WAIT_TESTS +EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output +EXTRA_INSTALL=contrib/pg_stat_wait + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_stat_wait +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +check: regresscheck isolationcheck + +submake-regress: + $(MAKE) -C $(top_builddir)/src/test/regress all + +submake-isolation: + $(MAKE) -C $(top_builddir)/src/test/isolation all + +submake-pg_stat_wait: + $(MAKE) -C $(top_builddir)/contrib/pg_stat_wait + +REGRESSCHECKS=file_trace descriptions + +regresscheck: all | submake-regress submake-pg_stat_wait + $(MKDIR_P) regression_output + $(pg_regress_check) \ + --temp-instance=./tmp_check \ + --load-extension=pg_stat_wait \ + --outputdir=./regression_output \ + $(REGRESSCHECKS) + +ISOLATIONCHECKS=history + +isolationcheck: all | submake-isolation submake-pg_stat_wait + $(MKDIR_P) isolation_output + $(pg_isolation_regress_check) \ + --temp-instance=./tmp_check \ + --temp-config=./waits.conf \ + --load-extension=pg_stat_wait \ + --outputdir=./isolation_output \ + $(ISOLATIONCHECKS) + diff --git a/contrib/pg_stat_wait/collector.c b/contrib/pg_stat_wait/collector.c new file mode 100644 index 0000000..c8b613c --- /dev/null +++ b/contrib/pg_stat_wait/collector.c @@ -0,0 +1,290 @@ +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include 
"storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/s_lock.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "storage/spin.h" +#include "storage/wait.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "portability/instr_time.h" + +#include "pg_stat_wait.h" + +CollectorShmqHeader *hdr; + +static void *pgsw; +shm_toc *toc; +shm_mq *mq; +static volatile sig_atomic_t shutdown_requested = false; + +static void handle_sigterm(SIGNAL_ARGS); +static void collector_main(Datum main_arg); + +/* + * Estimate shared memory space needed. + */ +Size +CollectorShmemSize(void) +{ + shm_toc_estimator e; + Size size; + + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(CollectorShmqHeader)); + shm_toc_estimate_chunk(&e, (Size) COLLECTOR_QUEUE_SIZE); + shm_toc_estimate_keys(&e, 2); + size = shm_toc_estimate(&e); + + return size; +} + +void +AllocateCollectorMem(void) +{ + bool found; + Size segsize= CollectorShmemSize(); + + pgsw = ShmemInitStruct("pg_stat_wait", segsize, &found); + + if (!found) + { + void *mq_mem; + + toc = shm_toc_create(PG_STAT_WAIT_MAGIC, pgsw, segsize); + hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader)); + shm_toc_insert(toc, 0, hdr); + + mq_mem = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE); + shm_toc_insert(toc, 1, mq_mem); + + DefineCustomIntVariable("pg_stat_wait.history_size", + "Sets size of waits history.", NULL, + &hdr->historySize, 5000, 100, INT_MAX, + PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomIntVariable("pg_stat_wait.history_period", + "Sets period of waits history sampling.", NULL, + &hdr->historyPeriod, 10, 1, INT_MAX, + PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomBoolVariable("pg_stat_wait.history_skip_latch", + "Skip latch events in waits history", NULL, + &hdr->historySkipLatch, false, PGC_SUSET, 0, NULL, NULL, NULL); + } + else + { + toc = shm_toc_attach(PG_STAT_WAIT_MAGIC, pgsw); + hdr = shm_toc_lookup(toc, 0); + } +} 
+ +void +RegisterWaitsCollector(void) +{ + BackgroundWorker worker; + + /* set up common data for all our workers */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = collector_main; + worker.bgw_notify_pid = 0; + snprintf(worker.bgw_name, BGW_MAXLEN, "pg_stat_wait collector"); + worker.bgw_main_arg = (Datum)0; + RegisterBackgroundWorker(&worker); +} + +void +AllocHistory(History *observations, int count) +{ + observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count); + observations->index = 0; + observations->count = count; + observations->wraparound = false; +} + +/* Read current wait information from proc, if readCurrent is true, + * then it reads from currently going wait, and can be inconsistent + */ +int +GetCurrentWaitsState(PGPROC *proc, HistoryItem *item, int idx) +{ + instr_time currentTime; + ProcWait *wait; + + if (idx == -1) + return 0; + + INSTR_TIME_SET_CURRENT(currentTime); + wait = &proc->waits.waitsBuf[idx]; + item->backendPid = proc->pid; + item->classId = (int)wait->classId; + if (item->classId == 0) + return 0; + + item->eventId = (int)wait->eventId; + + INSTR_TIME_SUBTRACT(currentTime, wait->startTime); + item->waitTime = INSTR_TIME_GET_MICROSEC(currentTime); + memcpy(item->params, wait->params, sizeof(item->params)); + return 1; +} + +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + shutdown_requested = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + errno = save_errno; +} + +/* Circulation in history */ +static HistoryItem * +get_next_observation(History *observations) +{ + HistoryItem *result; + + result = &observations->items[observations->index]; + observations->index++; + if (observations->index >= observations->count) + { + observations->index = 0; + observations->wraparound = true; + } + return result; +} + +/* Gets current waits 
from backends */ +static void +write_waits_history(History *observations, TimestampTz current_ts) +{ + int i; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + for (i = 0; i < ProcGlobal->allProcCount; ++i) + { + HistoryItem item, *observation; + PGPROC *proc = &ProcGlobal->allProcs[i]; + int stateOk = GetCurrentWaitsState(proc, &item, proc->waits.readIdx); + + /* mark waits as read */ + proc->waits.readIdx = -1; + + if (stateOk) + { + if (hdr->historySkipLatch && item.classId == WAIT_LATCH) + continue; + + item.ts = current_ts; + observation = get_next_observation(observations); + *observation = item; + } + } + LWLockRelease(ProcArrayLock); +} + +static void +send_history(History *observations, shm_mq_handle *mqh) +{ + int count, i; + + if (observations->wraparound) + count = observations->count; + else + count = observations->index; + + shm_mq_send(mqh, sizeof(count), &count, false); + for (i = 0; i < count; i++) + shm_mq_send(mqh, sizeof(HistoryItem), &observations->items[i], false); +} + +static void +collector_main(Datum main_arg) +{ + shm_mq *mq; + shm_mq_handle *mqh; + History observations; + MemoryContext old_context, collector_context; + + /* + * Establish signal handlers. + * + * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as + * it would a normal user backend. To make that happen, we establish a + * signal handler that is a stripped-down version of die(). We don't have + * any equivalent of the backend's command-read loop, where interrupts can + * be processed immediately, so make sure ImmediateInterruptOK is turned + * off. 
+ */ + pqsignal(SIGTERM, handle_sigterm); + BackgroundWorkerUnblockSignals(); + + hdr->latch = &MyProc->procLatch; + CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_stat_wait collector"); + collector_context = AllocSetContextCreate(TopMemoryContext, + "pg_stat_wait context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + old_context = MemoryContextSwitchTo(collector_context); + AllocHistory(&observations, hdr->historySize); + MemoryContextSwitchTo(old_context); + + while (1) + { + int rc; + TimestampTz current_ts; + + ResetLatch(&MyProc->procLatch); + current_ts = GetCurrentTimestamp(); + write_waits_history(&observations, current_ts); + + if (shutdown_requested) + break; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + hdr->historyPeriod); + + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + if (hdr->request == HISTORY_REQUEST) + { + hdr->request = NO_REQUEST; + + mq = (shm_mq *)shm_toc_lookup(toc, 1); + shm_mq_set_sender(mq, MyProc); + mqh = shm_mq_attach(mq, NULL, NULL); + shm_mq_wait_for_attach(mqh); + + if (shm_mq_get_receiver(mq) != NULL) + send_history(&observations, mqh); + + shm_mq_detach(mq); + } + } + + MemoryContextReset(collector_context); + + /* + * We're done. Explicitly detach the shared memory segment so that we + * don't get a resource leak warning at commit time. This will fire any + * on_dsm_detach callbacks we've registered, as well. Once that's done, + * we can go ahead and exit. 
+ */ + proc_exit(0); +} diff --git a/contrib/pg_stat_wait/expected/descriptions.out b/contrib/pg_stat_wait/expected/descriptions.out new file mode 100644 index 0000000..5fed726 --- /dev/null +++ b/contrib/pg_stat_wait/expected/descriptions.out @@ -0,0 +1,165 @@ +SELECT * FROM pg_wait_event; + class_id | event_id | name +----------+----------+----------------------------------- + 0 | 0 | CPU + 1 | 0 | + 1 | 1 | ShmemIndexLock + 1 | 2 | OidGenLock + 1 | 3 | XidGenLock + 1 | 4 | ProcArrayLock + 1 | 5 | SInvalReadLock + 1 | 6 | SInvalWriteLock + 1 | 7 | WALBufMappingLock + 1 | 8 | WALWriteLock + 1 | 9 | ControlFileLock + 1 | 10 | CheckpointLock + 1 | 11 | CLogControlLock + 1 | 12 | SubtransControlLock + 1 | 13 | MultiXactGenLock + 1 | 14 | MultiXactOffsetControlLock + 1 | 15 | MultiXactMemberControlLock + 1 | 16 | RelCacheInitLock + 1 | 17 | CheckpointerCommLock + 1 | 18 | TwoPhaseStateLock + 1 | 19 | TablespaceCreateLock + 1 | 20 | BtreeVacuumLock + 1 | 21 | AddinShmemInitLock + 1 | 22 | AutovacuumLock + 1 | 23 | AutovacuumScheduleLock + 1 | 24 | SyncScanLock + 1 | 25 | RelationMappingLock + 1 | 26 | AsyncCtlLock + 1 | 27 | AsyncQueueLock + 1 | 28 | SerializableXactHashLock + 1 | 29 | SerializableFinishedListLock + 1 | 30 | SerializablePredicateLockListLock + 1 | 31 | OldSerXidLock + 1 | 32 | SyncRepLock + 1 | 33 | BackgroundWorkerLock + 1 | 34 | DynamicSharedMemoryControlLock + 1 | 35 | AutoFileLock + 1 | 36 | ReplicationSlotAllocationLock + 1 | 37 | ReplicationSlotControlLock + 1 | 38 | CommitTsControlLock + 1 | 39 | CommitTsLock + 1 | 40 | ReplicationOriginLock + 1 | 41 | BufferPartitionLock + 1 | 42 | LockManagerPartitionLock + 1 | 43 | PredicatePartitionLock + 1 | 44 | SharedBufferLocks + 1 | 45 | ProcessLock + 1 | 46 | CLogBufferLock + 1 | 47 | CommitTsBufferLock + 1 | 48 | SubtransBufferLock + 1 | 49 | MultixactBufferLock + 1 | 50 | AsyncBufferLock + 1 | 51 | OldSerializalbeXidBufferLock + 1 | 52 | ReplicationSlotLock + 1 | 53 | UserDefinedLock + 1 | 54 | 
WALInsertLocks + 1 | 55 | ReplicationOriginLocks + 2 | 0 | Relation + 2 | 1 | RelationExtend + 2 | 2 | Page + 2 | 3 | Tuple + 2 | 4 | Transaction + 2 | 5 | VirtualTransaction + 2 | 6 | SpeculativeToken + 2 | 7 | Object + 2 | 8 | Userlock + 2 | 9 | Advisory + 3 | 0 | READ + 3 | 1 | WRITE + 4 | 0 | Latch + 5 | 0 | READ + 5 | 1 | WRITE +(72 rows) + +SELECT * FROM pg_wait_class; + class_id | name +----------+--------- + 0 | CPU + 1 | LWLocks + 2 | Locks + 3 | Storage + 4 | Latch + 5 | Network +(6 rows) + +SELECT * FROM pg_wait_events; + class_id | class_name | event_id | event_name +----------+------------+----------+----------------------------------- + 0 | CPU | 0 | CPU + 1 | LWLocks | 0 | + 1 | LWLocks | 1 | ShmemIndexLock + 1 | LWLocks | 2 | OidGenLock + 1 | LWLocks | 3 | XidGenLock + 1 | LWLocks | 4 | ProcArrayLock + 1 | LWLocks | 5 | SInvalReadLock + 1 | LWLocks | 6 | SInvalWriteLock + 1 | LWLocks | 7 | WALBufMappingLock + 1 | LWLocks | 8 | WALWriteLock + 1 | LWLocks | 9 | ControlFileLock + 1 | LWLocks | 10 | CheckpointLock + 1 | LWLocks | 11 | CLogControlLock + 1 | LWLocks | 12 | SubtransControlLock + 1 | LWLocks | 13 | MultiXactGenLock + 1 | LWLocks | 14 | MultiXactOffsetControlLock + 1 | LWLocks | 15 | MultiXactMemberControlLock + 1 | LWLocks | 16 | RelCacheInitLock + 1 | LWLocks | 17 | CheckpointerCommLock + 1 | LWLocks | 18 | TwoPhaseStateLock + 1 | LWLocks | 19 | TablespaceCreateLock + 1 | LWLocks | 20 | BtreeVacuumLock + 1 | LWLocks | 21 | AddinShmemInitLock + 1 | LWLocks | 22 | AutovacuumLock + 1 | LWLocks | 23 | AutovacuumScheduleLock + 1 | LWLocks | 24 | SyncScanLock + 1 | LWLocks | 25 | RelationMappingLock + 1 | LWLocks | 26 | AsyncCtlLock + 1 | LWLocks | 27 | AsyncQueueLock + 1 | LWLocks | 28 | SerializableXactHashLock + 1 | LWLocks | 29 | SerializableFinishedListLock + 1 | LWLocks | 30 | SerializablePredicateLockListLock + 1 | LWLocks | 31 | OldSerXidLock + 1 | LWLocks | 32 | SyncRepLock + 1 | LWLocks | 33 | BackgroundWorkerLock + 1 | LWLocks | 34 | 
DynamicSharedMemoryControlLock + 1 | LWLocks | 35 | AutoFileLock + 1 | LWLocks | 36 | ReplicationSlotAllocationLock + 1 | LWLocks | 37 | ReplicationSlotControlLock + 1 | LWLocks | 38 | CommitTsControlLock + 1 | LWLocks | 39 | CommitTsLock + 1 | LWLocks | 40 | ReplicationOriginLock + 1 | LWLocks | 41 | BufferPartitionLock + 1 | LWLocks | 42 | LockManagerPartitionLock + 1 | LWLocks | 43 | PredicatePartitionLock + 1 | LWLocks | 44 | SharedBufferLocks + 1 | LWLocks | 45 | ProcessLock + 1 | LWLocks | 46 | CLogBufferLock + 1 | LWLocks | 47 | CommitTsBufferLock + 1 | LWLocks | 48 | SubtransBufferLock + 1 | LWLocks | 49 | MultixactBufferLock + 1 | LWLocks | 50 | AsyncBufferLock + 1 | LWLocks | 51 | OldSerializalbeXidBufferLock + 1 | LWLocks | 52 | ReplicationSlotLock + 1 | LWLocks | 53 | UserDefinedLock + 1 | LWLocks | 54 | WALInsertLocks + 1 | LWLocks | 55 | ReplicationOriginLocks + 2 | Locks | 0 | Relation + 2 | Locks | 1 | RelationExtend + 2 | Locks | 2 | Page + 2 | Locks | 3 | Tuple + 2 | Locks | 4 | Transaction + 2 | Locks | 5 | VirtualTransaction + 2 | Locks | 6 | SpeculativeToken + 2 | Locks | 7 | Object + 2 | Locks | 8 | Userlock + 2 | Locks | 9 | Advisory + 3 | Storage | 0 | READ + 3 | Storage | 1 | WRITE + 4 | Latch | 0 | Latch + 5 | Network | 0 | READ + 5 | Network | 1 | WRITE +(72 rows) + diff --git a/contrib/pg_stat_wait/expected/file_trace.out b/contrib/pg_stat_wait/expected/file_trace.out new file mode 100644 index 0000000..e12788e --- /dev/null +++ b/contrib/pg_stat_wait/expected/file_trace.out @@ -0,0 +1,18 @@ +select pg_start_trace(0, '/tmp/pg_stat_wait.trace'); + pg_start_trace +---------------- + +(1 row) + +select pg_is_in_trace(0); + pg_is_in_trace +---------------- + t +(1 row) + +select pg_stop_trace(0); + pg_stop_trace +--------------- + +(1 row) + diff --git a/contrib/pg_stat_wait/expected/history.out b/contrib/pg_stat_wait/expected/history.out new file mode 100644 index 0000000..35ce463 --- /dev/null +++ 
b/contrib/pg_stat_wait/expected/history.out @@ -0,0 +1,107 @@ +Parsed test spec with 2 sessions + +starting permutation: start_lwlock get_current_lwlock stop_wait get_profile get_history_lwlock +step start_lwlock: + SELECT pg_stat_wait_start_wait(1, 10, 1, 0, 0, 0, 5); + SELECT pg_sleep(1); + +pg_stat_wait_start_wait + + +pg_sleep + + +step get_current_lwlock: + SELECT + pid > 0, + (now() - sample_ts) < interval '1 hour', + class_id, + class_name, + event_id, + event_name, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_current + WHERE class_id = 1; + +?column? ?column? class_id class_name event_id event_name p1 p2 p3 p4 p5 + +t t 1 LWLocks 10 CheckpointLock 1 0 0 0 5 +step stop_wait: + SELECT pg_stat_wait_stop_wait(); + +pg_stat_wait_stop_wait + + +step get_profile: + SELECT pid > 0, class_id, class_name, event_id, event_name, wait_time > 0 + FROM pg_stat_wait_profile + where class_id = 1 AND event_id = 10 AND wait_count > 0; + +?column? class_id class_name event_id event_name ?column? + +t 1 LWLocks 10 CheckpointLock t +step get_history_lwlock: + SELECT + pid > 0, + (now() - sample_ts) < interval '1 hour', + class_id, + class_name, + event_id, + event_name, + wait_time > 0, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_history + WHERE class_id=1 AND p5=5; + +?column? ?column? class_id class_name event_id event_name ?column? p1 p2 p3 p4 p5 + +t t 1 LWLocks 10 CheckpointLock t 1 0 0 0 5 + +starting permutation: start_io get_current_io stop_wait get_profile get_history_io +step start_io: + SELECT pg_stat_wait_start_wait(3, 1, 1, 2, 3, 4, 5); + SELECT pg_sleep(1); + +pg_stat_wait_start_wait + + +pg_sleep + + +step get_current_io: + SELECT + pid > 0, + (now() - sample_ts) < interval '1 minute', + class_id, + class_name, + event_id, + event_name, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_current + WHERE class_id = 3; + +?column? ?column? 
class_id class_name event_id event_name p1 p2 p3 p4 p5 + +t t 3 Storage 1 WRITE 1 2 3 4 5 +step stop_wait: + SELECT pg_stat_wait_stop_wait(); + +pg_stat_wait_stop_wait + + +step get_profile: + SELECT pid > 0, class_id, class_name, event_id, event_name, wait_time > 0 + FROM pg_stat_wait_profile + where class_id = 1 AND event_id = 10 AND wait_count > 0; + +?column? class_id class_name event_id event_name ?column? + +t 1 LWLocks 10 CheckpointLock t +step get_history_io: + SELECT (count(*) > 0) AS io_wait_recorded + FROM pg_stat_wait_history + WHERE class_id = 3 AND event_id=1 AND p1=1 AND p2=2 AND p3=3 AND p4=4 AND p5=5; + +io_wait_recorded + +t diff --git a/contrib/pg_stat_wait/pg_stat_wait--1.0.sql b/contrib/pg_stat_wait/pg_stat_wait--1.0.sql new file mode 100644 index 0000000..f4faee3 --- /dev/null +++ b/contrib/pg_stat_wait/pg_stat_wait--1.0.sql @@ -0,0 +1,127 @@ +/* contrib/pg_stat_wait/pg_stat_wait--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_stat_wait" to load this file. 
\quit + +CREATE FUNCTION pg_wait_class_list( + OUT class_id int4, + OUT name cstring +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE; + +CREATE FUNCTION pg_wait_event_list( + OUT class_id int4, + OUT event_id int4, + OUT name cstring +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE; + +CREATE VIEW pg_wait_event AS + SELECT class_id, event_id, CAST(name as text) as name FROM pg_wait_event_list(); + +CREATE VIEW pg_wait_class AS + SELECT class_id, CAST(name as text) as name FROM pg_wait_class_list(); + +CREATE VIEW pg_wait_events AS + SELECT c.class_id, CAST(c.name as text) as class_name, e.event_id, CAST(e.name as text) as event_name + FROM pg_wait_class c + INNER JOIN pg_wait_event e ON c.class_id = e.class_id + ORDER BY c.class_id, e.event_id; + +/* Returns history, parameters count must be equal with WAIT_PARAMS_COUNT in proc.h */ +CREATE FUNCTION pg_stat_wait_get_history( + OUT pid int4, + OUT sample_ts timestamptz, + OUT class_id int4, + OUT event_id int4, + OUT wait_time int8, + OUT p1 int4, + OUT p2 int4, + OUT p3 int4, + OUT p4 int4, + OUT p5 int4 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE STRICT; + +CREATE VIEW pg_stat_wait_history AS + SELECT pid, sample_ts, h.class_id, e.class_name, h.event_id, e.event_name, wait_time, p1, p2, p3, p4, p5 + FROM pg_stat_wait_get_history() h + INNER JOIN pg_wait_events e + ON e.class_id = h.class_id and e.event_id = h.event_id; + +CREATE FUNCTION pg_stat_wait_get_profile( + pid int4, + reset boolean, + OUT pid int4, + OUT class_id int4, + OUT event_id int4, + OUT wait_time int8, + OUT wait_count int4 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE CALLED ON NULL INPUT; + +CREATE VIEW pg_stat_wait_profile AS + SELECT pid, p.class_id, e.class_name, p.event_id, e.event_name, wait_time, wait_count + FROM pg_stat_wait_get_profile(NULL, false) p + INNER JOIN pg_wait_events e + ON e.class_id = p.class_id and e.event_id = p.event_id; + +CREATE 
FUNCTION pg_stat_wait_reset_profile() +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE STRICT; + +CREATE FUNCTION pg_stat_wait_get_current( + pid int4, + OUT pid int4, + OUT sample_ts timestamptz, + OUT class_id int4, + OUT event_id int4, + OUT wait_time int8, + OUT p1 int4, + OUT p2 int4, + OUT p3 int4, + OUT p4 int4, + OUT p5 int4 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE c VOLATILE CALLED on NULL INPUT; + +CREATE VIEW pg_stat_wait_current AS + SELECT pid, sample_ts, c.class_id, e.class_name, c.event_id, e.event_name, + wait_time, p1, p2, p3, p4, p5 + FROM pg_stat_wait_get_current(null) c + INNER JOIN pg_wait_events e + ON e.class_id = c.class_id and e.event_id = c.event_id; + +CREATE FUNCTION pg_start_trace( + backend_pid int4, + filename cstring +) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE CALLED ON NULL INPUT; + +CREATE FUNCTION pg_is_in_trace( + backend_pid int4 +) +RETURNS bool +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE CALLED ON NULL INPUT; + +CREATE FUNCTION pg_stop_trace( + backend_pid int4 +) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE CALLED ON NULL INPUT; + diff --git a/contrib/pg_stat_wait/pg_stat_wait.c b/contrib/pg_stat_wait/pg_stat_wait.c new file mode 100644 index 0000000..af427f3 --- /dev/null +++ b/contrib/pg_stat_wait/pg_stat_wait.c @@ -0,0 +1,750 @@ +#include "postgres.h" +#include "fmgr.h" +#include "funcapi.h" +#include "catalog/pg_type.h" +#include "port/atomics.h" +#include "storage/wait.h" +#include "storage/spin.h" +#include "storage/ipc.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "access/htup_details.h" +#include "pg_stat_wait.h" +#include "miscadmin.h" +#include "utils/guc.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); +void _PG_fini(void); + +extern CollectorShmqHeader *hdr; +extern shm_toc *toc; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static PGPROC * search_proc(int backendPid); +static TupleDesc get_history_item_tupledesc(); 
+static HeapTuple get_history_item_tuple(HistoryItem *item, TupleDesc tuple_desc); + +static void +pgsw_shmem_startup(void) +{ + + if (WaitsHistoryOn) + { + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + AllocateCollectorMem(); + } +} + +/* + * Module load callback + */ +void +_PG_init(void) +{ + if (!process_shared_preload_libraries_in_progress) + return; + + DefineCustomBoolVariable("pg_stat_wait.history", "Collect waits history", + NULL, &WaitsHistoryOn, false, PGC_SUSET, 0, NULL, NULL, NULL); + + if (WaitsHistoryOn) + { + /* + * Request additional shared resources. (These are no-ops if we're not in + * the postmaster process.) We'll allocate or attach to the shared + * resources in pgss_shmem_startup(). + */ + RequestAddinShmemSpace(CollectorShmemSize()); + RegisterWaitsCollector(); + + /* + * Install hooks. + */ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = pgsw_shmem_startup; + } +} + +/* + * Module unload callback + */ +void +_PG_fini(void) +{ + /* Uninstall hooks. 
*/ + shmem_startup_hook = prev_shmem_startup_hook; +} + +PG_FUNCTION_INFO_V1(pg_wait_class_list); +Datum +pg_wait_class_list(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + TupleDesc tupdesc; + + funcctx = SRF_FIRSTCALL_INIT(); + funcctx->max_calls = WAITS_COUNT; + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "class_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "name", + CSTRINGOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + if (funcctx->call_cntr < funcctx->max_calls) + { + /* for each row */ + Datum values[2]; + bool nulls[2]; + HeapTuple tuple; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(funcctx->call_cntr); + values[1] = CStringGetDatum(WAIT_CLASSES[funcctx->call_cntr]); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + else + { + /* nothing left */ + SRF_RETURN_DONE(funcctx); + } +} + +PG_FUNCTION_INFO_V1(pg_wait_event_list); +Datum +pg_wait_event_list(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + WaitEventContext *ctx; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + TupleDesc tupdesc; + + funcctx = SRF_FIRSTCALL_INIT(); + /* 4 arrays length and WAIT_LATCH + WAIT_CPU */ + funcctx->max_calls = 2 + WAIT_LWLOCKS_COUNT + WAIT_LOCKS_COUNT + + WAIT_IO_EVENTS_COUNT + WAIT_NETWORK_EVENTS_COUNT; + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + tupdesc = CreateTemplateTupleDesc(3, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "class_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "event_id", + INT4OID, -1, 0); + 
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "name", + CSTRINGOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + funcctx->user_fctx = palloc0(sizeof(WaitEventContext)); + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + ctx = (WaitEventContext *)funcctx->user_fctx; + + if (ctx->class_cnt < WAITS_COUNT) + { + /* for each row */ + Datum values[3]; + bool nulls[3]; + HeapTuple tuple; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(ctx->class_cnt); + values[1] = Int32GetDatum(ctx->event_cnt); + values[2] = CStringGetDatum(WaitsEventName(ctx->class_cnt, ctx->event_cnt)); + + if (ctx->class_cnt == WAIT_LWLOCK && ctx->event_cnt < (WAIT_LWLOCKS_COUNT-1)) + ctx->event_cnt++; + else if (ctx->class_cnt == WAIT_LOCK && ctx->event_cnt < (WAIT_LOCKS_COUNT-1)) + ctx->event_cnt++; + else if (ctx->class_cnt == WAIT_IO && ctx->event_cnt < (WAIT_IO_EVENTS_COUNT-1)) + ctx->event_cnt++; + else if (ctx->class_cnt == WAIT_NETWORK && ctx->event_cnt < (WAIT_NETWORK_EVENTS_COUNT-1)) + ctx->event_cnt++; + else + { + ctx->event_cnt = 0; + ctx->class_cnt++; + } + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + else + { + /* nothing left */ + SRF_RETURN_DONE(funcctx); + } +} + +static TupleDesc +get_history_item_tupledesc() +{ + int i; + TupleDesc tupdesc; + tupdesc = CreateTemplateTupleDesc(HISTORY_COLUMNS_COUNT, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "sample_ts", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "class_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "event_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "wait_time", + INT8OID, -1, 0); + + for (i=0; i < WAIT_PARAMS_COUNT; i++) + { + TupleDescInitEntry(tupdesc, 
(AttrNumber) (6 + i), "p", + INT4OID, -1, 0); + } + + return BlessTupleDesc(tupdesc); +} + +static HeapTuple +get_history_item_tuple(HistoryItem *item, TupleDesc tuple_desc) +{ + int i; + HeapTuple tuple; + Datum values[HISTORY_COLUMNS_COUNT]; + bool nulls[HISTORY_COLUMNS_COUNT]; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + /* Values available to all callers */ + values[0] = Int32GetDatum(item->backendPid); + values[1] = TimestampTzGetDatum(item->ts); + values[2] = Int32GetDatum(item->classId); + values[3] = Int32GetDatum(item->eventId); + values[4] = Int64GetDatum(item->waitTime); + + for (i=0; i < WAIT_PARAMS_COUNT; i++) + values[5 + i] = Int32GetDatum(item->params[i]); + + tuple = heap_form_tuple(tuple_desc, values, nulls); + return tuple; +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_get_current); +Datum +pg_stat_wait_get_current(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + WaitCurrentContext *params; + HistoryItem *currentState; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + WaitCurrentContext *params; + + funcctx = SRF_FIRSTCALL_INIT(); + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + params = (WaitCurrentContext *)palloc0(sizeof(WaitCurrentContext)); + params->ts = GetCurrentTimestamp(); + + funcctx->user_fctx = params; + funcctx->max_calls = ProcGlobal->allProcCount; + funcctx->tuple_desc = get_history_item_tupledesc(); + + if (!PG_ARGISNULL(0)) + { + HistoryItem item; + PGPROC *proc = search_proc(PG_GETARG_UINT32(0)); + int stateOk = GetCurrentWaitsState(proc, &item, + proc->waits.writeIdx); + + if (stateOk) + { + params->state = (HistoryItem *)palloc0(sizeof(HistoryItem)); + funcctx->max_calls = 1; + *params->state = item; + } + else + params->done = true; + } + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + params = (WaitCurrentContext *)funcctx->user_fctx; + currentState = NULL; + + if (!params->done) + 
{ + if (params->state != NULL) + { + currentState = params->state; + params->done = true; + params->state = NULL; + } + else + { + while (funcctx->call_cntr <= funcctx->max_calls) + { + PGPROC *proc; + HistoryItem item; + + if (funcctx->call_cntr == funcctx->max_calls + || params->idx >= ProcGlobal->allProcCount) + { + params->done = true; + break; + } + + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = &ProcGlobal->allProcs[params->idx]; + if (proc != NULL && proc->pid) + { + int stateOk = GetCurrentWaitsState(proc, &item, + proc->waits.writeIdx); + if (stateOk) + { + currentState = &item; + LWLockRelease(ProcArrayLock); + params->idx++; + break; + } + } + + LWLockRelease(ProcArrayLock); + params->idx++; + } + } + } + + if (currentState) + { + HeapTuple tuple; + + currentState->ts = params->ts; + tuple = get_history_item_tuple(currentState, funcctx->tuple_desc); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + else if (params->done) + SRF_RETURN_DONE(funcctx); + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_get_profile); +Datum +pg_stat_wait_get_profile(PG_FUNCTION_ARGS) +{ + WaitProfileContext *params; + FuncCallContext *funcctx; + BackendWaitCells *shmemCells; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + TupleDesc tupdesc; + + funcctx = SRF_FIRSTCALL_INIT(); + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + params = (WaitProfileContext *)palloc0(sizeof(WaitProfileContext)); + + if (!PG_ARGISNULL(0)) + params->backendPid = PG_GETARG_UINT32(0); + + if (!PG_ARGISNULL(1)) + params->reset = PG_GETARG_BOOL(1); + + funcctx->user_fctx = params; + funcctx->max_calls = MaxBackends * WAIT_EVENTS_COUNT; + + tupdesc = CreateTemplateTupleDesc(5, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "class_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "event_id", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, 
(AttrNumber) 4, "wait_time", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "wait_count", + INT4OID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + params = (WaitProfileContext *)funcctx->user_fctx; + shmemCells = (BackendWaitCells *)((char *)WaitShmem + + sizeof(int) + /* counter */ + + MAXALIGN(sizeof(slock_t))); /* lock */ + + while (params->backendIdx <= MaxBackends) + { + Datum values[5]; + bool nulls[5]; + HeapTuple tuple; + int count; + BackendWaitCells bcells, *item; + + if (params->backendIdx == MaxBackends) + { + SRF_RETURN_DONE(funcctx); + break; + } + + item = &shmemCells[params->backendIdx]; + + do + { + /* wait until backend is updating this block */ + } while (!pg_atomic_test_set_flag(&item->isBusy)); + + memcpy(&bcells, item, sizeof(BackendWaitCells)); + if (params->reset) + { + item->cells[params->eventIdx].interval = 0; + item->cells[params->eventIdx].count = 0; + } + pg_atomic_clear_flag(&item->isBusy); + + /* filtering */ + if (bcells.backendPid == 0 || + (params->backendPid && params->backendPid != bcells.backendPid)) + { + params->backendIdx++; + continue; + } + + if (params->eventIdx == WAIT_EVENTS_COUNT) + { + params->classIdx = WAIT_LWLOCK; + params->eventIdx = 0; + params->backendIdx++; + Assert(params->backendIdx <= MaxBackends); + continue; + } + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + if ((params->classIdx+1) < WAITS_COUNT && + (params->eventIdx == WAIT_OFFSETS[params->classIdx+1])) + params->classIdx++; + + count = bcells.cells[params->eventIdx].count; + if (count == 0) + { + params->eventIdx++; + continue; + } + + values[0] = Int32GetDatum(bcells.backendPid); + values[1] = Int32GetDatum(params->classIdx); + values[2] = Int32GetDatum(params->eventIdx - WAIT_OFFSETS[params->classIdx]); + values[3] = 
Int64GetDatum(bcells.cells[params->eventIdx].interval); + values[4] = Int32GetDatum(bcells.cells[params->eventIdx].count); + params->eventIdx += 1; + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + break; + } + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_reset_profile); +Datum +pg_stat_wait_reset_profile(PG_FUNCTION_ARGS) +{ + int i, j; /* i walks backend slots, j walks the event cells of one slot */ + BackendWaitCells *cells = (BackendWaitCells *)((char *)WaitShmem + + sizeof(int) + /* counter */ + + MAXALIGN(sizeof(slock_t))); /* lock */ + + for(i=0; i < MaxBackends; i++) + { + BackendWaitCells *item = cells + i; + do + { + /* wait until backend is updating this block */ + } while (!pg_atomic_test_set_flag(&item->isBusy)); + + for (j=0; j < WAIT_EVENTS_COUNT; j++) /* separate index: reusing i here made the outer loop skip backend slots */ + { + item->cells[j].interval = 0; + item->cells[j].count = 0; + } + pg_atomic_clear_flag(&item->isBusy); + } + + PG_RETURN_VOID(); +} + +static void +initLockTag(LOCKTAG *tag) +{ + tag->locktag_field1 = PG_STAT_WAIT_MAGIC; + tag->locktag_field2 = 0; + tag->locktag_field3 = 0; + tag->locktag_field4 = 0; + tag->locktag_type = LOCKTAG_USERLOCK; + tag->locktag_lockmethodid = USER_LOCKMETHOD; +} + +static History * +receive_observations(shm_mq_handle *mqh) +{ + Size len; + void *data; + History *result; + int count, i; + shm_mq_result res; + + res = shm_mq_receive(mqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + elog(ERROR, "Error reading mq."); + if (len != sizeof(count)) + elog(ERROR, "Invalid message length."); + memcpy(&count, data, sizeof(count)); + + result = (History *)palloc(sizeof(History)); + AllocHistory(result, count); + + for (i = 0; i < count; i++) + { + res = shm_mq_receive(mqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + elog(ERROR, "Error reading mq."); + if (len != sizeof(HistoryItem)) + elog(ERROR, "Invalid message length."); + memcpy(&result->items[i], data, sizeof(HistoryItem)); + } + + return result; +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_get_history); 
+Datum +pg_stat_wait_get_history(PG_FUNCTION_ARGS) +{ + History *observations; + FuncCallContext *funcctx; + + if (!WaitsHistoryOn) + ereport(ERROR, (errcode(ERRCODE_CONFIG_FILE_ERROR), + errmsg("Waits history turned off"))); + + if (SRF_IS_FIRSTCALL()) + { + shm_mq *mq; + shm_mq_handle *mqh; + LOCKTAG tag; + MemoryContext oldcontext; + + funcctx = SRF_FIRSTCALL_INIT(); + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + initLockTag(&tag); + LockAcquire(&tag, ExclusiveLock, false, false); + + mq = shm_mq_create(shm_toc_lookup(toc, 1), COLLECTOR_QUEUE_SIZE); + hdr->request = HISTORY_REQUEST; + + SetLatch(hdr->latch); + + shm_mq_set_receiver(mq, MyProc); + mqh = shm_mq_attach(mq, NULL, NULL); + + observations = receive_observations(mqh); + funcctx->user_fctx = observations; + funcctx->max_calls = observations->count; + + shm_mq_detach(mq); + LockRelease(&tag, ExclusiveLock, false); + + funcctx->tuple_desc = get_history_item_tupledesc(); + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + observations = (History *)funcctx->user_fctx; + + if (observations->index < observations->count) + { + HeapTuple tuple; + HistoryItem *observation; + + observation = &observations->items[observations->index]; + tuple = get_history_item_tuple(observation, funcctx->tuple_desc); + observations->index++; + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + else + { + /* nothing left */ + SRF_RETURN_DONE(funcctx); + } + + PG_RETURN_VOID(); +} + +static PGPROC * +search_proc(int backendPid) +{ + int i; + if (backendPid == 0) + return MyProc; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + for (i = 0; i < ProcGlobal->allProcCount; ++i) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + if (proc->pid && proc->pid == backendPid) + { + LWLockRelease(ProcArrayLock); + return proc; + } + } + + LWLockRelease(ProcArrayLock); + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("backend 
with pid=%d not found", backendPid))); + return NULL; +} + +PG_FUNCTION_INFO_V1(pg_start_trace); +Datum +pg_start_trace(PG_FUNCTION_ARGS) +{ + PGPROC *proc; + char *filename = PG_GETARG_CSTRING(1); + + if (strlen(filename) >= WAIT_TRACE_FN_LEN) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("length of filename limited to %d", (WAIT_TRACE_FN_LEN-1)))); + + if (!is_absolute_path(filename)) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("path must be absolute"))); + + proc = NULL; + if (PG_ARGISNULL(0)) + proc = MyProc; + else + proc = search_proc(PG_GETARG_INT32(0)); + + if (proc != NULL) + { + if (proc->waits.traceOn) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("trace is already working in backend"))); + + strcpy(proc->waits.traceFn, filename); + proc->waits.traceOn = true; + } + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(pg_is_in_trace); +Datum +pg_is_in_trace(PG_FUNCTION_ARGS) +{ + PGPROC *proc = NULL; + + if (PG_ARGISNULL(0)) + proc = MyProc; + else + proc = search_proc(PG_GETARG_INT32(0)); + + if (proc) + PG_RETURN_BOOL(proc->waits.traceOn); + + PG_RETURN_BOOL(false); +} + +PG_FUNCTION_INFO_V1(pg_stop_trace); +Datum +pg_stop_trace(PG_FUNCTION_ARGS) +{ + PGPROC *proc = NULL; + if (PG_ARGISNULL(0)) + proc = MyProc; + else + proc = search_proc(PG_GETARG_INT32(0)); + + if (proc != NULL) + { + if (!proc->waits.traceOn) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("trace is not started"))); + + proc->waits.traceOn = false; + } + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_start_wait); + +Datum +pg_stat_wait_start_wait(PG_FUNCTION_ARGS) +{ + int classId = PG_GETARG_INT32(0); + int eventId = PG_GETARG_INT32(1); + int p1 = PG_GETARG_INT32(2); + int p2 = PG_GETARG_INT32(3); + int p3 = PG_GETARG_INT32(4); + int p4 = PG_GETARG_INT32(5); + int p5 = PG_GETARG_INT32(6); + + WAIT_START(classId, eventId, p1, p2, p3, p4, p5); + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(pg_stat_wait_stop_wait); + +Datum 
+pg_stat_wait_stop_wait(PG_FUNCTION_ARGS) +{ + WAIT_STOP(); + PG_RETURN_VOID(); +} diff --git a/contrib/pg_stat_wait/pg_stat_wait.control b/contrib/pg_stat_wait/pg_stat_wait.control new file mode 100644 index 0000000..c432790 --- /dev/null +++ b/contrib/pg_stat_wait/pg_stat_wait.control @@ -0,0 +1,5 @@ +# pg_stat_wait extension +comment = 'track execution statistics of waits' +default_version = '1.0' +module_pathname = '$libdir/pg_stat_wait' +relocatable = true diff --git a/contrib/pg_stat_wait/pg_stat_wait.h b/contrib/pg_stat_wait/pg_stat_wait.h new file mode 100644 index 0000000..7cd7b8e --- /dev/null +++ b/contrib/pg_stat_wait/pg_stat_wait.h @@ -0,0 +1,83 @@ +#ifndef __PG_STAT_WAIT_H__ +#define __PG_STAT_WAIT_H__ + +#include +#include "storage/proc.h" +#include "utils/timestamp.h" + +#define PG_STAT_WAIT_MAGIC 0xca94b107 +#define COLLECTOR_QUEUE_SIZE 16384 +#define HISTORY_TIME_MULTIPLIER 10 +#define HISTORY_COLUMNS_COUNT (5 + WAIT_PARAMS_COUNT) + +typedef struct +{ + uint32 backendPid; + bool reset; + int backendIdx; + int classIdx; + int eventIdx; +} WaitProfileContext; + +typedef struct +{ + int class_cnt; + int event_cnt; +} WaitEventContext; + +typedef struct +{ + int classId; + int eventId; + int params[WAIT_PARAMS_COUNT]; + int backendPid; + uint64 waitTime; + + TimestampTz ts; +} HistoryItem; + +typedef struct +{ + int idx; + HistoryItem *state; + bool done; + TimestampTz ts; +} WaitCurrentContext; + +typedef struct +{ + bool wraparound; + int index; + int count; + HistoryItem *items; +} History; + +typedef enum +{ + NO_REQUEST, + HISTORY_REQUEST +} SHMRequest; + +typedef struct +{ + Latch *latch; + SHMRequest request; + int historySize; + int historyPeriod; + bool historySkipLatch; +} CollectorShmqHeader; + +extern PGDLLIMPORT char *WAIT_CLASSES[]; +extern PGDLLIMPORT char *WAIT_LOCK_NAMES[]; +extern PGDLLIMPORT char *WAIT_LWLOCK_NAMES[]; +extern PGDLLIMPORT char *WAIT_IO_NAMES[]; +extern PGDLLIMPORT char *WAIT_NETWORK_NAMES[]; +extern PGDLLIMPORT const 
int WAIT_OFFSETS[]; + +Size CollectorShmemSize(void); +void AllocateCollectorMem(void); +void RegisterWaitsCollector(void); +void AllocHistory(History *, int); +int GetCurrentWaitsState(PGPROC *, HistoryItem *, int); + +#endif diff --git a/contrib/pg_stat_wait/specs/history.spec b/contrib/pg_stat_wait/specs/history.spec new file mode 100644 index 0000000..e4b2e3b --- /dev/null +++ b/contrib/pg_stat_wait/specs/history.spec @@ -0,0 +1,99 @@ +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); + DROP FUNCTION IF EXISTS pg_stat_wait_start_wait(int4, int4, int4, int4, int4, int4, int4); + DROP FUNCTION IF EXISTS pg_stat_wait_stop_wait(); + + CREATE FUNCTION pg_stat_wait_start_wait( + class_id int4, + event_id int4, + p1 int4, + p2 int4, + p3 int4, + p4 int4, + p5 int4 + ) + RETURNS void + AS 'pg_stat_wait.so' + LANGUAGE C VOLATILE STRICT; + + CREATE FUNCTION pg_stat_wait_stop_wait() + RETURNS void + AS 'pg_stat_wait.so' + LANGUAGE C VOLATILE STRICT; +} + +teardown +{ + DROP TABLE IF EXISTS do_write; +} + +session "s0" +step "start_lwlock" { + SELECT pg_stat_wait_start_wait(1, 10, 1, 0, 0, 0, 5); + SELECT pg_sleep(1); +} +step "stop_wait" { + SELECT pg_stat_wait_stop_wait(); +} +step "start_io" { + SELECT pg_stat_wait_start_wait(3, 1, 1, 2, 3, 4, 5); + SELECT pg_sleep(1); +} + +session "s1" +step "get_current_lwlock" { + SELECT + pid > 0, + (now() - sample_ts) < interval '1 hour', + class_id, + class_name, + event_id, + event_name, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_current + WHERE class_id = 1; +} + +step "get_current_io" { + SELECT + pid > 0, + (now() - sample_ts) < interval '1 minute', + class_id, + class_name, + event_id, + event_name, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_current + WHERE class_id = 3; +} + +step "get_profile" { + SELECT pid > 0, class_id, class_name, event_id, event_name, wait_time > 0 + FROM pg_stat_wait_profile + where class_id = 1 AND event_id = 10 AND wait_count > 0; +} + +step "get_history_lwlock" 
{ + SELECT + pid > 0, + (now() - sample_ts) < interval '1 hour', + class_id, + class_name, + event_id, + event_name, + wait_time > 0, + p1, p2, p3, p4, p5 + FROM pg_stat_wait_history + WHERE class_id=1 AND p5=5; +} + +step "get_history_io" { + SELECT (count(*) > 0) AS io_wait_recorded + FROM pg_stat_wait_history + WHERE class_id = 3 AND event_id=1 AND p1=1 AND p2=2 AND p3=3 AND p4=4 AND p5=5; +} + +permutation "start_lwlock" "get_current_lwlock" "stop_wait" "get_profile" "get_history_lwlock" +permutation "start_io" "get_current_io" "stop_wait" "get_profile" "get_history_io" diff --git a/contrib/pg_stat_wait/sql/descriptions.sql b/contrib/pg_stat_wait/sql/descriptions.sql new file mode 100644 index 0000000..8216675 --- /dev/null +++ b/contrib/pg_stat_wait/sql/descriptions.sql @@ -0,0 +1,3 @@ +SELECT * FROM pg_wait_event; +SELECT * FROM pg_wait_class; +SELECT * FROM pg_wait_events; \ No newline at end of file diff --git a/contrib/pg_stat_wait/sql/file_trace.sql b/contrib/pg_stat_wait/sql/file_trace.sql new file mode 100644 index 0000000..c9b4b96 --- /dev/null +++ b/contrib/pg_stat_wait/sql/file_trace.sql @@ -0,0 +1,4 @@ +select pg_start_trace(0, '/tmp/pg_stat_wait.trace'); +select pg_is_in_trace(0); +select pg_stop_trace(0); + diff --git a/contrib/pg_stat_wait/sql/history.sql b/contrib/pg_stat_wait/sql/history.sql new file mode 100644 index 0000000..e69de29 diff --git a/contrib/pg_stat_wait/waits.conf b/contrib/pg_stat_wait/waits.conf new file mode 100644 index 0000000..e542073 --- /dev/null +++ b/contrib/pg_stat_wait/waits.conf @@ -0,0 +1,8 @@ +waits_monitoring = on +waits_flush_period = 10 +shared_preload_libraries = 'pg_stat_wait.so' + +pg_stat_wait.history = on +pg_stat_wait.history_size = 4096 +pg_stat_wait.history_period = 1 +pg_stat_wait.history_skip_latch = on diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1dd31b3..e29ccbe 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -63,6 
+63,7 @@ #include "storage/reinit.h" #include "storage/smgr.h" #include "storage/spin.h" +#include "storage/wait.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -4661,8 +4662,10 @@ XLOGShmemInit(void) LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche); for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++) { - LWLockInitialize(&WALInsertLocks[i].l.lock, + LWLock *lock = &WALInsertLocks[i].l.lock; + LWLockInitialize(lock, XLogCtl->Insert.WALInsertLockTrancheId); + lock->group = WAIT_LWLOCK_WAL_INSERT; WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr; } diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 4a650cc..05e7b6a 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -36,6 +36,7 @@ #include "tcop/tcopprot.h" #include "utils/memutils.h" #include "storage/proc.h" +#include "storage/wait.h" char *ssl_cert_file; @@ -129,6 +130,8 @@ secure_read(Port *port, void *ptr, size_t len) ssize_t n; int waitfor; + WAIT_START(WAIT_NETWORK, WAIT_NETWORK_READ, 0, 0, 0, 0, 0); + retry: #ifdef USE_SSL waitfor = 0; @@ -175,6 +178,7 @@ retry: * interrupts from being processed. */ ProcessClientReadInterrupt(false); + WAIT_STOP(); return n; } @@ -209,6 +213,8 @@ secure_write(Port *port, void *ptr, size_t len) ssize_t n; int waitfor; + WAIT_START(WAIT_NETWORK, WAIT_NETWORK_WRITE, 0, 0, 0, 0, 0); + retry: waitfor = 0; #ifdef USE_SSL @@ -254,6 +260,7 @@ retry: * interrupts from being processed. */ ProcessClientWriteInterrupt(false); + WAIT_STOP(); return n; } diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c index 147e22c..c53807f 100644 --- a/src/backend/port/unix_latch.c +++ b/src/backend/port/unix_latch.c @@ -55,6 +55,7 @@ #include "storage/latch.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "storage/wait.h" /* Are we currently in WaitLatch? The signal handler would like to know. 
*/ static volatile sig_atomic_t waiting = false; @@ -262,6 +263,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, #endif } + WAIT_START(WAIT_LATCH, 0, 0, 0, 0, 0, 0); waiting = true; do { @@ -494,6 +496,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, } } while (result == 0); waiting = false; + WAIT_STOP(); return result; } diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c index ee95262..2c2d71c 100644 --- a/src/backend/port/win32_latch.c +++ b/src/backend/port/win32_latch.c @@ -31,6 +31,7 @@ #include "storage/latch.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "storage/wait.h" void @@ -177,6 +178,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, /* Ensure that signals are serviced even if latch is already set */ pgwin32_dispatch_queued_signals(); + WAIT_START(WAIT_LATCH, 0, 0, 0, 0, 0, 0); do { /* @@ -275,6 +277,8 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, } } while (result == 0); + WAIT_STOP(); + /* Clean up the event object we created for the socket */ if (sockevent != WSA_INVALID_EVENT) { diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index f4ba86e..404dd3b 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -89,6 +89,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/copydir.h" +#include "storage/wait.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -142,7 +143,7 @@ typedef struct ReplicationStateOnDisk typedef struct ReplicationStateCtl { - int tranche_id; + int tranche_id; LWLockTranche tranche; ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; } ReplicationStateCtl; @@ -479,8 +480,11 @@ ReplicationOriginShmemInit(void) MemSet(replication_states, 0, ReplicationOriginShmemSize()); for (i = 0; i < max_replication_slots; i++) - 
LWLockInitialize(&replication_states[i].lock, - replication_states_ctl->tranche_id); + { + LWLock *lock = &replication_states[i].lock; + LWLockInitialize(lock, replication_states_ctl->tranche_id); + lock->group = WAIT_LWLOCK_REPLICATIONORIGIN; + } } LWLockRegisterTranche(replication_states_ctl->tranche_id, diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 32ac58f..d438be8 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -43,6 +43,7 @@ #include "storage/procsignal.h" #include "storage/sinvaladt.h" #include "storage/spin.h" +#include "storage/wait.h" shmem_startup_hook_type shmem_startup_hook = NULL; @@ -139,6 +140,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, WaitsShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -250,6 +252,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + WaitsAllocateShmem(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 4f3c5c9..44426af 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -56,6 +56,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "storage/spin.h" +#include "storage/wait.h" #include "utils/builtins.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -333,6 +334,8 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) DisplayXidCache(); #endif + WaitsFreeBackendCells(proc); + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); if (TransactionIdIsValid(latestXid)) diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile index e12a854..ea612b9 100644 --- a/src/backend/storage/lmgr/Makefile +++ b/src/backend/storage/lmgr/Makefile @@ 
-12,7 +12,8 @@ subdir = src/backend/storage/lmgr top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o predicate.o +OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o predicate.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 1eb2d4b..b2e086c 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -43,6 +43,7 @@ #include "storage/sinvaladt.h" #include "storage/spin.h" #include "storage/standby.h" +#include "storage/wait.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/resowner_private.h" @@ -979,8 +980,16 @@ LockAcquireExtended(const LOCKTAG *locktag, locktag->locktag_type, lockmode); + WAIT_START(WAIT_LOCK, locktag->locktag_type, lockmode, + locktag->locktag_field1, + locktag->locktag_field2, + locktag->locktag_field3, + locktag->locktag_field4); + WaitOnLock(locallock, owner); + WAIT_STOP(); + TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, locktag->locktag_field2, locktag->locktag_field3, diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 46cab49..88a7bea 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -87,6 +87,7 @@ #include "storage/predicate.h" #include "storage/proc.h" #include "storage/spin.h" +#include "storage/wait.h" #include "utils/memutils.h" #ifdef LWLOCK_STATS @@ -127,6 +128,7 @@ static int LWLockTranchesAllocated = 0; * where we have special measures to pass it down). */ LWLockPadded *MainLWLockArray = NULL; +int *main_lwlock_groups = NULL; static LWLockTranche MainLWLockTranche; /* @@ -318,59 +320,101 @@ get_lwlock_stats_entry(LWLock *lock) #endif /* LWLOCK_STATS */ +/* Initiate 2-dimensional array with lwlock groups. 
+ * We use it in wait.c for calculating lwlock group + * First dimension contains count of lwlocks contained in this group + * Second dimension contains wait type (index of group) + */ +static int +init_main_lwlock_groups() +{ + int idx = 0; + if (main_lwlock_groups == NULL) + { + main_lwlock_groups = MemoryContextAllocZero(TopMemoryContext, + sizeof(int) * NUM_ADD_LWLOCK_GROUPS * 2); + } + + /* bufmap lwlocks */ + main_lwlock_groups[0] = LOCK_MANAGER_LWLOCK_OFFSET; + main_lwlock_groups[1] = NUM_INDIVIDUAL_LWLOCKS; + + /* lock manager lwlocks */ + idx++; + main_lwlock_groups[idx * 2] = PREDICATELOCK_MANAGER_LWLOCK_OFFSET; + main_lwlock_groups[idx * 2 + 1] = NUM_INDIVIDUAL_LWLOCKS + idx; + + /* predicate lwlocks */ + idx++; + main_lwlock_groups[idx * 2] = NUM_FIXED_LWLOCKS; + main_lwlock_groups[idx * 2 + 1] = NUM_INDIVIDUAL_LWLOCKS + idx; + return idx; +} + + /* * Compute number of LWLocks to allocate in the main array. */ static int NumLWLocks(void) { - int numLocks; + int numLocks, i; + int sizes[NUM_DYN_LWLOCK_GROUPS] = { + /* bufmgr.c needs two for each shared buffer */ + 2 * NBuffers, - /* - * Possibly this logic should be spread out among the affected modules, - * the same way that shmem space estimation is done. But for now, there - * are few enough users of LWLocks that we can get away with just keeping - * the knowledge here. 
- */ + /* proc.c needs one for each backend or auxiliary process */ + MaxBackends + NUM_AUXILIARY_PROCS, - /* Predefined LWLocks */ - numLocks = NUM_FIXED_LWLOCKS; - - /* bufmgr.c needs two for each shared buffer */ - numLocks += 2 * NBuffers; - - /* proc.c needs one for each backend or auxiliary process */ - numLocks += MaxBackends + NUM_AUXILIARY_PROCS; + /* clog.c needs one per CLOG buffer */ + CLOGShmemBuffers(), - /* clog.c needs one per CLOG buffer */ - numLocks += CLOGShmemBuffers(); + /* commit_ts.c needs one per CommitTs buffer */ + CommitTsShmemBuffers(), - /* commit_ts.c needs one per CommitTs buffer */ - numLocks += CommitTsShmemBuffers(); + /* subtrans.c needs one per SubTrans buffer */ + NUM_SUBTRANS_BUFFERS, - /* subtrans.c needs one per SubTrans buffer */ - numLocks += NUM_SUBTRANS_BUFFERS; + /* multixact.c needs two SLRU areas */ + NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS, - /* multixact.c needs two SLRU areas */ - numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; + /* async.c needs one per Async buffer */ + NUM_ASYNC_BUFFERS, - /* async.c needs one per Async buffer */ - numLocks += NUM_ASYNC_BUFFERS; + /* predicate.c needs one per old serializable xid buffer */ + NUM_OLDSERXID_BUFFERS, - /* predicate.c needs one per old serializable xid buffer */ - numLocks += NUM_OLDSERXID_BUFFERS; + /* slot.c needs one for each slot */ + max_replication_slots, - /* slot.c needs one for each slot */ - numLocks += max_replication_slots; + /* + * Add any requested by loadable modules; for backwards-compatibility + * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if + * there are no explicit requests. + */ + Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS) + }; + int idx = init_main_lwlock_groups(); /* - * Add any requested by loadable modules; for backwards-compatibility - * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if - * there are no explicit requests. 
+ * Possibly this logic should be spread out among the affected modules, + * the same way that shmem space estimation is done. But for now, there + * are few enough users of LWLocks that we can get away with just keeping + * the knowledge here. */ - lock_addin_request_allowed = false; - numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS); + + /* Predefined LWLocks */ + numLocks = NUM_FIXED_LWLOCKS; + for (i=0; i < NUM_DYN_LWLOCK_GROUPS; ++i) + { + numLocks += sizes[i]; + idx++; + main_lwlock_groups[idx*2] = numLocks; + main_lwlock_groups[idx*2+1] = NUM_INDIVIDUAL_LWLOCKS + idx; + } + + lock_addin_request_allowed = false; return numLocks; } @@ -568,6 +612,7 @@ LWLockInitialize(LWLock *lock, int tranche_id) pg_atomic_init_u32(&lock->nwaiters, 0); #endif lock->tranche = tranche_id; + lock->group = -1; dlist_init(&lock->waiters); } @@ -1036,6 +1081,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) #endif TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode); + WAIT_LWLOCK_START(lock, mode); for (;;) { @@ -1057,6 +1103,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) } #endif + WAIT_STOP(); TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode); LOG_LWDEBUG("LWLockAcquire", lock, "awakened"); @@ -1198,6 +1245,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) lwstats->block_count++; #endif TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode); + WAIT_LWLOCK_START(lock, mode); for (;;) { @@ -1215,6 +1263,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) Assert(nwaiters < MAX_BACKENDS); } #endif + WAIT_STOP(); TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode); LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened"); @@ -1400,6 +1449,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE); + WAIT_LWLOCK_START(lock, LW_EXCLUSIVE); for 
(;;) { @@ -1418,6 +1468,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) } #endif + WAIT_STOP(); TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE); diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 455ad26..e29fe21 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -49,6 +49,7 @@ #include "storage/procarray.h" #include "storage/procsignal.h" #include "storage/spin.h" +#include "storage/wait.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -380,6 +381,8 @@ InitProcess(void) #endif MyProc->recoveryConflictPending = false; + WaitsInitProcessFields(MyProc); + /* Initialize fields for sync rep */ MyProc->waitLSN = 0; MyProc->syncRepState = SYNC_REP_NOT_WAITING; @@ -526,6 +529,9 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + + WaitsInitProcessFields(MyProc); + #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/storage/lmgr/wait.c b/src/backend/storage/lmgr/wait.c new file mode 100644 index 0000000..f713767 --- /dev/null +++ b/src/backend/storage/lmgr/wait.c @@ -0,0 +1,519 @@ +#include "postgres.h" +#include "storage/wait.h" +#include "utils/datetime.h" +#include "utils/memutils.h" +#include "port/atomics.h" +#include + +extern int max_replication_slots; +extern int *main_lwlock_groups; +static slock_t *WaitCounterLock; +static bool lwlock_group_notified = false; + +void *WaitShmem; +bool WaitsOn; +bool WaitsHistoryOn; +int WaitsFlushPeriod; + +#define SHMEM_WAIT_CELLS ((BackendWaitCells *)((char *)WaitCounterLock \ + + MAXALIGN(sizeof(slock_t)))) + +const char *WAIT_CLASSES[WAITS_COUNT] = +{ + "CPU", + "LWLocks", + "Locks", + "Storage", + "Latch", + "Network" +}; + +const char *WAIT_LOCK_NAMES[WAIT_LOCKS_COUNT] = +{ + "Relation", + "RelationExtend", + "Page", + "Tuple", + "Transaction", + "VirtualTransaction", + "SpeculativeToken", + "Object", + "Userlock", + 
"Advisory", +}; + +/* We keep all LWLock names here. Order is very important here. + * Index is equal to lwlock group field + */ +const char *WAIT_LWLOCK_NAMES[WAIT_LWLOCKS_COUNT] = +{ + "", /* formely was BufFreelistLock */ + "ShmemIndexLock", + "OidGenLock", + "XidGenLock", + "ProcArrayLock", + "SInvalReadLock", + "SInvalWriteLock", + "WALBufMappingLock", + "WALWriteLock", + "ControlFileLock", + "CheckpointLock", + "CLogControlLock", + "SubtransControlLock", + "MultiXactGenLock", + "MultiXactOffsetControlLock", + "MultiXactMemberControlLock", + "RelCacheInitLock", + "CheckpointerCommLock", + "TwoPhaseStateLock", + "TablespaceCreateLock", + "BtreeVacuumLock", + "AddinShmemInitLock", + "AutovacuumLock", + "AutovacuumScheduleLock", + "SyncScanLock", + "RelationMappingLock", + "AsyncCtlLock", + "AsyncQueueLock", + "SerializableXactHashLock", + "SerializableFinishedListLock", + "SerializablePredicateLockListLock", + "OldSerXidLock", + "SyncRepLock", + "BackgroundWorkerLock", + "DynamicSharedMemoryControlLock", + "AutoFileLock", + "ReplicationSlotAllocationLock", + "ReplicationSlotControlLock", + "CommitTsControlLock", + "CommitTsLock", + "ReplicationOriginLock", + "BufferPartitionLock", + "LockManagerPartitionLock", + "PredicatePartitionLock", + "SharedBufferLocks", + "ProcessLock", + "CLogBufferLock", + "CommitTsBufferLock", + "SubtransBufferLock", + "MultixactBufferLock", + "AsyncBufferLock", + "OldSerializalbeXidBufferLock", + "ReplicationSlotLock", + "UserDefinedLock", + "WALInsertLocks", + "ReplicationOriginLocks", +}; + +const char *WAIT_IO_NAMES[WAIT_IO_EVENTS_COUNT] = +{ + "READ", + "WRITE", +}; + +const char *WAIT_NETWORK_NAMES[WAIT_NETWORK_EVENTS_COUNT] = +{ + "READ", + "WRITE" +}; + +const int WAIT_OFFSETS[] = +{ + 0, /* skip */ + WAIT_LWLOCKS_OFFSET, + WAIT_LOCKS_OFFSET, + WAIT_IO_OFFSET, + WAIT_LATCH_OFFSET, + WAIT_NETWORK_OFFSET +}; + +/* Returns event name for wait. 
All names defined in arrays above */ +const char * +WaitsEventName(int classId, int eventId) +{ + switch (classId) + { + case WAIT_LOCK: return WAIT_LOCK_NAMES[eventId]; + case WAIT_LWLOCK: return WAIT_LWLOCK_NAMES[eventId]; + case WAIT_IO: return WAIT_IO_NAMES[eventId]; + case WAIT_NETWORK: return WAIT_NETWORK_NAMES[eventId]; + case WAIT_LATCH: return WAIT_CLASSES[WAIT_LATCH]; + case WAIT_CPU: return WAIT_CLASSES[WAIT_CPU]; + }; + return NULL; +} + +/* + * Gets lwlock group. If this lwlock is from the main tranche, an individual + * (named) lwlock maps to its own index and any other lock to a group from + * `main_lwlock_groups`; for other tranches the group must be defined manually + * (by define in wait.h, and then assigning group in tranche lwlocks creation) + */ +static int +get_lwlock_group(volatile LWLock *lock) +{ + int group = -1; + if (lock->tranche == 0) + { + int numLocks, i; + int idx = (union LWLockPadded *)lock - MainLWLockArray; + + if (idx < NUM_INDIVIDUAL_LWLOCKS) + return idx; /* return now: falling into the loop below always overwrote this with the first add-on group */ + + numLocks = NUM_INDIVIDUAL_LWLOCKS; + for (i=0; i < NUM_ADD_LWLOCK_GROUPS; i++) + { + numLocks += main_lwlock_groups[i*2]; /* NOTE(review): init_main_lwlock_groups() appears to store end offsets rather than counts here - confirm this accumulation */ + if (idx < numLocks) + { + group = main_lwlock_groups[i*2+1]; + break; + } + } + } + else if (!lwlock_group_notified) + { + elog(INFO, "LWLock with tranche id #%d not monitored", lock->tranche); + lwlock_group_notified = true; + } + + return group; +} + +static void +write_trace_start(FILE *fd, int classId, int eventId, + int p1, int p2, int p3, int p4, int p5) +{ + TimestampTz current_ts; + int n; + + /* Buffer calculation: + * 4 integers + * timestamp (MAXDATELEN) + * 33 for max name (SerializablePredicateLockListLock at this time) + * 7 for max wait class (Storage or Network) + * and spaces + * format like: start 2015-05-18 06:52:03.244103-04 LWlocks SerializablePredicateLockListLock 0 0 0 0 + */ + + char buf[10 * 4 + MAXDATELEN + 33 + 7 + 10]; + const char *event_name; + + Assert(fd != NULL); + current_ts = GetCurrentTimestamp(); + event_name = WaitsEventName(classId, eventId); + n = snprintf(buf, sizeof(buf), "start 
%s %s %s %d %d %d %d %d\n", + DatumGetCString(DirectFunctionCall1(timestamptz_out, current_ts)), + WAIT_CLASSES[classId], + event_name == NULL? "" : event_name, + p1, p2, p3, p4, p5); + + if (n != -1) + { + fwrite(buf, sizeof(char), n, fd); + fflush(fd); + } + else + elog(INFO, "Wait trace formatting error"); +} + +static void +write_trace_stop(FILE *fd, int classId) +{ + TimestampTz current_ts; + int n; + char buf[MAXDATELEN + 33 + 5]; + + Assert(fd != NULL); + current_ts = GetCurrentTimestamp(); + n = snprintf(buf, sizeof(buf), "stop %s %s\n", + DatumGetCString(DirectFunctionCall1(timestamptz_out, current_ts)), + WAIT_CLASSES[classId]); + + if (n != -1) + { + fwrite(buf, sizeof(char), n, fd); + fflush(fd); + } +} + +/* Flushes all waits from backend local memory to shared memory block; flushTime is refreshed only when at least one cell was actually written */ +static void +flush_waits(ProcWaits *waits) +{ + int offset; + bool flushed = false; /* must start false: it is only ever set inside the loop */ + BackendWaitCells *sh_cells; + + if (waits->smWaitCells == NULL) + return; + + sh_cells = (BackendWaitCells *)(waits->smWaitCells); + + for (offset=0; offset < WAIT_EVENTS_COUNT; offset++) + { + WaitCell *cell = ((WaitCell *)waits->waitCells) + offset; + if (cell->count == 0) + continue; + + /* If TAS ok we can update data in shared memory, + * if not then we skip this time + */ + if (pg_atomic_test_set_flag(&sh_cells->isBusy)) + { + sh_cells->cells[offset].interval += cell->interval; + sh_cells->cells[offset].count += cell->count; + cell->count = cell->interval = 0; + pg_atomic_clear_flag(&sh_cells->isBusy); + flushed = true; + } + } + + if (flushed) + INSTR_TIME_SET_CURRENT(waits->flushTime); +} + +/* Init backend's block in shared memory + * Backends will flush data to this block by some interval + */ +static void +init_backend_shmem_cells(PGPROC *proc) +{ + int *counter; + bool counter_was_restarted = false; + BackendWaitCells *cells, *curcells; + + // init variables + counter = (int *)WaitShmem; + // start of cells + cells = SHMEM_WAIT_CELLS; + + Assert(proc->waits.smWaitCells == NULL); + 
SpinLockAcquire(WaitCounterLock); /* NOTE(review): elog() and a busy-wait below run while this spinlock is held -- confirm this is acceptable */ + + do + { + if (*counter >= MaxBackends) + { + if (counter_was_restarted) + { + elog(INFO, "No available wait cells for backend: %d", proc->pid); + break; + } + *counter = 0; + counter_was_restarted = true; + } + + curcells = cells + (*counter)++; + if (pg_atomic_test_set_flag(&curcells->isTaken)) + { + do + { + /* Wait until block is certainly free (spins until isBusy is observed clear) */ + } while (!pg_atomic_unlocked_test_flag(&curcells->isBusy)); + + pg_atomic_init_flag(&curcells->isBusy); + curcells->backendPid = proc->pid; + MemSet(curcells->cells, 0, sizeof(WaitCell) * WAIT_EVENTS_COUNT); + proc->waits.smWaitCells = (void *) curcells; + break; + } + } while (1); + + SpinLockRelease(WaitCounterLock); +} + +/* Sets current wait in backend, it fills current buffer and remembers + * time when wait is started. Current buffer is opposite of current + * reading buffer. When collector reads data from its buffer, it sets + * -1 to reading index and backend can switch buffers + */ +void +StartWait(int classId, int eventId, int p1, int p2, int p3, int p4, int p5) +{ + ProcWaits *waits; + ProcWait *curwait; + + Assert(classId > 0 && classId < WAITS_COUNT); + + if (!MyProc) + return; + + /* preventing nested waits: only the outermost StartWait records anything */ + waits = &MyProc->waits; + if (waits->nested++ > 0) return; + Assert(waits->nested == 1); + + /* if tracing was started with `pg_start_trace`, + * we initialize it here + */ + if (waits->traceOn && waits->traceFd == NULL) + { + waits->traceFd = fopen(waits->traceFn, "w"); + if (waits->traceFd == NULL) + { + waits->traceOn = false; + elog(WARNING, "could not open trace file \"%s\": %m", + waits->traceFn); + } + else + elog(INFO, "Trace was started to: %s", waits->traceFn); + } + else if (!waits->traceOn && waits->traceFd != NULL) + { + fclose(waits->traceFd); + waits->traceFd = NULL; + elog(INFO, "Trace was stopped"); + } + + if (waits->traceFd != NULL) + write_trace_start(waits->traceFd, classId, eventId, + p1, p2, p3, p4, p5); + + /* switching buffers */ + 
waits->writeIdx = !waits->readIdx; + curwait = &waits->waitsBuf[waits->writeIdx]; + curwait->classId = classId; + curwait->eventId = eventId; + curwait->params[0] = p1; + curwait->params[1] = p2; + curwait->params[2] = p3; + curwait->params[3] = p4; + curwait->params[4] = p5; + INSTR_TIME_SET_CURRENT(curwait->startTime); + + /* we don't care about the result: if the reader hasn't changed its index, + then keep the value */ + if (waits->readIdx == -1) + waits->readIdx = waits->writeIdx; +} + +/* Special function for lwlock waits, because we need to determine lwlock + * group + */ +void +StartLWLockWait(volatile LWLock *lock, LWLockMode mode) +{ + if (lock->group == -1) + lock->group = get_lwlock_group(lock); + + if (lock->group >= 0) + WAIT_START(WAIT_LWLOCK, lock->group, mode, 0, 0, 0, 0); +} + +/* Stops current wait, calculates interval of wait, and flushes + * collected waits info to shared memory if last flush has been more than + * WaitsFlushPeriod milliseconds ago + */ +void +StopWait(void) +{ + int offset; + WaitCell *waitCell; + instr_time currentTime, currentTimeCopy; + ProcWaits *waits; + ProcWait *curwait; + + if (!MyProc) + return; + + waits = &MyProc->waits; + + /* prevent nested waits */ + if ((--waits->nested) > 0) return; + Assert(waits->nested == 0); + + /* first thing we save the time after wait */ + INSTR_TIME_SET_CURRENT(currentTime); + currentTimeCopy = currentTime; + curwait = &waits->waitsBuf[waits->writeIdx]; + + /* file tracing */ + if (waits->traceFd != NULL) + write_trace_stop(waits->traceFd, curwait->classId); + + /* determine offset of current wait in proc wait cells */ + offset = WAIT_OFFSETS[(int)curwait->classId] + curwait->eventId; + Assert(offset < WAIT_EVENTS_COUNT); /* offset indexes an array of WAIT_EVENTS_COUNT cells */ + + waitCell = &((WaitCell *)waits->waitCells)[offset]; + INSTR_TIME_SUBTRACT(currentTime, curwait->startTime); + waitCell->interval += INSTR_TIME_GET_MICROSEC(currentTime); + waitCell->count++; + + /* determine difference between last flush time, and write + * current profile to 
cells in shared memory if needed + */ + INSTR_TIME_SUBTRACT(currentTimeCopy, waits->flushTime); + if ((long) INSTR_TIME_GET_MICROSEC(currentTimeCopy) >= (1000L * WaitsFlushPeriod)) + flush_waits(waits); +} + + +/* Returns size in shared memory enough to hold data of all procs */ +Size +WaitsShmemSize(void) +{ + Size size; /* Size, not int: mul_size/add_size results must not be truncated */ + + size = mul_size(MaxBackends, sizeof(BackendWaitCells)); + size = add_size(size, sizeof(int)); // for counter + size = add_size(size, MAXALIGN(sizeof(slock_t))); //for counter lock + return size; +} + +/* Allocate space in shared memory and mark every backend block as free */ +void +WaitsAllocateShmem(void) +{ + BackendWaitCells *cells; + int i; + + Size size = WaitsShmemSize(); + WaitShmem = ShmemAlloc(size); + MemSet(WaitShmem, 0, size); + WaitCounterLock = (slock_t *)((char *)WaitShmem + sizeof(int)); + + cells = SHMEM_WAIT_CELLS; + for (i=0; i < MaxBackends; i++) + pg_atomic_init_flag(&cells[i].isTaken); /* each block's flag, not just the first one */ + + SpinLockInit(WaitCounterLock); +} + +/* Marks reserved block in shared memory used by process as free, so new + * processes can take it + */ +void +WaitsFreeBackendCells(PGPROC *proc) +{ + // detach backend from waits shared memory + if (proc->waits.smWaitCells != NULL) + { + BackendWaitCells *cells; + + flush_waits(&proc->waits); + cells = ((BackendWaitCells *)proc->waits.smWaitCells); + + /* Stop writing to shmem */ + proc->waits.smWaitCells = NULL; + + /* Mark shmem block as free */ + pg_atomic_clear_flag(&cells->isTaken); + } +} + +/* Init fields needed by monitoring in PGPROC structure. 
Also it reserves + * block in shared memory + */ +void +WaitsInitProcessFields(PGPROC *proc) +{ + MemSet(&proc->waits, 0, sizeof(ProcWaits)); + MemSet(proc->waits.traceFn, 0, WAIT_TRACE_FN_LEN); + proc->waits.waitCells = MemoryContextAllocZero(TopMemoryContext, + sizeof(WaitCell) * WAIT_EVENTS_COUNT); + proc->waits.readIdx = -1; + init_backend_shmem_cells(proc); +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 42a43bb..7d317c8 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -34,6 +34,7 @@ #include "storage/bufmgr.h" #include "storage/relfilenode.h" #include "storage/smgr.h" +#include "storage/wait.h" #include "utils/hsearch.h" #include "utils/memutils.h" #include "pg_trace.h" @@ -680,6 +681,14 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, reln->smgr_rnode.node.relNode, reln->smgr_rnode.backend); + WAIT_START(WAIT_IO, + WAIT_IO_READ, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + forknum, + blocknum); + v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL); seekpos = (off_t) BLCKSZ *(blocknum % ((BlockNumber) RELSEG_SIZE)); @@ -694,6 +703,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ); + WAIT_STOP(); TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, @@ -754,6 +764,12 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, reln->smgr_rnode.backend); + WAIT_START(WAIT_IO, + WAIT_IO_WRITE, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + forknum, blocknum); v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_FAIL); @@ -769,6 +785,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nbytes = FileWrite(v->mdfd_vfd, buffer, 
BLCKSZ); + WAIT_STOP(); TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 595a609..ef7816d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -69,6 +69,7 @@ #include "storage/pg_shmem.h" #include "storage/proc.h" #include "storage/predicate.h" +#include "storage/wait.h" #include "tcop/tcopprot.h" #include "tsearch/ts_cache.h" #include "utils/builtins.h" @@ -1618,6 +1619,17 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"waits_monitoring", PGC_SIGHUP, WAITS_MONITORING, + gettext_noop("Monitor waits"), + NULL, + GUC_NOT_IN_SAMPLE + }, + &WaitsOn, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -2673,6 +2685,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"waits_flush_period", PGC_SIGHUP, WAITS_MONITORING, + gettext_noop("Period (in milliseconds) after that profiles from backend flushed to shared memory"), + NULL, + GUC_UNIT_MS + }, + &WaitsFlushPeriod, + 1000, 10, 3600000, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index cff3b99..67e6d51 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -49,6 +49,7 @@ typedef struct LWLock { slock_t mutex; /* Protects LWLock and queue of PGPROCs */ uint16 tranche; /* tranche ID */ + signed char group; /* for monitoring; -1 = not yet resolved, so it must be signed on every ABI */ pg_atomic_uint32 state; /* state of exlusive/nonexclusive lockers */ #ifdef LOCK_DEBUG @@ -164,6 +165,21 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray; #define NUM_FIXED_LWLOCKS \ (PREDICATELOCK_MANAGER_LWLOCK_OFFSET + NUM_PREDICATELOCK_PARTITIONS) +/* LWLocks grouped by type, these groups are used as LWLock id in + * waits monitoring. 
Each group has name in wait.c. In case of new + * lwlocks they must be added in WAIT_LWLOCK_NAMES between main tranche + * and other tranche lwlocks +*/ + +/* Number of groups with dynamic sizes in main tranche */ +#define NUM_DYN_LWLOCK_GROUPS 10 + +/* Number of groups that are not individual. Includes lwlock groups + * defined above by offset and with dynamic size + */ +#define NUM_ADD_LWLOCK_GROUPS (3 + NUM_DYN_LWLOCK_GROUPS) +#define NUM_LWLOCK_GROUPS (NUM_INDIVIDUAL_LWLOCKS + NUM_ADD_LWLOCK_GROUPS) + typedef enum LWLockMode { LW_EXCLUSIVE, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index e807a2e..0a47378 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -19,6 +19,7 @@ #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" +#include "portability/instr_time.h" /* * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds @@ -33,6 +34,34 @@ */ #define PGPROC_MAX_CACHED_SUBXIDS 64 /* XXX guessed-at value */ +#define WAIT_PARAMS_COUNT 5 +#define WAIT_TRACE_FN_LEN (4096 + 1) + +typedef struct ProcWait +{ + int classId, eventId; + int params[WAIT_PARAMS_COUNT]; + instr_time startTime; +} ProcWait; + +typedef struct ProcWaits +{ + /* Double buffering */ + volatile int readIdx; + ProcWait waitsBuf[2]; + int writeIdx; + + int nested; /* for detecting nested waits */ + void *smWaitCells; /* link to cells in shared memory */ + void *waitCells; + instr_time flushTime; + + /* Trace support */ + volatile bool traceOn; + char traceFn[WAIT_TRACE_FN_LEN]; + FILE *traceFd; +} ProcWaits; + struct XidCache { TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS]; @@ -143,6 +172,7 @@ struct PGPROC bool fpVXIDLock; /* are we holding a fast-path VXID lock? */ LocalTransactionId fpLocalTransactionId; /* lxid for fast-path VXID * lock */ + ProcWaits waits; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. 
*/ diff --git a/src/include/storage/wait.h b/src/include/storage/wait.h new file mode 100644 index 0000000..2f9f8d5 --- /dev/null +++ b/src/include/storage/wait.h @@ -0,0 +1,122 @@ +#ifndef _WAIT_H_ +#define _WAIT_H_ + +#include +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/spin.h" +#include "portability/instr_time.h" + +/* + Waits logging + + There are two main functions: StartWait and StopWait. + StartWait is called at the beginning, StopWait at the end of a wait. + Every wait has its own parameters. The parameter count must be equal to + WAIT_PARAMS_COUNT. Every backend keeps a sum of intervals and a count for each wait. + GUC `waits_flush_period` regulates how often data from a backend is flushed + to shared memory and becomes visible in views. + In shared memory we allocate space that is enough to hold data for all backends. + When a process starts it reserves a block in shared memory, + when it dies it marks that block as free. + + Monitored waits by now: + + 1) Heavy-weight locks (lock.c) + 2) LW-Locks (lwlock.c) + 3) IO read-write (md.c) + 4) Network (be-secure.c) + 5) Latches (pg_latch.c) + */ + +/* count of waits defined below */ +#define WAITS_COUNT 6 + +/* wait classes */ +#define WAIT_CPU 0 +#define WAIT_LWLOCK 1 +#define WAIT_LOCK 2 +#define WAIT_IO 3 +#define WAIT_LATCH 4 +#define WAIT_NETWORK 5 + +/* wait groups for additional tranches */ +#define WAIT_LWLOCK_WAL_INSERT NUM_LWLOCK_GROUPS +#define WAIT_LWLOCK_REPLICATIONORIGIN (NUM_LWLOCK_GROUPS + 1) + +#define WAIT_LWLOCK_LAST_TYPE WAIT_LWLOCK_REPLICATIONORIGIN + +/* storage events */ +#define WAIT_IO_READ 0 +#define WAIT_IO_WRITE 1 + +/* network events */ +#define WAIT_NETWORK_READ 0 +#define WAIT_NETWORK_WRITE 1 + +/* length of arrays from wait.c */ +#define WAIT_LWLOCKS_COUNT (WAIT_LWLOCK_LAST_TYPE + 1) +#define WAIT_LOCKS_COUNT (LOCKTAG_LAST_TYPE + 1) +#define WAIT_IO_EVENTS_COUNT 2 +#define WAIT_LATCH_EVENTS_COUNT 1 +#define WAIT_NETWORK_EVENTS_COUNT 2 + +#define 
WAIT_LWLOCKS_OFFSET 0 +#define WAIT_LOCKS_OFFSET (WAIT_LWLOCKS_OFFSET + WAIT_LWLOCKS_COUNT) +#define WAIT_IO_OFFSET (WAIT_LOCKS_OFFSET + WAIT_LOCKS_COUNT) +#define WAIT_LATCH_OFFSET (WAIT_IO_OFFSET + WAIT_IO_EVENTS_COUNT) +#define WAIT_NETWORK_OFFSET (WAIT_LATCH_OFFSET + WAIT_LATCH_EVENTS_COUNT) +#define WAIT_EVENTS_COUNT (WAIT_NETWORK_OFFSET + WAIT_NETWORK_EVENTS_COUNT) + +#define WAIT_START(classId, eventId, p1, p2, p3, p4, p5) \ + do { \ + if (WaitsOn) StartWait(classId, eventId, p1, p2, p3, p4, p5);\ + } while(0) + +#define WAIT_LWLOCK_START(lock, mode) \ + do { \ + if (WaitsOn) StartLWLockWait(lock, mode); \ + } while (0) + +#define WAIT_STOP() \ + do { \ + if (WaitsOn) StopWait(); \ + } while (0) + +typedef struct +{ + uint64 count; /* count of waits */ + uint64 interval; /* in microseconds */ +} WaitCell; + +/* To avoid waste of memory, we keep all waits data in one array, + * and each class of wait has its offset in that array. + * Offsets defined in WAIT_OFFSETS const array. + */ +typedef struct +{ + /* indicates that block is busy (by backend or user query) at current time */ + volatile pg_atomic_flag isBusy; + /* marks that block is already taken by backend in shared memory */ + volatile pg_atomic_flag isTaken; + int backendPid; + WaitCell cells[WAIT_EVENTS_COUNT]; +} BackendWaitCells; + +void StartWait(int classId, int eventId, int p1, int p2, int p3, int p4, int p5); +void StartLWLockWait(volatile LWLock *lock, LWLockMode mode); +void StopWait(void); +void WaitsAllocateShmem(void); +void WaitsFreeBackendCells(PGPROC *proc); +void WaitsInitProcessFields(PGPROC *proc); + +Size WaitsShmemSize(void); +const char *WaitsEventName(int classId, int eventId); + +extern PGDLLIMPORT bool WaitsOn; +extern PGDLLIMPORT bool WaitsHistoryOn; +extern PGDLLIMPORT int WaitsFlushPeriod; +extern PGDLLIMPORT int MaxBackends; +extern PGDLLIMPORT void *WaitShmem; + +#endif diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 7a58ddb..86fa06b 
100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -97,7 +97,8 @@ enum config_group ERROR_HANDLING_OPTIONS, PRESET_OPTIONS, CUSTOM_OPTIONS, - DEVELOPER_OPTIONS + DEVELOPER_OPTIONS, + WAITS_MONITORING }; /*