src/backend/commands/async.c | 291 +++++++++++++++++++++---- src/backend/nodes/copyfuncs.c | 5 +- src/backend/nodes/equalfuncs.c | 5 +- src/backend/parser/gram.y | 20 +- src/backend/tcop/utility.c | 6 +- src/include/commands/async.h | 4 +- src/include/nodes/parsenodes.h | 5 +- src/test/isolation/expected/async-notify-2.out | 118 ++++++++++ src/test/isolation/isolation_schedule | 1 + src/test/isolation/isolationtester.c | 36 ++- src/test/isolation/specs/async-notify-2.spec | 31 +++ src/test/regress/expected/async.out | 45 +++- src/test/regress/sql/async.sql | 26 ++- 13 files changed, 530 insertions(+), 63 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index bacc08e..51acf44 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -138,7 +138,10 @@ #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/timestamp.h" - +#include "utils/elog.h" +#include "regex/regex.h" +#include "catalog/pg_collation.h" +#include "miscadmin.h" /* * Maximum size of a NOTIFY payload, including terminating NULL. This @@ -286,9 +289,19 @@ static SlruCtlData AsyncCtlData; */ #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) + /* + * Currently listened channel consisting of compiled RE + * used for pattern listeners and the pattern send by the user. + */ +typedef struct ListenChannel +{ + regex_t *compiledRegex; + char userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ +} ListenChannel; + /* * listenChannels identifies the channels we are actually listening to - * (ie, have committed a LISTEN on). It is a simple list of channel names, + * (ie, have committed a LISTEN on). It is a list of ListenChannel, * allocated in TopMemoryContext. */ static List *listenChannels = NIL; /* list of C strings */ @@ -313,7 +326,9 @@ typedef enum typedef struct { ListenActionKind action; - char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ + bool actionApplied; + regex_t *compiledRegex; + char userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ } ListenAction; static List *pendingActions = NIL; /* list of ListenAction */ @@ -369,12 +384,12 @@ bool Trace_notify = false; /* local function prototypes */ static bool asyncQueuePagePrecedes(int p, int q); -static void queue_listen(ListenActionKind action, const char *channel); +static void queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern); static void Async_UnlistenOnExit(int code, Datum arg); static void Exec_ListenPreCommit(void); -static void Exec_ListenCommit(const char *channel); -static void Exec_UnlistenCommit(const char *channel); -static void Exec_UnlistenAllCommit(void); +static bool Exec_ListenCommit(const char *pattern, regex_t *compiledRegex); +static bool Exec_UnlistenCommit(const char *pattern); +static bool Exec_UnlistenAllCommit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); @@ -594,6 +609,36 @@ Async_Notify(const char *channel, const char *payload) } /* +* compile_regex +* Compiles RE pattern into a compiled RE. +* +* Returns result code from pg_regcomp. +*/ +static int +compile_regex(const char *pattern, regex_t *compiled_regex) +{ + pg_wchar *wcharpattern; + int resregcomp; + int lenwchar; + int lenpattern = strlen(pattern); + + wcharpattern = (pg_wchar *)palloc((lenpattern + 1) * sizeof(pg_wchar)); + lenwchar = pg_mb2wchar_with_len(pattern, + wcharpattern, + lenpattern); + + resregcomp = pg_regcomp(compiled_regex, + wcharpattern, + lenwchar, + REG_ADVANCED, + DEFAULT_COLLATION_OID); + + pfree(wcharpattern); + + return resregcomp; +} + +/* * queue_listen * Common code for listen, unlisten, unlisten all commands. * @@ -602,10 +647,15 @@ Async_Notify(const char *channel, const char *payload) * commit. */ static void -queue_listen(ListenActionKind action, const char *channel) +queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern) { MemoryContext oldcontext; ListenAction *actrec; + regex_t compreg; + regex_t *pcompreg; + int resregcomp; + char errormsg[100]; + Datum datum; /* * Unlike Async_Notify, we don't try to collapse out duplicates. It would @@ -615,11 +665,44 @@ queue_listen(ListenActionKind action, const char *channel) */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); + if (isSimilarToPattern) + { + /* convert to regex pattern */ + datum = DirectFunctionCall1(similar_escape, CStringGetTextDatum(pattern)); + + /* + * Regex pattern is now compiled to ensure any errors are captured at this point. + * Compiled regex is copied to top memory context when we reach transaction commit. + * If compiled RE was not applied as a listener then it is freed at transaction commit. + */ + resregcomp = compile_regex(TextDatumGetCString(datum), &compreg); + + if (resregcomp != REG_OKAY) + { + MemoryContextSwitchTo(oldcontext); + + CHECK_FOR_INTERRUPTS(); + pg_regerror(resregcomp, &compreg, errormsg, sizeof(errormsg)); + ereport(ERROR, + (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION), + errmsg("invalid regular expression: %s", errormsg))); + } + + pcompreg = palloc(sizeof(regex_t)); + memcpy(pcompreg, &compreg, sizeof(regex_t)); + } + else + { + pcompreg = NULL; + } + /* space for terminating null is included in sizeof(ListenAction) */ - actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + - strlen(channel) + 1); + actrec = (ListenAction *) palloc(offsetof(ListenAction, userPattern) + + strlen(pattern) + 1); actrec->action = action; - strcpy(actrec->channel, channel); + actrec->actionApplied = false; + actrec->compiledRegex = pcompreg; + strcpy(actrec->userPattern, pattern); pendingActions = lappend(pendingActions, actrec); @@ -632,12 +715,12 @@ queue_listen(ListenActionKind action, const char *channel) * This is executed by the SQL listen command. */ void -Async_Listen(const char *channel) +Async_Listen(const char *pattern, bool isSimilarToPattern) { if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Listen(%s,%d)", pattern, MyProcPid); - queue_listen(LISTEN_LISTEN, channel); + queue_listen(LISTEN_LISTEN, pattern, isSimilarToPattern); } /* @@ -646,16 +729,16 @@ Async_Listen(const char *channel) * This is executed by the SQL unlisten command. */ void -Async_Unlisten(const char *channel) +Async_Unlisten(const char *pattern) { if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Async_Unlisten(%s,%d)", pattern, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NIL && !unlistenExitRegistered) return; - queue_listen(LISTEN_UNLISTEN, channel); + queue_listen(LISTEN_UNLISTEN, pattern, false); } /* @@ -673,7 +756,7 @@ Async_UnlistenAll(void) if (pendingActions == NIL && !unlistenExitRegistered) return; - queue_listen(LISTEN_UNLISTEN_ALL, ""); + queue_listen(LISTEN_UNLISTEN_ALL, "", false); } /* @@ -714,10 +797,10 @@ pg_listening_channels(PG_FUNCTION_ARGS) while (*lcp != NULL) { - char *channel = (char *) lfirst(*lcp); + ListenChannel *channel = (ListenChannel *) lfirst(*lcp); *lcp = lnext(*lcp); - SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel->userPattern)); } SRF_RETURN_DONE(funcctx); @@ -890,13 +973,13 @@ AtCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenCommit(actrec->channel); + actrec->actionApplied = Exec_ListenCommit(actrec->userPattern, actrec->compiledRegex); break; case LISTEN_UNLISTEN: - Exec_UnlistenCommit(actrec->channel); + actrec->actionApplied = Exec_UnlistenCommit(actrec->userPattern); break; case LISTEN_UNLISTEN_ALL: - Exec_UnlistenAllCommit(); + actrec->actionApplied = Exec_UnlistenAllCommit(); break; } } @@ -1000,14 +1083,23 @@ Exec_ListenPreCommit(void) * * Add the channel to the list of channels we are listening on. */ -static void -Exec_ListenCommit(const char *channel) +static bool +Exec_ListenCommit(const char *pattern, regex_t *compiledRegex) { + ListCell *p; MemoryContext oldcontext; + ListenChannel *lchan; + regex_t *copiedcompreg; - /* Do nothing if we are already listening on this channel */ - if (IsListeningOn(channel)) - return; + /* Do nothing if we are already using this pattern for listening */ + + foreach(p, listenChannels) + { + ListenChannel *lchan = (ListenChannel *)lfirst(p); + + if (strcmp(lchan->userPattern, pattern) == 0) + return false; + } /* * Add the new channel name to listenChannels. @@ -1017,9 +1109,31 @@ Exec_ListenCommit(const char *channel) * doesn't seem worth trying to guard against that, but maybe improve this * later. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); - listenChannels = lappend(listenChannels, pstrdup(channel)); + + if (compiledRegex != NULL) + { + /* copy the compiled RE to top memory context */ + + copiedcompreg = (regex_t *)palloc(sizeof(regex_t)); + memcpy(copiedcompreg, compiledRegex, sizeof(regex_t)); + } + else + { + copiedcompreg = NULL; + } + + lchan = (ListenChannel *)palloc(offsetof(ListenChannel, userPattern) + + strlen(pattern) + 1); + lchan->compiledRegex = copiedcompreg; + strcpy(lchan->userPattern, pattern); + + listenChannels = lappend(listenChannels, lchan); + MemoryContextSwitchTo(oldcontext); + + return true; } /* @@ -1027,24 +1141,35 @@ Exec_ListenCommit(const char *channel) * * Remove the specified channel name from listenChannels. */ -static void -Exec_UnlistenCommit(const char *channel) +static bool +Exec_UnlistenCommit(const char *pattern) { ListCell *q; ListCell *prev; + bool found; if (Trace_notify) - elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); + elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", pattern, MyProcPid); + found = false; prev = NULL; foreach(q, listenChannels) { - char *lchan = (char *) lfirst(q); + ListenChannel *lchan = (ListenChannel *) lfirst(q); - if (strcmp(lchan, channel) == 0) + if (strcmp(lchan->userPattern, pattern) == 0) { + if (lchan->compiledRegex != NULL) + { + pg_regfree(lchan->compiledRegex); + pfree(lchan->compiledRegex); + } + listenChannels = list_delete_cell(listenChannels, q, prev); + pfree(lchan); + + found = true; break; } prev = q; @@ -1054,6 +1179,8 @@ Exec_UnlistenCommit(const char *channel) * We do not complain about unlistening something not being listened; * should we? */ + + return found; } /* @@ -1061,14 +1188,34 @@ Exec_UnlistenCommit(const char *channel) * * Unlisten on all channels for this backend. */ -static void +static bool Exec_UnlistenAllCommit(void) { + ListCell *p; + bool is_empty; + if (Trace_notify) elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + is_empty = true; + + foreach(p, listenChannels) + { + ListenChannel *lchan = (ListenChannel *)lfirst(p); + + if (lchan->compiledRegex != NULL) + { + pg_regfree(lchan->compiledRegex); + pfree(lchan->compiledRegex); + } + + is_empty = false; + } + list_free_deep(listenChannels); listenChannels = NIL; + + return is_empty; } /* @@ -1163,16 +1310,66 @@ ProcessCompletedNotifies(void) static bool IsListeningOn(const char *channel) { - ListCell *p; + ListCell *p; + pg_wchar *wcharchannel; + int lenwchar; + int resregexec; + char errormsg[100]; + bool matches; + + wcharchannel = NULL; + matches = false; foreach(p, listenChannels) { - char *lchan = (char *) lfirst(p); + ListenChannel *lchan = (ListenChannel *) lfirst(p); - if (strcmp(lchan, channel) == 0) - return true; + if (lchan->compiledRegex == NULL) + { + if (strcmp(lchan->userPattern, channel) == 0) + { + matches = true; + break; + } + } + else + { + if (wcharchannel == NULL) + { + /* Convert channel string to wide characters */ + wcharchannel = (pg_wchar *)palloc((strlen(channel) + 1) * sizeof(pg_wchar)); + lenwchar = pg_mb2wchar_with_len(channel, wcharchannel, strlen(channel)); + } + + /* Check RE match */ + resregexec = pg_regexec(lchan->compiledRegex, wcharchannel, lenwchar, 0, NULL, 0, NULL, 0); + + if (resregexec != REG_OKAY && resregexec != REG_NOMATCH) + { + pfree(wcharchannel); + wcharchannel = NULL; + + CHECK_FOR_INTERRUPTS(); + pg_regerror(resregexec, lchan->compiledRegex, errormsg, sizeof(errormsg)); + ereport(ERROR, + (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION), + errmsg("regular expression failed: %s", errormsg))); + } + + if (resregexec == REG_OKAY) + { + matches = true; + break; + } + } } - return false; + + if (wcharchannel != NULL) + { + pfree(wcharchannel); + } + + return matches; } /* @@ -2149,6 +2346,20 @@ AsyncExistsPendingNotify(const char *channel, const char *payload) static void ClearPendingActionsAndNotifies(void) { + ListCell *p; + + /* free compiled REs that were not added as new listeners */ + foreach(p, pendingActions) + { + ListenAction *actrec = (ListenAction *)lfirst(p); + + if (!actrec->actionApplied && actrec->compiledRegex != NULL) + { + pg_regfree(actrec->compiledRegex); + pfree(actrec->compiledRegex); + } + } + /* * We used to have to explicitly deallocate the list members and nodes, * because they were malloc'd. Now, since we know they are palloc'd in diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 45a04b0..dd1d1ef 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3545,7 +3545,8 @@ _copyListenStmt(const ListenStmt *from) { ListenStmt *newnode = makeNode(ListenStmt); - COPY_STRING_FIELD(conditionname); + COPY_STRING_FIELD(pattern); + COPY_SCALAR_FIELD(isSimilarToPattern); return newnode; } @@ -3555,7 +3556,7 @@ _copyUnlistenStmt(const UnlistenStmt *from) { UnlistenStmt *newnode = makeNode(UnlistenStmt); - COPY_STRING_FIELD(conditionname); + COPY_STRING_FIELD(pattern); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8d92c03..a3ac36b 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1482,7 +1482,8 @@ _equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b) static bool _equalListenStmt(const ListenStmt *a, const ListenStmt *b) { - COMPARE_STRING_FIELD(conditionname); + COMPARE_STRING_FIELD(pattern); + COMPARE_SCALAR_FIELD(isSimilarToPattern); return true; } @@ -1490,7 +1491,7 @@ _equalListenStmt(const ListenStmt *a, const ListenStmt *b) static bool _equalUnlistenStmt(const UnlistenStmt *a, const UnlistenStmt *b) { - COMPARE_STRING_FIELD(conditionname); + COMPARE_STRING_FIELD(pattern); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 4b1ce09..b7a4cf4 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9451,7 +9451,15 @@ notify_payload: ListenStmt: LISTEN ColId { ListenStmt *n = makeNode(ListenStmt); - n->conditionname = $2; + n->pattern = $2; + n->isSimilarToPattern = false; + $$ = (Node *)n; + } + | LISTEN SIMILAR TO Sconst + { + ListenStmt *n = makeNode(ListenStmt); + n->pattern = $4; + n->isSimilarToPattern = true; $$ = (Node *)n; } ; @@ -9460,13 +9468,19 @@ UnlistenStmt: UNLISTEN ColId { UnlistenStmt *n = makeNode(UnlistenStmt); - n->conditionname = $2; + n->pattern = $2; + $$ = (Node *)n; + } + | UNLISTEN Sconst + { + UnlistenStmt *n = makeNode(UnlistenStmt); + n->pattern = $2; $$ = (Node *)n; } | UNLISTEN '*' { UnlistenStmt *n = makeNode(UnlistenStmt); - n->conditionname = NULL; + n->pattern = NULL; $$ = (Node *)n; } ; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index ddacac8..9251ad2 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -630,7 +630,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, PreventCommandDuringRecovery("LISTEN"); CheckRestrictedOperation("LISTEN"); - Async_Listen(stmt->conditionname); + Async_Listen(stmt->pattern, stmt->isSimilarToPattern); } break; @@ -640,8 +640,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, PreventCommandDuringRecovery("UNLISTEN"); CheckRestrictedOperation("UNLISTEN"); - if (stmt->conditionname) - Async_Unlisten(stmt->conditionname); + if (stmt->pattern) + Async_Unlisten(stmt->pattern); else Async_UnlistenAll(); } diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 939711d..f86a482 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -34,8 +34,8 @@ extern void NotifyMyFrontEnd(const char *channel, /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); -extern void Async_Listen(const char *channel); -extern void Async_Unlisten(const char *channel); +extern void Async_Listen(const char *pattern, bool isSimilarToPattern); +extern void Async_Unlisten(const char *pattern); extern void Async_UnlistenAll(void); /* perform (or cancel) outbound notify processing at transaction commit */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 5f2a4a7..3f9fe9d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2889,7 +2889,8 @@ typedef struct NotifyStmt typedef struct ListenStmt { NodeTag type; - char *conditionname; /* condition name to listen on */ + bool isSimilarToPattern; + char *pattern; /* condition pattern to listen on */ } ListenStmt; /* ---------------------- @@ -2899,7 +2900,7 @@ typedef struct ListenStmt typedef struct UnlistenStmt { NodeTag type; - char *conditionname; /* name to unlisten on, or NULL for all */ + char *pattern; /* condition pattern to unlisten on, or NULL for all */ } UnlistenStmt; /* ---------------------- diff --git a/src/test/isolation/expected/async-notify-2.out b/src/test/isolation/expected/async-notify-2.out new file mode 100644 index 0000000..081cae3 --- /dev/null +++ b/src/test/isolation/expected/async-notify-2.out @@ -0,0 +1,118 @@ +Parsed test spec with 2 sessions + +starting permutation: listen_normal notify consume +step listen_normal: LISTEN test; +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; +ASYNC NOTIFY of 'test' with payload '1' received +ASYNC NOTIFY of 'test' with payload '2' received +ASYNC NOTIFY of 'test' with payload '3' received +ASYNC NOTIFY of 'test' with payload '4' received +ASYNC NOTIFY of 'test' with payload '5' received + +starting permutation: listen_pattern_1 notify consume +step listen_pattern_1: LISTEN SIMILAR TO 'te%'; +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; +ASYNC NOTIFY of 'test' with payload '1' received +ASYNC NOTIFY of 'test' with payload '2' received +ASYNC NOTIFY of 'test' with payload '3' received +ASYNC NOTIFY of 'test' with payload '4' received +ASYNC NOTIFY of 'test' with payload '5' received +ASYNC NOTIFY of 'test_2' with payload '1' received +ASYNC NOTIFY of 'test_2' with payload '2' received +ASYNC NOTIFY of 'test_2' with payload '3' received +ASYNC NOTIFY of 'test_2' with payload '4' received +ASYNC NOTIFY of 'test_2' with payload '5' received + +starting permutation: listen_pattern_2 notify consume +step listen_pattern_2: LISTEN SIMILAR TO 'test'; +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; +ASYNC NOTIFY of 'test' with payload '1' received +ASYNC NOTIFY of 'test' with payload '2' received +ASYNC NOTIFY of 'test' with payload '3' received +ASYNC NOTIFY of 'test' with payload '4' received +ASYNC NOTIFY of 'test' with payload '5' received + +starting permutation: listen_pattern_3 notify consume +step listen_pattern_3: LISTEN SIMILAR TO 'te'; +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; + +starting permutation: listen_pattern_invalid notify consume +step listen_pattern_invalid: LISTEN SIMILAR TO '*'; +ERROR: invalid regular expression: quantifier operand invalid +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; + +starting permutation: listen_normal listen_pattern_1 unlisten_1 notify consume +step listen_normal: LISTEN test; +step listen_pattern_1: LISTEN SIMILAR TO 'te%'; +step unlisten_1: UNLISTEN 't%'; +step notify: + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; + +count + +5 +count + +5 +step consume: BEGIN; END; +ASYNC NOTIFY of 'test' with payload '1' received +ASYNC NOTIFY of 'test' with payload '2' received +ASYNC NOTIFY of 'test' with payload '3' received +ASYNC NOTIFY of 'test' with payload '4' received +ASYNC NOTIFY of 'test' with payload '5' received +ASYNC NOTIFY of 'test_2' with payload '1' received +ASYNC NOTIFY of 'test_2' with payload '2' received +ASYNC NOTIFY of 'test_2' with payload '3' received +ASYNC NOTIFY of 'test_2' with payload '4' received +ASYNC NOTIFY of 'test_2' with payload '5' received diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule index 32c965b..a25cdeb 100644 --- a/src/test/isolation/isolation_schedule +++ b/src/test/isolation/isolation_schedule @@ -60,5 +60,6 @@ test: alter-table-3 test: create-trigger test: sequence-ddl test: async-notify +test: async-notify-2 test: vacuum-reltuples test: timeouts diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index ba8082c..b1eb17d 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -46,7 +46,8 @@ static bool try_complete_step(Step *step, int flags); static int step_qsort_cmp(const void *a, const void *b); static int step_bsearch_cmp(const void *a, const void *b); -static void printResultSet(PGresult *res); +static void printResultSet(PGresult *res, PGconn *conn); +static void printAsyncNotify(PGconn *conn); /* close all connections and exit */ static void @@ -487,7 +488,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) res = PQexec(conns[0], testspec->setupsqls[i]); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - printResultSet(res); + printResultSet(res, conns[i + 1]); } else if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -505,7 +506,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) res = PQexec(conns[i + 1], testspec->sessions[i]->setupsql); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - printResultSet(res); + printResultSet(res, conns[i + 1]); } else if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -640,7 +641,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) res = PQexec(conns[i + 1], testspec->sessions[i]->teardownsql); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - printResultSet(res); + printResultSet(res, conns[i + 1]); } else if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -659,7 +660,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps) res = PQexec(conns[0], testspec->teardownsql); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - printResultSet(res); + printResultSet(res, conns[0]); } else if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -821,9 +822,10 @@ try_complete_step(Step *step, int flags) switch (PQresultStatus(res)) { case PGRES_COMMAND_OK: + printAsyncNotify(conn); break; case PGRES_TUPLES_OK: - printResultSet(res); + printResultSet(res, conn); break; case PGRES_FATAL_ERROR: if (step->errormsg != NULL) @@ -860,7 +862,7 @@ try_complete_step(Step *step, int flags) } static void -printResultSet(PGresult *res) +printResultSet(PGresult *res, PGconn *conn) { int nFields; int i, @@ -879,4 +881,24 @@ printResultSet(PGresult *res) printf("%-15s", PQgetvalue(res, i, j)); printf("\n"); } + + printAsyncNotify(conn); +} + +static void +printAsyncNotify(PGconn *conn) +{ + PGnotify *notify; + + while ((notify = PQnotifies(conn)) != NULL) + { + if (notify->extra[0]) + printf("ASYNC NOTIFY of '%s' with payload '%s' received\n", + notify->relname, notify->extra); + else + printf("ASYNC NOTIFY of '%s' received\n", + notify->relname); + + PQfreemem(notify); + } } diff --git a/src/test/isolation/specs/async-notify-2.spec b/src/test/isolation/specs/async-notify-2.spec new file mode 100644 index 0000000..dd1d38f --- /dev/null +++ b/src/test/isolation/specs/async-notify-2.spec @@ -0,0 +1,31 @@ +# Verify that messages are consumed from the notify queue. + +session "listener" +step "listen_normal" { LISTEN test; } +step "listen_pattern_1" { LISTEN SIMILAR TO 'te%'; } +step "listen_pattern_2" { LISTEN SIMILAR TO 'test'; } +step "listen_pattern_3" { LISTEN SIMILAR TO 'te'; } +step "listen_pattern_invalid" { LISTEN SIMILAR TO '*'; } +step "unlisten_1" { UNLISTEN 't%'; } +step "consume" { BEGIN; END; } +teardown { UNLISTEN *; } + +session "notifier" +step "notify" +{ + SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s; + SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s; +} + +# Should print first notify channel +permutation "listen_normal" "notify" "consume" +# Should print both notify channels +permutation "listen_pattern_1" "notify" "consume" +# Should print first notify channel +permutation "listen_pattern_2" "notify" "consume" +# Should not print either notify channels +permutation "listen_pattern_3" "notify" "consume" +# Should fail to invalid RE pattern +permutation "listen_pattern_invalid" "notify" "consume" +# Test that UNLISTEN with a pattern does not work as a RE matcher +permutation "listen_normal" "listen_pattern_1" "unlisten_1" "notify" "consume" \ No newline at end of file diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index 19cbe38..963b1d0 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -27,11 +27,54 @@ SELECT pg_notify(NULL,'sample message1'); ERROR: channel name cannot be empty SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1'); ERROR: channel name too long ---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands +-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands +-- src/test/isolation/specs/async-notify-2.spec tests for actual usage. NOTIFY notify_async2; LISTEN notify_async2; +LISTEN SIMILAR TO 'notify_%'; UNLISTEN notify_async2; +UNLISTEN 'notify_%'; +UNLISTEN 'notify_(%'; UNLISTEN *; +-- Should fail. Invalid LISTEN command +LISTEN *; +ERROR: syntax error at or near "*" +LINE 1: LISTEN *; + ^ +LISTEN notify_%; +ERROR: syntax error at or near "%" +LINE 1: LISTEN notify_%; + ^ +LISTEN SIMILAR TO 'notify_(%'; +ERROR: invalid regular expression: parentheses () not balanced +LISTEN SIMILAR TO '*'; +ERROR: invalid regular expression: quantifier operand invalid +-- Should contain two listeners +LISTEN notify_async2; +LISTEN SIMILAR TO 'notify_async2'; +LISTEN SIMILAR TO 'notify_%'; +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- + notify_async2 + notify_% +(2 rows) + +-- Should contain one listener +UNLISTEN 'notify_%'; +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- + notify_async2 +(1 row) + +-- Should not contain listeners +UNLISTEN *; +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- +(0 rows) + -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage(); diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index 40f6e01..69d8f98 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -12,12 +12,36 @@ SELECT pg_notify('','sample message1'); SELECT pg_notify(NULL,'sample message1'); SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1'); ---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands +-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands +-- src/test/isolation/specs/async-notify-2.spec tests for actual usage. NOTIFY notify_async2; LISTEN notify_async2; +LISTEN SIMILAR TO 'notify_%'; UNLISTEN notify_async2; +UNLISTEN 'notify_%'; +UNLISTEN 'notify_(%'; UNLISTEN *; +-- Should fail. Invalid LISTEN command +LISTEN *; +LISTEN notify_%; +LISTEN SIMILAR TO 'notify_(%'; +LISTEN SIMILAR TO '*'; + +-- Should contain two listeners +LISTEN notify_async2; +LISTEN SIMILAR TO 'notify_async2'; +LISTEN SIMILAR TO 'notify_%'; +SELECT pg_listening_channels(); + +-- Should contain one listener +UNLISTEN 'notify_%'; +SELECT pg_listening_channels(); + +-- Should not contain listeners +UNLISTEN *; +SELECT pg_listening_channels(); + -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage();