Skip to content

Commit d2a5b31

Browse files
authored
feat: add derp mesh health checking in workspace proxies (#12222)
1 parent 6b0b87e commit d2a5b31

File tree

7 files changed

+445
-102
lines changed

7 files changed

+445
-102
lines changed

enterprise/cli/proxyserver.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
244244
closers.Add(closeFunc)
245245
}
246246

247-
proxy, err := wsproxy.New(ctx, &wsproxy.Options{
247+
options := &wsproxy.Options{
248248
Logger: logger,
249249
Experiments: coderd.ReadExperiments(logger, cfg.Experiments.Value()),
250250
HTTPClient: httpClient,
@@ -264,7 +264,12 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
264264
DERPOnly: derpOnly.Value(),
265265
BlockDirect: cfg.DERP.Config.BlockDirect.Value(),
266266
DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(),
267-
})
267+
}
268+
if httpServers.TLSConfig != nil {
269+
options.TLSCertificates = httpServers.TLSConfig.Certificates
270+
}
271+
272+
proxy, err := wsproxy.New(ctx, options)
268273
if err != nil {
269274
return xerrors.Errorf("create workspace proxy: %w", err)
270275
}

enterprise/coderd/coderd.go

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package coderd
33
import (
44
"context"
55
"crypto/ed25519"
6-
"crypto/tls"
7-
"crypto/x509"
86
"fmt"
97
"math"
108
"net/http"
@@ -416,27 +414,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
416414
})
417415
}
418416

419-
meshRootCA := x509.NewCertPool()
420-
for _, certificate := range options.TLSCertificates {
421-
for _, certificatePart := range certificate.Certificate {
422-
certificate, err := x509.ParseCertificate(certificatePart)
423-
if err != nil {
424-
return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
425-
}
426-
meshRootCA.AddCert(certificate)
427-
}
428-
}
429-
// This TLS configuration spoofs access from the access URL hostname
430-
// assuming that the certificates provided will cover that hostname.
431-
//
432-
// Replica sync and DERP meshing require accessing replicas via their
433-
// internal IP addresses, and if TLS is configured we use the same
434-
// certificates.
435-
meshTLSConfig := &tls.Config{
436-
MinVersion: tls.VersionTLS12,
437-
Certificates: options.TLSCertificates,
438-
RootCAs: meshRootCA,
439-
ServerName: options.AccessURL.Hostname(),
417+
meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates)
418+
if err != nil {
419+
return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err)
440420
}
441421
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
442422
ID: api.AGPL.ID,

enterprise/coderd/coderdenttest/proxytest.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ type ProxyOptions struct {
4444
// region.
4545
Token string
4646

47+
// ReplicaPingCallback is optional.
48+
ReplicaPingCallback func(replicas []codersdk.Replica, err string)
49+
4750
// FlushStats is optional
4851
FlushStats chan chan<- struct{}
4952
}
@@ -158,6 +161,7 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders
158161
DERPEnabled: !options.DerpDisabled,
159162
DERPOnly: options.DerpOnly,
160163
DERPServerRelayAddress: serverURL.String(),
164+
ReplicaErrCallback: options.ReplicaPingCallback,
161165
StatsCollectorOptions: statsCollectorOptions,
162166
BlockDirect: options.BlockDirect,
163167
})

enterprise/replicasync/replicasync.go

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package replicasync
33
import (
44
"context"
55
"crypto/tls"
6+
"crypto/x509"
67
"database/sql"
78
"errors"
89
"fmt"
@@ -265,45 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
265266
},
266267
}
267268
defer client.CloseIdleConnections()
268-
var wg sync.WaitGroup
269-
var mu sync.Mutex
270-
failed := make([]string, 0)
271-
for _, peer := range m.Regional() {
272-
wg.Add(1)
269+
270+
peers := m.Regional()
271+
errs := make(chan error, len(peers))
272+
for _, peer := range peers {
273273
go func(peer database.Replica) {
274-
defer wg.Done()
275-
ra, err := url.Parse(peer.RelayAddress)
274+
err := PingPeerReplica(ctx, client, peer.RelayAddress)
276275
if err != nil {
277-
m.logger.Warn(ctx, "could not parse relay address",
278-
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
276+
errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err)
277+
m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown",
278+
slog.F("replica_hostname", peer.Hostname),
279+
slog.F("replica_relay_address", peer.RelayAddress),
280+
slog.Error(err),
281+
)
279282
return
280283
}
281-
target, err := ra.Parse("/derp/latency-check")
282-
if err != nil {
283-
m.logger.Warn(ctx, "could not resolve /derp/latency-check endpoint",
284-
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
285-
return
286-
}
287-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
288-
if err != nil {
289-
m.logger.Warn(ctx, "create http request for relay probe",
290-
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
291-
return
292-
}
293-
res, err := client.Do(req)
294-
if err != nil {
295-
mu.Lock()
296-
failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err))
297-
mu.Unlock()
298-
return
299-
}
300-
_ = res.Body.Close()
284+
errs <- nil
301285
}(peer)
302286
}
303-
wg.Wait()
287+
288+
replicaErrs := make([]string, 0, len(peers))
289+
for i := 0; i < len(peers); i++ {
290+
err := <-errs
291+
if err != nil {
292+
replicaErrs = append(replicaErrs, err.Error())
293+
}
294+
}
304295
replicaError := ""
305-
if len(failed) > 0 {
306-
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", "))
296+
if len(replicaErrs) > 0 {
297+
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", "))
307298
}
308299

309300
databaseLatency, err := m.db.Ping(ctx)
@@ -363,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
363354
return nil
364355
}
365356

357+
// PingPeerReplica pings a peer replica over it's internal relay address to
358+
// ensure it's reachable and alive for health purposes.
359+
func PingPeerReplica(ctx context.Context, client http.Client, relayAddress string) error {
360+
ra, err := url.Parse(relayAddress)
361+
if err != nil {
362+
return xerrors.Errorf("parse relay address %q: %w", relayAddress, err)
363+
}
364+
target, err := ra.Parse("/derp/latency-check")
365+
if err != nil {
366+
return xerrors.Errorf("parse latency-check URL: %w", err)
367+
}
368+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
369+
if err != nil {
370+
return xerrors.Errorf("create request: %w", err)
371+
}
372+
res, err := client.Do(req)
373+
if err != nil {
374+
return xerrors.Errorf("do probe: %w", err)
375+
}
376+
_ = res.Body.Close()
377+
if res.StatusCode != http.StatusOK {
378+
return xerrors.Errorf("unexpected status code: %d", res.StatusCode)
379+
}
380+
return nil
381+
}
382+
366383
// Self represents the current replica.
367384
func (m *Manager) Self() database.Replica {
368385
m.mutex.Lock()
@@ -466,3 +483,29 @@ func (m *Manager) Close() error {
466483
}
467484
return nil
468485
}
486+
487+
// CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers
488+
// in the DERP mesh over private networking. It overrides the ServerName to be
489+
// the expected public hostname of the peer, and trusts all of the TLS server
490+
// certificates used by this replica (as we expect all replicas to use the same
491+
// TLS certificates).
492+
func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) {
493+
meshRootCA := x509.NewCertPool()
494+
for _, certificate := range tlsCertificates {
495+
for _, certificatePart := range certificate.Certificate {
496+
parsedCert, err := x509.ParseCertificate(certificatePart)
497+
if err != nil {
498+
return nil, xerrors.Errorf("parse certificate %s: %w", parsedCert.Subject.CommonName, err)
499+
}
500+
meshRootCA.AddCert(parsedCert)
501+
}
502+
}
503+
504+
// This TLS configuration trusts the built-in TLS certificates and forces
505+
// the server name to be the public hostname.
506+
return &tls.Config{
507+
MinVersion: tls.VersionTLS12,
508+
RootCAs: meshRootCA,
509+
ServerName: hostname,
510+
}, nil
511+
}

enterprise/replicasync/replicasync_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (d *derpyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
286286
d.Add(1)
287287
return
288288
}
289-
w.WriteHeader(http.StatusUpgradeRequired)
289+
w.WriteHeader(http.StatusOK)
290290
}
291291

292292
func (d *derpyHandler) requireOnlyDERPPaths(t *testing.T) {

0 commit comments

Comments
 (0)