Skip to content

Commit b0afffb

Browse files
authored
feat: use v2 API for agent metadata updates (#12281)
Switches the agent to report metadata over the v2 API. Fixes #10534
1 parent 7a245e6 commit b0afffb

File tree

5 files changed: 128 additions and 57 deletions.

agent/agent.go

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ type Options struct {
9090

9191
type Client interface {
9292
ConnectRPC(ctx context.Context) (drpc.Conn, error)
93-
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
9493
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9594
}
9695

@@ -298,7 +297,6 @@ func (a *agent) init() {
298297
// may be happening, but regardless after the intermittent
299298
// failure, you'll want the agent to reconnect.
300299
func (a *agent) runLoop() {
301-
go a.reportMetadataUntilGracefulShutdown()
302300
go a.manageProcessPriorityUntilGracefulShutdown()
303301

304302
// need to keep retrying up to the hardCtx so that we can send graceful shutdown-related
@@ -405,9 +403,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
405403
fn()
406404
}
407405

408-
func (a *agent) reportMetadataUntilGracefulShutdown() {
409-
// metadata reporting can cease as soon as we start gracefully shutting down.
410-
ctx := a.gracefulCtx
406+
func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
411407
tickerDone := make(chan struct{})
412408
collectDone := make(chan struct{})
413409
ctx, cancel := context.WithCancel(ctx)
@@ -567,51 +563,55 @@ func (a *agent) reportMetadataUntilGracefulShutdown() {
567563
var (
568564
updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult)
569565
reportTimeout = 30 * time.Second
570-
reportSemaphore = make(chan struct{}, 1)
566+
reportError = make(chan error, 1)
567+
reportInFlight = false
568+
aAPI = proto.NewDRPCAgentClient(conn)
571569
)
572-
reportSemaphore <- struct{}{}
573570

574571
for {
575572
select {
576573
case <-ctx.Done():
577-
return
574+
return ctx.Err()
578575
case mr := <-metadataResults:
579576
// This can overwrite unsent values, but that's fine because
580577
// we're only interested in up-to-date values.
581578
updatedMetadata[mr.key] = mr.result
582579
continue
580+
case err := <-reportError:
581+
a.logger.Debug(ctx, "batch update metadata complete", slog.Error(err))
582+
if err != nil {
583+
return xerrors.Errorf("failed to report metadata: %w", err)
584+
}
585+
reportInFlight = false
583586
case <-report:
584-
if len(updatedMetadata) > 0 {
585-
select {
586-
case <-reportSemaphore:
587-
default:
588-
// If there's already a report in flight, don't send
589-
// another one, wait for next tick instead.
590-
continue
591-
}
592-
593-
metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata))
594-
for key, result := range updatedMetadata {
595-
metadata = append(metadata, agentsdk.Metadata{
596-
Key: key,
597-
WorkspaceAgentMetadataResult: *result,
598-
})
599-
delete(updatedMetadata, key)
600-
}
587+
if len(updatedMetadata) == 0 {
588+
continue
589+
}
590+
if reportInFlight {
591+
// If there's already a report in flight, don't send
592+
// another one, wait for next tick instead.
593+
a.logger.Debug(ctx, "skipped metadata report tick because report is in flight")
594+
continue
595+
}
596+
metadata := make([]*proto.Metadata, 0, len(updatedMetadata))
597+
for key, result := range updatedMetadata {
598+
pr := agentsdk.ProtoFromMetadataResult(*result)
599+
metadata = append(metadata, &proto.Metadata{
600+
Key: key,
601+
Result: pr,
602+
})
603+
delete(updatedMetadata, key)
604+
}
601605

602-
go func() {
603-
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
604-
defer func() {
605-
cancel()
606-
reportSemaphore <- struct{}{}
607-
}()
606+
reportInFlight = true
607+
go func() {
608+
a.logger.Debug(ctx, "batch updating metadata")
609+
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
610+
defer cancel()
608611

609-
err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata})
610-
if err != nil {
611-
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
612-
}
613-
}()
614-
}
612+
_, err := aAPI.BatchUpdateMetadata(ctx, &proto.BatchUpdateMetadataRequest{Metadata: metadata})
613+
reportError <- err
614+
}()
615615
}
616616
}
617617
}
@@ -783,6 +783,9 @@ func (a *agent) run() (retErr error) {
783783
// lifecycle reporting has to be via gracefulShutdownBehaviorRemain
784784
connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)
785785

786+
// metadata reporting can cease as soon as we start gracefully shutting down
787+
connMan.start("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
788+
786789
// channels to sync goroutines below
787790
// handle manifest
788791
// |

agent/agenttest/client.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ type Client struct {
8282
t testing.TB
8383
logger slog.Logger
8484
agentID uuid.UUID
85-
metadata map[string]agentsdk.Metadata
8685
coordinator tailnet.Coordinator
8786
server *drpcserver.Server
8887
fakeAgentAPI *FakeAgentAPI
@@ -131,22 +130,7 @@ func (c *Client) GetStartup() <-chan *agentproto.Startup {
131130
}
132131

133132
func (c *Client) GetMetadata() map[string]agentsdk.Metadata {
134-
c.mu.Lock()
135-
defer c.mu.Unlock()
136-
return maps.Clone(c.metadata)
137-
}
138-
139-
func (c *Client) PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error {
140-
c.mu.Lock()
141-
defer c.mu.Unlock()
142-
if c.metadata == nil {
143-
c.metadata = make(map[string]agentsdk.Metadata)
144-
}
145-
for _, md := range req.Metadata {
146-
c.metadata[md.Key] = md
147-
c.logger.Debug(ctx, "post metadata", slog.F("key", md.Key), slog.F("md", md))
148-
}
149-
return nil
133+
return c.fakeAgentAPI.GetMetadata()
150134
}
151135

152136
func (c *Client) GetStartupLogs() []agentsdk.Log {
@@ -186,6 +170,7 @@ type FakeAgentAPI struct {
186170
appHealthCh chan *agentproto.BatchUpdateAppHealthRequest
187171
logsCh chan<- *agentproto.BatchCreateLogsRequest
188172
lifecycleStates []codersdk.WorkspaceAgentLifecycle
173+
metadata map[string]agentsdk.Metadata
189174

190175
getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
191176
}
@@ -254,9 +239,24 @@ func (f *FakeAgentAPI) UpdateStartup(_ context.Context, req *agentproto.UpdateSt
254239
return req.GetStartup(), nil
255240
}
256241

257-
func (*FakeAgentAPI) BatchUpdateMetadata(context.Context, *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) {
258-
// TODO implement me
259-
panic("implement me")
242+
// GetMetadata returns a copy (made with maps.Clone) of the metadata recorded
// by BatchUpdateMetadata. The copy is taken while holding f's lock, so the
// caller gets its own map rather than a reference to f.metadata.
func (f *FakeAgentAPI) GetMetadata() map[string]agentsdk.Metadata {
243+
f.Lock()
244+
defer f.Unlock()
245+
return maps.Clone(f.metadata)
246+
}
247+
248+
// BatchUpdateMetadata is the fake implementation of the agent API's metadata
// update call. While holding f's lock it converts every entry in req.Metadata
// with agentsdk.MetadataFromProto and stores it in f.metadata under the
// entry's key, replacing any value previously stored for that key, and logs
// each entry at debug level. It always returns an empty response and a nil
// error.
func (f *FakeAgentAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) {
249+
f.Lock()
250+
defer f.Unlock()
251+
// The map is allocated lazily on the first update.
if f.metadata == nil {
252+
f.metadata = make(map[string]agentsdk.Metadata)
253+
}
254+
for _, md := range req.Metadata {
255+
smd := agentsdk.MetadataFromProto(md)
256+
f.metadata[md.Key] = smd
257+
f.logger.Debug(ctx, "post metadata", slog.F("key", md.Key), slog.F("md", md))
258+
}
259+
return &agentproto.BatchUpdateMetadataResponse{}, nil
260260
}
261261

262262
func (f *FakeAgentAPI) SetLogsChannel(ch chan<- *agentproto.BatchCreateLogsRequest) {

codersdk/agentsdk/agentsdk.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type PostMetadataRequest struct {
8585
// performance.
8686
type PostMetadataRequestDeprecated = codersdk.WorkspaceAgentMetadataResult
8787

88+
// PostMetadata posts agent metadata to the Coder server.
89+
//
90+
// Deprecated: use BatchUpdateMetadata on the agent dRPC API instead
8891
func (c *Client) PostMetadata(ctx context.Context, req PostMetadataRequest) error {
8992
res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/metadata", req)
9093
if err != nil {

codersdk/agentsdk/convert.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,31 @@ func ProtoFromMetadataDescription(d codersdk.WorkspaceAgentMetadataDescription)
112112
}
113113
}
114114

115+
// ProtoFromMetadataResult converts a codersdk.WorkspaceAgentMetadataResult
// into its protobuf representation. CollectedAt is converted with
// timestamppb.New; Age, Value and Error are copied unchanged. The returned
// pointer is always non-nil.
func ProtoFromMetadataResult(r codersdk.WorkspaceAgentMetadataResult) *proto.WorkspaceAgentMetadata_Result {
116+
return &proto.WorkspaceAgentMetadata_Result{
117+
CollectedAt: timestamppb.New(r.CollectedAt),
118+
Age: r.Age,
119+
Value: r.Value,
120+
Error: r.Error,
121+
}
122+
}
123+
124+
// MetadataResultFromProto converts a protobuf metadata result back into a
// codersdk.WorkspaceAgentMetadataResult. It is the inverse of
// ProtoFromMetadataResult: every field is read through the message's
// generated getter, and CollectedAt is converted with AsTime.
func MetadataResultFromProto(r *proto.WorkspaceAgentMetadata_Result) codersdk.WorkspaceAgentMetadataResult {
125+
return codersdk.WorkspaceAgentMetadataResult{
126+
CollectedAt: r.GetCollectedAt().AsTime(),
127+
Age: r.GetAge(),
128+
Value: r.GetValue(),
129+
Error: r.GetError(),
130+
}
131+
}
132+
133+
// MetadataFromProto converts a protobuf Metadata message into an agentsdk
// Metadata value: the key is copied from m.GetKey() and the embedded
// WorkspaceAgentMetadataResult is built from m.GetResult() using
// MetadataResultFromProto.
func MetadataFromProto(m *proto.Metadata) Metadata {
134+
return Metadata{
135+
Key: m.GetKey(),
136+
WorkspaceAgentMetadataResult: MetadataResultFromProto(m.GetResult()),
137+
}
138+
}
139+
115140
func AgentScriptsFromProto(protoScripts []*proto.WorkspaceAgentScript) ([]codersdk.WorkspaceAgentScript, error) {
116141
ret := make([]codersdk.WorkspaceAgentScript, len(protoScripts))
117142
for i, protoScript := range protoScripts {

codersdk/agentsdk/convert_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/google/uuid"
88
"github.com/stretchr/testify/require"
9+
"google.golang.org/protobuf/types/known/timestamppb"
910
"tailscale.com/tailcfg"
1011

1112
"github.com/coder/coder/v2/agent/proto"
@@ -176,3 +177,42 @@ func TestProtoFromLifecycle(t *testing.T) {
176177
require.Equal(t, s, state)
177178
}
178179
}
180+
181+
// TestProtoFromMetadataResult verifies that ProtoFromMetadataResult carries
// every field (CollectedAt, Age, Value, Error) into the protobuf message, and
// that feeding the result to MetadataResultFromProto yields a value equal to
// the original.
func TestProtoFromMetadataResult(t *testing.T) {
182+
t.Parallel()
183+
now := dbtime.Now()
184+
result := codersdk.WorkspaceAgentMetadataResult{
185+
CollectedAt: now,
186+
Age: 4,
187+
Value: "lemons",
188+
Error: "rats",
189+
}
190+
pr := agentsdk.ProtoFromMetadataResult(result)
191+
require.NotNil(t, pr)
192+
require.Equal(t, now, pr.CollectedAt.AsTime())
193+
require.EqualValues(t, 4, pr.Age)
194+
require.Equal(t, "lemons", pr.Value)
195+
require.Equal(t, "rats", pr.Error)
196+
// Round-trip back to the SDK type and compare with the input.
result2 := agentsdk.MetadataResultFromProto(pr)
197+
require.Equal(t, result, result2)
198+
}
199+
200+
// TestMetadataFromProto verifies that MetadataFromProto copies the key and
// every field of the nested result (CollectedAt, Age, Value, Error) from a
// protobuf Metadata message into the agentsdk Metadata value.
func TestMetadataFromProto(t *testing.T) {
201+
t.Parallel()
202+
now := dbtime.Now()
203+
pmd := &proto.Metadata{
204+
Key: "a flat",
205+
Result: &proto.WorkspaceAgentMetadata_Result{
206+
CollectedAt: timestamppb.New(now),
207+
Age: 88,
208+
Value: "lemons",
209+
Error: "rats",
210+
},
211+
}
212+
smd := agentsdk.MetadataFromProto(pmd)
213+
require.Equal(t, "a flat", smd.Key)
214+
require.Equal(t, now, smd.CollectedAt)
215+
require.EqualValues(t, 88, smd.Age)
216+
require.Equal(t, "lemons", smd.Value)
217+
require.Equal(t, "rats", smd.Error)
218+
}

0 commit comments

Comments
 (0)