From 4715dd6c444ed5c5021f8330bc151277474397b1 Mon Sep 17 00:00:00 2001 From: amitlan Date: Fri, 27 Mar 2020 20:54:30 +0900 Subject: [PATCH v15 1/3] worker.c: refactor code to look up local tuple --- src/backend/replication/logical/worker.c | 78 +++++++++++++++++--------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fa38117..51c0278 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -122,6 +122,10 @@ static void apply_handle_update_internal(ResultRelInfo *relinfo, static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel); +static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, + LogicalRepRelation *remoterel, + TupleTableSlot *remoteslot, + TupleTableSlot **localslot); /* * Should this worker apply changes for given relation. @@ -788,33 +792,17 @@ apply_handle_update_internal(ResultRelInfo *relinfo, LogicalRepRelMapEntry *relmapentry) { Relation localrel = relinfo->ri_RelationDesc; - Oid idxoid; EPQState epqstate; TupleTableSlot *localslot; bool found; MemoryContext oldctx; - localslot = table_slot_create(localrel, &estate->es_tupleTable); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); - ExecOpenIndices(relinfo, false); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(localrel); - Assert(OidIsValid(idxoid) || - (relmapentry->remoterel.replident == REPLICA_IDENTITY_FULL)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, - remoteslot, localslot); - + found = FindReplTupleInLocalRel(estate, localrel, + &relmapentry->remoterel, + remoteslot, &localslot); ExecClearTuple(remoteslot); /* @@ -922,31 +910,15 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, LogicalRepRelation *remoterel) { Relation localrel = relinfo->ri_RelationDesc; - Oid idxoid; EPQState epqstate; TupleTableSlot *localslot; bool found; - localslot = table_slot_create(localrel, &estate->es_tupleTable); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); - ExecOpenIndices(relinfo, false); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(localrel); - Assert(OidIsValid(idxoid) || - (remoterel->replident == REPLICA_IDENTITY_FULL)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, - remoteslot, localslot); + found = FindReplTupleInLocalRel(estate, localrel, remoterel, + remoteslot, &localslot); /* If found delete it. */ if (found) @@ -971,6 +943,38 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, } /* + * Try to find a tuple received from the publication side (one in 'remoteslot') + * in the corresponding local relation using either replica identity index, + * primary key or if needed, sequential scan. + * + * Local tuple, if found, is returned in '*localslot'. + */ +static bool +FindReplTupleInLocalRel(EState *estate, Relation localrel, + LogicalRepRelation *remoterel, + TupleTableSlot *remoteslot, + TupleTableSlot **localslot) +{ + Oid idxoid; + bool found; + + idxoid = GetRelationIdentityOrPK(localrel); + Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + *localslot = table_slot_create(localrel, &estate->es_tupleTable); + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(localrel, idxoid, + LockTupleExclusive, + remoteslot, *localslot); + else + found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, + remoteslot, *localslot); + + return found; +} + +/* * Handle TRUNCATE message. * * TODO: FDW support -- 1.8.3.1