@@ -57,6 +57,7 @@ int cfs_gc_period;
57
57
int cfs_gc_delay ;
58
58
int cfs_level ;
59
59
bool cfs_encryption ;
60
+ bool cfs_gc_verify_file ;
60
61
61
62
static bool cfs_read_file (int fd , void * data , uint32 size );
62
63
static bool cfs_write_file (int fd , void const * data , uint32 size );
@@ -65,7 +66,7 @@ static void cfs_start_background_gc(void);
65
66
CfsState * cfs_state ;
66
67
67
68
static bool cfs_stop ;
68
-
69
+ static int cfs_processed_segments ;
69
70
70
71
#if CFS_COMPRESSOR == SNAPPY_COMPRESSOR
71
72
@@ -335,6 +336,9 @@ int cfs_munmap(FileMap* map)
335
336
#endif
336
337
}
337
338
339
+ /*
340
+ * Protects file from GC
341
+ */
338
342
void cfs_lock_file (FileMap * map , char const * file_path )
339
343
{
340
344
long delay = CFS_LOCK_MIN_TIMEOUT ;
@@ -356,7 +360,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
356
360
if (md2 >= 0 ) {
357
361
/* Recover map */
358
362
if (!cfs_read_file (md2 , map , sizeof (FileMap ))) {
359
- elog (LOG , "Failed to read file %s: %m" , map_bck_path );
363
+ elog (LOG , "CFS failed to read file %s: %m" , map_bck_path );
360
364
}
361
365
close (md2 );
362
366
}
@@ -384,7 +388,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
384
388
}
385
389
386
390
/*
387
- * Protects file from GC
391
+ * Release file lock
388
392
*/
389
393
void cfs_unlock_file (FileMap * map )
390
394
{
@@ -491,12 +495,12 @@ static bool cfs_gc_file(char* map_path)
491
495
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc , 1 );
492
496
}
493
497
if (md < 0 ) {
494
- elog (LOG , "Failed to open map file %s: %m" , map_path );
498
+ elog (LOG , "CFS failed to open map file %s: %m" , map_path );
495
499
goto FinishGC ;
496
500
}
497
501
map = cfs_mmap (md );
498
502
if (map == MAP_FAILED ) {
499
- elog (LOG , "Failed to map file %s: %m" , map_path );
503
+ elog (LOG , "CFS failed to map file %s: %m" , map_path );
500
504
close (md );
501
505
goto FinishGC ;
502
506
}
@@ -517,7 +521,7 @@ static bool cfs_gc_file(char* map_path)
517
521
uint32 newSize = 0 ;
518
522
inode_t * * inodes = (inode_t * * )palloc (RELSEG_SIZE * sizeof (inode_t * ));
519
523
bool remove_backups = true;
520
- int n_pages = virtSize / BLCKSZ ;
524
+ int n_pages ;
521
525
TimestampTz startTime , endTime ;
522
526
long secs ;
523
527
int usecs ;
@@ -549,7 +553,7 @@ static bool cfs_gc_file(char* map_path)
549
553
if (md2 >= 0 ) {
550
554
/* Recover map */
551
555
if (!cfs_read_file (md2 , newMap , sizeof (FileMap ))) {
552
- elog (LOG , "Failed to read file %s: %m" , map_bck_path );
556
+ elog (LOG , "CFS failed to read file %s: %m" , map_bck_path );
553
557
goto Cleanup ;
554
558
}
555
559
close (md2 );
@@ -572,6 +576,12 @@ static bool cfs_gc_file(char* map_path)
572
576
delay *= 2 ;
573
577
}
574
578
}
579
+ /* Reread variables after locking file */
580
+ usedSize = pg_atomic_read_u32 (& map -> usedSize );
581
+ physSize = pg_atomic_read_u32 (& map -> physSize );
582
+ virtSize = pg_atomic_read_u32 (& map -> virtSize );
583
+ n_pages = virtSize / BLCKSZ ;
584
+
575
585
md2 = open (map_bck_path , O_CREAT |O_RDWR |PG_BINARY |O_TRUNC , 0600 );
576
586
if (md2 < 0 ) {
577
587
goto Cleanup ;
@@ -583,7 +593,7 @@ static bool cfs_gc_file(char* map_path)
583
593
/* sort inodes by offset to improve read locality */
584
594
qsort (inodes , n_pages , sizeof (inode_t * ), cfs_cmp_page_offs );
585
595
586
- fd = open (file_path , O_RDWR |PG_BINARY , 0 );
596
+ fd = open (file_path , O_RDONLY |PG_BINARY , 0 );
587
597
if (fd < 0 ) {
588
598
goto Cleanup ;
589
599
}
@@ -593,6 +603,7 @@ static bool cfs_gc_file(char* map_path)
593
603
goto Cleanup ;
594
604
}
595
605
cfs_state -> gc_stat .processedFiles += 1 ;
606
+ cfs_processed_segments += 1 ;
596
607
597
608
for (i = 0 ; i < n_pages ; i ++ ) {
598
609
int size = CFS_INODE_SIZE (* inodes [i ]);
@@ -605,12 +616,12 @@ static bool cfs_gc_file(char* map_path)
605
616
Assert (rc == offs );
606
617
607
618
if (!cfs_read_file (fd , block , size )) {
608
- elog (LOG , "Failed to read file %s: %m" , file_path );
619
+ elog (LOG , "CFS GC failed to read block %d of file %s at position %d size %d : %m" , i , file_path , offs , size );
609
620
goto Cleanup ;
610
621
}
611
622
612
623
if (!cfs_write_file (fd2 , block , size )) {
613
- elog (LOG , "Failed to write file %s: %m" , file_bck_path );
624
+ elog (LOG , "CFS failed to write file %s: %m" , file_bck_path );
614
625
goto Cleanup ;
615
626
}
616
627
cfs_state -> gc_stat .processedBytes += size ;
@@ -621,58 +632,84 @@ static bool cfs_gc_file(char* map_path)
621
632
* inodes [i ] = CFS_INODE (size , offs );
622
633
}
623
634
}
624
- pg_atomic_write_u32 (& map -> usedSize , newSize );
625
-
626
635
if (close (fd ) < 0 ) {
627
- elog (LOG , "Failed to close file %s: %m" , file_path );
636
+ elog (LOG , "CFS failed to close file %s: %m" , file_path );
628
637
goto Cleanup ;
629
638
}
630
639
fd = -1 ;
631
640
632
641
/* Persist copy of data file */
633
642
if (pg_fsync (fd2 ) < 0 ) {
634
- elog (LOG , "Failed to sync file %s: %m" , file_bck_path );
643
+ elog (LOG , "CFS failed to sync file %s: %m" , file_bck_path );
635
644
goto Cleanup ;
636
645
}
637
646
if (close (fd2 ) < 0 ) {
638
- elog (LOG , "Failed to close file %s: %m" , file_bck_path );
647
+ elog (LOG , " CFS failed to close file %s: %m" , file_bck_path );
639
648
goto Cleanup ;
640
649
}
641
650
fd2 = -1 ;
642
651
643
652
/* Persist copy of map file */
644
653
if (!cfs_write_file (md2 , & newMap , sizeof (newMap ))) {
645
- elog (LOG , "Failed to write file %s: %m" , map_bck_path );
654
+ elog (LOG , "CFS failed to write file %s: %m" , map_bck_path );
646
655
goto Cleanup ;
647
656
}
648
657
if (pg_fsync (md2 ) < 0 ) {
649
- elog (LOG , "Failed to sync file %s: %m" , map_bck_path );
658
+ elog (LOG , "CFS failed to sync file %s: %m" , map_bck_path );
650
659
goto Cleanup ;
651
660
}
652
661
if (close (md2 ) < 0 ) {
653
- elog (LOG , "Failed to close file %s: %m" , map_bck_path );
662
+ elog (LOG , "CFS failed to close file %s: %m" , map_bck_path );
654
663
goto Cleanup ;
655
664
}
656
665
md2 = -1 ;
657
666
658
667
/* Persist map with CFS_GC_LOCK set: in case of crash we will know that map may be changed by GC */
659
668
if (cfs_msync (map ) < 0 ) {
660
- elog (LOG , "Failed to sync map %s: %m" , map_path );
669
+ elog (LOG , "CFS failed to sync map %s: %m" , map_path );
661
670
goto Cleanup ;
662
671
}
663
672
if (pg_fsync (md ) < 0 ) {
664
- elog (LOG , "Failed to sync file %s: %m" , map_path );
673
+ elog (LOG , "CFS failed to sync file %s: %m" , map_path );
665
674
goto Cleanup ;
666
675
}
667
676
677
+
678
+ if (cfs_gc_verify_file ) {
679
+ fd = open (file_bck_path , O_RDONLY |PG_BINARY , 0 );
680
+ Assert (fd >= 0 );
681
+
682
+ for (i = 0 ; i < n_pages ; i ++ ) {
683
+ inode_t inode = newMap -> inodes [i ];
684
+ int size = CFS_INODE_SIZE (inode );
685
+ if (size != 0 ) {
686
+ char block [BLCKSZ ];
687
+ char decomressedBlock [BLCKSZ ];
688
+ off_t res PG_USED_FOR_ASSERTS_ONLY ;
689
+ bool rc PG_USED_FOR_ASSERTS_ONLY ;
690
+ res = lseek (fd , CFS_INODE_OFFS (inode ), SEEK_SET );
691
+ Assert (res == (off_t )CFS_INODE_OFFS (inode ));
692
+ rc = cfs_read_file (fd , block , size );
693
+ Assert (rc );
694
+ cfs_decrypt (block , (off_t )i * BLCKSZ , size );
695
+ res = cfs_decompress (decomressedBlock , BLCKSZ , block , size );
696
+ if (res != BLCKSZ ) {
697
+ pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* release lock */
698
+ elog (PANIC , "Verification failed for block %d of relation %s: error code %d" , i , file_path , (int )res );
699
+ }
700
+ }
701
+ }
702
+ close (fd );
703
+ }
704
+
668
705
/*
669
706
* Now all information necessary for recovery is stored.
670
707
* We are ready to replace the existing file with the defragmented one.
671
708
* Use rename and rely on file system to provide atomicity of this operation.
672
709
*/
673
710
remove_backups = false;
674
711
if (rename (file_bck_path , file_path ) < 0 ) {
675
- elog (LOG , "Failed to rename file %s: %m" , file_path );
712
+ elog (LOG , "CFS failed to rename file %s: %m" , file_path );
676
713
goto Cleanup ;
677
714
}
678
715
ReplaceMap :
@@ -685,11 +722,11 @@ static bool cfs_gc_file(char* map_path)
685
722
686
723
/* Before removing backup files and releasing locks we need to flush updated map file */
687
724
if (cfs_msync (map ) < 0 ) {
688
- elog (LOG , "Failed to sync map %s: %m" , map_path );
725
+ elog (LOG , "CFS failed to sync map %s: %m" , map_path );
689
726
goto Cleanup ;
690
727
}
691
728
if (pg_fsync (md ) < 0 ) {
692
- elog (LOG , "Failed to sync file %s: %m" , map_path );
729
+ elog (LOG , "CFS failed to sync file %s: %m" , map_path );
693
730
Cleanup :
694
731
if (fd >= 0 ) close (fd );
695
732
if (fd2 >= 0 ) close (fd2 );
@@ -703,11 +740,12 @@ static bool cfs_gc_file(char* map_path)
703
740
} else {
704
741
remove_backups = true; /* now backups are not need any more */
705
742
}
743
+
706
744
pg_atomic_fetch_sub_u32 (& map -> lock , CFS_GC_LOCK ); /* release lock */
707
745
708
746
/* remove map backup file */
709
747
if (remove_backups && unlink (map_bck_path )) {
710
- elog (LOG , "Failed to unlink file %s: %m" , map_bck_path );
748
+ elog (LOG , "CFS failed to unlink file %s: %m" , map_bck_path );
711
749
succeed = false;
712
750
}
713
751
@@ -739,11 +777,11 @@ static bool cfs_gc_file(char* map_path)
739
777
}
740
778
741
779
if (cfs_munmap (map ) < 0 ) {
742
- elog (LOG , "Failed to unmap file %s: %m" , map_path );
780
+ elog (LOG , "CFS failed to unmap file %s: %m" , map_path );
743
781
succeed = false;
744
782
}
745
783
if (close (md ) < 0 ) {
746
- elog (LOG , "Failed to close file %s: %m" , map_path );
784
+ elog (LOG , "CFS failed to close file %s: %m" , map_path );
747
785
succeed = false;
748
786
}
749
787
FinishGC :
@@ -875,6 +913,7 @@ PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_bytes);
875
913
PG_FUNCTION_INFO_V1 (cfs_gc_activity_processed_pages );
876
914
PG_FUNCTION_INFO_V1 (cfs_gc_activity_processed_files );
877
915
PG_FUNCTION_INFO_V1 (cfs_gc_activity_scanned_files );
916
+ PG_FUNCTION_INFO_V1 (cfs_gc_relation );
878
917
879
918
Datum cfs_start_gc (PG_FUNCTION_ARGS )
880
919
{
@@ -1002,10 +1041,10 @@ Datum cfs_compression_ratio(PG_FUNCTION_ARGS)
1002
1041
physSize += pg_atomic_read_u32 (& map -> physSize );
1003
1042
1004
1043
if (cfs_munmap (map ) < 0 ) {
1005
- elog (LOG , "Failed to unmap file %s: %m" , map_path );
1044
+ elog (LOG , "CFS failed to unmap file %s: %m" , map_path );
1006
1045
}
1007
1046
if (close (md ) < 0 ) {
1008
- elog (LOG , "Failed to close file %s: %m" , map_path );
1047
+ elog (LOG , "CFS failed to close file %s: %m" , map_path );
1009
1048
}
1010
1049
i += 1 ;
1011
1050
}
@@ -1051,10 +1090,10 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
1051
1090
physSize += pg_atomic_read_u32 (& map -> physSize );
1052
1091
1053
1092
if (cfs_munmap (map ) < 0 ) {
1054
- elog (LOG , "Failed to unmap file %s: %m" , map_path );
1093
+ elog (LOG , "CFS failed to unmap file %s: %m" , map_path );
1055
1094
}
1056
1095
if (close (md ) < 0 ) {
1057
- elog (LOG , "Failed to close file %s: %m" , map_path );
1096
+ elog (LOG , "CFS failed to close file %s: %m" , map_path );
1058
1097
}
1059
1098
i += 1 ;
1060
1099
}
@@ -1065,6 +1104,36 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
1065
1104
PG_RETURN_FLOAT8 ((double )(physSize - usedSize )/physSize );
1066
1105
}
1067
1106
1107
+ Datum cfs_gc_relation (PG_FUNCTION_ARGS )
1108
+ {
1109
+ cfs_processed_segments = 0 ;
1110
+
1111
+ if (cfs_gc_workers == 0 && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
1112
+ {
1113
+ Oid oid = PG_GETARG_OID (0 );
1114
+ Relation rel = try_relation_open (oid , AccessShareLock );
1115
+
1116
+ if (rel != NULL ) {
1117
+ char * path = relpathbackend (rel -> rd_node , rel -> rd_backend , MAIN_FORKNUM );
1118
+ char * map_path = (char * )palloc (strlen (path ) + 16 );
1119
+ int i = 0 ;
1120
+ sprintf (map_path , "%s.cfm" , path );
1121
+
1122
+ while (true) {
1123
+ if (!cfs_gc_file (map_path )) {
1124
+ break ;
1125
+ }
1126
+ sprintf (map_path , "%s.%u.cfm" , path , ++ i );
1127
+ }
1128
+ pfree (path );
1129
+ pfree (map_path );
1130
+ relation_close (rel , AccessShareLock );
1131
+ }
1132
+ pg_atomic_clear_flag (& cfs_state -> gc_started );
1133
+ }
1134
+ PG_RETURN_INT32 (cfs_processed_segments );
1135
+ }
1136
+
1068
1137
Datum cfs_gc_activity_processed_bytes (PG_FUNCTION_ARGS )
1069
1138
{
1070
1139
PG_RETURN_INT64 (cfs_state -> gc_stat .processedBytes );
0 commit comments