54
54
#include "utils/guc.h"
55
55
#include "utils/rel.h"
56
56
#include "utils/builtins.h"
57
+ #include "utils/memutils.h"
57
58
#include "utils/resowner_private.h"
58
59
59
60
@@ -603,7 +604,10 @@ static bool cfs_recover(FileMap* map, int md,
603
604
else if (pg_fsync (md ) < 0 )
604
605
elog (WARNING , "CFS failed to sync map %s: %m" , map_path );
605
606
else
607
+ {
606
608
ok = true;
609
+ map -> generation += 1 ; /* it is here just for pg_probackup */
610
+ }
607
611
close (md2 );
608
612
}
609
613
else
@@ -851,6 +855,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
851
855
char * file_path = (char * )palloc (suf + 1 );
852
856
char * map_bck_path = (char * )palloc (suf + 10 );
853
857
char * file_bck_path = (char * )palloc (suf + 5 );
858
+ char * state ;
854
859
int rc ;
855
860
856
861
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
@@ -859,6 +864,9 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
859
864
if (pg_atomic_read_u32 (& cfs_state -> gc_disabled ) != 0 )
860
865
{
861
866
pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
867
+ pfree (file_path );
868
+ pfree (map_bck_path );
869
+ pfree (file_bck_path );
862
870
return false;
863
871
}
864
872
}
@@ -869,6 +877,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
869
877
{
870
878
pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
871
879
880
+ pgstat_report_activity (STATE_DISABLED , "GC is disabled" );
872
881
rc = WaitLatch (MyLatch ,
873
882
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
874
883
CFS_DISABLE_TIMEOUT /* ms */ );
@@ -907,13 +916,17 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
907
916
strcat (strcpy (map_bck_path , map_path ), ".bck" );
908
917
strcat (strcpy (file_bck_path , file_path ), ".bck" );
909
918
919
+ state = psprintf ("Check file %s" , file_path );
920
+ pgstat_report_activity (STATE_RUNNING , state );
921
+ pfree (state );
922
+
910
923
/* mostly same as for cfs_lock_file */
911
924
if (pg_atomic_read_u32 (& map -> gc_active )) /* Check if GC was not normally completed at previous Postgres run */
912
925
{
913
926
/* there could not be concurrent GC for this file here, so recover */
914
927
if (!cfs_recover (map , md , file_path , map_path , file_bck_path , map_bck_path ))
915
928
{
916
- elog (ERROR , "CFS found that file %s is completely destroyed" , file_path );
929
+ elog (WARNING , "CFS found that file %s is completely destroyed" , file_path );
917
930
goto FinUnmap ;
918
931
}
919
932
}
@@ -936,6 +949,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
936
949
uint32 second_pass_bytes = 0 ;
937
950
inode_t * * inodes = (inode_t * * )palloc (RELSEG_SIZE * sizeof (inode_t * ));
938
951
bool remove_backups = true;
952
+ bool got_lock = false;
939
953
int second_pass_whole = 0 ;
940
954
int n_pages , n_pages1 ;
941
955
TimestampTz startTime , secondTime , endTime ;
@@ -950,6 +964,10 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
950
964
951
965
lock = cfs_get_lock (file_path );
952
966
967
+ state = psprintf ("Process file %s" , file_path );
968
+ pgstat_report_activity (STATE_RUNNING , state );
969
+ pfree (state );
970
+
953
971
fd2 = open (file_bck_path , O_CREAT |O_RDWR |PG_BINARY |O_TRUNC , 0600 );
954
972
if (fd2 < 0 )
955
973
{
@@ -991,6 +1009,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
991
1009
retry :
992
1010
/* temporary lock file for fetching map snapshot */
993
1011
cfs_gc_lock (lock );
1012
+ got_lock = true;
994
1013
995
1014
/* Reread variables after locking file */
996
1015
physSize = pg_atomic_read_u32 (& map -> hdr .physSize );
@@ -1006,6 +1025,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1006
1025
}
1007
1026
/* may unlock until second phase */
1008
1027
cfs_gc_unlock (lock );
1028
+ got_lock = false;
1009
1029
1010
1030
if (!cfs_copy_inodes (inodes , n_pages , fd , fd2 , & writeback , & newSize ,
1011
1031
file_path , file_bck_path ))
@@ -1027,6 +1047,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1027
1047
secondTime = GetCurrentTimestamp ();
1028
1048
1029
1049
cfs_gc_lock (lock );
1050
+ got_lock = true;
1030
1051
1031
1052
/* Reread variables after locking file */
1032
1053
n_pages1 = n_pages ;
@@ -1080,6 +1101,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1080
1101
if (second_pass_whole == 1 && physSize < CFS_RETRY_GC_THRESHOLD )
1081
1102
{
1082
1103
cfs_gc_unlock (lock );
1104
+ got_lock = false;
1083
1105
/* sleep, cause there is possibly checkpoint is on a way */
1084
1106
pg_usleep (CFS_LOCK_MAX_TIMEOUT );
1085
1107
second_pass = 0 ;
@@ -1278,7 +1300,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1278
1300
else
1279
1301
remove_backups = true; /* we don't need backups anymore */
1280
1302
1281
- cfs_gc_unlock (lock );
1303
+ if (got_lock )
1304
+ cfs_gc_unlock (lock );
1282
1305
1283
1306
/* remove map backup file */
1284
1307
if (remove_backups && unlink (map_bck_path ))
@@ -1300,9 +1323,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1300
1323
secs2 * USECS_PER_SEC + usecs2 );
1301
1324
}
1302
1325
1303
- pfree (file_path );
1304
- pfree (file_bck_path );
1305
- pfree (map_bck_path );
1306
1326
pfree (inodes );
1307
1327
pfree (newMap );
1308
1328
@@ -1325,6 +1345,9 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1325
1345
}
1326
1346
1327
1347
FinishGC :
1348
+ pfree (file_path );
1349
+ pfree (file_bck_path );
1350
+ pfree (map_bck_path );
1328
1351
if (background == CFS_BACKGROUND )
1329
1352
{
1330
1353
LWLockRelease (CfsGcLock );
@@ -1333,10 +1356,12 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
1333
1356
1334
1357
if (background == CFS_BACKGROUND )
1335
1358
{
1336
- int rc = WaitLatch (MyLatch ,
1337
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
1338
- performed ? cfs_gc_delay : 0 /* ms */ );
1339
- if (rc & WL_POSTMASTER_DEATH )
1359
+ int rc ;
1360
+ pgstat_report_activity (STATE_IDLE , "Processing pause" );
1361
+ rc = WaitLatch (MyLatch ,
1362
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
1363
+ performed ? cfs_gc_delay : 0 /* ms */ );
1364
+ if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
1340
1365
exit (1 );
1341
1366
1342
1367
ResetLatch (MyLatch );
@@ -1401,6 +1426,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
1401
1426
static void cfs_gc_cancel (int sig )
1402
1427
{
1403
1428
cfs_gc_stop = true;
1429
+ SetLatch (MyLatch );
1404
1430
}
1405
1431
1406
1432
static void cfs_sighup (SIGNAL_ARGS )
@@ -1424,18 +1450,29 @@ static bool cfs_gc_scan_tablespace(int worker_id)
1424
1450
1425
1451
static void cfs_gc_bgworker_main (Datum arg )
1426
1452
{
1453
+ MemoryContext MemCxt ;
1427
1454
int worker_id = DatumGetInt32 (arg );
1455
+ char * appname ;
1428
1456
1429
1457
pqsignal (SIGINT , cfs_gc_cancel );
1430
1458
pqsignal (SIGQUIT , cfs_gc_cancel );
1431
1459
pqsignal (SIGTERM , cfs_gc_cancel );
1432
1460
pqsignal (SIGHUP , cfs_sighup );
1433
1461
1462
+ InitPostgres (NULL , InvalidOid , NULL , InvalidOid , NULL );
1463
+
1464
+ appname = psprintf ("CFS GC worker %d" , worker_id );
1465
+ pgstat_report_appname (appname );
1466
+ pfree (appname );
1467
+
1434
1468
/* We're now ready to receive signals */
1435
1469
BackgroundWorkerUnblockSignals ();
1436
1470
1437
1471
elog (INFO , "Start CFS garbage collector %d (enabled=%d)" , MyProcPid , cfs_state -> background_gc_enabled );
1438
1472
1473
+ MemCxt = AllocSetContextCreate (TopMemoryContext , "CFS worker ctx" ,
1474
+ ALLOCSET_DEFAULT_SIZES );
1475
+ MemoryContextSwitchTo (MemCxt );
1439
1476
while (true)
1440
1477
{
1441
1478
int timeout = cfs_gc_period ;
@@ -1445,14 +1482,16 @@ static void cfs_gc_bgworker_main(Datum arg)
1445
1482
{
1446
1483
timeout = CFS_RETRY_TIMEOUT ;
1447
1484
}
1485
+ MemoryContextReset (MemCxt );
1448
1486
if (cfs_gc_stop || -- cfs_state -> max_iterations <= 0 )
1449
1487
{
1450
1488
break ;
1451
1489
}
1490
+ pgstat_report_activity (STATE_IDLE , "Pause between GC iterations" );
1452
1491
rc = WaitLatch (MyLatch ,
1453
1492
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
1454
1493
timeout /* ms */ );
1455
- if (rc & WL_POSTMASTER_DEATH )
1494
+ if (( rc & WL_POSTMASTER_DEATH ) || cfs_gc_stop )
1456
1495
exit (1 );
1457
1496
1458
1497
ResetLatch (MyLatch );
@@ -1508,7 +1547,7 @@ void cfs_control_gc_lock(void)
1508
1547
}
1509
1548
1510
1549
/* Enable garbage collection. */
1511
- void cfs_control_gc_unlock (void )
1550
+ void cfs_control_gc_unlock () /* argument could be given by PG_ENSURE_ERROR_CLEANUP */
1512
1551
{
1513
1552
pg_atomic_fetch_sub_u32 (& cfs_state -> gc_disabled , 1 );
1514
1553
}
@@ -1834,18 +1873,18 @@ void cfs_recover_map(FileMap* map)
1834
1873
}
1835
1874
usedSize += size ;
1836
1875
}
1837
- if ( usedSize != pg_atomic_read_u32 ( & map -> hdr . usedSize ))
1838
- {
1839
- pg_atomic_write_u32 ( & map -> hdr . usedSize , usedSize );
1840
- }
1841
- if ( physSize != pg_atomic_read_u32 ( & map -> hdr . physSize ))
1842
- {
1843
- pg_atomic_write_u32 ( & map -> hdr . physSize , physSize );
1844
- }
1845
- if ( virtSize != pg_atomic_read_u32 ( & map -> hdr . virtSize ))
1846
- {
1847
- pg_atomic_write_u32 ( & map -> hdr . virtSize , virtSize );
1848
- }
1876
+ }
1877
+ if ( usedSize != pg_atomic_read_u32 ( & map -> hdr . usedSize ))
1878
+ {
1879
+ pg_atomic_write_u32 ( & map -> hdr . usedSize , usedSize );
1880
+ }
1881
+ if ( physSize != pg_atomic_read_u32 ( & map -> hdr . physSize ))
1882
+ {
1883
+ pg_atomic_write_u32 ( & map -> hdr . physSize , physSize );
1884
+ }
1885
+ if ( virtSize != pg_atomic_read_u32 ( & map -> hdr . virtSize ))
1886
+ {
1887
+ pg_atomic_write_u32 ( & map -> hdr . virtSize , virtSize );
1849
1888
}
1850
1889
}
1851
1890
@@ -1869,3 +1908,8 @@ Datum cfs_gc_activity_scanned_files(PG_FUNCTION_ARGS)
1869
1908
{
1870
1909
PG_RETURN_INT64 (cfs_state -> gc_stat .scannedFiles );
1871
1910
}
1911
+
1912
+ void cfs_on_exit_callback (int code , Datum arg )
1913
+ {
1914
+ cfs_control_gc_unlock ();
1915
+ }
0 commit comments