Skip to content

Commit bfc4503

Browse files
authored
Correct clean existing cache files during startup (treeverse#1237)
* Correct clean existing cache files during startup Also improve warnings on immediate rejection from cache. Fixes treeverse#1183. * [CR] Use tfs.eviction member where possible * [CR] Fix workspace cleanup on startup * [CR] Keep deleting files when ristretto cache.Set() fails A synchronous failure is due to the ristretto worker goroutine not being set up correctly (possibly during cache shutdown), so onReject might not be called. But continue to log nicely, of course, to debug such weird behaviour. * Keep evidence of failed tier_fs tests This is a temporary directory that is left behind on test failure. In CI it does not matter because the container dies anyway; on a developer machine it helps. Also improve test error detection in TestStartup.
1 parent 9dcbd73 commit bfc4503

File tree

4 files changed

+63
-39
lines changed

4 files changed

+63
-39
lines changed

pyramid/eviction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"fmt"
55

66
"github.com/dgraph-io/ristretto"
7-
params "github.com/treeverse/lakefs/pyramid/params"
7+
"github.com/treeverse/lakefs/pyramid/params"
88
)
99

1010
// nolint: unused

pyramid/params/params.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ type RelativePath string
1111
// Eviction abstracts eviction control.
1212
type Eviction interface {
1313
// Touch indicates the eviction that the file has been used now
14-
Touch(rPath RelativePath)
14+
Touch(path RelativePath)
1515

1616
// Store orders the eviction to Store the path.
1717
// returns true iff the eviction accepted the path.
18-
Store(rPath RelativePath, filesize int64) bool
18+
Store(path RelativePath, filesize int64) bool
1919
}
2020

2121
// LocalDiskParams is pyramid.FS params that are identical for all file-systems

pyramid/tier_fs.go

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func NewFS(c *params.InstanceParams) (FS, error) {
4848
return nil, fmt.Errorf("creating base dir: %s - %w", fsLocalBaseDir, err)
4949
}
5050

51-
tierFS := &TierFS{
51+
tfs := &TierFS{
5252
adapter: c.Adapter,
5353
fsName: c.FSName,
5454
logger: c.Logger,
@@ -59,44 +59,43 @@ func NewFS(c *params.InstanceParams) (FS, error) {
5959
}
6060
if c.Eviction == nil {
6161
var err error
62-
c.Eviction, err = newRistrettoEviction(c.AllocatedBytes(), tierFS.removeFromLocal)
62+
c.Eviction, err = newRistrettoEviction(c.AllocatedBytes(), tfs.removeFromLocal)
6363
if err != nil {
6464
return nil, fmt.Errorf("creating eviction control: %w", err)
6565
}
6666
}
6767

68-
tierFS.eviction = c.Eviction
69-
if err := handleExistingFiles(tierFS.eviction, fsLocalBaseDir); err != nil {
68+
tfs.eviction = c.Eviction
69+
if err := tfs.handleExistingFiles(); err != nil {
7070
return nil, fmt.Errorf("handling existing files: %w", err)
7171
}
7272

73-
return tierFS, nil
73+
return tfs, nil
7474
}
7575

7676
// handleExistingFiles should only be called during init of the TierFS.
7777
// It does 2 things:
7878
// 1. Adds stored files to the eviction control
7979
// 2. Remove workspace directories and all its content if it
8080
// exist under the namespace dir.
81-
func handleExistingFiles(eviction params.Eviction, fsLocalBaseDir string) error {
82-
if err := filepath.Walk(fsLocalBaseDir, func(rPath string, info os.FileInfo, err error) error {
81+
func (tfs *TierFS) handleExistingFiles() error {
82+
if err := filepath.Walk(tfs.fsLocalBaseDir, func(p string, info os.FileInfo, err error) error {
8383
if err != nil {
8484
return err
8585
}
8686
if info.IsDir() {
8787
if info.Name() == workspaceDir {
8888
// skipping workspaces and saving them for later delete
89-
if err := os.RemoveAll(rPath); err != nil {
89+
if err := os.RemoveAll(p); err != nil {
9090
return fmt.Errorf("removing dir: %w", err)
9191
}
9292
return filepath.SkipDir
9393
}
9494
return nil
9595
}
9696

97-
if err := storeLocalFile(rPath, info.Size(), eviction); err != nil {
98-
return err
99-
}
97+
rPath := strings.TrimPrefix(p, tfs.fsLocalBaseDir)
98+
tfs.storeLocalFile(params.RelativePath(rPath), info.Size())
10099
return nil
101100
}); err != nil {
102101
return fmt.Errorf("walking the fs dir: %w", err)
@@ -121,7 +120,7 @@ func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) {
121120
return
122121
}
123122

124-
if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(p)); err != nil {
123+
if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(string(rPath))); err != nil {
125124
tfs.logger.WithError(err).Error("Failed deleting empty dir")
126125
errorsTotal.WithLabelValues(tfs.fsName, "DirRemoval")
127126
}
@@ -240,10 +239,16 @@ func (tfs *TierFS) openFile(fileRef localFileRef, fh *os.File) (*ROFile, error)
240239
}
241240

242241
if !tfs.eviction.Store(fileRef.fsRelativePath, stat.Size()) {
243-
// This is where we get less strict.
244-
// Ideally, newly fetched file will never be rejected by the cache.
245-
// But if it did, we prefer to serve the file and delete it.
246-
// When the user will close the file, the file will be deleted from the disk too.
242+
tfs.logger.WithFields(logging.Fields{
243+
"namespace": fileRef.namespace,
244+
"file": fileRef.filename,
245+
"full_path": fileRef.fullPath,
246+
}).Info("stored file immediately rejected from cache (delete but continue)")
247+
248+
// A rare occurrence, (currently) happens when Ristretto cache is not set up
249+
// to perform any caching. So be less strict: prefer to serve the file and
250+
// delete it from the cache. It will be removed from disk when the last
251+
// surviving file descriptor -- returned from this function -- is closed.
247252
if err := os.Remove(fileRef.fullPath); err != nil {
248253
return nil, err
249254
}
@@ -334,17 +339,6 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
334339
return fh, nil
335340
}
336341

337-
func storeLocalFile(rPath string, size int64, eviction params.Eviction) error {
338-
relativePath := params.RelativePath(rPath)
339-
if !eviction.Store(relativePath, size) {
340-
err := os.Remove(rPath)
341-
if err != nil {
342-
return fmt.Errorf("removing file: %w", err)
343-
}
344-
}
345-
return nil
346-
}
347-
348342
func validateFilename(filename string) error {
349343
if strings.HasPrefix(filename, workspaceDir+string(os.PathSeparator)) {
350344
return errPathInWorkspace
@@ -363,13 +357,36 @@ type localFileRef struct {
363357
fsRelativePath params.RelativePath
364358
}
365359

360+
func (tfs *TierFS) storeLocalFile(rPath params.RelativePath, size int64) {
361+
if !tfs.eviction.Store(rPath, size) {
362+
// Rejected from cache, so deleted. This is safe, but can only happen when
363+
// the cache size was lowered -- so warn.
364+
tfs.logger.WithFields(logging.Fields{
365+
"path": rPath,
366+
"size": size,
367+
}).Warn("existing file immediately rejected from cache on startup (safe if cache size changed; continue)")
368+
369+
// A rare occurrence, (currently) happens when Ristretto cache is not set up
370+
// to perform any caching. So be less strict: prefer to serve the file and
371+
// delete it from the cache. It will be removed from disk when the last
372+
// surviving file descriptor -- returned from this function -- is closed.
373+
if err := os.Remove(string(rPath)); err != nil {
374+
tfs.logger.WithFields(logging.Fields{
375+
"path": rPath,
376+
"size": size,
377+
}).Error("failed to delete immediately-rejected existing file from cache on startup")
378+
return
379+
}
380+
}
381+
}
382+
366383
func (tfs *TierFS) newLocalFileRef(namespace, nsPath, filename string) localFileRef {
367-
relative := path.Join(nsPath, filename)
384+
rPath := path.Join(nsPath, filename)
368385
return localFileRef{
369386
namespace: namespace,
370387
filename: filename,
371-
fsRelativePath: params.RelativePath(relative),
372-
fullPath: path.Join(tfs.fsLocalBaseDir, relative),
388+
fsRelativePath: params.RelativePath(rPath),
389+
fullPath: path.Join(tfs.fsLocalBaseDir, rPath),
373390
}
374391
}
375392

pyramid/tier_fs_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"testing"
1515

1616
"github.com/google/uuid"
17+
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819
"github.com/treeverse/lakefs/block/mem"
1920
"github.com/treeverse/lakefs/logging"
@@ -73,8 +74,12 @@ func TestStartup(t *testing.T) {
7374
// cleanup
7475
baseDir := path.Join(os.TempDir(), fsName)
7576
defer func() {
77+
if t.Failed() {
78+
// Leave behind the evidence.
79+
return
80+
}
7681
if err := os.RemoveAll(baseDir); err != nil {
77-
t.Fatal("Remove all filed under", baseDir, err)
82+
t.Fatal("Remove all files under", baseDir, err)
7883
}
7984
}()
8085

@@ -112,16 +117,18 @@ func TestStartup(t *testing.T) {
112117
}
113118

114119
dir, err := os.Open(workspacePath)
115-
require.Nil(t, dir)
116-
require.True(t, os.IsNotExist(err))
120+
assert.Nil(t, dir, "expected to fail to open %s", workspacePath)
121+
// os.IsNotExist does not look as hard to errors.Is; for errors returned directly from
122+
// package os this does not matter.
123+
assert.Error(t, err, os.ErrNotExist, "expected %s not to exist", workspacePath)
117124

118125
f, err := localFS.Open(ctx, namespace, filename)
119126
defer func() { _ = f.Close() }()
120-
require.NoError(t, err)
127+
assert.NoError(t, err)
121128

122129
data, err := ioutil.ReadAll(f)
123-
require.NoError(t, err)
124-
require.Equal(t, content, data)
130+
assert.NoError(t, err)
131+
assert.Equal(t, content, data)
125132
}
126133

127134
func testEviction(t *testing.T, namespaces ...string) {

0 commit comments

Comments
 (0)