Skip to content

Commit 0ae6e6d

Browse files
committed
Merge branch 'PGPROEE9_6_CFS_385' into PGPROEE9_6
2 parents 1a5283c + e6c29f0 commit 0ae6e6d

File tree

5 files changed

+86
-26
lines changed

5 files changed

+86
-26
lines changed

src/backend/storage/file/cfs.c

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "utils/resowner_private.h"
5656
#include "postmaster/bgworker.h"
5757

58+
5859
/*
5960
* GUC variable that defines compression level.
6061
* 0 - no compression, 1 - max speed,
@@ -417,15 +418,21 @@ int cfs_msync(FileMap* map)
417418
FileMap* cfs_mmap(int md)
418419
{
419420
FileMap* map;
421+
if (ftruncate(md, sizeof(FileMap)) != 0)
422+
{
423+
return (FileMap*)MAP_FAILED;
424+
}
425+
420426
#ifdef WIN32
421-
HANDLE mh = CreateFileMapping(_get_osfhandle(md), NULL, PAGE_READWRITE,
427+
{
428+
HANDLE mh = CreateFileMapping(_get_osfhandle(md), NULL, PAGE_READWRITE,
422429
0, (DWORD)sizeof(FileMap), NULL);
423-
if (mh == NULL)
424-
return (FileMap*)MAP_FAILED;
425-
426-
map = (FileMap*)MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
427-
CloseHandle(mh);
430+
if (mh == NULL)
431+
return (FileMap*)MAP_FAILED;
428432

433+
map = (FileMap*)MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
434+
CloseHandle(mh);
435+
}
429436
if (map == NULL)
430437
return (FileMap*)MAP_FAILED;
431438

@@ -533,17 +540,44 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
533540
void cfs_lock_file(FileMap* map, char const* file_path)
534541
{
535542
long delay = CFS_LOCK_MIN_TIMEOUT;
543+
int n_attempts = 0;
536544

537545
while (true)
538546
{
539547
uint64 count = pg_atomic_fetch_add_u32(&map->lock, 1);
548+
bool revokeLock = false;
540549

541550
if (count < CFS_GC_LOCK)
542551
break;
543-
544-
if (InRecovery)
552+
553+
if (InRecovery)
554+
{
555+
revokeLock = true;
556+
}
557+
else
545558
{
546-
/* Uhhh... looks like last GC was interrupted.
559+
if (!pg_atomic_unlocked_test_flag(&cfs_state->gc_started))
560+
{
561+
if (++n_attempts > MAX_LOCK_ATTEMPTS)
562+
{
563+
/* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564+
* Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565+
* without releasing lock. And for some reasons recovery was not performed and this page left locked.
566+
* We should revoke the the lock to allow access to this segment.
567+
*/
568+
revokeLock = true;
569+
}
570+
}
571+
else
572+
{
573+
n_attempts = 0; /* Reset counter of attempts because GC is in progress */
574+
}
575+
}
576+
if (revokeLock
577+
/* use gc_started flag to prevent race condition with other backends and GC */
578+
&& pg_atomic_test_set_flag(&cfs_state->gc_started))
579+
{
580+
/* Ugggh... looks like last GC was interrupted.
547581
* Try to recover the file.
548582
*/
549583
char* map_bck_path = psprintf("%s.cfm.bck", file_path);
@@ -565,18 +599,20 @@ void cfs_lock_file(FileMap* map, char const* file_path)
565599
else
566600
{
567601
/* Presence of backup file means that we still have
568-
* unchanged data and map files. Just remove backup files,
569-
* grab lock and continue processing
602+
* unchanged data and map files. Just remove backup files and
603+
* revoke GC lock.
570604
*/
571605
unlink(file_bck_path);
572606
unlink(map_bck_path);
573607
}
574608

609+
pg_atomic_clear_flag(&cfs_state->gc_started);
610+
count = pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* revoke GC lock */
611+
Assert((int)count > 0);
575612
pfree(file_bck_path);
576613
pfree(map_bck_path);
577614
break;
578-
}
579-
615+
}
580616
pg_atomic_fetch_sub_u32(&map->lock, 1);
581617
pg_usleep(delay);
582618
if (delay < CFS_LOCK_MAX_TIMEOUT)

src/backend/storage/file/copydir.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ copy_zip_file(char *fromfile, bool from_compressed,
319319
Assert(sep != NULL);
320320

321321
if ((sscanf(sep+1, "%d.%d%n", &relno, &segno, &n) != 2
322+
&& sscanf(sep+1, "%d_init%n", &relno, &n) != 1
322323
&& sscanf(sep+1, "%d%n", &relno, &n) != 1)
323324
|| sep[n+1] != '\0'
324325
|| relno < FirstNormalObjectId)
@@ -328,7 +329,7 @@ copy_zip_file(char *fromfile, bool from_compressed,
328329
}
329330

330331
if (to_compressed)
331-
fprintf(stderr, "Compress file %s, relno=%d, sep[n+1]=%s\n",
332+
elog(DEBUG2, "Compress file %s, relno=%d, sep[n+1]=%s\n",
332333
tofile, relno, &sep[n+1]);
333334

334335
/* Open the files */

src/backend/storage/file/fd.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,9 +1322,6 @@ PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
13221322
goto io_error;
13231323
}
13241324

1325-
if (ftruncate(vfdP->md, sizeof(FileMap)) != 0)
1326-
elog(LOG, "OPEN MAP ftruncate FAILED: %d", errno);
1327-
13281325
vfdP->map = cfs_mmap(vfdP->md);
13291326
if (vfdP->map == MAP_FAILED)
13301327
{

src/backend/storage/file/reinit.c

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
#include "utils/memutils.h"
2626

2727
static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
28-
int op);
28+
int op, bool compressed);
2929
static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
30-
int op);
30+
int op, bool compressed);
3131
static bool parse_filename_for_nontemp_relation(const char *name,
3232
int *oidchars, ForkNumber *fork);
3333

@@ -71,7 +71,7 @@ ResetUnloggedRelations(int op)
7171
/*
7272
* First process unlogged files in pg_default ($PGDATA/base)
7373
*/
74-
ResetUnloggedRelationsInTablespaceDir("base", op);
74+
ResetUnloggedRelationsInTablespaceDir("base", op, false);
7575

7676
/*
7777
* Cycle through directories for all non-default tablespaces.
@@ -80,13 +80,25 @@ ResetUnloggedRelations(int op)
8080

8181
while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
8282
{
83+
FILE* compressionFile;
84+
8385
if (strcmp(spc_de->d_name, ".") == 0 ||
8486
strcmp(spc_de->d_name, "..") == 0)
8587
continue;
8688

89+
snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/pg_compression",
90+
spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
91+
92+
compressionFile = fopen(temp_path, "r");
93+
if (compressionFile)
94+
{
95+
fclose(compressionFile);
96+
}
97+
8798
snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
8899
spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
89-
ResetUnloggedRelationsInTablespaceDir(temp_path, op);
100+
101+
ResetUnloggedRelationsInTablespaceDir(temp_path, op, compressionFile != NULL);
90102
}
91103

92104
FreeDir(spc_dir);
@@ -100,7 +112,7 @@ ResetUnloggedRelations(int op)
100112

101113
/* Process one tablespace directory for ResetUnloggedRelations */
102114
static void
103-
ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
115+
ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op, bool compressed)
104116
{
105117
DIR *ts_dir;
106118
struct dirent *de;
@@ -133,15 +145,15 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
133145

134146
snprintf(dbspace_path, sizeof(dbspace_path), "%s/%s",
135147
tsdirname, de->d_name);
136-
ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
148+
ResetUnloggedRelationsInDbspaceDir(dbspace_path, op, compressed);
137149
}
138150

139151
FreeDir(ts_dir);
140152
}
141153

142154
/* Process one per-dbspace directory for ResetUnloggedRelations */
143155
static void
144-
ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
156+
ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op, bool compressed)
145157
{
146158
DIR *dbspace_dir;
147159
struct dirent *de;
@@ -332,8 +344,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
332344
strlen(forkNames[INIT_FORKNUM]));
333345

334346
/* OK, we're ready to perform the actual copy. */
335-
elog(DEBUG2, "copying %s to %s", srcpath, dstpath);
336-
copy_file(srcpath, dstpath);
347+
if (compressed) {
348+
elog(DEBUG2, "copying %s to %s with compression", srcpath, dstpath);
349+
copy_zip_file(srcpath, false, dstpath, true);
350+
} else {
351+
elog(DEBUG2, "copying %s to %s", srcpath, dstpath);
352+
copy_file(srcpath, dstpath);
353+
}
337354
}
338355

339356
FreeDir(dbspace_dir);

src/include/storage/cfs.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@
1515
#define CFS_DISABLE_TIMEOUT 1000 /* milliseconds */
1616
#define CFS_ESTIMATE_PROBES 10
1717

18+
/*
19+
* Maximal number of attempts backend should perform to lock file segment.
20+
* If during all this attempts gc_started flag is not set (it means that no GC is active at this moment),
21+
* then it means that for some reasons GC process was crashed (may be together with other postgres processes or even OS)
22+
* without releasing lock. And for some reasons recovery was not performed and this page left locked. Цe should revoke this lock
23+
* to allow access to this database segment.
24+
*/
25+
#define MAX_LOCK_ATTEMPTS 100
26+
1827
/* Maximal size of buffer for compressing (size) bytes where (size) is equal to PostgreSQL page size.
1928
* Some compression algorithms requires to provide buffer large enough for worst case and immediately returns error is buffer is not enough.
2029
* Accurate calculation of required buffer size is not needed here and doubling buffer size works for all used compression algorithms. */

0 commit comments

Comments
 (0)