@@ -3,6 +3,7 @@ package replicasync
3
3
import (
4
4
"context"
5
5
"crypto/tls"
6
+ "crypto/x509"
6
7
"database/sql"
7
8
"errors"
8
9
"fmt"
@@ -265,45 +266,35 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
265
266
},
266
267
}
267
268
defer client .CloseIdleConnections ()
268
- var wg sync.WaitGroup
269
- var mu sync.Mutex
270
- failed := make ([]string , 0 )
271
- for _ , peer := range m .Regional () {
272
- wg .Add (1 )
269
+
270
+ peers := m .Regional ()
271
+ errs := make (chan error , len (peers ))
272
+ for _ , peer := range peers {
273
273
go func (peer database.Replica ) {
274
- defer wg .Done ()
275
- ra , err := url .Parse (peer .RelayAddress )
274
+ err := PingPeerReplica (ctx , client , peer .RelayAddress )
276
275
if err != nil {
277
- m .logger .Warn (ctx , "could not parse relay address" ,
278
- slog .F ("relay_address" , peer .RelayAddress ), slog .Error (err ))
276
+ errs <- xerrors .Errorf ("ping sibling replica %s (%s): %w" , peer .Hostname , peer .RelayAddress , err )
277
+ m .logger .Warn (ctx , "failed to ping sibling replica, this could happen if the replica has shutdown" ,
278
+ slog .F ("replica_hostname" , peer .Hostname ),
279
+ slog .F ("replica_relay_address" , peer .RelayAddress ),
280
+ slog .Error (err ),
281
+ )
279
282
return
280
283
}
281
- target , err := ra .Parse ("/derp/latency-check" )
282
- if err != nil {
283
- m .logger .Warn (ctx , "could not resolve /derp/latency-check endpoint" ,
284
- slog .F ("relay_address" , peer .RelayAddress ), slog .Error (err ))
285
- return
286
- }
287
- req , err := http .NewRequestWithContext (ctx , http .MethodGet , target .String (), nil )
288
- if err != nil {
289
- m .logger .Warn (ctx , "create http request for relay probe" ,
290
- slog .F ("relay_address" , peer .RelayAddress ), slog .Error (err ))
291
- return
292
- }
293
- res , err := client .Do (req )
294
- if err != nil {
295
- mu .Lock ()
296
- failed = append (failed , fmt .Sprintf ("relay %s (%s): %s" , peer .Hostname , peer .RelayAddress , err ))
297
- mu .Unlock ()
298
- return
299
- }
300
- _ = res .Body .Close ()
284
+ errs <- nil
301
285
}(peer )
302
286
}
303
- wg .Wait ()
287
+
288
+ replicaErrs := make ([]string , 0 , len (peers ))
289
+ for i := 0 ; i < len (peers ); i ++ {
290
+ err := <- errs
291
+ if err != nil {
292
+ replicaErrs = append (replicaErrs , err .Error ())
293
+ }
294
+ }
304
295
replicaError := ""
305
- if len (failed ) > 0 {
306
- replicaError = fmt .Sprintf ("Failed to dial peers: %s" , strings .Join (failed , ", " ))
296
+ if len (replicaErrs ) > 0 {
297
+ replicaError = fmt .Sprintf ("Failed to dial peers: %s" , strings .Join (replicaErrs , ", " ))
307
298
}
308
299
309
300
databaseLatency , err := m .db .Ping (ctx )
@@ -363,6 +354,32 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
363
354
return nil
364
355
}
365
356
357
+ // PingPeerReplica pings a peer replica over it's internal relay address to
358
+ // ensure it's reachable and alive for health purposes.
359
+ func PingPeerReplica (ctx context.Context , client http.Client , relayAddress string ) error {
360
+ ra , err := url .Parse (relayAddress )
361
+ if err != nil {
362
+ return xerrors .Errorf ("parse relay address %q: %w" , relayAddress , err )
363
+ }
364
+ target , err := ra .Parse ("/derp/latency-check" )
365
+ if err != nil {
366
+ return xerrors .Errorf ("parse latency-check URL: %w" , err )
367
+ }
368
+ req , err := http .NewRequestWithContext (ctx , http .MethodGet , target .String (), nil )
369
+ if err != nil {
370
+ return xerrors .Errorf ("create request: %w" , err )
371
+ }
372
+ res , err := client .Do (req )
373
+ if err != nil {
374
+ return xerrors .Errorf ("do probe: %w" , err )
375
+ }
376
+ _ = res .Body .Close ()
377
+ if res .StatusCode != http .StatusOK {
378
+ return xerrors .Errorf ("unexpected status code: %d" , res .StatusCode )
379
+ }
380
+ return nil
381
+ }
382
+
366
383
// Self represents the current replica.
367
384
func (m * Manager ) Self () database.Replica {
368
385
m .mutex .Lock ()
@@ -466,3 +483,29 @@ func (m *Manager) Close() error {
466
483
}
467
484
return nil
468
485
}
486
+
487
+ // CreateDERPMeshTLSConfig creates a TLS configuration for connecting to peers
488
+ // in the DERP mesh over private networking. It overrides the ServerName to be
489
+ // the expected public hostname of the peer, and trusts all of the TLS server
490
+ // certificates used by this replica (as we expect all replicas to use the same
491
+ // TLS certificates).
492
+ func CreateDERPMeshTLSConfig (hostname string , tlsCertificates []tls.Certificate ) (* tls.Config , error ) {
493
+ meshRootCA := x509 .NewCertPool ()
494
+ for _ , certificate := range tlsCertificates {
495
+ for _ , certificatePart := range certificate .Certificate {
496
+ parsedCert , err := x509 .ParseCertificate (certificatePart )
497
+ if err != nil {
498
+ return nil , xerrors .Errorf ("parse certificate %s: %w" , parsedCert .Subject .CommonName , err )
499
+ }
500
+ meshRootCA .AddCert (parsedCert )
501
+ }
502
+ }
503
+
504
+ // This TLS configuration trusts the built-in TLS certificates and forces
505
+ // the server name to be the public hostname.
506
+ return & tls.Config {
507
+ MinVersion : tls .VersionTLS12 ,
508
+ RootCAs : meshRootCA ,
509
+ ServerName : hostname ,
510
+ }, nil
511
+ }
0 commit comments