Skip to content

Commit dcac5e7

Browse files
author
Amit Kapila
committed
Refactor sharedfileset.c to separate out fileset implementation.
Move fileset related implementation out of sharedfileset.c to allow its usage by backends that don't want to share filesets among different processes. After this split, fileset infrastructure is used by both sharedfileset.c and worker.c for the named temporary files that survive across transactions. Author: Dilip Kumar, based on suggestion by Andres Freund Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila Discussion: https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
1 parent d3fa876 commit dcac5e7

File tree

14 files changed

+368
-336
lines changed

14 files changed

+368
-336
lines changed

src/backend/replication/logical/launcher.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg)
648648

649649
logicalrep_worker_detach();
650650

651+
/* Cleanup filesets used for streaming transactions. */
652+
logicalrep_worker_cleanupfileset();
653+
651654
ApplyLauncherWakeup();
652655
}
653656

src/backend/replication/logical/worker.c

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@
3939
* BufFile infrastructure supports temporary files that exceed the OS file size
4040
* limit, (b) provides a way for automatic clean up on the error and (c) provides
4141
* a way to survive these files across local transactions and allow to open and
42-
* close at stream start and close. We decided to use SharedFileSet
42+
* close at stream start and close. We decided to use FileSet
4343
* infrastructure as without that it deletes the files on the closure of the
4444
* file and if we decide to keep stream files open across the start/stop stream
4545
* then it will consume a lot of memory (more than 8K for each BufFile and
4646
* there could be multiple such BufFiles as the subscriber could receive
4747
* multiple start/stop streams for different transactions before getting the
48-
* commit). Moreover, if we don't use SharedFileSet then we also need to invent
48+
* commit). Moreover, if we don't use FileSet then we also need to invent
4949
* a new way to pass filenames to BufFile APIs so that we are allowed to open
5050
* the file we desired across multiple stream-open calls for the same
5151
* transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
246246
typedef struct StreamXidHash
247247
{
248248
TransactionId xid; /* xid is the hash key and must be first */
249-
SharedFileSet *stream_fileset; /* shared file set for stream data */
250-
SharedFileSet *subxact_fileset; /* shared file set for subxact info */
249+
FileSet *stream_fileset; /* file set for stream data */
250+
FileSet *subxact_fileset; /* file set for subxact info */
251251
} StreamXidHash;
252252

253253
static MemoryContext ApplyMessageContext = NULL;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
270270
static TransactionId stream_xid = InvalidTransactionId;
271271

272272
/*
273-
* Hash table for storing the streaming xid information along with shared file
274-
* set for streaming and subxact files.
273+
* Hash table for storing the streaming xid information along with filesets
274+
* for streaming and subxact files.
275275
*/
276276
static HTAB *xidhash = NULL;
277277

@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
12971297

12981298
/* open the changes file */
12991299
changes_filename(path, MyLogicalRepWorker->subid, xid);
1300-
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
1300+
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
13011301

13021302
/* OK, truncate the file at the right offset */
1303-
BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
1304-
subxact_data.subxacts[subidx].offset);
1303+
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1304+
subxact_data.subxacts[subidx].offset);
13051305
BufFileClose(fd);
13061306

13071307
/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
13551355
errmsg_internal("transaction %u not found in stream XID hash table",
13561356
xid)));
13571357

1358-
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
1358+
fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
13591359

13601360
buffer = palloc(BLCKSZ);
13611361
initStringInfo(&s2);
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
25412541
}
25422542
}
25432543

2544+
/*
2545+
* Cleanup filesets.
2546+
*/
2547+
void
2548+
logicalrep_worker_cleanupfileset(void)
2549+
{
2550+
HASH_SEQ_STATUS status;
2551+
StreamXidHash *hentry;
2552+
2553+
/* Remove all the pending stream and subxact filesets. */
2554+
if (xidhash)
2555+
{
2556+
hash_seq_init(&status, xidhash);
2557+
while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
2558+
{
2559+
FileSetDeleteAll(hentry->stream_fileset);
2560+
2561+
/* Delete the subxact fileset iff it is created. */
2562+
if (hentry->subxact_fileset)
2563+
FileSetDeleteAll(hentry->subxact_fileset);
2564+
}
2565+
}
2566+
}
2567+
25442568
/*
25452569
* Apply main loop.
25462570
*/
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
30243048
if (ent->subxact_fileset)
30253049
{
30263050
cleanup_subxact_info();
3027-
SharedFileSetDeleteAll(ent->subxact_fileset);
3051+
FileSetDeleteAll(ent->subxact_fileset);
30283052
pfree(ent->subxact_fileset);
30293053
ent->subxact_fileset = NULL;
30303054
}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
30423066
MemoryContext oldctx;
30433067

30443068
/*
3045-
* We need to maintain shared fileset across multiple stream
3046-
* start/stop calls. So, need to allocate it in a persistent context.
3069+
* We need to maintain fileset across multiple stream start/stop
3070+
* calls. So, need to allocate it in a persistent context.
30473071
*/
30483072
oldctx = MemoryContextSwitchTo(ApplyContext);
3049-
ent->subxact_fileset = palloc(sizeof(SharedFileSet));
3050-
SharedFileSetInit(ent->subxact_fileset, NULL);
3073+
ent->subxact_fileset = palloc(sizeof(FileSet));
3074+
FileSetInit(ent->subxact_fileset);
30513075
MemoryContextSwitchTo(oldctx);
30523076

3053-
fd = BufFileCreateShared(ent->subxact_fileset, path);
3077+
fd = BufFileCreateFileSet(ent->subxact_fileset, path);
30543078
}
30553079
else
3056-
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
3080+
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
30573081

30583082
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
30593083

@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
31073131

31083132
subxact_filename(path, subid, xid);
31093133

3110-
fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
3134+
fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
31113135

31123136
/* read number of subxact items */
31133137
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3264,15 +3288,15 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32643288

32653289
/* Delete the change file and release the stream fileset memory */
32663290
changes_filename(path, subid, xid);
3267-
SharedFileSetDeleteAll(ent->stream_fileset);
3291+
FileSetDeleteAll(ent->stream_fileset);
32683292
pfree(ent->stream_fileset);
32693293
ent->stream_fileset = NULL;
32703294

32713295
/* Delete the subxact file and release the memory, if it exist */
32723296
if (ent->subxact_fileset)
32733297
{
32743298
subxact_filename(path, subid, xid);
3275-
SharedFileSetDeleteAll(ent->subxact_fileset);
3299+
FileSetDeleteAll(ent->subxact_fileset);
32763300
pfree(ent->subxact_fileset);
32773301
ent->subxact_fileset = NULL;
32783302
}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32883312
*
32893313
* Open a file for streamed changes from a toplevel transaction identified
32903314
* by stream_xid (global variable). If it's the first chunk of streamed
3291-
* changes for this transaction, initialize the shared fileset and create the
3292-
* buffile, otherwise open the previously created file.
3315+
* changes for this transaction, initialize the fileset and create the buffile,
3316+
* otherwise open the previously created file.
32933317
*
32943318
* This can only be called at the beginning of a "streaming" block, i.e.
32953319
* between stream_start/stream_stop messages from the upstream.
@@ -3330,24 +3354,24 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33303354
if (first_segment)
33313355
{
33323356
MemoryContext savectx;
3333-
SharedFileSet *fileset;
3357+
FileSet *fileset;
33343358

33353359
if (found)
33363360
ereport(ERROR,
33373361
(errcode(ERRCODE_PROTOCOL_VIOLATION),
33383362
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
33393363

33403364
/*
3341-
* We need to maintain shared fileset across multiple stream
3342-
* start/stop calls. So, need to allocate it in a persistent context.
3365+
* We need to maintain fileset across multiple stream start/stop
3366+
* calls. So, need to allocate it in a persistent context.
33433367
*/
33443368
savectx = MemoryContextSwitchTo(ApplyContext);
3345-
fileset = palloc(sizeof(SharedFileSet));
3369+
fileset = palloc(sizeof(FileSet));
33463370

3347-
SharedFileSetInit(fileset, NULL);
3371+
FileSetInit(fileset);
33483372
MemoryContextSwitchTo(savectx);
33493373

3350-
stream_fd = BufFileCreateShared(fileset, path);
3374+
stream_fd = BufFileCreateFileSet(fileset, path);
33513375

33523376
/* Remember the fileset for the next stream of the same transaction */
33533377
ent->xid = xid;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33653389
* Open the file and seek to the end of the file because we always
33663390
* append the changes file.
33673391
*/
3368-
stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
3392+
stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
33693393
BufFileSeek(stream_fd, 0, 0, SEEK_END);
33703394
}
33713395

src/backend/storage/file/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ OBJS = \
1616
buffile.o \
1717
copydir.o \
1818
fd.o \
19+
fileset.o \
1920
reinit.o \
2021
sharedfileset.o
2122

0 commit comments

Comments
 (0)