Skip to content

feat: add derp mesh health checking in workspace proxies #12222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions enterprise/cli/proxyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
closers.Add(closeFunc)
}

proxy, err := wsproxy.New(ctx, &wsproxy.Options{
options := &wsproxy.Options{
Logger: logger,
Experiments: coderd.ReadExperiments(logger, cfg.Experiments.Value()),
HTTPClient: httpClient,
Expand All @@ -263,7 +263,12 @@ func (r *RootCmd) proxyServer() *clibase.Cmd {
DERPEnabled: cfg.DERP.Server.Enable.Value(),
DERPOnly: derpOnly.Value(),
DERPServerRelayAddress: cfg.DERP.Server.RelayURL.String(),
})
}
if httpServers.TLSConfig != nil {
options.TLSCertificates = httpServers.TLSConfig.Certificates
}

proxy, err := wsproxy.New(ctx, options)
if err != nil {
return xerrors.Errorf("create workspace proxy: %w", err)
}
Expand Down
26 changes: 3 additions & 23 deletions enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package coderd
import (
"context"
"crypto/ed25519"
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -416,27 +414,9 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
})
}

meshRootCA := x509.NewCertPool()
for _, certificate := range options.TLSCertificates {
for _, certificatePart := range certificate.Certificate {
certificate, err := x509.ParseCertificate(certificatePart)
if err != nil {
return nil, xerrors.Errorf("parse certificate %s: %w", certificate.Subject.CommonName, err)
}
meshRootCA.AddCert(certificate)
}
}
// This TLS configuration spoofs access from the access URL hostname
// assuming that the certificates provided will cover that hostname.
//
// Replica sync and DERP meshing require accessing replicas via their
// internal IP addresses, and if TLS is configured we use the same
// certificates.
meshTLSConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: options.TLSCertificates,
RootCAs: meshRootCA,
ServerName: options.AccessURL.Hostname(),
meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates)
if err != nil {
return nil, xerrors.Errorf("create DERP mesh TLS config: %w", err)
}
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
ID: api.AGPL.ID,
Expand Down
4 changes: 4 additions & 0 deletions enterprise/coderd/coderdenttest/proxytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type ProxyOptions struct {
// region.
Token string

// ReplicaPingCallback is optional.
ReplicaPingCallback func(replicas []codersdk.Replica, err string)

// FlushStats is optional
FlushStats chan chan<- struct{}
}
Expand Down Expand Up @@ -157,6 +160,7 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders
DERPEnabled: !options.DerpDisabled,
DERPOnly: options.DerpOnly,
DERPServerRelayAddress: serverURL.String(),
ReplicaErrCallback: options.ReplicaPingCallback,
StatsCollectorOptions: statsCollectorOptions,
})
require.NoError(t, err)
Expand Down
107 changes: 75 additions & 32 deletions enterprise/replicasync/replicasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package replicasync
import (
"context"
"crypto/tls"
"crypto/x509"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -265,45 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
},
}
defer client.CloseIdleConnections()
var wg sync.WaitGroup
var mu sync.Mutex
failed := make([]string, 0)
for _, peer := range m.Regional() {
wg.Add(1)

peers := m.Regional()
errs := make(chan error, len(peers))
for _, peer := range peers {
go func(peer database.Replica) {
defer wg.Done()
ra, err := url.Parse(peer.RelayAddress)
err := PingPeerReplica(ctx, client, peer.RelayAddress)
if err != nil {
m.logger.Warn(ctx, "could not parse relay address",
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err)
m.logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown",
slog.F("replica_hostname", peer.Hostname),
slog.F("replica_relay_address", peer.RelayAddress),
slog.Error(err),
)
return
}
target, err := ra.Parse("/derp/latency-check")
if err != nil {
m.logger.Warn(ctx, "could not resolve /derp/latency-check endpoint",
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
if err != nil {
m.logger.Warn(ctx, "create http request for relay probe",
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
return
}
res, err := client.Do(req)
if err != nil {
mu.Lock()
failed = append(failed, fmt.Sprintf("relay %s (%s): %s", peer.Hostname, peer.RelayAddress, err))
mu.Unlock()
return
}
_ = res.Body.Close()
errs <- nil
}(peer)
}
wg.Wait()

replicaErrs := make([]string, 0, len(peers))
for i := 0; i < len(peers); i++ {
err := <-errs
if err != nil {
replicaErrs = append(replicaErrs, err.Error())
}
}
replicaError := ""
if len(failed) > 0 {
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(failed, ", "))
if len(replicaErrs) > 0 {
replicaError = fmt.Sprintf("Failed to dial peers: %s", strings.Join(replicaErrs, ", "))
}

databaseLatency, err := m.db.Ping(ctx)
Expand Down Expand Up @@ -363,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
return nil
}

// PingPeerReplica checks that a peer replica is reachable and alive for
// health purposes. It issues a GET request for /derp/latency-check, resolved
// against the peer's internal relay address, using the supplied client.
//
// It returns nil only when the peer answers with 200 OK. A relay address or
// request that cannot be constructed, a failed round trip, or any other
// status code is reported as an error.
func PingPeerReplica(ctx context.Context, client http.Client, relayAddress string) error {
	base, err := url.Parse(relayAddress)
	if err != nil {
		return xerrors.Errorf("parse relay address %q: %w", relayAddress, err)
	}
	probeURL, err := base.Parse("/derp/latency-check")
	if err != nil {
		return xerrors.Errorf("parse latency-check URL: %w", err)
	}

	probe, err := http.NewRequestWithContext(ctx, http.MethodGet, probeURL.String(), nil)
	if err != nil {
		return xerrors.Errorf("create request: %w", err)
	}
	resp, err := client.Do(probe)
	if err != nil {
		return xerrors.Errorf("do probe: %w", err)
	}
	// Only the status code is inspected, so the body is closed unread and any
	// close error is deliberately ignored.
	_ = resp.Body.Close()

	if status := resp.StatusCode; status != http.StatusOK {
		return xerrors.Errorf("unexpected status code: %d", status)
	}
	return nil
}

// Self represents the current replica.
func (m *Manager) Self() database.Replica {
m.mutex.Lock()
Expand Down Expand Up @@ -466,3 +483,29 @@ func (m *Manager) Close() error {
}
return nil
}

// CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers
// in the DERP mesh over private networking. It overrides the ServerName to be
// the expected public hostname of the peer, and trusts all of the TLS server
// certificates used by this replica (as we expect all replicas to use the same
// TLS certificates).
//
// hostname is used verbatim as the ServerName of the returned config. Every
// DER block in every entry of tlsCertificates is parsed and added to the
// returned config's root CA pool. An error is returned if any block fails to
// parse; the error identifies the block by its position in tlsCertificates
// and in that entry's chain.
func CreateDERPMeshTLSConfig(hostname string, tlsCertificates []tls.Certificate) (*tls.Config, error) {
	meshRootCA := x509.NewCertPool()
	for certIdx, certificate := range tlsCertificates {
		for partIdx, certificatePart := range certificate.Certificate {
			parsedCert, err := x509.ParseCertificate(certificatePart)
			if err != nil {
				// parsedCert is nil when parsing fails, so it must not be
				// dereferenced here. Identify the offending block by its
				// position instead of by its (unavailable) subject.
				return nil, xerrors.Errorf("parse certificate %d (chain part %d): %w", certIdx, partIdx, err)
			}
			meshRootCA.AddCert(parsedCert)
		}
	}

	// This TLS configuration trusts the built-in TLS certificates and forces
	// the server name to be the public hostname.
	return &tls.Config{
		MinVersion: tls.VersionTLS12,
		RootCAs:    meshRootCA,
		ServerName: hostname,
	}, nil
}
2 changes: 1 addition & 1 deletion enterprise/replicasync/replicasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (d *derpyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
d.Add(1)
return
}
w.WriteHeader(http.StatusUpgradeRequired)
w.WriteHeader(http.StatusOK)
}

func (d *derpyHandler) requireOnlyDERPPaths(t *testing.T) {
Expand Down
Loading