@@ -201,6 +201,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
201
201
static void ReorderBufferRestoreChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
202
202
char * change );
203
203
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn );
204
+ static void ReorderBufferCleanupSerializedTXNs (const char * slotname );
205
+ static void ReorderBufferSerializedPath (char * path , ReplicationSlot * slot ,
206
+ TransactionId xid , XLogSegNo segno );
204
207
205
208
static void ReorderBufferFreeSnap (ReorderBuffer * rb , Snapshot snap );
206
209
static Snapshot ReorderBufferCopySnap (ReorderBuffer * rb , Snapshot orig_snap ,
@@ -219,7 +222,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
219
222
220
223
221
224
/*
222
- * Allocate a new ReorderBuffer
225
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
226
+ * prior ReorderBuffer instances for the same slot.
223
227
*/
224
228
ReorderBuffer *
225
229
ReorderBufferAllocate (void )
@@ -228,6 +232,8 @@ ReorderBufferAllocate(void)
228
232
HASHCTL hash_ctl ;
229
233
MemoryContext new_ctx ;
230
234
235
+ Assert (MyReplicationSlot != NULL );
236
+
231
237
/* allocate memory in own context, to have better accountability */
232
238
new_ctx = AllocSetContextCreate (CurrentMemoryContext ,
233
239
"ReorderBuffer" ,
@@ -267,6 +273,13 @@ ReorderBufferAllocate(void)
267
273
dlist_init (& buffer -> cached_changes );
268
274
slist_init (& buffer -> cached_tuplebufs );
269
275
276
+ /*
277
+ * Ensure there's no stale data from prior uses of this slot, in case some
278
+ * prior exit avoided calling ReorderBufferFree. Failure to do this can
279
+ * produce duplicated txns, and it's very cheap if there's nothing there.
280
+ */
281
+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
282
+
270
283
return buffer ;
271
284
}
272
285
@@ -283,6 +296,9 @@ ReorderBufferFree(ReorderBuffer *rb)
283
296
* memory context.
284
297
*/
285
298
MemoryContextDelete (context );
299
+
300
+ /* Free disk space used by unconsumed reorder buffers */
301
+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
286
302
}
287
303
288
304
/*
@@ -1964,7 +1980,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1964
1980
int fd = -1 ;
1965
1981
XLogSegNo curOpenSegNo = 0 ;
1966
1982
Size spilled = 0 ;
1967
- char path [MAXPGPATH ];
1968
1983
1969
1984
elog (DEBUG2 , "spill %u changes in XID %u to disk" ,
1970
1985
(uint32 ) txn -> nentries_mem , txn -> xid );
@@ -1991,21 +2006,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1991
2006
*/
1992
2007
if (fd == -1 || !XLByteInSeg (change -> lsn , curOpenSegNo ))
1993
2008
{
1994
- XLogRecPtr recptr ;
2009
+ char path [ MAXPGPATH ] ;
1995
2010
1996
2011
if (fd != -1 )
1997
2012
CloseTransientFile (fd );
1998
2013
1999
2014
XLByteToSeg (change -> lsn , curOpenSegNo );
2000
- XLogSegNoOffsetToRecPtr (curOpenSegNo , 0 , recptr );
2001
2015
2002
2016
/*
2003
2017
* No need to care about TLIs here, only used during a single run,
2004
2018
* so each LSN only maps to a specific WAL record.
2005
2019
*/
2006
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2007
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2008
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2020
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2021
+ curOpenSegNo );
2009
2022
2010
2023
/* open segment, create it if necessary */
2011
2024
fd = OpenTransientFile (path ,
@@ -2015,8 +2028,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2015
2028
if (fd < 0 )
2016
2029
ereport (ERROR ,
2017
2030
(errcode_for_file_access (),
2018
- errmsg ("could not open file \"%s\": %m" ,
2019
- path )));
2031
+ errmsg ("could not open file \"%s\": %m" , path )));
2020
2032
}
2021
2033
2022
2034
ReorderBufferSerializeChange (rb , txn , fd , change );
@@ -2203,25 +2215,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2203
2215
2204
2216
if (* fd == -1 )
2205
2217
{
2206
- XLogRecPtr recptr ;
2207
2218
char path [MAXPGPATH ];
2208
2219
2209
2220
/* first time in */
2210
2221
if (* segno == 0 )
2211
- {
2212
2222
XLByteToSeg (txn -> first_lsn , * segno );
2213
- }
2214
2223
2215
2224
Assert (* segno != 0 || dlist_is_empty (& txn -> changes ));
2216
- XLogSegNoOffsetToRecPtr (* segno , 0 , recptr );
2217
2225
2218
2226
/*
2219
2227
* No need to care about TLIs here, only used during a single run,
2220
2228
* so each LSN only maps to a specific WAL record.
2221
2229
*/
2222
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2223
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2224
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2230
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2231
+ * segno );
2225
2232
2226
2233
* fd = OpenTransientFile (path , O_RDONLY | PG_BINARY , 0 );
2227
2234
if (* fd < 0 && errno == ENOENT )
@@ -2428,20 +2435,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2428
2435
for (cur = first ; cur <= last ; cur ++ )
2429
2436
{
2430
2437
char path [MAXPGPATH ];
2431
- XLogRecPtr recptr ;
2432
-
2433
- XLogSegNoOffsetToRecPtr (cur , 0 , recptr );
2434
2438
2435
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2436
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2437
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2439
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid , cur );
2438
2440
if (unlink (path ) != 0 && errno != ENOENT )
2439
2441
ereport (ERROR ,
2440
2442
(errcode_for_file_access (),
2441
2443
errmsg ("could not remove file \"%s\": %m" , path )));
2442
2444
}
2443
2445
}
2444
2446
2447
+ /*
2448
+ * Remove any leftover serialized reorder buffers from a slot directory after a
2449
+ * prior crash or decoding session exit.
2450
+ */
2451
+ static void
2452
+ ReorderBufferCleanupSerializedTXNs (const char * slotname )
2453
+ {
2454
+ DIR * spill_dir ;
2455
+ struct dirent * spill_de ;
2456
+ struct stat statbuf ;
2457
+ char path [MAXPGPATH * 2 + 12 ];
2458
+
2459
+ sprintf (path , "pg_replslot/%s" , slotname );
2460
+
2461
+ /* we're only handling directories here, skip if it's not ours */
2462
+ if (lstat (path , & statbuf ) == 0 && !S_ISDIR (statbuf .st_mode ))
2463
+ return ;
2464
+
2465
+ spill_dir = AllocateDir (path );
2466
+ while ((spill_de = ReadDirExtended (spill_dir , path , INFO )) != NULL )
2467
+ {
2468
+ /* only look at names that can be ours */
2469
+ if (strncmp (spill_de -> d_name , "xid" , 3 ) == 0 )
2470
+ {
2471
+ snprintf (path , sizeof (path ),
2472
+ "pg_replslot/%s/%s" , slotname ,
2473
+ spill_de -> d_name );
2474
+
2475
+ if (unlink (path ) != 0 )
2476
+ ereport (ERROR ,
2477
+ (errcode_for_file_access (),
2478
+ errmsg ("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m" ,
2479
+ path , slotname )));
2480
+ }
2481
+ }
2482
+ FreeDir (spill_dir );
2483
+ }
2484
+
2485
+ /*
2486
+ * Given a replication slot, transaction ID and segment number, fill in the
2487
+ * corresponding spill file into 'path', which is a caller-owned buffer of size
2488
+ * at least MAXPGPATH.
2489
+ */
2490
+ static void
2491
+ ReorderBufferSerializedPath (char * path , ReplicationSlot * slot , TransactionId xid ,
2492
+ XLogSegNo segno )
2493
+ {
2494
+ XLogRecPtr recptr ;
2495
+
2496
+ XLogSegNoOffsetToRecPtr (segno , 0 , recptr );
2497
+
2498
+ snprintf (path , MAXPGPATH , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2499
+ NameStr (MyReplicationSlot -> data .name ),
2500
+ xid ,
2501
+ (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2502
+ }
2503
+
2445
2504
/*
2446
2505
* Delete all data spilled to disk after we've restarted/crashed. It will be
2447
2506
* recreated when the respective slots are reused.
@@ -2452,15 +2511,9 @@ StartupReorderBuffer(void)
2452
2511
DIR * logical_dir ;
2453
2512
struct dirent * logical_de ;
2454
2513
2455
- DIR * spill_dir ;
2456
- struct dirent * spill_de ;
2457
-
2458
2514
logical_dir = AllocateDir ("pg_replslot" );
2459
2515
while ((logical_de = ReadDir (logical_dir , "pg_replslot" )) != NULL )
2460
2516
{
2461
- struct stat statbuf ;
2462
- char path [MAXPGPATH * 2 + 12 ];
2463
-
2464
2517
if (strcmp (logical_de -> d_name , "." ) == 0 ||
2465
2518
strcmp (logical_de -> d_name , ".." ) == 0 )
2466
2519
continue ;
@@ -2473,33 +2526,7 @@ StartupReorderBuffer(void)
2473
2526
* ok, has to be a surviving logical slot, iterate and delete
2474
2527
* everything starting with xid-*
2475
2528
*/
2476
- sprintf (path , "pg_replslot/%s" , logical_de -> d_name );
2477
-
2478
- /* we're only creating directories here, skip if it's not our's */
2479
- if (lstat (path , & statbuf ) == 0 && !S_ISDIR (statbuf .st_mode ))
2480
- continue ;
2481
-
2482
- spill_dir = AllocateDir (path );
2483
- while ((spill_de = ReadDir (spill_dir , path )) != NULL )
2484
- {
2485
- if (strcmp (spill_de -> d_name , "." ) == 0 ||
2486
- strcmp (spill_de -> d_name , ".." ) == 0 )
2487
- continue ;
2488
-
2489
- /* only look at names that can be ours */
2490
- if (strncmp (spill_de -> d_name , "xid" , 3 ) == 0 )
2491
- {
2492
- sprintf (path , "pg_replslot/%s/%s" , logical_de -> d_name ,
2493
- spill_de -> d_name );
2494
-
2495
- if (unlink (path ) != 0 )
2496
- ereport (PANIC ,
2497
- (errcode_for_file_access (),
2498
- errmsg ("could not remove file \"%s\": %m" ,
2499
- path )));
2500
- }
2501
- }
2502
- FreeDir (spill_dir );
2529
+ ReorderBufferCleanupSerializedTXNs (logical_de -> d_name );
2503
2530
}
2504
2531
FreeDir (logical_dir );
2505
2532
}
0 commit comments