Commit 9db114d

feat: add filecache prometheus metrics (#18089)
Dynamic parameters have an in-memory file cache. This adds Prometheus metrics to monitor that cache.
1 parent: 562c469
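
As a rough illustration of how the new constructor is wired up (mirroring the one-line change in coderd.go below and the registry usage in the new tests), the sketch here builds a cache against a Prometheus registry and exposes the resulting coderd_file_cache_* metrics. The promhttp exposure, the listen address, and the nil store are assumptions made to keep the example self-contained; they are not part of this commit.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/coder/coder/v2/coderd/files"
)

func main() {
	// The cache now registers its gauges and counters on the registerer it
	// is handed.
	reg := prometheus.NewRegistry()

	// In coderd this call receives options.Database and
	// options.PrometheusRegistry; a nil store is used here only so the
	// sketch compiles on its own.
	_ = files.NewFromStore(nil, reg)

	// The metrics become visible wherever this registry is scraped, e.g.:
	//   coderd_file_cache_open_files_current
	//   coderd_file_cache_open_files_size_bytes_current
	//   coderd_file_cache_open_file_refs_total
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}
```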

File tree

3 files changed (+179, -33 lines)

coderd/coderd.go

Lines changed: 1 addition & 1 deletion
@@ -572,7 +572,7 @@ func New(options *Options) *API {
         TemplateScheduleStore:       options.TemplateScheduleStore,
         UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
         AccessControlStore:          options.AccessControlStore,
-        FileCache:                   files.NewFromStore(options.Database),
+        FileCache:                   files.NewFromStore(options.Database, options.PrometheusRegistry),
         Experiments:                 experiments,
         WebpushDispatcher:           options.WebPushDispatcher,
         healthCheckGroup:            &singleflight.Group[string, *healthsdk.HealthcheckReport]{},

coderd/files/cache.go

Lines changed: 111 additions & 13 deletions
@@ -7,6 +7,8 @@ import (
     "sync"

     "github.com/google/uuid"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     "golang.org/x/xerrors"

     archivefs "github.com/coder/coder/v2/archive/fs"
@@ -16,22 +18,78 @@ import (

 // NewFromStore returns a file cache that will fetch files from the provided
 // database.
-func NewFromStore(store database.Store) *Cache {
-    fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
+func NewFromStore(store database.Store, registerer prometheus.Registerer) *Cache {
+    fetch := func(ctx context.Context, fileID uuid.UUID) (cacheEntryValue, error) {
         file, err := store.GetFileByID(ctx, fileID)
         if err != nil {
-            return nil, xerrors.Errorf("failed to read file from database: %w", err)
+            return cacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
         }

         content := bytes.NewBuffer(file.Data)
-        return archivefs.FromTarReader(content), nil
+        return cacheEntryValue{
+            FS:   archivefs.FromTarReader(content),
+            size: int64(content.Len()),
+        }, nil
     }

-    return &Cache{
+    return New(fetch, registerer)
+}
+
+func New(fetch fetcher, registerer prometheus.Registerer) *Cache {
+    return (&Cache{
         lock:    sync.Mutex{},
         data:    make(map[uuid.UUID]*cacheEntry),
-        fetcher: fetcher,
-    }
+        fetcher: fetch,
+    }).registerMetrics(registerer)
+}
+
+func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache {
+    subsystem := "file_cache"
+    f := promauto.With(registerer)
+
+    c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_files_size_bytes_current",
+        Help:      "The current amount of memory of all files currently open in the file cache.",
+    })
+
+    c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_files_size_bytes_total",
+        Help:      "The total amount of memory ever opened in the file cache. This number never decrements.",
+    })
+
+    c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_files_current",
+        Help:      "The count of unique files currently open in the file cache.",
+    })
+
+    c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_files_total",
+        Help:      "The total count of unique files ever opened in the file cache.",
+    })
+
+    c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_file_refs_current",
+        Help:      "The count of file references currently open in the file cache. Multiple references can be held for the same file.",
+    })
+
+    c.totalOpenFileReferences = f.NewCounter(prometheus.CounterOpts{
+        Namespace: "coderd",
+        Subsystem: subsystem,
+        Name:      "open_file_refs_total",
+        Help:      "The total number of file references ever opened in the file cache.",
+    })
+
+    return c
 }

 // Cache persists the files for template versions, and is used by dynamic
@@ -43,15 +101,34 @@ type Cache struct {
     lock sync.Mutex
     data map[uuid.UUID]*cacheEntry
     fetcher
+
+    // metrics
+    cacheMetrics
+}
+
+type cacheMetrics struct {
+    currentOpenFileReferences prometheus.Gauge
+    totalOpenFileReferences   prometheus.Counter
+
+    currentOpenFiles prometheus.Gauge
+    totalOpenedFiles prometheus.Counter
+
+    currentCacheSize prometheus.Gauge
+    totalCacheSize   prometheus.Counter
+}
+
+type cacheEntryValue struct {
+    fs.FS
+    size int64
 }

 type cacheEntry struct {
     // refCount must only be accessed while the Cache lock is held.
     refCount int
-    value    *lazy.ValueWithError[fs.FS]
+    value    *lazy.ValueWithError[cacheEntryValue]
 }

-type fetcher func(context.Context, uuid.UUID) (fs.FS, error)
+type fetcher func(context.Context, uuid.UUID) (cacheEntryValue, error)

 // Acquire will load the fs.FS for the given file. It guarantees that parallel
 // calls for the same fileID will only result in one fetch, and that parallel
@@ -66,27 +143,40 @@ func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
     it, err := c.prepare(ctx, fileID).Load()
     if err != nil {
         c.Release(fileID)
+        return nil, err
     }
-    return it, err
+    return it.FS, err
 }

-func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
+func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[cacheEntryValue] {
     c.lock.Lock()
     defer c.lock.Unlock()

     entry, ok := c.data[fileID]
     if !ok {
-        value := lazy.NewWithError(func() (fs.FS, error) {
-            return c.fetcher(ctx, fileID)
+        value := lazy.NewWithError(func() (cacheEntryValue, error) {
+            val, err := c.fetcher(ctx, fileID)
+
+            // Always add to the cache size the bytes of the file loaded.
+            if err == nil {
+                c.currentCacheSize.Add(float64(val.size))
+                c.totalCacheSize.Add(float64(val.size))
+            }
+
+            return val, err
         })

         entry = &cacheEntry{
             value:    value,
             refCount: 0,
         }
         c.data[fileID] = entry
+        c.currentOpenFiles.Inc()
+        c.totalOpenedFiles.Inc()
     }

+    c.currentOpenFileReferences.Inc()
+    c.totalOpenFileReferences.Inc()
     entry.refCount++
     return entry.value
 }
@@ -105,11 +195,19 @@ func (c *Cache) Release(fileID uuid.UUID) {
         return
     }

+    c.currentOpenFileReferences.Dec()
     entry.refCount--
     if entry.refCount > 0 {
         return
     }

+    c.currentOpenFiles.Dec()
+
+    ev, err := entry.value.Load()
+    if err == nil {
+        c.currentCacheSize.Add(-1 * float64(ev.size))
+    }
+
     delete(c.data, fileID)
 }
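To make the reference-counting semantics behind these metrics concrete, here is a hypothetical in-package walkthrough (cacheEntryValue and fetcher are unexported, so it would sit alongside the internal tests). The fetcher, the fixed file size, and the expected values in the comments are illustrative, derived from the Acquire/Release logic above rather than taken from this commit.

```go
package files

import (
	"context"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/spf13/afero"
)

// metricsLifecycleSketch walks a single file through its cache lifecycle and
// notes, in comments, how each coderd_file_cache_* metric is expected to move.
func metricsLifecycleSketch() {
	ctx := context.Background()
	reg := prometheus.NewRegistry()
	emptyFS := afero.NewIOFS(afero.NewMemMapFs())

	c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
		// A fixed 1024-byte size stands in for len(file.Data).
		return cacheEntryValue{FS: emptyFS, size: 1024}, nil
	}, reg)

	fileID := uuid.New()

	// First Acquire fetches the file once:
	//   open_files_current=1, open_file_refs_current=1,
	//   open_files_size_bytes_current=1024
	_, _ = c.Acquire(ctx, fileID)

	// Second Acquire reuses the cached entry; only the ref metrics move:
	//   open_file_refs_current=2 (files and size unchanged)
	_, _ = c.Acquire(ctx, fileID)

	// Releasing one reference keeps the file cached:
	//   open_file_refs_current=1
	c.Release(fileID)

	// Releasing the last reference evicts the entry:
	//   open_files_current=0, open_file_refs_current=0,
	//   open_files_size_bytes_current=0
	// The *_total counters (files=1, refs=2, bytes=1024) never decrement.
	c.Release(fileID)
}
```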

coderd/files/cache_internal_test.go

Lines changed: 67 additions & 19 deletions
@@ -2,32 +2,38 @@ package files

 import (
     "context"
-    "io/fs"
-    "sync"
     "sync/atomic"
     "testing"
     "time"

     "github.com/google/uuid"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/spf13/afero"
     "github.com/stretchr/testify/require"
     "golang.org/x/sync/errgroup"

+    "github.com/coder/coder/v2/coderd/coderdtest/promhelp"
     "github.com/coder/coder/v2/testutil"
 )

+func cachePromMetricName(metric string) string {
+    return "coderd_file_cache_" + metric
+}
+
 func TestConcurrency(t *testing.T) {
     t.Parallel()

+    const fileSize = 10
     emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
     var fetches atomic.Int64
-    c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
+    reg := prometheus.NewRegistry()
+    c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
         fetches.Add(1)
         // Wait long enough before returning to make sure that all of the goroutines
         // will be waiting in line, ensuring that no one duplicated a fetch.
         time.Sleep(testutil.IntervalMedium)
-        return emptyFS, nil
-    })
+        return cacheEntryValue{FS: emptyFS, size: fileSize}, nil
+    }, reg)

     batches := 1000
     groups := make([]*errgroup.Group, 0, batches)
@@ -55,15 +61,29 @@ func TestConcurrency(t *testing.T) {
         require.NoError(t, g.Wait())
     }
     require.Equal(t, int64(batches), fetches.Load())
+
+    // Verify all the counts & metrics are correct.
+    require.Equal(t, batches, c.Count())
+    require.Equal(t, batches*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+    require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+    require.Equal(t, batches, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+    require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+    require.Equal(t, batches*batchSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+    require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }

 func TestRelease(t *testing.T) {
     t.Parallel()

+    const fileSize = 10
     emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
-    c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
-        return emptyFS, nil
-    })
+    reg := prometheus.NewRegistry()
+    c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
+        return cacheEntryValue{
+            FS:   emptyFS,
+            size: fileSize,
+        }, nil
+    }, reg)

     batches := 100
     ids := make([]uuid.UUID, 0, batches)
@@ -73,32 +93,60 @@ func TestRelease(t *testing.T) {

     // Acquire a bunch of references
     batchSize := 10
-    for _, id := range ids {
-        for range batchSize {
+    for openedIdx, id := range ids {
+        for batchIdx := range batchSize {
             it, err := c.Acquire(t.Context(), id)
             require.NoError(t, err)
             require.Equal(t, emptyFS, it)
+
+            // Each time a new file is opened, the metrics should be updated as so:
+            opened := openedIdx + 1
+            // Number of unique files opened is equal to the idx of the ids.
+            require.Equal(t, opened, c.Count())
+            require.Equal(t, opened, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+            // Current file size is unique files * file size.
+            require.Equal(t, opened*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+            // The number of refs is the current iteration of both loops.
+            require.Equal(t, ((opened-1)*batchSize)+(batchIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
         }
     }

     // Make sure cache is fully loaded
     require.Equal(t, len(c.data), batches)

     // Now release all of the references
-    for _, id := range ids {
-        for range batchSize {
+    for closedIdx, id := range ids {
+        stillOpen := len(ids) - closedIdx
+        for closingIdx := range batchSize {
             c.Release(id)
+
+            // Each time a file is released, the metrics should decrement the file refs
+            require.Equal(t, (stillOpen*batchSize)-(closingIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+            closed := closingIdx+1 == batchSize
+            if closed {
+                continue
+            }
+
+            // File ref still exists, so the counts should not change yet.
+            require.Equal(t, stillOpen, c.Count())
+            require.Equal(t, stillOpen, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+            require.Equal(t, stillOpen*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
         }
     }

     // ...and make sure that the cache has emptied itself.
     require.Equal(t, len(c.data), 0)
-}

-func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
-    return Cache{
-        lock: sync.Mutex{},
-        data: make(map[uuid.UUID]*cacheEntry),
-        fetcher: fetcher,
-    }
+    // Verify all the counts & metrics are correct.
+    // All existing files are closed
+    require.Equal(t, 0, c.Count())
+    require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+    require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+    require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+    // Total counts remain
+    require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+    require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+    require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }
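
The assertions above use promhelp.GaugeValue and promhelp.CounterValue from coderd/coderdtest/promhelp, which is not shown in this diff. A rough equivalent of what such helpers presumably do, built only on the upstream client_golang Gather API, might look like the following; the package name, function names, and behavior here are assumptions for illustration, not the actual promhelp implementation.

```go
package promhelpsketch

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	"github.com/stretchr/testify/require"
)

// gaugeValue gathers the registry and returns the value of the named gauge
// (labels are ignored; the file cache metrics do not use any).
func gaugeValue(t *testing.T, reg prometheus.Gatherer, name string) int {
	t.Helper()
	return int(metricValue(t, reg, name, func(m *dto.Metric) float64 {
		return m.GetGauge().GetValue()
	}))
}

// counterValue does the same for counters.
func counterValue(t *testing.T, reg prometheus.Gatherer, name string) int {
	t.Helper()
	return int(metricValue(t, reg, name, func(m *dto.Metric) float64 {
		return m.GetCounter().GetValue()
	}))
}

// metricValue scans the gathered metric families for the named metric and
// extracts its value with the provided accessor.
func metricValue(t *testing.T, reg prometheus.Gatherer, name string, value func(*dto.Metric) float64) float64 {
	t.Helper()
	families, err := reg.Gather()
	require.NoError(t, err)
	for _, mf := range families {
		if mf.GetName() == name {
			require.Len(t, mf.GetMetric(), 1)
			return value(mf.GetMetric()[0])
		}
	}
	t.Fatalf("metric %q not registered", name)
	return 0
}
```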
