4
4
"cdr.dev/slog"
5
5
"context"
6
6
"database/sql"
7
+ "fmt"
8
+ "github.com/coder/coder/v2/coderd/database/pubsub"
7
9
"net/url"
8
10
"strings"
9
11
"time"
@@ -35,6 +37,7 @@ type ManifestAPI struct {
35
37
AgentFn func (context.Context ) (database.WorkspaceAgent , error )
36
38
Database database.Store
37
39
DerpMapFn func () * tailcfg.DERPMap
40
+ Pubsub pubsub.Pubsub
38
41
Log slog.Logger
39
42
}
40
43
@@ -47,21 +50,52 @@ func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream
47
50
}
48
51
}()
49
52
53
+ updates := make (chan struct {}, 1 )
54
+
55
+ unsub , err := a .Pubsub .Subscribe (ManifestUpdateChannel (a .WorkspaceID ), func (ctx context.Context , _ []byte ) {
56
+ a .Log .Info (ctx , "received 'prebuild claimed' event for workspace, pushing down new manifest" , slog .F ("workspace_id" , a .WorkspaceID .String ()))
57
+ select {
58
+ case <- streamCtx .Done ():
59
+ return
60
+ case <- ctx .Done ():
61
+ return
62
+ case updates <- struct {}{}:
63
+ }
64
+ })
65
+ if err != nil {
66
+ return xerrors .Errorf ("subscribe to 'prebuild claimed' event: %w" , err )
67
+ }
68
+ defer unsub ()
69
+
50
70
for {
51
- resp , err := a .GetManifest (streamCtx , in )
71
+ manifest , err := a .GetManifest (streamCtx , in )
52
72
if err != nil {
53
73
return xerrors .Errorf ("receive manifest: %w" , err )
54
74
}
55
75
56
- err = stream .Send (resp )
76
+ a .Log .Debug (streamCtx , "pushing manifest to workspace" , slog .F ("workspace_id" , a .WorkspaceID ))
77
+
78
+ // Send first retrieved manifest.
79
+ err = stream .Send (manifest )
57
80
if err != nil {
58
81
return xerrors .Errorf ("send manifest: %w" , err )
59
82
}
60
83
61
- time .Sleep (time .Second * 5 )
84
+ // ...then wait until triggered by prebuild claim completion.
85
+ // At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured.
86
+ select {
87
+ case <- updates :
88
+ a .Log .Info (streamCtx , "received manifest update request" , slog .F ("workspace_id" , a .WorkspaceID ))
89
+ case <- streamCtx .Done ():
90
+ return xerrors .Errorf ("stream close: %w" , streamCtx .Err ())
91
+ }
62
92
}
63
93
}
64
94
95
+ func ManifestUpdateChannel (id uuid.UUID ) string {
96
+ return fmt .Sprintf ("prebuild_claimed_%s" , id )
97
+ }
98
+
65
99
func (a * ManifestAPI ) GetManifest (ctx context.Context , _ * agentproto.GetManifestRequest ) (* agentproto.Manifest , error ) {
66
100
workspaceAgent , err := a .AgentFn (ctx )
67
101
if err != nil {
0 commit comments