Skip to content

Protect reads of sendReady.Body with mutex and temp buffer #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2022

Conversation

mafredri
Copy link
Member

@mafredri mafredri commented Jul 1, 2022

This commit fixes an edge case where the body passed to waitForSendErr can be written to after returning from the function. This happens when sendReady is buffered on the sendCh and the session is shutdown or the write times out.

When this condition happens and waitForSendErr has not yet exited, the body is safely copied into a temporary buffer in send. Otherwise waitForSendErr safely created a copy of the body and exits, this essentially results in double buffering for the edge case which seems acceptable.

Fixes coder/coder#2429.

Related: hashicorp#40.

@mafredri mafredri self-assigned this Jul 1, 2022
@johnstcn
Copy link
Member

johnstcn commented Jul 1, 2022

One solution would be to always use a copy of body, but to avoid any potential performance impacts there. We instead use a channel to communicate the fact that the body is either not yet written (at which point we can make a copy) or then we block until the write is done.

What sort of performance impact are we talking about here? If it's negligible, maybe it's better to always copy.

@mafredri
Copy link
Member Author

mafredri commented Jul 1, 2022

What sort of performance impact are we talking about here? If it's negligible, maybe it's better to always copy.

I haven't measured, but essentially we'd need to allocate ~make([]byte, 256*1024) per in-flight write, but that size only applies for the initial send window size, it can grow or shrink (and I haven't look at when / by how much). Furthermore, all these byte slices would end up on the heap, so we'd want to manage them via sync.Pool to avoid creating extra work for the garbage collector. If we do use the pool, we might either use a bytes.Buffer to avoid having to manage buffer sizes, or always use a maximum size (if there is one).

@mafredri
Copy link
Member Author

mafredri commented Jul 1, 2022

A new edge case introduced by this PR could be for a stall to happen during conn.Write(body) which would prevent waitForSendErr to exit immediately on timeout. In this case we would wait for Write to complete. I think this is preferable to always making a copy of body.

This edge-case should now be fixed via a9a7d9e.

We simply rely on a bytes.Buffer and do not attempt to manage its size, once it has grown to a certain size, it will remain that way for the life of the session.

session.go Outdated
return // Body was already copied.
}
newBody := make([]byte, len(body))
copy(newBody, body)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is still a race here where we drain the channel on line 395, then send() selects on line 469 and hits the default case, then we write the copy to the channel on line 401. You end up not writing the data.

Given that we're in session shutdown or write timeout, do we want to even bother copying the body? Like, we're going to return an error, so I don't think the sender will have the expectation that the data was actually written.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! This subtle error was introduced by the change in a9a7d9e, the default case was not supposed to be present any longer.

The reason we care about copying the body is that we have no "cancel" mechanism for writes where the header has already been written, so we allow the message to pass as-is if it reaches this junction. If we don't write the body a future write could fail due to incomplete packet (header, missing body). The idea is to keep the behavior as close to the original implementation as possible, whilst doing it safely.

session.go Outdated
@@ -373,7 +374,9 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
timerPool.Put(t)
}()

ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
bodyC := make(chan []byte, 1)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably this chan has to be allocated on the heap, which will make more work for the garbage collector. If we're concerned about that kind of hit, we could add a mutex to sendReady that needs to be held while accessing the body in send() and bodyCopy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good suggestion and I did actually consider a mutex, but a channel seemed more appropriate in this case. It'd be interesting to do some benchmarks, but according to go build -gcflags '-m -m' it doesn't escape to the heap. But maybe I'm using/interpreting that wrong.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's accurate to say that bodyC itself doesn't need to escape to the heap, since it is just copied to the sendReady. But, the chan type is itself a reference, and the thing it points to, hchan, is allocated on the heap: https://levelup.gitconnected.com/how-does-golang-channel-works-6d66acd54753

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's true. I wasn't sure how much of a penalty the channels are though, so I made a quick benchmark to see how much of a difference there is. And it's not huge, but we can save 1 alloc and a little bit of memory by using a mutex here. Big picture it's quite negligible but since I already wrote the code, let's go with it 😄.

Benchmark results (do note that wait group and groutine skews the raw results here):

Channels:

❯ go test -benchmem -run='^$' -bench '^BenchmarkSession_send' -count=3 -benchtime=2s
goos: linux
goarch: amd64
pkg: github.com/hashicorp/yamux
cpu: Intel Core Processor (Broadwell, IBRS)
BenchmarkSession_send-chan-6   	   13564	    157409 ns/op	   31389 B/op	     501 allocs/op
BenchmarkSession_send-chan-6   	   14619	    166961 ns/op	   31396 B/op	     501 allocs/op
BenchmarkSession_send-chan-6   	   15681	    164911 ns/op	   31391 B/op	     501 allocs/op

If we change chan to mutex, and sendReady to a pointer chan *sendReady (to avoid mutex copy):

BenchmarkSession_send-mu-6   	   13770	    153617 ns/op	   25756 B/op	     400 allocs/op
BenchmarkSession_send-mu-6   	   16140	    166674 ns/op	   25764 B/op	     401 allocs/op
BenchmarkSession_send-mu-6   	   16030	    151941 ns/op	   25755 B/op	     400 allocs/op

And if we restore chan *sendReady => chan sendReady, and change the mutex to a pointer (mu *sync.Mutex), we save a bit more:

BenchmarkSession_send-mu2-6   	   15700	    149387 ns/op	   20130 B/op	     400 allocs/op
BenchmarkSession_send-mu2-6   	   16130	    140559 ns/op	   20129 B/op	     400 allocs/op
BenchmarkSession_send-mu2-6   	   15886	    138668 ns/op	   20128 B/op	     400 allocs/op

For reference, my quick and dirty benchmark:

type fakeConn struct {
	io.ReadCloser
	io.Writer
}

func BenchmarkSession_send(b *testing.B) {
	conn := &fakeConn{
		Writer: io.Discard,
	}
	s := &Session{
		logger: log.New(os.Stderr, "test", log.LstdFlags),
		conn:   conn,
		sendCh: make(chan sendReady, 64),
		config: &Config{
			ConnectionWriteTimeout: 5 * time.Second,
		},
	}
	go s.send()

	hdr := header(make([]byte, headerSize))
	body := make([]byte, 1024)
	var wg sync.WaitGroup
	n := 100

	for i := 0; i < b.N; i++ {
		wg.Add(n)
		for i := 0; i < n; i++ {
			errc := make(chan error, 1)
			go func() {
				s.waitForSendErr(hdr, body, errc)
				wg.Done()
			}()
		}
		wg.Wait()
	}
}

FYI: Not using WaitGroup and running a single s.waitForSendErr synchronously produces 4 allocs for chan, 3 allocs for mutex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit fixes an edge case where the `body` passed to
`waitForSendErr` can be written to after returning from the function.
This happens when `sendReady` is buffered on the `sendCh` and the
session is shutdown or the write times out.

When this condition happens and `waitForSendErr` has not yet exited, the
`body` is safely copied into a temporary buffer in `send`. Otherwise
`waitForSendErr` safely created a copy of the `body` and exits, this
essentially results in double buffering for the edge case which seems
acceptable.
@mafredri mafredri force-pushed the mafredri/fix-unsafe-body-reuse branch from f0673b7 to f6d66e1 Compare July 11, 2022 09:04
@mafredri mafredri changed the title Create a body copy in waitForSendErr on shutdown and timeout Protect reads of sendReady.Body with mutex and temp buffer Jul 11, 2022
@mafredri mafredri merged commit ba57465 into coder:master Jul 11, 2022
mafredri added a commit that referenced this pull request Jul 12, 2022
This bug appreaed due to multiple refactors in
#4.
@mafredri mafredri deleted the mafredri/fix-unsafe-body-reuse branch July 18, 2022 17:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

data race related to dRPC / yamux
3 participants