Skip to content

Commit cf80efe

Browse files
committed
move to drpc
1 parent 39b78e9 commit cf80efe

File tree

11 files changed

+435
-152
lines changed

11 files changed

+435
-152
lines changed

agent/agent.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ type Options struct {
8686
PrometheusRegistry *prometheus.Registry
8787
ReportMetadataInterval time.Duration
8888
ServiceBannerRefreshInterval time.Duration
89-
FetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
9089
Syscaller agentproc.Syscaller
9190
// ModifiedProcesses is used for testing process priority management.
9291
ModifiedProcesses chan []*agentproc.Process
@@ -135,11 +134,6 @@ func New(options Options) Agent {
135134
return "", nil
136135
}
137136
}
138-
if options.FetchExperiments == nil {
139-
options.FetchExperiments = func(ctx context.Context) (codersdk.Experiments, error) {
140-
return codersdk.Experiments{}, nil
141-
}
142-
}
143137
if options.ReportMetadataInterval == 0 {
144138
options.ReportMetadataInterval = time.Second
145139
}
@@ -173,7 +167,6 @@ func New(options Options) Agent {
173167
environmentVariables: options.EnvironmentVariables,
174168
client: options.Client,
175169
exchangeToken: options.ExchangeToken,
176-
fetchExperiments: options.FetchExperiments,
177170
filesystem: options.Filesystem,
178171
logDir: options.LogDir,
179172
tempDir: options.TempDir,
@@ -256,9 +249,6 @@ type agent struct {
256249
lifecycleStates []agentsdk.PostLifecycleRequest
257250
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.
258251

259-
fetchExperiments func(ctx context.Context) (codersdk.Experiments, error)
260-
experiments codersdk.Experiments
261-
262252
network *tailnet.Conn
263253
addresses []netip.Prefix
264254
statsReporter *statsReporter
@@ -757,11 +747,6 @@ func (a *agent) run() (retErr error) {
757747
}
758748
a.sessionToken.Store(&sessionToken)
759749

760-
a.experiments, err = a.fetchExperiments(a.hardCtx)
761-
if err != nil {
762-
return xerrors.Errorf("fetch experiments: %w", err)
763-
}
764-
765750
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
766751
conn, err := a.client.ConnectRPC(a.hardCtx)
767752
if err != nil {
@@ -1020,7 +1005,7 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
10201005
closed := a.isClosed()
10211006
if !closed {
10221007
a.network = network
1023-
a.statsReporter = newStatsReporter(a.logger, network, a, a.experiments)
1008+
a.statsReporter = newStatsReporter(a.logger, network, a)
10241009
}
10251010
a.closeMutex.Unlock()
10261011
if closed {

agent/agenttest/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ type FakeAgentAPI struct {
170170
logsCh chan<- *agentproto.BatchCreateLogsRequest
171171
lifecycleStates []codersdk.WorkspaceAgentLifecycle
172172
metadata map[string]agentsdk.Metadata
173+
experiments codersdk.Experiments
173174

174175
getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
175176
}
@@ -266,6 +267,12 @@ func (f *FakeAgentAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.
266267
return &agentproto.BatchUpdateMetadataResponse{}, nil
267268
}
268269

270+
func (f *FakeAgentAPI) GetExperiments(ctx context.Context, req *agentproto.GetExperimentsRequest) (*agentproto.GetExperimentsResponse, error) {
271+
f.Lock()
272+
defer f.Unlock()
273+
return agentsdk.ProtoFromExperiments(f.experiments), nil
274+
}
275+
269276
func (f *FakeAgentAPI) SetLogsChannel(ch chan<- *agentproto.BatchCreateLogsRequest) {
270277
f.Lock()
271278
defer f.Unlock()

agent/proto/agent.pb.go

Lines changed: 248 additions & 123 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/proto/agent.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,12 @@ message BannerConfig {
263263
string background_color = 3;
264264
}
265265

266+
message GetExperimentsRequest {}
267+
268+
message GetExperimentsResponse {
269+
repeated string experiments = 1;
270+
}
271+
266272
service Agent {
267273
rpc GetManifest(GetManifestRequest) returns (Manifest);
268274
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
@@ -273,4 +279,5 @@ service Agent {
273279
rpc BatchUpdateMetadata(BatchUpdateMetadataRequest) returns (BatchUpdateMetadataResponse);
274280
rpc BatchCreateLogs(BatchCreateLogsRequest) returns (BatchCreateLogsResponse);
275281
rpc GetAnnouncementBanners(GetAnnouncementBannersRequest) returns (GetAnnouncementBannersResponse);
282+
rpc GetExperiments(GetExperimentsRequest) returns (GetExperimentsResponse);
276283
}

agent/proto/agent_drpc.pb.go

Lines changed: 41 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/stats.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"cdr.dev/slog"
1212
"github.com/coder/coder/v2/agent/proto"
1313
"github.com/coder/coder/v2/codersdk"
14+
"github.com/coder/coder/v2/codersdk/agentsdk"
1415
)
1516

1617
const maxConns = 2048
@@ -24,6 +25,7 @@ type statsCollector interface {
2425
}
2526

2627
type statsDest interface {
28+
GetExperiments(ctx context.Context, req *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error)
2729
UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error)
2830
}
2931

@@ -43,13 +45,12 @@ type statsReporter struct {
4345
experiments codersdk.Experiments
4446
}
4547

46-
func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector, experiments codersdk.Experiments) *statsReporter {
48+
func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector) *statsReporter {
4749
return &statsReporter{
48-
Cond: sync.NewCond(&sync.Mutex{}),
49-
logger: logger,
50-
source: source,
51-
collector: collector,
52-
experiments: experiments,
50+
Cond: sync.NewCond(&sync.Mutex{}),
51+
logger: logger,
52+
source: source,
53+
collector: collector,
5354
}
5455
}
5556

@@ -70,6 +71,12 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne
7071
// this that use it. There is no retry and we fail on the first error since
7172
// this will be inside a larger retry loop.
7273
func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
74+
exp, err := dest.GetExperiments(ctx, &proto.GetExperimentsRequest{})
75+
if err != nil {
76+
return xerrors.Errorf("get experiments: %w", err)
77+
}
78+
s.experiments = agentsdk.ExperimentsFromProto(exp)
79+
7380
// send an initial, blank report to get the interval
7481
resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{})
7582
if err != nil {

agent/stats_internal_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestStatsReporter(t *testing.T) {
3131
fSource := newFakeNetworkStatsSource(ctx, t)
3232
fCollector := newFakeCollector(t)
3333
fDest := newFakeStatsDest()
34-
uut := newStatsReporter(logger, fSource, fCollector, codersdk.Experiments{})
34+
uut := newStatsReporter(logger, fSource, fCollector)
3535

3636
loopErr := make(chan error, 1)
3737
loopCtx, loopCancel := context.WithCancel(ctx)
@@ -129,6 +129,73 @@ func TestStatsReporter(t *testing.T) {
129129
require.NoError(t, err)
130130
}
131131

132+
func TestStatsExperiment(t *testing.T) {
133+
t.Parallel()
134+
ctx := testutil.Context(t, testutil.WaitShort)
135+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
136+
fSource := newFakeNetworkStatsSource(ctx, t)
137+
fCollector := newFakeCollector(t)
138+
fDest := newFakeStatsDest()
139+
fDest.experiments.Experiments = append(fDest.experiments.Experiments, string(codersdk.ExperimentWorkspaceUsage))
140+
uut := newStatsReporter(logger, fSource, fCollector)
141+
142+
go func() {
143+
_ = uut.reportLoop(ctx, fDest)
144+
}()
145+
146+
// initial request to get duration
147+
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
148+
require.NotNil(t, req)
149+
require.Nil(t, req.Stats)
150+
interval := time.Second * 34
151+
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)})
152+
153+
// call to source to set the callback and interval
154+
gotInterval := testutil.RequireRecvCtx(ctx, t, fSource.period)
155+
require.Equal(t, interval, gotInterval)
156+
157+
// callback returning netstats
158+
netStats := map[netlogtype.Connection]netlogtype.Counts{
159+
{
160+
Proto: ipproto.TCP,
161+
Src: netip.MustParseAddrPort("192.168.1.33:4887"),
162+
Dst: netip.MustParseAddrPort("192.168.2.99:9999"),
163+
}: {
164+
TxPackets: 22,
165+
TxBytes: 23,
166+
RxPackets: 24,
167+
RxBytes: 25,
168+
},
169+
}
170+
fSource.callback(time.Now(), time.Now(), netStats, nil)
171+
172+
// collector called to complete the stats
173+
gotNetStats := testutil.RequireRecvCtx(ctx, t, fCollector.calls)
174+
require.Equal(t, netStats, gotNetStats)
175+
176+
// complete first collection
177+
stats := &proto.Stats{
178+
SessionCountSsh: 10,
179+
SessionCountJetbrains: 55,
180+
SessionCountVscode: 20,
181+
SessionCountReconnectingPty: 30,
182+
}
183+
testutil.RequireSendCtx(ctx, t, fCollector.stats, stats)
184+
185+
// destination called to report the first stats
186+
update := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
187+
require.NotNil(t, update)
188+
// confirm certain session counts are zeroed out when
189+
// experiment is enabled.
190+
require.EqualValues(t, 0, update.Stats.SessionCountSsh)
191+
// confirm others are not zeroed out. These will be
192+
// zeroed out in the future as we migrate to workspace
193+
// usage handling these session stats.
194+
require.EqualValues(t, 55, update.Stats.SessionCountJetbrains)
195+
require.EqualValues(t, 20, update.Stats.SessionCountVscode)
196+
require.EqualValues(t, 30, update.Stats.SessionCountReconnectingPty)
197+
}
198+
132199
type fakeNetworkStatsSource struct {
133200
sync.Mutex
134201
ctx context.Context
@@ -190,8 +257,9 @@ func newFakeCollector(t testing.TB) *fakeCollector {
190257
}
191258

192259
type fakeStatsDest struct {
193-
reqs chan *proto.UpdateStatsRequest
194-
resps chan *proto.UpdateStatsResponse
260+
reqs chan *proto.UpdateStatsRequest
261+
resps chan *proto.UpdateStatsResponse
262+
experiments *proto.GetExperimentsResponse
195263
}
196264

197265
func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error) {
@@ -209,10 +277,17 @@ func (f *fakeStatsDest) UpdateStats(ctx context.Context, req *proto.UpdateStatsR
209277
}
210278
}
211279

280+
func (f *fakeStatsDest) GetExperiments(_ context.Context, _ *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error) {
281+
return f.experiments, nil
282+
}
283+
212284
func newFakeStatsDest() *fakeStatsDest {
213285
return &fakeStatsDest{
214286
reqs: make(chan *proto.UpdateStatsRequest),
215287
resps: make(chan *proto.UpdateStatsResponse),
288+
experiments: &proto.GetExperimentsResponse{
289+
Experiments: []string{},
290+
},
216291
}
217292
}
218293

cli/agent.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,6 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
306306
client.SetSessionToken(resp.SessionToken)
307307
return resp.SessionToken, nil
308308
},
309-
FetchExperiments: func(ctx context.Context) (codersdk.Experiments, error) {
310-
return client.SDK.Experiments(ctx)
311-
},
312309
EnvironmentVariables: environmentVariables,
313310
IgnorePorts: ignorePorts,
314311
SSHMaxTimeout: sshMaxTimeout,

coderd/agentapi/api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type API struct {
4444
*MetadataAPI
4545
*LogsAPI
4646
*tailnet.DRPCService
47+
*ExperimentAPI
4748

4849
mu sync.Mutex
4950
cachedWorkspaceID uuid.UUID
@@ -159,6 +160,10 @@ func New(opts Options) *API {
159160
DerpMapFn: opts.DerpMapFn,
160161
}
161162

163+
api.ExperimentAPI = &ExperimentAPI{
164+
experiments: opts.Experiments,
165+
}
166+
162167
return api
163168
}
164169

coderd/agentapi/experiments.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package agentapi
2+
3+
import (
4+
"context"
5+
6+
"github.com/coder/coder/v2/agent/proto"
7+
"github.com/coder/coder/v2/codersdk"
8+
"github.com/coder/coder/v2/codersdk/agentsdk"
9+
)
10+
11+
type ExperimentAPI struct {
12+
experiments codersdk.Experiments
13+
}
14+
15+
func (a *ExperimentAPI) GetExperiments(ctx context.Context, _ *proto.GetExperimentsRequest) (*proto.GetExperimentsResponse, error) {
16+
return agentsdk.ProtoFromExperiments(a.experiments), nil
17+
}

0 commit comments

Comments
 (0)