Skip to content

chore: disable provisionerd http listener #1611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cli/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import (
func TestDelete(t *testing.T) {
t.Run("WithParameter", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerD: true})
user := coderdtest.CreateFirstUser(t, client)
coderdtest.NewProvisionerDaemon(t, client)
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, nil)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
Expand Down
3 changes: 2 additions & 1 deletion cli/logout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cli_test
import (
"testing"

"github.com/stretchr/testify/require"

"github.com/coder/coder/cli/clitest"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/pty/ptytest"
"github.com/stretchr/testify/require"
)

func TestLogout(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func server() *cobra.Command {
logger.Warn(cmd.Context(), "failed to start telemetry exporter", slog.Error(err))
} else {
defer func() {
// allow time for traces to flush even if command context is cancelled
// allow time for traces to flush even if command context is canceled
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tracerProvider.Shutdown(ctx)
Expand Down
7 changes: 1 addition & 6 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newRouter(options *Options, a *api) chi.Router {
})
},
httpmw.Prometheus,
tracing.HTTPMW(api.TracerProvider, "coderd.http"),
tracing.HTTPMW(a.TracerProvider, "coderd.http"),
)

r.Route("/api/v2", func(r chi.Router) {
Expand Down Expand Up @@ -209,11 +209,6 @@ func newRouter(options *Options, a *api) chi.Router {
r.Get("/resources", a.templateVersionResources)
r.Get("/logs", a.templateVersionLogs)
})
r.Route("/provisionerdaemons", func(r chi.Router) {
r.Route("/me", func(r chi.Router) {
r.Get("/listen", a.provisionerDaemonsListen)
})
})
r.Route("/users", func(r chi.Router) {
r.Get("/first", a.firstUser)
r.Post("/first", a.postFirstUser)
Expand Down
2 changes: 0 additions & 2 deletions coderd/coderd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ func TestAuthorizeAllEndpoints(t *testing.T) {
"GET:/api/v2/parameters/{scope}/{id}": {NoAuthorize: true},
"DELETE:/api/v2/parameters/{scope}/{id}/{name}": {NoAuthorize: true},

"GET:/api/v2/provisionerdaemons/me/listen": {NoAuthorize: true},

"DELETE:/api/v2/templates/{template}": {NoAuthorize: true},
"GET:/api/v2/templates/{template}": {NoAuthorize: true},
"GET:/api/v2/templates/{template}/versions": {NoAuthorize: true},
Expand Down
73 changes: 0 additions & 73 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/hashicorp/yamux"
"github.com/moby/moby/pkg/namesgenerator"
"github.com/tabbed/pqtype"
"golang.org/x/xerrors"
protobuf "google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"

Expand Down Expand Up @@ -49,77 +47,6 @@ func (api *api) provisionerDaemonsByOrganization(rw http.ResponseWriter, r *http
httpapi.Write(rw, http.StatusOK, daemons)
}

// Serves the provisioner daemon protobuf API over a WebSocket.
func (api *api) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request) {
api.websocketWaitMutex.Lock()
api.websocketWaitGroup.Add(1)
api.websocketWaitMutex.Unlock()
defer api.websocketWaitGroup.Done()

conn, err := websocket.Accept(rw, r, &websocket.AcceptOptions{
// Need to disable compression to avoid a data-race.
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
Message: fmt.Sprintf("accept websocket: %s", err),
})
return
}
// Align with the frame size of yamux.
conn.SetReadLimit(256 * 1024)

daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
ID: uuid.New(),
CreatedAt: database.Now(),
Name: namesgenerator.GetRandomName(1),
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho, database.ProvisionerTypeTerraform},
})
if err != nil {
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("insert provisioner daemon: %s", err))
return
}

// Multiplexes the incoming connection using yamux.
// This allows multiple function calls to occur over
// the same connection.
config := yamux.DefaultConfig()
config.LogOutput = io.Discard
session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), config)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("multiplex server: %s", err))
return
}
mux := drpcmux.New()
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdServer{
AccessURL: api.AccessURL,
ID: daemon.ID,
Database: api.Database,
Pubsub: api.Pubsub,
Provisioners: daemon.Provisioners,
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
})
if err != nil {
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err))
return
}
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
return
}
api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err))
},
})
err = server.Serve(r.Context(), session)
if err != nil && !xerrors.Is(err, io.EOF) {
api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err))
return
}
_ = conn.Close(websocket.StatusGoingAway, "")
}

// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
// in the same process.
func (c *coderD) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
Expand Down
36 changes: 0 additions & 36 deletions codersdk/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ import (
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"

"github.com/google/uuid"
"github.com/hashicorp/yamux"
"golang.org/x/xerrors"
"nhooyr.io/websocket"

"github.com/coder/coder/provisionerd/proto"
"github.com/coder/coder/provisionersdk"
)

type LogSource string
Expand Down Expand Up @@ -86,35 +79,6 @@ type ProvisionerJobLog struct {
Output string `json:"output"`
}

// ListenProvisionerDaemon returns the gRPC service for a provisioner daemon implementation.
func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
serverURL, err := c.URL.Parse("/api/v2/provisionerdaemons/me/listen")
if err != nil {
return nil, xerrors.Errorf("parse url: %w", err)
}
conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{
HTTPClient: c.HTTPClient,
// Need to disable compression to avoid a data-race.
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
if res == nil {
return nil, err
}
return nil, readBodyAsError(res)
}
// Align with the frame size of yamux.
conn.SetReadLimit(256 * 1024)

config := yamux.DefaultConfig()
config.LogOutput = io.Discard
session, err := yamux.Client(websocket.NetConn(ctx, conn, websocket.MessageBinary), config)
if err != nil {
return nil, xerrors.Errorf("multiplex client: %w", err)
}
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(session)), nil
}

// provisionerJobLogsBefore provides log output that occurred before a time.
// This is abstracted from a specific job type to provide consistency between
// APIs. Logs is the only shared route between jobs.
Expand Down