diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index dfa9d0d641..e17f2de3d1 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -314,6 +314,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_replication_slotspg_stat_replication_slots + One row per replication slot, showing statistics about + replication slot usage. + See + pg_stat_replication_slot for details. + + + pg_stat_wal_receiverpg_stat_wal_receiver Only one row, showing statistics about the WAL receiver from @@ -2522,12 +2531,6 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i mechanism will simply display NULL lag. - - Tracking of spilled transactions works only for logical replication. In - physical replication, the tracking mechanism will display 0 for spilled - statistics. - - The reported lag times are not predictions of how long it will take for @@ -2549,7 +2552,86 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - + + <structname>pg_stat_replication_slots</structname> + + + pg_stat_replication_slots + + + + The pg_stat_replication_slots view will contain + one row per replication slot, showing statistics about replication + slot usage. + + + + <structname>pg_stat_replication_slots</structname> View + + + + + Column Type + + + Description + + + + + + + + name text + + + A unique, cluster-wide identifier for the replication slot + + + + + + spill_txns bigint + + + Number of transactions spilled to disk after the memory used by + logical decoding exceeds logical_decoding_work_mem. The + counter gets incremented both for toplevel transactions and + subtransactions. + + + + + + spill_count bigint + + + Number of times transactions were spilled to disk. Transactions + may get spilled repeatedly, and this counter gets incremented on every + such invocation. + + + + + + spill_bytes bigint + + + Amount of decoded transaction data spilled to disk. + + + + +
+ + + Tracking of spilled transactions works only for logical replication. In + physical replication, the tracking mechanism will display 0 for spilled + statistics. + +
+ + <structname>pg_stat_wal_receiver</structname> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5314e9348f..7b1916f44a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -785,14 +785,19 @@ CREATE VIEW pg_stat_replication AS W.replay_lag, W.sync_priority, W.sync_state, - W.reply_time, - W.spill_txns, - W.spill_count, - W.spill_bytes + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes + FROM pg_stat_get_replication_slots() AS s; + CREATE VIEW pg_stat_slru AS SELECT s.name, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index c022597bc0..5a38142a3b 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -51,6 +51,7 @@ #include "postmaster/fork_process.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/backendid.h" #include "storage/dsm.h" @@ -282,6 +283,8 @@ static int localNumBackends = 0; static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_ReplSlotStats *replSlotStats; +static int nReplSlotStats; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -340,6 +343,8 @@ static const char *pgstat_get_wait_io(WaitEventIO w); static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype); static void pgstat_send(void *msg, int len); +static int pgstat_replslot_index(const char *name); + static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len); static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len); static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len); @@ -360,6 +365,7 @@ static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len); static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1629,6 +1635,46 @@ pgstat_report_tempfile(size_t filesize) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_report_replslot() - + * + * Tell the collector about replication slot statistics. + * ---------- + */ +void +pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes) +{ + PgStat_MsgReplSlot msg; + + /* + * Prepare and send the message + */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = false; + msg.m_spill_txns = spilltxns; + msg.m_spill_count = spillcount; + msg.m_spill_bytes = spillbytes; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + +/* ---------- + * pgstat_report_replslot_drop() - + * + * Tell the collector about dropping the replication slot. + * ---------- + */ +void +pgstat_report_replslot_drop(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = true; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} /* ---------- * pgstat_ping() - @@ -2691,6 +2737,23 @@ pgstat_fetch_slru(void) return slruStats; } +/* + * --------- + * pgstat_fetch_replslot() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the replication slot statistics struct and set the number + * entries to nslots_p. + * --------- + */ +PgStat_ReplSlotStats * +pgstat_fetch_replslot(int *nslots_p) +{ + backend_read_statsfile(); + + *nslots_p = nReplSlotStats; + return replSlotStats; +} /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array @@ -4665,6 +4728,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_REPLSLOT: + pgstat_recv_replslot(&msg.msg_replslot, len); + break; + default: break; } @@ -4868,6 +4935,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; + int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -4914,6 +4982,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); (void) rc; /* we'll check for error with ferror */ + /* + * Write replication slot stats struct + */ + for (i = 0; i < nReplSlotStats; i++) + { + fputc('R', fpout); + rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + /* * Walk through the database table. */ @@ -5166,6 +5244,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* Allocate the space for replication slot statistics */ + replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats)); + nReplSlotStats = 0; + /* * Clear out global and archiver statistics so they start from zero in * case we can't load an existing statsfile. @@ -5350,6 +5432,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication slot + * follows. + */ + case 'R': + if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + goto done; + } + nReplSlotStats++; + break; + case 'E': goto done; @@ -5559,6 +5658,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_GlobalStats myGlobalStats; PgStat_ArchiverStats myArchiverStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_ReplSlotStats myReplSlotStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5661,6 +5761,21 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication slot + * follows. + */ + case 'R': + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + break; + case 'E': goto done; @@ -6494,6 +6609,59 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) dbentry->last_checksum_failure = msg->m_failure_time; } +/* + * pgstat_replslot_index + * + * Determine index of entry for a replication slot with a given name. + */ +static int +pgstat_replslot_index(const char *name) +{ + int i; + + Assert(nReplSlotStats <= max_replication_slots); + for (i = 0; i < nReplSlotStats; i++) + { + if (strcmp(replSlotStats[i].slotname, name) == 0) + return i; /* found */ + } + + /* not found, register new slot */ + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN); + return nReplSlotStats++; +} + +/* ---------- + * pgstat_recv_replslot() - + * + * Process a REPLSLOT message. + * ---------- + */ +static void +pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) +{ + int idx; + + idx = pgstat_replslot_index(msg->m_slotname); + Assert(idx >= 0 && idx < max_replication_slots); + + if (msg->m_drop) + { + /* Remove the replication slot statistics with the given name */ + memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], + sizeof(PgStat_ReplSlotStats)); + nReplSlotStats--; + } + else + { + /* Update the replication slot statistics */ + replSlotStats[idx].spill_txns += msg->m_spill_txns; + replSlotStats[idx].spill_count += msg->m_spill_count; + replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + } +} + /* ---------- * pgstat_recv_tempfile() - * diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61902be3b0..378c78f896 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -32,6 +32,7 @@ #include "access/xlog_internal.h" #include "fmgr.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/origin.h" @@ -66,6 +67,7 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, const char *prefix, Size message_size, const char *message); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void UpdateSpillStats(LogicalDecodingContext *ctx); /* * Make sure the current settings & environment are capable of doing logical @@ -721,6 +723,11 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* + * Update statistics about transactions that spilled to disk. + */ + UpdateSpillStats(ctx); } static void @@ -1091,3 +1098,21 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +static void +UpdateSpillStats(LogicalDecodingContext *ctx) +{ + ReorderBuffer *rb = ctx->reorder; + + elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", + rb, + (long long) rb->spillTxns, + (long long) rb->spillCount, + (long long) rb->spillBytes); + + pgstat_report_replslot(NameStr(ctx->slot->data.name), + rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns = 0; + rb->spillCount = 0; + rb->spillBytes = 0; +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 57bbb6288c..a677365945 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -322,6 +322,9 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* Let everybody know we've modified this slot */ ConditionVariableBroadcast(&slot->active_cv); + + /* Create statistics entry for the new slot */ + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); } /* @@ -682,6 +685,13 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) ereport(WARNING, (errmsg("could not remove directory \"%s\"", tmppath))); + /* + * Report to drop the replication slot to stats collector. This is done + * while holding ReplicationSlotAllocationLock to avoid a race condition + * where the same name slot is created before purging the old statistics. + */ + pgstat_report_replslot_drop(NameStr(slot->data.name)); + /* * We release this at the very end, so that nobody starts trying to create * a slot while we're still cleaning up the detritus of the old one. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e2477c47e0..3815776441 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -254,8 +254,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); -static void UpdateSpillStats(LogicalDecodingContext *ctx); - /* Initialize walsender process before entering the main command loop */ void @@ -1368,11 +1366,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId LagTrackerWrite(lsn, now); sendTime = now; - - /* - * Update statistics about transactions that spilled to disk. - */ - UpdateSpillStats(ctx); } /* @@ -2418,9 +2411,6 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; - walsnd->spillTxns = 0; - walsnd->spillCount = 0; - walsnd->spillBytes = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3256,7 +3246,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 15 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3310,9 +3300,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int pid; WalSndState state; TimestampTz replyTime; - int64 spillTxns; - int64 spillCount; - int64 spillBytes; bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3336,9 +3323,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; replyTime = walsnd->replyTime; - spillTxns = walsnd->spillTxns; - spillCount = walsnd->spillCount; - spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); /* @@ -3436,11 +3420,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[11] = true; else values[11] = TimestampTzGetDatum(replyTime); - - /* spill to disk */ - values[12] = Int64GetDatum(spillTxns); - values[13] = Int64GetDatum(spillCount); - values[14] = Int64GetDatum(spillBytes); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -3677,21 +3656,3 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } - -static void -UpdateSpillStats(LogicalDecodingContext *ctx) -{ - ReorderBuffer *rb = ctx->reorder; - - elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", - rb, - (long long) rb->spillTxns, - (long long) rb->spillCount, - (long long) rb->spillBytes); - - SpinLockAcquire(&MyWalSnd->mutex); - MyWalSnd->spillTxns = rb->spillTxns; - MyWalSnd->spillCount = rb->spillCount; - MyWalSnd->spillBytes = rb->spillBytes; - SpinLockRelease(&MyWalSnd->mutex); -} diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2aff739466..7cb186edea 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2092,3 +2092,63 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +Datum +pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + PgStat_ReplSlotStats *stats; + int nstats; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + stats = pgstat_fetch_replslot(&nstats); + for (i = 0; i < nstats; i++) + { + Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + PgStat_ReplSlotStats stat = stats[i]; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = PointerGetDatum(cstring_to_text(stat.slotname)); + values[1] = Int64GetDatum(stat.spill_txns); + values[2] = Int64GetDatum(stat.spill_count); + values[3] = Int64GetDatum(stat.spill_bytes); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 61f2c2f5b4..af2df204e0 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5237,9 +5237,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', @@ -5481,6 +5481,15 @@ proargnames => '{name,blks_zeroed,blks_hit,blks_read,blks_written,blks_exists,flushes,truncates,stats_reset}', prosrc => 'pg_stat_get_slru' }, +{ oid => '8595', descr => 'statistics: information about replication slots', + proname => 'pg_stat_get_replication_slots', prorows => '100', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{text,int8,int8,int8}', + proargmodes => '{o,o,o,o}', + proargnames => '{name,spill_txns,spill_count,spill_bytes}', + prosrc => 'pg_stat_get_replication_slots' }, + { oid => '2978', descr => 'statistics: number of function calls', proname => 'pg_stat_get_function_calls', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 1387201382..96824735ff 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -67,7 +67,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_RECOVERYCONFLICT, PGSTAT_MTYPE_TEMPFILE, PGSTAT_MTYPE_DEADLOCK, - PGSTAT_MTYPE_CHECKSUMFAILURE + PGSTAT_MTYPE_CHECKSUMFAILURE, + PGSTAT_MTYPE_REPLSLOT, } StatMsgType; /* ---------- @@ -453,6 +454,22 @@ typedef struct PgStat_MsgSLRU PgStat_Counter m_truncate; } PgStat_MsgSLRU; +/* ---------- + * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication + * slot statistics. + * ---------- + */ +typedef struct PgStat_MsgReplSlot +{ + PgStat_MsgHdr m_hdr; + char m_slotname[NAMEDATALEN]; + bool m_drop; + PgStat_Counter m_spill_txns; + PgStat_Counter m_spill_count; + PgStat_Counter m_spill_bytes; +} PgStat_MsgReplSlot; + + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -603,6 +620,7 @@ typedef union PgStat_Msg PgStat_MsgDeadlock msg_deadlock; PgStat_MsgTempFile msg_tempfile; PgStat_MsgChecksumFailure msg_checksumfailure; + PgStat_MsgReplSlot msg_replslot; } PgStat_Msg; @@ -760,6 +778,16 @@ typedef struct PgStat_SLRUStats TimestampTz stat_reset_timestamp; } PgStat_SLRUStats; +/* + * Replication slot statistics kept in the stats collector + */ +typedef struct PgStat_ReplSlotStats +{ + char slotname[NAMEDATALEN]; + PgStat_Counter spill_txns; + PgStat_Counter spill_count; + PgStat_Counter spill_bytes; +} PgStat_ReplSlotStats; /* ---------- * Backend states @@ -1310,6 +1338,9 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); +extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes); +extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); extern void pgstat_bestart(void); @@ -1474,6 +1505,7 @@ extern int pgstat_fetch_stat_numbackends(void); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); +extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 734acec2a4..509856c057 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -78,11 +78,6 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; - - /* Statistics for transactions spilled to disk. */ - int64 spillTxns; - int64 spillCount; - int64 spillBytes; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index b813e32215..e15fe2b8b7 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2002,13 +2002,15 @@ pg_stat_replication| SELECT s.pid, w.replay_lag, w.sync_priority, w.sync_state, - w.reply_time, - w.spill_txns, - w.spill_count, - w.spill_bytes + w.reply_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); +pg_stat_replication_slots| SELECT s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes + FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit,