Skip to content

Commit b485470

Browse files
committed
Force CFS GC if size of file exceeds 2Gb threshold
1 parent 020258a commit b485470

File tree

3 files changed

+68
-41
lines changed

3 files changed

+68
-41
lines changed

src/backend/storage/file/cfs.c

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,9 @@ static void cfs_crypto_init(void)
257257
uint8 aes_key[32] = {0}; /* at most 256 bits */
258258

259259
cipher_key = getenv("PG_CIPHER_KEY");
260-
if (cipher_key == NULL) {
260+
if (cipher_key == NULL) {
261261
elog(ERROR, "PG_CIPHER_KEY environment variable is not set");
262-
}
262+
}
263263
unsetenv("PG_CIPHER_KEY"); /* disable inspection of this environment variable */
264264
key_length = strlen(cipher_key);
265265

@@ -406,11 +406,11 @@ void cfs_initialize()
406406
cfs_state->n_workers = 0;
407407
cfs_state->gc_enabled = cfs_gc_enabled;
408408
cfs_state->max_iterations = 0;
409-
409+
410410
if (cfs_encryption)
411411
cfs_crypto_init();
412-
413-
elog(LOG, "Start CFS version %s compression algorithm %s encryption %s GC %s",
412+
413+
elog(LOG, "Start CFS version %s compression algorithm %s encryption %s GC %s",
414414
CFS_VERSION, cfs_algorithm(), cfs_encryption ? "enabled" : "disabled", cfs_gc_enabled ? "enabled" : "disabled");
415415
}
416416
}
@@ -426,19 +426,19 @@ int cfs_msync(FileMap* map)
426426
FileMap* cfs_mmap(int md)
427427
{
428428
FileMap* map;
429-
if (ftruncate(md, sizeof(FileMap)) != 0)
429+
if (ftruncate(md, sizeof(FileMap)) != 0)
430430
{
431431
return (FileMap*)MAP_FAILED;
432432
}
433-
433+
434434
#ifdef WIN32
435435
{
436-
HANDLE mh = CreateFileMapping(_get_osfhandle(md), NULL, PAGE_READWRITE,
436+
HANDLE mh = CreateFileMapping(_get_osfhandle(md), NULL, PAGE_READWRITE,
437437
0, (DWORD)sizeof(FileMap), NULL);
438438
if (mh == NULL)
439439
return (FileMap*)MAP_FAILED;
440440

441-
map = (FileMap*)MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
441+
map = (FileMap*)MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
442442
CloseHandle(mh);
443443
}
444444
if (map == NULL)
@@ -499,7 +499,7 @@ static bool cfs_read_file(int fd, void* data, uint32 size)
499499
else
500500
offs += rc;
501501
} while (offs < size);
502-
502+
503503
return true;
504504
}
505505

@@ -519,7 +519,7 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
519519
else
520520
offs += rc;
521521
} while (offs < size);
522-
522+
523523
return true;
524524
}
525525

@@ -559,23 +559,23 @@ void cfs_lock_file(FileMap* map, char const* file_path)
559559
break;
560560
}
561561

562-
if (pg_atomic_read_u32(&cfs_state->n_active_gc) == 0)
563-
{
562+
if (pg_atomic_read_u32(&cfs_state->n_active_gc) == 0)
563+
{
564564
/* There is no active GC, so lock is set by crashed GC */
565565

566566
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent race condition with GC */
567567

568568
/* Recheck under CfsGcLock that map->lock was not released */
569-
if (pg_atomic_read_u32(&map->lock) >= CFS_GC_LOCK)
569+
if (pg_atomic_read_u32(&map->lock) >= CFS_GC_LOCK)
570570
{
571571
/* Uhhh... looks like last GC was interrupted.
572572
* Try to recover the file.
573573
*/
574574
char* map_bck_path = psprintf("%s.cfm.bck", file_path);
575575
char* file_bck_path = psprintf("%s.bck", file_path);
576-
576+
577577
elog(WARNING, "CFS indicates that GC of %s was interrupted: trying to perform recovery", file_path);
578-
578+
579579
if (access(file_bck_path, R_OK) != 0)
580580
{
581581
/* There is no backup file: new map should be constructed */
@@ -585,20 +585,20 @@ void cfs_lock_file(FileMap* map, char const* file_path)
585585
/* Recover map. */
586586
if (!cfs_read_file(md2, map, sizeof(FileMap)))
587587
elog(WARNING, "CFS failed to read file %s: %m", map_bck_path);
588-
588+
589589
close(md2);
590590
}
591591
}
592592
else
593593
{
594594
/* Presence of backup file means that we still have
595-
* unchanged data and map files. Just remove backup files and
595+
* unchanged data and map files. Just remove backup files and
596596
* revoke GC lock.
597597
*/
598598
unlink(file_bck_path);
599599
unlink(map_bck_path);
600600
}
601-
601+
602602
count = pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* revoke GC lock */
603603
Assert((int)count > 0);
604604
pfree(file_bck_path);
@@ -634,12 +634,13 @@ void cfs_unlock_file(FileMap* map)
634634
/*
635635
* Sort pages by offset to improve access locality
636636
*/
637-
static int cfs_cmp_page_offs(void const* p1, void const* p2)
637+
static int cfs_cmp_page_offs(void const* p1, void const* p2)
638638
{
639639
uint32 o1 = CFS_INODE_OFFS(**(inode_t**)p1);
640640
uint32 o2 = CFS_INODE_OFFS(**(inode_t**)p2);
641641
return o1 < o2 ? -1 : o1 == o2 ? 0 : 1;
642642
}
643+
643644
/*
644645
* Perform garbage collection (if required) on the file
645646
* @param map_path - path to the map file (*.cfm).
@@ -658,7 +659,7 @@ static bool cfs_gc_file(char* map_path, bool background)
658659
int md2 = -1;
659660
bool succeed = false;
660661
int rc;
661-
662+
662663

663664
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
664665

@@ -667,13 +668,13 @@ static bool cfs_gc_file(char* map_path, bool background)
667668
while (!cfs_state->gc_enabled)
668669
{
669670
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
670-
671+
671672
rc = WaitLatch(MyLatch,
672673
WL_TIMEOUT | WL_POSTMASTER_DEATH,
673674
CFS_DISABLE_TIMEOUT /* ms */);
674675
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
675676
exit(1);
676-
677+
677678
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
678679
}
679680

@@ -682,7 +683,7 @@ static bool cfs_gc_file(char* map_path, bool background)
682683

683684
md = open(map_path, O_RDWR|PG_BINARY, 0);
684685
if (md < 0)
685-
{
686+
{
686687
elog(DEBUG1, "CFS failed to open map file %s: %m", map_path);
687688
goto FinishGC;
688689
}
@@ -699,7 +700,7 @@ static bool cfs_gc_file(char* map_path, bool background)
699700
usedSize = pg_atomic_read_u32(&map->usedSize);
700701
physSize = pg_atomic_read_u32(&map->physSize);
701702
virtSize = pg_atomic_read_u32(&map->virtSize);
702-
703+
703704
cfs_state->gc_stat.scannedFiles += 1;
704705

705706
/* do we need to perform defragmentation? */
@@ -804,7 +805,7 @@ static bool cfs_gc_file(char* map_path, bool background)
804805
}
805806
/* sort inodes by offset to improve read locality */
806807
qsort(inodes, n_pages, sizeof(inode_t*), cfs_cmp_page_offs);
807-
808+
808809
fd = open(file_path, O_RDONLY|PG_BINARY, 0);
809810
if (fd < 0)
810811
goto Cleanup;
@@ -927,9 +928,8 @@ static bool cfs_gc_file(char* map_path, bool background)
927928
if (res != BLCKSZ)
928929
{
929930
pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* release lock */
930-
/* TODO Is it worth to PANIC or ERROR will be enough? */
931-
elog(PANIC, "Verification failed for block %d of relation %s: error code %d",
932-
i, file_bck_path, (int)res);
931+
elog(ERROR, "Verification failed for block %d position %d size %d of relation %s: error code %d",
932+
i, (int)CFS_INODE_OFFS(inode), size, file_bck_path, (int)res);
933933
}
934934
}
935935
}
@@ -977,7 +977,7 @@ static bool cfs_gc_file(char* map_path, bool background)
977977
if (remove_backups)
978978
{
979979
unlink(file_bck_path);
980-
unlink(map_bck_path);
980+
unlink(map_bck_path);
981981
remove_backups = false;
982982
}
983983
succeed = false;
@@ -1070,7 +1070,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10701070
/* If we have found a map file, run gc worker on it.
10711071
* Otherwise, try to gc the directory recursively.
10721072
*/
1073-
if (len > 4 &&
1073+
if (len > 4 &&
10741074
strcmp(file_path + len - 4, ".cfm") == 0)
10751075
{
10761076
if (entry->d_ino % cfs_state->n_workers == worker_id
@@ -1080,7 +1080,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10801080
break;
10811081
}
10821082
}
1083-
else if (!cfs_gc_directory(worker_id, file_path))
1083+
else if (!cfs_gc_directory(worker_id, file_path))
10841084
{
10851085
success = false;
10861086
break;
@@ -1124,7 +1124,7 @@ static void cfs_gc_bgworker_main(Datum arg)
11241124
int timeout = cfs_gc_period;
11251125
int rc;
11261126

1127-
if (!cfs_gc_scan_tablespace(worker_id))
1127+
if (!cfs_gc_scan_tablespace(worker_id))
11281128
{
11291129
timeout = CFS_RETRY_TIMEOUT;
11301130
}
@@ -1148,7 +1148,7 @@ void cfs_gc_start_bgworkers()
11481148

11491149
for (i = 0; i < cfs_gc_workers; i++)
11501150
{
1151-
BackgroundWorker worker;
1151+
BackgroundWorker worker;
11521152
BackgroundWorkerHandle* handle;
11531153
MemSet(&worker, 0, sizeof(worker));
11541154
sprintf(worker.bgw_name, "cfs-worker-%d", i);
@@ -1165,9 +1165,9 @@ void cfs_gc_start_bgworkers()
11651165
}
11661166

11671167
/* Enable/disable garbage collection. */
1168-
bool cfs_control_gc(bool enabled)
1168+
bool cfs_control_gc(bool enabled)
11691169
{
1170-
bool was_enabled = cfs_state->gc_enabled;
1170+
bool was_enabled = cfs_state->gc_enabled;
11711171
cfs_state->gc_enabled = enabled;
11721172
if (was_enabled && !enabled)
11731173
{
@@ -1272,7 +1272,7 @@ Datum cfs_estimate(PG_FUNCTION_ARGS)
12721272
off_t step = rc / BLCKSZ / CFS_ESTIMATE_PROBES * BLCKSZ;
12731273
for (i = 0; i < CFS_ESTIMATE_PROBES; i++)
12741274
{
1275-
rc = lseek(fd, step*i, SEEK_SET);
1275+
rc = lseek(fd, step*i, SEEK_SET);
12761276
if (rc < 0)
12771277
break;
12781278

@@ -1406,7 +1406,7 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
14061406
Oid oid = PG_GETARG_OID(0);
14071407
Relation rel = try_relation_open(oid, AccessShareLock);
14081408
int processed_segments = 0;
1409-
1409+
14101410
if (rel != NULL)
14111411
{
14121412
char* path;
@@ -1420,7 +1420,7 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
14201420
path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
14211421
map_path = (char*)palloc(strlen(path) + 16);
14221422
sprintf(map_path, "%s.cfm", path);
1423-
1423+
14241424
while (cfs_gc_file(map_path, false))
14251425
{
14261426
sprintf(map_path, "%s.%u.cfm", path, ++i);
@@ -1436,6 +1436,21 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
14361436
PG_RETURN_INT32(cfs_gc_processed_segments);
14371437
}
14381438

1439+
1440+
void cfs_gc_segment(char const* fileName)
1441+
{
1442+
char* mapFileName = psprintf("%s.cfm", fileName);
1443+
1444+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1445+
1446+
cfs_gc_file(mapFileName, false);
1447+
1448+
LWLockRelease(CfsGcLock);
1449+
1450+
pfree(mapFileName);
1451+
}
1452+
1453+
14391454
Datum cfs_gc_activity_processed_bytes(PG_FUNCTION_ARGS)
14401455
{
14411456
PG_RETURN_INT64(cfs_state->gc_stat.processedBytes);

src/backend/storage/file/fd.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,6 +1839,7 @@ FileWrite(File file, char *buffer, int amount)
18391839
inode_t inode = 0;
18401840
/*inode_t prev_inode;*/
18411841
off_t seekPos;
1842+
uint32 pos = 0;
18421843

18431844
Assert(FileIsValid(file));
18441845

@@ -1878,7 +1879,6 @@ FileWrite(File file, char *buffer, int amount)
18781879

18791880
if (VfdCache[file].fileFlags & PG_COMPRESSION)
18801881
{
1881-
uint32 pos;
18821882
FileMap* map = VfdCache[file].map;
18831883
uint32 compressedSize;
18841884
Assert(amount == BLCKSZ);
@@ -2006,8 +2006,18 @@ FileWrite(File file, char *buffer, int amount)
20062006
}
20072007

20082008
if (VfdCache[file].fileFlags & PG_COMPRESSION)
2009+
{
20092010
cfs_unlock_file(VfdCache[file].map);
2010-
2011+
/*
2012+
* If GC is disabled for a long time, then the file can grow without limit.
2013+
* To avoid wraparound of 32-bit offsets we force GC on this file when the destination position
2014+
* crosses the 2Gb boundary.
2015+
*/
2016+
if ((int32)pos >= 0 && (int32)(pos + amount) < 0)
2017+
{
2018+
cfs_gc_segment(VfdCache[file].fileName);
2019+
}
2020+
}
20112021
return returnCode;
20122022
}
20132023

src/include/storage/cfs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ int cfs_shmem_size(void);
124124
void cfs_encrypt(const char* fname, void* block, uint32 offs, uint32 size);
125125
void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size);
126126

127+
void cfs_gc_segment(char const* name);
128+
127129
extern CfsState* cfs_state;
128130

129131
extern int cfs_level;

0 commit comments

Comments
 (0)