Skip to content

chore: zero out session stats from agent with experiment enabled #13579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Prev Previous commit
Next Next commit
move to drpc
  • Loading branch information
f0ssel committed Jun 18, 2024
commit cf80efe1eeea06580a9f975eb71d04506c47fd5a
17 changes: 1 addition & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type Options struct {
PrometheusRegistry *prometheus.Registry
ReportMetadataInterval time.Duration
ServiceBannerRefreshInterval time.Duration
FetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
Syscaller agentproc.Syscaller
// ModifiedProcesses is used for testing process priority management.
ModifiedProcesses chan []*agentproc.Process
Expand Down Expand Up @@ -135,11 +134,6 @@ func New(options Options) Agent {
return "", nil
}
}
if options.FetchExperiments == nil {
options.FetchExperiments = func(ctx context.Context) (codersdk.Experiments, error) {
return codersdk.Experiments{}, nil
}
}
if options.ReportMetadataInterval == 0 {
options.ReportMetadataInterval = time.Second
}
Expand Down Expand Up @@ -173,7 +167,6 @@ func New(options Options) Agent {
environmentVariables: options.EnvironmentVariables,
client: options.Client,
exchangeToken: options.ExchangeToken,
fetchExperiments: options.FetchExperiments,
filesystem: options.Filesystem,
logDir: options.LogDir,
tempDir: options.TempDir,
Expand Down Expand Up @@ -256,9 +249,6 @@ type agent struct {
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.

fetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
experiments codersdk.Experiments

network *tailnet.Conn
addresses []netip.Prefix
statsReporter *statsReporter
Expand Down Expand Up @@ -757,11 +747,6 @@ func (a *agent) run() (retErr error) {
}
a.sessionToken.Store(&sessionToken)

a.experiments, err = a.fetchExperiments(a.hardCtx)
if err != nil {
return xerrors.Errorf("fetch experiments: %w", err)
}

// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
conn, err := a.client.ConnectRPC(a.hardCtx)
if err != nil {
Expand Down Expand Up @@ -1020,7 +1005,7 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
closed := a.isClosed()
if !closed {
a.network = network
a.statsReporter = newStatsReporter(a.logger, network, a, a.experiments)
a.statsReporter = newStatsReporter(a.logger, network, a)
}
a.closeMutex.Unlock()
if closed {
Expand Down
7 changes: 7 additions & 0 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
logsCh chan<- *agentproto.BatchCreateLogsRequest
lifecycleStates []codersdk.WorkspaceAgentLifecycle
metadata map[string]agentsdk.Metadata
experiments codersdk.Experiments

getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
}
Expand Down Expand Up @@ -266,6 +267,12 @@
return &agentproto.BatchUpdateMetadataResponse{}, nil
}

func (f *FakeAgentAPI) GetExperiments(ctx context.Context, req *agentproto.GetExperimentsRequest) (*agentproto.GetExperimentsResponse, error) {

Check failure on line 270 in agent/agenttest/client.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
f.Lock()
defer f.Unlock()
return agentsdk.ProtoFromExperiments(f.experiments), nil
}

func (f *FakeAgentAPI) SetLogsChannel(ch chan<- *agentproto.BatchCreateLogsRequest) {
f.Lock()
defer f.Unlock()
Expand Down
371 changes: 248 additions & 123 deletions agent/proto/agent.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions agent/proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ message BannerConfig {
string background_color = 3;
}

message GetExperimentsRequest {}

message GetExperimentsResponse {
repeated string experiments = 1;
}

service Agent {
rpc GetManifest(GetManifestRequest) returns (Manifest);
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
Expand All @@ -273,4 +279,5 @@ service Agent {
rpc BatchUpdateMetadata(BatchUpdateMetadataRequest) returns (BatchUpdateMetadataResponse);
rpc BatchCreateLogs(BatchCreateLogsRequest) returns (BatchCreateLogsResponse);
rpc GetAnnouncementBanners(GetAnnouncementBannersRequest) returns (GetAnnouncementBannersResponse);
rpc GetExperiments(GetExperimentsRequest) returns (GetExperimentsResponse);
}
42 changes: 41 additions & 1 deletion agent/proto/agent_drpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions agent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

const maxConns = 2048
Expand All @@ -24,6 +25,7 @@ type statsCollector interface {
}

type statsDest interface {
GetExperiments(ctx context.Context, req *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error)
UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error)
}

Expand All @@ -43,13 +45,12 @@ type statsReporter struct {
experiments codersdk.Experiments
}

func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector, experiments codersdk.Experiments) *statsReporter {
func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector) *statsReporter {
return &statsReporter{
Cond: sync.NewCond(&sync.Mutex{}),
logger: logger,
source: source,
collector: collector,
experiments: experiments,
Cond: sync.NewCond(&sync.Mutex{}),
logger: logger,
source: source,
collector: collector,
}
}

Expand All @@ -70,6 +71,12 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne
// this that use it. There is no retry and we fail on the first error since
// this will be inside a larger retry loop.
func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
exp, err := dest.GetExperiments(ctx, &proto.GetExperimentsRequest{})
if err != nil {
return xerrors.Errorf("get experiments: %w", err)
}
s.experiments = agentsdk.ExperimentsFromProto(exp)

// send an initial, blank report to get the interval
resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{})
if err != nil {
Expand Down
81 changes: 78 additions & 3 deletions agent/stats_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestStatsReporter(t *testing.T) {
fSource := newFakeNetworkStatsSource(ctx, t)
fCollector := newFakeCollector(t)
fDest := newFakeStatsDest()
uut := newStatsReporter(logger, fSource, fCollector, codersdk.Experiments{})
uut := newStatsReporter(logger, fSource, fCollector)

loopErr := make(chan error, 1)
loopCtx, loopCancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -129,6 +129,73 @@ func TestStatsReporter(t *testing.T) {
require.NoError(t, err)
}

func TestStatsExperiment(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
fSource := newFakeNetworkStatsSource(ctx, t)
fCollector := newFakeCollector(t)
fDest := newFakeStatsDest()
fDest.experiments.Experiments = append(fDest.experiments.Experiments, string(codersdk.ExperimentWorkspaceUsage))
uut := newStatsReporter(logger, fSource, fCollector)

go func() {
_ = uut.reportLoop(ctx, fDest)
}()

// initial request to get duration
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
require.NotNil(t, req)
require.Nil(t, req.Stats)
interval := time.Second * 34
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)})

// call to source to set the callback and interval
gotInterval := testutil.RequireRecvCtx(ctx, t, fSource.period)
require.Equal(t, interval, gotInterval)

// callback returning netstats
netStats := map[netlogtype.Connection]netlogtype.Counts{
{
Proto: ipproto.TCP,
Src: netip.MustParseAddrPort("192.168.1.33:4887"),
Dst: netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets: 22,
TxBytes: 23,
RxPackets: 24,
RxBytes: 25,
},
}
fSource.callback(time.Now(), time.Now(), netStats, nil)

// collector called to complete the stats
gotNetStats := testutil.RequireRecvCtx(ctx, t, fCollector.calls)
require.Equal(t, netStats, gotNetStats)

// complete first collection
stats := &proto.Stats{
SessionCountSsh: 10,
SessionCountJetbrains: 55,
SessionCountVscode: 20,
SessionCountReconnectingPty: 30,
}
testutil.RequireSendCtx(ctx, t, fCollector.stats, stats)

// destination called to report the first stats
update := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
require.NotNil(t, update)
// confirm certain session counts are zeroed out when
// experiment is enabled.
require.EqualValues(t, 0, update.Stats.SessionCountSsh)
// confirm others are not zeroed out. These will be
// zeroed out in the future as we migrate to workspace
// usage handling these session stats.
require.EqualValues(t, 55, update.Stats.SessionCountJetbrains)
require.EqualValues(t, 20, update.Stats.SessionCountVscode)
require.EqualValues(t, 30, update.Stats.SessionCountReconnectingPty)
}

type fakeNetworkStatsSource struct {
sync.Mutex
ctx context.Context
Expand Down Expand Up @@ -190,8 +257,9 @@ func newFakeCollector(t testing.TB) *fakeCollector {
}

type fakeStatsDest struct {
reqs chan *proto.UpdateStatsRequest
resps chan *proto.UpdateStatsResponse
reqs chan *proto.UpdateStatsRequest
resps chan *proto.UpdateStatsResponse
experiments *proto.GetExperimentsResponse
}

func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error) {
Expand All @@ -209,10 +277,17 @@ func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsR
}
}

func (f *fakeStatsDest) GetExperiments(_ context.Context, _ *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error) {
return f.experiments, nil
}

func newFakeStatsDest() *fakeStatsDest {
return &fakeStatsDest{
reqs: make(chan *proto.UpdateStatsRequest),
resps: make(chan *proto.UpdateStatsResponse),
experiments: &proto.GetExperimentsResponse{
Experiments: []string{},
},
}
}

Expand Down
3 changes: 0 additions & 3 deletions cli/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
client.SetSessionToken(resp.SessionToken)
return resp.SessionToken, nil
},
FetchExperiments: func(ctx context.Context) (codersdk.Experiments, error) {
return client.SDK.Experiments(ctx)
},
EnvironmentVariables: environmentVariables,
IgnorePorts: ignorePorts,
SSHMaxTimeout: sshMaxTimeout,
Expand Down
5 changes: 5 additions & 0 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type API struct {
*MetadataAPI
*LogsAPI
*tailnet.DRPCService
*ExperimentAPI

mu sync.Mutex
cachedWorkspaceID uuid.UUID
Expand Down Expand Up @@ -159,6 +160,10 @@ func New(opts Options) *API {
DerpMapFn: opts.DerpMapFn,
}

api.ExperimentAPI = &ExperimentAPI{
experiments: opts.Experiments,
}

return api
}

Expand Down
17 changes: 17 additions & 0 deletions coderd/agentapi/experiments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package agentapi

import (
"context"

"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

type ExperimentAPI struct {
experiments codersdk.Experiments
}

func (a *ExperimentAPI) GetExperiments(ctx context.Context, _ *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error) {

Check failure on line 15 in coderd/agentapi/experiments.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
return agentsdk.ProtoFromExperiments(a.experiments), nil
}
Loading
Loading