diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 77667bd..72c5390 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
+
diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml
new file mode 100644
index 0000000..6a8bdca
--- /dev/null
+++ b/doc/src/sgml/ref/waitlsn.sgml
@@ -0,0 +1,108 @@
+
+
+
+
+ WAITLSN
+
+
+
+ WAITLSN
+ 7
+ SQL - Language Statements
+
+
+
+ WAITLSN
+ wait when target LSN> been replayed
+
+
+
+
+WAITLSN 'LSN' [ , delay ]
+
+
+
+
+ Description
+
+
+ The WAITLSN wait till target LSN> will
+ be replayed with an optional delay> (milliseconds by default
+ infinity) to be wait for LSN to replayed.
+
+
+
+ WAITLSN provides a simple
+ interprocess LSN> wait mechanism for a backends on slave
+ in master-slave replication scheme on PostgreSQL database.
+
+
+
+
+ Parameters
+
+
+
+ LSN
+
+
+ Target log sequence number to be wait for.
+
+
+
+
+ delay
+
+
+ Time in miliseconds to waiting for LSN to be replayed.
+
+
+
+
+
+
+
+ Notes
+
+
+ Delay time to waiting for LSN to be replayed must be integer. For
+ default it is infinity. Waiting can be interupped using Ctl+C, or
+ by Postmaster death.
+
+
+
+
+ Examples
+
+
+ Configure and execute a waitlsn from
+ psql:
+
+
+WAITLSN '0/3F07A6B1', 10000;
+NOTICE: LSN is not reached. Try to make bigger delay.
+WAITLSN
+
+WAITLSN '0/3F07A611';
+WAITLSN
+
+WAITLSN '0/3F0FF791', 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to make bigger delay.
+ERROR: canceling statement due to user request
+
+
+
+
+
+ Compatibility
+
+
+ There is no WAITLSN statement in the SQL
+ standard.
+
+
+
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 8acdff1..3733ad9 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -200,6 +200,7 @@
&update;
&vacuum;
&values;
+ &waitlsn;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f13f9c1..609c83e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -39,6 +39,7 @@
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
#include "commands/tablespace.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
@@ -6922,6 +6923,11 @@ StartupXLOG(void)
break;
}
+ /*
+ * After update lastReplayedEndRecPtr set Latches in SHMEM array
+ */
+ WaitLSNSetLatch();
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 6b3742c..091cbe2 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
policy.o portalcmds.o prepare.o proclang.o \
schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \
tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \
- variable.o view.o
+ variable.o view.o waitlsn.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 716f1c3..9ad3275 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -139,7 +139,6 @@
#include "utils/ps_status.h"
#include "utils/timestamp.h"
-
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
* must be kept small enough so that a notification message fits on one
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000..b441b85
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,195 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ * WaitLSN statment: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*-------------------------------------------------------------------------
+ * Wait for LSN been replayed on slave as of 9.5:
+ * README
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlog_fn.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/waitlsn.h"
+
+
+/* Latches Own-DisownLatch and AbortCаllBack */
+static uint32 GetSHMEMSize(void);
+static void WLDisownLatchAbort(XactEvent event, void *arg);
+static void WLOwnLatch(void);
+static void WLDisownLatch(void);
+
+void _PG_init(void);
+
+/* Shared memory structures */
+typedef struct
+{
+ int pid;
+ volatile slock_t slock;
+ Latch latch;
+} BIDLatch;
+
+typedef struct
+{
+ int backend_maxid;
+ BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile GlobState *state;
+bool is_latch_owned = false;
+
+static void
+WLOwnLatch(void)
+{
+ SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+ OwnLatch(&state->l_arr[MyBackendId].latch);
+ is_latch_owned = true;
+ if (MyBackendId > state->backend_maxid)
+ state->backend_maxid += 1;
+ state->l_arr[MyBackendId].pid = MyProcPid;
+ SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+static void
+WLDisownLatch(void)
+{
+ SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+ DisownLatch(&state->l_arr[MyBackendId].latch);
+ is_latch_owned = false;
+ if (MyBackendId = state->backend_maxid)
+ state->backend_maxid -= 1;
+ state->l_arr[MyBackendId].pid = 0;
+ SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/* CallBack function */
+static void
+WLDisownLatchAbort(XactEvent event, void *arg)
+{
+ if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT))
+ {
+ WLDisownLatch();
+ }
+}
+
+/* Module load callback */
+void
+_PG_init(void)
+{
+ if (!IsUnderPostmaster)
+ RegisterXactCallback(WLDisownLatchAbort, NULL);
+}
+
+static uint32
+GetSHMEMSize(void)
+{
+ return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1);
+}
+
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+ uint i;
+
+ state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+ GetSHMEMSize(),
+ &found);
+ if (!found)
+ {
+ state->backend_maxid = 1;
+ for (i = 0; i < (MaxConnections+1); i++)
+ {
+ state->l_arr[i].pid = 0;
+ SpinLockInit(&state->l_arr[i].slock);
+ InitSharedLatch(&state->l_arr[i].latch);
+ }
+ }
+}
+
+void
+WaitLSNSetLatch(void)
+{
+ uint i;
+ for (i = 1; i < (state->backend_maxid+1); i++)
+ {
+ SpinLockAcquire(&state->l_arr[i].slock);
+ if (state->l_arr[i].pid != 0)
+ SetLatch(&state->l_arr[i].latch);
+ SpinLockRelease(&state->l_arr[i].slock);
+ }
+}
+
+void
+WaitLSNUtility(const char *lsn, const int *delay)
+{
+ XLogRecPtr trg_lsn;
+ XLogRecPtr cur_lsn;
+ int latch_events;
+ int tdelay = delay;
+ TimestampTz timer = GetCurrentTimestamp();
+ trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+
+
+ if (delay > 0)
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+ else
+ latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ WLOwnLatch();
+
+ for (;;)
+ {
+ ResetLatch(&state->l_arr[MyBackendId].latch);
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN had been Replayed */
+ if (trg_lsn <= cur_lsn)
+ break;
+
+ /* If the postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ /* If Delay time is over */
+ if (latch_events & WL_TIMEOUT)
+ {
+ tdelay -= (GetCurrentTimestamp() - timer);
+ if (tdelay <= 0)
+ break;
+ timer = GetCurrentTimestamp();
+ }
+
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay);
+ }
+
+ WLDisownLatch();
+
+ if (trg_lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to make bigger delay.");
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index cb5cfc4..5fb43f6 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -267,7 +267,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
- CreateMatViewStmt RefreshMatViewStmt CreateAmStmt
+ CreateMatViewStmt RefreshMatViewStmt CreateAmStmt WaitLSNStmt
%type select_no_parens select_with_parens select_clause
simple_select values_clause
@@ -306,7 +306,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type OptSchemaEltList
%type TriggerForSpec TriggerForType
-%type TriggerActionTime
+%type TriggerActionTime WaitDelay
%type TriggerEvents TriggerOneEvent
%type TriggerFuncArg
%type TriggerWhen
@@ -644,7 +644,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAITLSN WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLPARSE
XMLPI XMLROOT XMLSERIALIZE
@@ -882,6 +882,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitLSNStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -12852,7 +12853,26 @@ frame_bound:
}
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAITLSN can appear as a query-level command
+ *
+ *
+ *****************************************************************************/
+WaitLSNStmt: WAITLSN Sconst WaitDelay
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = $3;
+ $$ = (Node *)n;
+ }
+ ;
+WaitDelay:
+ ',' Iconst { $$ = $2; }
+ | /*EMPTY*/ { $$ = 0; }
+ ;
/*
* Supporting nonterminals for expressions.
*/
@@ -13908,6 +13928,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAITLSN
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index c04b17f..66001ae 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -254,6 +255,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of Latches in SHMEM for WAITLSN
+ */
+ WaitLSNShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ac50c2a..6c2447d 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -54,6 +54,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -902,6 +903,20 @@ standard_ProcessUtility(Node *parsetree,
break;
}
+ case T_WaitLSNStmt:
+ {
+ WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree;
+ if (!RecoveryInProgress())
+ {
+ ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("cannot execute %s not during recovery",
+ "WaitLSN")));
+ }
+ else
+ WaitLSNUtility(stmt->lsn, stmt->delay);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(parsetree, queryString,
@@ -2359,6 +2374,10 @@ CreateCommandTag(Node *parsetree)
tag = "NOTIFY";
break;
+ case T_WaitLSNStmt:
+ tag = "WAITLSN";
+ break;
+
case T_ListenStmt:
tag = "LISTEN";
break;
@@ -2951,6 +2970,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitLSNStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000..12d224e
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ * WaitLSN notification: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAITLSN_H
+#define WAITLSN_H
+
+extern void WaitLSNUtility(const char *lsn, const int *delay);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatch(void);
+
+#endif /* WAITLSN_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2f7efa8..8b9fc2b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -463,6 +463,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitLSNStmt,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1481fff..ee8e0f3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3101,4 +3101,15 @@ typedef struct AlterTSConfigurationStmt
bool missing_ok; /* for DROP - skip error if missing? */
} AlterTSConfigurationStmt;
+/* ----------------------
+ * WaitLSN Statement
+ * ----------------------
+ */
+typedef struct WaitLSNStmt
+{
+ NodeTag type;
+ char *lsn; /* Taraget LSN to wait for */
+ int *delay; /* Delay to wait for LSN*/
+} WaitLSNStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 17ffef5..b14193e 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -422,6 +422,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)