diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index ae83291..be65ae7 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -84,6 +84,7 @@ #include "access/twophase_rmgr.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/xlog_internal.h" #include "access/xloginsert.h" #include "access/xlogutils.h" #include "access/xlogreader.h" @@ -2408,3 +2409,87 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) return; } + +Datum +pg_prepared_xact_status(PG_FUNCTION_ARGS) +{ + char const* gid = PG_GETARG_CSTRING(0); + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + XLogRecPtr lsn; + char const* xact_status = "unknown"; + bool done = false; + TimeLineID timeline; + TransactionId xid = InvalidTransactionId; + XLogRecPtr wal_end = GetFlushRecPtr(); + + GetOldestRestartPoint(&lsn, &timeline); + + xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + do + { + record = XLogReadRecord(xlogreader, lsn, &errormsg); + if (record == NULL) + break; + lsn = InvalidXLogRecPtr; /* continue after the record */ + if (XLogRecGetRmid(xlogreader) == RM_XACT_ID) + { + uint32 info = XLogRecGetInfo(xlogreader); + switch (info & XLOG_XACT_OPMASK) + { + case XLOG_XACT_PREPARE: + { + TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *)XLogRecGetData(xlogreader); + char* xact_gid = (char*)hdr + MAXALIGN(sizeof(TwoPhaseFileHeader)); + if (strcmp(xact_gid, gid) == 0) + { + xid = hdr->xid; + xact_status = "prepared"; + } + break; + } + case XLOG_XACT_COMMIT_PREPARED: + { + xl_xact_commit *xlrec; + xl_xact_parsed_commit parsed; + + xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader); + ParseCommitRecord(info, xlrec, &parsed); + if (xid == parsed.twophase_xid) + { + Assert(TransactionIdIsValid(xid)); + xact_status = "committed"; + done = true; + } + break; + } + case XLOG_XACT_ABORT_PREPARED: + { + xl_xact_abort *xlrec; + xl_xact_parsed_abort parsed; + + xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader); + ParseAbortRecord(info, xlrec, &parsed); + if (xid == parsed.twophase_xid) + { + Assert(TransactionIdIsValid(xid)); + xact_status = "aborted"; + done = true; + } + break; + } + default: + break; + } + } + } while (!done && xlogreader->EndRecPtr < wal_end); + + XLogReaderFree(xlogreader); + PG_RETURN_CSTRING(xact_status); +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 8b33b4e..001f586 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -3148,6 +3148,10 @@ DATA(insert OID = 3378 ( pg_isolation_test_session_is_blocked PGNSP PGUID 12 1 DESCR("isolationtester support function"); DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26}" "{o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ )); DESCR("view two-phase transactions"); + +DATA(insert OID = 6015 ( pg_prepared_xact_status PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 2275 "2275" _null_ _null_ _null_ _null_ _null_ pg_prepared_xact_status _null_ _null_ _null_ )); +DESCR("I/O"); + DATA(insert OID = 3819 ( pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ _null_ pg_get_multixact_members _null_ _null_ _null_ )); DESCR("view members of a multixactid");