Skip to content

test(tailnet): Add TestTransmitHang test with packet capture and logging #7414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor test-cases to run until the issue is triggered
  • Loading branch information
mafredri committed May 25, 2023
commit 57025f5318c2a68b66a7e536fc014737da7056ee
85 changes: 81 additions & 4 deletions tailnet/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package tailnet_test

import (
"context"
"fmt"
"io"
"net/netip"
"os"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"tailscale.com/envknob"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
Expand Down Expand Up @@ -202,13 +206,66 @@ func TestConn_PreferredDERP(t *testing.T) {
}
}

func TestTransmitHang(t *testing.T) {
t.Parallel()
//nolint:paralleltest
func TestTransmitHang_TCPSACK_Disabled(t *testing.T) {
runTestTransmitHangMain(t)
}

//nolint:paralleltest
func TestTransmitHang_TCPSACK_Enabled(t *testing.T) {
t.Setenv("TS_DEBUG_NETSTACK_ENABLE_TCPSACK", "true") // Restore after test.
envknob.Setenv("TS_DEBUG_NETSTACK_ENABLE_TCPSACK", "true")
runTestTransmitHangMain(t)
}

func runTestTransmitHangMain(t *testing.T) {
curMaxProcs := runtime.GOMAXPROCS(0)
t.Cleanup(func() {
runtime.GOMAXPROCS(curMaxProcs)
})
for _, env := range []string{"TS_DEBUG_DISCO", "TS_DEBUG_DERP", "TS_DEBUG_NETSTACK"} {
t.Setenv(env, "true") // Restore after test.
envknob.Setenv(env, "true")
}

var failed atomic.Bool
for !failed.Load() {
for _, maxProcs := range []int{2, 3, curMaxProcs} {
if failed.Load() {
return
}
t.Run(fmt.Sprintf("GOMAXPROCS=%d", maxProcs), func(t *testing.T) {
t.Logf("GOMAXPROCS=%d", maxProcs)
runtime.GOMAXPROCS(maxProcs)

for i := 0; i < 5; i++ {
t.Run(fmt.Sprintf("n=%d", i), func(t *testing.T) {
t.Parallel()
t.Cleanup(func() {
if t.Failed() {
failed.Store(true)
}
})
runTestTransmitHang(t, 20*time.Second)
})
}
})
}
}
}

func runTestTransmitHang(t *testing.T, timeout time.Duration) {
// Not using t.TempDir() here so that we keep logs afterwards.
captureDir, err := os.MkdirTemp("", "tailnet-test-")
require.NoError(t, err)

t.Cleanup(func() {
// Only keep failed runs.
if !t.Failed() {
_ = os.RemoveAll(captureDir)
}
})

testLog, err := os.Create(filepath.Join(captureDir, "test.log"))
require.NoError(t, err)
defer testLog.Close()
Expand All @@ -227,6 +284,8 @@ func TestTransmitHang(t *testing.T) {
t.Logf("recv capture file: %v", recvCapture.Name())
t.Logf("send capture file: %v", sendCapture.Name())

logger.Info(context.Background(), "starting test", slog.F("GOMAXPROCS", runtime.GOMAXPROCS(0)), slog.F("timeout", timeout))

derpMap := tailnettest.RunDERPAndSTUN(t)
updateNodes := func(c *tailnet.Conn) func(*tailnet.Node) {
return func(node *tailnet.Node) {
Expand Down Expand Up @@ -262,7 +321,9 @@ func TestTransmitHang(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

logger.Info(ctx, "waiting for receiver to be reachable (by sender)")
require.True(t, send.AwaitReachable(ctx, recvIP))
logger.Info(ctx, "wait complete")

copyDone := make(chan struct{})
go func() {
Expand All @@ -284,17 +345,33 @@ func TestTransmitHang(t *testing.T) {
assert.NoError(t, err)
}()

logger.Info(ctx, "dialing receiver")
w, err := send.DialContextTCP(ctx, netip.AddrPortFrom(recvIP, 35565))
require.NoError(t, err)
defer w.Close()
logger.Info(ctx, "dial complete")

now := time.Now()

payload := []byte(strings.Repeat("hello world\n", 65536/12))
size := 0
retries := 0
writeTimeout := 2 * time.Second
for i := 0; i < 1024*2; i++ {
logger.Debug(ctx, "write payload", slog.F("num", i), slog.F("transmitted_kb", size/1024))
Retry:
w.SetWriteDeadline(time.Now().Add(writeTimeout))
n, err := w.Write(payload)
require.NoError(t, err)
if err != nil {
if time.Duration(retries)*writeTimeout < timeout {
t.Errorf("write failed: %v", err)
t.Logf("retrying (%v)", retries)
retries++
goto Retry
} else {
require.NoError(t, err)
}
}
size += n
}

Expand All @@ -303,7 +380,7 @@ func TestTransmitHang(t *testing.T) {

<-copyDone

if time.Since(now) > 10*time.Second {
if time.Since(now) > timeout {
t.Fatal("took too long to transmit")
}
}