From 7051340bcaf0ef9b1729a39773783971d1ca47c9 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 11 Jul 2022 12:06:42 +0300 Subject: [PATCH 1/5] Fix use of unprotected `stream.sendHdr` in session (#1) We now create a new header slice instead of taking a `sendLock` as this is a rare occurrence. --- session.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/session.go b/session.go index 046a3d3..33442da 100644 --- a/session.go +++ b/session.go @@ -639,8 +639,9 @@ func (s *Session) incomingStream(id uint32) error { // Backlog exceeded! RST the stream s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") delete(s.streams, id) - stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(stream.sendHdr) + hdr := header(make([]byte, headerSize)) + hdr.encode(typeWindowUpdate, flagRST, id, 0) + return s.sendNoWait(hdr) } } From 12254a01acb0514018d36d2becd6fde04cc885be Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 11 Jul 2022 12:15:06 +0300 Subject: [PATCH 2/5] Fix unsafe header re-use on close and timeout (#2) This commit takes a minimal approach to fixing unsafe header re-use during close and timeout by reallocating the header in those cases. Partially fixes: https://github.com/coder/coder/issues/2429 Related: https://github.com/hashicorp/yamux/issues/40 --- stream.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index f444bdc..a62fea2 100644 --- a/stream.go +++ b/stream.go @@ -2,6 +2,7 @@ package yamux import ( "bytes" + "errors" "io" "sync" "sync/atomic" @@ -200,6 +201,10 @@ START: // Send the header s.sendHdr.encode(typeData, flags, s.id, max) if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.sendHdr = header(make([]byte, headerSize)) + } return 0, err } @@ -273,6 +278,10 @@ func (s *Stream) sendWindowUpdate() error { // Send the header s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.controlHdr = header(make([]byte, headerSize)) + } return err } return nil @@ -287,6 +296,10 @@ func (s *Stream) sendClose() error { flags |= flagFIN s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0) if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { + if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) { + // Message left in ready queue, header re-use is unsafe. + s.controlHdr = header(make([]byte, headerSize)) + } return err } return nil @@ -362,8 +375,9 @@ func (s *Stream) closeTimeout() { // Send a RST so the remote side closes too. s.sendLock.Lock() defer s.sendLock.Unlock() - s.sendHdr.encode(typeWindowUpdate, flagRST, s.id, 0) - s.session.sendNoWait(s.sendHdr) + hdr := header(make([]byte, headerSize)) + hdr.encode(typeWindowUpdate, flagRST, s.id, 0) + s.session.sendNoWait(hdr) } // forceClose is used for when the session is exiting From 3a58105df144b7415732752931948f16862e5cfd Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 11 Jul 2022 12:15:56 +0300 Subject: [PATCH 3/5] Fix mutex deadlock in `(*Steam).readData` (#3) --- stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream.go b/stream.go index a62fea2..d197d28 100644 --- a/stream.go +++ b/stream.go @@ -479,6 +479,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { if length > s.recvWindow { s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length) + s.recvLock.Unlock() return ErrRecvWindowExceeded } From ba57465c315e13323b0895bdee2ef149b0aadf05 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 11 Jul 2022 12:16:26 +0300 Subject: [PATCH 4/5] Protect reads of `sendReady.Body` with mutex and temp buffer (#4) This commit fixes an edge case where the `body` passed to `waitForSendErr` can be written to after returning from the function. This happens when `sendReady` is buffered on the `sendCh` and the session is shutdown or the write times out. When this condition happens and `waitForSendErr` has not yet exited, the `body` is safely copied into a temporary buffer in `send`. Otherwise `waitForSendErr` safely created a copy of the `body` and exits, this essentially results in double buffering for the edge case which seems acceptable. --- session.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index 33442da..7cb18b4 100644 --- a/session.go +++ b/session.go @@ -2,6 +2,7 @@ package yamux import ( "bufio" + "bytes" "fmt" "io" "io/ioutil" @@ -80,6 +81,7 @@ type Session struct { // or to directly send a header type sendReady struct { Hdr []byte + mu *sync.Mutex // Protects Body from unsafe reads. Body []byte Err chan error } @@ -373,7 +375,7 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro timerPool.Put(t) }() - ready := sendReady{Hdr: hdr, Body: body, Err: errCh} + ready := sendReady{Hdr: hdr, mu: &sync.Mutex{}, Body: body, Err: errCh} select { case s.sendCh <- ready: case <-s.shutdownCh: @@ -382,12 +384,34 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro return ErrConnectionWriteTimeout } + bodyCopy := func() { + if body == nil { + return // A nil body is ignored. + } + + // In the event of session shutdown or connection write timeout, + // we need to prevent `send` from reading the body buffer after + // returning from this function since the caller may re-use the + // underlying array. + ready.mu.Lock() + defer ready.mu.Unlock() + + if ready.Body == nil { + return // Body was already copied in `send`. + } + newBody := make([]byte, len(body)) + copy(newBody, body) + ready.Body = newBody + } + select { case err := <-errCh: return err case <-s.shutdownCh: + bodyCopy() return ErrSessionShutdown case <-timer.C: + bodyCopy() return ErrConnectionWriteTimeout } } @@ -420,7 +444,10 @@ func (s *Session) sendNoWait(hdr header) error { // send is a long running goroutine that sends data func (s *Session) send() { + var bodyBuf bytes.Buffer for { + bodyBuf.Reset() + select { case ready := <-s.sendCh: // Send a header if ready @@ -438,9 +465,28 @@ func (s *Session) send() { } } - // Send data from a body if given - if ready.Body != nil { - _, err := s.conn.Write(ready.Body) + if ready.mu != nil { + ready.mu.Lock() + if ready.Body != nil { + // Copy the body into the buffer to avoid + // holding a mutex lock during the write. + _, err := bodyBuf.Write(ready.Body) + if err != nil { + ready.Body = nil + ready.mu.Unlock() + s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err) + asyncSendErr(ready.Err, err) + s.exitErr(err) + return + } + ready.Body = nil + } + ready.mu.Unlock() + } + + if bodyBuf.Len() > 0 { + // Send data from a body if given + _, err := s.conn.Write(bodyBuf.Bytes()) if err != nil { s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) asyncSendErr(ready.Err, err) From 7bf120a2ce53933316adc49e922770582b2699e9 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 12 Jul 2022 20:40:24 +0300 Subject: [PATCH 5/5] fix: Make sendReady a pointer so the Body can be swapped (#5) --- session.go | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/session.go b/session.go index 7cb18b4..ae81cff 100644 --- a/session.go +++ b/session.go @@ -64,7 +64,7 @@ type Session struct { // sendCh is used to mark a stream as ready to send, // or to send a header out directly. - sendCh chan sendReady + sendCh chan *sendReady // recvDoneCh is closed when recv() exits to avoid a race // between stream registration and stream shutdown @@ -81,7 +81,7 @@ type Session struct { // or to directly send a header type sendReady struct { Hdr []byte - mu *sync.Mutex // Protects Body from unsafe reads. + mu sync.Mutex // Protects Body from unsafe reads. Body []byte Err chan error } @@ -103,7 +103,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { inflight: make(map[uint32]struct{}), synCh: make(chan struct{}, config.AcceptBacklog), acceptCh: make(chan *Stream, config.AcceptBacklog), - sendCh: make(chan sendReady, 64), + sendCh: make(chan *sendReady, 64), recvDoneCh: make(chan struct{}), shutdownCh: make(chan struct{}), } @@ -375,7 +375,7 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro timerPool.Put(t) }() - ready := sendReady{Hdr: hdr, mu: &sync.Mutex{}, Body: body, Err: errCh} + ready := &sendReady{Hdr: hdr, Body: body, Err: errCh} select { case s.sendCh <- ready: case <-s.shutdownCh: @@ -433,7 +433,7 @@ func (s *Session) sendNoWait(hdr header) error { }() select { - case s.sendCh <- sendReady{Hdr: hdr}: + case s.sendCh <- &sendReady{Hdr: hdr}: return nil case <-s.shutdownCh: return ErrSessionShutdown @@ -465,24 +465,22 @@ func (s *Session) send() { } } - if ready.mu != nil { - ready.mu.Lock() - if ready.Body != nil { - // Copy the body into the buffer to avoid - // holding a mutex lock during the write. - _, err := bodyBuf.Write(ready.Body) - if err != nil { - ready.Body = nil - ready.mu.Unlock() - s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } + ready.mu.Lock() + if ready.Body != nil { + // Copy the body into the buffer to avoid + // holding a mutex lock during the write. + _, err := bodyBuf.Write(ready.Body) + if err != nil { ready.Body = nil + ready.mu.Unlock() + s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err) + asyncSendErr(ready.Err, err) + s.exitErr(err) + return } - ready.mu.Unlock() + ready.Body = nil } + ready.mu.Unlock() if bodyBuf.Len() > 0 { // Send data from a body if given