From f6e875650d479fbee6c01e3fd2c25012a5773232 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 2 Nov 2020 14:32:10 +0900 Subject: [PATCH v32 09/11] postgres_fdw marks foreign transaction as modified on modification. This commit enables postgres_fdw to execute two-phase commit protocol on transaction commit (without explicitly executing PREPARE TRANSACTION). Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/connection.c | 19 ++++++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 2 ++ contrib/postgres_fdw/postgres_fdw.h | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 3c22060f27..a17c934006 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ bool invalidated; /* true if reconnect is pending */ + bool modified; /* true if data on the foreign server is modified */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; @@ -285,6 +286,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->have_error = false; entry->changing_xact_state = false; entry->invalidated = false; + entry->modified = false; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -299,6 +301,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->conn, server->servername, user->umid, user->userid); } +void +MarkConnectionModified(UserMapping *user) +{ + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(user->umid); + + if (entry && !entry->modified) + { + FdwXactRegisterXact(user->serverid, user->userid, true); + entry->modified = true; + } +} + /* * Connect to remote server using specified server and user mapping properties. */ @@ -570,7 +586,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->conn); /* Register the foreign server to the transaction */ - FdwXactRegisterXact(user->serverid, user->userid); + FdwXactRegisterXact(user->serverid, user->userid, false); if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; @@ -579,6 +595,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth = 1; + entry->modified = false; entry->changing_xact_state = false; } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 8162e0ace7..71f3d91695 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2380,6 +2380,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * establish new connection if necessary. */ dmstate->conn = GetConnection(user, false); + MarkConnectionModified(user); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -3565,6 +3566,7 @@ create_foreign_modify(EState *estate, /* Open connection; report that we'll create a prepared statement. */ fmstate->conn = GetConnection(user, true); + MarkConnectionModified(user); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 788605cfc2..144fe5cd16 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -132,6 +132,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); extern void ReleaseConnection(PGconn *conn); +extern void MarkConnectionModified(UserMapping *user); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); -- 2.27.0