From 62920acda9a9fd96d564d9ca02965e8653e2be98 Mon Sep 17 00:00:00 2001 From: "MyungKyu, Lim" Date: Thu, 2 Aug 2018 18:05:19 +0900 Subject: [PATCH] Implement following TODO list item : Add entry creation timestamp column to pg_stat_replication mailing list : http://archives.postgresql.org/pgsql-hackers/2011-08/msg00694.php --- src/backend/catalog/system_views.sql | 3 ++- src/backend/replication/walsender.c | 25 +++++++++++++++++++++---- src/include/catalog/pg_proc.dat | 6 +++--- src/include/replication/walsender_private.h | 5 +++++ src/test/regress/expected/rules.out | 5 +++-- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 7251552..4e30eeb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.last_msg_send_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c83ff3b..a241ec6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1775,6 +1775,8 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz sendTimestamp; + char *sendTime; static bool fullyAppliedLastTime = false; @@ -1782,14 +1784,20 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + sendTimestamp = pq_getmsgint64(&reply_message); /* sendTime */ replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", + /* Copy because timestamptz_to_str returns a static buffer */ + sendTime = pstrdup(timestamptz_to_str(sendTimestamp)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s replytime %s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + replyRequested ? " (reply requested)" : "", + sendTime); + + pfree(sendTime); /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1836,6 +1844,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->lastMsgSendTime = sendTimestamp; SpinLockRelease(&walsnd->mutex); } @@ -2276,6 +2285,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->lastMsgSendTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3194,7 +3204,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3248,6 +3258,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz last_send_time; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3267,6 +3278,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + last_send_time = walsnd->lastMsgSendTime; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3343,6 +3355,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (last_send_time == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(last_send_time); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a146510..d09533c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5181,9 +5181,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,last_msg_send_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 4b90477..057bcb4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -75,6 +75,11 @@ typedef struct WalSnd * SyncRepLock. */ int sync_standby_priority; + + /* + * timestamp of the latest message, reported by standby server + */ + TimestampTz lastMsgSendTime; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 078129f..40220cb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid, w.flush_lag, w.replay_lag, w.sync_priority, - w.sync_state + w.sync_state, + w.last_msg_send_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, last_msg_send_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl, -- 1.8.3.1