diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9e10263bb..d3cb777f9f 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -318,6 +318,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) /* * Helper function for advancing physical replication slot forward. + * + * This function accepts arbitrary LSN even if the LSN is not at the beginning + * of a record. This can lead to any kind of misbehavior but currently the + * value is used only to determine up to what wal segment to keep and + * successive implicit advancing fixes the state. */ static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) @@ -344,6 +349,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr upto; PG_TRY(); { @@ -354,6 +360,13 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) logical_read_local_xlog_page, NULL, NULL, NULL); + /* + * startlsn can be on page boundary but it is not accepted as explicit + * parameter to XLogReadRecord. Set it in reader context. + */ + Assert(startlsn != InvalidXLogRecPtr); + upto = ctx->reader->EndRecPtr = startlsn; + CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding"); @@ -361,22 +374,18 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) InvalidateSystemCaches(); /* Decode until we run out of records */ - while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || - (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) + while (ctx->reader->EndRecPtr <= moveto) { XLogRecord *record; char *errm = NULL; + + /* ctx->reader->EndRecPtr cannot be go backward here */ + upto = ctx->reader->EndRecPtr; - record = XLogReadRecord(ctx->reader, startlsn, &errm); + record = XLogReadRecord(ctx->reader, InvalidXLogRecPtr, &errm); if (errm) elog(ERROR, "%s", errm); - /* - * Now that we've set up the xlog reader state, subsequent calls - * pass InvalidXLogRecPtr to say "continue from last record" - */ - startlsn = InvalidXLogRecPtr; - /* * The {begin_txn,change,commit_txn}_wrapper callbacks above will * store the description into our tuplestore. @@ -384,18 +393,14 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) if (record != NULL) LogicalDecodingProcessRecord(ctx, ctx->reader); - /* check limits */ - if (moveto <= ctx->reader->EndRecPtr) - break; - CHECK_FOR_INTERRUPTS(); } CurrentResourceOwner = old_resowner; - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + if (startlsn != upto) { - LogicalConfirmReceivedLocation(moveto); + LogicalConfirmReceivedLocation(upto); /* * If only the confirmed_flush_lsn has changed the slot won't get