From 62a8f97e7362f470ab38c75c36292384fd86d8df Mon Sep 17 00:00:00 2001 From: "MyungKyu, Lim" Date: Tue, 31 Jul 2018 12:03:15 +0900 Subject: [PATCH] Implement following TODO list item : Add entry creation timestamp column to pg_stat_replication mailing list : http://archives.postgresql.org/pgsql-hackers/2011-08/msg00694.php --- src/backend/catalog/system_views.sql | 3 ++- src/backend/replication/walsender.c | 25 +++++++++++++++++++++---- src/include/catalog/pg_proc.dat | 6 +++--- src/include/replication/walsender_private.h | 5 +++++ src/test/regress/expected/rules.out | 5 +++-- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 7251552..eef834b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d60026d..e8a33a3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1772,6 +1772,8 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz reply_timestamp; + char *replytime; static bool fullyAppliedLastTime = false; @@ -1779,14 +1781,20 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + reply_timestamp = pq_getmsgint64(&reply_message); /* sendTime */ replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", + /* Copy because timestamptz_to_str returns a static buffer */ + replytime = pstrdup(timestamptz_to_str(reply_timestamp)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s replytime %s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + replyRequested ? " (reply requested)" : "", + replytime); + + pfree(replytime); /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1833,6 +1841,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->reply_timestamp = reply_timestamp; SpinLockRelease(&walsnd->mutex); } @@ -2273,6 +2282,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->reply_timestamp = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3191,7 +3201,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3245,6 +3255,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz reply_timestamp; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3264,6 +3275,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + reply_timestamp = walsnd->reply_timestamp; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3340,6 +3352,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (reply_timestamp == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(reply_timestamp); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a146510..ef262a2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5181,9 +5181,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 4b90477..dc53314 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -75,6 +75,11 @@ typedef struct WalSnd * SyncRepLock. */ int sync_standby_priority; + + /* + * The timestamp of reply message when created by the standby. + */ + TimestampTz reply_timestamp; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 744d501..6fa62d0 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid, w.flush_lag, w.replay_lag, w.sync_priority, - w.sync_state + w.sync_state, + w.reply_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl, -- 1.8.3.1