Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add filecache prometheus metrics
  • Loading branch information
Emyrk committed May 28, 2025
commit 352cb4f9d75c71a26aacb266f565df4332e82ff3
2 changes: 1 addition & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func New(options *Options) *API {
TemplateScheduleStore: options.TemplateScheduleStore,
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
AccessControlStore: options.AccessControlStore,
FileCache: files.NewFromStore(options.Database),
FileCache: files.NewFromStore(options.Database, options.PrometheusRegistry),
Experiments: experiments,
WebpushDispatcher: options.WebPushDispatcher,
healthCheckGroup: &singleflight.Group[string, *healthsdk.HealthcheckReport]{},
Expand Down
119 changes: 106 additions & 13 deletions coderd/files/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/xerrors"

archivefs "github.com/coder/coder/v2/archive/fs"
Expand All @@ -16,22 +18,75 @@ import (

// NewFromStore returns a file cache that will fetch files from the provided
// database.
func NewFromStore(store database.Store) *Cache {
fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
func NewFromStore(store database.Store, registerer prometheus.Registerer) *Cache {
fetch := func(ctx context.Context, fileID uuid.UUID) (fs.FS, int64, error) {
file, err := store.GetFileByID(ctx, fileID)
if err != nil {
return nil, xerrors.Errorf("failed to read file from database: %w", err)
return nil, 0, xerrors.Errorf("failed to read file from database: %w", err)
}

content := bytes.NewBuffer(file.Data)
return archivefs.FromTarReader(content), nil
return archivefs.FromTarReader(content), int64(content.Len()), nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. I really dislike trusting honest reporting of size here. is there not some way we could inspect the size of this entire object from inside the cache at runtime? there might not be but it'd be super slick

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is without doing some recursive Stat() call on all the files in the filesystem.

Right here is the cheapest place to do it. If we ever add a compression layer, then this won't be 100% accurate, but at present it does indicate the total number of bytes held in memory by the cache entry.

}

return &Cache{
return New(fetch, registerer)
}

func New(fetch fetcher, registerer prometheus.Registerer) *Cache {
return (&Cache{
lock: sync.Mutex{},
data: make(map[uuid.UUID]*cacheEntry),
fetcher: fetcher,
}
fetcher: fetch,
}).registerMetrics(registerer)
}

// registerMetrics creates and registers the file cache's Prometheus
// collectors on the provided registerer, assigns them to the Cache's
// metric fields, and returns the Cache so the call can be chained
// (e.g. from New). All metrics live under the "coderd_file_cache"
// namespace/subsystem.
func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache {
	subsystem := "file_cache"
	f := promauto.With(registerer)

	// currentCacheSize/totalCacheSize track bytes held in memory; the size
	// is reported by the fetcher, so accuracy depends on honest reporting.
	c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_files_size_current",
		Help:      "The current size of all files currently open in the file cache.",
	})

	c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_files_size_total",
		Help:      "The total size of all files opened in the file cache.",
	})

	// Unique files: one cache entry per file ID, regardless of how many
	// callers hold a reference to it.
	c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_files_current",
		Help:      "The number of unique files currently open in the file cache.",
	})

	c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_files_total",
		Help:      "The number of unique files opened in the file cache.",
	})

	// References: each Acquire/Release pair is one reference; a single file
	// can have many concurrent references.
	c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_file_refs_current",
		Help:      "The number of file references currently open in the file cache.",
	})

	c.totalOpenFileReferences = f.NewCounter(prometheus.CounterOpts{
		Namespace: "coderd",
		Subsystem: subsystem,
		Name:      "open_file_refs_total",
		// Fix: this help text was copy-pasted from the "current" gauge; a
		// counter describes a monotonically increasing total, not a current
		// value.
		Help:      "The total number of file references ever opened in the file cache.",
	})

	return c
}

// Cache persists the files for template versions, and is used by dynamic
Expand All @@ -43,15 +98,30 @@ type Cache struct {
	// lock guards data and each entry's refCount.
	lock sync.Mutex
	// data maps a file ID to its cache entry; entries are removed in
	// Release once their refCount drops to zero.
	data map[uuid.UUID]*cacheEntry
	// fetcher loads a file (and its size in bytes) on cache miss.
	fetcher

	// metrics
	// "refs" count Acquire/Release pairs; a single file may have many
	// concurrent references.
	currentOpenFileReferences prometheus.Gauge
	totalOpenFileReferences   prometheus.Counter

	// "files" count unique cache entries (one per file ID).
	currentOpenFiles prometheus.Gauge
	totalOpenedFiles prometheus.Counter

	// Cache size is tracked in bytes, as reported by the fetcher.
	currentCacheSize prometheus.Gauge
	totalCacheSize   prometheus.Counter
}

// cacheEntryValue is the lazily-fetched payload of a cache entry: the
// file's filesystem view plus its size in bytes, so Release can subtract
// the size from the cache-size gauge.
type cacheEntryValue struct {
	// dir is the filesystem representation of the fetched file.
	dir fs.FS
	// size is the file's size in bytes, as reported by the fetcher.
	size int64
}

type cacheEntry struct {
// refCount must only be accessed while the Cache lock is held.
refCount int
value *lazy.ValueWithError[fs.FS]
value *lazy.ValueWithError[cacheEntryValue]
}

type fetcher func(context.Context, uuid.UUID) (fs.FS, error)
type fetcher func(context.Context, uuid.UUID) (dir fs.FS, size int64, err error)

// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
Expand All @@ -67,26 +137,41 @@ func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
if err != nil {
c.Release(fileID)
}
return it, err
return it.dir, err
}

func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[cacheEntryValue] {
c.lock.Lock()
defer c.lock.Unlock()

entry, ok := c.data[fileID]
if !ok {
value := lazy.NewWithError(func() (fs.FS, error) {
return c.fetcher(ctx, fileID)
value := lazy.NewWithError(func() (cacheEntryValue, error) {
dir, size, err := c.fetcher(ctx, fileID)

// Always add to the cache size the bytes of the file loaded.
if err == nil {
c.currentCacheSize.Add(float64(size))
c.totalCacheSize.Add(float64(size))
}

return cacheEntryValue{
dir: dir,
size: size,
}, err
})

entry = &cacheEntry{
value: value,
refCount: 0,
}
c.data[fileID] = entry
c.currentOpenFiles.Inc()
c.totalOpenedFiles.Inc()
}

c.currentOpenFileReferences.Inc()
c.totalOpenFileReferences.Inc()
entry.refCount++
return entry.value
}
Expand All @@ -105,11 +190,19 @@ func (c *Cache) Release(fileID uuid.UUID) {
return
}

c.currentOpenFileReferences.Dec()
entry.refCount--
if entry.refCount > 0 {
return
}

c.currentOpenFiles.Dec()

ev, err := entry.value.Load()
if err == nil {
c.currentCacheSize.Add(-1 * float64(ev.size))
}
Comment on lines +206 to +209
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels really unfortunate, but it is all cached so it's not an expensive call.


delete(c.data, fileID)
}

Expand Down
18 changes: 7 additions & 11 deletions coderd/files/cache_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package files
import (
"context"
"io/fs"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand All @@ -21,12 +21,12 @@ func TestConcurrency(t *testing.T) {

emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
var fetches atomic.Int64
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, int64, error) {
fetches.Add(1)
// Wait long enough before returning to make sure that all of the goroutines
// will be waiting in line, ensuring that no one duplicated a fetch.
time.Sleep(testutil.IntervalMedium)
return emptyFS, nil
return emptyFS, 0, nil
})

batches := 1000
Expand Down Expand Up @@ -61,8 +61,8 @@ func TestRelease(t *testing.T) {
t.Parallel()

emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
return emptyFS, nil
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, int64, error) {
return emptyFS, 0, nil
})

batches := 100
Expand Down Expand Up @@ -95,10 +95,6 @@ func TestRelease(t *testing.T) {
require.Equal(t, len(c.data), 0)
}

func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
return Cache{
lock: sync.Mutex{},
data: make(map[uuid.UUID]*cacheEntry),
fetcher: fetcher,
}
// newTestCache builds a Cache backed by the given fetch function and a
// throwaway Prometheus registry, for use in tests.
func newTestCache(fetch func(context.Context, uuid.UUID) (fs.FS, int64, error)) *Cache {
	return New(fetch, prometheus.NewRegistry())
}