@@ -2,14 +2,19 @@ package tailnet_test
2
2
3
3
import (
4
4
"context"
5
+ "io"
5
6
"net/netip"
7
+ "os"
8
+ "path/filepath"
9
+ "strings"
6
10
"testing"
7
11
8
12
"github.com/stretchr/testify/assert"
9
13
"github.com/stretchr/testify/require"
10
14
"go.uber.org/goleak"
11
15
12
16
"cdr.dev/slog"
17
+ "cdr.dev/slog/sloggers/sloghuman"
13
18
"cdr.dev/slog/sloggers/slogtest"
14
19
"github.com/coder/coder/tailnet"
15
20
"github.com/coder/coder/tailnet/tailnettest"
@@ -195,3 +200,103 @@ func TestConn_PreferredDERP(t *testing.T) {
195
200
t .Fatal ("timed out waiting for node" )
196
201
}
197
202
}
203
+
204
+ func TestTransmitHang (t * testing.T ) {
205
+ t .Parallel ()
206
+
207
+ // Not using t.TempDir() here so that we keep logs afterwards.
208
+ captureDir , err := os .MkdirTemp ("" , "tailnet-test-" )
209
+ require .NoError (t , err )
210
+
211
+ testLog , err := os .Create (filepath .Join (captureDir , "test.log" ))
212
+ require .NoError (t , err )
213
+ defer testLog .Close ()
214
+ recvCapture , err := os .Create (filepath .Join (captureDir , "recv.pcap" ))
215
+ require .NoError (t , err )
216
+ defer recvCapture .Close ()
217
+ sendCapture , err := os .Create (filepath .Join (captureDir , "send.pcap" ))
218
+ require .NoError (t , err )
219
+ defer sendCapture .Close ()
220
+
221
+ logger := slogtest .Make (t , nil ).
222
+ Leveled (slog .LevelDebug ).
223
+ AppendSinks (sloghuman .Sink (testLog ))
224
+
225
+ t .Logf ("test log file: %v" , testLog .Name ())
226
+ t .Logf ("recv capture file: %v" , recvCapture .Name ())
227
+ t .Logf ("send capture file: %v" , sendCapture .Name ())
228
+
229
+ derpMap := tailnettest .RunDERPAndSTUN (t )
230
+ updateNodes := func (c * tailnet.Conn ) func (* tailnet.Node ) {
231
+ return func (node * tailnet.Node ) {
232
+ err := c .UpdateNodes ([]* tailnet.Node {node }, false )
233
+ assert .NoError (t , err )
234
+ }
235
+ }
236
+
237
+ recvIP := tailnet .IP ()
238
+ recv , err := tailnet .NewConn (& tailnet.Options {
239
+ Addresses : []netip.Prefix {netip .PrefixFrom (recvIP , 128 )},
240
+ Logger : logger .Named ("recv" ),
241
+ DERPMap : derpMap ,
242
+ })
243
+ require .NoError (t , err )
244
+ defer recv .Close ()
245
+ recvCaptureStop := recv .Capture (recvCapture )
246
+ defer recvCaptureStop ()
247
+
248
+ send , err := tailnet .NewConn (& tailnet.Options {
249
+ Addresses : []netip.Prefix {netip .PrefixFrom (tailnet .IP (), 128 )},
250
+ Logger : logger .Named ("send" ),
251
+ DERPMap : derpMap ,
252
+ })
253
+ require .NoError (t , err )
254
+ defer send .Close ()
255
+ sendCaptureStop := send .Capture (sendCapture )
256
+ defer sendCaptureStop ()
257
+
258
+ recv .SetNodeCallback (updateNodes (send ))
259
+ send .SetNodeCallback (updateNodes (recv ))
260
+
261
+ ctx , cancel := context .WithTimeout (context .Background (), testutil .WaitLong )
262
+ defer cancel ()
263
+
264
+ require .True (t , send .AwaitReachable (ctx , recvIP ))
265
+
266
+ copyDone := make (chan struct {})
267
+ go func () {
268
+ defer close (copyDone )
269
+
270
+ ln , err := recv .Listen ("tcp" , ":35565" )
271
+ if ! assert .NoError (t , err ) {
272
+ return
273
+ }
274
+ defer ln .Close ()
275
+
276
+ r , err := ln .Accept ()
277
+ if ! assert .NoError (t , err ) {
278
+ return
279
+ }
280
+ defer r .Close ()
281
+
282
+ _ , err = io .Copy (io .Discard , r )
283
+ assert .NoError (t , err )
284
+ }()
285
+
286
+ w , err := send .DialContextTCP (ctx , netip .AddrPortFrom (recvIP , 35565 ))
287
+ require .NoError (t , err )
288
+
289
+ payload := []byte (strings .Repeat ("hello world\n " , 65536 / 12 ))
290
+ size := 0
291
+ for i := 0 ; i < 1024 * 2 ; i ++ {
292
+ logger .Debug (ctx , "write payload" , slog .F ("num" , i ), slog .F ("transmitted_kb" , size / 1024 ))
293
+ n , err := w .Write (payload )
294
+ require .NoError (t , err )
295
+ size += n
296
+ }
297
+
298
+ err = w .Close ()
299
+ require .NoError (t , err )
300
+
301
+ <- copyDone
302
+ }
0 commit comments