diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 14e057c..53c1fe8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1946,49 +1946,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } /* - * Cleanup function. - * - * Called on logical replication worker exit. - */ -static void -worker_onexit(int code, Datum arg) -{ - HASH_SEQ_STATUS status; - StreamXidHash *ent; - char path[MAXPGPATH]; - - /* nothing to clean */ - if (xidhash == NULL) - return; - - /* - * Scan complete hash and delete the underlying files for the xids. - * Also release the memory for the shared file sets. - */ - hash_seq_init(&status, xidhash); - while ((ent = (StreamXidHash *) hash_seq_search(&status)) != NULL) - { - changes_filename(path, MyLogicalRepWorker->subid, ent->xid); - BufFileDeleteShared(ent->stream_fileset, path); - pfree(ent->stream_fileset); - - /* - * We might not have created the subxact fileset if there is no sub - * transaction. - */ - if (ent->subxact_fileset) - { - subxact_filename(path, MyLogicalRepWorker->subid, ent->xid); - BufFileDeleteShared(ent->subxact_fileset, path); - pfree(ent->subxact_fileset); - } - } - - /* Remove the xid hash */ - hash_destroy(xidhash); -} - -/* * Apply main loop. */ static void @@ -2012,9 +1969,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "LogicalStreamingContext", ALLOCSET_DEFAULT_SIZES); - /* do cleanup on worker exit (e.g. after DROP SUBSCRIPTION) */ - before_shmem_exit(worker_onexit, (Datum) 0); - /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -2503,6 +2457,7 @@ subxact_info_write(Oid subid, TransactionId xid) { cleanup_subxact_info(); BufFileDeleteShared(ent->subxact_fileset, path); + SharedFileSetUnregister(ent->subxact_fileset); pfree(ent->subxact_fileset); ent->subxact_fileset = NULL; } @@ -2515,10 +2470,13 @@ subxact_info_write(Oid subid, TransactionId xid) */ if (ent->subxact_fileset == NULL) { - ent->subxact_fileset = - MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet)); + MemoryContext oldctx; + /* Shared fileset handle must be allocated in the persistent context */ + oldctx = MemoryContextSwitchTo(ApplyContext); + ent->subxact_fileset = palloc(sizeof(SharedFileSet)); SharedFileSetInit(ent->subxact_fileset, NULL); + MemoryContextSwitchTo(oldctx); fd = BufFileCreateShared(ent->subxact_fileset, path); } else @@ -2726,6 +2684,7 @@ stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok) /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); BufFileDeleteShared(ent->stream_fileset, path); + SharedFileSetUnregister(ent->stream_fileset); pfree(ent->stream_fileset); /* Delete the subxact file and release the memory, if it exist */ @@ -2733,6 +2692,7 @@ stream_cleanup_files(Oid subid, TransactionId xid, bool missing_ok) { subxact_filename(path, subid, xid); BufFileDeleteShared(ent->subxact_fileset, path); + SharedFileSetUnregister(ent->subxact_fileset); pfree(ent->subxact_fileset); } } @@ -2784,13 +2744,15 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) */ if (first_segment) { - /* - * Shared fileset handle must be allocated in the persistent context. - */ - SharedFileSet *fileset = - MemoryContextAlloc(ApplyContext, sizeof(SharedFileSet)); + MemoryContext oldctx; + SharedFileSet *fileset; + /* Shared fileset handle must be allocated in the persistent context */ + oldctx = MemoryContextSwitchTo(ApplyContext); + fileset = palloc(sizeof(SharedFileSet)); SharedFileSetInit(fileset, NULL); + oldctx = MemoryContextSwitchTo(oldctx); + stream_fd = BufFileCreateShared(fileset, path); /* Remember the fileset for the next stream of the same transaction */ diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c index c81d298..9bfe71c 100644 --- a/src/backend/storage/file/sharedfileset.c +++ b/src/backend/storage/file/sharedfileset.c @@ -25,10 +25,14 @@ #include "common/hashfn.h" #include "miscadmin.h" #include "storage/dsm.h" +#include "storage/ipc.h" #include "storage/sharedfileset.h" #include "utils/builtins.h" +static List * filesetlist = NULL; + static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum); +static void SharedFileSetOnProcExit(int status, Datum arg); static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace); static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name); static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name); @@ -76,6 +80,13 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg) /* Register our cleanup callback. */ if (seg) on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset)); + else + { + if (filesetlist == NULL) + on_proc_exit(SharedFileSetOnProcExit, 0); + + filesetlist = lcons((void *)fileset, filesetlist); + } } /* @@ -214,6 +225,52 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum) } /* + * Callback function that will be invoked on the process exit. This will + * process the list if all the sharedfileset registered and delete the + * underlying files. + */ +static void +SharedFileSetOnProcExit(int status, Datum arg) +{ + ListCell *l; + + /* Loop over all the pending shared fileset entry */ + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + SharedFileSetDeleteAll(fileset); + } +} + +/* + * Unregister the shared fileset entry, registered for cleanup on proc exit. + */ +void +SharedFileSetUnregister(SharedFileSet *input_fileset) +{ + bool found = false; + ListCell *l; + + Assert(filesetlist != NULL); + + /* Loop over all the pending shared fileset entry */ + foreach (l, filesetlist) + { + SharedFileSet *fileset = (SharedFileSet *) lfirst(l); + + /* remove the entry from the list and delete the underlying files */ + if (input_fileset->number == fileset->number) + { + SharedFileSetDeleteAll(fileset); + filesetlist = list_delete_cell(filesetlist, l); + found = true; + break; + } + } + Assert(found); +} + +/* * Build the path for the directory holding the files backing a SharedFileSet * in a given tablespace. */ diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h index b2f4ba4..d5edb60 100644 --- a/src/include/storage/sharedfileset.h +++ b/src/include/storage/sharedfileset.h @@ -42,5 +42,6 @@ extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name, extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name, bool error_on_failure); extern void SharedFileSetDeleteAll(SharedFileSet *fileset); +extern void SharedFileSetUnregister(SharedFileSet *input_fileset); #endif