39
39
* BufFile infrastructure supports temporary files that exceed the OS file size
40
40
* limit, (b) provides a way for automatic clean up on the error and (c) provides
41
41
* a way to survive these files across local transactions and allow to open and
42
- * close at stream start and close. We decided to use SharedFileSet
42
+ * close at stream start and close. We decided to use FileSet
43
43
* infrastructure as without that it deletes the files on the closure of the
44
44
* file and if we decide to keep stream files open across the start/stop stream
45
45
* then it will consume a lot of memory (more than 8K for each BufFile and
46
46
* there could be multiple such BufFiles as the subscriber could receive
47
47
* multiple start/stop streams for different transactions before getting the
48
- * commit). Moreover, if we don't use SharedFileSet then we also need to invent
48
+ * commit). Moreover, if we don't use FileSet then we also need to invent
49
49
* a new way to pass filenames to BufFile APIs so that we are allowed to open
50
50
* the file we desired across multiple stream-open calls for the same
51
51
* transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
246
246
typedef struct StreamXidHash
247
247
{
248
248
TransactionId xid ; /* xid is the hash key and must be first */
249
- SharedFileSet * stream_fileset ; /* shared file set for stream data */
250
- SharedFileSet * subxact_fileset ; /* shared file set for subxact info */
249
+ FileSet * stream_fileset ; /* file set for stream data */
250
+ FileSet * subxact_fileset ; /* file set for subxact info */
251
251
} StreamXidHash ;
252
252
253
253
static MemoryContext ApplyMessageContext = NULL ;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
270
270
static TransactionId stream_xid = InvalidTransactionId ;
271
271
272
272
/*
273
- * Hash table for storing the streaming xid information along with shared file
274
- * set for streaming and subxact files.
273
+ * Hash table for storing the streaming xid information along with filesets
274
+ * for streaming and subxact files.
275
275
*/
276
276
static HTAB * xidhash = NULL ;
277
277
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
1297
1297
1298
1298
/* open the changes file */
1299
1299
changes_filename (path , MyLogicalRepWorker -> subid , xid );
1300
- fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDWR );
1300
+ fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDWR );
1301
1301
1302
1302
/* OK, truncate the file at the right offset */
1303
- BufFileTruncateShared (fd , subxact_data .subxacts [subidx ].fileno ,
1304
- subxact_data .subxacts [subidx ].offset );
1303
+ BufFileTruncateFileSet (fd , subxact_data .subxacts [subidx ].fileno ,
1304
+ subxact_data .subxacts [subidx ].offset );
1305
1305
BufFileClose (fd );
1306
1306
1307
1307
/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
1355
1355
errmsg_internal ("transaction %u not found in stream XID hash table" ,
1356
1356
xid )));
1357
1357
1358
- fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDONLY );
1358
+ fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDONLY );
1359
1359
1360
1360
buffer = palloc (BLCKSZ );
1361
1361
initStringInfo (& s2 );
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2541
2541
}
2542
2542
}
2543
2543
2544
+ /*
2545
+ * Cleanup filesets.
2546
+ */
2547
+ void
2548
+ logicalrep_worker_cleanupfileset (void )
2549
+ {
2550
+ HASH_SEQ_STATUS status ;
2551
+ StreamXidHash * hentry ;
2552
+
2553
+ /* Remove all the pending stream and subxact filesets. */
2554
+ if (xidhash )
2555
+ {
2556
+ hash_seq_init (& status , xidhash );
2557
+ while ((hentry = (StreamXidHash * ) hash_seq_search (& status )) != NULL )
2558
+ {
2559
+ FileSetDeleteAll (hentry -> stream_fileset );
2560
+
2561
+ /* Delete the subxact fileset iff it is created. */
2562
+ if (hentry -> subxact_fileset )
2563
+ FileSetDeleteAll (hentry -> subxact_fileset );
2564
+ }
2565
+ }
2566
+ }
2567
+
2544
2568
/*
2545
2569
* Apply main loop.
2546
2570
*/
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
3024
3048
if (ent -> subxact_fileset )
3025
3049
{
3026
3050
cleanup_subxact_info ();
3027
- SharedFileSetDeleteAll (ent -> subxact_fileset );
3051
+ FileSetDeleteAll (ent -> subxact_fileset );
3028
3052
pfree (ent -> subxact_fileset );
3029
3053
ent -> subxact_fileset = NULL ;
3030
3054
}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
3042
3066
MemoryContext oldctx ;
3043
3067
3044
3068
/*
3045
- * We need to maintain shared fileset across multiple stream
3046
- * start/stop calls. So, need to allocate it in a persistent context.
3069
+ * We need to maintain fileset across multiple stream start/stop
3070
+ * calls. So, need to allocate it in a persistent context.
3047
3071
*/
3048
3072
oldctx = MemoryContextSwitchTo (ApplyContext );
3049
- ent -> subxact_fileset = palloc (sizeof (SharedFileSet ));
3050
- SharedFileSetInit (ent -> subxact_fileset , NULL );
3073
+ ent -> subxact_fileset = palloc (sizeof (FileSet ));
3074
+ FileSetInit (ent -> subxact_fileset );
3051
3075
MemoryContextSwitchTo (oldctx );
3052
3076
3053
- fd = BufFileCreateShared (ent -> subxact_fileset , path );
3077
+ fd = BufFileCreateFileSet (ent -> subxact_fileset , path );
3054
3078
}
3055
3079
else
3056
- fd = BufFileOpenShared (ent -> subxact_fileset , path , O_RDWR );
3080
+ fd = BufFileOpenFileSet (ent -> subxact_fileset , path , O_RDWR );
3057
3081
3058
3082
len = sizeof (SubXactInfo ) * subxact_data .nsubxacts ;
3059
3083
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
3107
3131
3108
3132
subxact_filename (path , subid , xid );
3109
3133
3110
- fd = BufFileOpenShared (ent -> subxact_fileset , path , O_RDONLY );
3134
+ fd = BufFileOpenFileSet (ent -> subxact_fileset , path , O_RDONLY );
3111
3135
3112
3136
/* read number of subxact items */
3113
3137
if (BufFileRead (fd , & subxact_data .nsubxacts ,
@@ -3264,15 +3288,15 @@ stream_cleanup_files(Oid subid, TransactionId xid)
3264
3288
3265
3289
/* Delete the change file and release the stream fileset memory */
3266
3290
changes_filename (path , subid , xid );
3267
- SharedFileSetDeleteAll (ent -> stream_fileset );
3291
+ FileSetDeleteAll (ent -> stream_fileset );
3268
3292
pfree (ent -> stream_fileset );
3269
3293
ent -> stream_fileset = NULL ;
3270
3294
3271
3295
/* Delete the subxact file and release the memory, if it exist */
3272
3296
if (ent -> subxact_fileset )
3273
3297
{
3274
3298
subxact_filename (path , subid , xid );
3275
- SharedFileSetDeleteAll (ent -> subxact_fileset );
3299
+ FileSetDeleteAll (ent -> subxact_fileset );
3276
3300
pfree (ent -> subxact_fileset );
3277
3301
ent -> subxact_fileset = NULL ;
3278
3302
}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
3288
3312
*
3289
3313
* Open a file for streamed changes from a toplevel transaction identified
3290
3314
* by stream_xid (global variable). If it's the first chunk of streamed
3291
- * changes for this transaction, initialize the shared fileset and create the
3292
- * buffile, otherwise open the previously created file.
3315
+ * changes for this transaction, initialize the fileset and create the buffile,
3316
+ * otherwise open the previously created file.
3293
3317
*
3294
3318
* This can only be called at the beginning of a "streaming" block, i.e.
3295
3319
* between stream_start/stream_stop messages from the upstream.
@@ -3330,24 +3354,24 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3330
3354
if (first_segment )
3331
3355
{
3332
3356
MemoryContext savectx ;
3333
- SharedFileSet * fileset ;
3357
+ FileSet * fileset ;
3334
3358
3335
3359
if (found )
3336
3360
ereport (ERROR ,
3337
3361
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
3338
3362
errmsg_internal ("incorrect first-segment flag for streamed replication transaction" )));
3339
3363
3340
3364
/*
3341
- * We need to maintain shared fileset across multiple stream
3342
- * start/stop calls. So, need to allocate it in a persistent context.
3365
+ * We need to maintain fileset across multiple stream start/stop
3366
+ * calls. So, need to allocate it in a persistent context.
3343
3367
*/
3344
3368
savectx = MemoryContextSwitchTo (ApplyContext );
3345
- fileset = palloc (sizeof (SharedFileSet ));
3369
+ fileset = palloc (sizeof (FileSet ));
3346
3370
3347
- SharedFileSetInit (fileset , NULL );
3371
+ FileSetInit (fileset );
3348
3372
MemoryContextSwitchTo (savectx );
3349
3373
3350
- stream_fd = BufFileCreateShared (fileset , path );
3374
+ stream_fd = BufFileCreateFileSet (fileset , path );
3351
3375
3352
3376
/* Remember the fileset for the next stream of the same transaction */
3353
3377
ent -> xid = xid ;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3365
3389
* Open the file and seek to the end of the file because we always
3366
3390
* append the changes file.
3367
3391
*/
3368
- stream_fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDWR );
3392
+ stream_fd = BufFileOpenFileSet (ent -> stream_fileset , path , O_RDWR );
3369
3393
BufFileSeek (stream_fd , 0 , 0 , SEEK_END );
3370
3394
}
3371
3395
0 commit comments