Skip to content

Commit 9fb18f3

Browse files
authored
feat(coderd): batch agent stats inserts (#8875)
This PR adds support for batching inserts to the workspace_agents_stats table. Up to 1024 stats are batched, and flushed every second in a batch.
1 parent ae88b79 commit 9fb18f3

File tree

14 files changed

+785
-35
lines changed

14 files changed

+785
-35
lines changed

cli/server.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import (
6363
"github.com/coder/coder/cli/config"
6464
"github.com/coder/coder/coderd"
6565
"github.com/coder/coder/coderd/autobuild"
66+
"github.com/coder/coder/coderd/batchstats"
6667
"github.com/coder/coder/coderd/database"
6768
"github.com/coder/coder/coderd/database/dbfake"
6869
"github.com/coder/coder/coderd/database/dbmetrics"
@@ -813,6 +814,16 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
813814
options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
814815
}
815816

817+
batcher, closeBatcher, err := batchstats.New(ctx,
818+
batchstats.WithLogger(options.Logger.Named("batchstats")),
819+
batchstats.WithStore(options.Database),
820+
)
821+
if err != nil {
822+
return xerrors.Errorf("failed to create agent stats batcher: %w", err)
823+
}
824+
options.StatsBatcher = batcher
825+
defer closeBatcher()
826+
816827
closeCheckInactiveUsersFunc := dormancy.CheckInactiveUsers(ctx, logger, options.Database)
817828
defer closeCheckInactiveUsersFunc()
818829

coderd/batchstats/batcher.go

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package batchstats
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"os"
7+
"sync"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"golang.org/x/xerrors"
13+
14+
"cdr.dev/slog"
15+
"cdr.dev/slog/sloggers/sloghuman"
16+
17+
"github.com/coder/coder/coderd/database"
18+
"github.com/coder/coder/coderd/database/dbauthz"
19+
"github.com/coder/coder/codersdk/agentsdk"
20+
)
21+
22+
const (
23+
defaultBufferSize = 1024
24+
defaultFlushInterval = time.Second
25+
)
26+
27+
// Batcher holds a buffer of agent stats and periodically flushes them to
28+
// its configured store. It also updates the workspace's last used time.
29+
type Batcher struct {
30+
store database.Store
31+
log slog.Logger
32+
33+
mu sync.Mutex
34+
// TODO: make this a buffered chan instead?
35+
buf *database.InsertWorkspaceAgentStatsParams
36+
// NOTE: we batch this separately as it's a jsonb field and
37+
// pq.Array + unnest doesn't play nicely with this.
38+
connectionsByProto []map[string]int64
39+
batchSize int
40+
41+
// tickCh is used to periodically flush the buffer.
42+
tickCh <-chan time.Time
43+
ticker *time.Ticker
44+
interval time.Duration
45+
// flushLever is used to signal the flusher to flush the buffer immediately.
46+
flushLever chan struct{}
47+
flushForced atomic.Bool
48+
// flushed is used during testing to signal that a flush has completed.
49+
flushed chan<- int
50+
}
51+
52+
// Option is a functional option for configuring a Batcher.
53+
type Option func(b *Batcher)
54+
55+
// WithStore sets the store to use for storing stats.
56+
func WithStore(store database.Store) Option {
57+
return func(b *Batcher) {
58+
b.store = store
59+
}
60+
}
61+
62+
// WithBatchSize sets the number of stats to store in a batch.
63+
func WithBatchSize(size int) Option {
64+
return func(b *Batcher) {
65+
b.batchSize = size
66+
}
67+
}
68+
69+
// WithInterval sets the interval for flushes.
70+
func WithInterval(d time.Duration) Option {
71+
return func(b *Batcher) {
72+
b.interval = d
73+
}
74+
}
75+
76+
// WithLogger sets the logger to use for logging.
77+
func WithLogger(log slog.Logger) Option {
78+
return func(b *Batcher) {
79+
b.log = log
80+
}
81+
}
82+
83+
// New creates a new Batcher and starts it.
84+
func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {
85+
b := &Batcher{}
86+
b.log = slog.Make(sloghuman.Sink(os.Stderr))
87+
b.flushLever = make(chan struct{}, 1) // Buffered so that it doesn't block.
88+
for _, opt := range opts {
89+
opt(b)
90+
}
91+
92+
if b.store == nil {
93+
return nil, nil, xerrors.Errorf("no store configured for batcher")
94+
}
95+
96+
if b.interval == 0 {
97+
b.interval = defaultFlushInterval
98+
}
99+
100+
if b.batchSize == 0 {
101+
b.batchSize = defaultBufferSize
102+
}
103+
104+
if b.tickCh == nil {
105+
b.ticker = time.NewTicker(b.interval)
106+
b.tickCh = b.ticker.C
107+
}
108+
109+
cancelCtx, cancelFunc := context.WithCancel(ctx)
110+
done := make(chan struct{})
111+
go func() {
112+
b.run(cancelCtx)
113+
close(done)
114+
}()
115+
116+
closer := func() {
117+
cancelFunc()
118+
if b.ticker != nil {
119+
b.ticker.Stop()
120+
}
121+
<-done
122+
}
123+
124+
return b, closer, nil
125+
}
126+
127+
// Add adds a stat to the batcher for the given workspace and agent.
128+
func (b *Batcher) Add(
129+
agentID uuid.UUID,
130+
templateID uuid.UUID,
131+
userID uuid.UUID,
132+
workspaceID uuid.UUID,
133+
st agentsdk.Stats,
134+
) error {
135+
b.mu.Lock()
136+
defer b.mu.Unlock()
137+
138+
now := database.Now()
139+
140+
b.buf.ID = append(b.buf.ID, uuid.New())
141+
b.buf.CreatedAt = append(b.buf.CreatedAt, now)
142+
b.buf.AgentID = append(b.buf.AgentID, agentID)
143+
b.buf.UserID = append(b.buf.UserID, userID)
144+
b.buf.TemplateID = append(b.buf.TemplateID, templateID)
145+
b.buf.WorkspaceID = append(b.buf.WorkspaceID, workspaceID)
146+
147+
// Store the connections by proto separately as it's a jsonb field. We marshal on flush.
148+
// b.buf.ConnectionsByProto = append(b.buf.ConnectionsByProto, st.ConnectionsByProto)
149+
b.connectionsByProto = append(b.connectionsByProto, st.ConnectionsByProto)
150+
151+
b.buf.ConnectionCount = append(b.buf.ConnectionCount, st.ConnectionCount)
152+
b.buf.RxPackets = append(b.buf.RxPackets, st.RxPackets)
153+
b.buf.RxBytes = append(b.buf.RxBytes, st.RxBytes)
154+
b.buf.TxPackets = append(b.buf.TxPackets, st.TxPackets)
155+
b.buf.TxBytes = append(b.buf.TxBytes, st.TxBytes)
156+
b.buf.SessionCountVSCode = append(b.buf.SessionCountVSCode, st.SessionCountVSCode)
157+
b.buf.SessionCountJetBrains = append(b.buf.SessionCountJetBrains, st.SessionCountJetBrains)
158+
b.buf.SessionCountReconnectingPTY = append(b.buf.SessionCountReconnectingPTY, st.SessionCountReconnectingPTY)
159+
b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSSH)
160+
b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMS)
161+
162+
// If the buffer is over 80% full, signal the flusher to flush immediately.
163+
// We want to trigger flushes early to reduce the likelihood of
164+
// accidentally growing the buffer over batchSize.
165+
filled := float64(len(b.buf.ID)) / float64(b.batchSize)
166+
if filled >= 0.8 && !b.flushForced.Load() {
167+
b.flushLever <- struct{}{}
168+
b.flushForced.Store(true)
169+
}
170+
return nil
171+
}
172+
173+
// Run runs the batcher.
174+
func (b *Batcher) run(ctx context.Context) {
175+
b.initBuf(b.batchSize)
176+
// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
177+
authCtx := dbauthz.AsSystemRestricted(ctx)
178+
for {
179+
select {
180+
case <-b.tickCh:
181+
b.flush(authCtx, false, "scheduled")
182+
case <-b.flushLever:
183+
// If the flush lever is depressed, flush the buffer immediately.
184+
b.flush(authCtx, true, "reaching capacity")
185+
case <-ctx.Done():
186+
b.log.Warn(ctx, "context done, flushing before exit")
187+
b.flush(authCtx, true, "exit")
188+
return
189+
}
190+
}
191+
}
192+
193+
// flush flushes the batcher's buffer.
194+
func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
195+
b.mu.Lock()
196+
b.flushForced.Store(true)
197+
start := time.Now()
198+
count := len(b.buf.ID)
199+
defer func() {
200+
b.flushForced.Store(false)
201+
b.mu.Unlock()
202+
// Notify that a flush has completed. This only happens in tests.
203+
if b.flushed != nil {
204+
select {
205+
case <-ctx.Done():
206+
close(b.flushed)
207+
default:
208+
b.flushed <- count
209+
}
210+
}
211+
if count > 0 {
212+
elapsed := time.Since(start)
213+
b.log.Debug(ctx, "flush complete",
214+
slog.F("count", count),
215+
slog.F("elapsed", elapsed),
216+
slog.F("forced", forced),
217+
slog.F("reason", reason),
218+
)
219+
}
220+
}()
221+
222+
if len(b.buf.ID) == 0 {
223+
return
224+
}
225+
226+
// marshal connections by proto
227+
payload, err := json.Marshal(b.connectionsByProto)
228+
if err != nil {
229+
b.log.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err))
230+
b.buf.ConnectionsByProto = json.RawMessage(`[]`)
231+
} else {
232+
b.buf.ConnectionsByProto = payload
233+
}
234+
235+
err = b.store.InsertWorkspaceAgentStats(ctx, *b.buf)
236+
elapsed := time.Since(start)
237+
if err != nil {
238+
b.log.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed))
239+
return
240+
}
241+
242+
b.resetBuf()
243+
}
244+
245+
// initBuf resets the buffer. b MUST be locked.
246+
func (b *Batcher) initBuf(size int) {
247+
b.buf = &database.InsertWorkspaceAgentStatsParams{
248+
ID: make([]uuid.UUID, 0, b.batchSize),
249+
CreatedAt: make([]time.Time, 0, b.batchSize),
250+
UserID: make([]uuid.UUID, 0, b.batchSize),
251+
WorkspaceID: make([]uuid.UUID, 0, b.batchSize),
252+
TemplateID: make([]uuid.UUID, 0, b.batchSize),
253+
AgentID: make([]uuid.UUID, 0, b.batchSize),
254+
ConnectionsByProto: json.RawMessage("[]"),
255+
ConnectionCount: make([]int64, 0, b.batchSize),
256+
RxPackets: make([]int64, 0, b.batchSize),
257+
RxBytes: make([]int64, 0, b.batchSize),
258+
TxPackets: make([]int64, 0, b.batchSize),
259+
TxBytes: make([]int64, 0, b.batchSize),
260+
SessionCountVSCode: make([]int64, 0, b.batchSize),
261+
SessionCountJetBrains: make([]int64, 0, b.batchSize),
262+
SessionCountReconnectingPTY: make([]int64, 0, b.batchSize),
263+
SessionCountSSH: make([]int64, 0, b.batchSize),
264+
ConnectionMedianLatencyMS: make([]float64, 0, b.batchSize),
265+
}
266+
267+
b.connectionsByProto = make([]map[string]int64, 0, size)
268+
}
269+
270+
func (b *Batcher) resetBuf() {
271+
b.buf.ID = b.buf.ID[:0]
272+
b.buf.CreatedAt = b.buf.CreatedAt[:0]
273+
b.buf.UserID = b.buf.UserID[:0]
274+
b.buf.WorkspaceID = b.buf.WorkspaceID[:0]
275+
b.buf.TemplateID = b.buf.TemplateID[:0]
276+
b.buf.AgentID = b.buf.AgentID[:0]
277+
b.buf.ConnectionsByProto = json.RawMessage(`[]`)
278+
b.buf.ConnectionCount = b.buf.ConnectionCount[:0]
279+
b.buf.RxPackets = b.buf.RxPackets[:0]
280+
b.buf.RxBytes = b.buf.RxBytes[:0]
281+
b.buf.TxPackets = b.buf.TxPackets[:0]
282+
b.buf.TxBytes = b.buf.TxBytes[:0]
283+
b.buf.SessionCountVSCode = b.buf.SessionCountVSCode[:0]
284+
b.buf.SessionCountJetBrains = b.buf.SessionCountJetBrains[:0]
285+
b.buf.SessionCountReconnectingPTY = b.buf.SessionCountReconnectingPTY[:0]
286+
b.buf.SessionCountSSH = b.buf.SessionCountSSH[:0]
287+
b.buf.ConnectionMedianLatencyMS = b.buf.ConnectionMedianLatencyMS[:0]
288+
b.connectionsByProto = b.connectionsByProto[:0]
289+
}

0 commit comments

Comments
 (0)