@@ -257,9 +257,9 @@ static void cfs_crypto_init(void)
257
257
uint8 aes_key [32 ] = {0 }; /* at most 256 bits */
258
258
259
259
cipher_key = getenv ("PG_CIPHER_KEY" );
260
- if (cipher_key == NULL ) {
260
+ if (cipher_key == NULL ) {
261
261
elog (ERROR , "PG_CIPHER_KEY environment variable is not set" );
262
- }
262
+ }
263
263
unsetenv ("PG_CIPHER_KEY" ); /* disable inspection of this environment variable */
264
264
key_length = strlen (cipher_key );
265
265
@@ -406,11 +406,11 @@ void cfs_initialize()
406
406
cfs_state -> n_workers = 0 ;
407
407
cfs_state -> gc_enabled = cfs_gc_enabled ;
408
408
cfs_state -> max_iterations = 0 ;
409
-
409
+
410
410
if (cfs_encryption )
411
411
cfs_crypto_init ();
412
-
413
- elog (LOG , "Start CFS version %s compression algorithm %s encryption %s GC %s" ,
412
+
413
+ elog (LOG , "Start CFS version %s compression algorithm %s encryption %s GC %s" ,
414
414
CFS_VERSION , cfs_algorithm (), cfs_encryption ? "enabled" : "disabled" , cfs_gc_enabled ? "enabled" : "disabled" );
415
415
}
416
416
}
@@ -426,19 +426,19 @@ int cfs_msync(FileMap* map)
426
426
FileMap * cfs_mmap (int md )
427
427
{
428
428
FileMap * map ;
429
- if (ftruncate (md , sizeof (FileMap )) != 0 )
429
+ if (ftruncate (md , sizeof (FileMap )) != 0 )
430
430
{
431
431
return (FileMap * )MAP_FAILED ;
432
432
}
433
-
433
+
434
434
#ifdef WIN32
435
435
{
436
- HANDLE mh = CreateFileMapping (_get_osfhandle (md ), NULL , PAGE_READWRITE ,
436
+ HANDLE mh = CreateFileMapping (_get_osfhandle (md ), NULL , PAGE_READWRITE ,
437
437
0 , (DWORD )sizeof (FileMap ), NULL );
438
438
if (mh == NULL )
439
439
return (FileMap * )MAP_FAILED ;
440
440
441
- map = (FileMap * )MapViewOfFile (mh , FILE_MAP_ALL_ACCESS , 0 , 0 , 0 );
441
+ map = (FileMap * )MapViewOfFile (mh , FILE_MAP_ALL_ACCESS , 0 , 0 , 0 );
442
442
CloseHandle (mh );
443
443
}
444
444
if (map == NULL )
@@ -499,7 +499,7 @@ static bool cfs_read_file(int fd, void* data, uint32 size)
499
499
else
500
500
offs += rc ;
501
501
} while (offs < size );
502
-
502
+
503
503
return true;
504
504
}
505
505
@@ -519,7 +519,7 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
519
519
else
520
520
offs += rc ;
521
521
} while (offs < size );
522
-
522
+
523
523
return true;
524
524
}
525
525
@@ -559,23 +559,23 @@ void cfs_lock_file(FileMap* map, char const* file_path)
559
559
break ;
560
560
}
561
561
562
- if (pg_atomic_read_u32 (& cfs_state -> n_active_gc ) == 0 )
563
- {
562
+ if (pg_atomic_read_u32 (& cfs_state -> n_active_gc ) == 0 )
563
+ {
564
564
/* There is no active GC, so lock is set by crashed GC */
565
565
566
566
LWLockAcquire (CfsGcLock , LW_EXCLUSIVE ); /* Prevent race condition with GC */
567
567
568
568
/* Recheck under CfsGcLock that map->lock was not released */
569
- if (pg_atomic_read_u32 (& map -> lock ) >= CFS_GC_LOCK )
569
+ if (pg_atomic_read_u32 (& map -> lock ) >= CFS_GC_LOCK )
570
570
{
571
571
/* Uhhh... looks like last GC was interrupted.
572
572
* Try to recover the file.
573
573
*/
574
574
char * map_bck_path = psprintf ("%s.cfm.bck" , file_path );
575
575
char * file_bck_path = psprintf ("%s.bck" , file_path );
576
-
576
+
577
577
elog (WARNING , "CFS indicates that GC of %s was interrupted: trying to perform recovery" , file_path );
578
-
578
+
579
579
if (access (file_bck_path , R_OK ) != 0 )
580
580
{
581
581
/* There is no backup file: new map should be constructed */
@@ -585,20 +585,20 @@ void cfs_lock_file(FileMap* map, char const* file_path)
585
585
/* Recover map. */
586
586
if (!cfs_read_file (md2 , map , sizeof (FileMap )))
587
587
elog (WARNING , "CFS failed to read file %s: %m" , map_bck_path );
588
-
588
+
589
589
close (md2 );
590
590
}
591
591
}
592
592
else
593
593
{
594
594
/* Presence of backup file means that we still have
595
- * unchanged data and map files. Just remove backup files and
595
+ * unchanged data and map files. Just remove backup files and
596
596
* revoke GC lock.
597
597
*/
598
598
unlink (file_bck_path );
599
599
unlink (map_bck_path );
600
600
}
601
-
601
+
602
602
count = pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* revoke GC lock */
603
603
Assert ((int )count > 0 );
604
604
pfree (file_bck_path );
@@ -634,12 +634,13 @@ void cfs_unlock_file(FileMap* map)
634
634
/*
635
635
* Sort pages by offset to improve access locality
636
636
*/
637
- static int cfs_cmp_page_offs (void const * p1 , void const * p2 )
637
+ static int cfs_cmp_page_offs (void const * p1 , void const * p2 )
638
638
{
639
639
uint32 o1 = CFS_INODE_OFFS (* * (inode_t * * )p1 );
640
640
uint32 o2 = CFS_INODE_OFFS (* * (inode_t * * )p2 );
641
641
return o1 < o2 ? -1 : o1 == o2 ? 0 : 1 ;
642
642
}
643
+
643
644
/*
644
645
* Perform garbage collection (if required) on the file
645
646
* @param map_path - path to the map file (*.cfm).
@@ -658,7 +659,7 @@ static bool cfs_gc_file(char* map_path, bool background)
658
659
int md2 = -1 ;
659
660
bool succeed = false;
660
661
int rc ;
661
-
662
+
662
663
663
664
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
664
665
@@ -667,13 +668,13 @@ static bool cfs_gc_file(char* map_path, bool background)
667
668
while (!cfs_state -> gc_enabled )
668
669
{
669
670
pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc , 1 );
670
-
671
+
671
672
rc = WaitLatch (MyLatch ,
672
673
WL_TIMEOUT | WL_POSTMASTER_DEATH ,
673
674
CFS_DISABLE_TIMEOUT /* ms */ );
674
675
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
675
676
exit (1 );
676
-
677
+
677
678
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
678
679
}
679
680
@@ -682,7 +683,7 @@ static bool cfs_gc_file(char* map_path, bool background)
682
683
683
684
md = open (map_path , O_RDWR |PG_BINARY , 0 );
684
685
if (md < 0 )
685
- {
686
+ {
686
687
elog (DEBUG1 , "CFS failed to open map file %s: %m" , map_path );
687
688
goto FinishGC ;
688
689
}
@@ -699,7 +700,7 @@ static bool cfs_gc_file(char* map_path, bool background)
699
700
usedSize = pg_atomic_read_u32 (& map -> usedSize );
700
701
physSize = pg_atomic_read_u32 (& map -> physSize );
701
702
virtSize = pg_atomic_read_u32 (& map -> virtSize );
702
-
703
+
703
704
cfs_state -> gc_stat .scannedFiles += 1 ;
704
705
705
706
/* do we need to perform defragmentation? */
@@ -804,7 +805,7 @@ static bool cfs_gc_file(char* map_path, bool background)
804
805
}
805
806
/* sort inodes by offset to improve read locality */
806
807
qsort (inodes , n_pages , sizeof (inode_t * ), cfs_cmp_page_offs );
807
-
808
+
808
809
fd = open (file_path , O_RDONLY |PG_BINARY , 0 );
809
810
if (fd < 0 )
810
811
goto Cleanup ;
@@ -927,9 +928,8 @@ static bool cfs_gc_file(char* map_path, bool background)
927
928
if (res != BLCKSZ )
928
929
{
929
930
pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* release lock */
930
- /* TODO Is it worth to PANIC or ERROR will be enough? */
931
- elog (PANIC , "Verification failed for block %d of relation %s: error code %d" ,
932
- i , file_bck_path , (int )res );
931
+ elog (ERROR , "Verification failed for block %d position %d size %d of relation %s: error code %d" ,
932
+ i , (int )CFS_INODE_OFFS (inode ), size , file_bck_path , (int )res );
933
933
}
934
934
}
935
935
}
@@ -977,7 +977,7 @@ static bool cfs_gc_file(char* map_path, bool background)
977
977
if (remove_backups )
978
978
{
979
979
unlink (file_bck_path );
980
- unlink (map_bck_path );
980
+ unlink (map_bck_path );
981
981
remove_backups = false;
982
982
}
983
983
succeed = false;
@@ -1070,7 +1070,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
1070
1070
/* If we have found a map file, run gc worker on it.
1071
1071
* Otherwise, try to gc the directory recursively.
1072
1072
*/
1073
- if (len > 4 &&
1073
+ if (len > 4 &&
1074
1074
strcmp (file_path + len - 4 , ".cfm" ) == 0 )
1075
1075
{
1076
1076
if (entry -> d_ino % cfs_state -> n_workers == worker_id
@@ -1080,7 +1080,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
1080
1080
break ;
1081
1081
}
1082
1082
}
1083
- else if (!cfs_gc_directory (worker_id , file_path ))
1083
+ else if (!cfs_gc_directory (worker_id , file_path ))
1084
1084
{
1085
1085
success = false;
1086
1086
break ;
@@ -1124,7 +1124,7 @@ static void cfs_gc_bgworker_main(Datum arg)
1124
1124
int timeout = cfs_gc_period ;
1125
1125
int rc ;
1126
1126
1127
- if (!cfs_gc_scan_tablespace (worker_id ))
1127
+ if (!cfs_gc_scan_tablespace (worker_id ))
1128
1128
{
1129
1129
timeout = CFS_RETRY_TIMEOUT ;
1130
1130
}
@@ -1148,7 +1148,7 @@ void cfs_gc_start_bgworkers()
1148
1148
1149
1149
for (i = 0 ; i < cfs_gc_workers ; i ++ )
1150
1150
{
1151
- BackgroundWorker worker ;
1151
+ BackgroundWorker worker ;
1152
1152
BackgroundWorkerHandle * handle ;
1153
1153
MemSet (& worker , 0 , sizeof (worker ));
1154
1154
sprintf (worker .bgw_name , "cfs-worker-%d" , i );
@@ -1165,9 +1165,9 @@ void cfs_gc_start_bgworkers()
1165
1165
}
1166
1166
1167
1167
/* Enable/disable garbage colection. */
1168
- bool cfs_control_gc (bool enabled )
1168
+ bool cfs_control_gc (bool enabled )
1169
1169
{
1170
- bool was_enabled = cfs_state -> gc_enabled ;
1170
+ bool was_enabled = cfs_state -> gc_enabled ;
1171
1171
cfs_state -> gc_enabled = enabled ;
1172
1172
if (was_enabled && !enabled )
1173
1173
{
@@ -1272,7 +1272,7 @@ Datum cfs_estimate(PG_FUNCTION_ARGS)
1272
1272
off_t step = rc / BLCKSZ / CFS_ESTIMATE_PROBES * BLCKSZ ;
1273
1273
for (i = 0 ; i < CFS_ESTIMATE_PROBES ; i ++ )
1274
1274
{
1275
- rc = lseek (fd , step * i , SEEK_SET );
1275
+ rc = lseek (fd , step * i , SEEK_SET );
1276
1276
if (rc < 0 )
1277
1277
break ;
1278
1278
@@ -1406,7 +1406,7 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
1406
1406
Oid oid = PG_GETARG_OID (0 );
1407
1407
Relation rel = try_relation_open (oid , AccessShareLock );
1408
1408
int processed_segments = 0 ;
1409
-
1409
+
1410
1410
if (rel != NULL )
1411
1411
{
1412
1412
char * path ;
@@ -1420,7 +1420,7 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
1420
1420
path = relpathbackend (rel -> rd_node , rel -> rd_backend , MAIN_FORKNUM );
1421
1421
map_path = (char * )palloc (strlen (path ) + 16 );
1422
1422
sprintf (map_path , "%s.cfm" , path );
1423
-
1423
+
1424
1424
while (cfs_gc_file (map_path , false))
1425
1425
{
1426
1426
sprintf (map_path , "%s.%u.cfm" , path , ++ i );
@@ -1436,6 +1436,21 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
1436
1436
PG_RETURN_INT32 (cfs_gc_processed_segments );
1437
1437
}
1438
1438
1439
+
1440
+ void cfs_gc_segment (char const * fileName )
1441
+ {
1442
+ char * mapFileName = psprintf ("%s.cfm" , fileName );
1443
+
1444
+ LWLockAcquire (CfsGcLock , LW_EXCLUSIVE ); /* Prevent interaction with background GC */
1445
+
1446
+ cfs_gc_file (mapFileName , false);
1447
+
1448
+ LWLockRelease (CfsGcLock );
1449
+
1450
+ pfree (mapFileName );
1451
+ }
1452
+
1453
+
1439
1454
Datum cfs_gc_activity_processed_bytes (PG_FUNCTION_ARGS )
1440
1455
{
1441
1456
PG_RETURN_INT64 (cfs_state -> gc_stat .processedBytes );
0 commit comments