Skip to content

Commit 434af15

Browse files
committed
derp: support client->server ping (and server->client pong)
In prep for a future change to have client ping derp connections when their state is questionable, rather than aggressively tearing them down and doing a heavy reconnect when their state is unknown. We already support ping/pong in the other direction (servers probing clients) so we already had the two frame types, but I'd never finished this direction. Updates tailscale#3619 Change-Id: I024b815d9db1bc57c20f82f80f95fb55fc9e2fcc Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
1 parent bc537ad commit 434af15

File tree

4 files changed

+136
-2
lines changed

4 files changed

+136
-2
lines changed

derp/derp_client.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,18 @@ func (c *Client) ForwardPacket(srcKey, dstKey key.NodePublic, pkt []byte) (err e
261261

262262
func (c *Client) writeTimeoutFired() { c.nc.Close() }
263263

264+
func (c *Client) SendPing(data [8]byte) error {
265+
return c.sendPingOrPong(framePing, data)
266+
}
267+
264268
func (c *Client) SendPong(data [8]byte) error {
269+
return c.sendPingOrPong(framePong, data)
270+
}
271+
272+
func (c *Client) sendPingOrPong(typ frameType, data [8]byte) error {
265273
c.wmu.Lock()
266274
defer c.wmu.Unlock()
267-
if err := writeFrameHeader(c.bw, framePong, 8); err != nil {
275+
if err := writeFrameHeader(c.bw, typ, 8); err != nil {
268276
return err
269277
}
270278
if _, err := c.bw.Write(data[:]); err != nil {
@@ -375,6 +383,12 @@ type PingMessage [8]byte
375383

376384
func (PingMessage) msg() {}
377385

386+
// PongMessage is a reply to a PingMessage from a client or server
387+
// with the payload sent previously in a PingMessage.
388+
type PongMessage [8]byte
389+
390+
func (PongMessage) msg() {}
391+
378392
// KeepAliveMessage is a one-way empty message from server to client, just to
379393
// keep the connection alive. It's like a PingMessage, but doesn't solicit
380394
// a reply from the client.
@@ -536,6 +550,15 @@ func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err erro
536550
copy(pm[:], b[:])
537551
return pm, nil
538552

553+
case framePong:
554+
var pm PongMessage
555+
if n < 8 {
556+
c.logf("[unexpected] dropping short ping frame")
557+
continue
558+
}
559+
copy(pm[:], b[:])
560+
return pm, nil
561+
539562
case frameHealth:
540563
return HealthMessage{Problem: string(b[:])}, nil
541564

derp/derp_server.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
662662
connectedAt: time.Now(),
663663
sendQueue: make(chan pkt, perClientSendQueueDepth),
664664
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
665+
sendPongCh: make(chan [8]byte, 1),
665666
peerGone: make(chan key.NodePublic),
666667
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
667668
}
@@ -729,6 +730,8 @@ func (c *sclient) run(ctx context.Context) error {
729730
err = c.handleFrameWatchConns(ft, fl)
730731
case frameClosePeer:
731732
err = c.handleFrameClosePeer(ft, fl)
733+
case framePing:
734+
err = c.handleFramePing(ft, fl)
732735
default:
733736
err = c.handleUnknownFrame(ft, fl)
734737
}
@@ -766,6 +769,32 @@ func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
766769
return nil
767770
}
768771

772+
func (c *sclient) handleFramePing(ft frameType, fl uint32) error {
773+
var m PingMessage
774+
if fl < uint32(len(m)) {
775+
return fmt.Errorf("short ping: %v", fl)
776+
}
777+
if fl > 1000 {
778+
// unreasonably extra large. We leave some extra
779+
// space for future extensibility, but not too much.
780+
return fmt.Errorf("ping body too large: %v", fl)
781+
}
782+
_, err := io.ReadFull(c.br, m[:])
783+
if err != nil {
784+
return err
785+
}
786+
if extra := int64(fl) - int64(len(m)); extra > 0 {
787+
_, err = io.CopyN(ioutil.Discard, c.br, extra)
788+
}
789+
select {
790+
case c.sendPongCh <- [8]byte(m):
791+
default:
792+
// They're pinging too fast. Ignore.
793+
// TODO(bradfitz): add a rate limiter too.
794+
}
795+
return err
796+
}
797+
769798
func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
770799
if fl != keyLen {
771800
return fmt.Errorf("handleFrameClosePeer wrong size")
@@ -1202,6 +1231,7 @@ type sclient struct {
12021231
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
12031232
sendQueue chan pkt // packets queued to this client; never closed
12041233
discoSendQueue chan pkt // important packets queued to this client; never closed
1234+
sendPongCh chan [8]byte // pong replies to send to the client; never closed
12051235
peerGone chan key.NodePublic // write request that a previous sender has disconnected (not used by mesh peers)
12061236
meshUpdate chan struct{} // write request to write peerStateChange
12071237
canMesh bool // clientInfo had correct mesh token for inter-region routing
@@ -1342,6 +1372,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
13421372
werr = c.sendPacket(msg.src, msg.bs)
13431373
c.recordQueueTime(msg.enqueuedAt)
13441374
continue
1375+
case msg := <-c.sendPongCh:
1376+
werr = c.sendPong(msg)
1377+
continue
13451378
case <-keepAliveTick.C:
13461379
werr = c.sendKeepAlive()
13471380
continue
@@ -1368,6 +1401,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
13681401
case msg := <-c.discoSendQueue:
13691402
werr = c.sendPacket(msg.src, msg.bs)
13701403
c.recordQueueTime(msg.enqueuedAt)
1404+
case msg := <-c.sendPongCh:
1405+
werr = c.sendPong(msg)
1406+
continue
13711407
case <-keepAliveTick.C:
13721408
werr = c.sendKeepAlive()
13731409
}
@@ -1384,6 +1420,16 @@ func (c *sclient) sendKeepAlive() error {
13841420
return writeFrameHeader(c.bw.bw(), frameKeepAlive, 0)
13851421
}
13861422

1423+
// sendPong sends a pong reply, without flushing.
1424+
func (c *sclient) sendPong(data [8]byte) error {
1425+
c.setWriteDeadline()
1426+
if err := writeFrameHeader(c.bw.bw(), framePong, uint32(len(data))); err != nil {
1427+
return err
1428+
}
1429+
_, err := c.bw.Write(data[:])
1430+
return err
1431+
}
1432+
13871433
// sendPeerGone sends a peerGone frame, without flushing.
13881434
func (c *sclient) sendPeerGone(peer key.NodePublic) error {
13891435
c.s.peerGoneFrames.Add(1)

derp/derp_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,14 @@ func TestClientRecv(t *testing.T) {
812812
},
813813
want: PingMessage{1, 2, 3, 4, 5, 6, 7, 8},
814814
},
815+
{
816+
name: "pong",
817+
input: []byte{
818+
byte(framePong), 0, 0, 0, 8,
819+
1, 2, 3, 4, 5, 6, 7, 8,
820+
},
821+
want: PongMessage{1, 2, 3, 4, 5, 6, 7, 8},
822+
},
815823
{
816824
name: "health_bad",
817825
input: []byte{
@@ -858,6 +866,23 @@ func TestClientRecv(t *testing.T) {
858866
}
859867
}
860868

869+
func TestClientSendPing(t *testing.T) {
870+
var buf bytes.Buffer
871+
c := &Client{
872+
bw: bufio.NewWriter(&buf),
873+
}
874+
if err := c.SendPing([8]byte{1, 2, 3, 4, 5, 6, 7, 8}); err != nil {
875+
t.Fatal(err)
876+
}
877+
want := []byte{
878+
byte(framePing), 0, 0, 0, 8,
879+
1, 2, 3, 4, 5, 6, 7, 8,
880+
}
881+
if !bytes.Equal(buf.Bytes(), want) {
882+
t.Errorf("unexpected output\nwrote: % 02x\n want: % 02x", buf.Bytes(), want)
883+
}
884+
}
885+
861886
func TestClientSendPong(t *testing.T) {
862887
var buf bytes.Buffer
863888
c := &Client{
@@ -873,7 +898,6 @@ func TestClientSendPong(t *testing.T) {
873898
if !bytes.Equal(buf.Bytes(), want) {
874899
t.Errorf("unexpected output\nwrote: % 02x\n want: % 02x", buf.Bytes(), want)
875900
}
876-
877901
}
878902

879903
func TestServerDupClients(t *testing.T) {
@@ -1316,3 +1340,30 @@ func TestClientSendRateLimiting(t *testing.T) {
13161340
t.Errorf("limited conn's bytes count = %v; want >=%v, <%v", bytesLimited, bytes1K*2, bytes1K)
13171341
}
13181342
}
1343+
1344+
func TestServerRepliesToPing(t *testing.T) {
1345+
ts := newTestServer(t)
1346+
defer ts.close(t)
1347+
1348+
tc := newRegularClient(t, ts, "alice")
1349+
1350+
data := [8]byte{1, 2, 3, 4, 5, 6, 7, 42}
1351+
1352+
if err := tc.c.SendPing(data); err != nil {
1353+
t.Fatal(err)
1354+
}
1355+
1356+
for {
1357+
m, err := tc.c.recvTimeout(time.Second)
1358+
if err != nil {
1359+
t.Fatal(err)
1360+
}
1361+
switch m := m.(type) {
1362+
case PongMessage:
1363+
if ([8]byte(m)) != data {
1364+
t.Fatalf("got pong %2x; want %2x", [8]byte(m), data)
1365+
}
1366+
return
1367+
}
1368+
}
1369+
}

derp/derphttp/derphttp_client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,20 @@ func (c *Client) Send(dstKey key.NodePublic, b []byte) error {
698698
return err
699699
}
700700

701+
// SendPing sends a ping message, without any implicit connect or reconnect.
702+
func (c *Client) SendPing(data [8]byte) error {
703+
c.mu.Lock()
704+
closed, client := c.closed, c.client
705+
c.mu.Unlock()
706+
if closed {
707+
return ErrClientClosed
708+
}
709+
if client == nil {
710+
return errors.New("client not connected")
711+
}
712+
return client.SendPing(data)
713+
}
714+
701715
func (c *Client) ForwardPacket(from, to key.NodePublic, b []byte) error {
702716
client, _, err := c.connect(context.TODO(), "derphttp.Client.ForwardPacket")
703717
if err != nil {

0 commit comments

Comments
 (0)