Skip to content

Commit bc9e96e

Browse files
committed
fix: Add resiliency to daemon connections
Connections could fail when massive payloads were transmitted. This fixes an upstream bug in dRPC where the connection would end with a context canceled if a message was too large. This adds retransmission of completion and failures too. If Coder somehow loses connection with a provisioner daemon, upon the next connection the state will be properly reported.
1 parent 7496c3d commit bc9e96e

File tree

8 files changed

+254
-36
lines changed

8 files changed

+254
-36
lines changed

coderd/provisionerdaemons.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/moby/moby/pkg/namesgenerator"
1818
"github.com/tabbed/pqtype"
1919
"golang.org/x/xerrors"
20+
protobuf "google.golang.org/protobuf/proto"
2021
"nhooyr.io/websocket"
2122
"storj.io/drpc/drpcmux"
2223
"storj.io/drpc/drpcserver"
@@ -27,6 +28,7 @@ import (
2728
"github.com/coder/coder/coderd/httpapi"
2829
"github.com/coder/coder/coderd/parameter"
2930
"github.com/coder/coder/provisionerd/proto"
31+
"github.com/coder/coder/provisionersdk"
3032
sdkproto "github.com/coder/coder/provisionersdk/proto"
3133
)
3234

@@ -47,6 +49,8 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
4749
})
4850
return
4951
}
52+
// Align with the frame size of yamux.
53+
conn.SetReadLimit(256 * 1024)
5054

5155
daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
5256
ID: uuid.New(),
@@ -82,9 +86,17 @@ func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request
8286
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err))
8387
return
8488
}
85-
server := drpcserver.New(mux)
89+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
90+
Log: func(err error) {
91+
if xerrors.Is(err, io.EOF) {
92+
return
93+
}
94+
api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err))
95+
},
96+
})
8697
err = server.Serve(r.Context(), session)
8798
if err != nil {
99+
api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err))
88100
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err))
89101
return
90102
}
@@ -253,6 +265,9 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
253265
default:
254266
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
255267
}
268+
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
269+
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
270+
}
256271

257272
return protoJob, err
258273
}

coderd/provisionerdaemons_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package coderd_test
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"runtime"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/coder/coder/coderd/coderdtest"
13+
"github.com/coder/coder/coderd/database"
14+
"github.com/coder/coder/codersdk"
15+
"github.com/coder/coder/provisionersdk"
16+
)
17+
18+
func TestProvisionerDaemons(t *testing.T) {
19+
t.Parallel()
20+
t.Run("PayloadTooBig", func(t *testing.T) {
21+
t.Parallel()
22+
if runtime.GOOS == "windows" {
23+
// Takes too long to allocate memory on Windows!
24+
t.Skip()
25+
}
26+
client := coderdtest.New(t, nil)
27+
user := coderdtest.CreateFirstUser(t, client)
28+
coderdtest.NewProvisionerDaemon(t, client)
29+
data := make([]byte, provisionersdk.MaxMessageSize)
30+
rand.Read(data)
31+
resp, err := client.Upload(context.Background(), codersdk.ContentTypeTar, data)
32+
require.NoError(t, err)
33+
t.Log(resp.Hash)
34+
35+
version, err := client.CreateTemplateVersion(context.Background(), user.OrganizationID, codersdk.CreateTemplateVersionRequest{
36+
StorageMethod: database.ProvisionerStorageMethodFile,
37+
StorageSource: resp.Hash,
38+
Provisioner: database.ProvisionerTypeEcho,
39+
})
40+
require.NoError(t, err)
41+
require.Eventually(t, func() bool {
42+
var err error
43+
version, err = client.TemplateVersion(context.Background(), version.ID)
44+
require.NoError(t, err)
45+
return version.Job.Error != ""
46+
}, 5*time.Second, 25*time.Millisecond)
47+
})
48+
}

codersdk/provisionerdaemons.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisi
7070
}
7171
return nil, readBodyAsError(res)
7272
}
73-
// Allow _somewhat_ large payloads.
74-
conn.SetReadLimit((1 << 20) * 2)
73+
// Align with the frame size of yamux.
74+
conn.SetReadLimit(256 * 1024)
7575

7676
config := yamux.DefaultConfig()
7777
config.LogOutput = io.Discard

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ replace github.com/chzyer/readline => github.com/kylecarbs/readline v0.0.0-20220
1717
// Required until https://github.com/briandowns/spinner/pull/136 is merged.
1818
replace github.com/briandowns/spinner => github.com/kylecarbs/spinner v1.18.2-0.20220329160715-20702b5af89e
1919

20+
// Required until https://github.com/storj/drpc/pull/31 is merged.
21+
replace storj.io/drpc => github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff
22+
2023
// opencensus-go leaks a goroutine by default.
2124
replace go.opencensus.io => github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b
2225

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
11071107
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
11081108
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
11091109
github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4=
1110+
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff h1:7qg425aXdULnZWCCQNPOzHO7c+M6BpbTfOUJLrk5+3w=
1111+
github.com/kylecarbs/drpc v0.0.31-0.20220424193521-8ebbaf48bdff/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
11101112
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b h1:1Y1X6aR78kMEQE1iCjQodB3lA7VO4jB88Wf8ZrzXSsA=
11111113
github.com/kylecarbs/opencensus-go v0.23.1-0.20220307014935-4d0325a68f8b/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
11121114
github.com/kylecarbs/readline v0.0.0-20220211054233-0d62993714c8/go.mod h1:n/KX1BZoN1m9EwoXkn/xAV4fd3k8c++gGBsgLONaPOY=
@@ -2544,5 +2546,3 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
25442546
sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
25452547
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
25462548
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
2547-
storj.io/drpc v0.0.30 h1:jqPe4T9KEu3CDBI05A2hCMgMSHLtd/E0N0yTF9QreIE=
2548-
storj.io/drpc v0.0.30/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=

provisionerd/provisionerd.go

+42-29
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ func New(clientDialer Dialer, opts *Options) *Server {
6868
clientDialer: clientDialer,
6969
opts: opts,
7070

71-
closeCancel: ctxCancel,
72-
closed: make(chan struct{}),
71+
closeContext: ctx,
72+
closeCancel: ctxCancel,
73+
closed: make(chan struct{}),
7374

7475
shutdown: make(chan struct{}),
7576

@@ -90,10 +91,11 @@ type Server struct {
9091
client proto.DRPCProvisionerDaemonClient
9192

9293
// Locked when closing the daemon.
93-
closeMutex sync.Mutex
94-
closeCancel context.CancelFunc
95-
closed chan struct{}
96-
closeError error
94+
closeMutex sync.Mutex
95+
closeContext context.Context
96+
closeCancel context.CancelFunc
97+
closed chan struct{}
98+
closeError error
9799

98100
shutdownMutex sync.Mutex
99101
shutdown chan struct{}
@@ -244,6 +246,9 @@ func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
244246
resp, err := p.client.UpdateJob(ctx, &proto.UpdateJobRequest{
245247
JobId: job.JobId,
246248
})
249+
if errors.Is(err, yamux.ErrSessionShutdown) {
250+
continue
251+
}
247252
if err != nil {
248253
p.failActiveJobf("send periodic update: %s", err)
249254
return
@@ -493,7 +498,7 @@ func (p *Server) runTemplateImport(ctx, shutdown context.Context, provisioner sd
493498
return
494499
}
495500

496-
_, err = p.client.CompleteJob(ctx, &proto.CompletedJob{
501+
p.completeJob(&proto.CompletedJob{
497502
JobId: job.JobId,
498503
Type: &proto.CompletedJob_TemplateImport_{
499504
TemplateImport: &proto.CompletedJob_TemplateImport{
@@ -502,10 +507,6 @@ func (p *Server) runTemplateImport(ctx, shutdown context.Context, provisioner sd
502507
},
503508
},
504509
})
505-
if err != nil {
506-
p.failActiveJobf("complete job: %s", err)
507-
return
508-
}
509510
}
510511

511512
// Parses parameter schemas from source.
@@ -729,15 +730,7 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
729730
return
730731
}
731732

732-
p.opts.Logger.Info(context.Background(), "provision successful; marking job as complete",
733-
slog.F("resource_count", len(msgType.Complete.Resources)),
734-
slog.F("resources", msgType.Complete.Resources),
735-
slog.F("state_length", len(msgType.Complete.State)),
736-
)
737-
738-
// Complete job may need to be async if we disconnected...
739-
// When we reconnect we can flush any of these cached values.
740-
_, err = p.client.CompleteJob(ctx, &proto.CompletedJob{
733+
p.completeJob(&proto.CompletedJob{
741734
JobId: job.JobId,
742735
Type: &proto.CompletedJob_WorkspaceBuild_{
743736
WorkspaceBuild: &proto.CompletedJob_WorkspaceBuild{
@@ -746,11 +739,12 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
746739
},
747740
},
748741
})
749-
if err != nil {
750-
p.failActiveJobf("complete job: %s", err)
751-
return
752-
}
753-
// Return so we stop looping!
742+
p.opts.Logger.Info(context.Background(), "provision successful; marked job as complete",
743+
slog.F("resource_count", len(msgType.Complete.Resources)),
744+
slog.F("resources", msgType.Complete.Resources),
745+
slog.F("state_length", len(msgType.Complete.State)),
746+
)
747+
// Stop looping!
754748
return
755749
default:
756750
p.failActiveJobf("invalid message type %T received from provisioner", msg.Type)
@@ -759,6 +753,19 @@ func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sd
759753
}
760754
}
761755

756+
func (p *Server) completeJob(job *proto.CompletedJob) {
757+
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(p.closeContext); {
758+
// Complete job may need to be async if we disconnected...
759+
// When we reconnect we can flush any of these cached values.
760+
_, err := p.client.CompleteJob(p.closeContext, job)
761+
if err != nil {
762+
p.opts.Logger.Warn(p.closeContext, "failed to complete job", slog.Error(err))
763+
continue
764+
}
765+
break
766+
}
767+
}
768+
762769
func (p *Server) failActiveJobf(format string, args ...interface{}) {
763770
p.failActiveJob(&proto.FailedJob{
764771
Error: fmt.Sprintf(format, args...),
@@ -786,12 +793,18 @@ func (p *Server) failActiveJob(failedJob *proto.FailedJob) {
786793
slog.F("job_id", p.jobID),
787794
)
788795
failedJob.JobId = p.jobID
789-
_, err := p.client.FailJob(context.Background(), failedJob)
790-
if err != nil {
791-
p.opts.Logger.Warn(context.Background(), "failed to notify of error; job is no longer running", slog.Error(err))
796+
for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(p.closeContext); {
797+
_, err := p.client.FailJob(p.closeContext, failedJob)
798+
if err != nil {
799+
if p.isClosed() {
800+
return
801+
}
802+
p.opts.Logger.Warn(context.Background(), "failed to notify of error; job is no longer running", slog.Error(err))
803+
continue
804+
}
805+
p.opts.Logger.Debug(context.Background(), "marked running job as failed")
792806
return
793807
}
794-
p.opts.Logger.Debug(context.Background(), "marked running job as failed")
795808
}
796809

797810
// isClosed returns whether the API is closed or not.

0 commit comments

Comments
 (0)