diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index f19649b..dd0c1ce 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -31,7 +31,7 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); -static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); +static int logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); @@ -210,7 +210,8 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, - LogicalRepTupleData *newtup) + LogicalRepTupleData *newtup, + Bitmapset **updatedCols) { char action; LogicalRepRelId relid; @@ -227,9 +228,20 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, /* check for old tuple */ if (action == 'K' || action == 'O') { - logicalrep_read_tuple(in, oldtup); + int i; + int natts; + + natts = logicalrep_read_tuple(in, oldtup); *has_oldtuple = true; + /* make a bitmap of updated columns */ + for (i = 0; i < natts; i++) + { + if (oldtup->changed[i]) + *updatedCols = bms_add_member(*updatedCols, + i - FirstLowInvalidHeapAttributeNumber); + } + action = pq_getmsgbyte(in); } else @@ -449,11 +461,12 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) } /* - * Read tuple in remote format from stream. + * Read tuple in remote format from stream. Returns the number + * of attributes. * * The returned tuple points into the input stringinfo. */ -static void +static int logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) { int i; @@ -499,6 +512,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) elog(ERROR, "unrecognized data representation type '%c'", kind); } } + + return natts; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index bc6d824..b701f82 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -178,12 +178,13 @@ ensure_transaction(void) /* * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. + * indexes and triggers. updatedCols should not be NULL if we create the + * executor state for an update operation. * * This is based on similar code in copy.c */ static EState * -create_estate_for_relation(LogicalRepRelMapEntry *rel) +create_estate_for_relation(LogicalRepRelMapEntry *rel, Bitmapset *updatedCols) { EState *estate; ResultRelInfo *resultRelInfo; @@ -195,6 +196,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) rte->rtekind = RTE_RELATION; rte->relid = RelationGetRelid(rel->localrel); rte->relkind = rel->localrel->rd_rel->relkind; + rte->updatedCols = updatedCols; estate->es_range_table = list_make1(rte); resultRelInfo = makeNode(ResultRelInfo); @@ -579,7 +581,7 @@ apply_handle_insert(StringInfo s) } /* Initialize the executor state. */ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel, NULL); remoteslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel)); @@ -663,11 +665,12 @@ apply_handle_update(StringInfo s) TupleTableSlot *remoteslot; bool found; MemoryContext oldctx; + Bitmapset *updatedCols = NULL; ensure_transaction(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, - &newtup); + &newtup, &updatedCols); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { @@ -683,7 +686,7 @@ apply_handle_update(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel, updatedCols); remoteslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel)); localslot = ExecInitExtraTupleSlot(estate); @@ -801,7 +804,7 @@ apply_handle_delete(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel); + estate = create_estate_for_relation(rel, NULL); remoteslot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel)); localslot = ExecInitExtraTupleSlot(estate); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index a9736e1..ccacb19 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -93,7 +93,7 @@ extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldt HeapTuple newtuple); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, - LogicalRepTupleData *newtup); + LogicalRepTupleData *newtup, Bitmapset **updatedCols); extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple); extern LogicalRepRelId logicalrep_read_delete(StringInfo in,