diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index dfa9d0d641..e17f2de3d1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -314,6 +314,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+
+ pg_stat_replication_slotspg_stat_replication_slots
+ One row per replication slot, showing statistics about
+ replication slot usage.
+ See
+ pg_stat_replication_slot for details.
+
+
+
pg_stat_wal_receiverpg_stat_wal_receiverOnly one row, showing statistics about the WAL receiver from
@@ -2522,12 +2531,6 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
mechanism will simply display NULL lag.
-
- Tracking of spilled transactions works only for logical replication. In
- physical replication, the tracking mechanism will display 0 for spilled
- statistics.
-
-
The reported lag times are not predictions of how long it will take for
@@ -2549,7 +2552,86 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
-
+
+ pg_stat_replication_slots
+
+
+ pg_stat_replication_slots
+
+
+
+ The pg_stat_replication_slots view will contain
+ one row per replication slot, showing statistics about replication
+ slot usage.
+
+
+
+ pg_stat_replication_slots View
+
+
+
+
+ Column Type
+
+
+ Description
+
+
+
+
+
+
+
+ nametext
+
+
+ A unique, cluster-wide identifier for the replication slot
+
+
+
+
+
+ spill_txnsbigint
+
+
+ Number of transactions spilled to disk after the memory used by
+ logical decoding exceeds logical_decoding_work_mem. The
+ counter gets incremented both for toplevel transactions and
+ subtransactions.
+
+
+
+
+
+ spill_countbigint
+
+
+ Number of times transactions were spilled to disk. Transactions
+ may get spilled repeatedly, and this counter gets incremented on every
+ such invocation.
+
+
+
+
+
+ spill_bytesbigint
+
+
+ Amount of decoded transaction data spilled to disk.
+
+
+
+
+
+
+
+ Tracking of spilled transactions works only for logical replication. In
+ physical replication, the tracking mechanism will display 0 for spilled
+ statistics.
+
+
+
+ pg_stat_wal_receiver
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5314e9348f..7b1916f44a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -785,14 +785,19 @@ CREATE VIEW pg_stat_replication AS
W.replay_lag,
W.sync_priority,
W.sync_state,
- W.reply_time,
- W.spill_txns,
- W.spill_count,
- W.spill_bytes
+ W.reply_time
FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
+CREATE VIEW pg_stat_replication_slots AS
+ SELECT
+ s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes
+ FROM pg_stat_get_replication_slots() AS s;
+
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index c022597bc0..5a38142a3b 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -51,6 +51,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
@@ -282,6 +283,8 @@ static int localNumBackends = 0;
static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats *replSlotStats;
+static int nReplSlotStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
@@ -340,6 +343,8 @@ static const char *pgstat_get_wait_io(WaitEventIO w);
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);
+static int pgstat_replslot_index(const char *name);
+
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
@@ -360,6 +365,7 @@ static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@@ -1629,6 +1635,46 @@ pgstat_report_tempfile(size_t filesize)
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ * Tell the collector about replication slot statistics.
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes)
+{
+ PgStat_MsgReplSlot msg;
+
+ /*
+ * Prepare and send the message
+ */
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = false;
+ msg.m_spill_txns = spilltxns;
+ msg.m_spill_count = spillcount;
+ msg.m_spill_bytes = spillbytes;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ * Tell the collector about dropping the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = true;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
/* ----------
* pgstat_ping() -
@@ -2691,6 +2737,23 @@ pgstat_fetch_slru(void)
return slruStats;
}
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the replication slot statistics struct and set the number
+ * entries to nslots_p.
+ * ---------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+ backend_read_statsfile();
+
+ *nslots_p = nReplSlotStats;
+ return replSlotStats;
+}
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
@@ -4665,6 +4728,10 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
default:
break;
}
@@ -4868,6 +4935,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
+ int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -4914,6 +4982,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
(void) rc; /* we'll check for error with ferror */
+ /*
+ * Write replication slot stats struct
+ */
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ fputc('R', fpout);
+ rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
/*
* Walk through the database table.
*/
@@ -5166,6 +5244,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* Allocate the space for replication slot statistics */
+ replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats = 0;
+
/*
* Clear out global and archiver statistics so they start from zero in
* case we can't load an existing statsfile.
@@ -5350,6 +5432,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication slot
+ * follows.
+ */
+ case 'R':
+ if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ goto done;
+ }
+ nReplSlotStats++;
+ break;
+
case 'E':
goto done;
@@ -5559,6 +5658,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_GlobalStats myGlobalStats;
PgStat_ArchiverStats myArchiverStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+ PgStat_ReplSlotStats myReplSlotStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5661,6 +5761,21 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication slot
+ * follows.
+ */
+ case 'R':
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+ break;
+
case 'E':
goto done;
@@ -6494,6 +6609,59 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
dbentry->last_checksum_failure = msg->m_failure_time;
}
+/*
+ * pgstat_replslot_index
+ *
+ * Determine index of entry for a replication slot with a given name.
+ */
+static int
+pgstat_replslot_index(const char *name)
+{
+ int i;
+
+ Assert(nReplSlotStats <= max_replication_slots);
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ if (strcmp(replSlotStats[i].slotname, name) == 0)
+ return i; /* found */
+ }
+
+ /* not found, register new slot */
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+ return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_recv_replslot() -
+ *
+ * Process a REPLSLOT message.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+ int idx;
+
+ idx = pgstat_replslot_index(msg->m_slotname);
+ Assert(idx >= 0 && idx < max_replication_slots);
+
+ if (msg->m_drop)
+ {
+ /* Remove the replication slot statistics with the given name */
+ memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+ sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats--;
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ replSlotStats[idx].spill_txns += msg->m_spill_txns;
+ replSlotStats[idx].spill_count += msg->m_spill_count;
+ replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ }
+}
+
/* ----------
* pgstat_recv_tempfile() -
*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0..378c78f896 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -32,6 +32,7 @@
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/origin.h"
@@ -66,6 +67,7 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
const char *prefix, Size message_size, const char *message);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
+static void UpdateSpillStats(LogicalDecodingContext *ctx);
/*
* Make sure the current settings & environment are capable of doing logical
@@ -721,6 +723,11 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+ /*
+ * Update statistics about transactions that spilled to disk.
+ */
+ UpdateSpillStats(ctx);
}
static void
@@ -1091,3 +1098,21 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex);
}
}
+
+static void
+UpdateSpillStats(LogicalDecodingContext *ctx)
+{
+ ReorderBuffer *rb = ctx->reorder;
+
+ elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
+ rb,
+ (long long) rb->spillTxns,
+ (long long) rb->spillCount,
+ (long long) rb->spillBytes);
+
+ pgstat_report_replslot(NameStr(ctx->slot->data.name),
+ rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns = 0;
+ rb->spillCount = 0;
+ rb->spillBytes = 0;
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 57bbb6288c..a677365945 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -322,6 +322,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
/* Let everybody know we've modified this slot */
ConditionVariableBroadcast(&slot->active_cv);
+
+ /* Create statistics entry for the new slot */
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
}
/*
@@ -682,6 +685,13 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
ereport(WARNING,
(errmsg("could not remove directory \"%s\"", tmppath)));
+ /*
+ * Report to drop the replication slot to stats collector. This is done
+ * while holding ReplicationSlotAllocationLock to avoid a race condition
+ * where the same name slot is created before purging the old statistics.
+ */
+ pgstat_report_replslot_drop(NameStr(slot->data.name));
+
/*
* We release this at the very end, so that nobody starts trying to create
* a slot while we're still cleaning up the detritus of the old one.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e2477c47e0..3815776441 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -254,8 +254,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p);
-static void UpdateSpillStats(LogicalDecodingContext *ctx);
-
/* Initialize walsender process before entering the main command loop */
void
@@ -1368,11 +1366,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
LagTrackerWrite(lsn, now);
sendTime = now;
-
- /*
- * Update statistics about transactions that spilled to disk.
- */
- UpdateSpillStats(ctx);
}
/*
@@ -2418,9 +2411,6 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
- walsnd->spillTxns = 0;
- walsnd->spillCount = 0;
- walsnd->spillBytes = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3256,7 +3246,7 @@ offset_to_interval(TimeOffset offset)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 15
+#define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -3310,9 +3300,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int pid;
WalSndState state;
TimestampTz replyTime;
- int64 spillTxns;
- int64 spillCount;
- int64 spillBytes;
bool is_sync_standby;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -3336,9 +3323,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
replyTime = walsnd->replyTime;
- spillTxns = walsnd->spillTxns;
- spillCount = walsnd->spillCount;
- spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex);
/*
@@ -3436,11 +3420,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[11] = true;
else
values[11] = TimestampTzGetDatum(replyTime);
-
- /* spill to disk */
- values[12] = Int64GetDatum(spillTxns);
- values[13] = Int64GetDatum(spillCount);
- values[14] = Int64GetDatum(spillBytes);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3677,21 +3656,3 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Assert(time != 0);
return now - time;
}
-
-static void
-UpdateSpillStats(LogicalDecodingContext *ctx)
-{
- ReorderBuffer *rb = ctx->reorder;
-
- elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
- rb,
- (long long) rb->spillTxns,
- (long long) rb->spillCount,
- (long long) rb->spillBytes);
-
- SpinLockAcquire(&MyWalSnd->mutex);
- MyWalSnd->spillTxns = rb->spillTxns;
- MyWalSnd->spillCount = rb->spillCount;
- MyWalSnd->spillBytes = rb->spillBytes;
- SpinLockRelease(&MyWalSnd->mutex);
-}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2aff739466..7cb186edea 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2092,3 +2092,63 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
+
+Datum
+pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ PgStat_ReplSlotStats *stats;
+ int nstats;
+ int i;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ stats = pgstat_fetch_replslot(&nstats);
+ for (i = 0; i < nstats; i++)
+ {
+ Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ PgStat_ReplSlotStats stat = stats[i];
+
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = PointerGetDatum(cstring_to_text(stat.slotname));
+ values[1] = Int64GetDatum(stat.spill_txns);
+ values[2] = Int64GetDatum(stat.spill_count);
+ values[3] = Int64GetDatum(stat.spill_bytes);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 61f2c2f5b4..af2df204e0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5237,9 +5237,9 @@
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}',
+ proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
@@ -5481,6 +5481,15 @@
proargnames => '{name,blks_zeroed,blks_hit,blks_read,blks_written,blks_exists,flushes,truncates,stats_reset}',
prosrc => 'pg_stat_get_slru' },
+{ oid => '8595', descr => 'statistics: information about replication slots',
+ proname => 'pg_stat_get_replication_slots', prorows => '100', proisstrict => 'f',
+ proretset => 't', provolatile => 's', proparallel => 'r',
+ prorettype => 'record', proargtypes => '',
+ proallargtypes => '{text,int8,int8,int8}',
+ proargmodes => '{o,o,o,o}',
+ proargnames => '{name,spill_txns,spill_count,spill_bytes}',
+ prosrc => 'pg_stat_get_replication_slots' },
+
{ oid => '2978', descr => 'statistics: number of function calls',
proname => 'pg_stat_get_function_calls', provolatile => 's',
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 1387201382..96824735ff 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -67,7 +67,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RECOVERYCONFLICT,
PGSTAT_MTYPE_TEMPFILE,
PGSTAT_MTYPE_DEADLOCK,
- PGSTAT_MTYPE_CHECKSUMFAILURE
+ PGSTAT_MTYPE_CHECKSUMFAILURE,
+ PGSTAT_MTYPE_REPLSLOT,
} StatMsgType;
/* ----------
@@ -453,6 +454,22 @@ typedef struct PgStat_MsgSLRU
PgStat_Counter m_truncate;
} PgStat_MsgSLRU;
+/* ----------
+ * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication
+ * slot statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgReplSlot
+{
+ PgStat_MsgHdr m_hdr;
+ char m_slotname[NAMEDATALEN];
+ bool m_drop;
+ PgStat_Counter m_spill_txns;
+ PgStat_Counter m_spill_count;
+ PgStat_Counter m_spill_bytes;
+} PgStat_MsgReplSlot;
+
+
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
* ----------
@@ -603,6 +620,7 @@ typedef union PgStat_Msg
PgStat_MsgDeadlock msg_deadlock;
PgStat_MsgTempFile msg_tempfile;
PgStat_MsgChecksumFailure msg_checksumfailure;
+ PgStat_MsgReplSlot msg_replslot;
} PgStat_Msg;
@@ -760,6 +778,16 @@ typedef struct PgStat_SLRUStats
TimestampTz stat_reset_timestamp;
} PgStat_SLRUStats;
+/*
+ * Replication slot statistics kept in the stats collector
+ */
+typedef struct PgStat_ReplSlotStats
+{
+ char slotname[NAMEDATALEN];
+ PgStat_Counter spill_txns;
+ PgStat_Counter spill_count;
+ PgStat_Counter spill_bytes;
+} PgStat_ReplSlotStats;
/* ----------
* Backend states
@@ -1310,6 +1338,9 @@ extern void pgstat_report_recovery_conflict(int reason);
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
+extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes);
+extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
extern void pgstat_bestart(void);
@@ -1474,6 +1505,7 @@ extern int pgstat_fetch_stat_numbackends(void);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 734acec2a4..509856c057 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -78,11 +78,6 @@ typedef struct WalSnd
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
-
- /* Statistics for transactions spilled to disk. */
- int64 spillTxns;
- int64 spillCount;
- int64 spillBytes;
} WalSnd;
extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b813e32215..e15fe2b8b7 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2002,13 +2002,15 @@ pg_stat_replication| SELECT s.pid,
w.replay_lag,
w.sync_priority,
w.sync_state,
- w.reply_time,
- w.spill_txns,
- w.spill_count,
- w.spill_bytes
+ w.reply_time
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
+pg_stat_replication_slots| SELECT s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes
+ FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,