Skip to content

Commit 56d399c

Browse files
committed
Merge branch 'PGPROEE9_6_CFS_385' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into PGPROEE9_6_CFS_385
2 parents 668cba1 + 37484ce commit 56d399c

File tree

8 files changed

+125
-80
lines changed

8 files changed

+125
-80
lines changed

src/backend/access/transam/xlog.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ int wal_level = WAL_LEVEL_MINIMAL;
103103
int CommitDelay = 0; /* precommit delay in microseconds */
104104
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
105105
int wal_retrieve_retry_interval = 5000;
106+
bool cfs_lock_taken = false;
106107

107108
TransactionId start_xid = 0;
108109
MultiXactId start_mx_id = 0;
@@ -9894,6 +9895,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
98949895
WALInsertLockRelease();
98959896

98969897
cfs_control_gc_lock(); /* disable GC during backup */
9898+
cfs_lock_taken = true;
98979899

98989900
/* Ensure we release forcePageWrites if fail below */
98999901
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -10272,7 +10274,11 @@ pg_start_backup_callback(int code, Datum arg)
1027210274
}
1027310275
WALInsertLockRelease();
1027410276

10275-
cfs_control_gc_unlock(); /* Restore CFS GC activity */
10277+
if (cfs_lock_taken)
10278+
{
10279+
cfs_lock_taken = false;
10280+
cfs_control_gc_unlock(); /* Restore CFS GC activity */
10281+
}
1027610282
}
1027710283

1027810284
/*
@@ -10472,6 +10478,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
1047210478
/* Clean up session-level lock */
1047310479
sessionBackupState = SESSION_BACKUP_NONE;
1047410480

10481+
cfs_lock_taken = false;
1047510482
cfs_control_gc_unlock(); /* Restore CFS GC activity */
1047610483

1047710484
/*
@@ -10717,7 +10724,11 @@ do_pg_abort_backup(void)
1071710724
}
1071810725
WALInsertLockRelease();
1071910726

10720-
cfs_control_gc_unlock(); /* Restore CFS GC activity */
10727+
if (cfs_lock_taken)
10728+
{
10729+
cfs_lock_taken = false;
10730+
cfs_control_gc_unlock(); /* Restore CFS GC activity */
10731+
}
1072110732
}
1072210733

1072310734
/*

src/backend/storage/file/cfs.c

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
#include "utils/guc.h"
5555
#include "utils/rel.h"
5656
#include "utils/builtins.h"
57+
#include "utils/memutils.h"
5758
#include "utils/resowner_private.h"
5859

5960

@@ -603,7 +604,10 @@ static bool cfs_recover(FileMap* map, int md,
603604
else if (pg_fsync(md) < 0)
604605
elog(WARNING, "CFS failed to sync map %s: %m", map_path);
605606
else
607+
{
606608
ok = true;
609+
map->generation += 1; /* it is here just for pg_probackup */
610+
}
607611
close(md2);
608612
}
609613
else
@@ -851,6 +855,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
851855
char* file_path = (char*)palloc(suf+1);
852856
char* map_bck_path = (char*)palloc(suf+10);
853857
char* file_bck_path = (char*)palloc(suf+5);
858+
char* state;
854859
int rc;
855860

856861
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
@@ -859,6 +864,9 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
859864
if (pg_atomic_read_u32(&cfs_state->gc_disabled) != 0)
860865
{
861866
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
867+
pfree(file_path);
868+
pfree(map_bck_path);
869+
pfree(file_bck_path);
862870
return false;
863871
}
864872
}
@@ -869,6 +877,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
869877
{
870878
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
871879

880+
pgstat_report_activity(STATE_DISABLED, "GC is disabled");
872881
rc = WaitLatch(MyLatch,
873882
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
874883
CFS_DISABLE_TIMEOUT /* ms */);
@@ -907,13 +916,17 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
907916
strcat(strcpy(map_bck_path, map_path), ".bck");
908917
strcat(strcpy(file_bck_path, file_path), ".bck");
909918

919+
state = psprintf("Check file %s", file_path);
920+
pgstat_report_activity(STATE_RUNNING, state);
921+
pfree(state);
922+
910923
/* mostly same as for cfs_lock_file */
911924
if (pg_atomic_read_u32(&map->gc_active)) /* Check if GC was not normally completed at previous Postgres run */
912925
{
913926
/* there could not be concurrent GC for this file here, so recover */
914927
if (!cfs_recover(map, md, file_path, map_path, file_bck_path, map_bck_path))
915928
{
916-
elog(ERROR, "CFS found that file %s is completely destroyed", file_path);
929+
elog(WARNING, "CFS found that file %s is completely destroyed", file_path);
917930
goto FinUnmap;
918931
}
919932
}
@@ -936,6 +949,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
936949
uint32 second_pass_bytes = 0;
937950
inode_t** inodes = (inode_t**)palloc(RELSEG_SIZE*sizeof(inode_t*));
938951
bool remove_backups = true;
952+
bool got_lock = false;
939953
int second_pass_whole = 0;
940954
int n_pages, n_pages1;
941955
TimestampTz startTime, secondTime, endTime;
@@ -950,6 +964,10 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
950964

951965
lock = cfs_get_lock(file_path);
952966

967+
state = psprintf("Process file %s", file_path);
968+
pgstat_report_activity(STATE_RUNNING, state);
969+
pfree(state);
970+
953971
fd2 = open(file_bck_path, O_CREAT|O_RDWR|PG_BINARY|O_TRUNC, 0600);
954972
if (fd2 < 0)
955973
{
@@ -991,6 +1009,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
9911009
retry:
9921010
/* temporary lock file for fetching map snapshot */
9931011
cfs_gc_lock(lock);
1012+
got_lock = true;
9941013

9951014
/* Reread variables after locking file */
9961015
physSize = pg_atomic_read_u32(&map->hdr.physSize);
@@ -1006,6 +1025,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
10061025
}
10071026
/* may unlock until second phase */
10081027
cfs_gc_unlock(lock);
1028+
got_lock = false;
10091029

10101030
if (!cfs_copy_inodes(inodes, n_pages, fd, fd2, &writeback, &newSize,
10111031
file_path, file_bck_path))
@@ -1027,6 +1047,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
10271047
secondTime = GetCurrentTimestamp();
10281048

10291049
cfs_gc_lock(lock);
1050+
got_lock = true;
10301051

10311052
/* Reread variables after locking file */
10321053
n_pages1 = n_pages;
@@ -1080,6 +1101,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
10801101
if (second_pass_whole == 1 && physSize < CFS_RETRY_GC_THRESHOLD)
10811102
{
10821103
cfs_gc_unlock(lock);
1104+
got_lock = false;
10831105
/* sleep, cause there is possibly checkpoint is on a way */
10841106
pg_usleep(CFS_LOCK_MAX_TIMEOUT);
10851107
second_pass = 0;
@@ -1278,7 +1300,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
12781300
else
12791301
remove_backups = true; /* we don't need backups anymore */
12801302

1281-
cfs_gc_unlock(lock);
1303+
if (got_lock)
1304+
cfs_gc_unlock(lock);
12821305

12831306
/* remove map backup file */
12841307
if (remove_backups && unlink(map_bck_path))
@@ -1300,9 +1323,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
13001323
secs2*USECS_PER_SEC + usecs2);
13011324
}
13021325

1303-
pfree(file_path);
1304-
pfree(file_bck_path);
1305-
pfree(map_bck_path);
13061326
pfree(inodes);
13071327
pfree(newMap);
13081328

@@ -1325,6 +1345,9 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
13251345
}
13261346

13271347
FinishGC:
1348+
pfree(file_path);
1349+
pfree(file_bck_path);
1350+
pfree(map_bck_path);
13281351
if (background == CFS_BACKGROUND)
13291352
{
13301353
LWLockRelease(CfsGcLock);
@@ -1333,10 +1356,12 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
13331356

13341357
if (background == CFS_BACKGROUND)
13351358
{
1336-
int rc = WaitLatch(MyLatch,
1337-
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1338-
performed ? cfs_gc_delay : 0 /* ms */ );
1339-
if (rc & WL_POSTMASTER_DEATH)
1359+
int rc;
1360+
pgstat_report_activity(STATE_IDLE, "Processing pause");
1361+
rc = WaitLatch(MyLatch,
1362+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1363+
performed ? cfs_gc_delay : 0 /* ms */ );
1364+
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
13401365
exit(1);
13411366

13421367
ResetLatch(MyLatch);
@@ -1401,6 +1426,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
14011426
static void cfs_gc_cancel(int sig)
14021427
{
14031428
cfs_gc_stop = true;
1429+
SetLatch(MyLatch);
14041430
}
14051431

14061432
static void cfs_sighup(SIGNAL_ARGS)
@@ -1424,18 +1450,29 @@ static bool cfs_gc_scan_tablespace(int worker_id)
14241450

14251451
static void cfs_gc_bgworker_main(Datum arg)
14261452
{
1453+
MemoryContext MemCxt;
14271454
int worker_id = DatumGetInt32(arg);
1455+
char* appname;
14281456

14291457
pqsignal(SIGINT, cfs_gc_cancel);
14301458
pqsignal(SIGQUIT, cfs_gc_cancel);
14311459
pqsignal(SIGTERM, cfs_gc_cancel);
14321460
pqsignal(SIGHUP, cfs_sighup);
14331461

1462+
InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
1463+
1464+
appname = psprintf("CFS GC worker %d", worker_id);
1465+
pgstat_report_appname(appname);
1466+
pfree(appname);
1467+
14341468
/* We're now ready to receive signals */
14351469
BackgroundWorkerUnblockSignals();
14361470

14371471
elog(INFO, "Start CFS garbage collector %d (enabled=%d)", MyProcPid, cfs_state->background_gc_enabled);
14381472

1473+
MemCxt = AllocSetContextCreate(TopMemoryContext, "CFS worker ctx",
1474+
ALLOCSET_DEFAULT_SIZES);
1475+
MemoryContextSwitchTo(MemCxt);
14391476
while (true)
14401477
{
14411478
int timeout = cfs_gc_period;
@@ -1445,14 +1482,16 @@ static void cfs_gc_bgworker_main(Datum arg)
14451482
{
14461483
timeout = CFS_RETRY_TIMEOUT;
14471484
}
1485+
MemoryContextReset(MemCxt);
14481486
if (cfs_gc_stop || --cfs_state->max_iterations <= 0)
14491487
{
14501488
break;
14511489
}
1490+
pgstat_report_activity(STATE_IDLE, "Pause between GC iterations");
14521491
rc = WaitLatch(MyLatch,
14531492
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
14541493
timeout /* ms */ );
1455-
if (rc & WL_POSTMASTER_DEATH)
1494+
if ((rc & WL_POSTMASTER_DEATH) || cfs_gc_stop)
14561495
exit(1);
14571496

14581497
ResetLatch(MyLatch);
@@ -1508,7 +1547,7 @@ void cfs_control_gc_lock(void)
15081547
}
15091548

15101549
/* Enable garbage collection. */
1511-
void cfs_control_gc_unlock(void)
1550+
void cfs_control_gc_unlock() /* argument could be given by PG_ENSURE_ERROR_CLEANUP */
15121551
{
15131552
pg_atomic_fetch_sub_u32(&cfs_state->gc_disabled, 1);
15141553
}
@@ -1834,18 +1873,18 @@ void cfs_recover_map(FileMap* map)
18341873
}
18351874
usedSize += size;
18361875
}
1837-
if (usedSize != pg_atomic_read_u32(&map->hdr.usedSize))
1838-
{
1839-
pg_atomic_write_u32(&map->hdr.usedSize, usedSize);
1840-
}
1841-
if (physSize != pg_atomic_read_u32(&map->hdr.physSize))
1842-
{
1843-
pg_atomic_write_u32(&map->hdr.physSize, physSize);
1844-
}
1845-
if (virtSize != pg_atomic_read_u32(&map->hdr.virtSize))
1846-
{
1847-
pg_atomic_write_u32(&map->hdr.virtSize, virtSize);
1848-
}
1876+
}
1877+
if (usedSize != pg_atomic_read_u32(&map->hdr.usedSize))
1878+
{
1879+
pg_atomic_write_u32(&map->hdr.usedSize, usedSize);
1880+
}
1881+
if (physSize != pg_atomic_read_u32(&map->hdr.physSize))
1882+
{
1883+
pg_atomic_write_u32(&map->hdr.physSize, physSize);
1884+
}
1885+
if (virtSize != pg_atomic_read_u32(&map->hdr.virtSize))
1886+
{
1887+
pg_atomic_write_u32(&map->hdr.virtSize, virtSize);
18491888
}
18501889
}
18511890

@@ -1869,3 +1908,8 @@ Datum cfs_gc_activity_scanned_files(PG_FUNCTION_ARGS)
18691908
{
18701909
PG_RETURN_INT64(cfs_state->gc_stat.scannedFiles);
18711910
}
1911+
1912+
void cfs_on_exit_callback(int code, Datum arg)
1913+
{
1914+
cfs_control_gc_unlock();
1915+
}

0 commit comments

Comments
 (0)