From 72609a154b5fe79536166f35581dca6e1ef2e260 Mon Sep 17 00:00:00 2001
From: Thomas Munro
Date: Fri, 1 Sep 2017 16:54:57 +1200
Subject: [PATCH 1/2] Enable parallel query with SERIALIZABLE isolation.

Previously, the SERIALIZABLE isolation level prevented parallel query
from being used.  Allow the two features to be used together by sharing
the leader's SERIALIZABLEXACT with parallel workers.

Remove the serializable_okay flag added to CreateParallelContext() by
commit 9da0cc35284bdbe8d442d732963303ff0e0a40bc, because it's now
redundant.

The optimization allowing SSI checks to be skipped after a certain point
in read-only transactions is disabled in parallel mode.  It could be
implemented in a later commit.

Author: Thomas Munro
Reviewed-By: Haribabu Kommi, Robert Haas
Discussion: https://postgr.es/m/CAEepm=0gXGYhtrVDWOTHS8SQQy_=S9xo+8oCxGLWZAOoeJ=yzQ@mail.gmail.com
---
 doc/src/sgml/monitoring.sgml                        |   5 +
 doc/src/sgml/parallel.sgml                          |  17 ----
 src/backend/access/nbtree/nbtsort.c                 |   2 +-
 src/backend/access/transam/parallel.c               |  18 ++--
 src/backend/access/transam/xact.c                   |  14 ++-
 src/backend/executor/execParallel.c                 |   2 +-
 src/backend/optimizer/plan/planner.c                |  11 +--
 src/backend/storage/lmgr/lwlock.c                   |   1 +
 src/backend/storage/lmgr/predicate.c                | 109 ++++++++++++++++++---
 src/include/access/parallel.h                       |   3 +-
 src/include/storage/lwlock.h                        |   1 +
 src/include/storage/predicate.h                     |   9 ++
 src/include/storage/predicate_internals.h           |   4 +
 .../isolation/expected/serializable-parallel-2.out  |  44 +++++++++
 .../isolation/expected/serializable-parallel.out    |  44 +++++++++
 src/test/isolation/isolation_schedule               |   2 +
 .../isolation/specs/serializable-parallel-2.spec    |  30 ++++++
 .../isolation/specs/serializable-parallel.spec      |  48 +++++++++
 18 files changed, 307 insertions(+), 57 deletions(-)
 create mode 100644 src/test/isolation/expected/serializable-parallel-2.out
 create mode 100644 src/test/isolation/expected/serializable-parallel.out
 create mode 100644 src/test/isolation/specs/serializable-parallel-2.spec
 create mode 100644 src/test/isolation/specs/serializable-parallel.spec

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3bc4de57d5a..daa961a3226 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -979,6 +979,11 @@ postgres   27093  0.0  0.0 30096  2752 ?        Ss   11:34   0:00 postgres: ser
          Waiting to perform an operation on a list of locks held by
          serializable transactions.</entry>
         </row>
+        <row>
+         <entry><literal>sxact</literal></entry>
+         <entry>Waiting to perform an operation on a serializable transaction
+          in a parallel query.</entry>
+        </row>
         <row>
          <entry><literal>OldSerXidLock</literal></entry>
          <entry>Waiting to read or record conflicting serializable
diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index f15a9233cbf..9507f1ae2ef 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -192,13 +192,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         using a very large number of processes.
       </para>
     </listitem>
-
-    <listitem>
-      <para>
-        The transaction isolation level is serializable.  This is
-        a limitation of the current implementation.
-      </para>
-    </listitem>
   </itemizedlist>
 
   <para>
@@ -241,16 +234,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         that may be suboptimal when run serially.
       </para>
     </listitem>
-
-    <listitem>
-      <para>
-        The transaction isolation level is serializable.  This situation
-        does not normally arise, because parallel query plans are not
-        generated when the transaction isolation level is serializable.
-        However, it can happen if the transaction isolation level is changed to
-        serializable after the plan is generated and before it is executed.
-      </para>
-    </listitem>
   </itemizedlist>
 
   <para>
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f0c276b52a1..c5807294ba8 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1207,7 +1207,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	EnterParallelMode();
 	Assert(request > 0);
 	pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
-								 request, true);
+								 request);
 	scantuplesortstates = leaderparticipates ? request + 1 : request;
 
 	/*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 9d4efc0f8fc..cf0ee1499ab 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -30,6 +30,7 @@
 #include "optimizer/planmain.h"
 #include "pgstat.h"
 #include "storage/ipc.h"
+#include "storage/predicate.h"
 #include "storage/sinval.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
@@ -86,6 +87,7 @@ typedef struct FixedParallelState
 	PGPROC	   *parallel_master_pgproc;
 	pid_t		parallel_master_pid;
 	BackendId	parallel_master_backend_id;
+	SerializableXactHandle serializable_xact_handle;
 
 	/* Mutex protects remaining fields. */
 	slock_t		mutex;
@@ -150,7 +152,7 @@ static void ParallelWorkerShutdown(int code, Datum arg);
  */
 ParallelContext *
 CreateParallelContext(const char *library_name, const char *function_name,
-					  int nworkers, bool serializable_okay)
+					  int nworkers)
 {
 	MemoryContext oldcontext;
 	ParallelContext *pcxt;
@@ -168,16 +170,6 @@ CreateParallelContext(const char *library_name, const char *function_name,
 	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
 		nworkers = 0;
 
-	/*
-	 * If we are running under serializable isolation, we can't use parallel
-	 * workers, at least not until somebody enhances that mechanism to be
-	 * parallel-aware.  Utility statement callers may ask us to ignore this
-	 * restriction because they're always able to safely ignore the fact that
-	 * SIREAD locks do not work with parallelism.
-	 */
-	if (IsolationIsSerializable() && !serializable_okay)
-		nworkers = 0;
-
 	/* We might be running in a short-lived memory context. */
 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
@@ -321,6 +313,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	fps->parallel_master_pgproc = MyProc;
 	fps->parallel_master_pid = MyProcPid;
 	fps->parallel_master_backend_id = MyBackendId;
+	fps->serializable_xact_handle = ShareSerializableXact();
 	SpinLockInit(&fps->mutex);
 	fps->last_xlog_end = 0;
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -1384,6 +1377,9 @@ ParallelWorkerMain(Datum main_arg)
 	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
 	RestoreReindexState(reindexspace);
 
+	/* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+	AttachSerializableXact(fps->serializable_xact_handle);
+
 	/*
 	 * We've initialized all of our state now; nothing should change
 	 * hereafter.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dbaaf8e0053..dd486835867 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2004,9 +2004,12 @@ CommitTransaction(void)
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes. This should be done as late as we can put it and still allow
-	 * errors to be raised for failure patterns found at commit.
+	 * errors to be raised for failure patterns found at commit.  This is not
+	 * appropriate in a parallel worker however, because we aren't committing
+	 * the leader's transaction and its serializable state will live on.
 	 */
-	PreCommit_CheckForSerializationFailure();
+	if (!is_parallel_worker)
+		PreCommit_CheckForSerializationFailure();
 
 	/*
 	 * Insert notifications sent by NOTIFY commands into the queue.  This
@@ -2232,9 +2235,12 @@ PrepareTransaction(void)
 
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes. This should be done as late as we can put it and still allow
-	 * errors to be raised for failure patterns found at commit.
+	 * errors to be raised for failure patterns found at commit.  This is not
+	 * appropriate for parallel workers however, because we aren't committing
+	 * the leader's transaction and its serializable state will live on.
 	 */
-	PreCommit_CheckForSerializationFailure();
+	if (!IsParallelWorker())
+		PreCommit_CheckForSerializationFailure();
 
 	/* NOTIFY will be handled below */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 14b0b89463c..f8b72ebab99 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -592,7 +592,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
 	/* Create a parallel context. */
-	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
+	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
 	pei->pcxt = pcxt;
 
 	/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index de1257d9c22..44e14d6a8b7 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -289,14 +289,6 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 	 * parallel worker.  We might eventually be able to relax this
 	 * restriction, but for now it seems best not to have parallel workers
 	 * trying to create their own parallel workers.
-	 *
-	 * We can't use parallelism in serializable mode because the predicate
-	 * locking code is not parallel-aware.  It's not catastrophic if someone
-	 * tries to run a parallel plan in serializable mode; it just won't get
-	 * any workers and will run serially.  But it seems like a good heuristic
-	 * to assume that the same serialization level will be in effect at plan
-	 * time and execution time, so don't generate a parallel plan if we're in
-	 * serializable mode.
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
@@ -304,8 +296,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 		parse->commandType == CMD_SELECT &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
-		!IsParallelWorker() &&
-		!IsolationIsSerializable())
+		!IsParallelWorker())
 	{
 		/* all the cheap tests pass, so scan the query tree */
 		glob->maxParallelHazard = max_parallel_hazard(parse);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index a6fda81feb6..a11b4bebd61 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -521,6 +521,7 @@ RegisterLWLockTranches(void)
 	LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
 	LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
 	LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
+	LWLockRegisterTranche(LWTRANCHE_SXACT, "sxact");
 
 	/* Register named tranches. */
 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 654eca4f3f5..617208c42cb 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -97,7 +97,9 @@
  *	- All transactions share this single lock (with no partitioning).
  *	- There is never a need for a process other than the one running
  *	  an active transaction to walk the list of locks held by that
- *	  transaction.
+ *	  transaction, except parallel query workers sharing the leader's
+ *	  transaction.  In the parallel case, an extra per-sxact lock is
+ *	  taken; see below.
  *	- It is relatively infrequent that another process needs to
  *	  modify the list for a transaction, but it does happen for such
  *	  things as index page splits for pages with predicate locks and
@@ -116,6 +118,12 @@
  *			than its own active transaction must acquire an exclusive
  *			lock.
  *
+ *	SERIALIZABLEXACT's member 'predicateLockListLock'
+ *		- Protects the linked list of locks held by a transaction.  Only
+ *		  needed for parallel mode, where multiple backends share the
+ *		  same SERIALIZABLEXACT object.  Not needed if
+ *		  SerializablePredicateLockListLock is held exclusively.
+ *
  *	PredicateLockHashPartitionLock(hashcode)
  *		- The same lock protects a target, all locks on that target, and
  *		  the linked list of locks on the target.
@@ -186,6 +194,7 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "access/slru.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -469,6 +478,7 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
 													SERIALIZABLEXACT *writer);
+static void CreateLocalPredicateLockHash(void);
 
 
 /*------------------------------------------------------------------------*/
@@ -522,8 +532,10 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot)
 	 * A transaction is flagged as RO_SAFE if all concurrent R/W transactions
 	 * commit without having conflicts out to an earlier snapshot, thus
 	 * ensuring that no conflicts are possible for this transaction.
+	 *
+	 * This optimization is not yet supported in parallel mode.
 	 */
-	if (SxactIsROSafe(MySerializableXact))
+	if (SxactIsROSafe(MySerializableXact) && !IsInParallelMode())
 	{
 		ReleasePredicateLocks(false);
 		return false;
@@ -1214,6 +1226,8 @@ InitPredicateLocks(void)
 	memset(PredXact->element, 0, requestSize);
 	for (i = 0; i < max_table_size; i++)
 	{
+		LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock,
+						 LWTRANCHE_SXACT);
 		SHMQueueInsertBefore(&(PredXact->availableList),
 							 &(PredXact->element[i].link));
 	}
@@ -1679,6 +1693,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 {
 	Assert(IsolationIsSerializable());
 
+	/*
+	 * If this is called by parallel.c in a parallel worker, we don't want to
+	 * create a SERIALIZABLEXACT just yet because the leader's
+	 * SERIALIZABLEXACT will be installed with AttachSerializableXact().  We
+	 * also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this
+	 * case, because the leader has already determined that the snapshot it
+	 * has passed us is safe.  So there is nothing for us to do.
+	 */
+	if (IsParallelWorker())
+		return;
+
 	/*
 	 * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
 	 * import snapshots, since there's no way to wait for a safe snapshot when
@@ -1712,7 +1737,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	VirtualTransactionId vxid;
 	SERIALIZABLEXACT *sxact,
 			   *othersxact;
-	HASHCTL		hash_ctl;
 
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
@@ -1859,6 +1883,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 
 	LWLockRelease(SerializableXactHashLock);
 
+	CreateLocalPredicateLockHash();
+
+	return snapshot;
+}
+
+static void
+CreateLocalPredicateLockHash(void)
+{
+	HASHCTL		hash_ctl;
+
 	/* Initialize the backend-local hash table of parent locks */
 	Assert(LocalPredicateLockHash == NULL);
 	MemSet(&hash_ctl, 0, sizeof(hash_ctl));
@@ -1868,8 +1902,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 							 max_predicate_locks_per_xact,
 							 &hash_ctl,
 							 HASH_ELEM | HASH_BLOBS);
-
-	return snapshot;
 }
 
 /*
@@ -2124,7 +2156,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
  * This implementation is assuming that the usage of each target tag field
  * is uniform.  No need to make this hard if we don't have to.
  *
- * We aren't acquiring lightweight locks for the predicate lock or lock
+ * We acquire an LWLock in the case of parallel mode, because worker
+ * backends have access to the leader's SERIALIZABLEXACT.  Otherwise,
+ * we aren't acquiring lightweight locks for the predicate lock or lock
  * target structures associated with this transaction unless we're going
 * to modify them, because no other process is permitted to modify our
 * locks.
@@ -2137,6 +2171,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
 
 	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
 	sxact = MySerializableXact;
+	if (IsInParallelMode())
+		LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
 	predlock = (PREDICATELOCK *)
 		SHMQueueNext(&(sxact->predicateLocks),
 					 &(sxact->predicateLocks),
@@ -2190,6 +2226,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
 
 		predlock = nextpredlock;
 	}
+	if (IsInParallelMode())
+		LWLockRelease(&sxact->predicateLockListLock);
 	LWLockRelease(SerializablePredicateLockListLock);
 }
 
@@ -2388,6 +2426,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
 	partitionLock = PredicateLockHashPartitionLock(targettaghash);
 
 	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+	if (IsInParallelMode())
+		LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
 	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
 	/* Make sure that the target is represented. */
@@ -2425,6 +2465,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
 	}
 
 	LWLockRelease(partitionLock);
+	if (IsInParallelMode())
+		LWLockRelease(&sxact->predicateLockListLock);
 	LWLockRelease(SerializablePredicateLockListLock);
 }
 
@@ -2612,7 +2654,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
 	PREDICATELOCK *nextpredlock;
 	bool		found;
 
-	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+	Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+								LW_EXCLUSIVE));
 	Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
 
 	predlock = (PREDICATELOCK *)
@@ -2672,7 +2715,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
 * covers it, or if we are absolutely certain that no one will need to
 * refer to that lock in the future.
 *
- * Caller must hold SerializablePredicateLockListLock.
+ * Caller must hold SerializablePredicateLockListLock exclusively.
 */
 static bool
 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
@@ -2687,7 +2730,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
 	bool		found;
 	bool		outOfShmem = false;
 
-	Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+	Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+								LW_EXCLUSIVE));
 
 	oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
 	newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
@@ -3290,6 +3334,10 @@ ReleasePredicateLocks(bool isCommit)
 		return;
 	}
 
+	/* Parallel workers mustn't release predicate locks. */
+	if (IsParallelWorker())
+		goto backend_local_cleanup;
+
 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
 	Assert(!isCommit || SxactIsPrepared(MySerializableXact));
@@ -3319,8 +3367,8 @@ ReleasePredicateLocks(bool isCommit)
 	MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
 
 	/*
-	 * If it's not a commit it's a rollback, and we can clear our locks
-	 * immediately.
+	 * If it's not a commit it's either a rollback or a read-only transaction
+	 * flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately.
 	 */
 	if (isCommit)
 	{
@@ -3567,6 +3615,7 @@ ReleasePredicateLocks(bool isCommit)
 	if (needToClear)
 		ClearOldPredicateLocks();
 
+backend_local_cleanup:
 	MySerializableXact = InvalidSerializableXact;
 	MyXactDidWrite = false;
 
@@ -4259,6 +4308,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 		PREDICATELOCK *rmpredlock;
 
 		LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+		if (IsInParallelMode())
+			LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE);
 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -4293,6 +4344,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 
 		LWLockRelease(SerializableXactHashLock);
 		LWLockRelease(partitionLock);
+		if (IsInParallelMode())
+			LWLockRelease(&MySerializableXact->predicateLockListLock);
 		LWLockRelease(SerializablePredicateLockListLock);
 
 		if (rmpredlock != NULL)
@@ -4841,6 +4894,13 @@ AtPrepare_PredicateLocks(void)
 	 */
 	LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
 
+	/*
+	 * No need to take sxact->predicateLockListLock in parallel mode because
+	 * there cannot be any parallel workers running while we are preparing a
+	 * transaction.
+	 */
+	Assert(!IsParallelWorker() && !ParallelContextActive());
+
 	predlock = (PREDICATELOCK *)
 		SHMQueueNext(&(sxact->predicateLocks),
 					 &(sxact->predicateLocks),
@@ -5049,3 +5109,30 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Prepare to share the current SERIALIZABLEXACT with parallel workers.
+ * Return a handle object that can be used by AttachSerializableXact() in a
+ * parallel worker.
+ */
+SerializableXactHandle
+ShareSerializableXact(void)
+{
+	Assert(!IsParallelWorker());
+
+	return MySerializableXact;
+}
+
+/*
+ * Allow parallel workers to import the leader's SERIALIZABLEXACT.
+ */
+void
+AttachSerializableXact(SerializableXactHandle handle)
+{
+
+	Assert(MySerializableXact == InvalidSerializableXact);
+
+	MySerializableXact = (SERIALIZABLEXACT *) handle;
+	if (MySerializableXact != InvalidSerializableXact)
+		CreateLocalPredicateLockHash();
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 025691fd82d..45e7fbb43f8 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -60,8 +60,7 @@ extern PGDLLIMPORT bool InitializingParallelWorker;
 #define		IsParallelWorker()		(ParallelWorkerNumber >= 0)
 
 extern ParallelContext *CreateParallelContext(const char *library_name,
-					 const char *function_name, int nworkers,
-					 bool serializable_okay);
+					 const char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c21bfe2f666..b25c43fc6be 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -219,6 +219,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_TBM,
 	LWTRANCHE_PARALLEL_APPEND,
+	LWTRANCHE_SXACT,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
 
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 6a3464daa1e..23f3acc3ce1 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -30,6 +30,11 @@ extern int	max_predicate_locks_per_page;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
+/*
+ * A handle used for sharing SERIALIZABLEXACT objects between the participants
+ * in a parallel query.
+ */
+typedef void *SerializableXactHandle;
 
 /*
  * function prototypes
@@ -74,4 +79,8 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* parallel query support */
+extern SerializableXactHandle ShareSerializableXact(void);
+extern void AttachSerializableXact(SerializableXactHandle handle);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 0f736d37dff..59eb49e57ee 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -15,6 +15,7 @@
 #define PREDICATE_INTERNALS_H
 
 #include "storage/lock.h"
+#include "storage/lwlock.h"
 
 /*
  * Commit number.
@@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT
 	SHM_QUEUE	finishedLink;	/* list link in
 								 * FinishedSerializableTransactions */
 
+	LWLock		predicateLockListLock;	/* protects predicateLocks in parallel
+										 * mode */
+
 	/*
 	 * for r/o transactions: list of concurrent r/w transactions that we could
 	 * potentially have conflicts with, and vice versa for r/w transactions
diff --git a/src/test/isolation/expected/serializable-parallel-2.out b/src/test/isolation/expected/serializable-parallel-2.out
new file mode 100644
index 00000000000..9a693c4dc62
--- /dev/null
+++ b/src/test/isolation/expected/serializable-parallel-2.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1r s2r1 s1c s2r2 s2c
+step s1r: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s2r1: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s1c: COMMIT;
+step s2r2: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s2c: COMMIT;
diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out
new file mode 100644
index 00000000000..f43aa6a2990
--- /dev/null
+++ b/src/test/isolation/expected/serializable-parallel.out
@@ -0,0 +1,44 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+step s2c: COMMIT;
+step s3c: COMMIT;
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;
+id             balance
+
+X              0
+Y              20
+step s3c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+ERROR:  could not serialize access due to read/write dependencies among transactions
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 74d7d59546a..36890d74b60 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -66,3 +66,5 @@ test: async-notify
 test: vacuum-reltuples
 test: timeouts
 test: vacuum-concurrent-drop
+test: serializable-parallel
+test: serializable-parallel-2
diff --git a/src/test/isolation/specs/serializable-parallel-2.spec b/src/test/isolation/specs/serializable-parallel-2.spec
new file mode 100644
index 00000000000..7f90f75d882
--- /dev/null
+++ b/src/test/isolation/specs/serializable-parallel-2.spec
@@ -0,0 +1,30 @@
+# Exercise the case where a read-only serializable transaction has
+# SXACT_FLAG_RO_SAFE set in a parallel query.
+
+setup
+{
+	CREATE TABLE foo AS SELECT generate_series(1, 10)::int a;
+	ALTER TABLE foo SET (parallel_workers = 2);
+}
+
+teardown
+{
+	DROP TABLE foo;
+}
+
+session "s1"
+setup		{ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1r"	{ SELECT * FROM foo; }
+step "s1c"	{ COMMIT; }
+
+session "s2"
+setup		{
+			  BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
+			  SET parallel_setup_cost = 0;
+			  SET parallel_tuple_cost = 0;
+			}
+step "s2r1"	{ SELECT * FROM foo; }
+step "s2r2"	{ SELECT * FROM foo; }
+step "s2c"	{ COMMIT; }
+
+permutation "s1r" "s2r1" "s1c" "s2r2" "s2c"
diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec
new file mode 100644
index 00000000000..0e7c2c7c1fa
--- /dev/null
+++ b/src/test/isolation/specs/serializable-parallel.spec
@@ -0,0 +1,48 @@
+# The example from the paper "A read-only transaction anomaly under snapshot
+# isolation"[1].
+#
+# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't
+# suffer from the anomaly, because s2 is aborted upon detection of a cycle.
+# In this case the read only query s3 happens to be running in a parallel
+# worker.
+#
+# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf
+
+setup
+{
+	CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL);
+	INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0);
+}
+
+teardown
+{
+	DROP TABLE bank_account;
+}
+
+session "s1"
+setup		{ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1ry"	{ SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s1wy"	{ UPDATE bank_account SET balance = 20 WHERE id = 'Y'; }
+step "s1c"	{ COMMIT; }
+
+session "s2"
+setup		{ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s2rx"	{ SELECT balance FROM bank_account WHERE id = 'X'; }
+step "s2ry"	{ SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s2wx"	{ UPDATE bank_account SET balance = -11 WHERE id = 'X'; }
+step "s2c"	{ COMMIT; }
+
+session "s3"
+setup		{
+			  BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+			  SET max_parallel_workers_per_gather = 2;
+			  SET force_parallel_mode = on;
+			}
+step "s3r"	{ SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; }
+step "s3c"	{ COMMIT; }
+
+# without s3, s1 and s2 commit
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c"
+
+# once s3 observes the data committed by s1, a cycle is created and s2 aborts
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx"
-- 
2.15.1
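
For manual testing outside the isolation tester, the following psql sketch (not part of the patch) reuses the table, reloption and cost settings from the serializable-parallel-2 spec above; the exact plan shape will depend on local configuration, but without the patch the planner refuses to generate a parallel plan under SERIALIZABLE, whereas with it applied a Gather node over a Parallel Seq Scan is expected.

    CREATE TABLE foo AS SELECT generate_series(1, 10)::int a;
    ALTER TABLE foo SET (parallel_workers = 2);

    BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
    SET parallel_setup_cost = 0;
    SET parallel_tuple_cost = 0;
    -- Should show Gather -> Parallel Seq Scan on foo once the patch is applied.
    EXPLAIN (COSTS OFF) SELECT * FROM foo;
    SELECT * FROM foo;
    COMMIT;

    DROP TABLE foo;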
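The new "sxact" wait event documented in monitoring.sgml can be observed from a separate session while leader and workers contend on the per-SERIALIZABLEXACT lock.  A hedged example query (the column list is just an illustration, not from the patch):

    -- Look for backends waiting on the new per-SERIALIZABLEXACT LWLock tranche.
    SELECT pid, wait_event_type, wait_event
    FROM pg_stat_activity
    WHERE wait_event = 'sxact';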