@@ -281,9 +281,11 @@ void cfs_initialize()
281
281
cfs_state = (CfsState * )ShmemAlloc (sizeof (CfsState ));
282
282
memset (& cfs_state -> gc_stat , 0 , sizeof cfs_state -> gc_stat );
283
283
pg_atomic_init_flag (& cfs_state -> gc_started );
284
+ pg_atomic_init_u32 (& cfs_state -> n_active_gc , 0 );
284
285
cfs_state -> n_workers = 0 ;
285
286
cfs_state -> gc_enabled = true;
286
287
cfs_state -> max_iterations = 0 ;
288
+
287
289
if (cfs_encryption ) {
288
290
cfs_rc4_init ();
289
291
}
@@ -470,26 +472,34 @@ static bool cfs_gc_file(char* map_path)
470
472
uint32 virtSize ;
471
473
int suf = strlen (map_path )- 4 ;
472
474
int fd = -1 , fd2 = -1 , md2 = -1 ;
473
- bool succeed = true;
475
+ bool succeed = false;
476
+
477
+ pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
474
478
475
479
while (!cfs_state -> gc_enabled ) {
476
- int rc = WaitLatch (MyLatch ,
477
- WL_TIMEOUT | WL_POSTMASTER_DEATH ,
478
- CFS_DISABLE_TIMEOUT /* ms */ );
480
+ int rc ;
481
+
482
+ pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
483
+
484
+ rc = WaitLatch (MyLatch ,
485
+ WL_TIMEOUT | WL_POSTMASTER_DEATH ,
486
+ CFS_DISABLE_TIMEOUT /* ms */ ,
487
+ WAIT_EVENT_CFS_GC_ENABLE );
479
488
if (cfs_stop || (rc & WL_POSTMASTER_DEATH )) {
480
489
exit (1 );
481
490
}
482
491
}
483
492
if (md < 0 ) {
484
493
elog (LOG , "Failed to open map file %s: %m" , map_path );
485
- return false ;
494
+ goto FinishGC ;
486
495
}
487
496
map = cfs_mmap (md );
488
497
if (map == MAP_FAILED ) {
489
498
elog (LOG , "Failed to map file %s: %m" , map_path );
490
499
close (md );
491
- return false ;
500
+ goto FinishGC ;
492
501
}
502
+ succeed = true;
493
503
usedSize = pg_atomic_read_u32 (& map -> usedSize );
494
504
physSize = pg_atomic_read_u32 (& map -> physSize );
495
505
virtSize = pg_atomic_read_u32 (& map -> virtSize );
@@ -525,7 +535,8 @@ static bool cfs_gc_file(char* map_path)
525
535
break ;
526
536
}
527
537
if (cfs_stop ) {
528
- return false;
538
+ succeed = false;
539
+ goto FinishGC ;
529
540
}
530
541
if (access_count >= CFS_GC_LOCK ) {
531
542
/* Uhhh... looks like last GC was interrupted.
@@ -734,6 +745,8 @@ static bool cfs_gc_file(char* map_path)
734
745
elog (LOG , "Failed to close file %s: %m" , map_path );
735
746
succeed = false;
736
747
}
748
+ FinishGC :
749
+ pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
737
750
return succeed ;
738
751
}
739
752
@@ -833,6 +846,25 @@ void cfs_start_background_gc()
833
846
elog (LOG , "Start %d background CFS background workers" , i );
834
847
}
835
848
849
+ bool cfs_control_gc (bool enabled )
850
+ {
851
+ bool was_enabled = cfs_state -> gc_enabled ;
852
+ cfs_state -> gc_enabled = enabled ;
853
+ if (was_enabled && !enabled ) {
854
+ /* Wait until there are no active GC workers */
855
+ while (pg_atomic_read_u32 (& cfs_state -> n_active_gc ) != 0 ) {
856
+ int rc = WaitLatch (MyLatch ,
857
+ WL_TIMEOUT | WL_POSTMASTER_DEATH ,
858
+ CFS_DISABLE_TIMEOUT /* ms */ ,
859
+ WAIT_EVENT_CFS_GC_ENABLE );
860
+ if (rc & WL_POSTMASTER_DEATH ) {
861
+ exit (1 );
862
+ }
863
+ }
864
+ }
865
+ return was_enabled ;
866
+ }
867
+
836
868
PG_MODULE_MAGIC ;
837
869
838
870
PG_FUNCTION_INFO_V1 (cfs_start_gc );
@@ -883,11 +915,10 @@ Datum cfs_start_gc(PG_FUNCTION_ARGS)
883
915
PG_RETURN_INT32 (i );
884
916
}
885
917
918
+
886
919
Datum cfs_enable_gc (PG_FUNCTION_ARGS )
887
- {
888
- bool prev = cfs_state -> gc_enabled ;
889
- cfs_state -> gc_enabled = PG_GETARG_BOOL (0 );
890
- PG_RETURN_BOOL (prev );
920
+ {
921
+ PG_RETURN_BOOL (cfs_control_gc (PG_GETARG_BOOL (0 )));
891
922
}
892
923
893
924
Datum cfs_version (PG_FUNCTION_ARGS )
0 commit comments