@@ -10,6 +10,7 @@ import (
10
10
"fmt"
11
11
"io"
12
12
"net"
13
+ "net/http"
13
14
"net/netip"
14
15
"os"
15
16
"os/exec"
@@ -33,6 +34,7 @@ import (
33
34
34
35
"cdr.dev/slog"
35
36
"github.com/coder/coder/agent/usershell"
37
+ "github.com/coder/coder/codersdk"
36
38
"github.com/coder/coder/pty"
37
39
"github.com/coder/coder/tailnet"
38
40
"github.com/coder/retry"
@@ -49,55 +51,41 @@ const (
49
51
MagicSessionErrorCode = 229
50
52
)
51
53
52
- var (
53
- // tailnetIP is a static IPv6 address with the Tailscale prefix that is used to route
54
- // connections from clients to this node. A dynamic address is not required because a Tailnet
55
- // client only dials a single agent at a time.
56
- tailnetIP = netip .MustParseAddr ("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4" )
57
- tailnetSSHPort = 1
58
- tailnetReconnectingPTYPort = 2
59
- tailnetSpeedtestPort = 3
60
- )
61
-
62
54
type Options struct {
63
- CoordinatorDialer CoordinatorDialer
64
- FetchMetadata FetchMetadata
65
-
66
- StatsReporter StatsReporter
67
- ReconnectingPTYTimeout time.Duration
68
- EnvironmentVariables map [string ]string
69
- Logger slog.Logger
70
- }
71
-
72
- type Metadata struct {
73
- DERPMap * tailcfg.DERPMap `json:"derpmap"`
74
- EnvironmentVariables map [string ]string `json:"environment_variables"`
75
- StartupScript string `json:"startup_script"`
76
- Directory string `json:"directory"`
55
+ CoordinatorDialer CoordinatorDialer
56
+ FetchMetadata FetchMetadata
57
+ StatsReporter StatsReporter
58
+ WorkspaceAgentApps WorkspaceAgentApps
59
+ PostWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth
60
+ ReconnectingPTYTimeout time.Duration
61
+ EnvironmentVariables map [string ]string
62
+ Logger slog.Logger
77
63
}
78
64
79
65
// CoordinatorDialer is a function that constructs a new broker.
80
66
// A dialer must be passed in to allow for reconnects.
81
- type CoordinatorDialer func (ctx context.Context ) (net.Conn , error )
67
+ type CoordinatorDialer func (context.Context ) (net.Conn , error )
82
68
83
69
// FetchMetadata is a function to obtain metadata for the agent.
84
- type FetchMetadata func (ctx context.Context ) (Metadata , error )
70
+ type FetchMetadata func (context.Context ) (codersdk. WorkspaceAgentMetadata , error )
85
71
86
72
func New (options Options ) io.Closer {
87
73
if options .ReconnectingPTYTimeout == 0 {
88
74
options .ReconnectingPTYTimeout = 5 * time .Minute
89
75
}
90
76
ctx , cancelFunc := context .WithCancel (context .Background ())
91
77
server := & agent {
92
- reconnectingPTYTimeout : options .ReconnectingPTYTimeout ,
93
- logger : options .Logger ,
94
- closeCancel : cancelFunc ,
95
- closed : make (chan struct {}),
96
- envVars : options .EnvironmentVariables ,
97
- coordinatorDialer : options .CoordinatorDialer ,
98
- fetchMetadata : options .FetchMetadata ,
99
- stats : & Stats {},
100
- statsReporter : options .StatsReporter ,
78
+ reconnectingPTYTimeout : options .ReconnectingPTYTimeout ,
79
+ logger : options .Logger ,
80
+ closeCancel : cancelFunc ,
81
+ closed : make (chan struct {}),
82
+ envVars : options .EnvironmentVariables ,
83
+ coordinatorDialer : options .CoordinatorDialer ,
84
+ fetchMetadata : options .FetchMetadata ,
85
+ stats : & Stats {},
86
+ statsReporter : options .StatsReporter ,
87
+ workspaceAgentApps : options .WorkspaceAgentApps ,
88
+ postWorkspaceAgentAppHealth : options .PostWorkspaceAgentAppHealth ,
101
89
}
102
90
server .init (ctx )
103
91
return server
@@ -120,14 +108,16 @@ type agent struct {
120
108
fetchMetadata FetchMetadata
121
109
sshServer * ssh.Server
122
110
123
- network * tailnet.Conn
124
- coordinatorDialer CoordinatorDialer
125
- stats * Stats
126
- statsReporter StatsReporter
111
+ network * tailnet.Conn
112
+ coordinatorDialer CoordinatorDialer
113
+ stats * Stats
114
+ statsReporter StatsReporter
115
+ workspaceAgentApps WorkspaceAgentApps
116
+ postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth
127
117
}
128
118
129
119
func (a * agent ) run (ctx context.Context ) {
130
- var metadata Metadata
120
+ var metadata codersdk. WorkspaceAgentMetadata
131
121
var err error
132
122
// An exponential back-off occurs when the connection is failing to dial.
133
123
// This is to prevent server spam in case of a coderd outage.
@@ -168,6 +158,10 @@ func (a *agent) run(ctx context.Context) {
168
158
if metadata .DERPMap != nil {
169
159
go a .runTailnet (ctx , metadata .DERPMap )
170
160
}
161
+
162
+ if a .workspaceAgentApps != nil && a .postWorkspaceAgentAppHealth != nil {
163
+ go NewWorkspaceAppHealthReporter (a .logger , a .workspaceAgentApps , a .postWorkspaceAgentAppHealth )(ctx )
164
+ }
171
165
}
172
166
173
167
func (a * agent ) runTailnet (ctx context.Context , derpMap * tailcfg.DERPMap ) {
@@ -182,7 +176,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
182
176
}
183
177
var err error
184
178
a .network , err = tailnet .NewConn (& tailnet.Options {
185
- Addresses : []netip.Prefix {netip .PrefixFrom (tailnetIP , 128 )},
179
+ Addresses : []netip.Prefix {netip .PrefixFrom (codersdk . TailnetIP , 128 )},
186
180
DERPMap : derpMap ,
187
181
Logger : a .logger .Named ("tailnet" ),
188
182
})
@@ -199,7 +193,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
199
193
})
200
194
go a .runCoordinator (ctx )
201
195
202
- sshListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (tailnetSSHPort ))
196
+ sshListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (codersdk . TailnetSSHPort ))
203
197
if err != nil {
204
198
a .logger .Critical (ctx , "listen for ssh" , slog .Error (err ))
205
199
return
@@ -213,7 +207,8 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
213
207
go a .sshServer .HandleConn (a .stats .wrapConn (conn ))
214
208
}
215
209
}()
216
- reconnectingPTYListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (tailnetReconnectingPTYPort ))
210
+
211
+ reconnectingPTYListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (codersdk .TailnetReconnectingPTYPort ))
217
212
if err != nil {
218
213
a .logger .Critical (ctx , "listen for reconnecting pty" , slog .Error (err ))
219
214
return
@@ -239,15 +234,16 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
239
234
if err != nil {
240
235
continue
241
236
}
242
- var msg reconnectingPTYInit
237
+ var msg codersdk. ReconnectingPTYInit
243
238
err = json .Unmarshal (data , & msg )
244
239
if err != nil {
245
240
continue
246
241
}
247
242
go a .handleReconnectingPTY (ctx , msg , conn )
248
243
}
249
244
}()
250
- speedtestListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (tailnetSpeedtestPort ))
245
+
246
+ speedtestListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (codersdk .TailnetSpeedtestPort ))
251
247
if err != nil {
252
248
a .logger .Critical (ctx , "listen for speedtest" , slog .Error (err ))
253
249
return
@@ -268,6 +264,31 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
268
264
}()
269
265
}
270
266
}()
267
+
268
+ statisticsListener , err := a .network .Listen ("tcp" , ":" + strconv .Itoa (codersdk .TailnetStatisticsPort ))
269
+ if err != nil {
270
+ a .logger .Critical (ctx , "listen for statistics" , slog .Error (err ))
271
+ return
272
+ }
273
+ go func () {
274
+ defer statisticsListener .Close ()
275
+ server := & http.Server {
276
+ Handler : a .statisticsHandler (),
277
+ ReadTimeout : 20 * time .Second ,
278
+ ReadHeaderTimeout : 20 * time .Second ,
279
+ WriteTimeout : 20 * time .Second ,
280
+ ErrorLog : slog .Stdlib (ctx , a .logger .Named ("statistics_http_server" ), slog .LevelInfo ),
281
+ }
282
+ go func () {
283
+ <- ctx .Done ()
284
+ _ = server .Close ()
285
+ }()
286
+
287
+ err = server .Serve (statisticsListener )
288
+ if err != nil && ! xerrors .Is (err , http .ErrServerClosed ) && ! strings .Contains (err .Error (), "use of closed network connection" ) {
289
+ a .logger .Critical (ctx , "serve statistics HTTP server" , slog .Error (err ))
290
+ }
291
+ }()
271
292
}
272
293
273
294
// runCoordinator listens for nodes and updates the self-node as it changes.
@@ -443,7 +464,7 @@ func (a *agent) init(ctx context.Context) {
443
464
444
465
go a .run (ctx )
445
466
if a .statsReporter != nil {
446
- cl , err := a .statsReporter (ctx , a .logger , func () * Stats {
467
+ cl , err := a .statsReporter (ctx , a .logger , func () * codersdk. AgentStats {
447
468
return a .stats .Copy ()
448
469
})
449
470
if err != nil {
@@ -478,7 +499,7 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
478
499
if rawMetadata == nil {
479
500
return nil , xerrors .Errorf ("no metadata was provided: %w" , err )
480
501
}
481
- metadata , valid := rawMetadata .(Metadata )
502
+ metadata , valid := rawMetadata .(codersdk. WorkspaceAgentMetadata )
482
503
if ! valid {
483
504
return nil , xerrors .Errorf ("metadata is the wrong type: %T" , metadata )
484
505
}
@@ -634,7 +655,7 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
634
655
return cmd .Wait ()
635
656
}
636
657
637
- func (a * agent ) handleReconnectingPTY (ctx context.Context , msg reconnectingPTYInit , conn net.Conn ) {
658
+ func (a * agent ) handleReconnectingPTY (ctx context.Context , msg codersdk. ReconnectingPTYInit , conn net.Conn ) {
638
659
defer conn .Close ()
639
660
640
661
var rpty * reconnectingPTY
@@ -775,7 +796,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg reconnectingPTYIn
775
796
rpty .activeConnsMutex .Unlock ()
776
797
}()
777
798
decoder := json .NewDecoder (conn )
778
- var req ReconnectingPTYRequest
799
+ var req codersdk. ReconnectingPTYRequest
779
800
for {
780
801
err = decoder .Decode (& req )
781
802
if xerrors .Is (err , io .EOF ) {
0 commit comments