From 09bf9c1e2da15aed7a1ea9e6543404520e3877f7 Mon Sep 17 00:00:00 2001 From: Spike Curtis <spike@coder.com> Date: Tue, 17 May 2022 22:57:34 +0000 Subject: [PATCH 1/3] in-process provisionerd connection Signed-off-by: Spike Curtis <spike@coder.com> --- coderd/coderd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/coderd.go b/coderd/coderd.go index 10862599a19e5..9acc78f16fd59 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -103,7 +103,7 @@ func newRouter(options *Options, a *api) chi.Router { }) }, httpmw.Prometheus, - tracing.HTTPMW(api.TracerProvider, "coderd.http"), + tracing.HTTPMW(a.TracerProvider, "coderd.http"), ) r.Route("/api/v2", func(r chi.Router) { From 17146e535446b870c69ca1793960cb96eee3ffef Mon Sep 17 00:00:00 2001 From: Spike Curtis <spike@coder.com> Date: Wed, 18 May 2022 22:40:30 +0000 Subject: [PATCH 2/3] chore: disable provisionerd listen endpoint Signed-off-by: Spike Curtis <spike@coder.com> --- coderd/coderd.go | 5 --- coderd/coderd_test.go | 2 - coderd/provisionerdaemons.go | 73 ---------------------------------- codersdk/provisionerdaemons.go | 36 ----------------- 4 files changed, 116 deletions(-) diff --git a/coderd/coderd.go b/coderd/coderd.go index 9acc78f16fd59..343da27bf55cf 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -209,11 +209,6 @@ func newRouter(options *Options, a *api) chi.Router { r.Get("/resources", a.templateVersionResources) r.Get("/logs", a.templateVersionLogs) }) - r.Route("/provisionerdaemons", func(r chi.Router) { - r.Route("/me", func(r chi.Router) { - r.Get("/listen", a.provisionerDaemonsListen) - }) - }) r.Route("/users", func(r chi.Router) { r.Get("/first", a.firstUser) r.Post("/first", a.postFirstUser) diff --git a/coderd/coderd_test.go b/coderd/coderd_test.go index f3d13dc8b8b75..f6d555fb3d015 100644 --- a/coderd/coderd_test.go +++ b/coderd/coderd_test.go @@ -106,8 +106,6 @@ func TestAuthorizeAllEndpoints(t *testing.T) { "GET:/api/v2/parameters/{scope}/{id}": {NoAuthorize: true}, "DELETE:/api/v2/parameters/{scope}/{id}/{name}": {NoAuthorize: true}, - "GET:/api/v2/provisionerdaemons/me/listen": {NoAuthorize: true}, - "DELETE:/api/v2/templates/{template}": {NoAuthorize: true}, "GET:/api/v2/templates/{template}": {NoAuthorize: true}, "GET:/api/v2/templates/{template}/versions": {NoAuthorize: true}, diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 9c0c6592226c6..bb3fa21361bdd 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -13,12 +13,10 @@ import ( "time" "github.com/google/uuid" - "github.com/hashicorp/yamux" "github.com/moby/moby/pkg/namesgenerator" "github.com/tabbed/pqtype" "golang.org/x/xerrors" protobuf "google.golang.org/protobuf/proto" - "nhooyr.io/websocket" "storj.io/drpc/drpcmux" "storj.io/drpc/drpcserver" @@ -49,77 +47,6 @@ func (api *api) provisionerDaemonsByOrganization(rw http.ResponseWriter, r *http httpapi.Write(rw, http.StatusOK, daemons) } -// Serves the provisioner daemon protobuf API over a WebSocket. -func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request) { - api.websocketWaitMutex.Lock() - api.websocketWaitGroup.Add(1) - api.websocketWaitMutex.Unlock() - defer api.websocketWaitGroup.Done() - - conn, err := websocket.Accept(rw, r, &websocket.AcceptOptions{ - // Need to disable compression to avoid a data-race. - CompressionMode: websocket.CompressionDisabled, - }) - if err != nil { - httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{ - Message: fmt.Sprintf("accept websocket: %s", err), - }) - return - } - // Align with the frame size of yamux. - conn.SetReadLimit(256 * 1024) - - daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{ - ID: uuid.New(), - CreatedAt: database.Now(), - Name: namesgenerator.GetRandomName(1), - Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho, database.ProvisionerTypeTerraform}, - }) - if err != nil { - _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("insert provisioner daemon: %s", err)) - return - } - - // Multiplexes the incoming connection using yamux. - // This allows multiple function calls to occur over - // the same connection. - config := yamux.DefaultConfig() - config.LogOutput = io.Discard - session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), config) - if err != nil { - _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("multiplex server: %s", err)) - return - } - mux := drpcmux.New() - err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdServer{ - AccessURL: api.AccessURL, - ID: daemon.ID, - Database: api.Database, - Pubsub: api.Pubsub, - Provisioners: daemon.Provisioners, - Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)), - }) - if err != nil { - _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err)) - return - } - server := drpcserver.NewWithOptions(mux, drpcserver.Options{ - Log: func(err error) { - if xerrors.Is(err, io.EOF) { - return - } - api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err)) - }, - }) - err = server.Serve(r.Context(), session) - if err != nil && !xerrors.Is(err, io.EOF) { - api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err)) - _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err)) - return - } - _ = conn.Close(websocket.StatusGoingAway, "") -} - // ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd // in the same process. func (c *coderD) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) { diff --git a/codersdk/provisionerdaemons.go b/codersdk/provisionerdaemons.go index 308e3a803f09c..7d2fd3717896b 100644 --- a/codersdk/provisionerdaemons.go +++ b/codersdk/provisionerdaemons.go @@ -5,19 +5,12 @@ import ( "database/sql" "encoding/json" "fmt" - "io" "net/http" "net/url" "strconv" "time" "github.com/google/uuid" - "github.com/hashicorp/yamux" - "golang.org/x/xerrors" - "nhooyr.io/websocket" - - "github.com/coder/coder/provisionerd/proto" - "github.com/coder/coder/provisionersdk" ) type LogSource string @@ -86,35 +79,6 @@ type ProvisionerJobLog struct { Output string `json:"output"` } -// ListenProvisionerDaemon returns the gRPC service for a provisioner daemon implementation. -func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) { - serverURL, err := c.URL.Parse("/api/v2/provisionerdaemons/me/listen") - if err != nil { - return nil, xerrors.Errorf("parse url: %w", err) - } - conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{ - HTTPClient: c.HTTPClient, - // Need to disable compression to avoid a data-race. - CompressionMode: websocket.CompressionDisabled, - }) - if err != nil { - if res == nil { - return nil, err - } - return nil, readBodyAsError(res) - } - // Align with the frame size of yamux. - conn.SetReadLimit(256 * 1024) - - config := yamux.DefaultConfig() - config.LogOutput = io.Discard - session, err := yamux.Client(websocket.NetConn(ctx, conn, websocket.MessageBinary), config) - if err != nil { - return nil, xerrors.Errorf("multiplex client: %w", err) - } - return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(session)), nil -} - // provisionerJobLogsBefore provides log output that occurred before a time. // This is abstracted from a specific job type to provide consistency between // APIs. Logs is the only shared route between jobs. From b981032bd66b0e5c45181ec33552a1042ac20c80 Mon Sep 17 00:00:00 2001 From: Spike Curtis <spike@coder.com> Date: Thu, 19 May 2022 16:09:26 -0700 Subject: [PATCH 3/3] fix: tests, lint Signed-off-by: Spike Curtis <spike@coder.com> --- cli/delete_test.go | 3 +-- cli/logout_test.go | 3 ++- cli/server.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cli/delete_test.go b/cli/delete_test.go index 2f67fc52bd38f..f9e1102a0eb39 100644 --- a/cli/delete_test.go +++ b/cli/delete_test.go @@ -13,9 +13,8 @@ import ( func TestDelete(t *testing.T) { t.Run("WithParameter", func(t *testing.T) { t.Parallel() - client := coderdtest.New(t, nil) + client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerD: true}) user := coderdtest.CreateFirstUser(t, client) - coderdtest.NewProvisionerDaemon(t, client) version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, nil) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) diff --git a/cli/logout_test.go b/cli/logout_test.go index 2c98d92e29d43..2e36d114aa047 100644 --- a/cli/logout_test.go +++ b/cli/logout_test.go @@ -3,10 +3,11 @@ package cli_test import ( "testing" + "github.com/stretchr/testify/require" + "github.com/coder/coder/cli/clitest" "github.com/coder/coder/coderd/coderdtest" "github.com/coder/coder/pty/ptytest" - "github.com/stretchr/testify/require" ) func TestLogout(t *testing.T) { diff --git a/cli/server.go b/cli/server.go index fc7a97c0e0afd..4bd71b86f1161 100644 --- a/cli/server.go +++ b/cli/server.go @@ -110,7 +110,7 @@ func server() *cobra.Command { logger.Warn(cmd.Context(), "failed to start telemetry exporter", slog.Error(err)) } else { defer func() { - // allow time for traces to flush even if command context is cancelled + // allow time for traces to flush even if command context is canceled ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = tracerProvider.Shutdown(ctx)