diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index f4e5ea8..44b8fe5 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -33,6 +33,7 @@ #include "catalog/namespace.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/spi.h" @@ -2128,6 +2129,7 @@ CommitTransaction(void) AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true, false); + AtEOXact_Subscription(); AtEOXact_ApplyLauncher(true); pgstat_report_xact_timestamp(0); @@ -2607,6 +2609,7 @@ AbortTransaction(void) AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_Subscription(); AtEOXact_ApplyLauncher(false); pgstat_report_xact_timestamp(0); } @@ -4534,6 +4537,7 @@ StartSubTransaction(void) AtSubStart_ResourceOwner(); AtSubStart_Notify(); AfterTriggerBeginSubXact(); + AtSubStart_ApplyLauncher(); s->state = TRANS_INPROGRESS; @@ -4637,6 +4641,7 @@ CommitSubTransaction(void) AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(true, s->nestingLevel); /* * We need to restore the upper transaction's read-only state, in case the @@ -4790,6 +4795,7 @@ AbortSubTransaction(void) AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(false, s->nestingLevel); } /* diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f138e61..4f2c930 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -51,7 +51,29 @@ #include "utils/memutils.h" #include "utils/syscache.h" + +/* + * List of subscriptions, each containing the relations for that subscription. + * Each element has the relids for a given subscription that were present at + * the last COMMIT. For a subid, there exists an entry in this list only when + * the subscription relations are altered. Once the transaction ends, this list + * is again set back to NIL. This is done so that during commit, we know + * exactly which workers to stop: the relations for the last altered + * subscription should be compared with the relations for the last committed + * subscription changes. + */ +static List *committed_subrel_list = NIL; + +typedef struct SubscriptionRels +{ + Oid subid; + int numrels; + Oid *relids; +} SubscriptionRels; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static SubscriptionRels *get_subrels(Oid sub_oid, + SubscriptionRels **commited_subrels); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -504,9 +526,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { char *err; List *pubrel_names; - List *subrel_states; - Oid *subrel_local_oids; + SubscriptionRels *subrels; + SubscriptionRels *committed_subrels; Oid *pubrel_local_oids; + List *stop_relids = NIL; ListCell *lc; int off; @@ -525,24 +548,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) /* We are done with the remote side, close connection. */ walrcv_disconnect(wrconn); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid); - - /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup is - * important. - */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); - off = 0; - foreach(lc, subrel_states) - { - SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - - subrel_local_oids[off++] = relstate->relid; - } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + subrels = get_subrels(sub->oid, &committed_subrels); /* * Walk over the remote tables and try to match them to locally known @@ -566,8 +572,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) pubrel_local_oids[off++] = relid; - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + if (!bsearch(&relid, subrels->relids, + subrels->numrels, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -585,16 +591,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) qsort(pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp); - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < subrels->numrels; off++) { - Oid relid = subrel_local_oids[off]; + Oid relid = subrels->relids[off]; if (!bsearch(&relid, pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp)) { RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop_at_commit(sub->oid, relid); + /* If these are the committed subrels, build the "stop" list right away */ + if (subrels == committed_subrels) + stop_relids = lappend_oid(stop_relids, relid); ereport(DEBUG1, (errmsg("table \"%s.%s\" removed from subscription \"%s\"", @@ -603,6 +611,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) sub->name))); } } + + /* + * Now derive the workers to be stopped using the committed reloids. At + * commit time, we will terminate them. + */ + if (subrels != committed_subrels) + { + for (off = 0; off < committed_subrels->numrels; off++) + { + Oid relid = committed_subrels->relids[off]; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + stop_relids = lappend_oid(stop_relids, relid); + } + } + + logicalrep_insert_stop_workers(sub->oid, stop_relids); } /* @@ -1172,3 +1198,88 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Get the relation oids for a given subscription. Also update committed_subrels + * parameter with the rel oids for the relations that were present in the last + * committed change in the subscription. + */ +static SubscriptionRels * +get_subrels(Oid sub_oid, SubscriptionRels **commited_subrels) +{ + ListCell *lc; + int off; + List *subrel_states; + Oid *subrel_local_oids; + SubscriptionRels *subrels = NULL; + MemoryContext old_context = CurrentMemoryContext; + + /* Ger the committed subrels for the given subscription */ + foreach(lc, committed_subrel_list) + { + SubscriptionRels *subrel = (SubscriptionRels *) lfirst(lc); + + if (sub_oid == subrel->subid) + break; + } + + /* + * If we found a committed entry, that means an earlier ALTER-SUBSCRIPTION + * has generated this entry. + */ + if (lc != NULL) + *commited_subrels = (SubscriptionRels *) lfirst(lc); + + subrel_states = GetSubscriptionRelations(sub_oid); + + /* + * If we are creating this list for the first time in this transaction, we + * need to maintain this list until transaction end. + */ + if (lc == NULL) + old_context = MemoryContextSwitchTo(TopTransactionContext); + + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup is + * important. + */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp); + + subrels = palloc(sizeof(SubscriptionRels)); + subrels->subid = sub_oid; + subrels->relids = subrel_local_oids; + + /* + * If there isn't already a subrel for this subscription saved, save this + * one into the committed_subrel_list. And also pass this as the committed + * rel oids. + */ + if (lc == NULL) + { + committed_subrel_list = lappend(committed_subrel_list, subrels); + *commited_subrels = subrels; + } + + MemoryContextSwitchTo(old_context); + + return subrels; +} + +void +AtEOXact_Subscription(void) +{ + /* + * No need to pfree the list. In fact, it must have been already + * freed because it was allocated in TopTransactionContext. + */ + committed_subrel_list = NIL; +} diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6ef333b..6f462ee 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -76,11 +76,32 @@ LogicalRepCtxStruct *LogicalRepCtx; typedef struct LogicalRepWorkerId { Oid subid; - Oid relid; + int numrels; + List *relids; } LogicalRepWorkerId; +typedef struct SubTransOnCommitStopWorkers +{ + struct SubTransOnCommitStopWorkers *parent; /* This might not be an + * immediate parent */ + int nest_level; + + /* List of subscriptions for current subtransaction nest level */ + List *sub; +} SubTransOnCommitStopWorkers; + +/* + * List of LogicalRepWorkerId elements. This list belongs to current + * subtransaction level + */ static List *on_commit_stop_workers = NIL; +/* + * Stack of subscription lists. Each stack element belongs to one particular + * subtransaction. + */ +static SubTransOnCommitStopWorkers *subtrans_stop_workers = NULL; + static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); @@ -553,25 +574,42 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); } -/* - * Request worker for specified sub/rel to be stopped on commit. - */ void -logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +logicalrep_insert_stop_workers(Oid subid, List *relids) { - LogicalRepWorkerId *wid; - MemoryContext oldctx; + ListCell *lc; - /* Make sure we store the info in context that survives until commit. */ - oldctx = MemoryContextSwitchTo(TopTransactionContext); + foreach(lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + if (wid->subid == subid) + break; + } + + /* Didn't find a sub ? Insert a new one */ + if (lc == NULL) + { + MemoryContext oldctx; + LogicalRepWorkerId *wid; - wid = palloc(sizeof(LogicalRepWorkerId)); - wid->subid = subid; - wid->relid = relid; + /* Make sure we store the info in context that survives until commit. */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); - on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + wid = palloc(sizeof(LogicalRepWorkerId)); + wid->subid = subid; + wid->relids = list_copy(relids); /* TODO: Avoid the copy. */ + lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(oldctx); + } + else + { + /* Replace the existing reloids with the new set */ + LogicalRepWorkerId *wid = lfirst(lc); + list_free(wid->relids); + wid->relids = relids; + } - MemoryContextSwitchTo(oldctx); } /* @@ -827,20 +865,54 @@ XactManipulatesLogicalReplicationWorkers(void) } /* + * AtSubStart_ApplyLauncher() --- Take care of subtransaction start. + * + * Push the current on_commit_stop_workers into the stack. + */ +void +AtSubStart_ApplyLauncher(void) +{ + + if (on_commit_stop_workers != NIL) + { + SubTransOnCommitStopWorkers *temp; + MemoryContext old_cxt; + + /* Keep the stack elements in TopTransactionContext for simplicity */ + old_cxt = MemoryContextSwitchTo(TopTransactionContext); + + temp = palloc(sizeof(SubTransOnCommitStopWorkers)); + temp->parent = subtrans_stop_workers; + temp->nest_level = GetCurrentTransactionNestLevel() - 1; + temp->sub = on_commit_stop_workers; + subtrans_stop_workers = temp; + + on_commit_stop_workers = NIL; + + MemoryContextSwitchTo(old_cxt); + } +} + + +/* * Wakeup the launcher on commit if requested. */ void AtEOXact_ApplyLauncher(bool isCommit) { + Assert(subtrans_stop_workers == NULL); + if (isCommit) { - ListCell *lc; + ListCell *wlc; - foreach(lc, on_commit_stop_workers) + foreach(wlc, on_commit_stop_workers) { - LogicalRepWorkerId *wid = lfirst(lc); + LogicalRepWorkerId *wid = lfirst(wlc); + ListCell *rlc; - logicalrep_worker_stop(wid->subid, wid->relid); + foreach(rlc, wid->relids) + logicalrep_worker_stop(wid->subid, lfirst_oid(rlc)); } if (on_commit_launcher_wakeup) @@ -853,6 +925,116 @@ AtEOXact_ApplyLauncher(bool isCommit) */ on_commit_stop_workers = NIL; on_commit_launcher_wakeup = false; + subtrans_stop_workers = NULL; +} + +/* + * On commit, merge the on_commit_stop_workers list into the immediate parent, + * if present. + * On rollback, discard the on_commit_stop_workers list. + * Pop out the immediate parent stack element, and assign it's workers list + * to the on_commit_stop_workers list. + */ +void +AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth) +{ + + if (isCommit) + { + MemoryContext oldctx; + ListCell *lc; + + /* Make sure we store the info in context that survives until commit. */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + /* + * If the upper level is present, and it is not an immediate + * parent subtransaction, we don't have to do anything; the current + * on_commit_stop_workers will be regarded as belonging to the + * immediate parent sub-transaction. But if the upper level is an + * immediate parent subtransaction, we need to merge the current + * on_commit_stop_workers list into the immediate parent, make this + * merged list as the current on_commit_stop_workers list. + */ + if (subtrans_stop_workers != NULL && + subtrans_stop_workers->nest_level == nestDepth -1) + { + List *temp_list = NIL; + + /* + * Merge the current list into the immediate parent. + * So say, parent has sub1(tab1, tab2), sub2(tab2, tab3), + * and current on_commit_workers has sub2(tab4) and sub3(tab1), + * then the merged list will have : + * sub1(tab1, tab2), sub2(tab4), sub3(tab1) + */ + foreach(lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + ListCell *lc1; + + /* Search this subrel into the subrels of the top stack element */ + foreach(lc1, subtrans_stop_workers->sub) + { + LogicalRepWorkerId *wid1 = lfirst(lc1); + + if (wid->subid == wid1->subid) + break; + } + + if (lc1 == NULL) + { + /* + * Didn't find a subscription in the stack element. So + * insert it. + */ + temp_list = lappend(temp_list, wid); + } + else + { + /* + * Replace the earlier subrels of this subscription with + * the new subrels. + */ + LogicalRepWorkerId *wid1 = lfirst(lc1); + + list_free(wid1->relids); + pfree(wid1); + lfirst(lc1) = wid; + } + + } + /* Add the new subscriptions that were not present in outer level */ + subtrans_stop_workers->sub = + list_concat(subtrans_stop_workers->sub, temp_list); + } + + MemoryContextSwitchTo(oldctx); + } + else + { + /* Abandon the current subtransaction workers list. */ + list_free(on_commit_stop_workers); + on_commit_stop_workers = NIL; + } + + /* + * This is common for commit and abort. For commit, above we have already + * merged the current list into parent. + */ + if (subtrans_stop_workers != NULL && + subtrans_stop_workers->nest_level == nestDepth -1) + { + SubTransOnCommitStopWorkers *temp; + + /* Make the parent transaction list as the current on_commit_stop_workers. */ + on_commit_stop_workers = subtrans_stop_workers->sub; + + /* Pop out the stack element */ + temp = subtrans_stop_workers->parent; + pfree(subtrans_stop_workers); + subtrans_stop_workers = temp; + } } /* diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 6d70ad7..e14b91e 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +extern void AtEOXact_Subscription(void); #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index ef02512..aa02041 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -23,6 +23,8 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); extern bool XactManipulatesLogicalReplicationWorkers(void); +extern void AtSubStart_ApplyLauncher(void); +extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1ce3b6b..1da6d6d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running); extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid); extern void logicalrep_worker_stop(Oid subid, Oid relid); -extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid); +extern void logicalrep_insert_stop_workers(Oid subid, List *relids); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);