Skip to content

Commit dcf072e

Browse files
committed
derp mesh probably working
1 parent 4ba7af6 commit dcf072e

File tree

5 files changed

+276
-50
lines changed

5 files changed

+276
-50
lines changed

enterprise/cli/proxyserver.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -225,22 +225,27 @@ func (*RootCmd) proxyServer() *clibase.Cmd {
225225
closers.Add(closeFunc)
226226
}
227227

228-
proxy, err := wsproxy.New(ctx, &wsproxy.Options{
229-
Logger: logger,
230-
HTTPClient: httpClient,
231-
DashboardURL: primaryAccessURL.Value(),
232-
AccessURL: cfg.AccessURL.Value(),
233-
AppHostname: appHostname,
234-
AppHostnameRegex: appHostnameRegex,
235-
RealIPConfig: realIPConfig,
236-
Tracing: tracer,
237-
PrometheusRegistry: prometheusRegistry,
238-
APIRateLimit: int(cfg.RateLimit.API.Value()),
239-
SecureAuthCookie: cfg.SecureAuthCookie.Value(),
240-
DisablePathApps: cfg.DisablePathApps.Value(),
241-
DERPEnabled: cfg.DERP.Server.Enable.Value(),
242-
ProxySessionToken: proxySessionToken.Value(),
243-
})
228+
opts := &wsproxy.Options{
229+
Logger: logger,
230+
HTTPClient: httpClient,
231+
DashboardURL: primaryAccessURL.Value(),
232+
AccessURL: cfg.AccessURL.Value(),
233+
AppHostname: appHostname,
234+
AppHostnameRegex: appHostnameRegex,
235+
RealIPConfig: realIPConfig,
236+
Tracing: tracer,
237+
PrometheusRegistry: prometheusRegistry,
238+
APIRateLimit: int(cfg.RateLimit.API.Value()),
239+
SecureAuthCookie: cfg.SecureAuthCookie.Value(),
240+
DisablePathApps: cfg.DisablePathApps.Value(),
241+
DERPEnabled: cfg.DERP.Server.Enable.Value(),
242+
DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(),
243+
ProxySessionToken: proxySessionToken.Value(),
244+
}
245+
if httpServers.TLSConfig != nil {
246+
opts.TLSCertificates = httpServers.TLSConfig.Certificates
247+
}
248+
proxy, err := wsproxy.New(ctx, opts)
244249
if err != nil {
245250
return xerrors.Errorf("create workspace proxy: %w", err)
246251
}

enterprise/coderd/coderdenttest/proxytest.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/moby/moby/pkg/namesgenerator"
1616
"github.com/prometheus/client_golang/prometheus"
17+
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819

1920
"cdr.dev/slog"
@@ -131,6 +132,10 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie
131132
PrometheusRegistry: prometheus.NewRegistry(),
132133
})
133134
require.NoError(t, err)
135+
t.Cleanup(func() {
136+
err := wssrv.Close()
137+
assert.NoError(t, err)
138+
})
134139

135140
mutex.Lock()
136141
handler = wssrv.Handler

enterprise/coderd/workspaceproxy.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,8 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request)
391391
if replica.StoppedAt.Valid && !replica.StartedAt.IsZero() {
392392
// If the replica deregistered, it shouldn't be able to
393393
// re-register before restarting.
394-
// TODO: sadly this results in 500
395-
return xerrors.Errorf("replica %s is stopped but not deregistered", replica.ID)
394+
// TODO: sadly this results in 500 when it should be 400
395+
return xerrors.Errorf("replica %s is marked stopped", replica.ID)
396396
}
397397

398398
replica, err = db.UpdateReplica(ctx, database.UpdateReplicaParams{
@@ -465,6 +465,7 @@ func (api *API) workspaceProxyRegister(rw http.ResponseWriter, r *http.Request)
465465

466466
httpapi.Write(ctx, rw, http.StatusCreated, wsproxysdk.RegisterWorkspaceProxyResponse{
467467
AppSecurityKey: api.AppSecurityKey.String(),
468+
DERPMeshKey: api.DERPServer.MeshKey(),
468469
SiblingReplicas: siblingsRes,
469470
})
470471

enterprise/wsproxy/wsproxy.go

Lines changed: 112 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package wsproxy
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"fmt"
68
"net/http"
79
"net/url"
@@ -13,6 +15,7 @@ import (
1315

1416
"github.com/go-chi/chi/v5"
1517
"github.com/google/uuid"
18+
"github.com/hashicorp/go-multierror"
1619
"github.com/prometheus/client_golang/prometheus"
1720
"go.opentelemetry.io/otel/trace"
1821
"golang.org/x/xerrors"
@@ -28,6 +31,7 @@ import (
2831
"github.com/coder/coder/coderd/workspaceapps"
2932
"github.com/coder/coder/coderd/wsconncache"
3033
"github.com/coder/coder/codersdk"
34+
"github.com/coder/coder/enterprise/derpmesh"
3135
"github.com/coder/coder/enterprise/wsproxy/wsproxysdk"
3236
"github.com/coder/coder/site"
3337
"github.com/coder/coder/tailnet"
@@ -57,11 +61,13 @@ type Options struct {
5761
RealIPConfig *httpmw.RealIPConfig
5862
Tracing trace.TracerProvider
5963
PrometheusRegistry *prometheus.Registry
64+
TLSCertificates []tls.Certificate
6065

61-
APIRateLimit int
62-
SecureAuthCookie bool
63-
DisablePathApps bool
64-
DERPEnabled bool
66+
APIRateLimit int
67+
SecureAuthCookie bool
68+
DisablePathApps bool
69+
DERPEnabled bool
70+
DERPServerRelayAddress string
6571

6672
ProxySessionToken string
6773
}
@@ -101,10 +107,14 @@ type Server struct {
101107
// the moon's token.
102108
SDKClient *wsproxysdk.Client
103109

110+
// DERP
111+
derpMesh *derpmesh.Mesh
112+
104113
// Used for graceful shutdown. Required for the dialer.
105114
ctx context.Context
106115
cancel context.CancelFunc
107116
derpCloseFunc func()
117+
registerDone <-chan struct{}
108118
}
109119

110120
// New creates a new workspace proxy server. This requires a primary coderd
@@ -139,35 +149,33 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
139149
return nil, xerrors.Errorf("%q is a workspace proxy, not a primary coderd instance", opts.DashboardURL)
140150
}
141151

142-
// TODO: registering logic need to be moved to a struct that calls it
143-
// periodically
144-
replicaID := uuid.New()
145-
osHostname, err := os.Hostname()
146-
if err != nil {
147-
return nil, xerrors.Errorf("get OS hostname: %w", err)
152+
meshRootCA := x509.NewCertPool()
153+
for _, certificate := range opts.TLSCertificates {
154+
for _, certificatePart := range certificate.Certificate {
155+
certificate, err := x509.ParseCertificate(certificatePart)
156+
if err != nil {
157+
return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
158+
}
159+
meshRootCA.AddCert(certificate)
160+
}
148161
}
149-
regResp, err := client.RegisterWorkspaceProxy(ctx, wsproxysdk.RegisterWorkspaceProxyRequest{
150-
AccessURL: opts.AccessURL.String(),
151-
WildcardHostname: opts.AppHostname,
152-
DerpEnabled: opts.DERPEnabled,
153-
ReplicaID: replicaID,
154-
ReplicaHostname: osHostname,
155-
ReplicaError: "",
156-
// TODO: replica relay address
157-
ReplicaRelayAddress: "",
158-
Version: buildinfo.Version(),
159-
})
160-
if err != nil {
161-
return nil, xerrors.Errorf("register proxy: %w", err)
162+
// This TLS configuration spoofs access from the access URL hostname
163+
// assuming that the certificates provided will cover that hostname.
164+
//
165+
// Replica sync and DERP meshing require accessing replicas via their
166+
// internal IP addresses, and if TLS is configured we use the same
167+
// certificates.
168+
meshTLSConfig := &tls.Config{
169+
MinVersion: tls.VersionTLS12,
170+
Certificates: opts.TLSCertificates,
171+
RootCAs: meshRootCA,
172+
ServerName: opts.AccessURL.Hostname(),
162173
}
163174

164-
secKey, err := workspaceapps.KeyFromString(regResp.AppSecurityKey)
165-
if err != nil {
166-
return nil, xerrors.Errorf("parse app security key: %w", err)
167-
}
175+
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("derp")))
168176

169-
r := chi.NewRouter()
170177
ctx, cancel := context.WithCancel(context.Background())
178+
r := chi.NewRouter()
171179
s := &Server{
172180
Options: opts,
173181
Handler: r,
@@ -176,10 +184,48 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
176184
TracerProvider: opts.Tracing,
177185
PrometheusRegistry: opts.PrometheusRegistry,
178186
SDKClient: client,
187+
derpMesh: derpmesh.New(opts.Logger.Named("derpmesh"), derpServer, meshTLSConfig),
179188
ctx: ctx,
180189
cancel: cancel,
181190
}
182191

192+
// Register the workspace proxy with the primary coderd instance and start a
193+
// goroutine to periodically re-register.
194+
replicaID := uuid.New()
195+
osHostname, err := os.Hostname()
196+
if err != nil {
197+
return nil, xerrors.Errorf("get OS hostname: %w", err)
198+
}
199+
regResp, registerDone, err := client.RegisterWorkspaceProxyLoop(ctx, wsproxysdk.RegisterWorkspaceProxyLoopOpts{
200+
Logger: opts.Logger,
201+
Request: wsproxysdk.RegisterWorkspaceProxyRequest{
202+
AccessURL: opts.AccessURL.String(),
203+
WildcardHostname: opts.AppHostname,
204+
DerpEnabled: opts.DERPEnabled,
205+
ReplicaID: replicaID,
206+
ReplicaHostname: osHostname,
207+
ReplicaError: "",
208+
ReplicaRelayAddress: opts.DERPServerRelayAddress,
209+
Version: buildinfo.Version(),
210+
},
211+
MutateFn: s.mutateRegister,
212+
CallbackFn: s.handleRegister,
213+
FailureFn: s.handleRegisterFailure,
214+
})
215+
if err != nil {
216+
return nil, xerrors.Errorf("register proxy: %w", err)
217+
}
218+
s.registerDone = registerDone
219+
err = s.handleRegister(ctx, regResp)
220+
if err != nil {
221+
return nil, xerrors.Errorf("handle register: %w", err)
222+
}
223+
derpServer.SetMeshKey(regResp.DERPMeshKey)
224+
225+
secKey, err := workspaceapps.KeyFromString(regResp.AppSecurityKey)
226+
if err != nil {
227+
return nil, xerrors.Errorf("parse app security key: %w", err)
228+
}
183229
s.AppServer = &workspaceapps.Server{
184230
Logger: opts.Logger.Named("workspaceapps"),
185231
DashboardURL: opts.DashboardURL,
@@ -202,9 +248,6 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
202248
SecureAuthCookie: opts.SecureAuthCookie,
203249
}
204250

205-
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("derp")))
206-
// TODO: mesh and derpmesh package stuff
207-
// derpServer.SetMeshKey(regResp.DERPMeshKey)
208251
derpHandler := derphttp.Handler(derpServer)
209252
derpHandler, s.derpCloseFunc = tailnet.WithWebsocketSupport(derpServer, derpHandler)
210253

@@ -279,14 +322,51 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
279322

280323
func (s *Server) Close() error {
281324
s.cancel()
325+
326+
var err error
327+
registerDoneWaitTicker := time.NewTicker(3 * time.Second)
328+
select {
329+
case <-registerDoneWaitTicker.C:
330+
err = multierror.Append(err, xerrors.New("timed out waiting for registerDone"))
331+
case <-s.registerDone:
332+
}
282333
s.derpCloseFunc()
283-
return s.AppServer.Close()
334+
appServerErr := s.AppServer.Close()
335+
if appServerErr != nil {
336+
err = multierror.Append(err, appServerErr)
337+
}
338+
return err
284339
}
285340

286341
func (s *Server) DialWorkspaceAgent(id uuid.UUID) (*codersdk.WorkspaceAgentConn, error) {
287342
return s.SDKClient.DialWorkspaceAgent(s.ctx, id, nil)
288343
}
289344

345+
func (*Server) mutateRegister(_ *wsproxysdk.RegisterWorkspaceProxyRequest) {
346+
// TODO: we should probably ping replicas similarly to the replicasync
347+
// package in the primary and update req.ReplicaError accordingly.
348+
}
349+
350+
func (s *Server) handleRegister(_ context.Context, res wsproxysdk.RegisterWorkspaceProxyResponse) error {
351+
addresses := make([]string, len(res.SiblingReplicas))
352+
for i, replica := range res.SiblingReplicas {
353+
addresses[i] = replica.RelayAddress
354+
}
355+
s.derpMesh.SetAddresses(addresses, false)
356+
357+
return nil
358+
}
359+
360+
func (s *Server) handleRegisterFailure(err error) {
361+
if s.ctx.Err() != nil {
362+
return
363+
}
364+
s.Logger.Fatal(s.ctx,
365+
"failed to periodically re-register workspace proxy with primary Coder deployment",
366+
slog.Error(err),
367+
)
368+
}
369+
290370
func (s *Server) buildInfo(rw http.ResponseWriter, r *http.Request) {
291371
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.BuildInfoResponse{
292372
ExternalURL: buildinfo.ExternalURL(),

0 commit comments

Comments
 (0)