diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index 77667bd..72c5390 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory. + diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml new file mode 100644 index 0000000..6a8bdca --- /dev/null +++ b/doc/src/sgml/ref/waitlsn.sgml @@ -0,0 +1,108 @@ + + + + + WAITLSN + + + + WAITLSN + 7 + SQL - Language Statements + + + + WAITLSN + wait when target LSN been replayed + + + + +WAITLSN 'LSN' [ , delay ] + + + + + Description + + + The WAITLSN wait till target LSN will + be replayed with an optional delay (milliseconds by default + infinity) to be wait for LSN to replayed. + + + + WAITLSN provides a simple + interprocess LSN wait mechanism for a backends on slave + in master-slave replication scheme on PostgreSQL database. + + + + + Parameters + + + + LSN + + + Target log sequence number to be wait for. + + + + + delay + + + Time in miliseconds to waiting for LSN to be replayed. + + + + + + + + Notes + + + Delay time to waiting for LSN to be replayed must be integer. For + default it is infinity. Waiting can be interupped using Ctl+C, or + by Postmaster death. + + + + + Examples + + + Configure and execute a waitlsn from + psql: + + +WAITLSN '0/3F07A6B1', 10000; +NOTICE: LSN is not reached. Try to make bigger delay. +WAITLSN + +WAITLSN '0/3F07A611'; +WAITLSN + +WAITLSN '0/3F0FF791', 500000; +^CCancel request sent +NOTICE: LSN is not reached. Try to make bigger delay. +ERROR: canceling statement due to user request + + + + + + Compatibility + + + There is no WAITLSN statement in the SQL + standard. + + + diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index 8acdff1..3733ad9 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -200,6 +200,7 @@ &update; &vacuum; &values; + &waitlsn; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f13f9c1..609c83e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "catalog/pg_control.h" #include "catalog/pg_database.h" #include "commands/tablespace.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" @@ -6922,6 +6923,11 @@ StartupXLOG(void) break; } + /* + * After update lastReplayedEndRecPtr set Latches in SHMEM array + */ + WaitLSNSetLatch(); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 6b3742c..091cbe2 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ policy.o portalcmds.o prepare.o proclang.o \ schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \ tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \ - variable.o view.o + variable.o view.o waitlsn.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 716f1c3..9ad3275 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -139,7 +139,6 @@ #include "utils/ps_status.h" #include "utils/timestamp.h" - /* * Maximum size of a NOTIFY payload, including terminating NULL. This * must be kept small enough so that a notification message fits on one diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c new file mode 100644 index 0000000..b441b85 --- /dev/null +++ b/src/backend/commands/waitlsn.c @@ -0,0 +1,195 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.c + * WaitLSN statment: WAITLSN + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/commands/waitlsn.c + * + *------------------------------------------------------------------------- + */ + +/*------------------------------------------------------------------------- + * Wait for LSN been replayed on slave as of 9.5: + * README + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" +#include "utils/pg_lsn.h" +#include "storage/latch.h" +#include "miscadmin.h" +#include "storage/spin.h" +#include "storage/backendid.h" +#include "access/xact.h" +#include "storage/shmem.h" +#include "storage/ipc.h" +#include "access/xlog_fn.h" +#include "utils/timestamp.h" +#include "storage/pmsignal.h" +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "commands/waitlsn.h" + + +/* Latches Own-DisownLatch and AbortCаllBack */ +static uint32 GetSHMEMSize(void); +static void WLDisownLatchAbort(XactEvent event, void *arg); +static void WLOwnLatch(void); +static void WLDisownLatch(void); + +void _PG_init(void); + +/* Shared memory structures */ +typedef struct +{ + int pid; + volatile slock_t slock; + Latch latch; +} BIDLatch; + +typedef struct +{ + int backend_maxid; + BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER]; +} GlobState; + +static volatile GlobState *state; +bool is_latch_owned = false; + +static void +WLOwnLatch(void) +{ + SpinLockAcquire(&state->l_arr[MyBackendId].slock); + OwnLatch(&state->l_arr[MyBackendId].latch); + is_latch_owned = true; + if (MyBackendId > state->backend_maxid) + state->backend_maxid += 1; + state->l_arr[MyBackendId].pid = MyProcPid; + SpinLockRelease(&state->l_arr[MyBackendId].slock); +} + +static void +WLDisownLatch(void) +{ + SpinLockAcquire(&state->l_arr[MyBackendId].slock); + DisownLatch(&state->l_arr[MyBackendId].latch); + is_latch_owned = false; + if (MyBackendId = state->backend_maxid) + state->backend_maxid -= 1; + state->l_arr[MyBackendId].pid = 0; + SpinLockRelease(&state->l_arr[MyBackendId].slock); +} + +/* CallBack function */ +static void +WLDisownLatchAbort(XactEvent event, void *arg) +{ + if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT || + event == XACT_EVENT_ABORT)) + { + WLDisownLatch(); + } +} + +/* Module load callback */ +void +_PG_init(void) +{ + if (!IsUnderPostmaster) + RegisterXactCallback(WLDisownLatchAbort, NULL); +} + +static uint32 +GetSHMEMSize(void) +{ + return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1); +} + +void +WaitLSNShmemInit(void) +{ + bool found; + uint i; + + state = (GlobState *) ShmemInitStruct("pg_wait_lsn", + GetSHMEMSize(), + &found); + if (!found) + { + state->backend_maxid = 1; + for (i = 0; i < (MaxConnections+1); i++) + { + state->l_arr[i].pid = 0; + SpinLockInit(&state->l_arr[i].slock); + InitSharedLatch(&state->l_arr[i].latch); + } + } +} + +void +WaitLSNSetLatch(void) +{ + uint i; + for (i = 1; i < (state->backend_maxid+1); i++) + { + SpinLockAcquire(&state->l_arr[i].slock); + if (state->l_arr[i].pid != 0) + SetLatch(&state->l_arr[i].latch); + SpinLockRelease(&state->l_arr[i].slock); + } +} + +void +WaitLSNUtility(const char *lsn, const int *delay) +{ + XLogRecPtr trg_lsn; + XLogRecPtr cur_lsn; + int latch_events; + int tdelay = delay; + TimestampTz timer = GetCurrentTimestamp(); + trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn))); + + + if (delay > 0) + latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; + else + latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + WLOwnLatch(); + + for (;;) + { + ResetLatch(&state->l_arr[MyBackendId].latch); + cur_lsn = GetXLogReplayRecPtr(NULL); + + /* If LSN had been Replayed */ + if (trg_lsn <= cur_lsn) + break; + + /* If the postmaster dies, finish immediately */ + if (!PostmasterIsAlive()) + break; + + /* If Delay time is over */ + if (latch_events & WL_TIMEOUT) + { + tdelay -= (GetCurrentTimestamp() - timer); + if (tdelay <= 0) + break; + timer = GetCurrentTimestamp(); + } + + CHECK_FOR_INTERRUPTS(); + WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay); + } + + WLDisownLatch(); + + if (trg_lsn > cur_lsn) + elog(NOTICE,"LSN is not reached. Try to make bigger delay."); +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index cb5cfc4..5fb43f6 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -267,7 +267,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt - CreateMatViewStmt RefreshMatViewStmt CreateAmStmt + CreateMatViewStmt RefreshMatViewStmt CreateAmStmt WaitLSNStmt %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -306,7 +306,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type OptSchemaEltList %type TriggerForSpec TriggerForType -%type TriggerActionTime +%type TriggerActionTime WaitDelay %type TriggerEvents TriggerOneEvent %type TriggerFuncArg %type TriggerWhen @@ -644,7 +644,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAITLSN WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLPARSE XMLPI XMLROOT XMLSERIALIZE @@ -882,6 +882,7 @@ stmt : | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitLSNStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -12852,7 +12853,26 @@ frame_bound: } ; +/***************************************************************************** + * + * QUERY: + * WAITLSN can appear as a query-level command + * + * + *****************************************************************************/ +WaitLSNStmt: WAITLSN Sconst WaitDelay + { + WaitLSNStmt *n = makeNode(WaitLSNStmt); + n->lsn = $2; + n->delay = $3; + $$ = (Node *)n; + } + ; +WaitDelay: + ',' Iconst { $$ = $2; } + | /*EMPTY*/ { $$ = 0; } + ; /* * Supporting nonterminals for expressions. */ @@ -13908,6 +13928,7 @@ unreserved_keyword: | VIEW | VIEWS | VOLATILE + | WAITLSN | WHITESPACE_P | WITHIN | WITHOUT diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index c04b17f..66001ae 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/subtrans.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -254,6 +255,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SyncScanShmemInit(); AsyncShmemInit(); + /* + * Init array of Latches in SHMEM for WAITLSN + */ + WaitLSNShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index ac50c2a..6c2447d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -54,6 +54,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/waitlsn.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -902,6 +903,20 @@ standard_ProcessUtility(Node *parsetree, break; } + case T_WaitLSNStmt: + { + WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree; + if (!RecoveryInProgress()) + { + ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("cannot execute %s not during recovery", + "WaitLSN"))); + } + else + WaitLSNUtility(stmt->lsn, stmt->delay); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(parsetree, queryString, @@ -2359,6 +2374,10 @@ CreateCommandTag(Node *parsetree) tag = "NOTIFY"; break; + case T_WaitLSNStmt: + tag = "WAITLSN"; + break; + case T_ListenStmt: tag = "LISTEN"; break; @@ -2951,6 +2970,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_ALL; break; + case T_WaitLSNStmt: + lev = LOGSTMT_ALL; + break; + case T_ListenStmt: lev = LOGSTMT_ALL; break; diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h new file mode 100644 index 0000000..12d224e --- /dev/null +++ b/src/include/commands/waitlsn.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * waitlsn.h + * WaitLSN notification: WAITLSN + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 2016, Regents of PostgresPRO + * + * src/include/commands/waitlsn.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAITLSN_H +#define WAITLSN_H + +extern void WaitLSNUtility(const char *lsn, const int *delay); +extern void WaitLSNShmemInit(void); +extern void WaitLSNSetLatch(void); + +#endif /* WAITLSN_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 2f7efa8..8b9fc2b 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -463,6 +463,7 @@ typedef enum NodeTag T_DropReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, + T_WaitLSNStmt, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 1481fff..ee8e0f3 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3101,4 +3101,15 @@ typedef struct AlterTSConfigurationStmt bool missing_ok; /* for DROP - skip error if missing? */ } AlterTSConfigurationStmt; +/* ---------------------- + * WaitLSN Statement + * ---------------------- + */ +typedef struct WaitLSNStmt +{ + NodeTag type; + char *lsn; /* Taraget LSN to wait for */ + int *delay; /* Delay to wait for LSN*/ +} WaitLSNStmt; + #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 17ffef5..b14193e 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -422,6 +422,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD) PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD) +PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)