diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index cec37ce..892d807 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -114,7 +114,7 @@ PrepareQuery(PrepareStmt *stmt, const char *queryString) */ query = parse_analyze_varparams((Node *) copyObject(stmt->query), queryString, - &argtypes, &nargs); + &argtypes, &nargs, NULL); /* * Check that all parameter types were determined. diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index defafa5..a522c69 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -674,12 +674,17 @@ show_random_seed(void) * SET CLIENT_ENCODING */ +void (*check_client_encoding_hook)(void); + bool check_client_encoding(char **newval, void **extra, GucSource source) { int encoding; const char *canonical_name; + if (check_client_encoding_hook) + check_client_encoding_hook(); + /* Look up the encoding by name */ encoding = pg_valid_client_encoding(*newval); if (encoding < 0) diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index bfe66c6..7c7dc92 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -47,6 +47,11 @@ static PQcommMethods PqCommMqMethods = { mq_endcopyout }; +static PQcommMethods *save_PqCommMethods; +static CommandDest save_whereToSendOutput; +static ProtocolVersion save_FrontendProtocol; +static dsm_segment *save_seg; + /* * Arrange to redirect frontend/backend protocol messages to a shared-memory * message queue. @@ -54,12 +59,30 @@ static PQcommMethods PqCommMqMethods = { void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) { + save_PqCommMethods = PqCommMethods; + save_whereToSendOutput = whereToSendOutput; + save_FrontendProtocol = FrontendProtocol; + PqCommMethods = &PqCommMqMethods; pq_mq = shm_mq_get_queue(mqh); pq_mq_handle = mqh; whereToSendOutput = DestRemote; FrontendProtocol = PG_PROTOCOL_LATEST; on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); + + save_seg = seg; +} + +void +pq_stop_redirect_to_shm_mq(void) +{ + cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); + PqCommMethods = save_PqCommMethods; + whereToSendOutput = save_whereToSendOutput; + FrontendProtocol = save_FrontendProtocol; + pq_mq = NULL; + pq_mq_handle = NULL; + save_seg = NULL; } /* diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index eac86cc..5b94d85 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -124,7 +124,7 @@ parse_analyze(Node *parseTree, const char *sourceText, */ Query * parse_analyze_varparams(Node *parseTree, const char *sourceText, - Oid **paramTypes, int *numParams) + Oid **paramTypes, int *numParams, const char *paramNames[]) { ParseState *pstate = make_parsestate(NULL); Query *query; @@ -133,7 +133,7 @@ parse_analyze_varparams(Node *parseTree, const char *sourceText, pstate->p_sourcetext = sourceText; - parse_variable_parameters(pstate, paramTypes, numParams); + parse_variable_parameters(pstate, paramTypes, numParams, paramNames); query = transformTopLevelStmt(pstate, parseTree); diff --git a/src/backend/parser/parse_param.c b/src/backend/parser/parse_param.c index b402843..c459c00 100644 --- a/src/backend/parser/parse_param.c +++ b/src/backend/parser/parse_param.c @@ -49,8 +49,10 @@ typedef struct VarParamState { Oid **paramTypes; /* array of parameter type OIDs */ int *numParams; /* number of array entries */ + const char **paramNames; } VarParamState; +static Node *variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var); static Node *fixed_paramref_hook(ParseState *pstate, ParamRef *pref); static Node *variable_paramref_hook(ParseState *pstate, ParamRef *pref); static Node *variable_coerce_param_hook(ParseState *pstate, Param *param, @@ -81,17 +83,58 @@ parse_fixed_parameters(ParseState *pstate, */ void parse_variable_parameters(ParseState *pstate, - Oid **paramTypes, int *numParams) + Oid **paramTypes, int *numParams, const char *paramNames[]) { VarParamState *parstate = palloc(sizeof(VarParamState)); parstate->paramTypes = paramTypes; parstate->numParams = numParams; + parstate->paramNames = paramNames; + pstate->p_post_columnref_hook = variable_post_column_ref_hook; pstate->p_ref_hook_state = (void *) parstate; pstate->p_paramref_hook = variable_paramref_hook; pstate->p_coerce_param_hook = variable_coerce_param_hook; } +static Node * +variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var) +{ + VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state; + + /* already resolved */ + if (var != NULL) + return NULL; + + /* did not supply parameter names */ + if (!parstate->paramNames) + return NULL; + + if (list_length(cref->fields) == 1) + { + Node *field1 = (Node *) linitial(cref->fields); + char *name1; + int i; + Param *param; + + Assert(IsA(field1, String)); + name1 = strVal(field1); + for (i = 0; i < *parstate->numParams; i++) + if (strcmp(name1, parstate->paramNames[i]) == 0) + { + param = makeNode(Param); + param->paramkind = PARAM_EXTERN; + param->paramid = i + 1; + param->paramtype = *parstate->paramTypes[i]; + param->paramtypmod = -1; + param->paramcollid = InvalidOid; + param->location = -1; + return (Node *) param; + } + } + + return NULL; +} + /* * Transform a ParamRef using fixed parameter types. */ diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile index 674302f..c3b337e 100644 --- a/src/backend/tcop/Makefile +++ b/src/backend/tcop/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/tcop top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS= dest.o fastpath.o postgres.o pquery.o utility.o +OBJS= autonomous.o dest.o fastpath.o postgres.o pquery.o utility.o ifneq (,$(filter $(PORTNAME),cygwin win32)) override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT) diff --git a/src/backend/tcop/autonomous.c b/src/backend/tcop/autonomous.c new file mode 100644 index 0000000..bf78382 --- /dev/null +++ b/src/backend/tcop/autonomous.c @@ -0,0 +1,882 @@ +/*-------------------------------------------------------------------------- + * + * autonomous.c + * Run SQL commands using a background worker. + * + * Copyright (C) 2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/tcop/autonomous.c + * + * + * This implements a C API to open an autonomous session and run SQL queries + * in it. The session looks much like a normal database connection, but it is + * always to the same database, and there is no authentication needed. The + * "backend" for that connection is a background worker. The normal backend + * and the autonomous session worker communicate over the normal FE/BE + * protocol. + * + * Types: + * + * AutonomousSession -- opaque connection handle + * AutonomousPreparedStatement -- opaque prepared statement handle + * AutonomousResult -- query result + * + * Functions: + * + * AutonomousSessionStart() -- start a session (launches background worker) + * and return a handle + * + * AutonomousSessionEnd() -- close session and free resources + * + * AutonomousSessionExecute() -- run SQL string and return result (rows or + * status) + * + * AutonomousSessionPrepare() -- prepare an SQL string for subsequent + * execution + * + * AutonomousSessionExecutePrepared() -- run prepared statement + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/tupdesc.h" +#include "access/xact.h" +#include "commands/async.h" +#include "commands/variable.h" +#include "lib/stringinfo.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "tcop/autonomous.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define AUTONOMOUS_MAGIC 0x50674267 + +#define AUTONOMOUS_KEY_FIXED_DATA 0 +#define AUTONOMOUS_KEY_GUC 1 +#define AUTONOMOUS_KEY_COMMAND_QUEUE 2 +#define AUTONOMOUS_KEY_RESPONSE_QUEUE 3 +#define AUTONOMOUS_NKEYS 4 + +#define AUTONOMOUS_QUEUE_SIZE 16384 + +/* Fixed-size data passed via our dynamic shared memory segment. */ +typedef struct autonomous_session_fixed_data +{ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; +} autonomous_session_fixed_data; + +struct AutonomousSession +{ + dsm_segment *seg; + BackgroundWorkerHandle *worker_handle; + shm_mq_handle *command_qh; + shm_mq_handle *response_qh; + int transaction_status; +}; + +struct AutonomousPreparedStatement +{ + AutonomousSession *session; + Oid *argtypes; + TupleDesc tupdesc; +}; + +static void autonomous_worker_main(Datum main_arg); +static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg); +static void autonomous_check_client_encoding_hook(void); +static TupleDesc TupleDesc_from_RowDescription(StringInfo msg); +static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg); +static void forward_NotifyResponse(StringInfo msg); +static void rethrow_errornotice(StringInfo msg); +static void invalid_protocol_message(char msgtype) pg_attribute_noreturn(); + + +AutonomousSession * +AutonomousSessionStart(void) +{ + BackgroundWorker worker; + pid_t pid; + AutonomousSession *session; + shm_toc_estimator e; + Size segsize; + Size guc_len; + char *gucstate; + dsm_segment *seg; + shm_toc *toc; + autonomous_session_fixed_data *fdata; + shm_mq *command_mq; + shm_mq *response_mq; + BgwHandleStatus bgwstatus; + StringInfoData msg; + char msgtype; + + session = palloc(sizeof(*session)); + + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(autonomous_session_fixed_data)); + shm_toc_estimate_chunk(&e, AUTONOMOUS_QUEUE_SIZE); + shm_toc_estimate_chunk(&e, AUTONOMOUS_QUEUE_SIZE); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&e, guc_len); + shm_toc_estimate_keys(&e, AUTONOMOUS_NKEYS); + segsize = shm_toc_estimate(&e); + seg = dsm_create(segsize, 0); + + session->seg = seg; + + toc = shm_toc_create(AUTONOMOUS_MAGIC, dsm_segment_address(seg), segsize); + + /* Store fixed-size data in dynamic shared memory. */ + fdata = shm_toc_allocate(toc, sizeof(*fdata)); + fdata->database_id = MyDatabaseId; + fdata->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context); + shm_toc_insert(toc, AUTONOMOUS_KEY_FIXED_DATA, fdata); + + /* Store GUC state in dynamic shared memory. */ + gucstate = shm_toc_allocate(toc, guc_len); + SerializeGUCState(guc_len, gucstate); + shm_toc_insert(toc, AUTONOMOUS_KEY_GUC, gucstate); + + command_mq = shm_mq_create(shm_toc_allocate(toc, AUTONOMOUS_QUEUE_SIZE), + AUTONOMOUS_QUEUE_SIZE); + shm_toc_insert(toc, AUTONOMOUS_KEY_COMMAND_QUEUE, command_mq); + shm_mq_set_sender(command_mq, MyProc); + + response_mq = shm_mq_create(shm_toc_allocate(toc, AUTONOMOUS_QUEUE_SIZE), + AUTONOMOUS_QUEUE_SIZE); + shm_toc_insert(toc, AUTONOMOUS_KEY_RESPONSE_QUEUE, response_mq); + shm_mq_set_receiver(response_mq, MyProc); + + session->command_qh = shm_mq_attach(command_mq, seg, NULL); + session->response_qh = shm_mq_attach(response_mq, seg, NULL); + + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = autonomous_worker_main; + snprintf(worker.bgw_name, BGW_MAXLEN, "autonomous session by PID %d", MyProcPid); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You might need to increase max_worker_processes."))); + + shm_mq_set_handle(session->command_qh, session->worker_handle); + shm_mq_set_handle(session->response_qh, session->worker_handle); + + bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid); + if (bgwstatus != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background worker"))); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'Z'); + + return session; +} + + +void +AutonomousSessionEnd(AutonomousSession *session) +{ + StringInfoData msg; + + if (session->transaction_status == 'T') + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("autonomous session ended with transaction block open"))); + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'X'); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + pfree(session->worker_handle); + dsm_detach(session->seg); + pfree(session); +} + + +AutonomousResult * +AutonomousSessionExecute(AutonomousSession *session, const char *sql) +{ + StringInfoData msg; + char msgtype; + AutonomousResult *result; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'Q'); + pq_sendstring(&msg, sql); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + result = palloc0(sizeof(*result)); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'C': + { + const char *tag = pq_getmsgstring(&msg); + result->command = pstrdup(tag); + pq_getmsgend(&msg); + break; + } + case 'D': + if (!result->tupdesc) + elog(ERROR, "no T before D"); + result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(result->tupdesc, &msg)); + pq_getmsgend(&msg); + break; + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + case 'T': + if (result->tupdesc) + elog(ERROR, "already received a T message"); + result->tupdesc = TupleDesc_from_RowDescription(&msg); + pq_getmsgend(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'Z'); + + return result; +} + + +AutonomousPreparedStatement * +AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs, + Oid argtypes[], const char *argnames[]) +{ + AutonomousPreparedStatement *result; + StringInfoData msg; + int16 i; + char msgtype; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'P'); + pq_sendstring(&msg, ""); + pq_sendstring(&msg, sql); + pq_sendint(&msg, nargs, 2); + for (i = 0; i < nargs; i++) + pq_sendint(&msg, argtypes[i], 4); + if (argnames) + for (i = 0; i < nargs; i++) + pq_sendstring(&msg, argnames[i]); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + result = palloc0(sizeof(*result)); + result->session = session; + result->argtypes = palloc(nargs * sizeof(*result->argtypes)); + memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes)); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case '1': + break; + case 'E': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'D'); + pq_sendbyte(&msg, 'S'); + pq_sendstring(&msg, ""); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'E': + rethrow_errornotice(&msg); + break; + case 'n': + break; + case 't': + /* ignore for now */ + break; + case 'T': + if (result->tupdesc) + elog(ERROR, "already received a T message"); + result->tupdesc = TupleDesc_from_RowDescription(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'n' && msgtype != 'T'); + + return result; +} + + +AutonomousResult * +AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls) +{ + AutonomousSession *session; + StringInfoData msg; + AutonomousResult *result; + char msgtype; + int16 i; + + session = stmt->session; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'B'); + pq_sendstring(&msg, ""); + pq_sendstring(&msg, ""); + pq_sendint(&msg, 1, 2); /* number of parameter format codes */ + pq_sendint(&msg, 1, 2); + pq_sendint(&msg, nargs, 2); /* number of parameter values */ + for (i = 0; i < nargs; i++) + { + if (nulls[i]) + pq_sendint(&msg, -1, 4); + else + { + Oid typsend; + bool typisvarlena; + bytea *outputbytes; + + getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena); + outputbytes = OidSendFunctionCall(typsend, values[i]); + pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4); + pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ); + pfree(outputbytes); + } + } + pq_sendint(&msg, 1, 2); /* number of result column format codes */ + pq_sendint(&msg, 1, 2); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case '2': + break; + case 'E': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'E'); + pq_sendstring(&msg, ""); + pq_sendint(&msg, 0, 4); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + result = palloc0(sizeof(*result)); + result->tupdesc = stmt->tupdesc; + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'C': + { + const char *tag = pq_getmsgstring(&msg); + result->command = pstrdup(tag); + pq_getmsgend(&msg); + break; + } + case 'D': + if (!stmt->tupdesc) + elog(ERROR, "did not expect any rows"); + result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(stmt->tupdesc, &msg)); + pq_getmsgend(&msg); + break; + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'C'); + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_putemptymessage('S'); + pq_stop_redirect_to_shm_mq(); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + return result; +} + + +static void +autonomous_worker_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + autonomous_session_fixed_data *fdata; + char *gucstate; + shm_mq *command_mq; + shm_mq *response_mq; + shm_mq_handle *command_qh; + shm_mq_handle *response_qh; + StringInfoData msg; + char msgtype; + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "autonomous"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "autonomous session", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(AUTONOMOUS_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Find data structures in dynamic shared memory. */ + fdata = shm_toc_lookup(toc, AUTONOMOUS_KEY_FIXED_DATA); + + gucstate = shm_toc_lookup(toc, AUTONOMOUS_KEY_GUC); + + command_mq = shm_toc_lookup(toc, AUTONOMOUS_KEY_COMMAND_QUEUE); + shm_mq_set_receiver(command_mq, MyProc); + command_qh = shm_mq_attach(command_mq, seg, NULL); + + response_mq = shm_toc_lookup(toc, AUTONOMOUS_KEY_RESPONSE_QUEUE); + shm_mq_set_sender(response_mq, MyProc); + response_qh = shm_mq_attach(response_mq, seg, NULL); + + pq_redirect_to_shm_mq(seg, response_qh); + BackgroundWorkerInitializeConnectionByOid(fdata->database_id, + fdata->authenticated_user_id); + + SetClientEncoding(GetDatabaseEncoding()); + + StartTransactionCommand(); + RestoreGUCState(gucstate); + CommitTransactionCommand(); + + process_session_preload_libraries(); + + SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context); + + whereToSendOutput = DestRemote; + ReadyForQuery(whereToSendOutput); + + MessageContext = AllocSetContextCreate(TopMemoryContext, + "MessageContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + do + { + MemoryContextSwitchTo(MessageContext); + MemoryContextResetAndDeleteChildren(MessageContext); + + ProcessCompletedNotifies(); + pgstat_report_stat(false); + pgstat_report_activity(STATE_IDLE, NULL); + + shm_mq_receive_stringinfo(command_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'B': + { + SetCurrentStatementStartTimestamp(); + exec_bind_message(&msg); + break; + } + case 'D': + { + int describe_type; + const char *describe_target; + + SetCurrentStatementStartTimestamp(); + + describe_type = pq_getmsgbyte(&msg); + describe_target = pq_getmsgstring(&msg); + pq_getmsgend(&msg); + + switch (describe_type) + { + case 'S': + exec_describe_statement_message(describe_target); + break; +#ifdef TODO + case 'P': + exec_describe_portal_message(describe_target); + break; +#endif + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid DESCRIBE message subtype %d", + describe_type))); + break; + } + } + break; + case 'E': + { + const char *portal_name; + int max_rows; + + SetCurrentStatementStartTimestamp(); + + portal_name = pq_getmsgstring(&msg); + max_rows = pq_getmsgint(&msg, 4); + pq_getmsgend(&msg); + + exec_execute_message(portal_name, max_rows); + } + break; + + case 'P': + { + const char *stmt_name; + const char *query_string; + int numParams; + Oid *paramTypes = NULL; + const char **paramNames = NULL; + + SetCurrentStatementStartTimestamp(); + + stmt_name = pq_getmsgstring(&msg); + query_string = pq_getmsgstring(&msg); + numParams = pq_getmsgint(&msg, 2); + if (numParams > 0) + { + int i; + + paramTypes = palloc(numParams * sizeof(Oid)); + for (i = 0; i < numParams; i++) + paramTypes[i] = pq_getmsgint(&msg, 4); + } + /* If data left in message, read parameter names. */ + if (msg.cursor != msg.len) + { + int i; + + paramNames = palloc(numParams * sizeof(char *)); + for (i = 0; i < numParams; i++) + paramNames[i] = pq_getmsgstring(&msg); + } + pq_getmsgend(&msg); + + exec_parse_message(query_string, stmt_name, + paramTypes, numParams, paramNames); + break; + } + case 'Q': + { + const char *sql; + int save_log_statement; + bool save_log_duration; + int save_log_min_duration_statement; + + sql = pq_getmsgstring(&msg); + pq_getmsgend(&msg); + + /* XXX room for improvement */ + save_log_statement = log_statement; + save_log_duration = log_duration; + save_log_min_duration_statement = log_min_duration_statement; + + check_client_encoding_hook = autonomous_check_client_encoding_hook; + log_statement = LOGSTMT_NONE; + log_duration = false; + log_min_duration_statement = -1; + + SetCurrentStatementStartTimestamp(); + exec_simple_query(sql, 1); + + log_statement = save_log_statement; + log_duration = save_log_duration; + log_min_duration_statement = save_log_min_duration_statement; + check_client_encoding_hook = NULL; + + ReadyForQuery(whereToSendOutput); + break; + } + case 'S': + { + pq_getmsgend(&msg); + finish_xact_command(); + ReadyForQuery(whereToSendOutput); + break; + } + case 'X': + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid protocol message type from autonomous session leader: %c", + msgtype))); + break; + } + } + while (msgtype != 'X'); +} + + +static void +shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg) +{ + shm_mq_result res; + Size nbytes; + void *data; + + res = shm_mq_receive(qh, &nbytes, &data, false); + if (res != SHM_MQ_SUCCESS) + elog(ERROR, "shm_mq_receive failed: %d", res); + + initStringInfo(msg); + appendBinaryStringInfo(msg, data, nbytes); +} + + +static void +autonomous_check_client_encoding_hook(void) +{ + elog(ERROR, "cannot set client encoding in autonomous session"); +} + + +static TupleDesc +TupleDesc_from_RowDescription(StringInfo msg) +{ + TupleDesc tupdesc; + int16 natts = pq_getmsgint(msg, 2); + int16 i; + + tupdesc = CreateTemplateTupleDesc(natts, false); + for (i = 0; i < natts; i++) + { + const char *colname; + Oid type_oid; + int32 typmod; + int16 format; + + colname = pq_getmsgstring(msg); + (void) pq_getmsgint(msg, 4); /* table OID */ + (void) pq_getmsgint(msg, 2); /* table attnum */ + type_oid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 2); /* type length */ + typmod = pq_getmsgint(msg, 4); + format = pq_getmsgint(msg, 2); + (void) format; +#ifdef TODO + /* XXX The protocol sometimes sends 0 (text) if the format is not + * determined yet. We always use binary, so this check is probably + * not useful. */ + if (format != 1) + elog(ERROR, "format must be binary"); +#endif + + TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0); + } + return tupdesc; +} + + +static HeapTuple +HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg) +{ + int16 natts = pq_getmsgint(msg, 2); + int16 i; + Datum *values; + bool *nulls; + StringInfoData buf; + + Assert(tupdesc); + + if (natts != tupdesc->natts) + elog(ERROR, "malformed DataRow"); + + values = palloc(natts * sizeof(*values)); + nulls = palloc(natts * sizeof(*nulls)); + initStringInfo(&buf); + + for (i = 0; i < natts; i++) + { + int32 len = pq_getmsgint(msg, 4); + + if (len < 0) + nulls[i] = true; + else + { + Oid recvid; + Oid typioparams; + + nulls[i] = false; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &recvid, + &typioparams); + resetStringInfo(&buf); + appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len); + values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams, + tupdesc->attrs[i]->atttypmod); + } + } + + return heap_form_tuple(tupdesc, values, nulls); +} + + +static void +forward_NotifyResponse(StringInfo msg) +{ + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_endmessage(msg); + + NotifyMyFrontEnd(channel, payload, pid); +} + + +static void +rethrow_errornotice(StringInfo msg) +{ + ErrorData edata; + + pq_parse_errornotice(msg, &edata); + edata.elevel = Min(edata.elevel, ERROR); + ThrowErrorData(&edata); +} + + +static void +invalid_protocol_message(char msgtype) +{ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid protocol message type from autonomous session: %c", + msgtype))); +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 98ccbbb..2a7d4d7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -180,8 +180,6 @@ static int errdetail_execute(List *raw_parsetree_list); static int errdetail_params(ParamListInfo params); static int errdetail_abort(void); static int errdetail_recovery_conflict(void); -static void start_xact_command(void); -static void finish_xact_command(void); static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *parseTrees); static bool IsTransactionStmtList(List *parseTrees); @@ -869,8 +867,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams) * * Execute a "simple Query" protocol message. */ -static void -exec_simple_query(const char *query_string) +void +exec_simple_query(const char *query_string, int16 format) { CommandDest dest = whereToSendOutput; MemoryContext oldcontext; @@ -963,7 +961,6 @@ exec_simple_query(const char *query_string) *plantree_list; Portal portal; DestReceiver *receiver; - int16 format; /* * Get the command name for use in status display (it also becomes the @@ -1054,6 +1051,8 @@ exec_simple_query(const char *query_string) */ PortalStart(portal, NULL, 0, InvalidSnapshot); + if (format < 0) + { /* * Select the appropriate output format: text unless we are doing a * FETCH from a binary cursor. (Pretty grotty to have to do this here @@ -1074,6 +1073,7 @@ exec_simple_query(const char *query_string) format = 1; /* BINARY */ } } + } PortalSetResultFormat(portal, 1, &format); /* @@ -1185,11 +1185,12 @@ exec_simple_query(const char *query_string) * * Execute a "Parse" protocol message. */ -static void +void exec_parse_message(const char *query_string, /* string to execute */ const char *stmt_name, /* name for prepared stmt */ - Oid *paramTypes, /* parameter types */ - int numParams) /* number of parameters */ + Oid paramTypes[], /* parameter types */ + int numParams, /* number of parameters */ + const char *paramNames[]) { MemoryContext unnamed_stmt_context = NULL; MemoryContext oldcontext; @@ -1328,7 +1329,8 @@ exec_parse_message(const char *query_string, /* string to execute */ query = parse_analyze_varparams(raw_parse_tree, query_string, ¶mTypes, - &numParams); + &numParams, + paramNames); /* * Check all parameter types got determined. @@ -1447,7 +1449,7 @@ exec_parse_message(const char *query_string, /* string to execute */ * * Process a "Bind" message to create a portal from a prepared statement */ -static void +void exec_bind_message(StringInfo input_message) { const char *portal_name; @@ -1829,7 +1831,7 @@ exec_bind_message(StringInfo input_message) * * Process an "Execute" message for a portal */ -static void +void exec_execute_message(const char *portal_name, long max_rows) { CommandDest dest; @@ -2278,7 +2280,7 @@ errdetail_recovery_conflict(void) * * Process a "Describe" message for a prepared statement */ -static void +void exec_describe_statement_message(const char *stmt_name) { CachedPlanSource *psrc; @@ -2422,7 +2424,7 @@ exec_describe_portal_message(const char *portal_name) /* * Convenience routines for starting/committing a single command. */ -static void +void start_xact_command(void) { if (!xact_started) @@ -2442,7 +2444,7 @@ start_xact_command(void) } } -static void +void finish_xact_command(void) { if (xact_started) @@ -4067,7 +4069,7 @@ PostgresMain(int argc, char *argv[], if (am_walsender) exec_replication_command(query_string); else - exec_simple_query(query_string); + exec_simple_query(query_string, -1); send_ready_for_query = true; } @@ -4099,7 +4101,7 @@ PostgresMain(int argc, char *argv[], pq_getmsgend(&input_message); exec_parse_message(query_string, stmt_name, - paramTypes, numParams); + paramTypes, numParams, NULL); } break; diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h index 8105951..73faff7 100644 --- a/src/include/commands/variable.h +++ b/src/include/commands/variable.h @@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s extern bool check_random_seed(double *newval, void **extra, GucSource source); extern void assign_random_seed(double newval, void *extra); extern const char *show_random_seed(void); +extern void (*check_client_encoding_hook)(void); extern bool check_client_encoding(char **newval, void **extra, GucSource source); extern void assign_client_encoding(const char *newval, void *extra); extern bool check_session_authorization(char **newval, void **extra, GucSource source); diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index 8c03acb..6cc0090 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ #include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh); +extern void pq_stop_redirect_to_shm_mq(void); extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h index 5ba322a..2168c00 100644 --- a/src/include/parser/analyze.h +++ b/src/include/parser/analyze.h @@ -25,7 +25,7 @@ extern PGDLLIMPORT post_parse_analyze_hook_type post_parse_analyze_hook; extern Query *parse_analyze(Node *parseTree, const char *sourceText, Oid *paramTypes, int numParams); extern Query *parse_analyze_varparams(Node *parseTree, const char *sourceText, - Oid **paramTypes, int *numParams); + Oid **paramTypes, int *numParams, const char *paramNames[]); extern Query *parse_sub_analyze(Node *parseTree, ParseState *parentParseState, CommonTableExpr *parentCTE, diff --git a/src/include/parser/parse_param.h b/src/include/parser/parse_param.h index bbf608a..5f7e9fa 100644 --- a/src/include/parser/parse_param.h +++ b/src/include/parser/parse_param.h @@ -18,7 +18,7 @@ extern void parse_fixed_parameters(ParseState *pstate, Oid *paramTypes, int numParams); extern void parse_variable_parameters(ParseState *pstate, - Oid **paramTypes, int *numParams); + Oid **paramTypes, int *numParams, const char *paramNames[]); extern void check_variable_parameters(ParseState *pstate, Query *query); extern bool query_contains_extern_params(Query *query); diff --git a/src/include/tcop/autonomous.h b/src/include/tcop/autonomous.h new file mode 100644 index 0000000..9f833d7 --- /dev/null +++ b/src/include/tcop/autonomous.h @@ -0,0 +1,27 @@ +#ifndef AUTONOMOUS_H +#define AUTONOMOUS_H + +#include "access/tupdesc.h" +#include "nodes/pg_list.h" + +struct AutonomousSession; +typedef struct AutonomousSession AutonomousSession; + +struct AutonomousPreparedStatement; +typedef struct AutonomousPreparedStatement AutonomousPreparedStatement; + +typedef struct AutonomousResult +{ + TupleDesc tupdesc; + List *tuples; + char *command; +} AutonomousResult; + +AutonomousSession *AutonomousSessionStart(void); +void AutonomousSessionEnd(AutonomousSession *session); +AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql); +AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs, + Oid argtypes[], const char *argnames[]); +AutonomousResult *AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls); + +#endif /* AUTONOMOUS_H */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 7254355..150e972 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -57,6 +57,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams); extern List *pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams); +extern void exec_simple_query(const char *query_string, int16 format); +extern void exec_parse_message(const char *query_string, const char *stmt_name, + Oid paramTypes[], int numParams, const char *paramNames[]); +extern void exec_bind_message(StringInfo input_message); +extern void exec_execute_message(const char *portal_name, long max_rows); +extern void exec_describe_statement_message(const char *stmt_name); extern bool check_max_stack_depth(int *newval, void **extra, GucSource source); extern void assign_max_stack_depth(int newval, void *extra); @@ -70,6 +76,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S extern void ProcessClientReadInterrupt(bool blocked); extern void ProcessClientWriteInterrupt(bool blocked); +extern void start_xact_command(void); +extern void finish_xact_command(void); + extern void process_postgres_switches(int argc, char *argv[], GucContext ctx, const char **dbname); extern void PostgresMain(int argc, char *argv[], diff --git a/src/pl/plpgsql/src/.gitignore b/src/pl/plpgsql/src/.gitignore index 92387fa..ff6ac96 100644 --- a/src/pl/plpgsql/src/.gitignore +++ b/src/pl/plpgsql/src/.gitignore @@ -1,3 +1,6 @@ /pl_gram.c /pl_gram.h /plerrcodes.h +/log/ +/results/ +/tmp_check/ diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile index e073b2a..97998ba 100644 --- a/src/pl/plpgsql/src/Makefile +++ b/src/pl/plpgsql/src/Makefile @@ -24,6 +24,8 @@ OBJS = pl_gram.o pl_handler.o pl_comp.o pl_exec.o \ DATA = plpgsql.control plpgsql--1.0.sql plpgsql--unpackaged--1.0.sql +REGRESS = plpgsql_autonomous + all: all-lib # Shared library stuff @@ -65,6 +67,19 @@ pl_gram.c: BISONFLAGS += -d plerrcodes.h: $(top_srcdir)/src/backend/utils/errcodes.txt generate-plerrcodes.pl $(PERL) $(srcdir)/generate-plerrcodes.pl $< > $@ + +check: submake + $(pg_regress_check) $(REGRESS_OPTS) $(REGRESS) + +installcheck: submake + $(pg_regress_installcheck) $(REGRESS_OPTS) $(REGRESS) + + +.PHONY: submake +submake: + $(MAKE) -C $(top_builddir)/src/test/regress pg_regress$(X) + + distprep: pl_gram.h pl_gram.c plerrcodes.h # pl_gram.c, pl_gram.h and plerrcodes.h are in the distribution tarball, diff --git a/src/pl/plpgsql/src/expected/plpgsql_autonomous.out b/src/pl/plpgsql/src/expected/plpgsql_autonomous.out new file mode 100644 index 0000000..3822c7a --- /dev/null +++ b/src/pl/plpgsql/src/expected/plpgsql_autonomous.out @@ -0,0 +1,72 @@ +CREATE TABLE test1 (a int); +CREATE FUNCTION autonomous_test() RETURNS integer +LANGUAGE plpgsql +AS $$ +DECLARE + PRAGMA AUTONOMOUS_TRANSACTION; +BEGIN + FOR i IN 0..9 LOOP + START TRANSACTION; + EXECUTE 'INSERT INTO test1 VALUES (' || i::text || ')'; + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + + RETURN 42; +END; +$$; +SELECT autonomous_test(); + autonomous_test +----------------- + 42 +(1 row) + +SELECT * FROM test1; + a +--- + 0 + 2 + 4 + 6 + 8 +(5 rows) + +TRUNCATE test1; +CREATE FUNCTION autonomous_test2() RETURNS integer +LANGUAGE plpgsql +AS $$ +DECLARE + PRAGMA AUTONOMOUS_TRANSACTION; +BEGIN + FOR i IN 0..9 LOOP + START TRANSACTION; + INSERT INTO test1 VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + + RETURN 42; +END; +$$; +SELECT autonomous_test2(); + autonomous_test2 +------------------ + 42 +(1 row) + +SELECT * FROM test1; + a +--- + + + + + +(5 rows) + diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index 2f8b6ff..9f29a17 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -30,6 +30,7 @@ #include "parser/parse_coerce.h" #include "parser/scansup.h" #include "storage/proc.h" +#include "tcop/autonomous.h" #include "tcop/tcopprot.h" #include "utils/array.h" #include "utils/builtins.h" @@ -1176,6 +1177,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block) int i; int n; + if (block->autonomous) + estate->autonomous_session = AutonomousSessionStart(); + /* * First initialize all variables declared in this block */ @@ -1470,6 +1474,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block) estate->err_text = NULL; + if (block->autonomous) + AutonomousSessionEnd(estate->autonomous_session); + /* * Handle the return code. */ @@ -3437,6 +3444,8 @@ plpgsql_estate_setup(PLpgSQL_execstate *estate, } estate->rsi = rsi; + estate->autonomous_session = NULL; + estate->found_varno = func->found_varno; estate->ndatums = func->ndatums; estate->datums = palloc(sizeof(PLpgSQL_datum *) * estate->ndatums); @@ -3614,6 +3623,66 @@ exec_prepare_plan(PLpgSQL_execstate *estate, } +static +void build_symbol_table(PLpgSQL_execstate *estate, + PLpgSQL_nsitem *ns_start, + int *ret_nitems, + const char ***ret_names, + Oid **ret_types) +{ + PLpgSQL_nsitem *nsitem; + List *names = NIL; + List *types = NIL; + ListCell *lc1, *lc2; + int i, nitems; + const char **names_vector; + Oid *types_vector; + + for (nsitem = ns_start; + nsitem; + nsitem = nsitem->prev) + { + if (nsitem->itemtype == PLPGSQL_NSTYPE_VAR) + { + PLpgSQL_datum *datum; + PLpgSQL_var *var; + Oid typoid; + Value *name; + + if (strcmp(nsitem->name, "found") == 0) + continue; // XXX + elog(LOG, "namespace item variable itemno %d, name %s", + nsitem->itemno, nsitem->name); + datum = estate->datums[nsitem->itemno]; + Assert(datum->dtype == PLPGSQL_DTYPE_VAR); + var = (PLpgSQL_var *) datum; + name = makeString(nsitem->name); + typoid = var->datatype->typoid; + if (!list_member(names, name)) + { + names = lappend(names, name); + types = lappend_oid(types, typoid); + } + } + } + + Assert(list_length(names) == list_length(types)); + nitems = list_length(names); + names_vector = palloc(nitems * sizeof(char *)); + types_vector = palloc(nitems * sizeof(Oid)); + i = 0; + forboth(lc1, names, lc2, types) + { + names_vector[i] = pstrdup(strVal(lfirst(lc1))); + types_vector[i] = lfirst_oid(lc2); + i++; + } + + *ret_nitems = nitems; + *ret_names = names_vector; + *ret_types = types_vector; +} + /* ---------- * exec_stmt_execsql Execute an SQL statement (possibly with INTO). * @@ -3630,6 +3699,32 @@ exec_stmt_execsql(PLpgSQL_execstate *estate, int rc; PLpgSQL_expr *expr = stmt->sqlstmt; + if (estate->autonomous_session) + { + int nparams = 0; + int i; + const char **param_names = NULL; + Oid *param_types = NULL; + AutonomousPreparedStatement *astmt; + Datum *values; + bool *nulls; + AutonomousResult *aresult; + + build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, ¶m_names, ¶m_types); + astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, nparams, param_types, param_names); + + values = palloc(nparams * sizeof(*values)); + nulls = palloc(nparams * sizeof(*nulls)); + for (i = 0; i < nparams; i++) + { + nulls[i] = true; + //values[i] = TODO; + } + aresult = AutonomousSessionExecutePrepared(astmt, nparams, values, nulls); + exec_set_found(estate, (list_length(aresult->tuples) != 0)); + return PLPGSQL_RC_OK; + } + /* * On the first call for this statement generate the plan, and detect * whether the statement is INSERT/UPDATE/DELETE @@ -3871,6 +3966,12 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate, exec_eval_cleanup(estate); + if (estate->autonomous_session) + { + AutonomousSessionExecute(estate->autonomous_session, querystr); + return PLPGSQL_RC_OK; + } + /* * Execute the query without preparing a saved plan. */ diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y index 0b41e3a..c017757 100644 --- a/src/pl/plpgsql/src/pl_gram.y +++ b/src/pl/plpgsql/src/pl_gram.y @@ -108,6 +108,8 @@ static PLpgSQL_expr *read_cursor_args(PLpgSQL_var *cursor, static List *read_raise_options(void); static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); +static bool last_pragma; + %} %expect 0 @@ -144,6 +146,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); char *label; int n_initvars; int *initvarnos; + bool autonomous; } declhdr; struct { @@ -313,6 +316,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token K_PG_EXCEPTION_CONTEXT %token K_PG_EXCEPTION_DETAIL %token K_PG_EXCEPTION_HINT +%token K_PRAGMA %token K_PRINT_STRICT_PARAMS %token K_PRIOR %token K_QUERY @@ -405,6 +409,7 @@ pl_block : decl_sect K_BEGIN proc_sect exception_sect K_END opt_label new->cmd_type = PLPGSQL_STMT_BLOCK; new->lineno = plpgsql_location_to_lineno(@2); new->label = $1.label; + new->autonomous = $1.autonomous; new->n_initvars = $1.n_initvars; new->initvarnos = $1.initvarnos; new->body = $3; @@ -425,6 +430,7 @@ decl_sect : opt_block_label $$.label = $1; $$.n_initvars = 0; $$.initvarnos = NULL; + $$.autonomous = false; } | opt_block_label decl_start { @@ -432,6 +438,7 @@ decl_sect : opt_block_label $$.label = $1; $$.n_initvars = 0; $$.initvarnos = NULL; + $$.autonomous = false; } | opt_block_label decl_start decl_stmts { @@ -439,6 +446,8 @@ decl_sect : opt_block_label $$.label = $1; /* Remember variables declared in decl_stmts */ $$.n_initvars = plpgsql_add_initdatums(&($$.initvarnos)); + $$.autonomous = last_pragma; + last_pragma = false; } ; @@ -446,6 +455,7 @@ decl_start : K_DECLARE { /* Forget any variables created before block */ plpgsql_add_initdatums(NULL); + last_pragma = false; /* * Disable scanner lookup of identifiers while * we process the decl_stmts @@ -586,6 +596,13 @@ decl_statement : decl_varname decl_const decl_datatype decl_collate decl_notnull new->cursor_explicit_argrow = $5->dno; new->cursor_options = CURSOR_OPT_FAST_PLAN | $2; } + | K_PRAGMA any_identifier ';' + { + if (pg_strcasecmp($2, "autonomous_transaction") == 0) + last_pragma = true; + else + elog(ERROR, "invalid pragma"); + } ; opt_scrollable : diff --git a/src/pl/plpgsql/src/pl_scanner.c b/src/pl/plpgsql/src/pl_scanner.c index 2737fff..ff26126 100644 --- a/src/pl/plpgsql/src/pl_scanner.c +++ b/src/pl/plpgsql/src/pl_scanner.c @@ -147,6 +147,7 @@ static const ScanKeyword unreserved_keywords[] = { PG_KEYWORD("pg_exception_context", K_PG_EXCEPTION_CONTEXT, UNRESERVED_KEYWORD) PG_KEYWORD("pg_exception_detail", K_PG_EXCEPTION_DETAIL, UNRESERVED_KEYWORD) PG_KEYWORD("pg_exception_hint", K_PG_EXCEPTION_HINT, UNRESERVED_KEYWORD) + PG_KEYWORD("pragma", K_PRAGMA, UNRESERVED_KEYWORD) PG_KEYWORD("print_strict_params", K_PRINT_STRICT_PARAMS, UNRESERVED_KEYWORD) PG_KEYWORD("prior", K_PRIOR, UNRESERVED_KEYWORD) PG_KEYWORD("query", K_QUERY, UNRESERVED_KEYWORD) diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h index b416e50..55216fc 100644 --- a/src/pl/plpgsql/src/plpgsql.h +++ b/src/pl/plpgsql/src/plpgsql.h @@ -22,6 +22,7 @@ #include "commands/event_trigger.h" #include "commands/trigger.h" #include "executor/spi.h" +#include "tcop/autonomous.h" /********************************************************************** * Definitions @@ -407,6 +408,7 @@ typedef struct PLpgSQL_stmt_block int cmd_type; int lineno; char *label; + bool autonomous; List *body; /* List of statements */ int n_initvars; int *initvarnos; @@ -903,6 +905,8 @@ typedef struct PLpgSQL_execstate ResourceOwner tuple_store_owner; ReturnSetInfo *rsi; + AutonomousSession *autonomous_session; + /* the datums representing the function's local variables */ int found_varno; int ndatums; diff --git a/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql b/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql new file mode 100644 index 0000000..35e7d15 --- /dev/null +++ b/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql @@ -0,0 +1,54 @@ +CREATE TABLE test1 (a int); + +CREATE FUNCTION autonomous_test() RETURNS integer +LANGUAGE plpgsql +AS $$ +DECLARE + PRAGMA AUTONOMOUS_TRANSACTION; +BEGIN + FOR i IN 0..9 LOOP + START TRANSACTION; + EXECUTE 'INSERT INTO test1 VALUES (' || i::text || ')'; + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + + RETURN 42; +END; +$$; + + +SELECT autonomous_test(); + +SELECT * FROM test1; + +TRUNCATE test1; + + +CREATE FUNCTION autonomous_test2() RETURNS integer +LANGUAGE plpgsql +AS $$ +DECLARE + PRAGMA AUTONOMOUS_TRANSACTION; +BEGIN + FOR i IN 0..9 LOOP + START TRANSACTION; + INSERT INTO test1 VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + + RETURN 42; +END; +$$; + + +SELECT autonomous_test2(); + +SELECT * FROM test1; diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index 647b4b1..2617bae 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language" NAME = plpython$(python_majorversion) OBJS = \ + plpy_autonomousobject.o \ plpy_cursorobject.o \ plpy_elog.o \ plpy_exec.o \ @@ -89,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_autonomous \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_autonomous.out b/src/pl/plpython/expected/plpython_autonomous.out new file mode 100644 index 0000000..7c23720 --- /dev/null +++ b/src/pl/plpython/expected/plpython_autonomous.out @@ -0,0 +1,172 @@ +CREATE TABLE test1 (a int, b text); +CREATE FUNCTION autonomous_test() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") + +return 42 +$$; +SELECT autonomous_test(); + autonomous_test +----------------- + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +CREATE FUNCTION autonomous_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; +SELECT autonomous_test2(); +INFO: + autonomous_test2 +------------------ + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +CREATE FUNCTION autonomous_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") + +return 42 +$$; +SELECT autonomous_test3(); +NOTICE: notice +ERROR: error +CONTEXT: PL/pgSQL function inline_code_block line 1 at RAISE +PL/Python function "autonomous_test3" +CREATE FUNCTION autonomous_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("SET client_encoding TO SJIS") + +return 42 +$$; +SELECT autonomous_test4(); +ERROR: cannot set client encoding in autonomous session +CONTEXT: PL/Python function "autonomous_test4" +TRUNCATE test1; +CREATE FUNCTION autonomous_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) + +return 42 +$$; +SELECT autonomous_test5(); + autonomous_test5 +------------------ + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+----- + 1 | one + 2 | two +(2 rows) + +TRUNCATE test1; +CREATE FUNCTION autonomous_test6() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + plan = a.prepare("INSERT INTO test1 (a) VALUES (i)", {"i": "int4"}) + a.execute_prepared(plan, [1]) + a.execute_prepared(plan, [2]) + +return 42 +$$; +SELECT autonomous_test6(); + autonomous_test6 +------------------ + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 1 | + 2 | +(2 rows) + +TRUNCATE test1; +CREATE FUNCTION autonomous_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; +SELECT autonomous_test7(); +INFO: + autonomous_test7 +------------------ + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +CREATE FUNCTION autonomous_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + +return 42 +$$; +SELECT autonomous_test8(); +ERROR: autonomous session ended with transaction block open +CONTEXT: PL/Python function "autonomous_test8" +DROP TABLE test1; diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index adb82a8..fa89c60 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -43,9 +43,9 @@ contents.sort() return ", ".join(contents) $$ LANGUAGE plpythonu; select module_contents(); - module_contents ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning + module_contents +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Error, Fatal, SPIError, autonomous, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning (1 row) CREATE FUNCTION elog_test_basic() RETURNS void diff --git a/src/pl/plpython/plpy_autonomousobject.c b/src/pl/plpython/plpy_autonomousobject.c new file mode 100644 index 0000000..2125452 --- /dev/null +++ b/src/pl/plpython/plpy_autonomousobject.c @@ -0,0 +1,459 @@ +/* + * the PLyAutonomous class + * + * src/pl/plpython/plpy_autonomousobject.c + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "executor/spi.h" +#include "parser/parse_type.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +#include "plpython.h" + +#include "plpy_autonomousobject.h" + +#include "plpy_elog.h" +#include "plpy_main.h" +#include "plpy_planobject.h" +#include "plpy_spi.h" + + +static void PLy_autonomous_dealloc(PyObject *subxact); +static PyObject *PLy_autonomous_enter(PyObject *self, PyObject *unused); +static PyObject *PLy_autonomous_exit(PyObject *self, PyObject *args); +static PyObject *PLy_autonomous_execute(PyObject *self, PyObject *args); +static PyObject *PLy_autonomous_prepare(PyObject *self, PyObject *args); +static PyObject *PLy_autonomous_execute_prepared(PyObject *self, PyObject *args); + +static char PLy_autonomous_doc[] = { + "PostgreSQL autonomous session context manager" +}; + +static PyMethodDef PLy_autonomous_methods[] = { + {"__enter__", PLy_autonomous_enter, METH_VARARGS, NULL}, + {"__exit__", PLy_autonomous_exit, METH_VARARGS, NULL}, + /* user-friendly names for Python <2.6 */ + {"enter", PLy_autonomous_enter, METH_VARARGS, NULL}, + {"exit", PLy_autonomous_exit, METH_VARARGS, NULL}, + {"execute", PLy_autonomous_execute, METH_VARARGS, NULL}, + {"prepare", PLy_autonomous_prepare, METH_VARARGS, NULL}, + {"execute_prepared", PLy_autonomous_execute_prepared, METH_VARARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PLy_AutonomousType = { + PyVarObject_HEAD_INIT(NULL, 0) + "PLyAutonomous", /* tp_name */ + sizeof(PLyAutonomousObject), /* tp_size */ + 0, /* tp_itemsize */ + + /* + * methods + */ + PLy_autonomous_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */ + PLy_autonomous_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PLy_autonomous_methods, /* tp_tpmethods */ +}; + + +void +PLy_autonomous_init_type(void) +{ + if (PyType_Ready(&PLy_AutonomousType) < 0) + elog(ERROR, "could not initialize PLy_AutonomousType"); +} + +/* s = plpy.autonomous() */ +PyObject * +PLy_autonomous_new(PyObject *self, PyObject *unused) +{ + PLyAutonomousObject *ob; + + ob = PyObject_New(PLyAutonomousObject, &PLy_AutonomousType); + + if (ob == NULL) + return NULL; + + ob->started = false; + ob->exited = false; + + return (PyObject *) ob; +} + +/* Python requires a dealloc function to be defined */ +static void +PLy_autonomous_dealloc(PyObject *auton) +{ +} + +/* + * autonomous.__enter__() or autonomous.enter() + */ +static PyObject * +PLy_autonomous_enter(PyObject *self, PyObject *unused) +{ + PLyAutonomousObject *auton = (PLyAutonomousObject *) self; + + if (auton->started) + { + PLy_exception_set(PyExc_ValueError, "this autonomous session has already been entered"); + return NULL; + } + + if (auton->exited) + { + PLy_exception_set(PyExc_ValueError, "this autonomous session has already been exited"); + return NULL; + } + + auton->started = true; + auton->asession = AutonomousSessionStart(); + + Py_INCREF(self); + return self; +} + +/* + * autonomous.__exit__(exc_type, exc, tb) or autonomous.exit(exc_type, exc, tb) + * + * Exit an explicit subtransaction. exc_type is an exception type, exc + * is the exception object, tb is the traceback. + * + * The method signature is chosen to allow subtransaction objects to + * be used as context managers as described in + * . + */ +static PyObject * +PLy_autonomous_exit(PyObject *self, PyObject *args) +{ + PyObject *type; + PyObject *value; + PyObject *traceback; + PLyAutonomousObject *auton = (PLyAutonomousObject *) self; + + if (!PyArg_ParseTuple(args, "OOO", &type, &value, &traceback)) + return NULL; + + if (!auton->started) + { + PLy_exception_set(PyExc_ValueError, "this autonomous session has not been entered"); + return NULL; + } + + if (auton->exited) + { + PLy_exception_set(PyExc_ValueError, "this autonomous session has already been exited"); + return NULL; + } + + auton->exited = true; + AutonomousSessionEnd(auton->asession); + + Py_INCREF(Py_None); + return Py_None; +} + +static PyObject * +PLy_autonomous_execute(PyObject *self, PyObject *args) +{ + PLyAutonomousObject *auton = (PLyAutonomousObject *) self; + char *query; + + if (PyArg_ParseTuple(args, "s", &query)) + { + AutonomousResult *result; + HeapTuple *tuples; + ListCell *lc; + int i; + SPITupleTable faketupletable; + + result = AutonomousSessionExecute(auton->asession, query); + if (result->tupdesc) + { + tuples = palloc(list_length(result->tuples) * sizeof(*tuples)); + i = 0; + foreach (lc, result->tuples) + { + HeapTuple tuple = (HeapTuple) lfirst(lc); + tuples[i++] = tuple; + } + faketupletable.tupdesc = result->tupdesc; + faketupletable.vals = tuples; + return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT); + } + else + return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY); + } + else + PLy_exception_set(PLy_exc_error, "autonomous execute expected a query"); + return NULL; +} + +// XXX lots of overlap with PLy_spi_prepare +static PyObject * +PLy_autonomous_prepare(PyObject *self, PyObject *args) +{ + PLyAutonomousObject *auton = (PLyAutonomousObject *) self; + char *query; + PyObject *paraminfo = NULL; + AutonomousPreparedStatement *astmt; + int nargs = 0; + const char **argnames = NULL; + PLyPlanObject *plan; + PyObject *volatile optr = NULL; + volatile MemoryContext oldcontext; + int i; + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + PyObject *keys; + + if (!PyArg_ParseTuple(args, "s|O:prepare", &query, ¶minfo)) + return NULL; + + if (paraminfo && + !PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo)) + { + PLy_exception_set(PyExc_TypeError, + "second argument of prepare must be a sequence or mapping"); + return NULL; + } + + if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL) + return NULL; + + plan->mcxt = AllocSetContextCreate(TopMemoryContext, + "PL/Python autonomous plan context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plan->mcxt); + + if (!paraminfo) + nargs = 0; + else if (PySequence_Check(paraminfo)) + nargs = PySequence_Length(paraminfo); + else + nargs = PyMapping_Length(paraminfo); + + plan->nargs = nargs; + plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL; + plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL; + plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL; + + MemoryContextSwitchTo(oldcontext); + + if (PyMapping_Check(paraminfo)) + { + argnames = palloc(nargs * sizeof(char *)); + keys = PyMapping_Keys(paraminfo); + } + else + { + argnames = NULL; + keys = NULL; + } + + for (i = 0; i < nargs; i++) + { + PLy_typeinfo_init(&plan->args[i], plan->mcxt); + plan->values[i] = PointerGetDatum(NULL); + } + + for (i = 0; i < nargs; i++) + { + char *sptr; + HeapTuple typeTup; + Oid typeId; + int32 typmod; + + if (keys) + { + PyObject *key; + char *keystr; + + key = PySequence_GetItem(keys, i); + argnames[i] = keystr = PyString_AsString(key); + optr = PyMapping_GetItemString(paraminfo, keystr); + Py_DECREF(key); + } + else + optr = PySequence_GetItem(paraminfo, i); + + if (PyString_Check(optr)) + sptr = PyString_AsString(optr); + else if (PyUnicode_Check(optr)) + sptr = PLyUnicode_AsString(optr); + else + { + ereport(ERROR, + (errmsg("autonomous prepare: type name at ordinal position %d is not a string", i))); + sptr = NULL; /* keep compiler quiet */ + } + + /******************************************************** + * Resolve argument type names and then look them up by + * oid in the system cache, and remember the required + *information for input conversion. + ********************************************************/ + + parseTypeString(sptr, &typeId, &typmod, false); + + typeTup = SearchSysCache1(TYPEOID, + ObjectIdGetDatum(typeId)); + if (!HeapTupleIsValid(typeTup)) + elog(ERROR, "cache lookup failed for type %u", typeId); + + Py_DECREF(optr); + + /* + * set optr to NULL, so we won't try to unref it again in case of + * an error + */ + optr = NULL; + + plan->types[i] = typeId; + PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes); + ReleaseSysCache(typeTup); + } + + astmt = AutonomousSessionPrepare(auton->asession, query, nargs, plan->types, argnames); + + plan->astmt = astmt; + + return (PyObject *) plan; +} + +static PyObject * +PLy_autonomous_execute_prepared(PyObject *self, PyObject *args) +{ + PLyAutonomousObject *auton pg_attribute_unused() = (PLyAutonomousObject *) self; + PyObject *ob; + PLyPlanObject *plan; + PyObject *list = NULL; + int nargs; + bool *nulls; + AutonomousResult *result; + HeapTuple *tuples; + ListCell *lc; + int i; + SPITupleTable faketupletable; + + if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list)) + return NULL; + + if (!is_PLyPlanObject(ob)) + { + PLy_exception_set(PyExc_TypeError, + "first argument of execute_prepared must be a plan"); + return NULL; + } + + plan = (PLyPlanObject *) ob; + + if (list && (!PySequence_Check(list))) + { + PLy_exception_set(PyExc_TypeError, + "second argument of execute_prepared must be a sequence"); + return NULL; + } + + nargs = list ? PySequence_Length(list) : 0; + + if (nargs != plan->nargs) + { + char *sv; + PyObject *so = PyObject_Str(list); + + if (!so) + PLy_elog(ERROR, "could not execute plan"); + sv = PyString_AsString(so); + PLy_exception_set_plural(PyExc_TypeError, + "Expected sequence of %d argument, got %d: %s", + "Expected sequence of %d arguments, got %d: %s", + plan->nargs, + plan->nargs, nargs, sv); + Py_DECREF(so); + + return NULL; + } + + nulls = palloc(nargs * sizeof(*nulls)); + + for (i = 0; i < nargs; i++) + { + PyObject *elem; + + elem = PySequence_GetItem(list, i); + if (elem != Py_None) + { + PG_TRY(); + { + plan->values[i] = + plan->args[i].out.d.func(&(plan->args[i].out.d), + -1, + elem); + } + PG_CATCH(); + { + Py_DECREF(elem); + PG_RE_THROW(); + } + PG_END_TRY(); + + Py_DECREF(elem); + nulls[i] = false; + } + else + { + Py_DECREF(elem); + plan->values[i] = + InputFunctionCall(&(plan->args[i].out.d.typfunc), + NULL, + plan->args[i].out.d.typioparam, + -1); + nulls[i] = true; + } + } + + result = AutonomousSessionExecutePrepared(plan->astmt, nargs, plan->values, nulls); + if (result->tupdesc) + { + tuples = palloc(list_length(result->tuples) * sizeof(*tuples)); + i = 0; + foreach (lc, result->tuples) + { + HeapTuple tuple = (HeapTuple) lfirst(lc); + tuples[i++] = tuple; + } + faketupletable.tupdesc = result->tupdesc; + faketupletable.vals = tuples; + return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT); + } + else + return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY); +} diff --git a/src/pl/plpython/plpy_autonomousobject.h b/src/pl/plpython/plpy_autonomousobject.h new file mode 100644 index 0000000..f5fdaff --- /dev/null +++ b/src/pl/plpython/plpy_autonomousobject.h @@ -0,0 +1,21 @@ +/* + * src/pl/plpython/plpy_autonomousobject.h + */ + +#ifndef PLPY_AUTONOMOUSOBJECT +#define PLPY_AUTONOMOUSOBJECT + +#include "tcop/autonomous.h" + +typedef struct PLyAutonomousObject +{ + PyObject_HEAD + bool started; + bool exited; + AutonomousSession *asession; +} PLyAutonomousObject; + +extern void PLy_autonomous_init_type(void); +extern PyObject *PLy_autonomous_new(PyObject *self, PyObject *unused); + +#endif /* PLPY_AUTONOMOUSOBJECT */ diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h index 10426c4..690506a 100644 --- a/src/pl/plpython/plpy_main.h +++ b/src/pl/plpython/plpy_main.h @@ -7,6 +7,8 @@ #include "plpy_procedure.h" +#include "tcop/autonomous.h" + /* the interpreter's globals dict */ extern PyObject *PLy_interp_globals; @@ -19,6 +21,7 @@ typedef struct PLyExecutionContext { PLyProcedure *curr_proc; /* the currently executing procedure */ MemoryContext scratch_ctx; /* a context for things like type I/O */ + AutonomousSession *asession; struct PLyExecutionContext *next; /* previous stack level */ } PLyExecutionContext; diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c index a9040ef..bd44245 100644 --- a/src/pl/plpython/plpy_planobject.c +++ b/src/pl/plpython/plpy_planobject.c @@ -77,6 +77,7 @@ PLy_plan_new(void) return NULL; ob->plan = NULL; + ob->astmt = NULL; ob->nargs = 0; ob->types = NULL; ob->values = NULL; diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h index c675592..934aa3c 100644 --- a/src/pl/plpython/plpy_planobject.h +++ b/src/pl/plpython/plpy_planobject.h @@ -6,6 +6,7 @@ #define PLPY_PLANOBJECT_H #include "executor/spi.h" +#include "tcop/autonomous.h" #include "plpy_typeio.h" @@ -13,6 +14,7 @@ typedef struct PLyPlanObject { PyObject_HEAD SPIPlanPtr plan; + AutonomousPreparedStatement *astmt; int nargs; Oid *types; Datum *values; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index f520e77..c29acf1 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -13,6 +13,7 @@ #include "plpy_plpymodule.h" +#include "plpy_autonomousobject.h" #include "plpy_cursorobject.h" #include "plpy_elog.h" #include "plpy_planobject.h" @@ -88,6 +89,11 @@ static PyMethodDef PLy_methods[] = { {"subtransaction", PLy_subtransaction_new, METH_NOARGS, NULL}, /* + * create autonomous session context manager + */ + {"autonomous", PLy_autonomous_new, METH_NOARGS, NULL}, + + /* * create a cursor */ {"cursor", PLy_cursor, METH_VARARGS, NULL}, @@ -156,6 +162,7 @@ PLy_init_plpy(void) PLy_plan_init_type(); PLy_result_init_type(); PLy_subtransaction_init_type(); + PLy_autonomous_init_type(); PLy_cursor_init_type(); #if PY_MAJOR_VERSION >= 3 diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 09ee06d..995166e 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -31,8 +31,6 @@ static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit); -static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, - uint64 rows, int status); static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); @@ -291,6 +289,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) rv = SPI_execute_plan(plan->plan, plan->values, nulls, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); if (nargs > 0) pfree(nulls); @@ -360,6 +359,7 @@ PLy_spi_execute_query(char *query, long limit) pg_verifymbstr(query, strlen(query), false); rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); PLy_spi_subtransaction_commit(oldcontext, oldowner); } @@ -382,7 +382,7 @@ PLy_spi_execute_query(char *query, long limit) return ret; } -static PyObject * +PyObject * PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) { PLyResultObject *result; @@ -469,7 +469,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) PG_END_TRY(); MemoryContextDelete(cxt); - SPI_freetuptable(tuptable); } return (PyObject *) result; diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h index b042794..9ed37e5 100644 --- a/src/pl/plpython/plpy_spi.h +++ b/src/pl/plpython/plpy_spi.h @@ -5,12 +5,15 @@ #ifndef PLPY_SPI_H #define PLPY_SPI_H +#include "executor/spi.h" #include "utils/palloc.h" #include "utils/resowner.h" extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args); extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args); +extern PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status); + typedef struct PLyExceptionEntry { int sqlstate; /* hash key, must be first */ diff --git a/src/pl/plpython/sql/plpython_autonomous.sql b/src/pl/plpython/sql/plpython_autonomous.sql new file mode 100644 index 0000000..d7bec05 --- /dev/null +++ b/src/pl/plpython/sql/plpython_autonomous.sql @@ -0,0 +1,136 @@ +CREATE TABLE test1 (a int, b text); + +CREATE FUNCTION autonomous_test() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT autonomous_test(); + +SELECT * FROM test1; + + +CREATE FUNCTION autonomous_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT autonomous_test2(); + +SELECT * FROM test1; + + +CREATE FUNCTION autonomous_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") + +return 42 +$$; + +SELECT autonomous_test3(); + + +CREATE FUNCTION autonomous_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("SET client_encoding TO SJIS") + +return 42 +$$; + +SELECT autonomous_test4(); + + +TRUNCATE test1; + +CREATE FUNCTION autonomous_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) + +return 42 +$$; + +SELECT autonomous_test5(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +CREATE FUNCTION autonomous_test6() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + plan = a.prepare("INSERT INTO test1 (a) VALUES (i)", {"i": "int4"}) + a.execute_prepared(plan, [1]) + a.execute_prepared(plan, [2]) + +return 42 +$$; + +SELECT autonomous_test6(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +CREATE FUNCTION autonomous_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT autonomous_test7(); + +SELECT * FROM test1; + + +CREATE FUNCTION autonomous_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.autonomous() as a: + a.execute("BEGIN") + +return 42 +$$; + +SELECT autonomous_test8(); + + +DROP TABLE test1;