From db3f5b04d89c30d545512ddceddbca40ea0a2795 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 4 Nov 2020 17:43:04 +1100 Subject: [PATCH v16] Support 2PC txn - spoolfile. This patch only refactors to isolate the streaming spool-file processing to a separate function. Later, two-phase commit logic will require this common processing to be called from multiple places. --- src/backend/replication/logical/worker.c | 58 +++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0468491..d282336 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -244,6 +244,8 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry, CmdType operation); +static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); + /* * Should this worker apply changes for given relation. * @@ -933,30 +935,21 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. + * Common spoolfile processing. + * Returns how many changes were applied. */ -static void -apply_handle_stream_commit(StringInfo s) +static int +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; bool found; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - Assert(!in_streamed_transaction); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - - ensure_transaction(); - /* * Allocate file handle and memory required to process all the messages in * TopTransactionContext to avoid them getting reset after each message is @@ -964,7 +957,7 @@ apply_handle_stream_commit(StringInfo s) */ oldcxt = MemoryContextSwitchTo(TopTransactionContext); - /* open the spool file for the committed transaction */ + /* open the spool file for the committed/prepared transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); ent = (StreamXidHash *) hash_search(xidhash, @@ -979,7 +972,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1048,6 +1041,35 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); + pfree(buffer); + pfree(s2.data); + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + return nchanges; +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + int nchanges = 0; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, commit_data.commit_lsn); + /* * Update origin state so we can restart streaming from correct position * in case of crash. @@ -1055,16 +1077,12 @@ apply_handle_stream_commit(StringInfo s) replorigin_session_origin_lsn = commit_data.end_lsn; replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); - pfree(s2.data); - CommitTransactionCommand(); pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); - elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", - nchanges, path); + elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges); in_remote_transaction = false; -- 1.8.3.1