diff --git a/configure.in b/configure.in index cea7fd0755..1a163a1be5 100644 --- a/configure.in +++ b/configure.in @@ -1004,6 +1004,7 @@ AC_SEARCH_LIBS(crypt, crypt) AC_SEARCH_LIBS(shm_open, rt) AC_SEARCH_LIBS(shm_unlink, rt) AC_SEARCH_LIBS(clock_gettime, [rt posix4]) +AC_SEARCH_LIBS(ppoll, c) # Solaris: AC_SEARCH_LIBS(fdatasync, [rt posix4]) # Required for thread_test.c on Solaris @@ -1418,7 +1419,7 @@ PGAC_FUNC_WCSTOMBS_L LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` -AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) +AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) AC_REPLACE_FUNCS(fseeko) case $host_os in diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 5d8a01c72c..342a55baab 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -47,6 +47,9 @@ #ifdef HAVE_SYS_SELECT_H #include #endif +#ifdef HAVE_POLL_H +#include +#endif #ifdef HAVE_SYS_RESOURCE_H #include /* for getrlimit */ @@ -56,6 +59,10 @@ #define M_PI 3.14159265358979323846 #endif +#ifdef HAVE_PPOLL +#define USE_PPOLL 1 +#endif + #include "pgbench.h" #define ERRCODE_UNDEFINED_TABLE "42P01" @@ -90,6 +97,27 @@ static int pthread_join(pthread_t th, void **thread_return); #define MAXCLIENTS 1024 #endif +#ifdef USE_PPOLL +#define SCKTWTMTHD "ppoll" +#undef MAXCLIENTS +#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP) +#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP) +#define PFD_CLREV(s) { do { if ((s)->pfdp) { (s)->pfdp->revents = 0; } } while(0); } +#define PFD_ZERO(s) { do { if ((s)->pfdp) { (s)->pfdp->fd = 0; (s)->pfdp->events = (s)->pfdp->revents = 0; } } while(0); } +#define PFD_SETFD(s) { do { (s)->pfdp->fd = PQsocket((s)->con); } while(0); } +#define PFD_STRUCT_POLLFD(p) struct pollfd (p); +#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); } +#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; (s)[_i].pfdp->fd = -1; } } while (0); } +#else +#define SCKTWTMTHD "select" +#define PFD_CLREV(s) +#define PFD_ZERO(s) +#define PFD_SETFD(s) +#define PFD_STRUCT_POLLFD(p) +#define PFD_THREAD_FREE(t) +#define PFD_THREAD_INIT(t,s,n) +#endif + #define LOG_STEP_SECONDS 5 /* seconds between log messages */ #define DEFAULT_NXACTS 10 /* default nxacts */ @@ -331,6 +359,7 @@ typedef struct /* per client collected stats */ int64 cnt; /* client transaction count, for -t */ int ecnt; /* error count */ + PFD_STRUCT_POLLFD(*pfdp) } CState; /* @@ -351,6 +380,7 @@ typedef struct instr_time conn_time; StatsData stats; int64 latency_late; /* executed but late transactions */ + PFD_STRUCT_POLLFD(*pfds) } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2); static void addScript(ParsedScript script); static void *threadRun(void *arg); static void setalarm(int seconds); +static void finishCon(CState *st); /* callback functions for our flex lexer */ @@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) /* Reset session-local state */ memset(st->prepared, 0, sizeof(st->prepared)); + PFD_ZERO(st); + PFD_SETFD(st); } /* @@ -2402,8 +2435,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) if (is_connect) { - PQfinish(st->con); - st->con = NULL; + finishCon(st); INSTR_TIME_SET_ZERO(now); } @@ -2441,10 +2473,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) case CSTATE_ABORTED: case CSTATE_FINISHED: if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } + finishCon(st); return; } } @@ -2592,13 +2621,7 @@ disconnect_all(CState *state, int length) int i; for (i = 0; i < length; i++) - { - if (state[i].con) - { - PQfinish(state[i].con); - state[i].con = NULL; - } - } + finishCon(&state[i]); } /* create tables and setup data */ @@ -3763,7 +3786,11 @@ main(int argc, char **argv) case 'c': benchmarking_option_set = true; nclients = atoi(optarg); +#ifdef USE_PPOLL + if (nclients <= 0) +#else if (nclients <= 0 || nclients > MAXCLIENTS) +#endif { fprintf(stderr, "invalid number of clients: \"%s\"\n", optarg); @@ -4459,6 +4486,8 @@ threadRun(void *arg) } } + PFD_THREAD_INIT(thread, state, nstate); + if (!is_connect) { /* make connections to the database */ @@ -4466,6 +4495,7 @@ threadRun(void *arg) { if ((state[i].con = doConnect()) == NULL) goto done; + PFD_SETFD(&state[i]); } } @@ -4482,14 +4512,16 @@ threadRun(void *arg) /* loop till all clients have terminated */ while (remains > 0) { - fd_set input_mask; int maxsock; /* max socket number to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ +#ifndef USE_PPOLL + fd_set input_mask; /* identify which client sockets should be checked for input */ FD_ZERO(&input_mask); maxsock = -1; +#endif min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) { @@ -4499,8 +4531,7 @@ threadRun(void *arg) { /* interrupt client that has not started a transaction */ st->state = CSTATE_FINISHED; - PQfinish(st->con); - st->con = NULL; + finishCon(st); remains--; } else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) @@ -4538,7 +4569,13 @@ threadRun(void *arg) goto done; } +#ifdef USE_PPOLL + maxsock = 0; + (st->pfdp)->events = POLL_EVENTS; + (st->pfdp)->revents = 0; +#else FD_SET(sock, &input_mask); +#endif if (maxsock < sock) maxsock = sock; } @@ -4586,11 +4623,19 @@ threadRun(void *arg) { if (maxsock != -1) { +#ifdef USE_PPOLL + struct timespec timeout; + + timeout.tv_sec = min_usec / 1000000; + timeout.tv_nsec = min_usec % 1000000000; + nsocks = ppoll(thread->pfds, nstate, &timeout, NULL); +#else struct timeval timeout; timeout.tv_sec = min_usec / 1000000; timeout.tv_usec = min_usec % 1000000; nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); +#endif } else /* nothing active, simple sleep */ { @@ -4599,7 +4644,11 @@ threadRun(void *arg) } else /* no explicit delay, select without timeout */ { +#ifdef USE_PPOLL + nsocks = ppoll(thread->pfds, nstate, NULL, NULL); +#else nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); +#endif } if (nsocks < 0) @@ -4610,15 +4659,17 @@ threadRun(void *arg) continue; } /* must be something wrong */ - fprintf(stderr, "select() failed: %s\n", strerror(errno)); + fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno)); goto done; } } +#ifndef USE_PPOLL else /* min_usec == 0, i.e. something needs to be executed */ { /* If we didn't call select(), don't try to read any data */ FD_ZERO(&input_mask); } +#endif /* ok, advance the state machine of each connection */ for (i = 0; i < nstate; i++) @@ -4637,7 +4688,18 @@ threadRun(void *arg) goto done; } +#ifdef USE_PPOLL + if ((st->pfdp)->revents & POLL_FAIL) + { + fprintf(stderr, + "ppoll() fail - errno: %d, i: %d, events: %x\n", + errno, i, ((st->pfdp)->revents & POLL_FAIL)); + } + + if (!(st->pfdp)->revents) +#else if (!FD_ISSET(sock, &input_mask)) +#endif continue; } else if (st->state == CSTATE_FINISHED || @@ -4651,7 +4713,10 @@ threadRun(void *arg) /* If doCustom changed client to finished state, reduce remains */ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) + { remains--; + PFD_ZERO(st); + } } /* progress report is made by thread 0 for all threads */ @@ -4765,9 +4830,21 @@ done: fclose(thread->logfile); thread->logfile = NULL; } + PFD_THREAD_FREE(thread); return NULL; } +static void +finishCon(CState *st) +{ + if (st->con) + { + PQfinish(st->con); + st->con = NULL; + } + PFD_ZERO(st); +} + /* * Support for duration option: set timer_exceeded after so many seconds. */ diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index cfdcc5ac62..21d9bc0300 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -394,6 +394,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_POLL_H +/* Define to 1 if you have the `ppoll' function. */ +#undef HAVE_PPOLL + /* Define to 1 if you have the `posix_fadvise' function. */ #undef HAVE_POSIX_FADVISE