diff --git a/coderd/coderd.go b/coderd/coderd.go
index c8cd3742c5ec3..69d942304acea 100644
--- a/coderd/coderd.go
+++ b/coderd/coderd.go
@@ -572,7 +572,7 @@ func New(options *Options) *API {
 		TemplateScheduleStore:       options.TemplateScheduleStore,
 		UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
 		AccessControlStore:          options.AccessControlStore,
-		FileCache:                   files.NewFromStore(options.Database),
+		FileCache:                   files.NewFromStore(options.Database, options.PrometheusRegistry),
 		Experiments:                 experiments,
 		WebpushDispatcher:           options.WebPushDispatcher,
 		healthCheckGroup:            &singleflight.Group[string, *healthsdk.HealthcheckReport]{},
diff --git a/coderd/files/cache.go b/coderd/files/cache.go
index 56e9a715de189..48587eb402351 100644
--- a/coderd/files/cache.go
+++ b/coderd/files/cache.go
@@ -7,6 +7,8 @@ import (
 	"sync"
 
 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"golang.org/x/xerrors"
 
 	archivefs "github.com/coder/coder/v2/archive/fs"
@@ -16,22 +18,78 @@ import (
 
 // NewFromStore returns a file cache that will fetch files from the provided
 // database.
-func NewFromStore(store database.Store) *Cache {
-	fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
+func NewFromStore(store database.Store, registerer prometheus.Registerer) *Cache {
+	fetch := func(ctx context.Context, fileID uuid.UUID) (cacheEntryValue, error) {
 		file, err := store.GetFileByID(ctx, fileID)
 		if err != nil {
-			return nil, xerrors.Errorf("failed to read file from database: %w", err)
+			return cacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
 		}
 
 		content := bytes.NewBuffer(file.Data)
-		return archivefs.FromTarReader(content), nil
+		return cacheEntryValue{
+			FS:   archivefs.FromTarReader(content),
+			size: int64(content.Len()),
+		}, nil
 	}
 
-	return &Cache{
+	return New(fetch, registerer)
+}
+
+func New(fetch fetcher, registerer prometheus.Registerer) *Cache {
+	return (&Cache{
 		lock:    sync.Mutex{},
 		data:    make(map[uuid.UUID]*cacheEntry),
-		fetcher: fetcher,
-	}
+		fetcher: fetch,
+	}).registerMetrics(registerer)
+}
+
+func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache {
+	subsystem := "file_cache"
+	f := promauto.With(registerer)
+
+	c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_size_bytes_current",
+		Help:      "The current amount of memory used by all files currently open in the file cache.",
+	})
+
+	c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_size_bytes_total",
+		Help:      "The total amount of memory ever loaded into the file cache. This number never decrements.",
+	})
+
+	c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_current",
+		Help:      "The count of unique files currently open in the file cache.",
+	})
+
+	c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_total",
+		Help:      "The total count of unique files ever opened in the file cache.",
+	})
+
+	c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_file_refs_current",
+		Help:      "The count of file references currently open in the file cache. Multiple references can be held for the same file.",
+	})
+
+	c.totalOpenFileReferences = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_file_refs_total",
+		Help:      "The total number of file references ever opened in the file cache.",
+	})
+
+	return c
 }
 
 // Cache persists the files for template versions, and is used by dynamic
@@ -43,15 +101,34 @@ type Cache struct {
 	lock sync.Mutex
 	data map[uuid.UUID]*cacheEntry
 	fetcher
+
+	// metrics
+	cacheMetrics
+}
+
+type cacheMetrics struct {
+	currentOpenFileReferences prometheus.Gauge
+	totalOpenFileReferences   prometheus.Counter
+
+	currentOpenFiles prometheus.Gauge
+	totalOpenedFiles prometheus.Counter
+
+	currentCacheSize prometheus.Gauge
+	totalCacheSize   prometheus.Counter
+}
+
+type cacheEntryValue struct {
+	fs.FS
+	size int64
 }
 
 type cacheEntry struct {
 	// refCount must only be accessed while the Cache lock is held.
 	refCount int
-	value    *lazy.ValueWithError[fs.FS]
+	value    *lazy.ValueWithError[cacheEntryValue]
 }
 
-type fetcher func(context.Context, uuid.UUID) (fs.FS, error)
+type fetcher func(context.Context, uuid.UUID) (cacheEntryValue, error)
 
 // Acquire will load the fs.FS for the given file. It guarantees that parallel
 // calls for the same fileID will only result in one fetch, and that parallel
@@ -66,18 +143,27 @@ func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
 	it, err := c.prepare(ctx, fileID).Load()
 	if err != nil {
 		c.Release(fileID)
+		return nil, err
 	}
-	return it, err
+	return it.FS, err
 }
 
-func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
+func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[cacheEntryValue] {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
 	entry, ok := c.data[fileID]
 	if !ok {
-		value := lazy.NewWithError(func() (fs.FS, error) {
-			return c.fetcher(ctx, fileID)
+		value := lazy.NewWithError(func() (cacheEntryValue, error) {
+			val, err := c.fetcher(ctx, fileID)
+
+			// Add the file's size to the cache size metrics only when the fetch succeeds.
+			if err == nil {
+				c.currentCacheSize.Add(float64(val.size))
+				c.totalCacheSize.Add(float64(val.size))
+			}
+
+			return val, err
 		})
 
 		entry = &cacheEntry{
@@ -85,8 +171,12 @@ func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithEr
 			refCount: 0,
 		}
 		c.data[fileID] = entry
+		c.currentOpenFiles.Inc()
+		c.totalOpenedFiles.Inc()
 	}
 
+	c.currentOpenFileReferences.Inc()
+	c.totalOpenFileReferences.Inc()
 	entry.refCount++
 	return entry.value
 }
@@ -105,11 +195,19 @@ func (c *Cache) Release(fileID uuid.UUID) {
 		return
 	}
 
+	c.currentOpenFileReferences.Dec()
 	entry.refCount--
 	if entry.refCount > 0 {
 		return
 	}
 
+	c.currentOpenFiles.Dec()
+
+	ev, err := entry.value.Load()
+	if err == nil {
+		c.currentCacheSize.Add(-1 * float64(ev.size))
+	}
+
 	delete(c.data, fileID)
 }
 
diff --git a/coderd/files/cache_internal_test.go b/coderd/files/cache_internal_test.go
index 03603906b6ccd..6ad84185b44b6 100644
--- a/coderd/files/cache_internal_test.go
+++ b/coderd/files/cache_internal_test.go
@@ -2,32 +2,38 @@ package files
 
 import (
 	"context"
-	"io/fs"
-	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/coder/coder/v2/coderd/coderdtest/promhelp"
 	"github.com/coder/coder/v2/testutil"
 )
 
+func cachePromMetricName(metric string) string {
+	return "coderd_file_cache_" + metric
+}
+
 func TestConcurrency(t *testing.T) {
 	t.Parallel()
 
+	const fileSize = 10
 	emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
 	var fetches atomic.Int64
-	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
+	reg := prometheus.NewRegistry()
+	c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
 		fetches.Add(1)
 		// Wait long enough before returning to make sure that all of the goroutines
 		// will be waiting in line, ensuring that no one duplicated a fetch.
 		time.Sleep(testutil.IntervalMedium)
-		return emptyFS, nil
-	})
+		return cacheEntryValue{FS: emptyFS, size: fileSize}, nil
+	}, reg)
 
 	batches := 1000
 	groups := make([]*errgroup.Group, 0, batches)
@@ -55,15 +61,29 @@ func TestConcurrency(t *testing.T) {
 		require.NoError(t, g.Wait())
 	}
 	require.Equal(t, int64(batches), fetches.Load())
+
+	// Verify all the counts & metrics are correct.
+	require.Equal(t, batches, c.Count())
+	require.Equal(t, batches*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+	require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+	require.Equal(t, batches, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+	require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+	require.Equal(t, batches*batchSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+	require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }
 
 func TestRelease(t *testing.T) {
 	t.Parallel()
 
+	const fileSize = 10
 	emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
-	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
-		return emptyFS, nil
-	})
+	reg := prometheus.NewRegistry()
+	c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
+		return cacheEntryValue{
+			FS:   emptyFS,
+			size: fileSize,
+		}, nil
+	}, reg)
 
 	batches := 100
 	ids := make([]uuid.UUID, 0, batches)
@@ -73,11 +93,21 @@ func TestRelease(t *testing.T) {
 
 	// Acquire a bunch of references
 	batchSize := 10
-	for _, id := range ids {
-		for range batchSize {
+	for openedIdx, id := range ids {
+		for batchIdx := range batchSize {
 			it, err := c.Acquire(t.Context(), id)
 			require.NoError(t, err)
 			require.Equal(t, emptyFS, it)
+
+			// Each time a new file is opened, the metrics should be updated as follows:
+			opened := openedIdx + 1
+			// The number of unique files opened equals the number of ids iterated so far.
+			require.Equal(t, opened, c.Count())
+			require.Equal(t, opened, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+			// The current cache size is the number of unique files times the file size.
+			require.Equal(t, opened*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+			// The number of refs is the current iteration of both loops combined.
+			require.Equal(t, ((opened-1)*batchSize)+(batchIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
 		}
 	}
 
@@ -85,20 +115,38 @@ func TestRelease(t *testing.T) {
 	require.Equal(t, len(c.data), batches)
 
 	// Now release all of the references
-	for _, id := range ids {
-		for range batchSize {
+	for closedIdx, id := range ids {
+		stillOpen := len(ids) - closedIdx
+		for closingIdx := range batchSize {
 			c.Release(id)
+
+			// Each time a file is released, the metrics should decrement the file refs.
+			require.Equal(t, (stillOpen*batchSize)-(closingIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+			closed := closingIdx+1 == batchSize
+			if closed {
+				continue
+			}
+
+			// File refs still exist, so the counts should not change yet.
+			require.Equal(t, stillOpen, c.Count())
+			require.Equal(t, stillOpen, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+			require.Equal(t, stillOpen*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
 		}
 	}
 
 	// ...and make sure that the cache has emptied itself.
	require.Equal(t, len(c.data), 0)
-}
 
-func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
-	return Cache{
-		lock:    sync.Mutex{},
-		data:    make(map[uuid.UUID]*cacheEntry),
-		fetcher: fetcher,
-	}
+	// Verify all the counts & metrics are correct.
+	// All existing files are closed.
+	require.Equal(t, 0, c.Count())
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+	// Total counts remain.
+	require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+	require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+	require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }
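
Reviewer note: the accounting pattern above is subtle — the size and file-count gauges move only when a unique entry is created or evicted, while the ref gauges move on every Acquire/Release, and the `*_total` counters only ever increase. Below is a minimal, self-contained sketch of that pattern under stated assumptions: all identifiers (`refCache`, `entry`, the `example` namespace) are hypothetical and not part of coder/coder.

```go
// Minimal sketch (hypothetical names, not coder/coder code) of the
// ref-count + metrics pattern the diff adds to files.Cache.
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type entry struct {
	refCount int
	size     int64
}

type refCache struct {
	mu   sync.Mutex
	data map[string]*entry

	currentSize prometheus.Gauge // mirrors open_files_size_bytes_current
	currentRefs prometheus.Gauge // mirrors open_file_refs_current
}

func newRefCache(reg prometheus.Registerer) *refCache {
	f := promauto.With(reg)
	return &refCache{
		data: make(map[string]*entry),
		currentSize: f.NewGauge(prometheus.GaugeOpts{
			Namespace: "example", Subsystem: "file_cache",
			Name: "open_files_size_bytes_current",
			Help: "Bytes held by all files currently open in the cache.",
		}),
		currentRefs: f.NewGauge(prometheus.GaugeOpts{
			Namespace: "example", Subsystem: "file_cache",
			Name: "open_file_refs_current",
			Help: "References currently held against the cache.",
		}),
	}
}

// acquire mirrors Cache.prepare: the size gauge moves only when a new
// entry is created, while the ref gauge moves on every call.
func (c *refCache) acquire(key string, size int64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	e, ok := c.data[key]
	if !ok {
		e = &entry{size: size}
		c.data[key] = e
		c.currentSize.Add(float64(size))
	}
	e.refCount++
	c.currentRefs.Inc()
}

// release mirrors Cache.Release: only dropping the last reference evicts
// the entry and returns its size to the gauge.
func (c *refCache) release(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	e, ok := c.data[key]
	if !ok {
		return
	}
	c.currentRefs.Dec()
	e.refCount--
	if e.refCount > 0 {
		return
	}
	c.currentSize.Sub(float64(e.size))
	delete(c.data, key)
}

func main() {
	c := newRefCache(prometheus.NewRegistry())
	c.acquire("tar", 10)
	c.acquire("tar", 10) // second ref: refs gauge = 2, size gauge still 10
	c.release("tar")
	c.release("tar")         // last ref: entry evicted, size gauge back to 0
	fmt.Println(len(c.data)) // prints 0
}
```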
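The tests read metric values through coder's internal `promhelp` package, whose exact API is not shown in this diff. For readers without access to it, the sketch below makes the same kind of assertion using only the public `client_golang` API; the test name and gauge wiring are illustrative assumptions, and `testutil.ToFloat64` is the upstream equivalent of a `GaugeValue`-style helper.

```go
package files_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

// TestGaugeValue is a hypothetical example, not part of the diff: it shows
// how a single metric's value can be asserted with the upstream testutil
// package instead of coder's internal promhelp helpers.
func TestGaugeValue(t *testing.T) {
	t.Parallel()

	reg := prometheus.NewRegistry()
	gauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Namespace: "coderd",
		Subsystem: "file_cache",
		Name:      "open_files_current",
		Help:      "The count of unique files currently open in the file cache.",
	})

	gauge.Add(3)
	gauge.Dec()

	// testutil.ToFloat64 collects exactly one metric from the collector
	// and returns its current value.
	require.Equal(t, 2.0, testutil.ToFloat64(gauge))
}
```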