Skip to content

Commit e18cea1

Browse files
committed
Reimplement interaction between explicit and implicit GC
1 parent 06f7235 commit e18cea1

File tree

3 files changed

+72
-39
lines changed

3 files changed

+72
-39
lines changed

src/backend/storage/file/cfs.c

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,12 @@ uint32 cfs_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize)
474474
return pg_atomic_fetch_add_u32(&map->hdr.physSize, newSize);
475475
}
476476

477+
/*
 * Revert the size counters of a file map: atomically subtracts
 * (newSize - oldSize) from map->hdr.usedSize and newSize from
 * map->hdr.physSize.
 * FileWrite calls it with the same arguments it just passed to
 * cfs_alloc_page() when the returned position is beyond CFS_RED_LINE.
 * NOTE(review): the two subtractions are separate atomic operations and
 * nothing visible here prevents another allocation on the same map between
 * cfs_alloc_page() and this call - confirm that physSize cannot end up
 * pointing below space handed out to a concurrent writer.
 */
void cfs_undo_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize)
478+
{
479+
pg_atomic_fetch_sub_u32(&map->hdr.usedSize, newSize - oldSize);
480+
pg_atomic_fetch_sub_u32(&map->hdr.physSize, newSize);
481+
}
482+
477483
/*
478484
* Update logical file size
479485
*/
@@ -643,12 +649,18 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
643649
return o1 < o2 ? -1 : o1 == o2 ? 0 : 1;
644650
}
645651

652+
/*
 * Who requested garbage collection of a file; passed to cfs_gc_file().
 */
typedef enum {
653+
CFS_BACKGROUND, /* passed by cfs_gc_directory: waits while gc_enabled or background_gc_enabled is off, takes CfsGcLock in shared mode, honours cfs_gc_delay */
654+
CFS_EXPLICIT, /* passed by cfs_gc_relation and by non-optional cfs_gc_segment: waits while gc_enabled is off */
655+
CFS_IMPLICIT /* passed by optional cfs_gc_segment: returns false at once if gc_enabled is off */
656+
} GC_CALL_KIND;
657+
646658
/*
647659
* Perform garbage collection (if required) on the file
648660
* @param map_path - path to the map file (*.cfm).
649661
* @param background - kind of GC call (GC_CALL_KIND); for CFS_BACKGROUND the GC is performed in background by BGW: suppress error message and set CfsGcLock
650662
*/
651-
static bool cfs_gc_file(char* map_path, bool background)
663+
static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
652664
{
653665
int md;
654666
FileMap* map;
@@ -662,25 +674,34 @@ static bool cfs_gc_file(char* map_path, bool background)
662674
bool succeed = false;
663675
int rc;
664676

665-
666677
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
667-
668-
while (!(background ? (cfs_state->gc_enabled & cfs_state->background_gc_enabled) : cfs_state->gc_enabled))
678+
if (background == CFS_IMPLICIT)
669679
{
670-
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
680+
if (!cfs_state->gc_enabled)
681+
{
682+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
683+
return false;
684+
}
685+
}
686+
else
687+
{
688+
while (!cfs_state->gc_enabled || (background == CFS_BACKGROUND && !cfs_state->background_gc_enabled))
689+
{
690+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
671691

672-
rc = WaitLatch(MyLatch,
673-
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
674-
CFS_DISABLE_TIMEOUT /* ms */);
675-
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
676-
exit(1);
692+
rc = WaitLatch(MyLatch,
693+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
694+
CFS_DISABLE_TIMEOUT /* ms */);
695+
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
696+
exit(1);
677697

678-
ResetLatch(MyLatch);
679-
CHECK_FOR_INTERRUPTS();
698+
ResetLatch(MyLatch);
699+
CHECK_FOR_INTERRUPTS();
680700

681-
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
701+
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
702+
}
682703
}
683-
if (background)
704+
if (background == CFS_BACKGROUND)
684705
{
685706
LWLockAcquire(CfsGcLock, LW_SHARED); /* avoid race condition with cfs_file_lock */
686707
}
@@ -708,7 +729,7 @@ static bool cfs_gc_file(char* map_path, bool background)
708729
cfs_state->gc_stat.scannedFiles += 1;
709730

710731
/* do we need to perform defragmentation? */
711-
if ((uint64)(physSize - usedSize)*100 > (uint64)physSize*cfs_gc_threshold)
732+
if (physSize > CFS_IMPLICIT_GC_THRESHOLD || (uint64)(physSize - usedSize)*100 > (uint64)physSize*cfs_gc_threshold)
712733
{
713734
long delay = CFS_LOCK_MIN_TIMEOUT;
714735
char* file_path = (char*)palloc(suf+1);
@@ -1030,7 +1051,7 @@ static bool cfs_gc_file(char* map_path, bool background)
10301051
pfree(inodes);
10311052
pfree(newMap);
10321053

1033-
if (cfs_gc_delay != 0)
1054+
if (cfs_gc_delay != 0 && background == CFS_BACKGROUND)
10341055
{
10351056
int rc = WaitLatch(MyLatch,
10361057
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -1058,7 +1079,7 @@ static bool cfs_gc_file(char* map_path, bool background)
10581079
}
10591080

10601081
FinishGC:
1061-
if (background)
1082+
if (background == CFS_BACKGROUND)
10621083
{
10631084
LWLockRelease(CfsGcLock);
10641085
}
@@ -1097,7 +1118,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10971118
strcmp(file_path + len - 4, ".cfm") == 0)
10981119
{
10991120
if (entry->d_ino % cfs_state->n_workers == worker_id
1100-
&& !cfs_gc_file(file_path, true))
1121+
&& !cfs_gc_file(file_path, CFS_BACKGROUND))
11011122
{
11021123
success = false;
11031124
break;
@@ -1195,6 +1216,7 @@ bool cfs_control_gc(bool enabled)
11951216
{
11961217
bool was_enabled = cfs_state->gc_enabled;
11971218
cfs_state->gc_enabled = enabled;
1219+
pg_memory_barrier();
11981220
if (was_enabled && !enabled)
11991221
{
12001222
/* Wait until there are no active GC workers */
@@ -1238,9 +1260,7 @@ Datum cfs_start_gc(PG_FUNCTION_ARGS)
12381260
int j;
12391261
BackgroundWorkerHandle** handles;
12401262

1241-
cfs_gc_stop = true; /* do just one iteration */
1242-
1243-
cfs_state->max_iterations = 1;
1263+
cfs_state->max_iterations = 1; /* do just one iteration */
12441264
cfs_state->n_workers = PG_GETARG_INT32(0);
12451265
handles = (BackgroundWorkerHandle**)palloc(cfs_state->n_workers*sizeof(BackgroundWorkerHandle*));
12461266

@@ -1460,39 +1480,46 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
14601480
char* path;
14611481
char* map_path;
14621482
int i = 0;
1463-
1464-
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1483+
bool stop = false;
14651484

14661485
processed_segments = cfs_gc_processed_segments;
14671486

14681487
path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
14691488
map_path = (char*)palloc(strlen(path) + 16);
14701489
sprintf(map_path, "%s.cfm", path);
14711490

1472-
while (cfs_gc_file(map_path, false))
1491+
do
14731492
{
1493+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1494+
stop = !cfs_gc_file(map_path, CFS_EXPLICIT);
1495+
LWLockRelease(CfsGcLock);
14741496
sprintf(map_path, "%s.%u.cfm", path, ++i);
1475-
}
1497+
} while(!stop);
14761498
pfree(path);
14771499
pfree(map_path);
14781500
relation_close(rel, AccessShareLock);
14791501

14801502
processed_segments = cfs_gc_processed_segments - processed_segments;
1481-
1482-
LWLockRelease(CfsGcLock);
14831503
}
14841504
PG_RETURN_INT32(processed_segments);
14851505
}
14861506

14871507

1488-
void cfs_gc_segment(char const* fileName)
1508+
void cfs_gc_segment(char const* fileName, bool optional)
14891509
{
1490-
char* mapFileName = psprintf("%s.cfm", fileName);
1510+
char* mapFileName;
14911511

1492-
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1512+
if (optional)
1513+
{
1514+
if (!LWLockConditionalAcquire(CfsGcLock, LW_EXCLUSIVE)) /* Prevent interaction with background GC */
1515+
return;
1516+
}
1517+
else
1518+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
14931519

1494-
cfs_gc_file(mapFileName, false);
1520+
mapFileName = psprintf("%s.cfm", fileName);
14951521

1522+
cfs_gc_file(mapFileName, optional ? CFS_IMPLICIT : CFS_EXPLICIT);
14961523
LWLockRelease(CfsGcLock);
14971524

14981525
pfree(mapFileName);

src/backend/storage/file/fd.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1980,8 +1980,9 @@ FileWrite(File file, char *buffer, int amount)
19801980
* because we want to write all updated pages sequentially
19811981
*/
19821982
pos = cfs_alloc_page(map, CFS_INODE_SIZE(inode), compressedSize);
1983-
if (pos > pos + compressedSize) {
1984-
elog(ERROR, "CFS: segment file exceeed 4Gb limit");
1983+
if (pos > CFS_RED_LINE) {
1984+
cfs_undo_alloc_page(map, CFS_INODE_SIZE(inode), compressedSize);
1985+
elog(ERROR, "CFS: segment file exceed 4Gb limit");
19851986
}
19861987

19871988
inode = CFS_INODE(compressedSize, pos);
@@ -2102,14 +2103,14 @@ FileWrite(File file, char *buffer, int amount)
21022103
cfs_unlock_file(VfdCache[file].map);
21032104
/*
21042105
* If GC is disabled for a long time, the file can grow without limit.
2105-
* To avoid wrap aound of 32-bit offsets we force GC on this file when destination position
2106+
* To avoid wrap around of 32-bit offsets we force GC on this file when destination position
21062107
* crosses the 2Gb boundary.
21072108
*/
2108-
if ((int32)pos >= 0 && (int32)(pos + amount) < 0)
2109+
if (pos + amount > CFS_IMPLICIT_GC_THRESHOLD)
21092110
{
21102111
elog(LOG, "CFS: backend %d forced to perform GC on file %s block %u because it's size exceed %u bytes",
21112112
MyProcPid, VfdCache[file].fileName, (uint32)(VfdCache[file].seekPos / BLCKSZ), pos);
2112-
cfs_gc_segment(VfdCache[file].fileName);
2113+
cfs_gc_segment(VfdCache[file].fileName, pos + amount < CFS_YELLOW_LINE);
21132114
}
21142115
}
21152116
return returnCode;

src/include/storage/cfs.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ typedef uint64 inode_t;
5151
#define CFS_INODE_OFFS(inode) ((uint32)(inode))
5252
#define CFS_INODE(size,offs) (((inode_t)(size) << 32) | (offs))
5353

54+
/*
 * Physical size limits of a compressed segment file (offsets are 32-bit).
 * FileWrite forces cfs_gc_segment() once pos + amount exceeds the threshold,
 * and cfs_gc_file() defragments a file whose physSize exceeds it.
 */
#define CFS_IMPLICIT_GC_THRESHOLD 0x80000000U /* 2Gb */
55+
/* Below this line the GC forced from FileWrite is optional (cfs_gc_segment may give up if CfsGcLock is busy). */
#define CFS_YELLOW_LINE 0xC0000000U /* 3Gb */
56+
/* FileWrite undoes the allocation and raises an error when the allocated position exceeds this line. */
/* NOTE(review): the original note read "4Gb - page_size*100"; the literal is exactly 4Gb - 1Mb - confirm intended headroom. */
#define CFS_RED_LINE 0xFFF00000U /* 4Gb - 1Mb */
57+
5458
size_t cfs_compress(void* dst, size_t dst_size, void const* src, size_t src_size);
5559
size_t cfs_decompress(void* dst, size_t dst_size, void const* src, size_t src_size);
5660
char const* cfs_algorithm(void);
@@ -83,9 +87,9 @@ typedef struct
8387
* Manually started GC performs just one iteration. */
8488
int64 max_iterations;
8589
/* Flag for temporary disabling GC */
86-
bool gc_enabled;
90+
volatile bool gc_enabled;
8791
/* Flag for controlling background GC */
88-
bool background_gc_enabled;
92+
volatile bool background_gc_enabled;
8993
/* CFS GC statistics */
9094
CfsStatistic gc_stat;
9195
rijndael_ctx aes_context;
@@ -121,6 +125,7 @@ typedef struct
121125
void cfs_lock_file(FileMap* map, char const* path);
122126
void cfs_unlock_file(FileMap* map);
123127
uint32 cfs_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize);
128+
void cfs_undo_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize);
124129
void cfs_extend(FileMap* map, uint32 pos);
125130
bool cfs_control_gc(bool enabled);
126131
int cfs_msync(FileMap* map);
@@ -132,7 +137,7 @@ int cfs_shmem_size(void);
132137
void cfs_encrypt(const char* fname, void* block, uint32 offs, uint32 size);
133138
void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size);
134139

135-
void cfs_gc_segment(char const* name);
140+
void cfs_gc_segment(char const* name, bool optional);
136141
void cfs_recover_map(FileMap* map);
137142

138143
extern CfsState* cfs_state;

0 commit comments

Comments
 (0)