Skip to content

Commit 0ba8f89

Browse files
committed
WIP: claim triggering manifest push to agent
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 903f896 commit 0ba8f89

File tree

9 files changed

+98
-16
lines changed

9 files changed

+98
-16
lines changed

agent/agent.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ type Options struct {
8787
}
8888

8989
type Client interface {
90-
ConnectRPC23(ctx context.Context) (
90+
ConnectRPC24(ctx context.Context) (
9191
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
9292
)
9393
RewriteDERPMap(derpMap *tailcfg.DERPMap)
@@ -742,7 +742,7 @@ func (a *agent) run() (retErr error) {
742742
a.sessionToken.Store(&sessionToken)
743743

744744
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
745-
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx)
745+
aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx)
746746
if err != nil {
747747
return err
748748
}
@@ -915,7 +915,6 @@ func (a *agent) handleManifestStream(manifestOK *checkpoint) func(ctx context.Co
915915
}
916916
}
917917

918-
// TODO: change signature to just take in all inputs instead of returning closure; return error
919918
func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error {
920919
var (
921920
sentResult bool
@@ -927,6 +926,8 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl
927926
}
928927
}()
929928

929+
a.metrics.manifestsReceived.Inc()
930+
930931
manifest, err := agentsdk.ManifestFromProto(mp)
931932
if err != nil {
932933
a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err))
@@ -964,9 +965,6 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl
964965
manifestOK.complete(nil)
965966
sentResult = true
966967

967-
// TODO: remove
968-
a.logger.Info(ctx, "NOW OWNED BY", slog.F("owner", manifest.OwnerName))
969-
970968
// TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same,
971969
// or add prebuild metadata to manifest?
972970
// The startup script should only execute on the first run!

agent/metrics.go

+10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type agentMetrics struct {
2020
// took to run. This is reported once per agent.
2121
startupScriptSeconds *prometheus.GaugeVec
2222
currentConnections *prometheus.GaugeVec
23+
manifestsReceived prometheus.Counter
2324
}
2425

2526
func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
@@ -54,11 +55,20 @@ func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
5455
}, []string{"connection_type"})
5556
registerer.MustRegister(currentConnections)
5657

58+
manifestsReceived := prometheus.NewCounter(prometheus.CounterOpts{
59+
Namespace: "coderd",
60+
Subsystem: "agentstats",
61+
Name: "manifests_received",
62+
Help: "The number of manifests this agent has received from the control plane.",
63+
})
64+
registerer.MustRegister(manifestsReceived)
65+
5766
return &agentMetrics{
5867
connectionsTotal: connectionsTotal,
5968
reconnectingPTYErrors: reconnectingPTYErrors,
6069
startupScriptSeconds: startupScriptSeconds,
6170
currentConnections: currentConnections,
71+
manifestsReceived: manifestsReceived,
6272
}
6373
}
6474

coderd/agentapi/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func New(opts Options) *API {
9797
DerpMapFn: opts.DerpMapFn,
9898
WorkspaceID: opts.WorkspaceID,
9999
Log: opts.Log.Named("manifests"),
100+
Pubsub: opts.Pubsub,
100101
}
101102

102103
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{

coderd/agentapi/manifest.go

+37-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"cdr.dev/slog"
55
"context"
66
"database/sql"
7+
"fmt"
8+
"github.com/coder/coder/v2/coderd/database/pubsub"
79
"net/url"
810
"strings"
911
"time"
@@ -35,6 +37,7 @@ type ManifestAPI struct {
3537
AgentFn func(context.Context) (database.WorkspaceAgent, error)
3638
Database database.Store
3739
DerpMapFn func() *tailcfg.DERPMap
40+
Pubsub pubsub.Pubsub
3841
Log slog.Logger
3942
}
4043

@@ -47,21 +50,52 @@ func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream
4750
}
4851
}()
4952

53+
updates := make(chan struct{}, 1)
54+
55+
unsub, err := a.Pubsub.Subscribe(ManifestUpdateChannel(a.WorkspaceID), func(ctx context.Context, _ []byte) {
56+
a.Log.Info(ctx, "received 'prebuild claimed' event for workspace, pushing down new manifest", slog.F("workspace_id", a.WorkspaceID.String()))
57+
select {
58+
case <-streamCtx.Done():
59+
return
60+
case <-ctx.Done():
61+
return
62+
case updates <- struct{}{}:
63+
}
64+
})
65+
if err != nil {
66+
return xerrors.Errorf("subscribe to 'prebuild claimed' event: %w", err)
67+
}
68+
defer unsub()
69+
5070
for {
51-
resp, err := a.GetManifest(streamCtx, in)
71+
manifest, err := a.GetManifest(streamCtx, in)
5272
if err != nil {
5373
return xerrors.Errorf("receive manifest: %w", err)
5474
}
5575

56-
err = stream.Send(resp)
76+
a.Log.Debug(streamCtx, "pushing manifest to workspace", slog.F("workspace_id", a.WorkspaceID))
77+
78+
// Send first retrieved manifest.
79+
err = stream.Send(manifest)
5780
if err != nil {
5881
return xerrors.Errorf("send manifest: %w", err)
5982
}
6083

61-
time.Sleep(time.Second * 5)
84+
// ...then wait until triggered by prebuild claim completion.
85+
// At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured.
86+
select {
87+
case <-updates:
88+
a.Log.Info(streamCtx, "received manifest update request", slog.F("workspace_id", a.WorkspaceID))
89+
case <-streamCtx.Done():
90+
return xerrors.Errorf("stream close: %w", streamCtx.Err())
91+
}
6292
}
6393
}
6494

95+
func ManifestUpdateChannel(id uuid.UUID) string {
96+
return fmt.Sprintf("prebuild_claimed_%s", id)
97+
}
98+
6599
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
66100
workspaceAgent, err := a.AgentFn(ctx)
67101
if err != nil {

coderd/provisionerdserver/provisionerdserver.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"github.com/coder/coder/v2/coderd/agentapi"
910
"net/http"
1011
"net/url"
1112
"reflect"
@@ -1707,6 +1708,17 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob)
17071708
if err != nil {
17081709
return nil, xerrors.Errorf("update workspace: %w", err)
17091710
}
1711+
1712+
// If this job was initiated by the prebuilds user and the job is not a prebuild, then it MUST be the claim run.
1713+
// TODO: maybe add some specific metadata to indicate this rather than imputing it.
1714+
if input.IsPrebuildClaimByUser != uuid.Nil {
1715+
s.Logger.Info(ctx, "workspace prebuild successfully claimed by user",
1716+
slog.F("user", input.IsPrebuildClaimByUser.String()),
1717+
slog.F("workspace_id", workspace.ID))
1718+
if err := s.Pubsub.Publish(agentapi.ManifestUpdateChannel(workspace.ID), nil); err != nil {
1719+
s.Logger.Error(ctx, "failed to publish message to workspace agent to pull new manifest", slog.Error(err))
1720+
}
1721+
}
17101722
case *proto.CompletedJob_TemplateDryRun_:
17111723
for _, resource := range jobType.TemplateDryRun.Resources {
17121724
s.Logger.Info(ctx, "inserting template dry-run job resource",
@@ -2356,9 +2368,10 @@ type TemplateVersionImportJob struct {
23562368

23572369
// WorkspaceProvisionJob is the payload for the "workspace_provision" job type.
23582370
type WorkspaceProvisionJob struct {
2359-
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
2360-
DryRun bool `json:"dry_run"`
2361-
IsPrebuild bool `json:"is_prebuild,omitempty"`
2371+
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
2372+
DryRun bool `json:"dry_run"`
2373+
IsPrebuild bool `json:"is_prebuild,omitempty"`
2374+
IsPrebuildClaimByUser uuid.UUID `json:"is_prebuild_claim_by,omitempty"`
23622375
// RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace
23632376
// but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to
23642377
// the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be

coderd/workspaces.go

+5
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,7 @@ func createWorkspace(
685685

686686
if claimedWorkspace != nil {
687687
workspaceID = claimedWorkspace.ID
688+
initiatorID = prebuilds.PrebuildOwnerUUID
688689
} else {
689690
// Workspaces are created without any versions.
690691
minimumWorkspace, err := db.InsertWorkspace(ctx, database.InsertWorkspaceParams{
@@ -727,6 +728,10 @@ func createWorkspace(
727728
builder = builder.VersionID(req.TemplateVersionID)
728729
}
729730

731+
if claimedWorkspace != nil {
732+
builder = builder.MarkPrebuildClaimBy(owner.ID)
733+
}
734+
730735
workspaceBuild, provisionerJob, provisionerDaemons, err = builder.Build(
731736
ctx,
732737
db,

coderd/wsbuilder/wsbuilder.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,10 @@ type Builder struct {
7272
lastBuildJob *database.ProvisionerJob
7373
parameterNames *[]string
7474
parameterValues *[]string
75-
prebuild bool
76-
runningWorkspaceAgentID uuid.UUID
75+
76+
prebuild bool
77+
prebuildClaimBy uuid.UUID
78+
runningWorkspaceAgentID uuid.UUID
7779

7880
verifyNoLegacyParametersOnce bool
7981
}
@@ -176,6 +178,12 @@ func (b Builder) MarkPrebuild() Builder {
176178
return b
177179
}
178180

181+
func (b Builder) MarkPrebuildClaimBy(userID uuid.UUID) Builder {
182+
// nolint: revive
183+
b.prebuildClaimBy = userID
184+
return b
185+
}
186+
179187
// RunningWorkspaceAgentID is only used for prebuilds; see the associated field in `provisionerdserver.WorkspaceProvisionJob`.
180188
func (b Builder) RunningWorkspaceAgentID(id uuid.UUID) Builder {
181189
// nolint: revive
@@ -311,6 +319,7 @@ func (b *Builder) buildTx(authFunc func(action policy.Action, object rbac.Object
311319
WorkspaceBuildID: workspaceBuildID,
312320
LogLevel: b.logLevel,
313321
IsPrebuild: b.prebuild,
322+
IsPrebuildClaimByUser: b.prebuildClaimBy,
314323
RunningWorkspaceAgentID: b.runningWorkspaceAgentID,
315324
})
316325
if err != nil {

codersdk/agentsdk/agentsdk.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (c *Client) ConnectRPC22(ctx context.Context) (
220220
// ConnectRPC23 returns a dRPC client to the Agent API v2.3. It is useful when you want to be
221221
// maximally compatible with Coderd Release Versions from 2.18+
222222
func (c *Client) ConnectRPC23(ctx context.Context) (
223-
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
223+
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
224224
) {
225225
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 3))
226226
if err != nil {
@@ -229,6 +229,18 @@ func (c *Client) ConnectRPC23(ctx context.Context) (
229229
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
230230
}
231231

232+
// ConnectRPC24 returns a dRPC client to the Agent API v2.4. It is useful when you want to be
233+
// maximally compatible with Coderd Release Versions from 2.18+ // TODO update release
234+
func (c *Client) ConnectRPC24(ctx context.Context) (
235+
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
236+
) {
237+
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 4))
238+
if err != nil {
239+
return nil, nil, err
240+
}
241+
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
242+
}
243+
232244
// ConnectRPC connects to the workspace agent API and tailnet API
233245
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
234246
return c.connectRPCVersion(ctx, proto.CurrentVersion)

tailnet/proto/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import (
4040
// ScriptCompleted, but be prepared to process "unsupported" errors.)
4141
const (
4242
CurrentMajor = 2
43-
CurrentMinor = 3
43+
CurrentMinor = 4
4444
)
4545

4646
var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)

0 commit comments

Comments
 (0)