
Commit 125ef95

chore: add files cache for reading template tar archives from db
1 parent 9bc727e commit 125ef95

File tree

archive/fs/tar.go
coderd/files/cache.go
coderd/files/cache_internal_test.go
coderd/util/lazy/valuewitherror.go
coderd/util/lazy/valuewitherror_test.go

5 files changed: +303 -0 lines changed

archive/fs/tar.go

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
package archivefs

import (
	"archive/tar"
	"io"
	"io/fs"

	"github.com/spf13/afero"
	"github.com/spf13/afero/tarfs"
)

// FromTarReader exposes the contents of a tar stream as a read-only fs.FS.
func FromTarReader(r io.Reader) fs.FS {
	tr := tar.NewReader(r)
	tfs := tarfs.New(tr)
	rofs := afero.NewReadOnlyFs(tfs)
	return afero.NewIOFS(rofs)
}
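
Because the return value is a plain io/fs filesystem, standard-library helpers work on it directly. A minimal usage sketch, with a local file standing in for the database blob (template.tar is a hypothetical path, not from this commit):

package main

import (
	"fmt"
	"io/fs"
	"os"

	archivefs "github.com/coder/coder/v2/archive/fs"
)

func main() {
	// Any io.Reader producing a tar stream works; a local file is used here
	// as a stand-in for the database blob. "template.tar" is hypothetical.
	f, err := os.Open("template.tar")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	tfs := archivefs.FromTarReader(f)

	// The result is read-only, and the standard fs helpers work against it.
	entries, err := fs.ReadDir(tfs, ".")
	if err != nil {
		panic(err)
	}
	for _, entry := range entries {
		fmt.Println(entry.Name())
	}
}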

coderd/files/cache.go

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
package files

import (
	"bytes"
	"context"
	"io/fs"
	"sync"
	"sync/atomic"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	archivefs "github.com/coder/coder/v2/archive/fs"
	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/util/lazy"
)

// NewFromStore returns a file cache that will fetch files from the provided
// database.
func NewFromStore(store database.Store) Cache {
	fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
		file, err := store.GetFileByID(ctx, fileID)
		if err != nil {
			return nil, xerrors.Errorf("failed to read file from database: %w", err)
		}

		content := bytes.NewBuffer(file.Data)
		return archivefs.FromTarReader(content), nil
	}

	return Cache{
		lock:    sync.Mutex{},
		data:    make(map[uuid.UUID]*cacheEntry),
		fetcher: fetcher,
	}
}

// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users
// open the workspace creation form for a given template version, its files
// are loaded into memory exactly once. We hold those files until there are
// no longer any open connections, and then we remove the value from the map.
type Cache struct {
	lock sync.Mutex
	data map[uuid.UUID]*cacheEntry
	fetcher
}

type cacheEntry struct {
	refCount *atomic.Int64
	value    *lazy.ValueWithError[fs.FS]
}

type fetcher func(context.Context, uuid.UUID) (fs.FS, error)

// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
// calls for distinct fileIDs will fetch in parallel.
func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
	// It's important that this `Load` call occurs outside of `prepare`, after
	// the mutex has been released, or we would continue to hold the lock until
	// the entire file has been fetched, which may be slow, and would prevent
	// other files from being fetched in parallel.
	return c.prepare(ctx, fileID).Load()
}

func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, ok := c.data[fileID]
	if !ok {
		var refCount atomic.Int64
		value := lazy.NewWithError(func() (fs.FS, error) {
			return c.fetcher(ctx, fileID)
		})

		entry = &cacheEntry{
			value:    value,
			refCount: &refCount,
		}
		c.data[fileID] = entry
	}

	entry.refCount.Add(1)
	return entry.value
}

// Release decrements the reference count for the given fileID, and frees the
// backing data if there are no further references being held.
func (c *Cache) Release(fileID uuid.UUID) {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, ok := c.data[fileID]
	if !ok {
		// If we land here, it's almost certainly because a bug already
		// happened, and we're freeing something that's already been freed, or
		// we're calling this function with an incorrect ID. Should this
		// function return an error?
		return
	}
	refCount := entry.refCount.Add(-1)
	if refCount > 0 {
		return
	}
	delete(c.data, fileID)
}
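
The intended calling pattern pairs each Acquire with exactly one Release once the consumer is done with the filesystem. A minimal caller sketch with hypothetical names (renderTemplateForm and main.tf are illustrative, not part of this commit):

package main

import (
	"context"
	"io/fs"

	"github.com/google/uuid"

	"github.com/coder/coder/v2/coderd/files"
)

// renderTemplateForm stands in for whatever serves the workspace creation
// form: it holds a reference to the template files only while the request
// is being served.
func renderTemplateForm(ctx context.Context, cache *files.Cache, fileID uuid.UUID) error {
	tfs, err := cache.Acquire(ctx, fileID)
	// As written in this diff, prepare increments the reference count before
	// Load runs, so release on the error path as well; otherwise the entry's
	// count never reaches zero and it is never evicted from the map.
	defer cache.Release(fileID)
	if err != nil {
		return err
	}

	// Read template files through tfs while the connection is open.
	_, err = fs.ReadFile(tfs, "main.tf") // hypothetical file name
	return err
}

The unconditional defer reflects a subtlety of the diff: even an Acquire whose fetch fails holds a reference that only a matching Release frees.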

coderd/files/cache_internal_test.go

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
package files

import (
	"context"
	"io/fs"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/spf13/afero"
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"

	"github.com/coder/coder/v2/testutil"
)

var emptyFS fs.FS = afero.NewIOFS(afero.NewMemMapFs())

func TestConcurrency(t *testing.T) {
	t.Parallel()

	var fetches atomic.Int64
	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
		fetches.Add(1)
		// Wait long enough before returning to make sure that all of the
		// goroutines will be waiting in line, ensuring that no one duplicated
		// a fetch.
		time.Sleep(testutil.IntervalMedium)
		return emptyFS, nil
	})

	batches := 1000
	groups := make([]*errgroup.Group, 0, batches)
	for range batches {
		groups = append(groups, new(errgroup.Group))
	}

	// Call Acquire with a unique ID per batch, many times per batch, with many
	// batches all in parallel. This is pretty much the worst-case scenario:
	// thousands of concurrent reads, with both warm and cold loads happening.
	batchSize := 10
	for _, g := range groups {
		id := uuid.New()
		for range batchSize {
			g.Go(func() error {
				_, err := c.Acquire(t.Context(), id)
				return err
			})
		}
	}

	for _, g := range groups {
		require.NoError(t, g.Wait())
	}
	require.Equal(t, int64(batches), fetches.Load())
}

func TestRelease(t *testing.T) {
	t.Parallel()

	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
		return emptyFS, nil
	})

	batches := 100
	ids := make([]uuid.UUID, 0, batches)
	for range batches {
		ids = append(ids, uuid.New())
	}

	// Acquire a bunch of references.
	batchSize := 10
	for _, id := range ids {
		for range batchSize {
			fs, err := c.Acquire(t.Context(), id)
			require.NoError(t, err)
			require.Equal(t, emptyFS, fs)
		}
	}

	// Make sure the cache is fully loaded.
	require.Equal(t, len(c.data), batches)

	// Now release all of the references...
	for _, id := range ids {
		for range batchSize {
			c.Release(id)
		}
	}

	// ...and make sure that the cache has emptied itself.
	require.Equal(t, len(c.data), 0)
}

func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
	return Cache{
		lock:    sync.Mutex{},
		data:    make(map[uuid.UUID]*cacheEntry),
		fetcher: fetcher,
	}
}

coderd/util/lazy/valuewitherror.go

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
package lazy

// ValueWithError wraps a lazy Value whose initializer can also fail, caching
// both the value and the error it produced.
type ValueWithError[T any] struct {
	inner Value[result[T]]
}

type result[T any] struct {
	value T
	err   error
}

// NewWithError allows you to provide a lazy initializer that can fail.
func NewWithError[T any](fn func() (T, error)) *ValueWithError[T] {
	return &ValueWithError[T]{
		inner: Value[result[T]]{fn: func() result[T] {
			value, err := fn()
			return result[T]{value: value, err: err}
		}},
	}
}

// Load runs the initializer on first call and returns the cached result on
// every subsequent call.
func (v *ValueWithError[T]) Load() (T, error) {
	result := v.inner.Load()
	return result.value, result.err
}
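
The inner Value type is not included in this diff. For the cache's single-fetch guarantee to hold, it must run its initializer at most once and memoize the result. A minimal sketch of a compatible implementation, under that assumption (the real coderd/util/lazy.Value may differ):

package lazy

import "sync"

// Value is a sketch of the memoizing container ValueWithError builds on:
// fn runs at most once, and every Load returns the cached result. This is
// an assumption for illustration, not the actual implementation.
type Value[T any] struct {
	once  sync.Once
	fn    func() T
	value T
}

func (v *Value[T]) Load() T {
	v.once.Do(func() {
		v.value = v.fn()
	})
	return v.value
}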
coderd/util/lazy/valuewitherror_test.go

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
package lazy_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"golang.org/x/xerrors"

	"github.com/coder/coder/v2/coderd/util/lazy"
)

func TestLazyWithErrorOK(t *testing.T) {
	t.Parallel()

	l := lazy.NewWithError(func() (int, error) {
		return 1, nil
	})

	i, err := l.Load()
	require.NoError(t, err)
	require.Equal(t, 1, i)
}

func TestLazyWithErrorErr(t *testing.T) {
	t.Parallel()

	l := lazy.NewWithError(func() (int, error) {
		return 0, xerrors.New("oh no! everything that could go wrong went horribly wrong!")
	})

	i, err := l.Load()
	require.Error(t, err)
	require.Equal(t, 0, i)
}

func TestLazyWithErrorPointers(t *testing.T) {
	t.Parallel()

	a := 1
	l := lazy.NewWithError(func() (*int, error) {
		return &a, nil
	})

	b, err := l.Load()
	require.NoError(t, err)
	c, err := l.Load()
	require.NoError(t, err)

	*b += 1
	*c += 1
	require.Equal(t, 3, a)
}
