Skip to content

fix: Add resiliency to daemon connections #1116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/moby/moby/pkg/namesgenerator"
"github.com/tabbed/pqtype"
"golang.org/x/xerrors"
protobuf "google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/coderd/parameter"
"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
sdkproto "github.com/coder/coder/provisionersdk/proto"
)

Expand All @@ -47,6 +49,8 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
})
return
}
// Align with the frame size of yamux.
conn.SetReadLimit(256 * 1024)

daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
ID: uuid.New(),
Expand Down Expand Up @@ -82,9 +86,17 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err))
return
}
server := drpcserver.New(mux)
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err))
},
})
err = server.Serve(r.Context(), session)
if err != nil {
api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err))
return
}
Expand Down Expand Up @@ -253,6 +265,9 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
default:
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
}
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
}

return protoJob, err
}
Expand Down
48 changes: 48 additions & 0 deletions coderd/provisionerdaemons_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package coderd_test

import (
"context"
"crypto/rand"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/provisionersdk"
)

func TestProvisionerDaemons(t *testing.T) {
t.Parallel()
t.Run("PayloadTooBig", func(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// Takes too long to allocate memory on Windows!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🪟🪟🪟

t.Skip()
}
client := coderdtest.New(t, nil)
user := coderdtest.CreateFirstUser(t, client)
coderdtest.NewProvisionerDaemon(t, client)
data := make([]byte, provisionersdk.MaxMessageSize)
rand.Read(data)
resp, err := client.Upload(context.Background(), codersdk.ContentTypeTar, data)
require.NoError(t, err)
t.Log(resp.Hash)

version, err := client.CreateTemplateVersion(context.Background(), user.OrganizationID, codersdk.CreateTemplateVersionRequest{
StorageMethod: database.ProvisionerStorageMethodFile,
StorageSource: resp.Hash,
Provisioner: database.ProvisionerTypeEcho,
})
require.NoError(t, err)
require.Eventually(t, func() bool {
var err error
version, err = client.TemplateVersion(context.Background(), version.ID)
require.NoError(t, err)
return version.Job.Error != ""
}, 5*time.Second, 25*time.Millisecond)
})
}
4 changes: 2 additions & 2 deletions codersdk/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisi
}
return nil, readBodyAsError(res)
}
// Allow _somewhat_ large payloads.
conn.SetReadLimit((1 << 20) * 2)
// Align with the frame size of yamux.
conn.SetReadLimit(256 * 1024)

config := yamux.DefaultConfig()
config.LogOutput = io.Discard
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ replace github.com/chzyer/readline => github.com/kylecarbs/readline v0.0.0-20220
// Required until https://github.com/briandowns/spinner/pull/136 is merged.
replace github.com/briandowns/spinner => github.com/kylecarbs/spinner v1.18.2-0.20220329160715-20702b5af89e

// Required until https://github.com/storj/drpc/pull/31 is merged.
replace storj.io/drpc => github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff

// opencensus-go leaks a goroutine by default.
replace go.opencensus.io => github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4=
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff h1:7qg425aXdULnZWCCQNPOzHO7c+M6BpbTfOUJLrk5+3w=
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b h1:1Y1X6aR78kMEQE1iCjQodB3lA7VO4jB88Wf8ZrzXSsA=
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
github.com/kylecarbs/readline v0.0.0-20220211054233-0d62993714c8/go.mod h1:n/KX1BZoN1m9EwoXkn/xAV4fd3k8c++gGBsgLONaPOY=
Expand Down Expand Up @@ -2544,5 +2546,3 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
storj.io/drpc v0.0.30 h1:jqPe4T9KEu3CDBI05A2hCMgMSHLtd/E0N0yTF9QreIE=
storj.io/drpc v0.0.30/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
Loading