Skip to content

chore: zero out session stats from agent with experiment enabled #13579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type FakeAgentAPI struct {
logsCh chan<- *agentproto.BatchCreateLogsRequest
lifecycleStates []codersdk.WorkspaceAgentLifecycle
metadata map[string]agentsdk.Metadata
experiments codersdk.Experiments

getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
}
Expand Down Expand Up @@ -266,6 +267,12 @@ func (f *FakeAgentAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.
return &agentproto.BatchUpdateMetadataResponse{}, nil
}

func (f *FakeAgentAPI) GetExperiments(_ context.Context, _ *agentproto.GetExperimentsRequest) (*agentproto.GetExperimentsResponse, error) {
f.Lock()
defer f.Unlock()
return agentsdk.ProtoFromExperiments(f.experiments), nil
}

func (f *FakeAgentAPI) SetLogsChannel(ch chan<- *agentproto.BatchCreateLogsRequest) {
f.Lock()
defer f.Unlock()
Expand Down
371 changes: 248 additions & 123 deletions agent/proto/agent.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions agent/proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ message BannerConfig {
string background_color = 3;
}

message GetExperimentsRequest {}

message GetExperimentsResponse {
repeated string experiments = 1;
}

service Agent {
rpc GetManifest(GetManifestRequest) returns (Manifest);
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
Expand All @@ -273,4 +279,5 @@ service Agent {
rpc BatchUpdateMetadata(BatchUpdateMetadataRequest) returns (BatchUpdateMetadataResponse);
rpc BatchCreateLogs(BatchCreateLogsRequest) returns (BatchCreateLogsResponse);
rpc GetAnnouncementBanners(GetAnnouncementBannersRequest) returns (GetAnnouncementBannersResponse);
rpc GetExperiments(GetExperimentsRequest) returns (GetExperimentsResponse);
}
42 changes: 41 additions & 1 deletion agent/proto/agent_drpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 30 additions & 7 deletions agent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

const maxConns = 2048
Expand All @@ -22,23 +24,25 @@ type statsCollector interface {
Collect(ctx context.Context, networkStats map[netlogtype.Connection]netlogtype.Counts) *proto.Stats
}

type statsDest interface {
type statsAPI interface {
GetExperiments(ctx context.Context, req *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error)
UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error)
}

// statsReporter is a subcomponent of the agent that handles registering the stats callback on the
// networkStatsSource (tailnet.Conn in prod), handling the callback, calling back to the
// statsCollector (agent in prod) to collect additional stats, then sending the update to the
// statsDest (agent API in prod)
// statsAPI (agent API in prod)
type statsReporter struct {
*sync.Cond
networkStats *map[netlogtype.Connection]netlogtype.Counts
unreported bool
lastInterval time.Duration

source networkStatsSource
collector statsCollector
logger slog.Logger
source networkStatsSource
collector statsCollector
logger slog.Logger
experiments codersdk.Experiments
}

func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector) *statsReporter {
Expand Down Expand Up @@ -66,7 +70,15 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne
// connection to the agent API, then passes that connection to go routines like
// this that use it. There is no retry and we fail on the first error since
// this will be inside a larger retry loop.
func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
func (s *statsReporter) reportLoop(ctx context.Context, dest statsAPI) error {
exp, err := dest.GetExperiments(ctx, &proto.GetExperimentsRequest{})
if err != nil {
return xerrors.Errorf("get experiments: %w", err)
}
s.L.Lock()
s.experiments = agentsdk.ExperimentsFromProto(exp)
s.L.Unlock()

// send an initial, blank report to get the interval
resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{})
if err != nil {
Expand Down Expand Up @@ -105,13 +117,24 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
}

func (s *statsReporter) reportLocked(
ctx context.Context, dest statsDest, networkStats map[netlogtype.Connection]netlogtype.Counts,
ctx context.Context, dest statsAPI, networkStats map[netlogtype.Connection]netlogtype.Counts,
) error {
// here we want to do our collecting/reporting while it is unlocked, but then relock
// when we return to reportLoop.
s.L.Unlock()
defer s.L.Lock()
stats := s.collector.Collect(ctx, networkStats)

// if the experiment is enabled we zero out certain session stats
// as we migrate to the client reporting these stats instead.
Comment on lines +128 to +129
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this get implemented serverside instead? It seems like a lot of changes in the agent for something that could be zeroed out in the stats API on the server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, this would keep the change in 'one place' and remove the need for plumbing this through AgentAPI

Copy link
Contributor Author

@f0ssel f0ssel Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really and here's why - If customers update coder but do not rebuild the workspace, we will have an old agent client and old cli version in the workspace. If the experiment is enabled and we zero out the value server side, we will lose data for that workspace because the old cli will not be reporting the new data via the usage endpoint. So the graphs will go to zero and slowly come back up over time as each workspace is restarted.

So we need the endpoint to still save this data for old workspaces, but also allow new workspaces to report it in the new way. Or we accept downtime on the stats on upgrade until all workspaces are updated. Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the experiment is enabled and we zero out the value server side, we will lose data for that workspace because the old cli will not be reporting the new data via the usage endpoint.

Won't the same thing happen even if the feature was GA? Old agent/CLI versions need to be supported on new servers for multiple months.

This will also affect stats collection for older CLIs being used outside of workspaces, which arguably is where more people use the CLI anyways. I don't think anyone is using the CLI in a workspace to connect to a workspace.

So we need the endpoint to still save this data for old workspaces, but also allow new workspaces to report it in the new way. Or we accept downtime on the stats on upgrade until all workspaces are updated.

I think either way there will be a "downtime" of stats with or without the agent changes when an older CLI is being used on the local machine.

I think this PR should change into just the CLI stats upload portion and have both sides report stats for now. Then in a few months you can remove stats reporting from the API by deprecating the field and ignoring it serverside.

if s.experiments.Enabled(codersdk.ExperimentWorkspaceUsage) {
stats.SessionCountSsh = 0
// TODO: More session types will be enabled as we migrate over.
// stats.SessionCountVscode = 0
// stats.SessionCountJetbrains = 0
// stats.SessionCountReconnectingPty = 0
}

resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{Stats: stats})
if err != nil {
return err
Expand Down
80 changes: 78 additions & 2 deletions agent/stats_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"cdr.dev/slog/sloggers/slogjson"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)

Expand Down Expand Up @@ -128,6 +129,73 @@ func TestStatsReporter(t *testing.T) {
require.NoError(t, err)
}

func TestStatsExperiment(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
fSource := newFakeNetworkStatsSource(ctx, t)
fCollector := newFakeCollector(t)
fDest := newFakeStatsDest()
fDest.experiments.Experiments = append(fDest.experiments.Experiments, string(codersdk.ExperimentWorkspaceUsage))
uut := newStatsReporter(logger, fSource, fCollector)

go func() {
_ = uut.reportLoop(ctx, fDest)
}()

// initial request to get duration
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
require.NotNil(t, req)
require.Nil(t, req.Stats)
interval := time.Second * 34
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)})

// call to source to set the callback and interval
gotInterval := testutil.RequireRecvCtx(ctx, t, fSource.period)
require.Equal(t, interval, gotInterval)

// callback returning netstats
netStats := map[netlogtype.Connection]netlogtype.Counts{
{
Proto: ipproto.TCP,
Src: netip.MustParseAddrPort("192.168.1.33:4887"),
Dst: netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets: 22,
TxBytes: 23,
RxPackets: 24,
RxBytes: 25,
},
}
fSource.callback(time.Now(), time.Now(), netStats, nil)

// collector called to complete the stats
gotNetStats := testutil.RequireRecvCtx(ctx, t, fCollector.calls)
require.Equal(t, netStats, gotNetStats)

// complete first collection
stats := &proto.Stats{
SessionCountSsh: 10,
SessionCountJetbrains: 55,
SessionCountVscode: 20,
SessionCountReconnectingPty: 30,
}
testutil.RequireSendCtx(ctx, t, fCollector.stats, stats)

// destination called to report the first stats
update := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
require.NotNil(t, update)
// confirm certain session counts are zeroed out when
// experiment is enabled.
require.EqualValues(t, 0, update.Stats.SessionCountSsh)
// confirm others are not zeroed out. These will be
// zeroed out in the future as we migrate to workspace
// usage handling these session stats.
require.EqualValues(t, 55, update.Stats.SessionCountJetbrains)
require.EqualValues(t, 20, update.Stats.SessionCountVscode)
require.EqualValues(t, 30, update.Stats.SessionCountReconnectingPty)
}

type fakeNetworkStatsSource struct {
sync.Mutex
ctx context.Context
Expand Down Expand Up @@ -189,8 +257,9 @@ func newFakeCollector(t testing.TB) *fakeCollector {
}

type fakeStatsDest struct {
reqs chan *proto.UpdateStatsRequest
resps chan *proto.UpdateStatsResponse
reqs chan *proto.UpdateStatsRequest
resps chan *proto.UpdateStatsResponse
experiments *proto.GetExperimentsResponse
}

func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error) {
Expand All @@ -208,10 +277,17 @@ func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsR
}
}

func (f *fakeStatsDest) GetExperiments(_ context.Context, _ *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error) {
return f.experiments, nil
}

func newFakeStatsDest() *fakeStatsDest {
return &fakeStatsDest{
reqs: make(chan *proto.UpdateStatsRequest),
resps: make(chan *proto.UpdateStatsResponse),
experiments: &proto.GetExperimentsResponse{
Experiments: []string{},
},
}
}

Expand Down
7 changes: 7 additions & 0 deletions cli/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ func (r *RootCmd) ssh() *serpent.Command {
return xerrors.Errorf("start shell: %w", err)
}

// track workspace usage while connection is open
closeUsage := client.UpdateWorkspaceUsageWithBodyContext(ctx, workspace.ID, codersdk.PostWorkspaceUsageRequest{
AgentID: workspaceAgent.ID,
AppName: codersdk.UsageAppNameSSH,
})
defer closeUsage()

// Put cancel at the top of the defer stack to initiate
// shutdown of services.
defer cancel()
Expand Down
Loading
Loading