Skip to content

chore: zero out session stats from agent with experiment enabled #13579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Prev Previous commit
Next Next commit
remove loop
  • Loading branch information
f0ssel committed Jun 17, 2024
commit fefc2deba4232fb65fad0a6b7145b6766ac7bca0
41 changes: 4 additions & 37 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type Options struct {
PrometheusRegistry *prometheus.Registry
ReportMetadataInterval time.Duration
ServiceBannerRefreshInterval time.Duration
ExperimentRefreshInterval time.Duration
FetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
Syscaller agentproc.Syscaller
// ModifiedProcesses is used for testing process priority management.
Expand Down Expand Up @@ -141,9 +140,6 @@ func New(options Options) Agent {
return codersdk.Experiments{}, nil
}
}
if options.ExperimentRefreshInterval == 0 {
options.ExperimentRefreshInterval = 5 * time.Minute
}
if options.ReportMetadataInterval == 0 {
options.ReportMetadataInterval = time.Second
}
Expand Down Expand Up @@ -178,7 +174,6 @@ func New(options Options) Agent {
client: options.Client,
exchangeToken: options.ExchangeToken,
fetchExperiments: options.FetchExperiments,
fetchExperimentsInterval: options.ExperimentRefreshInterval,
filesystem: options.Filesystem,
logDir: options.LogDir,
tempDir: options.TempDir,
Expand Down Expand Up @@ -261,9 +256,8 @@ type agent struct {
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.

fetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
fetchExperimentsInterval time.Duration
experiments atomic.Pointer[codersdk.Experiments]
fetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
experiments codersdk.Experiments

network *tailnet.Conn
addresses []netip.Prefix
Expand Down Expand Up @@ -753,28 +747,6 @@ func (a *agent) fetchServiceBannerLoop(ctx context.Context, conn drpc.Conn) erro
}
}

// fetchExperimentsLoop fetches experiments on an interval.
//
// On every tick of a ticker with period a.fetchExperimentsInterval it calls
// a.fetchExperiments and stores the result in a.experiments. The first fetch
// happens only after one full interval has elapsed, not on entry.
//
// The loop runs until ctx is done or a fetch fails, so it always returns a
// non-nil error: ctx.Err() when the context is done, otherwise the error
// returned by a.fetchExperiments.
func (a *agent) fetchExperimentsLoop(ctx context.Context) error {
ticker := time.NewTicker(a.fetchExperimentsInterval)
// Stop the ticker on every return path.
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
experiments, err := a.fetchExperiments(ctx)
if err != nil {
// If ctx is already done when the fetch fails, return the context
// error without logging.
if ctx.Err() != nil {
return ctx.Err()
}
// Any other fetch failure is logged and ends the loop; the
// previously stored experiments value is left untouched.
a.logger.Error(ctx, "failed to update experiments", slog.Error(err))
return err
}
// Store a pointer to this iteration's freshly fetched value.
a.experiments.Store(&experiments)
}
}
}

func (a *agent) run() (retErr error) {
// This allows the agent to refresh its token if necessary.
// For instance identity this is required, since the instance
Expand All @@ -785,11 +757,10 @@ func (a *agent) run() (retErr error) {
}
a.sessionToken.Store(&sessionToken)

exp, err := a.fetchExperiments(a.hardCtx)
a.experiments, err = a.fetchExperiments(a.hardCtx)
if err != nil {
return xerrors.Errorf("fetch experiments: %w", err)
}
a.experiments.Store(&exp)

// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
conn, err := a.client.ConnectRPC(a.hardCtx)
Expand Down Expand Up @@ -900,10 +871,6 @@ func (a *agent) run() (retErr error) {

connMan.start("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)

connMan.start("fetch experiments loop", gracefulShutdownBehaviorStop, func(ctx context.Context, _ drpc.Conn) error {
return a.fetchExperimentsLoop(ctx)
})

connMan.start("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, conn drpc.Conn) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
Expand Down Expand Up @@ -1053,7 +1020,7 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
closed := a.isClosed()
if !closed {
a.network = network
a.statsReporter = newStatsReporter(a.logger, network, a, &a.experiments)
a.statsReporter = newStatsReporter(a.logger, network, a, a.experiments)
}
a.closeMutex.Unlock()
if closed {
Expand Down
7 changes: 3 additions & 4 deletions agent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"golang.org/x/xerrors"
"tailscale.com/types/netlogtype"

Expand Down Expand Up @@ -41,10 +40,10 @@ type statsReporter struct {
source networkStatsSource
collector statsCollector
logger slog.Logger
experiments *atomic.Pointer[codersdk.Experiments]
experiments codersdk.Experiments
}

func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector, experiments *atomic.Pointer[codersdk.Experiments]) *statsReporter {
func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector, experiments codersdk.Experiments) *statsReporter {
return &statsReporter{
Cond: sync.NewCond(&sync.Mutex{}),
logger: logger,
Expand Down Expand Up @@ -119,7 +118,7 @@ func (s *statsReporter) reportLocked(

// if the experiment is enabled we zero out certain session stats
// as we migrate to the client reporting these stats instead.
if s.experiments.Load().Enabled(codersdk.ExperimentWorkspaceUsage) {
if s.experiments.Enabled(codersdk.ExperimentWorkspaceUsage) {
stats.SessionCountSsh = 0
// TODO: More session types will be enabled as we migrate over.
// stats.SessionCountVscode = 0
Expand Down
3 changes: 1 addition & 2 deletions agent/stats_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/durationpb"
"tailscale.com/types/ipproto"

Expand All @@ -32,7 +31,7 @@ func TestStatsReporter(t *testing.T) {
fSource := newFakeNetworkStatsSource(ctx, t)
fCollector := newFakeCollector(t)
fDest := newFakeStatsDest()
uut := newStatsReporter(logger, fSource, fCollector, &atomic.Pointer[codersdk.Experiments]{})
uut := newStatsReporter(logger, fSource, fCollector, codersdk.Experiments{})

loopErr := make(chan error, 1)
loopCtx, loopCancel := context.WithCancel(ctx)
Expand Down
Loading