Skip to content

Commit ab34780

Browse files
committed
Merge pull request flynn#964 from benburkert/router-tcp-closenotify
router: detect early termination of downstream TCP connections
2 parents c8db6c1 + b2bf6a7 commit ab34780

File tree

5 files changed

+126
-8
lines changed

5 files changed

+126
-8
lines changed

router/proxy/close_notify.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package proxy
2+
3+
import (
4+
"io"
5+
"net"
6+
)
7+
8+
type closeNotifyConn struct {
9+
net.Conn
10+
11+
r io.Reader
12+
13+
cnc chan bool
14+
}
15+
16+
// CloseNotifyConn returns a net.Conn that implements http.CloseNotifier.
17+
// Used to detect connections closed early on the client side.
18+
func CloseNotifyConn(conn net.Conn) net.Conn {
19+
pr, pw := io.Pipe()
20+
21+
c := &closeNotifyConn{
22+
Conn: conn,
23+
r: pr,
24+
cnc: make(chan bool),
25+
}
26+
27+
go func() {
28+
_, err := io.Copy(pw, conn)
29+
if err == nil {
30+
err = io.EOF
31+
}
32+
pw.CloseWithError(err)
33+
close(c.cnc)
34+
}()
35+
36+
return c
37+
}
38+
39+
// CloseNotify returns a channel that receives a single value when the client
40+
// connection has gone away.
41+
func (c *closeNotifyConn) CloseNotify() <-chan bool {
42+
return c.cnc
43+
}
44+
45+
func (c *closeNotifyConn) Read(p []byte) (n int, err error) {
46+
return c.r.Read(p)
47+
}

router/proxy/reverseproxy.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"strings"
1515
"sync"
1616
"time"
17+
18+
"github.com/flynn/flynn/Godeps/_workspace/src/golang.org/x/net/context"
1719
)
1820

1921
const (
@@ -98,14 +100,26 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
98100
}
99101

100102
// ServeConn takes an inbound conn and proxies it to a backend.
101-
func (p *ReverseProxy) ServeConn(dconn net.Conn) {
103+
func (p *ReverseProxy) ServeConn(ctx context.Context, dconn net.Conn) {
102104
transport := p.transport
103105
if transport == nil {
104106
panic("router: nil transport for proxy")
105107
}
106108
defer dconn.Close()
107109

108-
uconn, err := transport.Connect()
110+
clientGone := dconn.(http.CloseNotifier).CloseNotify()
111+
ctx, cancel := context.WithCancel(ctx)
112+
defer cancel() // finish cancellation goroutine
113+
114+
go func() {
115+
select {
116+
case <-clientGone:
117+
cancel() // client went away, cancel request
118+
case <-ctx.Done():
119+
}
120+
}()
121+
122+
uconn, err := transport.Connect(ctx)
109123
if err != nil {
110124
p.logf("router: proxy error: %v", err)
111125
return

router/proxy/reverseproxy_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package proxy
2+
3+
import (
4+
"errors"
5+
"net"
6+
"net/http"
7+
"testing"
8+
9+
"github.com/flynn/flynn/Godeps/_workspace/src/golang.org/x/net/context"
10+
)
11+
12+
func TestServeConnClientGone(t *testing.T) {
13+
control, conn := net.Pipe()
14+
cnConn := CloseNotifyConn(conn)
15+
16+
clientGone := false
17+
dialer = dialerFunc(func(_, _ string) (net.Conn, error) {
18+
if clientGone {
19+
err := errors.New("dial after client gone")
20+
t.Error(err)
21+
return nil, err
22+
}
23+
24+
if err := control.Close(); err != nil {
25+
t.Fatal(err)
26+
}
27+
<-cnConn.(http.CloseNotifier).CloseNotify()
28+
29+
clientGone = true
30+
return nil, &dialErr{}
31+
})
32+
33+
fn := func() []string { return []string{"127.0.0.1:0", "127.0.0.1:0"} }
34+
prox := NewReverseProxy(fn, nil, false)
35+
36+
prox.ServeConn(context.Background(), cnConn)
37+
}
38+
39+
type dialerFunc func(string, string) (net.Conn, error)
40+
41+
func (f dialerFunc) Dial(network, addr string) (net.Conn, error) {
42+
return f(network, addr)
43+
}

router/proxy/transport.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,25 @@ import (
1111
"time"
1212

1313
"github.com/flynn/flynn/Godeps/_workspace/src/golang.org/x/crypto/nacl/secretbox"
14+
"github.com/flynn/flynn/Godeps/_workspace/src/golang.org/x/net/context"
1415
"github.com/flynn/flynn/pkg/random"
1516
)
1617

18+
type backendDialer interface {
19+
Dial(network, addr string) (c net.Conn, err error)
20+
}
21+
1722
var (
1823
errNoBackends = errors.New("router: no backends available")
24+
errCanceled = errors.New("router: backend connection canceled")
1925

2026
httpTransport = &http.Transport{
2127
Dial: customDial,
2228
ResponseHeaderTimeout: 120 * time.Second,
2329
TLSHandshakeTimeout: 10 * time.Second, // unused, but safer to leave default in place
2430
}
2531

26-
dialer = &net.Dialer{
32+
dialer backendDialer = &net.Dialer{
2733
Timeout: 1 * time.Second,
2834
KeepAlive: 30 * time.Second,
2935
}
@@ -87,16 +93,16 @@ func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
8793
return nil, errNoBackends
8894
}
8995

90-
func (t *transport) Connect() (net.Conn, error) {
96+
func (t *transport) Connect(ctx context.Context) (net.Conn, error) {
9197
backends := t.getOrderedBackends("")
92-
conn, _, err := dialTCP(backends)
98+
conn, _, err := dialTCP(ctx, backends)
9399
return conn, err
94100
}
95101

96102
func (t *transport) UpgradeHTTP(req *http.Request) (*http.Response, net.Conn, error) {
97103
stickyBackend := t.getStickyBackend(req)
98104
backends := t.getOrderedBackends(stickyBackend)
99-
upconn, addr, err := dialTCP(backends)
105+
upconn, addr, err := dialTCP(context.Background(), backends)
100106
if err != nil {
101107
return nil, nil, err
102108
}
@@ -116,8 +122,15 @@ func (t *transport) UpgradeHTTP(req *http.Request) (*http.Response, net.Conn, er
116122
return res, conn, nil
117123
}
118124

119-
func dialTCP(addrs []string) (net.Conn, string, error) {
125+
func dialTCP(ctx context.Context, addrs []string) (net.Conn, string, error) {
126+
donec := ctx.Done()
120127
for _, addr := range addrs {
128+
select {
129+
case <-donec:
130+
return nil, "", errCanceled
131+
default:
132+
}
133+
121134
if conn, err := dialer.Dial("tcp", addr); err == nil {
122135
return conn, addr, nil
123136
}

router/tcp.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010

1111
"github.com/flynn/flynn/Godeps/_workspace/src/github.com/kavu/go_reuseport"
12+
"github.com/flynn/flynn/Godeps/_workspace/src/golang.org/x/net/context"
1213
"github.com/flynn/flynn/router/proxy"
1314
"github.com/flynn/flynn/router/types"
1415
)
@@ -279,5 +280,5 @@ type tcpService struct {
279280
}
280281

281282
func (s *tcpService) ServeConn(conn net.Conn) {
282-
s.rp.ServeConn(conn)
283+
s.rp.ServeConn(context.Background(), proxy.CloseNotifyConn(conn))
283284
}

0 commit comments

Comments
 (0)