Skip to content

Commit ebf01b5

Browse files
committed
fix: Add resiliency to daemon connections
Connections could fail when massive payloads were transmitted. This fixes an upstream bug in dRPC where the connection would end with a context canceled if a message was too large. This adds retransmission of completion and failures too. If Coder somehow loses connection with a provisioner daemon, upon the next connection the state will be properly reported.
1 parent 7496c3d commit ebf01b5

File tree

8 files changed

+349
-63
lines changed

8 files changed

+349
-63
lines changed

coderd/provisionerdaemons.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/moby/moby/pkg/namesgenerator"
1818
"github.com/tabbed/pqtype"
1919
"golang.org/x/xerrors"
20+
protobuf "google.golang.org/protobuf/proto"
2021
"nhooyr.io/websocket"
2122
"storj.io/drpc/drpcmux"
2223
"storj.io/drpc/drpcserver"
@@ -27,6 +28,7 @@ import (
2728
"github.com/coder/coder/coderd/httpapi"
2829
"github.com/coder/coder/coderd/parameter"
2930
"github.com/coder/coder/provisionerd/proto"
31+
"github.com/coder/coder/provisionersdk"
3032
sdkproto "github.com/coder/coder/provisionersdk/proto"
3133
)
3234

@@ -47,6 +49,8 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
4749
})
4850
return
4951
}
52+
// Align with the frame size of yamux.
53+
conn.SetReadLimit(256 * 1024)
5054

5155
daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
5256
ID: uuid.New(),
@@ -82,9 +86,17 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
8286
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err))
8387
return
8488
}
85-
server := drpcserver.New(mux)
89+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
90+
Log: func(err error) {
91+
if xerrors.Is(err, io.EOF) {
92+
return
93+
}
94+
api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err))
95+
},
96+
})
8697
err = server.Serve(r.Context(), session)
8798
if err != nil {
99+
api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err))
88100
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err))
89101
return
90102
}
@@ -253,6 +265,9 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
253265
default:
254266
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
255267
}
268+
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
269+
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
270+
}
256271

257272
return protoJob, err
258273
}

coderd/provisionerdaemons_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package coderd_test
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"runtime"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/coder/coder/coderd/coderdtest"
13+
"github.com/coder/coder/coderd/database"
14+
"github.com/coder/coder/codersdk"
15+
"github.com/coder/coder/provisionersdk"
16+
)
17+
18+
func TestProvisionerDaemons(t *testing.T) {
19+
t.Parallel()
20+
t.Run("PayloadTooBig", func(t *testing.T) {
21+
t.Parallel()
22+
if runtime.GOOS == "windows" {
23+
// Takes too long to allocate memory on Windows!
24+
t.Skip()
25+
}
26+
client := coderdtest.New(t, nil)
27+
user := coderdtest.CreateFirstUser(t, client)
28+
coderdtest.NewProvisionerDaemon(t, client)
29+
data := make([]byte, provisionersdk.MaxMessageSize)
30+
rand.Read(data)
31+
resp, err := client.Upload(context.Background(), codersdk.ContentTypeTar, data)
32+
require.NoError(t, err)
33+
t.Log(resp.Hash)
34+
35+
version, err := client.CreateTemplateVersion(context.Background(), user.OrganizationID, codersdk.CreateTemplateVersionRequest{
36+
StorageMethod: database.ProvisionerStorageMethodFile,
37+
StorageSource: resp.Hash,
38+
Provisioner: database.ProvisionerTypeEcho,
39+
})
40+
require.NoError(t, err)
41+
require.Eventually(t, func() bool {
42+
var err error
43+
version, err = client.TemplateVersion(context.Background(), version.ID)
44+
require.NoError(t, err)
45+
return version.Job.Error != ""
46+
}, 5*time.Second, 25*time.Millisecond)
47+
})
48+
}

codersdk/provisionerdaemons.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisi
7070
}
7171
return nil, readBodyAsError(res)
7272
}
73-
// Allow _somewhat_ large payloads.
74-
conn.SetReadLimit((1 << 20) * 2)
73+
// Align with the frame size of yamux.
74+
conn.SetReadLimit(256 * 1024)
7575

7676
config := yamux.DefaultConfig()
7777
config.LogOutput = io.Discard

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ replace github.com/chzyer/readline => github.com/kylecarbs/readline v0.0.0-20220
1717
// Required until https://github.com/briandowns/spinner/pull/136 is merged.
1818
replace github.com/briandowns/spinner => github.com/kylecarbs/spinner v1.18.2-0.20220329160715-20702b5af89e
1919

20+
// Required until https://github.com/storj/drpc/pull/31 is merged.
21+
replace storj.io/drpc => github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff
22+
2023
// opencensus-go leaks a goroutine by default.
2124
replace go.opencensus.io => github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b
2225

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
11071107
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
11081108
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
11091109
github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4=
1110+
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff h1:7qg425aXdULnZWCCQNPOzHO7c+M6BpbTfOUJLrk5+3w=
1111+
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
11101112
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b h1:1Y1X6aR78kMEQE1iCjQodB3lA7VO4jB88Wf8ZrzXSsA=
11111113
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
11121114
github.com/kylecarbs/readline v0.0.0-20220211054233-0d62993714c8/go.mod h1:n/KX1BZoN1m9EwoXkn/xAV4fd3k8c++gGBsgLONaPOY=
@@ -2544,5 +2546,3 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
25442546
sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
25452547
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
25462548
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
2547-
storj.io/drpc v0.0.30 h1:jqPe4T9KEu3CDBI05A2hCMgMSHLtd/E0N0yTF9QreIE=
2548-
storj.io/drpc v0.0.30/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=

0 commit comments

Comments
 (0)