Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix(agent): prevent goroutine pile up in reportMetadataLoop
In the prior implementation, calls to DoChan would stack up because we
weren't updating lastCollectedAts until collectMetadata finished. This
wasn't a true leak, instead, it meant that there would be
up to ~ (collectionRuntime / baseInterval) outstanding goroutines. So, for
example, if `sleep 60s` was the metadata script there would be up to
60 goroutines waiting at peak.
  • Loading branch information
ammario committed Apr 1, 2023
commit 94770a264f15899dd9c3659eab1f887ccb8b708b
26 changes: 21 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.uber.org/atomic"
gossh "golang.org/x/crypto/ssh"
"golang.org/x/exp/slices"
"golang.org/x/sync/singleflight"
"golang.org/x/xerrors"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg"
Expand Down Expand Up @@ -264,6 +263,21 @@ type metadataResultAndKey struct {
key string
}

type trySingleflight struct {
m sync.Map
}

func (t *trySingleflight) Do(key string, fn func()) {
_, loaded := t.m.LoadOrStore(key, struct{}{})
if !loaded {
// There is already a goroutine running for this key.
return
}

defer t.m.Delete(key)
fn()
}

func (a *agent) reportMetadataLoop(ctx context.Context) {
baseInterval := adjustIntervalForTests(1)

Expand All @@ -276,7 +290,11 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
)
defer baseTicker.Stop()

var flight singleflight.Group
// We use a custom singleflight that immediately returns if there is already
// a goroutine running for a given key. This is to prevent a build-up of
// goroutines waiting on Do when the script takes many multiples of
// baseInterval to run.
var flight trySingleflight

for {
select {
Expand Down Expand Up @@ -348,7 +366,7 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
flight.DoChan(md.Key, func() (interface{}, error) {
go flight.Do(md.Key, func() {
timeout := md.Timeout
if timeout == 0 {
timeout = md.Interval
Expand All @@ -360,13 +378,11 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {

select {
case <-ctx.Done():
return 0, nil
case metadataResults <- metadataResultAndKey{
key: md.Key,
result: a.collectMetadata(ctx, md),
}:
}
return 0, nil
})
}
}
Expand Down