diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f773..14c4163 100644 --- a/contrib/test_decoding/sql/messages.sql +++ b/contrib/test_decoding/sql/messages.sql @@ -23,6 +23,8 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for -- test db filtering \set prevdb :DBNAME +show session_pool_size; +show session_pool_ports; \c template1 SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1'); diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 5d13e6a..5a93c7e 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -178,7 +178,6 @@ static List *overrideStack = NIL; * committed its creation, depending on whether myTempNamespace is valid. */ static Oid myTempNamespace = InvalidOid; - static Oid myTempToastNamespace = InvalidOid; static SubTransactionId myTempNamespaceSubID = InvalidSubTransactionId; @@ -193,6 +192,7 @@ char *namespace_search_path = NULL; /* Local functions */ static void recomputeNamespacePath(void); static void InitTempTableNamespace(void); +static Oid GetTempTableNamespace(void); static void RemoveTempRelations(Oid tempNamespaceId); static void RemoveTempRelationsCallback(int code, Datum arg); static void NamespaceCallback(Datum arg, int cacheid, uint32 hashvalue); @@ -460,9 +460,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) if (strcmp(newRelation->schemaname, "pg_temp") == 0) { /* Initialize temp namespace if first time through */ - if (!OidIsValid(myTempNamespace)) - InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } /* use exact schema given */ namespaceId = get_namespace_oid(newRelation->schemaname, false); @@ -471,9 +469,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) else if (newRelation->relpersistence == RELPERSISTENCE_TEMP) { /* Initialize temp namespace if first time through */ - if (!OidIsValid(myTempNamespace)) - 
InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } else { @@ -482,8 +478,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) if (activeTempCreationPending) { /* Need to initialize temp namespace */ - InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } namespaceId = activeCreationNamespace; if (!OidIsValid(namespaceId)) @@ -2921,9 +2916,7 @@ LookupCreationNamespace(const char *nspname) if (strcmp(nspname, "pg_temp") == 0) { /* Initialize temp namespace if first time through */ - if (!OidIsValid(myTempNamespace)) - InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } namespaceId = get_namespace_oid(nspname, false); @@ -2986,9 +2979,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p) if (strcmp(schemaname, "pg_temp") == 0) { /* Initialize temp namespace if first time through */ - if (!OidIsValid(myTempNamespace)) - InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } /* use exact schema given */ namespaceId = get_namespace_oid(schemaname, false); @@ -3001,8 +2992,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p) if (activeTempCreationPending) { /* Need to initialize temp namespace */ - InitTempTableNamespace(); - return myTempNamespace; + return GetTempTableNamespace(); } namespaceId = activeCreationNamespace; if (!OidIsValid(namespaceId)) @@ -3254,16 +3244,28 @@ int GetTempNamespaceBackendId(Oid namespaceId) { int result; - char *nspname; + char *nspname, + *addlevel; /* See if the namespace name starts with "pg_temp_" or "pg_toast_temp_" */ nspname = get_namespace_name(namespaceId); if (!nspname) return InvalidBackendId; /* no such namespace? 
*/ if (strncmp(nspname, "pg_temp_", 8) == 0) - result = atoi(nspname + 8); + { + /* check for session id */ + if ((addlevel = strstr(nspname + 8, "_")) != NULL) + result = atoi(addlevel + 1); + else + result = atoi(nspname + 8); + } else if (strncmp(nspname, "pg_toast_temp_", 14) == 0) - result = atoi(nspname + 14); + { + if ((addlevel = strstr(nspname + 14, "_")) != NULL) + result = atoi(addlevel + 1); + else + result = atoi(nspname + 14); + } else result = InvalidBackendId; pfree(nspname); @@ -3309,8 +3311,11 @@ void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId) { /* Worker should not have created its own namespaces ... */ - Assert(myTempNamespace == InvalidOid); - Assert(myTempToastNamespace == InvalidOid); + if (!ActiveSession) + { + Assert(myTempNamespace == InvalidOid); + Assert(myTempToastNamespace == InvalidOid); + } Assert(myTempNamespaceSubID == InvalidSubTransactionId); /* Assign same namespace OIDs that leader has */ @@ -3830,6 +3835,24 @@ recomputeNamespacePath(void) list_free(oidlist); } +static Oid +GetTempTableNamespace(void) +{ + if (ActiveSession) + { + if (!OidIsValid(ActiveSession->tempNamespace)) + InitTempTableNamespace(); + else + myTempNamespace = ActiveSession->tempNamespace; + } + else + { + if (!OidIsValid(myTempNamespace)) + InitTempTableNamespace(); + } + return myTempNamespace; +} + /* * InitTempTableNamespace * Initialize temp table namespace on first use in a particular backend @@ -3841,8 +3864,6 @@ InitTempTableNamespace(void) Oid namespaceId; Oid toastspaceId; - Assert(!OidIsValid(myTempNamespace)); - /* * First, do permission check to see if we are authorized to make temp * tables. 
We use a nonstandard error message here since "databasename: @@ -3881,7 +3902,12 @@ InitTempTableNamespace(void) (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), errmsg("cannot create temporary tables during a parallel operation"))); - snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId); + if (ActiveSession) + snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d_%d", + ActiveSession->id, MyBackendId); + else + snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", + MyBackendId); namespaceId = get_namespace_oid(namespaceName, true); if (!OidIsValid(namespaceId)) @@ -3913,8 +3939,12 @@ InitTempTableNamespace(void) * it. (We assume there is no need to clean it out if it does exist, since * dropping a parent table should make its toast table go away.) */ - snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d", - MyBackendId); + if (ActiveSession) + snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d_%d", + ActiveSession->id, MyBackendId); + else + snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d", + MyBackendId); toastspaceId = get_namespace_oid(namespaceName, true); if (!OidIsValid(toastspaceId)) @@ -3945,6 +3975,11 @@ InitTempTableNamespace(void) */ MyProc->tempNamespaceId = namespaceId; + if (ActiveSession) + { + ActiveSession->tempNamespace = namespaceId; + ActiveSession->tempToastNamespace = toastspaceId; + } /* It should not be done already.
*/ AssertState(myTempNamespaceSubID == InvalidSubTransactionId); myTempNamespaceSubID = GetCurrentSubTransactionId(); @@ -3974,6 +4009,11 @@ AtEOXact_Namespace(bool isCommit, bool parallel) { myTempNamespace = InvalidOid; myTempToastNamespace = InvalidOid; + if (ActiveSession) + { + ActiveSession->tempNamespace = InvalidOid; + ActiveSession->tempToastNamespace = InvalidOid; + } baseSearchPathValid = false; /* need to rebuild list */ /* @@ -4121,13 +4161,16 @@ RemoveTempRelations(Oid tempNamespaceId) static void RemoveTempRelationsCallback(int code, Datum arg) { - if (OidIsValid(myTempNamespace)) /* should always be true */ + Oid tempNamespace = ActiveSession ? + ActiveSession->tempNamespace : myTempNamespace; + + if (OidIsValid(tempNamespace)) /* should always be true */ { /* Need to ensure we have a usable transaction. */ AbortOutOfAnyTransaction(); StartTransactionCommand(); - RemoveTempRelations(myTempNamespace); + RemoveTempRelations(tempNamespace); CommitTransactionCommand(); } @@ -4137,10 +4180,19 @@ RemoveTempRelationsCallback(int code, Datum arg) * Remove all temp tables from the temporary namespace. 
*/ void -ResetTempTableNamespace(void) +ResetTempTableNamespace(Oid npc) { - if (OidIsValid(myTempNamespace)) - RemoveTempRelations(myTempNamespace); + if (OidIsValid(npc)) + { + AbortOutOfAnyTransaction(); + StartTransactionCommand(); + RemoveTempRelations(npc); + CommitTransactionCommand(); + } + else + /* global */ + if (OidIsValid(myTempNamespace)) + RemoveTempRelations(myTempNamespace); } diff --git a/src/backend/catalog/pg_db_role_setting.c b/src/backend/catalog/pg_db_role_setting.c index e123691..23ff527 100644 --- a/src/backend/catalog/pg_db_role_setting.c +++ b/src/backend/catalog/pg_db_role_setting.c @@ -16,6 +16,7 @@ #include "catalog/indexing.h" #include "catalog/objectaccess.h" #include "catalog/pg_db_role_setting.h" +#include "storage/proc.h" #include "utils/fmgroids.h" #include "utils/rel.h" #include "utils/tqual.h" diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index 5df4382..f57a950 100644 --- a/src/backend/catalog/storage.c +++ b/src/backend/catalog/storage.c @@ -24,6 +24,7 @@ #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "catalog/namespace.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" #include "storage/freespace.h" diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 9bc67ce..3c90f8d 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2447,7 +2447,7 @@ CopyFrom(CopyState cstate) * registers the snapshot it uses. 
*/ InvalidateCatalogSnapshot(); - if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) + if (!ThereAreNoPriorRegisteredSnapshots() || (SessionPoolSize == 0 && !ThereAreNoReadyPortals())) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot perform FREEZE because of prior transaction activity"))); diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c index 01a999c..363a52a 100644 --- a/src/backend/commands/discard.c +++ b/src/backend/commands/discard.c @@ -45,7 +45,7 @@ DiscardCommand(DiscardStmt *stmt, bool isTopLevel) break; case DISCARD_TEMP: - ResetTempTableNamespace(); + ResetTempTableNamespace(InvalidOid); break; default: @@ -73,6 +73,6 @@ DiscardAll(bool isTopLevel) Async_UnlistenAll(); LockReleaseAll(USER_LOCKMETHOD, true); ResetPlanCache(); - ResetTempTableNamespace(); + ResetTempTableNamespace(InvalidOid); ResetSequenceCaches(); } diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index b945b15..1696500 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -30,9 +30,11 @@ #include "parser/parse_expr.h" #include "parser/parse_type.h" #include "rewrite/rewriteHandler.h" +#include "storage/proc.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" @@ -43,9 +45,7 @@ * The keys for this hash table are the arguments to PREPARE and EXECUTE * (statement names); the entries are PreparedStatement structs. 
*/ -static HTAB *prepared_queries = NULL; - -static void InitQueryHashTable(void); +static HTAB *InitQueryHashTable(MemoryContext mcxt); static ParamListInfo EvaluateParams(PreparedStatement *pstmt, List *params, const char *queryString, EState *estate); static Datum build_regtype_array(Oid *param_types, int num_params); @@ -427,20 +427,43 @@ EvaluateParams(PreparedStatement *pstmt, List *params, /* * Initialize query hash table upon first use. */ -static void -InitQueryHashTable(void) +static HTAB * +InitQueryHashTable(MemoryContext mcxt) { - HASHCTL hash_ctl; + HTAB *res; + MemoryContext old_mcxt; + HASHCTL hash_ctl; MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = NAMEDATALEN; hash_ctl.entrysize = sizeof(PreparedStatement); + hash_ctl.hcxt = mcxt; + + old_mcxt = MemoryContextSwitchTo(mcxt); + res = hash_create("Prepared Queries", 32, &hash_ctl, HASH_ELEM | HASH_CONTEXT); + MemoryContextSwitchTo(old_mcxt); - prepared_queries = hash_create("Prepared Queries", - 32, - &hash_ctl, - HASH_ELEM); + return res; +} + +static HTAB * +get_prepared_queries_htab(bool init) +{ + static HTAB *prepared_queries = NULL; + + if (ActiveSession) + { + if (init && !ActiveSession->prepared_queries) + ActiveSession->prepared_queries = InitQueryHashTable(ActiveSession->memory); + return ActiveSession->prepared_queries; + } + + /* Initialize the global hash table, if necessary */ + if (init && !prepared_queries) + prepared_queries = InitQueryHashTable(TopMemoryContext); + + return prepared_queries; } /* @@ -458,12 +481,9 @@ StorePreparedStatement(const char *stmt_name, TimestampTz cur_ts = GetCurrentStatementStartTimestamp(); bool found; - /* Initialize the hash table, if necessary */ - if (!prepared_queries) - InitQueryHashTable(); /* Add entry to hash table */ - entry = (PreparedStatement *) hash_search(prepared_queries, + entry = (PreparedStatement *) hash_search(get_prepared_queries_htab(true), stmt_name, HASH_ENTER, &found); @@ -495,13 +515,14 @@ PreparedStatement * 
FetchPreparedStatement(const char *stmt_name, bool throwError) { PreparedStatement *entry; + HTAB *queries = get_prepared_queries_htab(false); /* * If the hash table hasn't been initialized, it can't be storing * anything, therefore it couldn't possibly store our plan. */ - if (prepared_queries) - entry = (PreparedStatement *) hash_search(prepared_queries, + if (queries) + entry = (PreparedStatement *) hash_search(queries, stmt_name, HASH_FIND, NULL); @@ -579,7 +600,11 @@ DeallocateQuery(DeallocateStmt *stmt) void DropPreparedStatement(const char *stmt_name, bool showError) { - PreparedStatement *entry; + PreparedStatement *entry; + HTAB *queries = get_prepared_queries_htab(false); + + if (!queries && !showError) + return; /* Find the query's hash table entry; raise error if wanted */ entry = FetchPreparedStatement(stmt_name, showError); @@ -590,7 +615,7 @@ DropPreparedStatement(const char *stmt_name, bool showError) DropCachedPlan(entry->plansource); /* Now we can remove the hash table entry */ - hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL); + hash_search(queries, entry->stmt_name, HASH_REMOVE, NULL); } } @@ -602,20 +627,21 @@ DropAllPreparedStatements(void) { HASH_SEQ_STATUS seq; PreparedStatement *entry; + HTAB *queries = get_prepared_queries_htab(false); /* nothing cached */ - if (!prepared_queries) + if (!queries) return; /* walk over cache */ - hash_seq_init(&seq, prepared_queries); + hash_seq_init(&seq, queries); while ((entry = hash_seq_search(&seq)) != NULL) { /* Release the plancache entry */ DropCachedPlan(entry->plansource); /* Now we can remove the hash table entry */ - hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL); + hash_search(queries, entry->stmt_name, HASH_REMOVE, NULL); } } @@ -710,10 +736,11 @@ Datum pg_prepared_statement(PG_FUNCTION_ARGS) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupdesc; + TupleDesc tupdesc; Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext
oldcontext; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + HTAB *queries; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) @@ -757,13 +784,13 @@ pg_prepared_statement(PG_FUNCTION_ARGS) /* generate junk in short-term context */ MemoryContextSwitchTo(oldcontext); - /* hash table might be uninitialized */ - if (prepared_queries) + queries = get_prepared_queries_htab(false); + if (queries) { HASH_SEQ_STATUS hash_seq; PreparedStatement *prep_stmt; - hash_seq_init(&hash_seq, prepared_queries); + hash_seq_init(&hash_seq, queries); while ((prep_stmt = hash_seq_search(&hash_seq)) != NULL) { Datum values[5]; diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 89122d4..7843d9d 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -90,8 +90,6 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */ * last_used_seq is updated by nextval() to point to the last used * sequence. 
*/ -static SeqTableData *last_used_seq = NULL; - static void fill_seq_with_data(Relation rel, HeapTuple tuple); static Relation lock_and_open_sequence(SeqTable seq); static void create_seq_hashtable(void); diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index d349d7c..3afacee 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -144,6 +144,7 @@ secure_read(Port *port, void *ptr, size_t len) { ssize_t n; int waitfor; + WaitEventSet *waitset = pq_get_current_waitset(); retry: #ifdef USE_SSL @@ -166,9 +167,9 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL); + ModifyWaitEvent(waitset, 0, waitfor, NULL); - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WaitEventSetWait(waitset, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_READ); /* @@ -247,6 +248,7 @@ secure_write(Port *port, void *ptr, size_t len) { ssize_t n; int waitfor; + WaitEventSet *waitset = pq_get_current_waitset(); retry: waitfor = 0; @@ -268,9 +270,9 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL); + ModifyWaitEvent(waitset, 0, waitfor, NULL); - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WaitEventSetWait(waitset, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_WRITE); /* See comments in secure_read. */ diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index a4f6d4d..51d4f0b 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -13,7 +13,7 @@ * copy is aborted by an ereport(ERROR), we need to close out the copy so that * the frontend gets back into sync. Therefore, these routines have to be * aware of COPY OUT state. (New COPY-OUT is message-based and does *not* - * set the DoingCopyOut flag.) + * set the is_doing_copyout flag.) 
* * NOTE: generally, it's a bad idea to emit outgoing messages directly with * pq_putbytes(), especially if the message would require multiple calls @@ -87,12 +87,14 @@ #ifdef _MSC_VER /* mstcpip.h is missing on mingw */ #include #endif +#include #include "common/ip.h" #include "libpq/libpq.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" +#include "storage/proc.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -134,23 +136,6 @@ static List *sock_paths = NIL; #define PQ_SEND_BUFFER_SIZE 8192 #define PQ_RECV_BUFFER_SIZE 8192 -static char *PqSendBuffer; -static int PqSendBufferSize; /* Size send buffer */ -static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ -static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ - -static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; -static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ -static int PqRecvLength; /* End of data available in PqRecvBuffer */ - -/* - * Message status - */ -static bool PqCommBusy; /* busy sending data to the client */ -static bool PqCommReadingMsg; /* in the middle of reading a message */ -static bool DoingCopyOut; /* in old-protocol COPY OUT processing */ - - /* Internal functions */ static void socket_comm_reset(void); static void socket_close(int code, Datum arg); @@ -181,28 +166,55 @@ static PQcommMethods PqCommSocketMethods = { socket_endcopyout }; -PQcommMethods *PqCommMethods = &PqCommSocketMethods; +/* These variables used to be global */ +struct PQcommState { + Port *port; + MemoryContext mcxt; -WaitEventSet *FeBeWaitSet; + /* Message status */ + bool is_busy; /* busy sending data to the client */ + bool is_reading; /* in the middle of reading a message */ + bool is_doing_copyout; /* in old-protocol COPY OUT processing */ + char *send_buf; + int send_bufsize; /* Size send buffer */ + int send_offset; /* Next index to store a byte in send_buf */ + int send_start; /* Next index to send a byte in send_buf */ 
-/* -------------------------------- - * pq_init - initialize libpq at backend startup - * -------------------------------- + char recv_buf[PQ_RECV_BUFFER_SIZE]; + int recv_offset; /* Next index to read a byte from pqstate->recv_buf */ + int recv_len; /* End of data available in pqstate->recv_buf */ + + /* Wait events set */ + WaitEventSet *wait_events; +}; + +static struct PQcommState *pqstate = NULL; +PQcommMethods *PqCommMethods = &PqCommSocketMethods; + +/* + * Create common wait event for a backend */ -void -pq_init(void) +WaitEventSet * +pq_create_backend_event_set(MemoryContext mcxt, Port *port, + bool onlySock) { - /* initialize state variables */ - PqSendBufferSize = PQ_SEND_BUFFER_SIZE; - PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize); - PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0; - PqCommBusy = false; - PqCommReadingMsg = false; - DoingCopyOut = false; + WaitEventSet *result; + int nevents = onlySock ? 1 : 3; + + result = CreateWaitEventSet(mcxt, nevents); + + AddWaitEventToSet(result, WL_SOCKET_WRITEABLE, port->sock, + NULL, NULL); + + if (!onlySock) + { + AddWaitEventToSet(result, WL_LATCH_SET, -1, MyLatch, NULL); + AddWaitEventToSet(result, WL_POSTMASTER_DEATH, -1, NULL, NULL); - /* set up process-exit hook to close the socket */ - on_proc_exit(socket_close, 0); + /* set up process-exit hook to close the socket */ + on_proc_exit(socket_close, 0); + } /* * In backends (as soon as forked) we operate the underlying socket in @@ -215,16 +227,65 @@ pq_init(void) * infinite recursion. 
*/ #ifndef WIN32 - if (!pg_set_noblock(MyProcPort->sock)) + if (!pg_set_noblock(port->sock)) ereport(COMMERROR, (errmsg("could not set socket to nonblocking mode: %m"))); #endif - FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, - NULL, NULL); - AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL); - AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL); + return result; +} + +/* -------------------------------- + * pq_init - initialize libpq at backend startup + * -------------------------------- + */ +void * +pq_init(MemoryContext mcxt) +{ + struct PQcommState *state = + MemoryContextAllocZero(mcxt, sizeof(struct PQcommState)); + + /* initialize state variables */ + state->mcxt = mcxt; + + state->send_bufsize = PQ_SEND_BUFFER_SIZE; + state->send_buf = MemoryContextAlloc(mcxt, state->send_bufsize); + state->send_offset = state->send_start = state->recv_offset = state->recv_len = 0; + state->is_busy = false; + state->is_reading = false; + state->is_doing_copyout = false; + + state->wait_events = NULL; + return (void *) state; +} + +void +pq_set_current_state(void *state, Port *port, WaitEventSet *set) +{ + pqstate = (struct PQcommState *) state; + + if (pqstate) + { + pq_reset(); + pqstate->port = port; + pqstate->wait_events = set; + } +} + +WaitEventSet * +pq_get_current_waitset(void) +{ + return pqstate ? 
pqstate->wait_events : NULL; +} + +void +pq_reset(void) +{ + pqstate->send_offset = pqstate->send_start = 0; + pqstate->recv_offset = pqstate->recv_len = 0; + pqstate->is_busy = false; + pqstate->is_reading = false; + pqstate->is_doing_copyout = false; } /* -------------------------------- @@ -239,7 +300,7 @@ static void socket_comm_reset(void) { /* Do not throw away pending data, but do reset the busy flag */ - PqCommBusy = false; + pqstate->is_busy = false; /* We can abort any old-style COPY OUT, too */ pq_endcopyout(true); } @@ -255,8 +316,8 @@ socket_comm_reset(void) static void socket_close(int code, Datum arg) { - /* Nothing to do in a standalone backend, where MyProcPort is NULL. */ - if (MyProcPort != NULL) + /* Nothing to do without client connection state: pqstate or its port may be NULL (e.g. standalone backend). */ + if (pqstate != NULL && pqstate->port != NULL) { #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) #ifdef ENABLE_GSS @@ -267,11 +328,11 @@ socket_close(int code, Datum arg) * BackendInitialize(), because pg_GSS_recvauth() makes first use of * "ctx" and "cred". */ - if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT) - gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL); + if (pqstate->port->gss->ctx != GSS_C_NO_CONTEXT) + gss_delete_sec_context(&min_s, &pqstate->port->gss->ctx, NULL); - if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL) - gss_release_cred(&min_s, &MyProcPort->gss->cred); + if (pqstate->port->gss->cred != GSS_C_NO_CREDENTIAL) + gss_release_cred(&min_s, &pqstate->port->gss->cred); #endif /* ENABLE_GSS */ /* @@ -279,14 +340,14 @@ socket_close(int code, Datum arg) * postmaster child free this, doing so is safe when interrupting * BackendInitialize(). */ - free(MyProcPort->gss); + free(pqstate->port->gss); #endif /* ENABLE_GSS || ENABLE_SSPI */ /* * Cleanly shut down SSL layer. Nowhere else does a postmaster child * call this, so this is safe when interrupting BackendInitialize().
*/ - secure_close(MyProcPort); + secure_close(pqstate->port); /* * Formerly we did an explicit close() here, but it seems better to @@ -298,7 +359,7 @@ socket_close(int code, Datum arg) * We do set sock to PGINVALID_SOCKET to prevent any further I/O, * though. */ - MyProcPort->sock = PGINVALID_SOCKET; + pqstate->port->sock = PGINVALID_SOCKET; } } @@ -921,12 +982,12 @@ RemoveSocketFiles(void) static void socket_set_nonblocking(bool nonblocking) { - if (MyProcPort == NULL) + if (pqstate->port == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), errmsg("there is no client connection"))); - MyProcPort->noblock = nonblocking; + pqstate->port->noblock = nonblocking; } /* -------------------------------- @@ -938,30 +999,30 @@ socket_set_nonblocking(bool nonblocking) static int pq_recvbuf(void) { - if (PqRecvPointer > 0) + if (pqstate->recv_offset > 0) { - if (PqRecvLength > PqRecvPointer) + if (pqstate->recv_len > pqstate->recv_offset) { /* still some unread data, left-justify it in the buffer */ - memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer, - PqRecvLength - PqRecvPointer); - PqRecvLength -= PqRecvPointer; - PqRecvPointer = 0; + memmove(pqstate->recv_buf, pqstate->recv_buf + pqstate->recv_offset, + pqstate->recv_len - pqstate->recv_offset); + pqstate->recv_len -= pqstate->recv_offset; + pqstate->recv_offset = 0; } else - PqRecvLength = PqRecvPointer = 0; + pqstate->recv_len = pqstate->recv_offset = 0; } /* Ensure that we're in blocking mode */ socket_set_nonblocking(false); - /* Can fill buffer from PqRecvLength and upwards */ + /* Can fill buffer from pqstate->recv_len and upwards */ for (;;) { int r; - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + r = secure_read(pqstate->port, pqstate->recv_buf + pqstate->recv_len, + PQ_RECV_BUFFER_SIZE - pqstate->recv_len); if (r < 0) { @@ -987,7 +1048,7 @@ pq_recvbuf(void) return EOF; } /* r contains number of bytes read, so just incr length */ - PqRecvLength 
+= r; + pqstate->recv_len += r; return 0; } } @@ -999,14 +1060,14 @@ pq_recvbuf(void) int pq_getbyte(void) { - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); - while (PqRecvPointer >= PqRecvLength) + while (pqstate->recv_offset >= pqstate->recv_len) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } - return (unsigned char) PqRecvBuffer[PqRecvPointer++]; + return (unsigned char) pqstate->recv_buf[pqstate->recv_offset++]; } /* -------------------------------- @@ -1018,14 +1079,25 @@ pq_getbyte(void) int pq_peekbyte(void) { - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); - while (PqRecvPointer >= PqRecvLength) + while (pqstate->recv_offset >= pqstate->recv_len) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } - return (unsigned char) PqRecvBuffer[PqRecvPointer]; + return (unsigned char) pqstate->recv_buf[pqstate->recv_offset]; +} + +/* -------------------------------- + * pq_available_bytes - get number of buffered bytes available for reading. 
+ * + * -------------------------------- + */ +int +pq_available_bytes(void) +{ + return pqstate->recv_len - pqstate->recv_offset; } /* -------------------------------- @@ -1041,18 +1113,18 @@ pq_getbyte_if_available(unsigned char *c) { int r; - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); - if (PqRecvPointer < PqRecvLength) + if (pqstate->recv_offset < pqstate->recv_len) { - *c = PqRecvBuffer[PqRecvPointer++]; + *c = pqstate->recv_buf[pqstate->recv_offset++]; return 1; } /* Put the socket into non-blocking mode */ socket_set_nonblocking(true); - r = secure_read(MyProcPort, c, 1); + r = secure_read(pqstate->port, c, 1); if (r < 0) { /* @@ -1095,20 +1167,20 @@ pq_getbytes(char *s, size_t len) { size_t amount; - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); while (len > 0) { - while (PqRecvPointer >= PqRecvLength) + while (pqstate->recv_offset >= pqstate->recv_len) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } - amount = PqRecvLength - PqRecvPointer; + amount = pqstate->recv_len - pqstate->recv_offset; if (amount > len) amount = len; - memcpy(s, PqRecvBuffer + PqRecvPointer, amount); - PqRecvPointer += amount; + memcpy(s, pqstate->recv_buf + pqstate->recv_offset, amount); + pqstate->recv_offset += amount; s += amount; len -= amount; } @@ -1129,19 +1201,19 @@ pq_discardbytes(size_t len) { size_t amount; - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); while (len > 0) { - while (PqRecvPointer >= PqRecvLength) + while (pqstate->recv_offset >= pqstate->recv_len) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } - amount = PqRecvLength - PqRecvPointer; + amount = pqstate->recv_len - pqstate->recv_offset; if (amount > len) amount = len; - PqRecvPointer += amount; + pqstate->recv_offset += amount; len -= amount; } return 0; @@ -1167,35 +1239,35 @@ pq_getstring(StringInfo s) { int i; - Assert(PqCommReadingMsg); + 
Assert(pqstate->is_reading); resetStringInfo(s); /* Read until we get the terminating '\0' */ for (;;) { - while (PqRecvPointer >= PqRecvLength) + while (pqstate->recv_offset >= pqstate->recv_len) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } - for (i = PqRecvPointer; i < PqRecvLength; i++) + for (i = pqstate->recv_offset; i < pqstate->recv_len; i++) { - if (PqRecvBuffer[i] == '\0') + if (pqstate->recv_buf[i] == '\0') { /* include the '\0' in the copy */ - appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer, - i - PqRecvPointer + 1); - PqRecvPointer = i + 1; /* advance past \0 */ + appendBinaryStringInfo(s, pqstate->recv_buf + pqstate->recv_offset, + i - pqstate->recv_offset + 1); + pqstate->recv_offset = i + 1; /* advance past \0 */ return 0; } } /* If we're here we haven't got the \0 in the buffer yet. */ - appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer, - PqRecvLength - PqRecvPointer); - PqRecvPointer = PqRecvLength; + appendBinaryStringInfo(s, pqstate->recv_buf + pqstate->recv_offset, + pqstate->recv_len - pqstate->recv_offset); + pqstate->recv_offset = pqstate->recv_len; } } @@ -1213,12 +1285,12 @@ pq_startmsgread(void) * There shouldn't be a read active already, but let's check just to be * sure. 
*/ - if (PqCommReadingMsg) + if (pqstate->is_reading) ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("terminating connection because protocol synchronization was lost"))); - PqCommReadingMsg = true; + pqstate->is_reading = true; } @@ -1233,9 +1305,9 @@ pq_startmsgread(void) void pq_endmsgread(void) { - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); - PqCommReadingMsg = false; + pqstate->is_reading = false; } /* -------------------------------- @@ -1249,7 +1321,7 @@ pq_endmsgread(void) bool pq_is_reading_msg(void) { - return PqCommReadingMsg; + return pqstate->is_reading; } /* -------------------------------- @@ -1273,7 +1345,7 @@ pq_getmessage(StringInfo s, int maxlen) { int32 len; - Assert(PqCommReadingMsg); + Assert(pqstate->is_reading); resetStringInfo(s); @@ -1318,7 +1390,7 @@ pq_getmessage(StringInfo s, int maxlen) errmsg("incomplete message from client"))); /* we discarded the rest of the message so we're back in sync. */ - PqCommReadingMsg = false; + pqstate->is_reading = false; PG_RE_THROW(); } PG_END_TRY(); @@ -1337,7 +1409,7 @@ pq_getmessage(StringInfo s, int maxlen) } /* finished reading the message. 
*/ - PqCommReadingMsg = false; + pqstate->is_reading = false; return 0; } @@ -1355,13 +1427,13 @@ pq_putbytes(const char *s, size_t len) int res; /* Should only be called by old-style COPY OUT */ - Assert(DoingCopyOut); + Assert(pqstate->is_doing_copyout); /* No-op if reentrant call */ - if (PqCommBusy) + if (pqstate->is_busy) return 0; - PqCommBusy = true; + pqstate->is_busy = true; res = internal_putbytes(s, len); - PqCommBusy = false; + pqstate->is_busy = false; return res; } @@ -1373,23 +1445,24 @@ internal_putbytes(const char *s, size_t len) while (len > 0) { /* If buffer is full, then flush it out */ - if (PqSendPointer >= PqSendBufferSize) + if (pqstate->send_offset >= pqstate->send_bufsize) { socket_set_nonblocking(false); if (internal_flush()) return EOF; } - amount = PqSendBufferSize - PqSendPointer; + amount = pqstate->send_bufsize - pqstate->send_offset; if (amount > len) amount = len; - memcpy(PqSendBuffer + PqSendPointer, s, amount); - PqSendPointer += amount; + memcpy(pqstate->send_buf + pqstate->send_offset, s, amount); + pqstate->send_offset += amount; s += amount; len -= amount; } return 0; } + /* -------------------------------- * socket_flush - flush pending output * @@ -1401,13 +1474,17 @@ socket_flush(void) { int res; + if (pqstate->port->sock == PGINVALID_SOCKET) + return 0; + /* No-op if reentrant call */ - if (PqCommBusy) + if (pqstate->is_busy) return 0; - PqCommBusy = true; + + pqstate->is_busy = true; socket_set_nonblocking(false); res = internal_flush(); - PqCommBusy = false; + pqstate->is_busy = false; return res; } @@ -1423,14 +1500,14 @@ internal_flush(void) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer + PqSendStart; - char *bufend = PqSendBuffer + PqSendPointer; + char *bufptr = pqstate->send_buf + pqstate->send_start; + char *bufend = pqstate->send_buf + pqstate->send_offset; while (bufptr < bufend) { int r; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = secure_write(pqstate->port, 
bufptr, bufend - bufptr); if (r <= 0) { @@ -1470,7 +1547,7 @@ internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - PqSendStart = PqSendPointer = 0; + pqstate->send_start = pqstate->send_offset = 0; ClientConnectionLost = 1; InterruptPending = 1; return EOF; @@ -1478,10 +1555,10 @@ internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - PqSendStart += r; + pqstate->send_start += r; } - PqSendStart = PqSendPointer = 0; + pqstate->send_start = pqstate->send_offset = 0; return 0; } @@ -1496,20 +1573,23 @@ socket_flush_if_writable(void) { int res; + if (pqstate->port->sock == PGINVALID_SOCKET) + return 0; + /* Quick exit if nothing to do */ - if (PqSendPointer == PqSendStart) + if (pqstate->send_offset == pqstate->send_start) return 0; /* No-op if reentrant call */ - if (PqCommBusy) + if (pqstate->is_busy) return 0; /* Temporarily put the socket into non-blocking mode */ socket_set_nonblocking(true); - PqCommBusy = true; + pqstate->is_busy = true; res = internal_flush(); - PqCommBusy = false; + pqstate->is_busy = false; return res; } @@ -1520,7 +1600,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (pqstate->send_start < pqstate->send_offset); } /* -------------------------------- @@ -1559,9 +1639,9 @@ socket_is_send_pending(void) static int socket_putmessage(char msgtype, const char *s, size_t len) { - if (DoingCopyOut || PqCommBusy) + if (pqstate->is_doing_copyout || pqstate->is_busy) return 0; - PqCommBusy = true; + pqstate->is_busy = true; if (msgtype) if (internal_putbytes(&msgtype, 1)) goto fail; @@ -1575,11 +1655,11 @@ socket_putmessage(char msgtype, const char *s, size_t len) } if (internal_putbytes(s, len)) goto fail; - PqCommBusy = false; + pqstate->is_busy = false; return 0; fail: - PqCommBusy = false; + pqstate->is_busy = false; return EOF; } @@ -1599,11 +1679,11 @@ 
socket_putmessage_noblock(char msgtype, const char *s, size_t len) * Ensure we have enough space in the output buffer for the message header * as well as the message itself. */ - required = PqSendPointer + 1 + 4 + len; - if (required > PqSendBufferSize) + required = pqstate->send_offset + 1 + 4 + len; + if (required > pqstate->send_bufsize) { - PqSendBuffer = repalloc(PqSendBuffer, required); - PqSendBufferSize = required; + pqstate->send_buf = repalloc(pqstate->send_buf, required); + pqstate->send_bufsize = required; } res = pq_putmessage(msgtype, s, len); Assert(res == 0); /* should not fail when the message fits in @@ -1619,7 +1699,7 @@ socket_putmessage_noblock(char msgtype, const char *s, size_t len) static void socket_startcopyout(void) { - DoingCopyOut = true; + pqstate->is_doing_copyout = true; } /* -------------------------------- @@ -1635,12 +1715,12 @@ socket_startcopyout(void) static void socket_endcopyout(bool errorAbort) { - if (!DoingCopyOut) + if (!pqstate->is_doing_copyout) return; if (errorAbort) pq_putbytes("\n\n\\.\n", 5); /* in non-error case, copy.c will have emitted the terminator line */ - DoingCopyOut = false; + pqstate->is_doing_copyout = false; } /* diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile index aba1e92..56ec998 100644 --- a/src/backend/port/Makefile +++ b/src/backend/port/Makefile @@ -21,7 +21,7 @@ subdir = src/backend/port top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS) +OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS) ifeq ($(PORTNAME), win32) SUBDIRS += win32 diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c new file mode 100644 index 0000000..b69cc78 --- /dev/null +++ b/src/backend/port/send_sock.c @@ -0,0 +1,158 @@ +/*------------------------------------------------------------------------- + * + * send_sock.c + * Send socket descriptor to another process + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/port/send_sock.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef WIN32 +typedef struct +{ + SOCKET origsocket; + WSAPROTOCOL_INFO wsainfo; +} InheritableSocket; +#endif + +/* + * Send socket descriptor "sock" to backend process through Unix socket "chan" + */ +int +pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid) +{ +#ifdef WIN32 + InheritableSocket dst; + size_t rc; + dst.origsocket = sock; + if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0) + { + ereport(FATAL, + (errmsg("could not duplicate socket %d for use in backend: error code %d", + (int)sock, WSAGetLastError()))); + return -1; + } + rc = send(chan, &dst, sizeof(dst), 0); + if (rc != sizeof(dst)) + { + ereport(FATAL, + (errmsg("Failed to send inheritable socket: rc=%d, error code %d", + (int)rc, WSAGetLastError()))); + return -1; + } + return 0; +#else + struct msghdr msg = { 0 }; + struct iovec io; + struct cmsghdr * cmsg; + char buf[CMSG_SPACE(sizeof(sock))]; + memset(buf, '\0', sizeof(buf)); + + /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */ + io.iov_base = ""; + io.iov_len = 1; + + msg.msg_iov = &io; + 
msg.msg_iovlen = 1; + msg.msg_control = buf; + msg.msg_controllen = sizeof(buf); + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) + return PGINVALID_SOCKET; + + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(sock)); + + memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock)); + msg.msg_controllen = cmsg->cmsg_len; + + if (sendmsg(chan, &msg, 0) < 0) + return PGINVALID_SOCKET; + + return 0; +#endif +} + + +/* + * Receive socket descriptor from postmaster process through Unix socket "chan" + */ +pgsocket +pg_recv_sock(pgsocket chan) +{ +#ifdef WIN32 + InheritableSocket src; + SOCKET s; + size_t rc = recv(chan, &src, sizeof(src), 0); + if (rc != sizeof(src)) + { + ereport(FATAL, + (errmsg("Failed to receive inheritable socket: rc=%d, error code %d", + (int)rc, WSAGetLastError()))); + } + s = WSASocket(FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + &src.wsainfo, + 0, + 0); + if (s == INVALID_SOCKET) + { + ereport(FATAL, + (errmsg("could not create inherited socket: error code %d\n", + WSAGetLastError()))); + } + + /* + * To make sure we don't get two references to the same socket, close + * the original one. (This would happen when inheritance actually + * works.. 
+ */ + closesocket(src.origsocket); + return s; +#else + struct msghdr msg = {0}; + char c_buffer[256]; + char m_buffer[256]; + struct iovec io; + struct cmsghdr * cmsg; + pgsocket sock; + + io.iov_base = m_buffer; + io.iov_len = sizeof(m_buffer); + msg.msg_iov = &io; + msg.msg_iovlen = 1; + + msg.msg_control = c_buffer; + msg.msg_controllen = sizeof(c_buffer); + + if (recvmsg(chan, &msg, 0) < 0) + return PGINVALID_SOCKET; + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) + return PGINVALID_SOCKET; + + memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock)); + + pg_set_noblock(sock); + + return sock; +#endif +} diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c index f4356fe..7fd901f 100644 --- a/src/backend/port/win32/socket.c +++ b/src/backend/port/win32/socket.c @@ -726,3 +726,65 @@ pgwin32_socket_strerror(int err) } return wserrbuf; } + +int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]) +{ + union { + struct sockaddr_in inaddr; + struct sockaddr addr; + } a; + SOCKET listener; + int e; + socklen_t addrlen = sizeof(a.inaddr); + DWORD flags = 0; + int reuse = 1; + + socks[0] = socks[1] = -1; + + listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listener == -1) + return SOCKET_ERROR; + + memset(&a, 0, sizeof(a)); + a.inaddr.sin_family = AF_INET; + a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + a.inaddr.sin_port = 0; + + for (;;) { + if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, + (char*) &reuse, (socklen_t) sizeof(reuse)) == -1) + break; + if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) + break; + + memset(&a, 0, sizeof(a)); + if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) + break; + a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + a.inaddr.sin_family = AF_INET; + + if (listen(listener, 1) == SOCKET_ERROR) + break; + + socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags); + if (socks[0] == -1) + break; + if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == 
SOCKET_ERROR) + break; + + socks[1] = accept(listener, NULL, NULL); + if (socks[1] == -1) + break; + + closesocket(listener); + return 0; + } + + e = WSAGetLastError(); + closesocket(listener); + closesocket(socks[0]); + closesocket(socks[1]); + WSASetLastError(e); + socks[0] = socks[1] = -1; + return SOCKET_ERROR; +} diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..b0bd173 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -13,6 +13,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o + pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o \ + connpool.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index d2b695e..15b9eb5 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -21,6 +21,7 @@ #include "port/atomics.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "postmaster/connpool.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -129,7 +130,10 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain - } + }, + { + "StartupPacketReaderMain", StartupPacketReaderMain + } }; /* Private functions. */ diff --git a/src/backend/postmaster/connpool.c b/src/backend/postmaster/connpool.c new file mode 100644 index 0000000..e2d041a --- /dev/null +++ b/src/backend/postmaster/connpool.c @@ -0,0 +1,269 @@ +/*------------------------------------------------------------------------- + * connpool.c + * PostgreSQL connection pool workers. 
+ * + * Copyright (c) 2018, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/postmaster/connpool.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "lib/stringinfo.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/connpool.h" +#include "postmaster/postmaster.h" +#include "storage/proc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "tcop/tcopprot.h" + +/* + * GUC parameters + */ +int NumConnPoolWorkers = 2; + +/* + * Global variables + */ +ConnPoolWorker *ConnPoolWorkers; + +/* + * Signals management + */ +static volatile sig_atomic_t shutdown_requested = false; +static void handle_sigterm(SIGNAL_ARGS); + +static void *pqstate; + +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + shutdown_requested = true; + SetLatch(&MyProc->procLatch); + errno = save_errno; +} + +Size +ConnPoolShmemSize(void) +{ + return MAXALIGN(sizeof(ConnPoolWorker) * NumConnPoolWorkers); +} + +void +ConnectionPoolWorkersInit(void) +{ + int i; + bool found; + Size size = ConnPoolShmemSize(); + + ConnPoolWorkers = ShmemInitStruct("connection pool workers", + size, &found); + + if (!found) + { + MemSet(ConnPoolWorkers, 0, size); + for (i = 0; i < NumConnPoolWorkers; i++) + { + ConnPoolWorker *worker = &ConnPoolWorkers[i]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, worker->pipes) < 0) + elog(FATAL, "could not create socket pair for connection pool"); + } + } +} + +/* + * Register background workers for startup packet reading. 
+ */ +void +RegisterConnPoolWorkers(void) +{ + int i; + BackgroundWorker bgw; + + if (SessionPoolSize == 0) + /* no need to start workers */ + return; + + for (i = 0; i < NumConnPoolWorkers; i++) + { + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_PostmasterStart; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "StartupPacketReaderMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "connection pool worker %d", i + 1); + bgw.bgw_restart_time = 3; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) i; + + RegisterBackgroundWorker(&bgw); + } + + elog(LOG, "Connection pool have been started"); +} + +static void +resetWorkerState(ConnPoolWorker *worker, Port *port) +{ + /* Cleanup */ + whereToSendOutput = DestNone; + if (port != NULL) + { + if (port->sock != PGINVALID_SOCKET) + closesocket(port->sock); + if (port->pqcomm_waitset != NULL) + FreeWaitEventSet(port->pqcomm_waitset); + port = NULL; + } + pq_set_current_state(pqstate, NULL, NULL); +} + +void +StartupPacketReaderMain(Datum arg) +{ + sigjmp_buf local_sigjmp_buf; + ConnPoolWorker *worker = &ConnPoolWorkers[(int) arg]; + MemoryContext mcxt; + int status; + Port *port = NULL; + + pqsignal(SIGTERM, handle_sigterm); + BackgroundWorkerUnblockSignals(); + + mcxt = AllocSetContextCreate(TopMemoryContext, + "temporary context", + ALLOCSET_DEFAULT_SIZES); + pqstate = pq_init(TopMemoryContext); + worker->pid = MyProcPid; + worker->latch = MyLatch; + Assert(MyLatch == &MyProc->procLatch); + + MemoryContextSwitchTo(mcxt); + + /* In an exception is encountered, processing resumes here */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log and to the client */ + EmitErrorReport(); + + /* + * Now return to normal 
top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(mcxt); + FlushErrorState(); + + /* + * We only reset worker state here, but memory will be cleaned + * after next cycle. That's enough for now. + */ + resetWorkerState(worker, port); + + /* Ready for new sockets */ + worker->state = CPW_FREE; + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + while (!shutdown_requested) + { + ListCell *lc; + int rc; + StringInfoData buf; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_POSTMASTER_DEATH, + 0, PG_WAIT_EXTENSION); + + if (rc & WL_POSTMASTER_DEATH) + break; + + ResetLatch(&MyProc->procLatch); + + if (shutdown_requested) + break; + + if (worker->state != CPW_NEW_SOCKET) + /* we woke up for other reason */ + continue; + + /* Set up temporary pq state for startup packet */ + port = palloc0(sizeof(Port)); + port->sock = PGINVALID_SOCKET; + + while (port->sock == PGINVALID_SOCKET) + port->sock = pg_recv_sock(worker->pipes[1]); + + /* init pqcomm */ + port->pqcomm_waitset = pq_create_backend_event_set(mcxt, port, true); + port->canAcceptConnections = worker->cac_state; + pq_set_current_state(pqstate, port, port->pqcomm_waitset); + whereToSendOutput = DestRemote; + + /* TODO: deal with timeouts */ + status = ProcessStartupPacket(port, false, mcxt, ERROR); + if (status != STATUS_OK) + { + worker->state = CPW_FREE; + goto cleanup; + } + + /* Serialize a port into stringinfo */ + pq_beginmessage(&buf, 'P'); + pq_sendint(&buf, port->proto, 4); + pq_sendstring(&buf, port->database_name); + pq_sendstring(&buf, port->user_name); + pq_sendint(&buf, list_length(port->guc_options), 4); + + foreach(lc, port->guc_options) + { + char *str = (char *) lfirst(lc); + pq_sendstring(&buf, str); + } + + if (port->cmdline_options) + { + pq_sendint(&buf, 1, 4); + pq_sendstring(&buf, port->cmdline_options); + } + else pq_sendint(&buf, 0, 4); + 
+ worker->state = CPW_PROCESSED; + + while ((rc = send(worker->pipes[1], &buf.len, sizeof(buf.len), 0)) < 0 && errno == EINTR); + if (rc != (int)sizeof(buf.len)) + elog(ERROR, "could not send data to postmaster"); + while ((rc = send(worker->pipes[1], buf.data, buf.len, 0)) < 0 && errno == EINTR); + if (rc != buf.len) + elog(ERROR, "could not send data to postmaster"); + pfree(buf.data); + buf.data = NULL; + cleanup: + resetWorkerState(worker, port); + MemoryContextReset(mcxt); + } + + resetWorkerState(worker, NULL); +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 8a5b2b3..8bdc988 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -868,7 +868,8 @@ pgstat_report_stat(bool force) PgStat_TableEntry *this_ent; /* Shouldn't have any pending transaction-dependent counts */ - Assert(entry->trans == NULL); + if (entry->trans != NULL) + continue; /* * Ignore entries that didn't accumulate any actual counts, such diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a4b53b3..56fef63 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -76,6 +76,7 @@ #include #include #include +#include #ifdef HAVE_SYS_SELECT_H #include @@ -114,6 +115,7 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "postmaster/connpool.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -170,6 +172,7 @@ typedef struct bkend pid_t pid; /* process id of backend */ int32 cancel_key; /* cancel key for cancels for this backend */ int child_slot; /* PMChildSlot for this backend, if any */ + pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */ /* * Flavor of backend or auxiliary process. 
Note that BACKEND_TYPE_WALSND @@ -178,8 +181,11 @@ typedef struct bkend */ int bkend_type; bool dead_end; /* is it going to send an error and quit? */ - bool bgworker_notify; /* gets bgworker start/stop notifications */ + bool bgworker_notify;/* gets bgworker start/stop notifications */ dlist_node elem; /* list link in BackendList */ + int session_pool_id;/* identifier of backends session pool */ + int worker_id; /* identifier of worker within session pool */ + void *pool; /* pool of backends */ } Backend; static dlist_head BackendList = DLIST_STATIC_INIT(BackendList); @@ -190,7 +196,27 @@ static Backend *ShmemBackendArray; BackgroundWorker *MyBgworkerEntry = NULL; +struct DatabasePoolKey { + char database[NAMEDATALEN]; + char username[NAMEDATALEN]; +}; + +typedef struct DatabasePool +{ + struct DatabasePoolKey key; + Backend **workers; /* pool backends */ + int n_workers; /* number of launched worker backends + in this pool so far */ + int rr_index; /* index of current backends used to implement + * round-robin distribution of sessions through + * backends */ +} DatabasePool; + +static struct +{ + HTAB *pools; +} PostmasterSessionPool; /* The socket number we are listening for connections on */ int PostPortNumber; @@ -214,7 +240,7 @@ int ReservedBackends; /* The socket(s) we're listening to. 
*/ #define MAXLISTEN 64 -static pgsocket ListenSocket[MAXLISTEN]; +static pgsocket ListenSocket[MAXLISTEN + MAX_CONNPOOL_WORKERS]; /* * Set by the -o option @@ -393,15 +419,19 @@ static void unlink_external_pid_file(int status, Datum arg); static void getInstallationPaths(const char *argv0); static void checkControlFile(void); static Port *ConnCreate(int serverFd); +static Port *PoolConnCreate(pgsocket poolFd, int workerId); static void ConnFree(Port *port); +static void ConnDispatch(Port *port); static void reset_shared(int port); static void SIGHUP_handler(SIGNAL_ARGS); +static CAC_state canAcceptConnections(void); static void pmdie(SIGNAL_ARGS); static void reaper(SIGNAL_ARGS); static void sigusr1_handler(SIGNAL_ARGS); static void startup_die(SIGNAL_ARGS); static void dummy_handler(SIGNAL_ARGS); static void StartupPacketTimeoutHandler(void); +static int BackendStartup(DatabasePool *pool, Port *port); static void CleanupBackend(int pid, int exitstatus); static bool CleanupBackgroundWorker(int pid, int exitstatus); static void HandleChildCrash(int pid, int exitstatus, const char *procname); @@ -412,13 +442,10 @@ static void BackendInitialize(Port *port); static void BackendRun(Port *port) pg_attribute_noreturn(); static void ExitPostmaster(int status) pg_attribute_noreturn(); static int ServerLoop(void); -static int BackendStartup(Port *port); -static int ProcessStartupPacket(Port *port, bool SSLdone); static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options); static void processCancelRequest(Port *port, void *pkt); static int initMasks(fd_set *rmask); static void report_fork_failure_to_client(Port *port, int errnum); -static CAC_state canAcceptConnections(void); static bool RandomCancelKey(int32 *cancel_key); static void signal_child(pid_t pid, int signal); static bool SignalSomeChildren(int signal, int targets); @@ -486,6 +513,7 @@ typedef struct { Port port; InheritableSocket portsocket; + InheritableSocket sessionsocket; char 
DataDir[MAXPGPATH]; pgsocket ListenSocket[MAXLISTEN]; int32 MyCancelKey; @@ -988,6 +1016,11 @@ PostmasterMain(int argc, char *argv[]) ApplyLauncherRegister(); /* + * Register connnection pool workers + */ + RegisterConnPoolWorkers(); + + /* * process any libraries that should be preloaded at postmaster start */ process_shared_preload_libraries(); @@ -1613,6 +1646,177 @@ DetermineSleepTime(struct timeval *timeout) } } +static bool +IsDedicatedDatabase(char const* dbname) +{ + List *namelist; + ListCell *l; + char *databases; + bool found = false; + + /* Need a modifiable copy of namespace_search_path string */ + databases = pstrdup(DedicatedDatabases); + + if (!SplitIdentifierString(databases, ',', &namelist)) { + elog(ERROR, "invalid list syntax"); + } + foreach(l, namelist) + { + char *curname = (char *) lfirst(l); + if (strcmp(curname, dbname) == 0) + { + found = true; + break; + } + } + list_free(namelist); + pfree(databases); + + return found; +} + +/* + * Find free worker and send socket + */ +static void +SendPortToConnectionPool(Port *port) +{ + int i; + bool sent; + + /* By default is not dedicated */ + IsDedicatedBackend = false; + + sent = false; + +again: + for (i = 0; i < NumConnPoolWorkers; i++) + { + ConnPoolWorker *worker = &ConnPoolWorkers[i]; + if (worker->pid == 0) + continue; + + if (worker->state == CPW_PROCESSED) + { + Port *conn = PoolConnCreate(worker->pipes[0], i); + if (conn) + ConnDispatch(conn); + } + if (worker->state == CPW_FREE) + { + worker->port = port; + worker->state = CPW_NEW_SOCKET; + worker->cac_state = canAcceptConnections(); + + if (pg_send_sock(worker->pipes[0], port->sock, worker->pid) < 0) + { + elog(LOG, "could not send socket to connection pool: %m"); + ExitPostmaster(1); + } + SetLatch(worker->latch); + sent = true; + break; + } + } + + if (!sent) + { + pg_usleep(1000L); + goto again; + } +} + +static void +ConnDispatch(Port *port) +{ + bool found; + DatabasePool *pool; + struct DatabasePoolKey key; + + Assert(port->sock 
!= PGINVALID_SOCKET); + if (IsDedicatedDatabase(port->database_name)) + { + IsDedicatedBackend = true; + BackendStartup(NULL, port); + goto cleanup; + } + +#ifdef USE_SSL + if (port->ssl_in_use) + { + /* + * We don't (yet) support SSL connections with connection pool, + * since we need to move whole SSL context to already working + * backend. This task needs more investigation. + */ + elog(ERROR, "connection pool does not support SSL connections"); + goto cleanup; + } +#endif + MemSet(key.database, 0, NAMEDATALEN); + MemSet(key.username, 0, NAMEDATALEN); + + strlcpy(key.database, port->database_name, NAMEDATALEN); + strlcpy(key.username, port->user_name, NAMEDATALEN); + + pool = hash_search(PostmasterSessionPool.pools, &key, HASH_ENTER, &found); + if (!found) + { + pool->key = key; + pool->workers = NULL; + pool->n_workers = 0; + pool->rr_index = 0; + } + + BackendStartup(pool, port); + +cleanup: + /* + * We no longer need the open socket or port structure + * in this process + */ + StreamClose(port->sock); + ConnFree(port); +} + +/* + * Init wait event set for connection pool workers, + * and hash table for backends in pool. 
+ */ +static int +InitConnPoolState(fd_set *rmask, int numSockets) +{ + int i; + HASHCTL ctl; + + /* + * create hashtable that indexes the relcache + */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(struct DatabasePoolKey); + ctl.entrysize = sizeof(DatabasePool); + ctl.hcxt = PostmasterContext; + PostmasterSessionPool.pools = hash_create("Pool by database and user", 100, + &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + for (i = 0; i < NumConnPoolWorkers; i++) + { + ConnPoolWorker *worker = &ConnPoolWorkers[i]; + worker->port = NULL; + + /* + * we use same pselect(3) call for connection pool workers and + * clients + */ + ListenSocket[MAXLISTEN + i] = worker->pipes[0]; + FD_SET(worker->pipes[0], rmask); + if (worker->pipes[0] > numSockets) + numSockets = worker->pipes[0]; + } + + return numSockets + 1; +} + /* * Main idle loop of postmaster * @@ -1630,6 +1834,9 @@ ServerLoop(void) nSockets = initMasks(&readmask); + if (SessionPoolSize > 0) + nSockets = InitConnPoolState(&readmask, nSockets); + for (;;) { fd_set rmask; @@ -1690,27 +1897,43 @@ ServerLoop(void) */ if (selres > 0) { + Port *port; int i; + /* Check for client connections */ for (i = 0; i < MAXLISTEN; i++) { if (ListenSocket[i] == PGINVALID_SOCKET) break; if (FD_ISSET(ListenSocket[i], &rmask)) { - Port *port; - port = ConnCreate(ListenSocket[i]); if (port) { - BackendStartup(port); - - /* - * We no longer need the open socket or port structure - * in this process - */ - StreamClose(port->sock); - ConnFree(port); + if (SessionPoolSize == 0) + { + IsDedicatedBackend = true; + BackendStartup(NULL, port); + StreamClose(port->sock); + ConnFree(port); + } + else + SendPortToConnectionPool(port); + } + } + } + + /* Check for some data from connections pool */ + if (SessionPoolSize > 0) + { + for (i = 0; i < NumConnPoolWorkers; i++) + { + if (FD_ISSET(ListenSocket[MAXLISTEN + i], &rmask)) + { + port = PoolConnCreate(ListenSocket[MAXLISTEN + i], i); + if (port) + ConnDispatch(port); + } } } @@ -1893,13 
+2116,15 @@ initMasks(fd_set *rmask) * send anything to the client, which would typically be appropriate * if we detect a communications failure.) */ -static int -ProcessStartupPacket(Port *port, bool SSLdone) +int +ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx, + int errlevel) { int32 len; void *buf; ProtocolVersion proto; - MemoryContext oldcontext; + MemoryContext oldcontext = MemoryContextSwitchTo(memctx); + int result; pq_startmsgread(); if (pq_getbytes((char *) &len, 4) == EOF) @@ -1992,7 +2217,7 @@ retry1: #endif /* regular startup packet, cancel, etc packet should follow... */ /* but not another SSL negotiation request */ - return ProcessStartupPacket(port, true); + return ProcessStartupPacket(port, true, memctx, errlevel); } /* Could add additional special packet types here */ @@ -2006,13 +2231,16 @@ retry1: /* Check that the major protocol version is in range. */ if (PG_PROTOCOL_MAJOR(proto) < PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST) || PG_PROTOCOL_MAJOR(proto) > PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST)) - ereport(FATAL, + { + ereport(errlevel, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported frontend protocol %u.%u: server supports %u.0 to %u.%u", PG_PROTOCOL_MAJOR(proto), PG_PROTOCOL_MINOR(proto), PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST), PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST), PG_PROTOCOL_MINOR(PG_PROTOCOL_LATEST)))); + return STATUS_ERROR; + } /* * Now fetch parameters out of startup packet and save them into the Port @@ -2022,7 +2250,7 @@ retry1: * not worry about leaking this storage on failure, since we aren't in the * postmaster process anymore. 
*/ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); + oldcontext = MemoryContextSwitchTo(memctx); if (PG_PROTOCOL_MAJOR(proto) >= 3) { @@ -2070,12 +2298,15 @@ retry1: am_db_walsender = true; } else if (!parse_bool(valptr, &am_walsender)) - ereport(FATAL, + { + ereport(errlevel, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid value for parameter \"%s\": \"%s\"", "replication", valptr), errhint("Valid values are: \"false\", 0, \"true\", 1, \"database\"."))); + return STATUS_ERROR; + } } else if (strncmp(nameptr, "_pq_.", 5) == 0) { @@ -2103,9 +2334,12 @@ retry1: * given packet length, complain. */ if (offset != len - 1) - ereport(FATAL, + { + ereport(errlevel, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid startup packet layout: expected terminator as last byte"))); + return STATUS_ERROR; + } /* * If the client requested a newer protocol version or if the client @@ -2141,9 +2375,12 @@ retry1: /* Check a user name was given. */ if (port->user_name == NULL || port->user_name[0] == '\0') - ereport(FATAL, + { + ereport(errlevel, (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), errmsg("no PostgreSQL user name specified in startup packet"))); + return STATUS_ERROR; + } /* The database defaults to the user name. */ if (port->database_name == NULL || port->database_name[0] == '\0') @@ -2197,27 +2434,32 @@ retry1: * now instead of wasting cycles on an authentication exchange. (This also * allows a pg_ping utility to be written.) 
*/ + result = STATUS_OK; switch (port->canAcceptConnections) { case CAC_STARTUP: - ereport(FATAL, + ereport(errlevel, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is starting up"))); + result = STATUS_ERROR; break; case CAC_SHUTDOWN: - ereport(FATAL, + ereport(errlevel, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is shutting down"))); + result = STATUS_ERROR; break; case CAC_RECOVERY: - ereport(FATAL, + ereport(errlevel, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("the database system is in recovery mode"))); + result = STATUS_ERROR; break; case CAC_TOOMANY: - ereport(FATAL, + ereport(errlevel, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("sorry, too many clients already"))); + result = STATUS_ERROR; break; case CAC_WAITBACKUP: /* OK for now, will check in InitPostgres */ @@ -2226,7 +2468,7 @@ retry1: break; } - return STATUS_OK; + return result; } /* @@ -2322,7 +2564,7 @@ processCancelRequest(Port *port, void *pkt) /* * canAcceptConnections --- check to see if database state allows connections. 
*/ -static CAC_state +CAC_state canAcceptConnections(void) { CAC_state result = CAC_OK; @@ -2398,7 +2640,7 @@ ConnCreate(int serverFd) ConnFree(port); return NULL; } - + SessionPoolSock = PGINVALID_SOCKET; /* * Allocate GSSAPI specific state struct */ @@ -2418,6 +2660,66 @@ ConnCreate(int serverFd) return port; } +#define CONN_BUF_SIZE 8192 +/* PoolConnCreate -- fill in the worker's Port from the length-prefixed message read from poolFd; returns NULL if the worker is not in CPW_PROCESSED state or the message cannot be read. */ +static Port * +PoolConnCreate(pgsocket poolFd, int workerId) +{ + char recv_buf[CONN_BUF_SIZE]; + int recv_len = 0, + i, + rc, + offs, + len; + StringInfoData buf; + ConnPoolWorker *worker = &ConnPoolWorkers[workerId]; + Port *port = worker->port; + + if (worker->state != CPW_PROCESSED) + return NULL; + + /* In any case we should free the worker */ + worker->port = NULL; + worker->state = CPW_FREE; + /* Read the message length, then reject lengths that are empty or cannot fit in recv_buf */ + while ((rc = read(poolFd, &recv_len, sizeof recv_len)) < 0 && errno == EINTR); + if (rc != (int)sizeof(recv_len) || recv_len <= 0 || recv_len > CONN_BUF_SIZE) + { + io_error: + StreamClose(port->sock); + ConnFree(port); + return NULL; + } + /* Read exactly recv_len bytes so we never consume data that follows this message */ + for (offs = 0; offs < recv_len; offs += rc) + { + while ((rc = read(poolFd, recv_buf + offs, recv_len - offs)) < 0 && errno == EINTR); + if (rc <= 0) + goto io_error; + } + + buf.cursor = 0; + buf.data = recv_buf; + buf.len = recv_len; + + port->proto = pq_getmsgint(&buf, 4); + port->database_name = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf)); + port->user_name = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf)); + port->guc_options = NIL; + + /* GUC */ + len = pq_getmsgint(&buf, 4); + for (i = 0; i < len; i++) + { + char *val = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf)); + port->guc_options = lappend(port->guc_options, val); + } + + if (pq_getmsgint(&buf, 4) > 0) + port->cmdline_options = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf)); + + return port; +} /* * ConnFree -- free a local connection data structure @@ -2430,6 +2732,12 @@ ConnFree(Port *conn) #endif if (conn->gss) free(conn->gss); + if (conn->database_name) + pfree(conn->database_name); + if 
(conn->user_name) + pfree(conn->user_name); + if (conn->cmdline_options) + pfree(conn->cmdline_options); free(conn); } @@ -3185,6 +3493,44 @@ CleanupBackgroundWorker(int pid, } /* + * Unlink backend from backend's list and free memory. + */ +static void +UnlinkPooledBackend(Backend *bp) +{ + DatabasePool *pool = bp->pool; + + if (!pool || + bp->bkend_type != BACKEND_TYPE_NORMAL || + bp->session_send_sock == PGINVALID_SOCKET) + return; + + Assert(pool->n_workers > bp->worker_id && + pool->workers[bp->worker_id] == bp); + + if (--pool->n_workers != 0) + { + pool->workers[bp->worker_id] = pool->workers[pool->n_workers]; + pool->workers[bp->worker_id]->worker_id = bp->worker_id; + pool->rr_index %= pool->n_workers; + } + + closesocket(bp->session_send_sock); + bp->session_send_sock = PGINVALID_SOCKET; + + elog(DEBUG2, "Cleanup backend %d", bp->pid); +} + +static void +DeleteBackend(Backend *bp) +{ + UnlinkPooledBackend(bp); + + dlist_delete(&bp->elem); + free(bp); +} + +/* * CleanupBackend -- cleanup after terminated backend. * * Remove all local state associated with backend. @@ -3261,8 +3607,7 @@ CleanupBackend(int pid, */ BackgroundWorkerStopNotifications(bp->pid); } - dlist_delete(iter.cur); - free(bp); + DeleteBackend(bp); break; } } @@ -3364,8 +3709,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) ShmemBackendArrayRemove(bp); #endif } - dlist_delete(iter.cur); - free(bp); + DeleteBackend(bp); /* Keep looping so we can signal remaining backends */ } else @@ -3962,16 +4306,42 @@ TerminateChildren(int signal) * Note: if you change this code, also consider StartAutovacuumWorker. */ static int -BackendStartup(Port *port) +BackendStartup(DatabasePool *pool, Port *port) { Backend *bn; /* for backend cleanup */ pid_t pid; + pgsocket session_pipe[2]; + + /* + * In case of session pooling instead of spawning new backend open + * new session at one of the existed backends. 
+ */ + while (pool && pool->n_workers >= SessionPoolSize) + { + Backend *worker = pool->workers[pool->rr_index]; + pool->rr_index = (pool->rr_index + 1) % pool->n_workers; /* round-robin */ + + /* Send connection socket to the worker backend */ + if (pg_send_sock(worker->session_send_sock, port->sock, worker->pid) < 0) + { + elog(LOG, "Failed to send session socket %d: %m", + worker->session_send_sock); + UnlinkPooledBackend(worker); + continue; + } + + elog(DEBUG2, "Start new session for socket %d at backend %d", + port->sock, worker->pid); + + /* TODO: serialize the port and send it through socket */ + return STATUS_OK; + } /* * Create backend data structure. Better before the fork() so we can * handle failure cleanly. */ - bn = (Backend *) malloc(sizeof(Backend)); + bn = (Backend *) calloc(1, sizeof(Backend)); if (!bn) { ereport(LOG, @@ -4012,12 +4382,30 @@ BackendStartup(Port *port) /* Hasn't asked to be notified about any bgworkers yet */ bn->bgworker_notify = false; + /* Create socket pair for sending session sockets to the backend */ + if (!IsDedicatedBackend) + { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, session_pipe) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create socket pair for launching sessions: %m"))); +#ifdef WIN32 + SessionPoolSock = session_pipe[0]; +#endif + } #ifdef EXEC_BACKEND pid = backend_forkexec(port); #else /* !EXEC_BACKEND */ pid = fork_process(); if (pid == 0) /* child */ { + whereToSendOutput = DestNone; + + if (!IsDedicatedBackend) + { + SessionPoolSock = session_pipe[0]; /* Use this socket for receiving client session socket descriptor */ + close(session_pipe[1]); /* Close unused end of the pipe */ + } free(bn); /* Detangle from postmaster */ @@ -4026,11 +4414,14 @@ BackendStartup(Port *port) /* Close the postmaster's sockets */ ClosePostmasterPorts(false); - /* Perform additional initialization and collect startup packet */ + /* Perform additional initialization */ BackendInitialize(port); /* 
And run the backend */ BackendRun(port); + + /* Unreachable */ + Assert(false); } #endif /* EXEC_BACKEND */ @@ -4041,6 +4432,7 @@ BackendStartup(Port *port) if (!bn->dead_end) (void) ReleasePostmasterChildSlot(bn->child_slot); + free(bn); errno = save_errno; ereport(LOG, @@ -4059,9 +4451,27 @@ BackendStartup(Port *port) * of backends. */ bn->pid = pid; + bn->session_send_sock = PGINVALID_SOCKET; bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */ + bn->pool = pool; dlist_push_head(&BackendList, &bn->elem); + if (!IsDedicatedBackend) + { + /* Use this socket for sending client session socket descriptor */ + bn->session_send_sock = session_pipe[1]; + + /* Close unused end of the pipe */ + closesocket(session_pipe[0]); + + if (pool->workers == NULL) + pool->workers = (Backend **) calloc(sizeof(Backend *), SessionPoolSize); + + bn->worker_id = pool->n_workers++; + pool->workers[bn->worker_id] = bn; + + elog(DEBUG1, "Start %d-th worker with pid %d", pool->n_workers, pid); + } #ifdef EXEC_BACKEND if (!bn->dead_end) ShmemBackendArrayAdd(bn); @@ -4122,6 +4532,7 @@ BackendInitialize(Port *port) /* Save port etc. for ps status */ MyProcPort = port; + FrontendProtocol = port->proto; /* * PreAuthDelay is a debugging aid for investigating problems in the @@ -4148,7 +4559,10 @@ BackendInitialize(Port *port) * Initialize libpq and enable reporting of ereport errors to the client. * Must do this now because authentication uses libpq to send messages. */ - pq_init(); /* initialize libpq to talk to client */ + port->pqcomm_state = pq_init(TopMemoryContext); /* initialize libpq to talk to client */ + port->pqcomm_waitset = pq_create_backend_event_set(TopMemoryContext, port, false); + pq_set_current_state(port->pqcomm_state, port, port->pqcomm_waitset); + whereToSendOutput = DestRemote; /* now safe to ereport to client */ /* @@ -4227,35 +4641,46 @@ BackendInitialize(Port *port) port->remote_hostname = strdup(remote_host); /* - * Ready to begin client interaction. 
We will give up and exit(1) after a - * time delay, so that a broken client can't hog a connection - * indefinitely. PreAuthDelay and any DNS interactions above don't count - * against the time limit. - * - * Note: AuthenticationTimeout is applied here while waiting for the - * startup packet, and then again in InitPostgres for the duration of any - * authentication operations. So a hostile client could tie up the - * process for nearly twice AuthenticationTimeout before we kick him off. - * - * Note: because PostgresMain will call InitializeTimeouts again, the - * registration of STARTUP_PACKET_TIMEOUT will be lost. This is okay - * since we never use it again after this function. + * Read the startup packet only if we don't use the session pool */ - RegisterTimeout(STARTUP_PACKET_TIMEOUT, StartupPacketTimeoutHandler); - enable_timeout_after(STARTUP_PACKET_TIMEOUT, AuthenticationTimeout * 1000); + if (IsDedicatedBackend && !port->proto) + { + /* + * Ready to begin client interaction. We will give up and exit(1) after a + * time delay, so that a broken client can't hog a connection + * indefinitely. PreAuthDelay and any DNS interactions above don't count + * against the time limit. + * + * Note: AuthenticationTimeout is applied here while waiting for the + * startup packet, and then again in InitPostgres for the duration of any + * authentication operations. So a hostile client could tie up the + * process for nearly twice AuthenticationTimeout before we kick him off. + * + * Note: because PostgresMain will call InitializeTimeouts again, the + * registration of STARTUP_PACKET_TIMEOUT will be lost. This is okay + * since we never use it again after this function. + */ + RegisterTimeout(STARTUP_PACKET_TIMEOUT, StartupPacketTimeoutHandler); + enable_timeout_after(STARTUP_PACKET_TIMEOUT, AuthenticationTimeout * 1000); - /* - * Receive the startup packet (which might turn out to be a cancel request - * packet). 
- */ - status = ProcessStartupPacket(port, false); + /* + * Receive the startup packet (which might turn out to be a cancel request + * packet). + */ + status = ProcessStartupPacket(port, false, TopMemoryContext, FATAL); - /* - * Stop here if it was bad or a cancel packet. ProcessStartupPacket - * already did any appropriate error reporting. - */ - if (status != STATUS_OK) - proc_exit(0); + /* + * Stop here if it was bad or a cancel packet. ProcessStartupPacket + * already did any appropriate error reporting. + */ + if (status != STATUS_OK) + proc_exit(0); + + /* + * Disable the timeout + */ + disable_timeout(STARTUP_PACKET_TIMEOUT, false); + } /* * Now that we have the user and database name, we can set the process @@ -4277,9 +4702,8 @@ BackendInitialize(Port *port) update_process_title ? "authentication" : ""); /* - * Disable the timeout, and prevent SIGTERM/SIGQUIT again. + * Prevent SIGTERM/SIGQUIT again. */ - disable_timeout(STARTUP_PACKET_TIMEOUT, false); PG_SETMASK(&BlockSig); } @@ -5990,6 +6414,9 @@ save_backend_variables(BackendParameters *param, Port *port, if (!write_inheritable_socket(&param->portsocket, port->sock, childPid)) return false; + if (!write_inheritable_socket(&param->sessionsocket, SessionPoolSock, childPid)) + return false; + strlcpy(param->DataDir, DataDir, MAXPGPATH); memcpy(&param->ListenSocket, &ListenSocket, sizeof(ListenSocket)); @@ -6222,6 +6649,7 @@ restore_backend_variables(BackendParameters *param, Port *port) { memcpy(port, &param->port, sizeof(Port)); read_inheritable_socket(&port->sock, &param->portsocket); + read_inheritable_socket(&SessionPoolSock, &param->sessionsocket); SetDataDir(param->DataDir); diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c index a85a1c6..9802ca0 100644 --- a/src/backend/storage/ipc/ipc.c +++ b/src/backend/storage/ipc/ipc.c @@ -413,3 +413,12 @@ on_exit_reset(void) on_proc_exit_index = 0; reset_on_dsm_detach(); } + +void +on_shmem_exit_reset(void) +{ + before_shmem_exit_index = 0; + 
on_shmem_exit_index = 0; + on_proc_exit_index = 0; + reset_on_dsm_detach(); +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 0c86a58..10e4613 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -28,6 +28,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" +#include "postmaster/connpool.h" #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, BackendRandomShmemSize()); + size = add_size(size, ConnPoolShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -271,6 +273,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) AsyncShmemInit(); BackendRandomShmemInit(); + /* + * Set up connection pool workers + */ + ConnectionPoolWorkersInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index f6dda9c..605f054 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -76,6 +76,7 @@ struct WaitEventSet { int nevents; /* number of registered events */ int nevents_space; /* maximum number of events in this set */ + int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/ /* * Array, of nevents_space length, storing the definition of events this @@ -129,9 +130,9 @@ static void drainSelfPipe(void); #if defined(WAIT_USE_EPOLL) static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action); #elif defined(WAIT_USE_POLL) -static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event); +static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove); #elif defined(WAIT_USE_WIN32) -static void 
WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event); +static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove); #endif static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, @@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents) set->latch = NULL; set->nevents_space = nevents; + set->free_events = -1; #if defined(WAIT_USE_EPOLL) #ifdef EPOLL_CLOEXEC @@ -667,9 +669,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data) { WaitEvent *event; + int free_event; /* not enough space */ - Assert(set->nevents < set->nevents_space); + if (set->nevents == set->nevents_space) + return -1; if (latch) { @@ -690,8 +694,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK)) elog(ERROR, "cannot wait on socket event without a socket"); - event = &set->events[set->nevents]; - event->pos = set->nevents++; + free_event = set->free_events; + if (free_event >= 0) + { + event = &set->events[free_event]; + set->free_events = event->pos; + event->pos = free_event; + } + else + { + event = &set->events[set->nevents]; + event->pos = set->nevents; + } + set->nevents += 1; event->fd = fd; event->events = events; event->user_data = user_data; @@ -718,15 +733,38 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, #if defined(WAIT_USE_EPOLL) WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD); #elif defined(WAIT_USE_POLL) - WaitEventAdjustPoll(set, event); + WaitEventAdjustPoll(set, event, false); #elif defined(WAIT_USE_WIN32) - WaitEventAdjustWin32(set, event); + WaitEventAdjustWin32(set, event, false); #endif return event->pos; } /* + * Remove event with specified socket descriptor + */ +void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd) +{ + int i, n = set->nevents; + for (i = 0; i < n; i++) + { + WaitEvent *event = &set->events[i]; + if (event->fd == fd) + 
{ +#if defined(WAIT_USE_EPOLL) + WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL); +#elif defined(WAIT_USE_POLL) + WaitEventAdjustPoll(set, event, true); +#elif defined(WAIT_USE_WIN32) + WaitEventAdjustWin32(set, event, true); +#endif + break; + } + } +} + +/* * Change the event mask and, in the WL_LATCH_SET case, the latch associated * with the WaitEvent. * @@ -774,9 +812,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) #if defined(WAIT_USE_EPOLL) WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD); #elif defined(WAIT_USE_POLL) - WaitEventAdjustPoll(set, event); + WaitEventAdjustPoll(set, event, false); #elif defined(WAIT_USE_WIN32) - WaitEventAdjustWin32(set, event); + WaitEventAdjustWin32(set, event, false); #endif } @@ -822,19 +860,38 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) * requiring that, and actually it makes the code simpler... */ rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev); - + Assert(rc >= 0); if (rc < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("epoll_ctl() failed: %m"))); + + if (action == EPOLL_CTL_DEL) + { + int pos = event->pos; + event->fd = PGINVALID_SOCKET; + set->nevents -= 1; + event->pos = set->free_events; + set->free_events = pos; + } } #endif #if defined(WAIT_USE_POLL) static void -WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) +WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove) { - struct pollfd *pollfd = &set->pollfds[event->pos]; + int pos = event->pos; + struct pollfd *pollfd = &set->pollfds[pos]; + + if (remove) + { + set->nevents -= 1; + *pollfd = set->pollfds[set->nevents]; + set->events[pos] = set->events[set->nevents]; + event->pos = pos; + return; + } pollfd->revents = 0; pollfd->fd = event->fd; @@ -865,9 +922,25 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) #if defined(WAIT_USE_WIN32) static void -WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) +WaitEventAdjustWin32(WaitEventSet *set, WaitEvent 
*event, bool remove) { - HANDLE *handle = &set->handles[event->pos + 1]; + int pos = event->pos; + HANDLE *handle = &set->handles[pos + 1]; + + if (remove) + { + Assert(event->fd != PGINVALID_SOCKET); + + if (*handle != WSA_INVALID_EVENT) + WSACloseEvent(*handle); + + set->nevents -= 1; + set->events[pos] = set->events[set->nevents]; + *handle = set->handles[set->nevents + 1]; + set->handles[set->nevents + 1] = WSA_INVALID_EVENT; + event->pos = pos; + return; + } if (event->events == WL_LATCH_SET) { @@ -880,7 +953,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) } else { - int flags = FD_CLOSE; /* always check for errors/EOF */ + int flags = FD_CLOSE; /* always check for errors/EOF */ if (event->events & WL_SOCKET_READABLE) flags |= FD_READ; @@ -897,8 +970,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) WSAGetLastError()); } if (WSAEventSelect(event->fd, *handle, flags) != 0) - elog(ERROR, "failed to set up event for socket: error code %u", - WSAGetLastError()); + elog(ERROR, "failed to set up event for socket %p: error code %u", + event->fd, WSAGetLastError()); Assert(event->fd != PGINVALID_SOCKET); } @@ -1296,7 +1369,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, { if (cur_event->reset) { - WaitEventAdjustWin32(set, cur_event); + WaitEventAdjustWin32(set, cur_event, false); cur_event->reset = false; } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f9aaa5..5356000 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -597,6 +597,15 @@ InitAuxiliaryProcess(void) } /* + * Generate unique session ID. + */ +uint32 +CreateSessionId(void) +{ + return ++SessionPool->sessionCount; +} + +/* * Record the PID and PGPROC structures for the Startup process, for use in * ProcSendSignal(). See comments there for further explanation. 
*/ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7a9ada2..42017e4 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -40,6 +40,7 @@ #include "access/printtup.h" #include "access/xact.h" #include "catalog/pg_type.h" +#include "catalog/namespace.h" #include "commands/async.h" #include "commands/prepare.h" #include "executor/spi.h" @@ -77,9 +78,12 @@ #include "utils/snapmgr.h" #include "utils/timeout.h" #include "utils/timestamp.h" +#include "utils/builtins.h" +#include "utils/varlena.h" +#include "utils/inval.h" +#include "utils/catcache.h" #include "mb/pg_wchar.h" - /* ---------------- * global variables * ---------------- @@ -100,6 +104,41 @@ int max_stack_depth = 100; /* wait N seconds to allow attach from a debugger */ int PostAuthDelay = 0; +/* Local socket for redirecting sessions to the backends */ +pgsocket SessionPoolSock = PGINVALID_SOCKET; + +/* Pointer to pool of sessions */ +BackendSessionPool *SessionPool = NULL; + +/* Pointer to the active session */ +SessionContext *ActiveSession; +SessionContext DefaultContext; +bool IsDedicatedBackend = false; + +#define SessionVariable(type,name,init) type name = init; +#include "storage/sessionvars.h" + +static void SaveSessionVariables(SessionContext* session) +{ + if (session != NULL) + { +#define SessionVariable(type,name,init) session->name = name; +#include "storage/sessionvars.h" + } +} + +static void LoadSessionVariables(SessionContext* session) +{ +#define SessionVariable(type,name,init) name = session->name; +#include "storage/sessionvars.h" +} + +static void InitializeSessionVariables(SessionContext* session) +{ +#define SessionVariable(type,name,init) session->name = DefaultContext.name; +#include "storage/sessionvars.h" +} + /* ---------------- @@ -171,6 +210,8 @@ static ProcSignalReason RecoveryConflictReason; static MemoryContext row_description_context = NULL; static StringInfoData row_description_buf; +static bool 
IdleInTransactionSessionError; + /* ---------------------------------------------------------------- * decls for routines only used in this file * ---------------------------------------------------------------- @@ -196,6 +237,8 @@ static void log_disconnections(int code, Datum arg); static void enable_statement_timeout(void); static void disable_statement_timeout(void); +static void DeleteSession(SessionContext *session); +static void ResetCurrentSession(void); /* ---------------------------------------------------------------- * routines to obtain user input @@ -1234,10 +1277,6 @@ exec_parse_message(const char *query_string, /* string to execute */ bool save_log_statement_stats = log_statement_stats; char msec_str[32]; - /* - * Report query to various monitoring facilities. - */ - debug_query_string = query_string; pgstat_report_activity(STATE_RUNNING, query_string); @@ -2930,9 +2969,29 @@ ProcessInterrupts(void) LockErrorCleanup(); /* don't send to client, we already know the connection to be dead. */ whereToSendOutput = DestNone; - ereport(FATAL, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("connection to client lost"))); + + if (ActiveSession) + { + Port *port = ActiveSession->port; + DeleteWaitEventFromSet(SessionPool->waitEvents, port->sock); + + elog(LOG, "Lost connection on session %d in backend %d", MyProcPort->sock, MyProcPid); + + closesocket(port->sock); + port->sock = PGINVALID_SOCKET; + + MyProcPort = NULL; + + StartTransactionCommand(); + UserAbortTransactionBlock(); + CommitTransactionCommand(); + + ResetCurrentSession(); + } + else + ereport(FATAL, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection to client lost"))); } /* @@ -3043,9 +3102,20 @@ ProcessInterrupts(void) { /* Has the timeout setting changed since last we looked? 
*/ if (IdleInTransactionSessionTimeout > 0) - ereport(FATAL, - (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT), - errmsg("terminating connection due to idle-in-transaction timeout"))); + { + if (ActiveSession) + { + IdleInTransactionSessionTimeoutPending = false; + IdleInTransactionSessionError = true; + ereport(ERROR, + (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT), + errmsg("canceling current transaction due to idle-in-transaction timeout"))); + } + else + ereport(FATAL, + (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT), + errmsg("terminating connection due to idle-in-transaction timeout"))); + } else IdleInTransactionSessionTimeoutPending = false; @@ -3605,6 +3675,97 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx, #endif } +#define ACTIVE_SESSION_MAGIC 0xDEFA1234U +#define REMOVED_SESSION_MAGIC 0xDEADDEEDU + +static SessionContext * +CreateSession(void) +{ + SessionContext *session = (SessionContext *) + MemoryContextAllocZero(SessionPool->mcxt, sizeof(SessionContext)); + + session->memory = AllocSetContextCreate(SessionPool->mcxt, + "SessionMemoryContext", ALLOCSET_DEFAULT_SIZES); + session->prepared_queries = NULL; + session->id = CreateSessionId(); + session->portals = CreatePortalsHashTable(session->memory); + session->magic = ACTIVE_SESSION_MAGIC; + return session; +} + +static void +SwitchToSession(SessionContext *session) +{ + /* epoll may return even for already closed session if socket is still openned. + * From epoll documentation: + * Q6 Will closing a file descriptor cause it to be removed from all epoll sets automatically? + * + * A6 Yes, but be aware of the following point. A file descriptor is a reference to an open file description (see + * open(2)). Whenever a descriptor is duplicated via dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new file + * descriptor referring to the same open file description is created. 
An open file description continues to + * exist until all file descriptors referring to it have been closed. A file descriptor is removed from an + * epoll set only after all the file descriptors referring to the underlying open file description have been + * closed (or before if the descriptor is explicitly removed using epoll_ctl(2) EPOLL_CTL_DEL). This means + * that even after a file descriptor that is part of an epoll set has been closed, events may be reported for + * that file descriptor if other file descriptors referring to the same underlying file description remain + * open. + * + * Using this check for valid magic field we try to ignore such events. + */ + if (ActiveSession == session || session->magic != ACTIVE_SESSION_MAGIC) + return; + + SaveSessionVariables(ActiveSession); + RestoreSessionGUCs(ActiveSession); + ActiveSession = session; + + MyProcPort = ActiveSession->port; + SetTempNamespaceState(ActiveSession->tempNamespace, + ActiveSession->tempToastNamespace); + pq_set_current_state(session->port->pqcomm_state, session->port, + session->eventSet); + whereToSendOutput = DestRemote; + + RestoreSessionGUCs(ActiveSession); + LoadSessionVariables(ActiveSession); +} + +static void +ResetCurrentSession(void) +{ + if (!ActiveSession) + return; + + whereToSendOutput = DestNone; + DeleteSession(ActiveSession); + pq_set_current_state(NULL, NULL, NULL); + SetTempNamespaceState(InvalidOid, InvalidOid); + ActiveSession = NULL; +} + +/* + * Free all memory associated with session and delete session object itself. 
+ */ +static void +DeleteSession(SessionContext *session) +{ + elog(DEBUG1, "Delete session %p, id=%u, memory context=%p", + session, session->id, session->memory); + + if (OidIsValid(session->tempNamespace)) + ResetTempTableNamespace(session->tempNamespace); + + DropAllPreparedStatements(); + FreeWaitEventSet(session->eventSet); + RestoreSessionGUCs(session); + ReleaseSessionGUCs(session); + MemoryContextDelete(session->memory); + session->magic = REMOVED_SESSION_MAGIC; + pfree(session); + + on_shmem_exit_reset(); + pgstat_report_stat(true); +} /* ---------------------------------------------------------------- * PostgresMain @@ -3656,6 +3817,33 @@ PostgresMain(int argc, char *argv[], progname))); } + /* Serve all connections to "postgres" database by dedicated backends */ + if (IsDedicatedBackend) + { + SessionPoolSize = 0; + closesocket(SessionPoolSock); + SessionPoolSock = PGINVALID_SOCKET; + } + + if (IsUnderPostmaster && !IsDedicatedBackend) + { + elog(DEBUG1, "Session pooling is active on %s database", dbname); + + /* Initialize sessions pool for this backend */ + Assert(SessionPool == NULL); + SessionPool = (BackendSessionPool *) MemoryContextAllocZero( + TopMemoryContext, sizeof(BackendSessionPool)); + SessionPool->mcxt = AllocSetContextCreate(TopMemoryContext, + "SessionPoolContext", ALLOCSET_DEFAULT_SIZES); + + /* Save the original backend port here */ + SessionPool->backendPort = MyProcPort; + + ActiveSession = CreateSession(); + ActiveSession->port = MyProcPort; + ActiveSession->eventSet = pq_get_current_waitset(); + } + /* Acquire configuration parameters, unless inherited from postmaster */ if (!IsUnderPostmaster) { @@ -3784,7 +3972,7 @@ PostgresMain(int argc, char *argv[], * ... else we'd need to copy the Port data first. Also, subsidiary data * such as the username isn't lost either; see ProcessStartupPacket(). 
*/ - if (PostmasterContext) + if (PostmasterContext && SessionPoolSize == 0) { MemoryContextDelete(PostmasterContext); PostmasterContext = NULL; @@ -3922,7 +4110,8 @@ PostgresMain(int argc, char *argv[], pq_comm_reset(); /* Report the error to the client and/or server log */ - EmitErrorReport(); + if (MyProcPort) + EmitErrorReport(); /* * Make sure debug_query_string gets reset before we possibly clobber @@ -3982,13 +4171,26 @@ PostgresMain(int argc, char *argv[], * messages from the client, so there isn't much we can do with the * connection anymore. */ - if (pq_is_reading_msg()) + if (pq_is_reading_msg() && !ActiveSession) ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("terminating connection because protocol synchronization was lost"))); /* Now we can allow interrupts again */ RESUME_INTERRUPTS(); + + if (ActiveSession) + { + if (IdleInTransactionSessionError || (IsAbortedTransactionBlockState() && pq_is_reading_msg())) + { + StartTransactionCommand(); + UserAbortTransactionBlock(); + CommitTransactionCommand(); + IdleInTransactionSessionError = false; + } + if (pq_is_reading_msg()) + goto CloseSession; + } } /* We can now handle ereport(ERROR) */ @@ -3997,10 +4199,30 @@ PostgresMain(int argc, char *argv[], if (!ignore_till_sync) send_ready_for_query = true; /* initially, or after error */ + + /* Initialize wait event set if we're using sessions pool */ + if (SessionPool && SessionPool->waitEvents == NULL) + { + /* Construct wait event set if not constructed yet */ + SessionPool->waitEvents = CreateWaitEventSet(SessionPool->mcxt, MaxSessions + 3); + /* Add event to detect postmaster death */ + AddWaitEventToSet(SessionPool->waitEvents, WL_POSTMASTER_DEATH, + PGINVALID_SOCKET, NULL, ActiveSession); + /* Add event for backends latch */ + AddWaitEventToSet(SessionPool->waitEvents, WL_LATCH_SET, + PGINVALID_SOCKET, MyLatch, ActiveSession); + /* Add event for accepting new sessions */ + AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE, + 
SessionPoolSock, NULL, ActiveSession); + /* Add event for current session */ + AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE, + ActiveSession->port->sock, NULL, ActiveSession); + SaveSessionVariables(&DefaultContext); + } + /* * Non-error queries loop here. */ - for (;;) { /* @@ -4076,6 +4298,130 @@ PostgresMain(int argc, char *argv[], ReadyForQuery(whereToSendOutput); send_ready_for_query = false; + + /* + * Here we perform multiplexing of client sessions if session pooling is enabled. + * As far as we perform transaction level pooling, + * rescheduling is done only when we are not in transaction. + */ + if (SessionPoolSock != PGINVALID_SOCKET + && !IsTransactionState() + && !IsAbortedTransactionBlockState() + && pq_available_bytes() == 0) + { + WaitEvent ready_client; + +ChooseSession: + DoingCommandRead = true; + /* Select which client session is ready to send new query */ + if (WaitEventSetWait(SessionPool->waitEvents, -1, + &ready_client, 1, PG_WAIT_CLIENT) != 1) + { + /* TODO: do some error recovery here */ + elog(FATAL, "Failed to poll client sessions"); + } + CHECK_FOR_INTERRUPTS(); + DoingCommandRead = false; + + if (ready_client.events & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"))); + + if (ready_client.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessClientReadInterrupt(true); + goto ChooseSession; + } + + if (ready_client.fd == SessionPoolSock) + { + /* Here we handle case of attaching new session */ + SessionContext* session; + StringInfoData buf; + Port* port; + pgsocket sock; + MemoryContext oldcontext; + + sock = pg_recv_sock(SessionPoolSock); + if (sock == PGINVALID_SOCKET) + elog(ERROR, "Failed to receive session socket: %m"); + + session = CreateSession(); + + /* Initialize port and wait event set for this session */ + oldcontext = MemoryContextSwitchTo(session->memory); + MyProcPort = port = palloc(sizeof(Port)); + 
memcpy(port, SessionPool->backendPort, sizeof(Port)); + + /* + * Receive the startup packet (which might turn out to be + * a cancel request packet). + */ + port->sock = sock; + port->pqcomm_state = pq_init(session->memory); + + session->port = port; + session->eventSet = + pq_create_backend_event_set(session->memory, port, false); + pq_set_current_state(session->port->pqcomm_state, + port, + session->eventSet); + whereToSendOutput = DestRemote; + + MemoryContextSwitchTo(oldcontext); + + if (AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE, + sock, NULL, session) < 0) + { + elog(WARNING, "Too much pooled sessions: %d", MaxSessions); + DeleteSession(session); + ActiveSession = NULL; + closesocket(sock); + goto ChooseSession; + } + + elog(DEBUG1, "Start new session %d in backend %d " + "for database %s user %s", (int)sock, MyProcPid, + port->database_name, port->user_name); + + SaveSessionVariables(ActiveSession); + RestoreSessionGUCs(ActiveSession); + ActiveSession = session; + InitializeSessionVariables(session); + LoadSessionVariables(session); + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + PerformAuthentication(MyProcPort); + process_settings(MyDatabaseId, GetSessionUserId()); + CommitTransactionCommand(); + SetTempNamespaceState(InvalidOid, InvalidOid); + + /* + * Send GUC options to the client + */ + BeginReportingGUCOptions(); + + /* + * Send this backend's cancellation info to the frontend. + */ + pq_beginmessage(&buf, 'K'); + pq_sendint(&buf, (int32) MyProcPid, 4); + pq_sendint(&buf, (int32) MyCancelKey, 4); + pq_endmessage(&buf); + + /* Need not flush since ReadyForQuery will do it. 
*/ + send_ready_for_query = true; + + continue; + } + else + { + SessionContext* session = (SessionContext *) ready_client.user_data; + SwitchToSession(session); + } + } } /* @@ -4118,6 +4464,8 @@ PostgresMain(int argc, char *argv[], */ if (ConfigReloadPending) { + if (ActiveSession && RestartPoolerOnReload) + proc_exit(0); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); } @@ -4355,6 +4703,46 @@ PostgresMain(int argc, char *argv[], * it will fail to be called during other backend-shutdown * scenarios. */ + + if (SessionPool) + { +CloseSession: + /* In case of session pooling close the session, but do not terminate the backend + * even if there are no more sessions in this backend. + * The reason for keeping backend alive is to prevent redundant process launches if + * some client repeatedly opens and closes connections to the database. + * Maximal number of launched backends in case of connection pooling is intended to be + * optimal for this system and workload, so there are no reasons to try to reduce this number + * when there are no active sessions. 
+ */ + if (MyProcPort) + { + elog(DEBUG1, "Closing session %d in backend %d", (int) MyProcPort->sock, MyProcPid); /* pgsocket may be a Windows SOCKET, so cast for %d */ + + DeleteWaitEventFromSet(SessionPool->waitEvents, MyProcPort->sock); + + pq_getmsgend(&input_message); + if (pq_is_reading_msg()) + pq_endmsgread(); + + closesocket(MyProcPort->sock); + MyProcPort->sock = PGINVALID_SOCKET; + MyProcPort = NULL; + } + + if (ActiveSession) + { + StartTransactionCommand(); + UserAbortTransactionBlock(); + CommitTransactionCommand(); + + ResetCurrentSession(); + } + + /* Need to perform rescheduling to some other session or accept new session */ + goto ChooseSession; + } + elog(DEBUG1, "Terminate backend %d", MyProcPid); proc_exit(0); case 'd': /* copy data */ diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index e95e347..6726195 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -875,6 +875,17 @@ pg_backend_pid(PG_FUNCTION_ARGS) PG_RETURN_INT32(MyProcPid); } +Datum +pg_session_id(PG_FUNCTION_ARGS) +{ + char *s; + if (ActiveSession) + s = psprintf("%d.%u", MyProcPid, ActiveSession->id); + else + s = psprintf("%d", MyProcPid); + + PG_RETURN_TEXT_P(cstring_to_text(s)); /* macro wants a text pointer, not a Datum */ +} Datum pg_stat_get_backend_pid(PG_FUNCTION_ARGS) diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 7271b58..6b0cb54 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -61,6 +61,7 @@ #include "parser/analyze.h" #include "parser/parsetree.h" #include "storage/lmgr.h" +#include "storage/proc.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/inval.h" diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 6125421..7ce5671 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -78,6 +78,7 @@ #include "rewrite/rewriteDefine.h" #include "rewrite/rowsecurity.h" #include "storage/lmgr.h" +#include "storage/proc.h" 
#include "storage/smgr.h" #include "utils/array.h" #include "utils/builtins.h" @@ -1943,6 +1944,13 @@ RelationIdGetRelation(Oid relationId) Assert(rd->rd_isvalid || (rd->rd_isnailed && !criticalRelcachesBuilt)); } + /* + * In case of session pooling, relation descriptor can be constructed by some other session, + * so we need to recheck rd_islocaltemp value + */ + if (ActiveSession && RELATION_IS_OTHER_TEMP(rd) && isTempOrTempToastNamespace(rd->rd_rel->relnamespace)) + rd->rd_islocaltemp = true; + return rd; } diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index f7d6617..0617f4b 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -128,7 +128,9 @@ int max_parallel_maintenance_workers = 2; * register background workers. */ int NBuffers = 1000; +int SessionPoolSize = 0; int MaxConnections = 90; +int MaxSessions = 1000; int max_worker_processes = 8; int max_parallel_workers = 8; int MaxBackends = 0; @@ -147,3 +149,6 @@ int VacuumCostBalance = 0; /* working state for vacuum */ bool VacuumCostActive = false; double vacuum_cleanup_index_scale_factor; + +bool RestartPoolerOnReload = false; +char *DedicatedDatabases; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 865119d..715429a 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -250,19 +250,6 @@ ChangeToDataDir(void) * convenient way to do it. 
* ---------------------------------------------------------------- */ -static Oid AuthenticatedUserId = InvalidOid; -static Oid SessionUserId = InvalidOid; -static Oid OuterUserId = InvalidOid; -static Oid CurrentUserId = InvalidOid; - -/* We also have to remember the superuser state of some of these levels */ -static bool AuthenticatedUserIsSuperuser = false; -static bool SessionUserIsSuperuser = false; - -static int SecurityRestrictionContext = 0; - -/* We also remember if a SET ROLE is currently active */ -static bool SetRoleIsActive = false; /* * Initialize the basic environment for a postmaster child @@ -345,13 +332,15 @@ InitStandaloneProcess(const char *argv0) void SwitchToSharedLatch(void) { + WaitEventSet *waitset; Assert(MyLatch == &LocalLatchData); Assert(MyProc != NULL); MyLatch = &MyProc->procLatch; - if (FeBeWaitSet) - ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch); + waitset = pq_get_current_waitset(); + if (waitset) + ModifyWaitEvent(waitset, 1, WL_LATCH_SET, MyLatch); /* * Set the shared latch as the local one might have been set. 
This @@ -364,13 +353,15 @@ SwitchToSharedLatch(void) void SwitchBackToLocalLatch(void) { + WaitEventSet *waitset; Assert(MyLatch != &LocalLatchData); Assert(MyProc != NULL && MyLatch == &MyProc->procLatch); MyLatch = &LocalLatchData; - if (FeBeWaitSet) - ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch); + waitset = pq_get_current_waitset(); + if (waitset) + ModifyWaitEvent(waitset, 1, WL_LATCH_SET, MyLatch); SetLatch(MyLatch); } @@ -434,6 +425,8 @@ SetSessionUserId(Oid userid, bool is_superuser) /* We force the effective user IDs to match, too */ OuterUserId = userid; CurrentUserId = userid; + + SysCacheInvalidate(AUTHOID, 0); } /* diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 5ef6315..f1d6834 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -62,10 +62,8 @@ #include "utils/timeout.h" #include "utils/tqual.h" - static HeapTuple GetDatabaseTuple(const char *dbname); static HeapTuple GetDatabaseTupleByOid(Oid dboid); -static void PerformAuthentication(Port *port); static void CheckMyDatabase(const char *name, bool am_superuser, bool override_allow_connections); static void InitCommunication(void); static void ShutdownPostgres(int code, Datum arg); @@ -74,7 +72,6 @@ static void LockTimeoutHandler(void); static void IdleInTransactionSessionTimeoutHandler(void); static bool ThereIsAtLeastOneRole(void); static void process_startup_options(Port *port, bool am_superuser); -static void process_settings(Oid databaseid, Oid roleid); /*** InitPostgres support ***/ @@ -180,7 +177,7 @@ GetDatabaseTupleByOid(Oid dboid) * * returns: nothing. Will not return at all if there's any failure. */ -static void +void PerformAuthentication(Port *port) { /* This should be set already, but let's make sure */ @@ -1126,7 +1123,7 @@ process_startup_options(Port *port, bool am_superuser) * We try specific settings for the database/role combination, as well as * general for this database and for this user. 
*/ -static void +void process_settings(Oid databaseid, Oid roleid) { Relation relsetting; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0625eff..f435356 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -59,6 +59,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" +#include "postmaster/connpool.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" @@ -587,6 +588,8 @@ const char *const config_group_names[] = gettext_noop("Connections and Authentication / Authentication"), /* CONN_AUTH_SSL */ gettext_noop("Connections and Authentication / SSL"), + /* CONN_POOLING */ + gettext_noop("Connections and Authentication / Connection Pooling"), /* RESOURCES */ gettext_noop("Resource Usage"), /* RESOURCES_MEM */ @@ -1192,6 +1195,16 @@ static struct config_bool ConfigureNamesBool[] = }, { + {"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING, + gettext_noop("Restart session pool workers on pg_reload_conf()."), + NULL, + }, + &RestartPoolerOnReload, + false, + NULL, NULL, NULL + }, + + { {"log_duration", PGC_SUSET, LOGGING_WHAT, gettext_noop("Logs the duration of each completed SQL statement."), NULL @@ -1998,8 +2011,41 @@ static struct config_int ConfigureNamesInt[] = check_maxconnections, NULL, NULL }, + { + {"max_sessions", PGC_POSTMASTER, CONN_POOLING, + gettext_noop("Sets the maximum number of client session."), + gettext_noop("Maximal number of client sessions which can be handled by one backend if session pooling is switched on. 
" + "So maximal number of client connections is session_pool_size*max_sessions") + }, + &MaxSessions, + 1000, 1, INT_MAX, + NULL, NULL, NULL + }, + { - /* see max_connections and max_wal_senders */ + {"session_pool_size", PGC_POSTMASTER, CONN_POOLING, + gettext_noop("Sets number of backends serving client sessions."), + gettext_noop("If non-zero then session pooling will be used: " + "client connections will be redirected to one of the backends and maximal number of backends is determined by this parameter. " + "Launched backend are never terminated even in case of no active sessions.") + }, + &SessionPoolSize, + 10, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"connection_pool_workers", PGC_POSTMASTER, CONN_POOLING, + gettext_noop("Number of connection pool workers"), + NULL, + }, + &NumConnPoolWorkers, + 2, 0, MAX_CONNPOOL_WORKERS, + NULL, NULL, NULL + }, + + { + /* see max_connections and max_wal_senders */ {"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the number of connection slots reserved for superusers."), NULL @@ -3340,9 +3386,9 @@ static struct config_string ConfigureNamesString[] = { {"temp_tablespaces", PGC_USERSET, CLIENT_CONN_STATEMENT, - gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."), - NULL, - GUC_LIST_INPUT | GUC_LIST_QUOTE + gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."), + NULL, + GUC_LIST_INPUT | GUC_LIST_QUOTE }, &temp_tablespaces, "", @@ -3350,6 +3396,16 @@ static struct config_string ConfigureNamesString[] = }, { + {"dedicated_databases", PGC_USERSET, CONN_POOLING, + gettext_noop("Set of databases for which session pooling is disabled."), + NULL, + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &DedicatedDatabases, + "template0, template1, postgres" + }, + + { {"dynamic_library_path", PGC_SUSET, CLIENT_CONN_OTHER, gettext_noop("Sets the path for dynamically loadable modules."), gettext_noop("If a dynamically loadable module needs to be 
opened and " @@ -5346,6 +5402,164 @@ NewGUCNestLevel(void) } /* + * Save changed variables after SET command. + * It's important to restore variables as we add them to the list. + */ +static void +SaveSessionGUCs(SessionContext *session, + struct config_generic *gconf, + config_var_value *prior_val) +{ + SessionGUC *sg; + + /* Find needed GUC in active session */ + for (sg = session->gucs; + sg != NULL && sg->var != gconf; sg = sg->next); + + if (sg != NULL) + /* already there */ + return; + + sg = MemoryContextAllocZero(session->memory, sizeof(SessionGUC)); + sg->var = gconf; + sg->saved.extra = prior_val->extra; + + switch (gconf->vartype) + { + case PGC_BOOL: + sg->saved.val.boolval = prior_val->val.boolval; + break; + case PGC_INT: + sg->saved.val.intval = prior_val->val.intval; + break; + case PGC_REAL: + sg->saved.val.realval = prior_val->val.realval; + break; + case PGC_STRING: + sg->saved.val.stringval = prior_val->val.stringval; + break; + case PGC_ENUM: + sg->saved.val.enumval = prior_val->val.enumval; + break; + } + + if (session->gucs) + { + SessionGUC *latest; + + /* Move to end of the list */ + for (latest = session->gucs; + latest->next != NULL; latest = latest->next); + latest->next = sg; + } + else + session->gucs = sg; +} + +/* + * Set GUCs for this session + */ +void +RestoreSessionGUCs(SessionContext* session) +{ + SessionGUC *sg; + bool save_reporting_enabled; + + if (session == NULL) + return; + + save_reporting_enabled = reporting_enabled; + reporting_enabled = false; + + for (sg = session->gucs; sg != NULL; sg = sg->next) + { + void *saved_extra = sg->saved.extra; + void *old_extra = sg->var->extra; + + /* restore extra */ + sg->var->extra = saved_extra; + sg->saved.extra = old_extra; + + /* restore actual values */ + switch (sg->var->vartype) + { + case PGC_BOOL: + { + struct config_bool *conf = (struct config_bool *)sg->var; + bool oldval = *conf->variable; + *conf->variable = sg->saved.val.boolval; + if (conf->assign_hook) + 
conf->assign_hook(sg->saved.val.boolval, saved_extra); + + sg->saved.val.boolval = oldval; + break; + } + case PGC_INT: + { + struct config_int *conf = (struct config_int*) sg->var; + int oldval = *conf->variable; + *conf->variable = sg->saved.val.intval; + if (conf->assign_hook) + conf->assign_hook(sg->saved.val.intval, saved_extra); + sg->saved.val.intval = oldval; + break; + } + case PGC_REAL: + { + struct config_real *conf = (struct config_real*) sg->var; + double oldval = *conf->variable; + *conf->variable = sg->saved.val.realval; + if (conf->assign_hook) + conf->assign_hook(sg->saved.val.realval, saved_extra); + sg->saved.val.realval = oldval; + break; + } + case PGC_STRING: + { + struct config_string *conf = (struct config_string*) sg->var; + char* oldval = *conf->variable; + *conf->variable = sg->saved.val.stringval; + if (conf->assign_hook) + conf->assign_hook(sg->saved.val.stringval, saved_extra); + sg->saved.val.stringval = oldval; + break; + } + case PGC_ENUM: + { + struct config_enum *conf = (struct config_enum*) sg->var; + int oldval = *conf->variable; + *conf->variable = sg->saved.val.enumval; + if (conf->assign_hook) + conf->assign_hook(sg->saved.val.enumval, saved_extra); + sg->saved.val.enumval = oldval; + break; + } + } + } + reporting_enabled = save_reporting_enabled; +} + +/* + * Deallocate memory for session GUCs + */ +void +ReleaseSessionGUCs(SessionContext* session) +{ + SessionGUC* sg; + for (sg = session->gucs; sg != NULL; sg = sg->next) + { + if (sg->saved.extra) + set_extra_field(sg->var, &sg->saved.extra, NULL); + + if (sg->var->vartype == PGC_STRING) + { + struct config_string* conf = (struct config_string*)sg->var; + set_string_field(conf, &sg->saved.val.stringval, NULL); + } + } +} + +/* * Do GUC processing at transaction or subtransaction commit or abort, or * when exiting a function that has proconfig settings, or when undoing a * transient assignment to some GUC variables. 
(The name is thus a bit of @@ -5413,8 +5627,10 @@ AtEOXact_GUC(bool isCommit, int nestLevel) restoreMasked = true; else if (stack->state == GUC_SET) { - /* we keep the current active value */ - discard_stack_value(gconf, &stack->prior); + if (ActiveSession) + SaveSessionGUCs(ActiveSession, gconf, &stack->prior); + else + discard_stack_value(gconf, &stack->prior); } else /* must be GUC_LOCAL */ restorePrior = true; @@ -5440,8 +5656,8 @@ AtEOXact_GUC(bool isCommit, int nestLevel) case GUC_SET: /* next level always becomes SET */ - discard_stack_value(gconf, &stack->prior); - if (prev->state == GUC_SET_LOCAL) + discard_stack_value(gconf, &stack->prior); + if (prev->state == GUC_SET_LOCAL) discard_stack_value(gconf, &prev->masked); prev->state = GUC_SET; break; diff --git a/src/backend/utils/misc/superuser.c b/src/backend/utils/misc/superuser.c index fbe83c9..1ebc379 100644 --- a/src/backend/utils/misc/superuser.c +++ b/src/backend/utils/misc/superuser.c @@ -24,6 +24,7 @@ #include "catalog/pg_authid.h" #include "utils/inval.h" #include "utils/syscache.h" +#include "storage/proc.h" #include "miscadmin.h" @@ -33,8 +34,6 @@ * the status of the last requested roleid. The cache can be flushed * at need by watching for cache update events on pg_authid. 
*/ -static Oid last_roleid = InvalidOid; /* InvalidOid == cache not valid */ -static bool last_roleid_is_super = false; static bool roleid_callback_registered = false; static void RoleidCallback(Datum arg, int cacheid, uint32 hashvalue); diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 04ea32f..a8c27a3 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -23,6 +23,7 @@ #include "commands/portalcmds.h" #include "miscadmin.h" #include "storage/ipc.h" +#include "storage/proc.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" @@ -53,11 +54,14 @@ typedef struct portalhashent static HTAB *PortalHashTable = NULL; +#define CurrentPortalHashTable() \ + (ActiveSession ? ActiveSession->portals : PortalHashTable) + #define PortalHashTableLookup(NAME, PORTAL) \ do { \ PortalHashEnt *hentry; \ \ - hentry = (PortalHashEnt *) hash_search(PortalHashTable, \ + hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \ (NAME), HASH_FIND, NULL); \ if (hentry) \ PORTAL = hentry->portal; \ @@ -69,7 +73,7 @@ do { \ do { \ PortalHashEnt *hentry; bool found; \ \ - hentry = (PortalHashEnt *) hash_search(PortalHashTable, \ + hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \ (NAME), HASH_ENTER, &found); \ if (found) \ elog(ERROR, "duplicate portal name"); \ @@ -82,7 +86,7 @@ do { \ do { \ PortalHashEnt *hentry; \ \ - hentry = (PortalHashEnt *) hash_search(PortalHashTable, \ + hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \ PORTAL->name, HASH_REMOVE, NULL); \ if (hentry == NULL) \ elog(WARNING, "trying to delete portal name that does not exist"); \ @@ -90,12 +94,33 @@ do { \ static MemoryContext TopPortalContext = NULL; - /* ---------------------------------------------------------------- * public portal interface functions * ---------------------------------------------------------------- */ +HTAB * 
+CreatePortalsHashTable(MemoryContext mcxt) +{ + HASHCTL ctl; + int flags = HASH_ELEM; + + ctl.keysize = MAX_PORTALNAME_LEN; + ctl.entrysize = sizeof(PortalHashEnt); + + if (mcxt) + { + ctl.hcxt = mcxt; + flags |= HASH_CONTEXT; + } + + /* + * use PORTALS_PER_USER as a guess of how many hash table entries to + * create, initially + */ + return hash_create("Portal hash", PORTALS_PER_USER, &ctl, flags); +} + /* * EnablePortalManager * Enables the portal management module at backend startup. @@ -103,23 +128,13 @@ static MemoryContext TopPortalContext = NULL; void EnablePortalManager(void) { - HASHCTL ctl; - Assert(TopPortalContext == NULL); TopPortalContext = AllocSetContextCreate(TopMemoryContext, - "TopPortalContext", - ALLOCSET_DEFAULT_SIZES); - - ctl.keysize = MAX_PORTALNAME_LEN; - ctl.entrysize = sizeof(PortalHashEnt); + "TopPortalContext", + ALLOCSET_DEFAULT_SIZES); - /* - * use PORTALS_PER_USER as a guess of how many hash table entries to - * create, initially - */ - PortalHashTable = hash_create("Portal hash", PORTALS_PER_USER, - &ctl, HASH_ELEM); + PortalHashTable = CreatePortalsHashTable(NULL); } /* @@ -602,11 +617,14 @@ PortalHashTableDeleteAll(void) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; - if (PortalHashTable == NULL) + htab = CurrentPortalHashTable(); + + if (htab == NULL) return; - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while ((hentry = hash_seq_search(&status)) != NULL) { Portal portal = hentry->portal; @@ -619,7 +637,7 @@ PortalHashTableDeleteAll(void) /* Restart the iteration in case that led to other drops */ hash_seq_term(&status); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); } } @@ -672,8 +690,10 @@ PreCommit_Portals(bool isPrepare) bool result = false; HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; - hash_seq_init(&status, PortalHashTable); + htab = CurrentPortalHashTable(); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) 
hash_seq_search(&status)) != NULL) { @@ -746,7 +766,7 @@ PreCommit_Portals(bool isPrepare) * caused a drop of the next portal in the hash chain. */ hash_seq_term(&status); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); } return result; @@ -763,8 +783,11 @@ AtAbort_Portals(void) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; + + htab = CurrentPortalHashTable(); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -840,8 +863,11 @@ AtCleanup_Portals(void) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; - hash_seq_init(&status, PortalHashTable); + htab = CurrentPortalHashTable(); + + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -899,8 +925,10 @@ PortalErrorCleanup(void) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; - hash_seq_init(&status, PortalHashTable); + htab = CurrentPortalHashTable(); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -927,8 +955,9 @@ AtSubCommit_Portals(SubTransactionId mySubid, { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab = CurrentPortalHashTable(); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -962,8 +991,11 @@ AtSubAbort_Portals(SubTransactionId mySubid, { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; + + htab = CurrentPortalHashTable(); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -1072,8 +1104,9 @@ AtSubCleanup_Portals(SubTransactionId mySubid) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab = CurrentPortalHashTable(); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while 
((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -1161,7 +1194,7 @@ pg_cursor(PG_FUNCTION_ARGS) /* generate junk in short-term context */ MemoryContextSwitchTo(oldcontext); - hash_seq_init(&hash_seq, PortalHashTable); + hash_seq_init(&hash_seq, CurrentPortalHashTable()); while ((hentry = hash_seq_search(&hash_seq)) != NULL) { Portal portal = hentry->portal; @@ -1200,7 +1233,7 @@ ThereAreNoReadyPortals(void) HASH_SEQ_STATUS status; PortalHashEnt *hentry; - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, CurrentPortalHashTable()); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { @@ -1229,8 +1262,11 @@ HoldPinnedPortals(void) { HASH_SEQ_STATUS status; PortalHashEnt *hentry; + HTAB *htab; + + htab = CurrentPortalHashTable(); - hash_seq_init(&status, PortalHashTable); + hash_seq_init(&status, htab); while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) { diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 0e20237..ddcc3c8 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -144,7 +144,9 @@ extern void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId); extern void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId); -extern void ResetTempTableNamespace(void); + +struct SessionContext; +extern void ResetTempTableNamespace(Oid npc); extern OverrideSearchPath *GetOverrideSearchPath(MemoryContext context); extern OverrideSearchPath *CopyOverrideSearchPath(OverrideSearchPath *path); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a146510..62fb7a4 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5202,6 +5202,9 @@ { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', prorettype => 'int4', proargtypes => '', prosrc => 'pg_backend_pid' }, +{ oid => 
'3436', descr => 'statistics: current session ID', + proname => 'pg_session_id', provolatile => 's', proparallel => 'r', + prorettype => 'text', proargtypes => '', prosrc => 'pg_session_id' }, { oid => '1937', descr => 'statistics: PID of backend', proname => 'pg_stat_get_backend_pid', provolatile => 's', proparallel => 'r', prorettype => 'int4', proargtypes => 'int4', diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h index ffec029..fdf1854 100644 --- a/src/include/commands/prepare.h +++ b/src/include/commands/prepare.h @@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt); extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt); extern void DropAllPreparedStatements(void); +extern void DropSessionPreparedStatements(uint32 sessionId); #endif /* PREPARE_H */ diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index ef5528c..bb6d359 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -66,6 +66,7 @@ typedef struct #include "datatype/timestamp.h" #include "libpq/hba.h" #include "libpq/pqcomm.h" +#include "storage/latch.h" typedef enum CAC_state @@ -139,6 +140,12 @@ typedef struct Port List *guc_options; /* + * libpq communication state + */ + void *pqcomm_state; + WaitEventSet *pqcomm_waitset; + + /* * Information that needs to be held during the authentication cycle. 
*/ HbaLine *hba; diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 36baf6b..10ba28b 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -60,7 +60,12 @@ extern int StreamConnection(pgsocket server_fd, Port *port); extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); -extern void pq_init(void); +extern void *pq_init(MemoryContext mcxt); +extern void pq_reset(void); +extern void pq_set_current_state(void *state, Port *port, WaitEventSet *set); +extern WaitEventSet *pq_get_current_waitset(void); +extern WaitEventSet *pq_create_backend_event_set(MemoryContext mcxt, + Port *port, bool onlySock); extern int pq_getbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern void pq_startmsgread(void); @@ -71,6 +76,7 @@ extern int pq_getbyte(void); extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char *c); extern int pq_putbytes(const char *s, size_t len); +extern int pq_available_bytes(void); /* * prototypes for functions in be-secure.c @@ -96,8 +102,6 @@ extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len); extern bool ssl_loaded_verify_locations; -extern WaitEventSet *FeBeWaitSet; - /* GUCs */ extern char *SSLCipherSuites; extern char *SSLECDHCurve; diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index e167ee8..9652340 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -26,6 +26,7 @@ #include #include "pgtime.h" /* for pg_time_t */ +#include "utils/palloc.h" #define InvalidPid (-1) @@ -150,6 +151,9 @@ extern PGDLLIMPORT bool IsUnderPostmaster; extern PGDLLIMPORT bool IsBackgroundWorker; extern PGDLLIMPORT bool IsBinaryUpgrade; +extern PGDLLIMPORT bool RestartPoolerOnReload; +extern PGDLLIMPORT char* DedicatedDatabases; + extern PGDLLIMPORT bool ExitOnAnyError; extern PGDLLIMPORT char *DataDir; @@ -158,10 +162,14 @@ extern PGDLLIMPORT int data_directory_mode; extern PGDLLIMPORT 
int NBuffers; extern PGDLLIMPORT int MaxBackends; extern PGDLLIMPORT int MaxConnections; +extern PGDLLIMPORT int MaxSessions; +extern PGDLLIMPORT int SessionPoolSize; +extern PGDLLIMPORT int SessionPoolPorts; extern PGDLLIMPORT int max_worker_processes; extern PGDLLIMPORT int max_parallel_workers; extern PGDLLIMPORT int MyProcPid; +extern PGDLLIMPORT uint32 MySessionId; extern PGDLLIMPORT pg_time_t MyStartTime; extern PGDLLIMPORT struct Port *MyProcPort; extern PGDLLIMPORT struct Latch *MyLatch; @@ -335,6 +343,9 @@ extern void SwitchBackToLocalLatch(void); extern bool superuser(void); /* current user is superuser */ extern bool superuser_arg(Oid roleid); /* given user is superuser */ +/* in utils/init/postinit.c */ +void process_settings(Oid databaseid, Oid roleid); + /***************************************************************************** * pmod.h -- * @@ -425,6 +436,7 @@ extern void InitializeMaxBackends(void); extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, char *out_dbname, bool override_allow_connections); extern void BaseInit(void); +extern void PerformAuthentication(struct Port *port); /* in utils/init/miscinit.c */ extern bool IgnoreSystemIndexes; @@ -445,6 +457,9 @@ extern void process_session_preload_libraries(void); extern void pg_bindtextdomain(const char *domain); extern bool has_rolreplication(Oid roleid); +void *GetLocalUserIdStateCopy(MemoryContext mcxt); +void SetCurrentUserIdState(void *userId); + /* in access/transam/xlog.c */ extern bool BackupInProgress(void); extern void CancelBackup(void); diff --git a/src/include/port.h b/src/include/port.h index 74a9dc4..ac53f3c 100644 --- a/src/include/port.h +++ b/src/include/port.h @@ -41,6 +41,10 @@ typedef SOCKET pgsocket; extern bool pg_set_noblock(pgsocket sock); extern bool pg_set_block(pgsocket sock); +/* send/receive socket descriptor */ +extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid); +extern pgsocket pg_recv_sock(pgsocket 
chan); + /* Portable path handling for Unix/Win32 (in path.c) */ extern bool has_drive_prefix(const char *filename); diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h index b398cd3..01971bc 100644 --- a/src/include/port/win32_port.h +++ b/src/include/port/win32_port.h @@ -447,6 +447,7 @@ extern int pgkill(int pid, int sig); #define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout) #define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags) #define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags) +#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks) SOCKET pgwin32_socket(int af, int type, int protocol); int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen); @@ -456,6 +457,7 @@ int pgwin32_connect(SOCKET s, const struct sockaddr *name, int namelen); int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptfds, const struct timeval *timeout); int pgwin32_recv(SOCKET s, char *buf, int len, int flags); int pgwin32_send(SOCKET s, const void *buf, int len, int flags); +int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]); const char *pgwin32_socket_strerror(int err); int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout); diff --git a/src/include/postmaster/connpool.h b/src/include/postmaster/connpool.h new file mode 100644 index 0000000..45aa37c --- /dev/null +++ b/src/include/postmaster/connpool.h @@ -0,0 +1,54 @@ +#ifndef CONN_POOL_H +#define CONN_POOL_H + +#include "port.h" +#include "libpq/libpq-be.h" + +#define MAX_CONNPOOL_WORKERS 100 + +typedef enum +{ + CPW_FREE, + CPW_NEW_SOCKET, + CPW_PROCESSED +} ConnPoolWorkerState; + +enum CAC_STATE; + +typedef struct ConnPoolWorker +{ + Port *port; /* port in the pool */ + int pipes[2]; /* 0 for sending, 1 for receiving */ + + /* the communication procedure: + * ) find a worker with state == CPW_FREE + * ) assign client socket + * ) add pipe to wait set (if it's not 
there) + * ) wake up the worker. + * ) process data from the worker until state != CPW_PROCESSED + * ) set state to CPW_FREE + * ) fork or send socket and the data to backend. + * + * bgworker + * ) wakes up + * ) check the state + * ) if state is CPW_NEW_SOCKET, gets data from clientsock and + * send the data through pipe to postmaster. + * ) set state to CPW_PROCESSED. + */ + volatile ConnPoolWorkerState state; + volatile CAC_state cac_state; + pid_t pid; + Latch *latch; +} ConnPoolWorker; + +extern Size ConnPoolShmemSize(void); +extern void ConnectionPoolWorkersInit(void); +extern void RegisterConnPoolWorkers(void); +extern void StartupPacketReaderMain(Datum arg); + +/* global variables */ +extern int NumConnPoolWorkers; +extern ConnPoolWorker *ConnPoolWorkers; + +#endif diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 1877eef..1f16836 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -62,6 +62,10 @@ extern Size ShmemBackendArraySize(void); extern void ShmemBackendArrayAllocation(void); #endif +struct Port; +extern int ProcessStartupPacket(struct Port *port, bool SSLdone, + MemoryContext memctx, int errlevel); + /* * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved * for buffer references in buf_internals.h. 
This limitation could be lifted diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h index 6a05a89..9cddaf9 100644 --- a/src/include/storage/ipc.h +++ b/src/include/storage/ipc.h @@ -72,6 +72,7 @@ extern void on_shmem_exit(pg_on_exit_callback function, Datum arg); extern void before_shmem_exit(pg_on_exit_callback function, Datum arg); extern void cancel_before_shmem_exit(pg_on_exit_callback function, Datum arg); extern void on_exit_reset(void); +extern void on_shmem_exit_reset(void); /* ipci.c */ extern PGDLLIMPORT shmem_startup_hook_type shmem_startup_hook; diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index fd8735b..c7dd708 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); +extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd); + /* * Unix implementation uses SIGUSR1 for inter-process signaling. * Win32 doesn't need this. diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cb613c8..f3c1079 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -21,6 +21,7 @@ #include "storage/lock.h" #include "storage/pg_sema.h" #include "storage/proclist_types.h" +#include "utils/guc_tables.h" /* * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds @@ -276,6 +277,57 @@ extern PGDLLIMPORT PROC_HDR *ProcGlobal; extern PGPROC *PreparedXactProcs; +typedef struct SessionGUC +{ + struct SessionGUC *next; + config_var_value saved; + struct config_generic *var; +} SessionGUC; + +/* + * Information associated with client session. 
+ */ +typedef struct SessionContext +{ + uint32 magic; /* Magic to validate content of session object */ + uint32 id; /* session identifier, unique across many backends */ + /* Memory context used for global session data (instead of TopMemoryContext) */ + MemoryContext memory; + struct Port* port; /* connection port */ + Oid tempNamespace; /* temporary namespace */ + Oid tempToastNamespace; /* temporary toast namespace */ + SessionGUC *gucs; /* session local GUCs */ + WaitEventSet *eventSet; /* Wait set for the session */ + HTAB *prepared_queries; /* Session prepared queries */ + HTAB *portals; /* Session portals */ + void *userId; /* Current role state */ + #define SessionVariable(type,name,init) type name; + #include "storage/sessionvars.h" +} SessionContext; + +#define SessionVariable(type,name,init) extern type name; +#include "storage/sessionvars.h" + +typedef struct Port Port; +typedef struct BackendSessionPool +{ + MemoryContext mcxt; + + WaitEventSet *waitEvents; /* Set of all session sockets */ + uint32 sessionCount; /* Number of sessions */ + + /* + * Reference to the original port of this backend created when this backend + * was launched. The session using this port may already be terminated, + * but since it is allocated in TopMemoryContext, its content is still + * valid and is used as a template for ports of new sessions + */ + Port *backendPort; +} BackendSessionPool; + +extern PGDLLIMPORT SessionContext *ActiveSession; +extern PGDLLIMPORT BackendSessionPool *SessionPool; + /* Accessor for PGPROC given a pgprocno. 
*/ #define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)]) @@ -295,7 +347,7 @@ extern int StatementTimeout; extern int LockTimeout; extern int IdleInTransactionSessionTimeout; extern bool log_lock_waits; - +extern bool IsDedicatedBackend; /* * Function Prototypes @@ -321,6 +373,7 @@ extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void CheckDeadLockAlert(void); extern bool IsWaitingForLock(void); extern void LockErrorCleanup(void); +extern uint32 CreateSessionId(void); extern void ProcWaitForSignal(uint32 wait_event_info); extern void ProcSendSignal(int pid); diff --git a/src/include/storage/sessionvars.h b/src/include/storage/sessionvars.h new file mode 100644 index 0000000..690c56f --- /dev/null +++ b/src/include/storage/sessionvars.h @@ -0,0 +1,13 @@ +/* SessionVariable(type,name,init) */ +SessionVariable(Oid, AuthenticatedUserId, InvalidOid) +SessionVariable(Oid, SessionUserId, InvalidOid) +SessionVariable(Oid, OuterUserId, InvalidOid) +SessionVariable(Oid, CurrentUserId, InvalidOid) +SessionVariable(bool, AuthenticatedUserIsSuperuser, false) +SessionVariable(bool, SessionUserIsSuperuser, false) +SessionVariable(int, SecurityRestrictionContext, 0) +SessionVariable(bool, SetRoleIsActive, false) +SessionVariable(Oid, last_roleid, InvalidOid) +SessionVariable(bool, last_roleid_is_super, false) +SessionVariable(struct SeqTableData*, last_used_seq, NULL) +#undef SessionVariable diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 63b4e48..51d130c 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -31,9 +31,11 @@ #define STACK_DEPTH_SLOP (512 * 1024L) extern CommandDest whereToSendOutput; + extern PGDLLIMPORT const char *debug_query_string; extern int max_stack_depth; extern int PostAuthDelay; +extern pgsocket SessionPoolSock; /* GUC-configurable parameters */ diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index f462eab..338f0ec 100644 --- a/src/include/utils/guc.h +++ 
b/src/include/utils/guc.h @@ -395,6 +395,12 @@ extern Size EstimateGUCStateSpace(void); extern void SerializeGUCState(Size maxsize, char *start_address); extern void RestoreGUCState(void *gucstate); +/* Session pooling support functions */ +struct SessionContext; +extern void RestoreSessionGUCs(struct SessionContext* session); +extern void ReleaseSessionGUCs(struct SessionContext* session); + + /* Support for messages reported from GUC check hooks */ extern PGDLLIMPORT char *GUC_check_errmsg_string; diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 668d9ef..e3f2e5a 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -58,6 +58,7 @@ enum config_group CONN_AUTH_SETTINGS, CONN_AUTH_AUTH, CONN_AUTH_SSL, + CONN_POOLING, RESOURCES, RESOURCES_MEM, RESOURCES_DISK, diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index e4929b9..69ac10d 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -202,6 +202,7 @@ typedef struct PortalData /* Prototypes for functions in utils/mmgr/portalmem.c */ +HTAB *CreatePortalsHashTable(MemoryContext mcxt); extern void EnablePortalManager(void); extern bool PreCommit_Portals(bool isPrepare); extern void AtAbort_Portals(void);