Skip to content

Commit 1f20cab

Browse files
authored
fix: don't use yamux for in-memory provisioner{,d} streams (coder#5136)
1 parent 2b6c229 commit 1f20cab

File tree

14 files changed

+105
-57
lines changed

14 files changed

+105
-57
lines changed

cli/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ func newProvisionerDaemon(
890890
return nil, xerrors.Errorf("mkdir %q: %w", cfg.CacheDirectory.Value, err)
891891
}
892892

893-
terraformClient, terraformServer := provisionersdk.TransportPipe()
893+
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
894894
go func() {
895895
<-ctx.Done()
896896
_ = terraformClient.Close()
@@ -920,11 +920,11 @@ func newProvisionerDaemon(
920920
}
921921

922922
provisioners := provisionerd.Provisioners{
923-
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
923+
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(terraformClient),
924924
}
925925
// include echo provisioner when in dev mode
926926
if dev {
927-
echoClient, echoServer := provisionersdk.TransportPipe()
927+
echoClient, echoServer := provisionersdk.MemTransportPipe()
928928
go func() {
929929
<-ctx.Done()
930930
_ = echoClient.Close()
@@ -941,7 +941,7 @@ func newProvisionerDaemon(
941941
}
942942
}
943943
}()
944-
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
944+
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(echoClient)
945945
}
946946
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
947947
// This debounces calls to listen every second. Read the comment

coderd/coderd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ func compressHandler(h http.Handler) http.Handler {
644644
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
645645
// in the same process.
646646
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
647-
clientSession, serverSession := provisionersdk.TransportPipe()
647+
clientSession, serverSession := provisionersdk.MemTransportPipe()
648648
defer func() {
649649
if err != nil {
650650
_ = clientSession.Close()
@@ -705,5 +705,5 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
705705
_ = serverSession.Close()
706706
}()
707707

708-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(clientSession)), nil
708+
return proto.NewDRPCProvisionerDaemonClient(clientSession), nil
709709
}

coderd/coderdtest/coderdtest.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func NewWithAPI(t *testing.T, options *Options) (*codersdk.Client, io.Closer, *c
315315
// well with coderd testing. It registers the "echo" provisioner for
316316
// quick testing.
317317
func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
318-
echoClient, echoServer := provisionersdk.TransportPipe()
318+
echoClient, echoServer := provisionersdk.MemTransportPipe()
319319
ctx, cancelFunc := context.WithCancel(context.Background())
320320
t.Cleanup(func() {
321321
_ = echoClient.Close()
@@ -339,7 +339,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
339339
UpdateInterval: 250 * time.Millisecond,
340340
ForceCancelInterval: time.Second,
341341
Provisioners: provisionerd.Provisioners{
342-
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
342+
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
343343
},
344344
WorkDirectory: t.TempDir(),
345345
})
@@ -350,7 +350,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
350350
}
351351

352352
func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
353-
echoClient, echoServer := provisionersdk.TransportPipe()
353+
echoClient, echoServer := provisionersdk.MemTransportPipe()
354354
ctx, cancelFunc := context.WithCancel(context.Background())
355355
t.Cleanup(func() {
356356
_ = echoClient.Close()
@@ -374,7 +374,7 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
374374
UpdateInterval: 250 * time.Millisecond,
375375
ForceCancelInterval: time.Second,
376376
Provisioners: provisionerd.Provisioners{
377-
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
377+
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
378378
},
379379
WorkDirectory: t.TempDir(),
380380
})

codersdk/provisionerdaemons.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,5 +212,5 @@ func (c *Client) ServeProvisionerDaemon(ctx context.Context, organization uuid.U
212212
if err != nil {
213213
return nil, xerrors.Errorf("multiplex client: %w", err)
214214
}
215-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(session)), nil
215+
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.MultiplexedConn(session)), nil
216216
}

enterprise/cli/provisionerdaemons.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func provisionerDaemonStart() *cobra.Command {
6969
return xerrors.Errorf("mkdir %q: %w", cacheDir, err)
7070
}
7171

72-
terraformClient, terraformServer := provisionersdk.TransportPipe()
72+
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
7373
go func() {
7474
<-ctx.Done()
7575
_ = terraformClient.Close()
@@ -104,7 +104,7 @@ func provisionerDaemonStart() *cobra.Command {
104104
logger.Info(ctx, "starting provisioner daemon", slog.F("tags", tags))
105105

106106
provisioners := provisionerd.Provisioners{
107-
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
107+
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(terraformClient),
108108
}
109109
srv := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
110110
return client.ServeProvisionerDaemon(ctx, org.ID, []codersdk.ProvisionerType{

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ replace github.com/gliderlabs/ssh => github.com/coder/ssh v0.0.0-20220811105153-
5353

5454
require (
5555
cdr.dev/slog v1.4.2-0.20220525200111-18dce5c2cd5f
56-
cloud.google.com/go/compute v1.12.1 // indirect
5756
cloud.google.com/go/compute/metadata v0.2.1
5857
github.com/AlecAivazis/survey/v2 v2.3.5
5958
github.com/adrg/xdg v0.4.0
@@ -129,6 +128,7 @@ require (
129128
github.com/tabbed/pqtype v0.1.1
130129
github.com/u-root/u-root v0.10.0
131130
github.com/unrolled/secure v1.13.0
131+
github.com/valyala/fasthttp v1.41.0
132132
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1
133133
go.nhat.io/otelsql v0.7.0
134134
go.opentelemetry.io/otel v1.11.1
@@ -166,6 +166,7 @@ require (
166166
)
167167

168168
require (
169+
cloud.google.com/go/compute v1.12.1 // indirect
169170
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
170171
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
171172
github.com/Microsoft/go-winio v0.5.2 // indirect

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,8 +1776,11 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
17761776
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
17771777
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
17781778
github.com/uudashr/gocognit v1.0.5/go.mod h1:wgYz0mitoKOTysqxTDMOUXg+Jb5SvtihkfmugIZYpEA=
1779+
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
17791780
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
17801781
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
1782+
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
1783+
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
17811784
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
17821785
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
17831786
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
@@ -1975,6 +1978,7 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y
19751978
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
19761979
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
19771980
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
1981+
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
19781982
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
19791983
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
19801984
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -2117,6 +2121,7 @@ golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug
21172121
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
21182122
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
21192123
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
2124+
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
21202125
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
21212126
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
21222127
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

provisioner/echo/serve_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestEcho(t *testing.T) {
2323

2424
fs := afero.NewMemMapFs()
2525
// Create an in-memory provisioner to communicate with.
26-
client, server := provisionersdk.TransportPipe()
26+
client, server := provisionersdk.MemTransportPipe()
2727
ctx, cancelFunc := context.WithCancel(context.Background())
2828
t.Cleanup(func() {
2929
_ = client.Close()
@@ -36,7 +36,7 @@ func TestEcho(t *testing.T) {
3636
})
3737
assert.NoError(t, err)
3838
}()
39-
api := proto.NewDRPCProvisionerClient(provisionersdk.Conn(client))
39+
api := proto.NewDRPCProvisionerClient(client)
4040

4141
t.Run("Parse", func(t *testing.T) {
4242
t.Parallel()

provisioner/terraform/provision_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
3636
opts = &provisionerServeOptions{}
3737
}
3838
cachePath := t.TempDir()
39-
client, server := provisionersdk.TransportPipe()
39+
client, server := provisionersdk.MemTransportPipe()
4040
ctx, cancelFunc := context.WithCancel(context.Background())
4141
serverErr := make(chan error, 1)
4242
t.Cleanup(func() {
@@ -59,7 +59,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
5959
ExitTimeout: opts.exitTimeout,
6060
})
6161
}()
62-
api := proto.NewDRPCProvisionerClient(provisionersdk.Conn(client))
62+
api := proto.NewDRPCProvisionerClient(client)
6363
return ctx, api
6464
}
6565

provisionerd/provisionerd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/prometheus/client_golang/prometheus"
1515
"github.com/prometheus/client_golang/prometheus/promauto"
1616
"github.com/spf13/afero"
17+
"github.com/valyala/fasthttp/fasthttputil"
1718
"go.opentelemetry.io/otel/attribute"
1819
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
1920
"go.opentelemetry.io/otel/trace"
@@ -344,7 +345,7 @@ func (p *Server) acquireJob(ctx context.Context) {
344345
}
345346

346347
func retryable(err error) bool {
347-
return xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) ||
348+
return xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) || xerrors.Is(err, fasthttputil.ErrInmemoryListenerClosed) ||
348349
// annoyingly, dRPC sometimes returns context.Canceled if the transport was closed, even if the context for
349350
// the RPC *is not canceled*. Retrying is fine if the RPC context is not canceled.
350351
xerrors.Is(err, context.Canceled)

provisionerd/provisionerd_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ func TestProvisionerd(t *testing.T) {
843843
<-failChan
844844
_ = client.DRPCConn().Close()
845845
second.Store(true)
846+
time.Sleep(50 * time.Millisecond)
846847
failedOnce.Do(func() { close(failedChan) })
847848
}()
848849
}
@@ -1075,7 +1076,7 @@ func createProvisionerDaemonClient(t *testing.T, server provisionerDaemonTestSer
10751076
return &proto.Empty{}, nil
10761077
}
10771078
}
1078-
clientPipe, serverPipe := provisionersdk.TransportPipe()
1079+
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
10791080
t.Cleanup(func() {
10801081
_ = clientPipe.Close()
10811082
_ = serverPipe.Close()
@@ -1089,14 +1090,14 @@ func createProvisionerDaemonClient(t *testing.T, server provisionerDaemonTestSer
10891090
go func() {
10901091
_ = srv.Serve(ctx, serverPipe)
10911092
}()
1092-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(clientPipe))
1093+
return proto.NewDRPCProvisionerDaemonClient(clientPipe)
10931094
}
10941095

10951096
// Creates a provisioner protobuf client that's connected
10961097
// to the server implementation provided.
10971098
func createProvisionerClient(t *testing.T, server provisionerTestServer) sdkproto.DRPCProvisionerClient {
10981099
t.Helper()
1099-
clientPipe, serverPipe := provisionersdk.TransportPipe()
1100+
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
11001101
t.Cleanup(func() {
11011102
_ = clientPipe.Close()
11021103
_ = serverPipe.Close()
@@ -1110,7 +1111,7 @@ func createProvisionerClient(t *testing.T, server provisionerTestServer) sdkprot
11101111
go func() {
11111112
_ = srv.Serve(ctx, serverPipe)
11121113
}()
1113-
return sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(clientPipe))
1114+
return sdkproto.NewDRPCProvisionerClient(clientPipe)
11141115
}
11151116

11161117
type provisionerTestServer struct {

provisionersdk/serve.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"net"
88
"os"
99

10+
"github.com/hashicorp/yamux"
11+
"github.com/valyala/fasthttp/fasthttputil"
1012
"golang.org/x/xerrors"
1113
"storj.io/drpc/drpcmux"
1214
"storj.io/drpc/drpcserver"
1315

14-
"github.com/hashicorp/yamux"
15-
1616
"github.com/coder/coder/provisionersdk/proto"
1717
)
1818

@@ -58,18 +58,14 @@ func Serve(ctx context.Context, server proto.DRPCProvisionerServer, options *Ser
5858
// short-lived processes that can be executed concurrently.
5959
err = srv.Serve(ctx, options.Listener)
6060
if err != nil {
61-
if errors.Is(err, io.EOF) {
62-
return nil
63-
}
64-
if errors.Is(err, context.Canceled) {
65-
return nil
66-
}
67-
if errors.Is(err, io.ErrClosedPipe) {
68-
return nil
69-
}
70-
if errors.Is(err, yamux.ErrSessionShutdown) {
61+
if errors.Is(err, io.EOF) ||
62+
errors.Is(err, context.Canceled) ||
63+
errors.Is(err, io.ErrClosedPipe) ||
64+
errors.Is(err, yamux.ErrSessionShutdown) ||
65+
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
7166
return nil
7267
}
68+
7369
return xerrors.Errorf("serve transport: %w", err)
7470
}
7571
return nil

provisionersdk/serve_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestProvisionerSDK(t *testing.T) {
2121
t.Parallel()
2222
t.Run("Serve", func(t *testing.T) {
2323
t.Parallel()
24-
client, server := provisionersdk.TransportPipe()
24+
client, server := provisionersdk.MemTransportPipe()
2525
defer client.Close()
2626
defer server.Close()
2727

@@ -34,7 +34,7 @@ func TestProvisionerSDK(t *testing.T) {
3434
assert.NoError(t, err)
3535
}()
3636

37-
api := proto.NewDRPCProvisionerClient(provisionersdk.Conn(client))
37+
api := proto.NewDRPCProvisionerClient(client)
3838
stream, err := api.Parse(context.Background(), &proto.Parse_Request{})
3939
require.NoError(t, err)
4040
_, err = stream.Recv()
@@ -43,7 +43,7 @@ func TestProvisionerSDK(t *testing.T) {
4343

4444
t.Run("ServeClosedPipe", func(t *testing.T) {
4545
t.Parallel()
46-
client, server := provisionersdk.TransportPipe()
46+
client, server := provisionersdk.MemTransportPipe()
4747
_ = client.Close()
4848
_ = server.Close()
4949

0 commit comments

Comments
 (0)