Skip to content

data race related to dRPC / yamux #2429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
spikecurtis opened this issue Jun 16, 2022 · 2 comments · Fixed by coder/yamux#2, coder/yamux#4 or #2883
Closed

data race related to dRPC / yamux #2429

spikecurtis opened this issue Jun 16, 2022 · 2 comments · Fixed by coder/yamux#2, coder/yamux#4 or #2883
Assignees
Labels
api Area: HTTP API

Comments

@spikecurtis
Copy link
Contributor

https://github.com/coder/coder/runs/6923689367

snip:

==================
WARNING: DATA RACE
Read at 0x00c000810001 by goroutine 69:
  runtime.slicecopy()
      /opt/hostedtoolcache/go/1.18.3/x64/src/runtime/slice.go:295 +0x0
  net.(*pipe).read()
      /opt/hostedtoolcache/go/1.18.3/x64/src/net/pipe.go:161 +0x293
  net.(*pipe).Read()
      /opt/hostedtoolcache/go/1.18.3/x64/src/net/pipe.go:142 +0x54
  bufio.(*Reader).Read()
      /opt/hostedtoolcache/go/1.18.3/x64/src/bufio/bufio.go:236 +0x4da
  io.(*LimitedReader).Read()
      /opt/hostedtoolcache/go/1.18.3/x64/src/io/io.go:476 +0xc5
  bytes.(*Buffer).ReadFrom()
      /opt/hostedtoolcache/go/1.18.3/x64/src/bytes/buffer.go:204 +0x112
  io.copyBuffer()
      /opt/hostedtoolcache/go/1.18.3/x64/src/io/io.go:[412](https://github.com/coder/coder/runs/6923689367?check_suite_focus=true#step:10:413) +0x1c2
  io.Copy()
      /opt/hostedtoolcache/go/1.18.3/x64/src/io/io.go:385 +0x33c
  github.com/hashicorp/yamux.(*Stream).readData()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/stream.go:476 +0x2f9
  github.com/hashicorp/yamux.(*Session).handleStreamMessage()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/session.go:550 +0x3e4
  github.com/hashicorp/yamux.(*Session).recvLoop()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/session.go:501 +0x1ed
  github.com/hashicorp/yamux.(*Session).recv()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/session.go:462 +0x2e
  github.com/hashicorp/yamux.newSession.func1()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/session.go:113 +0x39
Previous write at 0x00c000810001 by goroutine 121:
  storj.io/drpc/drpcwire.AppendVarint()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.2022042[419](https://github.com/coder/coder/runs/6923689367?check_suite_focus=true#step:10:420)3521-8ebbaf48bdff/drpcwire/varint.go:32 +0x1e4
  storj.io/drpc/drpcwire.AppendFrame()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220[424](https://github.com/coder/coder/runs/6923689367?check_suite_focus=true#step:10:425)193521-8ebbaf48bdff/drpcwire/packet.go:139 +0xf9
  storj.io/drpc/drpcwire.(*Writer).WriteFrame()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcwire/writer.go:70 +0x110
  storj.io/drpc/drpcstream.(*Stream).sendPacket()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcstream/stream.go:264 +0x204
  storj.io/drpc/drpcstream.(*Stream).SendError()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcstream/stream.go:457 +0x1d4
  storj.io/drpc/drpcserver.(*Server).handleRPC()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcserver/server.go:124 +0x86
  storj.io/drpc/drpcserver.(*Server).ServeOne()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcserver/server.go:66 +0x2a4
  storj.io/drpc/drpcserver.(*Server).Serve.func2()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcserver/server.go:112 +0x73
  storj.io/drpc/drpcctx.(*Tracker).track()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcctx/transport.go:52 +0x3d
  storj.io/drpc/drpcctx.(*Tracker).Run.func1()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcctx/transport.go:47 +0x47
Goroutine 69 (running) created at:
  github.com/hashicorp/yamux.newSession()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/session.go:113 +0x8a6
  github.com/hashicorp/yamux.Client()
      /home/runner/go/pkg/mod/github.com/hashicorp/yamux@v0.0.0-20211028200310-0bc27b27de87/mux.go:113 +0x207
  github.com/coder/coder/provisionersdk.TransportPipe()
      /home/runner/work/coder/coder/provisionersdk/transport.go:24 +0x25e
  github.com/coder/coder/provisioner/terraform_test.setupProvisioner()
      /home/runner/work/coder/coder/provisioner/terraform/provision_test.go:26 +0x44
  github.com/coder/coder/provisioner/terraform_test.TestProvision()
      /home/runner/work/coder/coder/provisioner/terraform/provision_test.go:49 +0x57
  testing.tRunner()
      /opt/hostedtoolcache/go/1.18.3/x64/src/testing/testing.go:1[439](https://github.com/coder/coder/runs/6923689367?check_suite_focus=true#step:10:440) +0x213
  testing.(*T).Run.func1()
      /opt/hostedtoolcache/go/1.18.3/x64/src/testing/testing.go:1486 +0x47
Goroutine 121 (finished) created at:
  storj.io/drpc/drpcctx.(*Tracker).Run()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcctx/transport.go:47 +0xee
  storj.io/drpc/drpcserver.(*Server).Serve()
      /home/runner/go/pkg/mod/github.com/kylecarbs/drpc@v0.0.31-0.20220424193521-8ebbaf48bdff/drpcserver/server.go:111 +0x3e7
  github.com/coder/coder/provisionersdk.Serve()
      /home/runner/work/coder/coder/provisionersdk/serve.go:55 +0x765
  github.com/coder/coder/provisioner/terraform.Serve()
      /home/runner/work/coder/coder/provisioner/terraform/serve.go:73 +0x64c
  github.com/coder/coder/provisioner/terraform_test.setupProvisioner.func2()
      /home/runner/work/coder/coder/provisioner/terraform/provision_test.go:34 +0x250
==================

Could be related to #2417

@spikecurtis spikecurtis added bug api Area: HTTP API labels Jun 16, 2022
@spikecurtis
Copy link
Contributor Author

spikecurtis commented Jun 17, 2022

Ok, I think this one is related to this snippet of code in dRPC

// Flush forces a flush of any buffered data to the io.Writer. It is a no-op if
// there is no data in the buffer.
func (b *Writer) Flush() (err error) {
	b.mu.Lock()
	if len(b.buf) > 0 {
		_, err = b.w.Write(b.buf)
		b.buf = b.buf[:0]
		atomic.StoreUint32(&b.empty, 0)
	}
	b.mu.Unlock()
	return err
}

It calls Write() with the buffer (which is a []byte). The authors here seem to be assuming that a call to Write() synchronously copies the data, because instead of creating a new buffer, they truncate the existing buffer.

This is not the way net.Pipe works, because what happens is that the buffer gets put on a channel, and a call to Read() pulls that buffer off the channel, and then copies it. The channels are unbuffered, so the call to Write blocks until the reader pulls the buffer off the channel, but taking the buffer off the channel and copying it is not an atomic operation, so another write to the buffer can come in.

That's actually fine for dRPC to be doing because, according to io.Writer's docs, implementations are not allowed to retain the byte slice after they return from Write. net.Pipe, on closer inspection, is fine in this regard, but it looks like yamux is not.

A write to a yamux session eventually hits this function, where body is the []byte from the write method.

// waitForSendErr waits to send a header with optional data, checking for a
// potential shutdown. Since there's the expectation that sends can happen
// in a timely manner, we enforce the connection write timeout here.
func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) error {
	t := timerPool.Get()
	timer := t.(*time.Timer)
	timer.Reset(s.config.ConnectionWriteTimeout)
	defer func() {
		timer.Stop()
		select {
		case <-timer.C:
		default:
		}
		timerPool.Put(t)
	}()

	ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
	select {
	case s.sendCh <- ready:
	case <-s.shutdownCh:
		return ErrSessionShutdown
	case <-timer.C:
		return ErrConnectionWriteTimeout
	}

	select {
	case err := <-errCh:
		return err
	case <-s.shutdownCh:
		return ErrSessionShutdown
	case <-timer.C:
		return ErrConnectionWriteTimeout
	}
}

Notice that the body gets put on a channel, and then we wait for the response to come back, or crucially, the channel to be shut down or a time out. The s.sendCh is actually a buffered channel with len 64, so our write could easily end up at the back of a long queue and time out, or the channel could be shut down while the write is in flight.

This case is consistent with the stack trace of the data race: the main RPC response hits an error like timeout or shutdown, and then dRPC attempts to write an error to the stream: storj.io/drpc/drpcstream.(*Stream).SendError() resulting in a race with the reader trying to read the data.

Handling shutdown should be relatively straightforward: the goroutine that processes the s.sendCh should drain the channel and return errors, rather than directly selecting on that case. The timeout is trickier, because you need to ensure that the buffer won't be written before it's safe to return. A lock in the sendReady stucture could work, but that feels inelegant.

@mafredri
Copy link
Member

I’m glad you opened this issue Spike!

I believe I identified the same issue in: hashicorp/yamux#100 but the project seems kinda dead, so it fell into limbo. If we’re going to continue using yamux, we’ll need to fork. I’d be in favor of replacing it entirely though.

@mafredri mafredri self-assigned this Jul 1, 2022
mafredri added a commit to coder/yamux that referenced this issue Jul 11, 2022
This commit takes a minimal approach to fixing unsafe header re-use
during close and timeout by reallocating the header in those cases.

Partially fixes: coder/coder#2429

Related: hashicorp#40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment