
Commit bfcd8f4

Merge branch 'main' into 9983-insights-metrics

2 parents: 86f45a0 + b5e5b39

138 files changed: 3550 additions, 1646 deletions


.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
@@ -136,7 +136,7 @@ jobs:

       # Check for any typos
       - name: Check for typos
-        uses: crate-ci/typos@v1.16.17
+        uses: crate-ci/typos@v1.16.19
         with:
           config: .github/workflows/typos.toml

@@ -223,7 +223,7 @@ jobs:
           go-version: 1.20.10

       - name: Install shfmt
-        run: go install mvdan.cc/sh/v3/cmd/shfmt@v3.5.0
+        run: go install mvdan.cc/sh/v3/cmd/shfmt@v3.7.0

       - name: make fmt
         run: |

.github/workflows/dogfood.yaml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ jobs:
           echo "tag=${tag}" >> $GITHUB_OUTPUT

       - name: Install Nix
-        uses: DeterminateSystems/nix-installer-action@v5
+        uses: DeterminateSystems/nix-installer-action@v6

       - name: Run the Magic Nix Cache
         uses: DeterminateSystems/magic-nix-cache-action@v2

agent/agent.go

Lines changed: 186 additions & 116 deletions
@@ -90,7 +90,7 @@ type Client interface {
     PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
     PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
     PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error
-    PostMetadata(ctx context.Context, key string, req agentsdk.PostMetadataRequest) error
+    PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
     PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
     GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerConfig, error)
 }
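The hunk above narrows Client.PostMetadata from one call per metadata key to a single request that carries a batch of results. Below is a minimal, self-contained sketch of that batching idea only; metadataItem and postMetadataRequest are simplified stand-ins for the agentsdk types (agentsdk.Metadata, agentsdk.PostMetadataRequest), not their real definitions.

package main

import (
    "context"
    "fmt"
)

// metadataItem and postMetadataRequest are simplified stand-ins for
// agentsdk.Metadata and agentsdk.PostMetadataRequest; the real types carry a
// full WorkspaceAgentMetadataResult rather than a plain string.
type metadataItem struct {
    Key    string
    Result string
}

type postMetadataRequest struct {
    Metadata []metadataItem
}

// postMetadata mimics the new batched PostMetadata call: one request carries
// every key collected since the last report, instead of one request per key.
func postMetadata(_ context.Context, req postMetadataRequest) error {
    fmt.Printf("posting %d metadata entries in one request\n", len(req.Metadata))
    return nil
}

func main() {
    // Freshest result per key; a newer result simply overwrites an unsent
    // one, mirroring the updatedMetadata map in the hunk below.
    updated := map[string]string{
        "load":   "0.42",
        "uptime": "3h12m",
    }

    batch := make([]metadataItem, 0, len(updated))
    for key, result := range updated {
        batch = append(batch, metadataItem{Key: key, Result: result})
        delete(updated, key)
    }

    _ = postMetadata(context.Background(), postMetadataRequest{Metadata: batch})
}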
@@ -362,140 +362,210 @@ func (t *trySingleflight) Do(key string, fn func()) {
 }

 func (a *agent) reportMetadataLoop(ctx context.Context) {
-    const metadataLimit = 128
+    tickerDone := make(chan struct{})
+    collectDone := make(chan struct{})
+    ctx, cancel := context.WithCancel(ctx)
+    defer func() {
+        cancel()
+        <-collectDone
+        <-tickerDone
+    }()

     var (
-        baseTicker        = time.NewTicker(a.reportMetadataInterval)
-        lastCollectedAtMu sync.RWMutex
-        lastCollectedAts  = make(map[string]time.Time)
-        metadataResults   = make(chan metadataResultAndKey, metadataLimit)
-        logger            = a.logger.Named("metadata")
+        logger          = a.logger.Named("metadata")
+        report          = make(chan struct{}, 1)
+        collect         = make(chan struct{}, 1)
+        metadataResults = make(chan metadataResultAndKey, 1)
     )
-    defer baseTicker.Stop()
-
-    // We use a custom singleflight that immediately returns if there is already
-    // a goroutine running for a given key. This is to prevent a build-up of
-    // goroutines waiting on Do when the script takes many multiples of
-    // baseInterval to run.
-    flight := trySingleflight{m: map[string]struct{}{}}
-
-    postMetadata := func(mr metadataResultAndKey) {
-        err := a.client.PostMetadata(ctx, mr.key, *mr.result)
-        if err != nil {
-            a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
-        }
-    }

-    for {
-        select {
-        case <-ctx.Done():
-            return
-        case mr := <-metadataResults:
-            postMetadata(mr)
-            continue
-        case <-baseTicker.C:
+    // Set up collect and report as a single ticker with two channels,
+    // this is to allow collection and reporting to be triggered
+    // independently of each other.
+    go func() {
+        t := time.NewTicker(a.reportMetadataInterval)
+        defer func() {
+            t.Stop()
+            close(report)
+            close(collect)
+            close(tickerDone)
+        }()
+        wake := func(c chan<- struct{}) {
+            select {
+            case c <- struct{}{}:
+            default:
+            }
         }
+        wake(collect) // Start immediately.

-        if len(metadataResults) > 0 {
-            // The inner collection loop expects the channel is empty before spinning up
-            // all the collection goroutines.
-            logger.Debug(ctx, "metadata collection backpressured",
-                slog.F("queue_len", len(metadataResults)),
-            )
-            continue
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-t.C:
+                wake(report)
+                wake(collect)
+            }
         }
+    }()

-        manifest := a.manifest.Load()
-        if manifest == nil {
-            continue
-        }
+    go func() {
+        defer close(collectDone)
+
+        var (
+            // We use a custom singleflight that immediately returns if there is already
+            // a goroutine running for a given key. This is to prevent a build-up of
+            // goroutines waiting on Do when the script takes many multiples of
+            // baseInterval to run.
+            flight            = trySingleflight{m: map[string]struct{}{}}
+            lastCollectedAtMu sync.RWMutex
+            lastCollectedAts  = make(map[string]time.Time)
+        )
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-collect:
+            }

-        if len(manifest.Metadata) > metadataLimit {
-            logger.Error(
-                ctx, "metadata limit exceeded",
-                slog.F("limit", metadataLimit), slog.F("got", len(manifest.Metadata)),
-            )
-            continue
-        }
+            manifest := a.manifest.Load()
+            if manifest == nil {
+                continue
+            }

-        // If the manifest changes (e.g. on agent reconnect) we need to
-        // purge old cache values to prevent lastCollectedAt from growing
-        // boundlessly.
-        lastCollectedAtMu.Lock()
-        for key := range lastCollectedAts {
-            if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
-                return md.Key == key
-            }) < 0 {
-                logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
-                    slog.F("key", key),
-                )
-                delete(lastCollectedAts, key)
+            // If the manifest changes (e.g. on agent reconnect) we need to
+            // purge old cache values to prevent lastCollectedAt from growing
+            // boundlessly.
+            lastCollectedAtMu.Lock()
+            for key := range lastCollectedAts {
+                if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
+                    return md.Key == key
+                }) < 0 {
+                    logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
+                        slog.F("key", key),
+                    )
+                    delete(lastCollectedAts, key)
+                }
             }
-        }
-        lastCollectedAtMu.Unlock()
-
-        // Spawn a goroutine for each metadata collection, and use a
-        // channel to synchronize the results and avoid both messy
-        // mutex logic and overloading the API.
-        for _, md := range manifest.Metadata {
-            md := md
-            // We send the result to the channel in the goroutine to avoid
-            // sending the same result multiple times. So, we don't care about
-            // the return values.
-            go flight.Do(md.Key, func() {
-                ctx := slog.With(ctx, slog.F("key", md.Key))
-                lastCollectedAtMu.RLock()
-                collectedAt, ok := lastCollectedAts[md.Key]
-                lastCollectedAtMu.RUnlock()
-                if ok {
-                    // If the interval is zero, we assume the user just wants
-                    // a single collection at startup, not a spinning loop.
-                    if md.Interval == 0 {
-                        return
+            lastCollectedAtMu.Unlock()
+
+            // Spawn a goroutine for each metadata collection, and use a
+            // channel to synchronize the results and avoid both messy
+            // mutex logic and overloading the API.
+            for _, md := range manifest.Metadata {
+                md := md
+                // We send the result to the channel in the goroutine to avoid
+                // sending the same result multiple times. So, we don't care about
+                // the return values.
+                go flight.Do(md.Key, func() {
+                    ctx := slog.With(ctx, slog.F("key", md.Key))
+                    lastCollectedAtMu.RLock()
+                    collectedAt, ok := lastCollectedAts[md.Key]
+                    lastCollectedAtMu.RUnlock()
+                    if ok {
+                        // If the interval is zero, we assume the user just wants
+                        // a single collection at startup, not a spinning loop.
+                        if md.Interval == 0 {
+                            return
+                        }
+                        intervalUnit := time.Second
+                        // reportMetadataInterval is only less than a second in tests,
+                        // so adjust the interval unit for them.
+                        if a.reportMetadataInterval < time.Second {
+                            intervalUnit = 100 * time.Millisecond
+                        }
+                        // The last collected value isn't quite stale yet, so we skip it.
+                        if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
+                            return
+                        }
                     }
-                    intervalUnit := time.Second
-                    // reportMetadataInterval is only less than a second in tests,
-                    // so adjust the interval unit for them.
-                    if a.reportMetadataInterval < time.Second {
-                        intervalUnit = 100 * time.Millisecond
+
+                    timeout := md.Timeout
+                    if timeout == 0 {
+                        if md.Interval != 0 {
+                            timeout = md.Interval
+                        } else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
+                            // Fallback to the report interval
+                            timeout = interval * 3
+                        } else {
+                            // If the interval is still 0 (possible if the interval
+                            // is less than a second), default to 5. This was
+                            // randomly picked.
+                            timeout = 5
+                        }
                     }
-                    // The last collected value isn't quite stale yet, so we skip it.
-                    if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
-                        return
+                    ctxTimeout := time.Duration(timeout) * time.Second
+                    ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
+                    defer cancel()
+
+                    now := time.Now()
+                    select {
+                    case <-ctx.Done():
+                        logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
+                    case metadataResults <- metadataResultAndKey{
+                        key:    md.Key,
+                        result: a.collectMetadata(ctx, md, now),
+                    }:
+                        lastCollectedAtMu.Lock()
+                        lastCollectedAts[md.Key] = now
+                        lastCollectedAtMu.Unlock()
                     }
-                }
+                })
+            }
+        }
+    }()

-                timeout := md.Timeout
-                if timeout == 0 {
-                    if md.Interval != 0 {
-                        timeout = md.Interval
-                    } else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
-                        // Fallback to the report interval
-                        timeout = interval * 3
-                    } else {
-                        // If the interval is still 0 (possible if the interval
-                        // is less than a second), default to 5. This was
-                        // randomly picked.
-                        timeout = 5
-                    }
+    // Gather metadata updates and report them once every interval. If a
+    // previous report is in flight, wait for it to complete before
+    // sending a new one. If the network conditions are bad, we won't
+    // benefit from canceling the previous send and starting a new one.
+    var (
+        updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult)
+        reportTimeout   = 30 * time.Second
+        reportSemaphore = make(chan struct{}, 1)
+    )
+    reportSemaphore <- struct{}{}
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case mr := <-metadataResults:
+            // This can overwrite unsent values, but that's fine because
+            // we're only interested about up-to-date values.
+            updatedMetadata[mr.key] = mr.result
+            continue
+        case <-report:
+            if len(updatedMetadata) > 0 {
+                metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata))
+                for key, result := range updatedMetadata {
+                    metadata = append(metadata, agentsdk.Metadata{
+                        Key:                          key,
+                        WorkspaceAgentMetadataResult: *result,
+                    })
+                    delete(updatedMetadata, key)
                 }
-                ctxTimeout := time.Duration(timeout) * time.Second
-                ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
-                defer cancel()

-                now := time.Now()
                 select {
-                case <-ctx.Done():
-                    logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
-                case metadataResults <- metadataResultAndKey{
-                    key:    md.Key,
-                    result: a.collectMetadata(ctx, md, now),
-                }:
-                    lastCollectedAtMu.Lock()
-                    lastCollectedAts[md.Key] = now
-                    lastCollectedAtMu.Unlock()
+                case <-reportSemaphore:
+                default:
+                    // If there's already a report in flight, don't send
+                    // another one, wait for next tick instead.
+                    continue
                 }
-            })
+
+                go func() {
+                    ctx, cancel := context.WithTimeout(ctx, reportTimeout)
+                    defer func() {
+                        cancel()
+                        reportSemaphore <- struct{}{}
+                    }()
+
+                    err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata})
+                    if err != nil {
+                        a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
+                    }
+                }()
+            }
         }
     }
 }
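The rewritten loop drives collection and reporting from a single ticker that wakes two buffered channels with non-blocking sends, so a slow collection or report drops ticks instead of queuing them. Below is a stripped-down, self-contained sketch of that wake pattern; the names and durations are illustrative, not the agent's actual values.

package main

import (
    "context"
    "fmt"
    "time"
)

// wake performs a non-blocking send: if the buffered channel already holds a
// pending wake-up, the tick is dropped rather than queued behind it.
func wake(c chan<- struct{}) {
    select {
    case c <- struct{}{}:
    default:
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()

    report := make(chan struct{}, 1)
    collect := make(chan struct{}, 1)

    // One ticker drives both channels, so collection and reporting share a
    // cadence but never block each other.
    go func() {
        t := time.NewTicker(100 * time.Millisecond)
        defer t.Stop()
        wake(collect) // start collecting immediately
        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                wake(report)
                wake(collect)
            }
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return
        case <-collect:
            fmt.Println("collect tick")
        case <-report:
            fmt.Println("report tick")
        }
    }
}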

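The report path also keeps at most one PostMetadata call in flight by treating a one-slot channel as a semaphore: take the token with a select/default before sending, skip the tick when the token is missing, and return the token once the send finishes. A small sketch of that pattern under the same assumptions (function names, counts, and sleeps are illustrative):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // A buffered channel holding one token acts as the semaphore: the token
    // is taken before a report is sent and returned when the send finishes,
    // so at most one report is ever in flight.
    reportSemaphore := make(chan struct{}, 1)
    reportSemaphore <- struct{}{}

    var wg sync.WaitGroup
    send := func(tick int) {
        select {
        case <-reportSemaphore:
        default:
            // A report is already in flight; skip this tick rather than
            // queuing another send behind it.
            fmt.Printf("tick %d: report still in flight, skipping\n", tick)
            return
        }
        wg.Add(1)
        go func() {
            defer func() {
                reportSemaphore <- struct{}{} // release the token
                wg.Done()
            }()
            time.Sleep(150 * time.Millisecond) // simulate a slow network send
            fmt.Printf("tick %d: report sent\n", tick)
        }()
    }

    for tick := 1; tick <= 3; tick++ {
        send(tick)
        time.Sleep(100 * time.Millisecond)
    }
    wg.Wait()
}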