From 5bc6936e96e71cd2e971e889f21b448b0e1f46a2 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 14 Sep 2020 16:11:02 +0530 Subject: [PATCH v2] Fix initialization of RelationSyncEntry for streaming transactions. In commit 464824323e, for each RelationSyncEntry we maintained the list of xids (streamed_txns) for which we have already sent the schema. This helps us to track when to send the schema to the downstream node for replication of streaming transactions. Before this list got initialized, we were processing invalidation messages which access this list and led to an assertion failure. In passing, clean up the nearby code: * Initialize the list of xids with NIL instead of NULL which is our usual coding practice. * Remove the MemoryContext switch for creating a RelationSyncEntry in dynahash. Diagnosed-by: Amit Kapila and Tom Lane Author: Amit Kapila Reviewed-by: Tom Lane and Dilip Kumar Discussion: https://postgr.es/m/904373.1600033123@sss.pgh.pa.us --- src/backend/replication/pgoutput/pgoutput.c | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index c29c088813..e5922f8e30 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -945,16 +945,26 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Assert(RelationSyncCache != NULL); - /* Find cached function info, creating if not found */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); + /* Find cached relation info, creating if not found */ entry = (RelationSyncEntry *) hash_search(RelationSyncCache, (void *) &relid, HASH_ENTER, &found); - MemoryContextSwitchTo(oldctx); Assert(entry != NULL); /* Not found means schema wasn't sent */ - if (!found || !entry->replicate_valid) + if (!found) + { + /* immediately make a new entry valid enough to satisfy callbacks */ + entry->schema_sent = false; + entry->streamed_txns = NIL; + entry->replicate_valid = false; + entry->pubactions.pubinsert = entry->pubactions.pubupdate = + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->publish_as_relid = InvalidOid; + } + + /* Validate the entry */ + if (!entry->replicate_valid) { List *pubids = GetRelationPublications(relid); ListCell *lc; @@ -977,9 +987,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * relcache considers all publications given relation is in, but here * we only need to consider ones that the subscriber requested. */ - entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; - foreach(lc, data->publications) { Publication *pub = lfirst(lc); @@ -1054,12 +1061,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->replicate_valid = true; } - if (!found) - { - entry->schema_sent = false; - entry->streamed_txns = NULL; - } - return entry; } @@ -1145,7 +1146,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) { entry->schema_sent = false; list_free(entry->streamed_txns); - entry->streamed_txns = NULL; + entry->streamed_txns = NIL; } } -- 2.28.0.windows.1