@@ -13,6 +13,7 @@ package derphttp
13
13
import (
14
14
"bufio"
15
15
"context"
16
+ "crypto/rand"
16
17
"crypto/tls"
17
18
"crypto/x509"
18
19
"errors"
@@ -72,6 +73,7 @@ type Client struct {
72
73
client * derp.Client
73
74
connGen int // incremented once per new connection; valid values are >0
74
75
serverPubKey key.NodePublic
76
+ pingOut map [derp.PingMessage ]chan <- bool // chan to send to on pong
75
77
}
76
78
77
79
// NewRegionClient returns a new DERP-over-HTTP client. It connects lazily.
@@ -698,7 +700,67 @@ func (c *Client) Send(dstKey key.NodePublic, b []byte) error {
698
700
return err
699
701
}
700
702
701
- // SendPing sends a ping message, without any implicit connect or reconnect.
703
+ func (c * Client ) registerPing (m derp.PingMessage , ch chan <- bool ) {
704
+ c .mu .Lock ()
705
+ defer c .mu .Unlock ()
706
+ if c .pingOut == nil {
707
+ c .pingOut = map [derp.PingMessage ]chan <- bool {}
708
+ }
709
+ c .pingOut [m ] = ch
710
+ }
711
+
712
+ func (c * Client ) unregisterPing (m derp.PingMessage ) {
713
+ c .mu .Lock ()
714
+ defer c .mu .Unlock ()
715
+ delete (c .pingOut , m )
716
+ }
717
+
718
+ func (c * Client ) handledPong (m derp.PongMessage ) bool {
719
+ c .mu .Lock ()
720
+ defer c .mu .Unlock ()
721
+ k := derp .PingMessage (m )
722
+ if ch , ok := c .pingOut [k ]; ok {
723
+ ch <- true
724
+ delete (c .pingOut , k )
725
+ return true
726
+ }
727
+ return false
728
+ }
729
+
730
+ // Ping sends a ping to the peer and waits for it either to be
731
+ // acknowledged (in which case Ping returns nil) or waits for ctx to
732
+ // be over and returns an error. It will wait at most 5 seconds
733
+ // before returning an error.
734
+ //
735
+ // Another goroutine must be in a loop calling Recv or
736
+ // RecvDetail or ping responses won't be handled.
737
+ func (c * Client ) Ping (ctx context.Context ) error {
738
+ maxDL := time .Now ().Add (5 * time .Second )
739
+ if dl , ok := ctx .Deadline (); ! ok || dl .After (maxDL ) {
740
+ var cancel context.CancelFunc
741
+ ctx , cancel = context .WithDeadline (ctx , maxDL )
742
+ defer cancel ()
743
+ }
744
+ var data derp.PingMessage
745
+ rand .Read (data [:])
746
+ gotPing := make (chan bool , 1 )
747
+ c .registerPing (data , gotPing )
748
+ defer c .unregisterPing (data )
749
+ if err := c .SendPing (data ); err != nil {
750
+ return err
751
+ }
752
+ select {
753
+ case <- gotPing :
754
+ return nil
755
+ case <- ctx .Done ():
756
+ return ctx .Err ()
757
+ }
758
+ }
759
+
760
+ // SendPing writes a ping message, without any implicit connect or
761
+ // reconnect. This is a lower-level interface that writes a frame
762
+ // without any implicit handling of the response pong, if any. For a
763
+ // higher-level interface, use Ping.
702
764
func (c * Client ) SendPing (data [8 ]byte ) error {
703
765
c .mu .Lock ()
704
766
closed , client := c .closed , c .client
@@ -819,14 +881,22 @@ func (c *Client) RecvDetail() (m derp.ReceivedMessage, connGen int, err error) {
819
881
if err != nil {
820
882
return nil , 0 , err
821
883
}
822
- m , err = client .Recv ()
823
- if err != nil {
824
- c .closeForReconnect (client )
825
- if c .isClosed () {
826
- err = ErrClientClosed
884
+ for {
885
+ m , err = client .Recv ()
886
+ switch m := m .(type ) {
887
+ case derp.PongMessage :
888
+ if c .handledPong (m ) {
889
+ continue
890
+ }
891
+ }
892
+ if err != nil {
893
+ c .closeForReconnect (client )
894
+ if c .isClosed () {
895
+ err = ErrClientClosed
896
+ }
827
897
}
898
+ return m , connGen , err
828
899
}
829
- return m , connGen , err
830
900
}
831
901
832
902
func (c * Client ) isClosed () bool {
0 commit comments