@@ -4,13 +4,22 @@ import (
4
4
"fmt"
5
5
"net"
6
6
"net/url"
7
+ "sync"
7
8
"time"
8
9
9
10
"github.com/pion/datachannel"
11
+ "github.com/pion/webrtc/v3"
10
12
)
11
13
12
14
const (
13
15
httpScheme = "http"
16
+
17
+ bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB
18
+ maxBufferedAmount uint64 = 1024 * 1024 // 1 MB
19
+ // For some reason messages larger just don't work...
20
+ // This shouldn't be a huge deal for real-world usage.
21
+ // See: https://github.com/pion/datachannel/issues/59
22
+ maxMessageLength = 32 * 1024 // 32 KB
14
23
)
15
24
16
25
// TURNEndpoint returns the TURN address for a Coder baseURL.
@@ -43,19 +52,63 @@ func ConnectEndpoint(baseURL *url.URL, workspace, token string) string {
43
52
44
53
type conn struct {
45
54
addr * net.UnixAddr
55
+ dc * webrtc.DataChannel
46
56
rw datachannel.ReadWriteCloser
57
+
58
+ sendMore chan struct {}
59
+ closedMutex sync.RWMutex
60
+ closed bool
61
+
62
+ writeMutex sync.Mutex
63
+ }
64
+
65
+ func (c * conn ) init () {
66
+ c .sendMore = make (chan struct {}, 1 )
67
+ c .dc .SetBufferedAmountLowThreshold (bufferedAmountLowThreshold )
68
+ c .dc .OnBufferedAmountLow (func () {
69
+ c .closedMutex .RLock ()
70
+ defer c .closedMutex .RUnlock ()
71
+ if c .closed {
72
+ return
73
+ }
74
+ select {
75
+ case c .sendMore <- struct {}{}:
76
+ default :
77
+ }
78
+ })
47
79
}
48
80
49
81
func (c * conn ) Read (b []byte ) (n int , err error ) {
50
82
return c .rw .Read (b )
51
83
}
52
84
53
85
func (c * conn ) Write (b []byte ) (n int , err error ) {
86
+ c .writeMutex .Lock ()
87
+ defer c .writeMutex .Unlock ()
88
+ if len (b ) > maxMessageLength {
89
+ return 0 , fmt .Errorf ("outbound packet larger than maximum message size: %d" , maxMessageLength )
90
+ }
91
+ if c .dc .BufferedAmount ()+ uint64 (len (b )) >= maxBufferedAmount {
92
+ <- c .sendMore
93
+ }
94
+ // TODO (@kyle): There's an obvious race-condition here.
95
+ // This is an edge-case, as most-frequently data won't
96
+ // be pooled so synchronously, but is definitely possible.
97
+ //
98
+ // See: https://github.com/pion/sctp/issues/181
99
+ time .Sleep (time .Microsecond )
100
+
54
101
return c .rw .Write (b )
55
102
}
56
103
57
104
func (c * conn ) Close () error {
58
- return c .rw .Close ()
105
+ c .closedMutex .Lock ()
106
+ defer c .closedMutex .Unlock ()
107
+ if ! c .closed {
108
+ c .closed = true
109
+ close (c .sendMore )
110
+ }
111
+ return c .dc .Close ()
59
112
}
60
113
61
114
func (c * conn ) LocalAddr () net.Addr {
0 commit comments