Skip to content

Commit 06c0486

Browse files
committed
Fix race condition in CFS GC
1 parent 17b656b commit 06c0486

File tree

6 files changed

+132
-35
lines changed

6 files changed

+132
-35
lines changed

doc/src/sgml/cfs.sgml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,13 @@
311311
It is calculated as sum of physical sizes of the files minus sum of used size of the files divided by sum of physical sizes of the files.
312312
</para>
313313

314+
<para>
315+
Particular relation can be defragmented using <varname>cfs_gc_relation(relation)</varname> function.
316+
This function can be used only if there are no active GC workers (<varname>cfs_gc_workers</varname> equals to zero).
317+
It returns number of defragmented segments of relation. If there is on relation with such OID or it is not compressed
318+
or some other GC process is active, 0 is returned.
319+
</para>
320+
314321
<para>
315322
There are several functions allowing to monitors garbage collection activity:
316323
<varname>cfs_gc_activity_scanned_files</varname> returns number of files scanned by GC,

src/backend/storage/file/cfs.c

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ int cfs_gc_period;
5757
int cfs_gc_delay;
5858
int cfs_level;
5959
bool cfs_encryption;
60+
bool cfs_gc_verify_file;
6061

6162
static bool cfs_read_file(int fd, void* data, uint32 size);
6263
static bool cfs_write_file(int fd, void const* data, uint32 size);
@@ -65,7 +66,7 @@ static void cfs_start_background_gc(void);
6566
CfsState* cfs_state;
6667

6768
static bool cfs_stop;
68-
69+
static int cfs_processed_segments;
6970

7071
#if CFS_COMPRESSOR == SNAPPY_COMPRESSOR
7172

@@ -335,6 +336,9 @@ int cfs_munmap(FileMap* map)
335336
#endif
336337
}
337338

339+
/*
340+
* Protects file from GC
341+
*/
338342
void cfs_lock_file(FileMap* map, char const* file_path)
339343
{
340344
long delay = CFS_LOCK_MIN_TIMEOUT;
@@ -356,7 +360,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
356360
if (md2 >= 0) {
357361
/* Recover map */
358362
if (!cfs_read_file(md2, map, sizeof(FileMap))) {
359-
elog(LOG, "Failed to read file %s: %m", map_bck_path);
363+
elog(LOG, "CFS failed to read file %s: %m", map_bck_path);
360364
}
361365
close(md2);
362366
}
@@ -384,7 +388,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
384388
}
385389

386390
/*
387-
* Protects file from GC
391+
* Release file lock
388392
*/
389393
void cfs_unlock_file(FileMap* map)
390394
{
@@ -491,12 +495,12 @@ static bool cfs_gc_file(char* map_path)
491495
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
492496
}
493497
if (md < 0) {
494-
elog(LOG, "Failed to open map file %s: %m", map_path);
498+
elog(LOG, "CFS failed to open map file %s: %m", map_path);
495499
goto FinishGC;
496500
}
497501
map = cfs_mmap(md);
498502
if (map == MAP_FAILED) {
499-
elog(LOG, "Failed to map file %s: %m", map_path);
503+
elog(LOG, "CFS failed to map file %s: %m", map_path);
500504
close(md);
501505
goto FinishGC;
502506
}
@@ -517,7 +521,7 @@ static bool cfs_gc_file(char* map_path)
517521
uint32 newSize = 0;
518522
inode_t** inodes = (inode_t**)palloc(RELSEG_SIZE*sizeof(inode_t*));
519523
bool remove_backups = true;
520-
int n_pages = virtSize / BLCKSZ;
524+
int n_pages;
521525
TimestampTz startTime, endTime;
522526
long secs;
523527
int usecs;
@@ -549,7 +553,7 @@ static bool cfs_gc_file(char* map_path)
549553
if (md2 >= 0) {
550554
/* Recover map */
551555
if (!cfs_read_file(md2, newMap, sizeof(FileMap))) {
552-
elog(LOG, "Failed to read file %s: %m", map_bck_path);
556+
elog(LOG, "CFS failed to read file %s: %m", map_bck_path);
553557
goto Cleanup;
554558
}
555559
close(md2);
@@ -572,6 +576,12 @@ static bool cfs_gc_file(char* map_path)
572576
delay *= 2;
573577
}
574578
}
579+
/* Reread variables after lockign file */
580+
usedSize = pg_atomic_read_u32(&map->usedSize);
581+
physSize = pg_atomic_read_u32(&map->physSize);
582+
virtSize = pg_atomic_read_u32(&map->virtSize);
583+
n_pages = virtSize / BLCKSZ;
584+
575585
md2 = open(map_bck_path, O_CREAT|O_RDWR|PG_BINARY|O_TRUNC, 0600);
576586
if (md2 < 0) {
577587
goto Cleanup;
@@ -583,7 +593,7 @@ static bool cfs_gc_file(char* map_path)
583593
/* sort inodes by offset to improve read locality */
584594
qsort(inodes, n_pages, sizeof(inode_t*), cfs_cmp_page_offs);
585595

586-
fd = open(file_path, O_RDWR|PG_BINARY, 0);
596+
fd = open(file_path, O_RDONLY|PG_BINARY, 0);
587597
if (fd < 0) {
588598
goto Cleanup;
589599
}
@@ -593,6 +603,7 @@ static bool cfs_gc_file(char* map_path)
593603
goto Cleanup;
594604
}
595605
cfs_state->gc_stat.processedFiles += 1;
606+
cfs_processed_segments += 1;
596607

597608
for (i = 0; i < n_pages; i++) {
598609
int size = CFS_INODE_SIZE(*inodes[i]);
@@ -605,12 +616,12 @@ static bool cfs_gc_file(char* map_path)
605616
Assert(rc == offs);
606617

607618
if (!cfs_read_file(fd, block, size)) {
608-
elog(LOG, "Failed to read file %s: %m", file_path);
619+
elog(LOG, "CFS GC failed to read block %d of file %s at position %d size %d: %m", i, file_path, offs, size);
609620
goto Cleanup;
610621
}
611622

612623
if (!cfs_write_file(fd2, block, size)) {
613-
elog(LOG, "Failed to write file %s: %m", file_bck_path);
624+
elog(LOG, "CFS failed to write file %s: %m", file_bck_path);
614625
goto Cleanup;
615626
}
616627
cfs_state->gc_stat.processedBytes += size;
@@ -621,58 +632,84 @@ static bool cfs_gc_file(char* map_path)
621632
*inodes[i] = CFS_INODE(size, offs);
622633
}
623634
}
624-
pg_atomic_write_u32(&map->usedSize, newSize);
625-
626635
if (close(fd) < 0) {
627-
elog(LOG, "Failed to close file %s: %m", file_path);
636+
elog(LOG, "CFS failed to close file %s: %m", file_path);
628637
goto Cleanup;
629638
}
630639
fd = -1;
631640

632641
/* Persist copy of data file */
633642
if (pg_fsync(fd2) < 0) {
634-
elog(LOG, "Failed to sync file %s: %m", file_bck_path);
643+
elog(LOG, "CFS failed to sync file %s: %m", file_bck_path);
635644
goto Cleanup;
636645
}
637646
if (close(fd2) < 0) {
638-
elog(LOG, "Failed to close file %s: %m", file_bck_path);
647+
elog(LOG, " CFS failed to close file %s: %m", file_bck_path);
639648
goto Cleanup;
640649
}
641650
fd2 = -1;
642651

643652
/* Persist copy of map file */
644653
if (!cfs_write_file(md2, &newMap, sizeof(newMap))) {
645-
elog(LOG, "Failed to write file %s: %m", map_bck_path);
654+
elog(LOG, "CFS failed to write file %s: %m", map_bck_path);
646655
goto Cleanup;
647656
}
648657
if (pg_fsync(md2) < 0) {
649-
elog(LOG, "Failed to sync file %s: %m", map_bck_path);
658+
elog(LOG, "CFS failed to sync file %s: %m", map_bck_path);
650659
goto Cleanup;
651660
}
652661
if (close(md2) < 0) {
653-
elog(LOG, "Failed to close file %s: %m", map_bck_path);
662+
elog(LOG, "CFS failed to close file %s: %m", map_bck_path);
654663
goto Cleanup;
655664
}
656665
md2 = -1;
657666

658667
/* Persist map with CFS_GC_LOCK set: in case of crash we will know that map may be changed by GC */
659668
if (cfs_msync(map) < 0) {
660-
elog(LOG, "Failed to sync map %s: %m", map_path);
669+
elog(LOG, "CFS failed to sync map %s: %m", map_path);
661670
goto Cleanup;
662671
}
663672
if (pg_fsync(md) < 0) {
664-
elog(LOG, "Failed to sync file %s: %m", map_path);
673+
elog(LOG, "CFS failed to sync file %s: %m", map_path);
665674
goto Cleanup;
666675
}
667676

677+
678+
if (cfs_gc_verify_file) {
679+
fd = open(file_bck_path, O_RDONLY|PG_BINARY, 0);
680+
Assert(fd >= 0);
681+
682+
for (i = 0; i < n_pages; i++) {
683+
inode_t inode = newMap->inodes[i];
684+
int size = CFS_INODE_SIZE(inode);
685+
if (size != 0) {
686+
char block[BLCKSZ];
687+
char decomressedBlock[BLCKSZ];
688+
off_t res PG_USED_FOR_ASSERTS_ONLY;
689+
bool rc PG_USED_FOR_ASSERTS_ONLY;
690+
res = lseek(fd, CFS_INODE_OFFS(inode), SEEK_SET);
691+
Assert(res == (off_t)CFS_INODE_OFFS(inode));
692+
rc = cfs_read_file(fd, block, size);
693+
Assert(rc);
694+
cfs_decrypt(block, (off_t)i*BLCKSZ, size);
695+
res = cfs_decompress(decomressedBlock, BLCKSZ, block, size);
696+
if (res != BLCKSZ) {
697+
pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* release lock */
698+
elog(PANIC, "Verification failed for block %d of relation %s: error code %d", i, file_path, (int)res);
699+
}
700+
}
701+
}
702+
close(fd);
703+
}
704+
668705
/*
669706
* Now all information necessary for recovery is stored.
670707
* We are ready to replace existed file with defragmented one.
671708
* Use rename and rely on file system to provide atomicity of this operation.
672709
*/
673710
remove_backups = false;
674711
if (rename(file_bck_path, file_path) < 0) {
675-
elog(LOG, "Failed to rename file %s: %m", file_path);
712+
elog(LOG, "CFS failed to rename file %s: %m", file_path);
676713
goto Cleanup;
677714
}
678715
ReplaceMap:
@@ -685,11 +722,11 @@ static bool cfs_gc_file(char* map_path)
685722

686723
/* Before removing backup files and releasing locks we need to flush updated map file */
687724
if (cfs_msync(map) < 0) {
688-
elog(LOG, "Failed to sync map %s: %m", map_path);
725+
elog(LOG, "CFS failed to sync map %s: %m", map_path);
689726
goto Cleanup;
690727
}
691728
if (pg_fsync(md) < 0) {
692-
elog(LOG, "Failed to sync file %s: %m", map_path);
729+
elog(LOG, "CFS failed to sync file %s: %m", map_path);
693730
Cleanup:
694731
if (fd >= 0) close(fd);
695732
if (fd2 >= 0) close(fd2);
@@ -703,11 +740,12 @@ static bool cfs_gc_file(char* map_path)
703740
} else {
704741
remove_backups = true; /* now backups are not need any more */
705742
}
743+
706744
pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* release lock */
707745

708746
/* remove map backup file */
709747
if (remove_backups && unlink(map_bck_path)) {
710-
elog(LOG, "Failed to unlink file %s: %m", map_bck_path);
748+
elog(LOG, "CFS failed to unlink file %s: %m", map_bck_path);
711749
succeed = false;
712750
}
713751

@@ -739,11 +777,11 @@ static bool cfs_gc_file(char* map_path)
739777
}
740778

741779
if (cfs_munmap(map) < 0) {
742-
elog(LOG, "Failed to unmap file %s: %m", map_path);
780+
elog(LOG, "CFS failed to unmap file %s: %m", map_path);
743781
succeed = false;
744782
}
745783
if (close(md) < 0) {
746-
elog(LOG, "Failed to close file %s: %m", map_path);
784+
elog(LOG, "CFS failed to close file %s: %m", map_path);
747785
succeed = false;
748786
}
749787
FinishGC:
@@ -875,6 +913,7 @@ PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_bytes);
875913
PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_pages);
876914
PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_files);
877915
PG_FUNCTION_INFO_V1(cfs_gc_activity_scanned_files);
916+
PG_FUNCTION_INFO_V1(cfs_gc_relation);
878917

879918
Datum cfs_start_gc(PG_FUNCTION_ARGS)
880919
{
@@ -1002,10 +1041,10 @@ Datum cfs_compression_ratio(PG_FUNCTION_ARGS)
10021041
physSize += pg_atomic_read_u32(&map->physSize);
10031042

10041043
if (cfs_munmap(map) < 0) {
1005-
elog(LOG, "Failed to unmap file %s: %m", map_path);
1044+
elog(LOG, "CFS failed to unmap file %s: %m", map_path);
10061045
}
10071046
if (close(md) < 0) {
1008-
elog(LOG, "Failed to close file %s: %m", map_path);
1047+
elog(LOG, "CFS failed to close file %s: %m", map_path);
10091048
}
10101049
i += 1;
10111050
}
@@ -1051,10 +1090,10 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
10511090
physSize += pg_atomic_read_u32(&map->physSize);
10521091

10531092
if (cfs_munmap(map) < 0) {
1054-
elog(LOG, "Failed to unmap file %s: %m", map_path);
1093+
elog(LOG, "CFS failed to unmap file %s: %m", map_path);
10551094
}
10561095
if (close(md) < 0) {
1057-
elog(LOG, "Failed to close file %s: %m", map_path);
1096+
elog(LOG, "CFS failed to close file %s: %m", map_path);
10581097
}
10591098
i += 1;
10601099
}
@@ -1065,6 +1104,36 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
10651104
PG_RETURN_FLOAT8((double)(physSize - usedSize)/physSize);
10661105
}
10671106

1107+
Datum cfs_gc_relation(PG_FUNCTION_ARGS)
1108+
{
1109+
cfs_processed_segments = 0;
1110+
1111+
if (cfs_gc_workers == 0 && pg_atomic_test_set_flag(&cfs_state->gc_started))
1112+
{
1113+
Oid oid = PG_GETARG_OID(0);
1114+
Relation rel = try_relation_open(oid, AccessShareLock);
1115+
1116+
if (rel != NULL) {
1117+
char* path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
1118+
char* map_path = (char*)palloc(strlen(path) + 16);
1119+
int i = 0;
1120+
sprintf(map_path, "%s.cfm", path);
1121+
1122+
while (true) {
1123+
if (!cfs_gc_file(map_path)) {
1124+
break;
1125+
}
1126+
sprintf(map_path, "%s.%u.cfm", path, ++i);
1127+
}
1128+
pfree(path);
1129+
pfree(map_path);
1130+
relation_close(rel, AccessShareLock);
1131+
}
1132+
pg_atomic_clear_flag(&cfs_state->gc_started);
1133+
}
1134+
PG_RETURN_INT32(cfs_processed_segments);
1135+
}
1136+
10681137
Datum cfs_gc_activity_processed_bytes(PG_FUNCTION_ARGS)
10691138
{
10701139
PG_RETURN_INT64(cfs_state->gc_stat.processedBytes);

src/backend/storage/file/fd.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,7 @@ FileWrite(File file, char *buffer, int amount)
18161816
int returnCode;
18171817
char compressedBuffer[CFS_MAX_COMPRESSED_SIZE(BLCKSZ)];
18181818
inode_t inode = 0;
1819+
/*inode_t prev_inode;*/
18191820
off_t seekPos;
18201821

18211822
Assert(FileIsValid(file));
@@ -1867,6 +1868,7 @@ FileWrite(File file, char *buffer, int amount)
18671868
return -1;
18681869
}
18691870
inode = map->inodes[VfdCache[file].seekPos / BLCKSZ];
1871+
/*prev_inode = inode;*/
18701872
if (compressedSize > 0 && compressedSize < CFS_MIN_COMPRESSED_SIZE(BLCKSZ)) {
18711873
Assert((VfdCache[file].seekPos & (BLCKSZ-1)) == 0);
18721874
/* Do not check that new image of compressed page fits into
@@ -1907,8 +1909,14 @@ FileWrite(File file, char *buffer, int amount)
19071909
{
19081910
if (VfdCache[file].fileFlags & PG_COMPRESSION) {
19091911
if (returnCode == amount)
1910-
{
1912+
{
1913+
/* Verify that there is no race condition
1914+
bool rc = pg_atomic_compare_exchange_u64((pg_atomic_uint64*)&VfdCache[file].map->inodes[VfdCache[file].seekPos / BLCKSZ],
1915+
&prev_inode, inode);
1916+
Assert(rc);
1917+
*/
19111918
VfdCache[file].map->inodes[VfdCache[file].seekPos / BLCKSZ] = inode;
1919+
/**/
19121920
VfdCache[file].seekPos += BLCKSZ;
19131921
cfs_extend(VfdCache[file].map, VfdCache[file].seekPos);
19141922
returnCode = BLCKSZ;

0 commit comments

Comments
 (0)