From 94be15a2f7893f7decafaa4a20eff194778a7194 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 15 May 2020 14:07:04 +0530 Subject: [PATCH v21 07/12] Track statistics for streaming --- doc/src/sgml/monitoring.sgml | 33 +++++++++++++++++++ src/backend/catalog/system_views.sql | 5 ++- .../replication/logical/reorderbuffer.c | 13 ++++++++ src/backend/replication/walsender.c | 32 +++++++++++++++--- src/include/catalog/pg_proc.dat | 6 ++-- src/include/replication/reorderbuffer.h | 13 +++++--- src/include/replication/walsender_private.h | 5 +++ src/test/regress/expected/rules.out | 7 ++-- 8 files changed, 99 insertions(+), 15 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 87502a49b6..5b64410e7d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2404,6 +2404,39 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Amount of decoded transaction data spilled to disk. + + + + stream_txns bigint + + + Number of in-progress transactions streamed to subscriber after + memory used by logical decoding exceeds logical_work_mem. + Streaming only works with toplevel transactions (subtransactions can't + be streamed independently), so the counter does not get incremented for + subtransactions. + + + + + + stream_count bigint + + + Number of times in-progress transactions were streamed to subscriber. + Transactions may get streamed repeatedly, and this counter gets incremented + on every such invocation. + + + + + + stream_bytes bigint + + + Amount of decoded in-progress transaction data streamed to subscriber. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2bd5f5ea14..8f34ce8deb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -788,7 +788,10 @@ CREATE VIEW pg_stat_replication AS W.reply_time, W.spill_txns, W.spill_count, - W.spill_bytes + W.spill_bytes, + W.stream_txns, + W.stream_count, + W.stream_bytes FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e7249f874d..c2a012dd3c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -332,6 +332,10 @@ ReorderBufferAllocate(void) buffer->spillTxns = 0; buffer->spillBytes = 0; + buffer->streamCount = 0; + buffer->streamTxns = 0; + buffer->streamBytes = 0; + buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -3323,6 +3327,15 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); + /* + * Update the stream statistics. + */ + rb->streamCount += 1; + rb->streamBytes += txn->size; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1; + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 6def1b96c9..5d23691930 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1359,7 +1359,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * LogicalDecodingContext 'update_progress' callback. * * Write the current position to the lag tracker (see XLogSendPhysical), - * and update the spill statistics. + * and update the spill/stream statistics. */ static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) @@ -1380,7 +1380,8 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId sendTime = now; /* - * Update statistics about transactions that spilled to disk. + * Update statistics about transactions that spilled to disk or streamed to + * subscriber (before being committed). */ UpdateSpillStats(ctx); } @@ -2429,6 +2430,9 @@ InitWalSenderSlot(void) walsnd->spillTxns = 0; walsnd->spillCount = 0; walsnd->spillBytes = 0; + walsnd->streamTxns = 0; + walsnd->streamCount = 0; + walsnd->streamBytes = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3264,7 +3268,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 15 +#define PG_STAT_GET_WAL_SENDERS_COLS 18 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3322,6 +3326,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillCount; int64 spillBytes; bool is_sync_standby; + int64 streamTxns; + int64 streamCount; + int64 streamBytes; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; int j; @@ -3347,6 +3354,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillTxns = walsnd->spillTxns; spillCount = walsnd->spillCount; spillBytes = walsnd->spillBytes; + streamTxns = walsnd->streamTxns; + streamCount = walsnd->streamCount; + streamBytes = walsnd->streamBytes; SpinLockRelease(&walsnd->mutex); /* @@ -3449,6 +3459,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) values[12] = Int64GetDatum(spillTxns); values[13] = Int64GetDatum(spillCount); values[14] = Int64GetDatum(spillBytes); + + /* stream over-sized transactions */ + values[15] = Int64GetDatum(streamTxns); + values[16] = Int64GetDatum(streamCount); + values[17] = Int64GetDatum(streamBytes); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -3697,11 +3712,18 @@ UpdateSpillStats(LogicalDecodingContext *ctx) MyWalSnd->spillCount = rb->spillCount; MyWalSnd->spillBytes = rb->spillBytes; - elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", + MyWalSnd->streamTxns = rb->streamTxns; + MyWalSnd->streamCount = rb->streamCount; + MyWalSnd->streamBytes = rb->streamBytes; + + elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, - (long long) rb->spillBytes); + (long long) rb->spillBytes, + (long long) rb->streamTxns, + (long long) rb->streamCount, + (long long) rb->streamBytes); SpinLockRelease(&MyWalSnd->mutex); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9edae40ed8..5a8826cc67 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5237,9 +5237,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8,int8,int8,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index da32fbfd1c..34f93d600b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -518,15 +518,20 @@ struct ReorderBuffer Size size; /* - * Statistics about transactions spilled to disk. + * Statistics about transactions streamed or spilled to disk. * - * A single transaction may be spilled repeatedly, which is why we keep - * two different counters. For spilling, the transaction counter includes - * both toplevel transactions and subtransactions. + * A single transaction may be streamed/spilled repeatedly, which is + * why we keep two different counters. For spilling, the transaction + * counter includes both toplevel transactions and subtransactions. + * For streaming, it only includes toplevel transactions (we never + * stream individual subtransactions). */ int64 spillCount; /* spill-to-disk invocation counter */ int64 spillTxns; /* number of transactions spilled to disk */ int64 spillBytes; /* amount of data spilled to disk */ + int64 streamCount; /* streaming invocation counter */ + int64 streamTxns; /* number of transactions spilled to disk */ + int64 streamBytes; /* amount of data streamed to subscriber */ }; diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 734acec2a4..b997d1710e 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -83,6 +83,11 @@ typedef struct WalSnd int64 spillTxns; int64 spillCount; int64 spillBytes; + + /* Statistics for in-progress transactions streamed to subscriber. */ + int64 streamTxns; + int64 streamCount; + int64 streamBytes; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 8876025aaa..0c4952a1fa 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2005,9 +2005,12 @@ pg_stat_replication| SELECT s.pid, w.reply_time, w.spill_txns, w.spill_count, - w.spill_bytes + w.spill_bytes, + w.stream_txns, + w.stream_count, + w.stream_bytes FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_slru| SELECT s.name, s.blks_zeroed, -- 2.23.0