9
9
"net"
10
10
"strings"
11
11
"sync"
12
+ "time"
12
13
13
14
"github.com/hashicorp/yamux"
14
15
"github.com/pion/webrtc/v3"
@@ -20,7 +21,51 @@ import (
20
21
// Listen connects to the broker proxies connections to the local net.
21
22
// Close will end all RTC connections.
22
23
func Listen (ctx context.Context , broker string ) (io.Closer , error ) {
23
- conn , resp , err := websocket .Dial (ctx , broker , nil )
24
+ l := & listener {
25
+ broker : broker ,
26
+ connClosers : make ([]io.Closer , 0 ),
27
+ }
28
+ // We do a one-off dial outside of the loop to ensure the initial
29
+ // connection is successful. If not, there's likely an error the
30
+ // user needs to act on.
31
+ ch , err := l .dial (ctx )
32
+ if err != nil {
33
+ return nil , err
34
+ }
35
+ go func () {
36
+ for {
37
+ err := <- ch
38
+ if errors .Is (err , io .EOF ) {
39
+ // If we hit an EOF, then the connection to the broker
40
+ // was interrupted. We'll take a short break then dial
41
+ // again.
42
+ time .Sleep (time .Second )
43
+ ch , err = l .dial (ctx )
44
+ }
45
+ if err != nil {
46
+ l .acceptError = err
47
+ _ = l .Close ()
48
+ break
49
+ }
50
+ }
51
+ }()
52
+ return l , nil
53
+ }
54
+
55
+ type listener struct {
56
+ broker string
57
+
58
+ acceptError error
59
+ ws * websocket.Conn
60
+ connClosers []io.Closer
61
+ connClosersMut sync.Mutex
62
+ }
63
+
64
+ func (l * listener ) dial (ctx context.Context ) (<- chan error , error ) {
65
+ if l .ws != nil {
66
+ _ = l .ws .Close (websocket .StatusNormalClosure , "new connection inbound" )
67
+ }
68
+ conn , resp , err := websocket .Dial (ctx , l .broker , nil )
24
69
if err != nil {
25
70
if resp != nil {
26
71
return nil , & coder.HTTPError {
@@ -29,40 +74,31 @@ func Listen(ctx context.Context, broker string) (io.Closer, error) {
29
74
}
30
75
return nil , err
31
76
}
77
+ l .ws = conn
32
78
nconn := websocket .NetConn (ctx , conn , websocket .MessageBinary )
33
79
session , err := yamux .Server (nconn , nil )
34
80
if err != nil {
35
81
return nil , fmt .Errorf ("create multiplex: %w" , err )
36
82
}
37
- l := & listener {
38
- ws : conn ,
39
- connClosers : make ([]io.Closer , 0 ),
40
- }
83
+ errCh := make (chan error )
41
84
go func () {
85
+ defer close (errCh )
42
86
for {
43
87
conn , err := session .Accept ()
44
88
if err != nil {
45
- if errors .Is (err , io .EOF ) {
46
- continue
47
- }
48
- l .acceptError = err
49
- l .Close ()
50
- return
89
+ errCh <- err
90
+ break
51
91
}
52
92
go l .negotiate (conn )
53
93
}
54
94
}()
55
- return l , nil
56
- }
57
-
58
- type listener struct {
59
- acceptError error
60
- ws * websocket.Conn
61
- connClosers []io.Closer
62
- connClosersMut sync.Mutex
95
+ return errCh , nil
63
96
}
64
97
65
98
// Negotiates the handshake protocol over the connection provided.
99
+ // This functions control-flow is important to readability,
100
+ // so the cognitive overload linter has been disabled.
101
+ // nolint:gocognit
66
102
func (l * listener ) negotiate (conn net.Conn ) {
67
103
var (
68
104
err error
@@ -119,6 +155,13 @@ func (l *listener) negotiate(conn net.Conn) {
119
155
closeError (fmt .Errorf ("ICEServers must be provided" ))
120
156
return
121
157
}
158
+ for _ , server := range msg .Servers {
159
+ err = DialICE (server , nil )
160
+ if err != nil {
161
+ closeError (fmt .Errorf ("dial server %+v: %w" , server .URLs , err ))
162
+ return
163
+ }
164
+ }
122
165
rtc , err = newPeerConnection (msg .Servers )
123
166
if err != nil {
124
167
closeError (err )
0 commit comments