Skip to content

Commit 97052e0

Browse files
committed
Revert "feature: disable provisionerd listen endpoint (#1612)"
This reverts commit a03615a.
1 parent c432979 commit 97052e0

File tree

4 files changed

+114
-0
lines changed

4 files changed

+114
-0
lines changed

coderd/coderd.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func New(options *Options) *API {
182182
apiKeyMiddleware,
183183
)
184184
r.Get("/", api.provisionerDaemons)
185+
r.Route("/me", func(r chi.Router) {
186+
r.Get("/listen", api.provisionerDaemonsListen)
187+
})
185188
})
186189
r.Route("/organizations", func(r chi.Router) {
187190
r.Use(

coderd/coderd_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,9 @@ func TestAuthorizeAllEndpoints(t *testing.T) {
468468
"PUT:/api/v2/organizations/{organization}/members/{user}/roles": {NoAuthorize: true},
469469
"POST:/api/v2/workspaces/{workspace}/builds": {StatusCode: http.StatusBadRequest, NoAuthorize: true},
470470
"POST:/api/v2/organizations/{organization}/templateversions": {StatusCode: http.StatusBadRequest, NoAuthorize: true},
471+
472+
// TODO needs authorization
473+
"GET:/api/v2/provisionerdaemons/me/listen": {NoAuthorize: true},
471474
}
472475

473476
for k, v := range assertRoute {

coderd/provisionerdaemons.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313
"time"
1414

1515
"github.com/google/uuid"
16+
"github.com/hashicorp/yamux"
1617
"github.com/moby/moby/pkg/namesgenerator"
1718
"github.com/tabbed/pqtype"
1819
"golang.org/x/xerrors"
1920
protobuf "google.golang.org/protobuf/proto"
21+
"nhooyr.io/websocket"
2022
"storj.io/drpc/drpcmux"
2123
"storj.io/drpc/drpcserver"
2224

@@ -62,6 +64,78 @@ func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) {
6264
httpapi.Write(rw, http.StatusOK, daemons)
6365
}
6466

67+
// Serves the provisioner daemon protobuf API over a WebSocket.
68+
func (api *API) provisionerDaemonsListen(rw http.ResponseWriter, r *http.Request) {
69+
api.websocketWaitMutex.Lock()
70+
api.websocketWaitGroup.Add(1)
71+
api.websocketWaitMutex.Unlock()
72+
defer api.websocketWaitGroup.Done()
73+
74+
conn, err := websocket.Accept(rw, r, &websocket.AcceptOptions{
75+
// Need to disable compression to avoid a data-race.
76+
CompressionMode: websocket.CompressionDisabled,
77+
})
78+
if err != nil {
79+
httpapi.Write(rw, http.StatusBadRequest, codersdk.Response{
80+
Message: "Internal error accepting websocket connection.",
81+
Detail: err.Error(),
82+
})
83+
return
84+
}
85+
// Align with the frame size of yamux.
86+
conn.SetReadLimit(256 * 1024)
87+
88+
daemon, err := api.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
89+
ID: uuid.New(),
90+
CreatedAt: database.Now(),
91+
Name: namesgenerator.GetRandomName(1),
92+
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho, database.ProvisionerTypeTerraform},
93+
})
94+
if err != nil {
95+
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("insert provisioner daemon: %s", err))
96+
return
97+
}
98+
99+
// Multiplexes the incoming connection using yamux.
100+
// This allows multiple function calls to occur over
101+
// the same connection.
102+
config := yamux.DefaultConfig()
103+
config.LogOutput = io.Discard
104+
session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), config)
105+
if err != nil {
106+
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("multiplex server: %s", err))
107+
return
108+
}
109+
mux := drpcmux.New()
110+
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdServer{
111+
AccessURL: api.AccessURL,
112+
ID: daemon.ID,
113+
Database: api.Database,
114+
Pubsub: api.Pubsub,
115+
Provisioners: daemon.Provisioners,
116+
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
117+
})
118+
if err != nil {
119+
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("drpc register provisioner daemon: %s", err))
120+
return
121+
}
122+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
123+
Log: func(err error) {
124+
if xerrors.Is(err, io.EOF) {
125+
return
126+
}
127+
api.Logger.Debug(r.Context(), "drpc server error", slog.Error(err))
128+
},
129+
})
130+
err = server.Serve(r.Context(), session)
131+
if err != nil && !xerrors.Is(err, io.EOF) {
132+
api.Logger.Debug(r.Context(), "provisioner daemon disconnected", slog.Error(err))
133+
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("serve: %s", err))
134+
return
135+
}
136+
_ = conn.Close(websocket.StatusGoingAway, "")
137+
}
138+
65139
// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
66140
// in the same process.
67141
func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {

codersdk/provisionerdaemons.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,20 @@ import (
55
"database/sql"
66
"encoding/json"
77
"fmt"
8+
"io"
89
"net/http"
910
"net/http/cookiejar"
1011
"net/url"
1112
"strconv"
1213
"time"
1314

1415
"github.com/google/uuid"
16+
"github.com/hashicorp/yamux"
1517
"golang.org/x/xerrors"
1618
"nhooyr.io/websocket"
19+
20+
"github.com/coder/coder/provisionerd/proto"
21+
"github.com/coder/coder/provisionersdk"
1722
)
1823

1924
type LogSource string
@@ -82,6 +87,35 @@ type ProvisionerJobLog struct {
8287
Output string `json:"output"`
8388
}
8489

90+
// ListenProvisionerDaemon returns the gRPC service for a provisioner daemon implementation.
91+
func (c *Client) ListenProvisionerDaemon(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
92+
serverURL, err := c.URL.Parse("/api/v2/provisionerdaemons/me/listen")
93+
if err != nil {
94+
return nil, xerrors.Errorf("parse url: %w", err)
95+
}
96+
conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{
97+
HTTPClient: c.HTTPClient,
98+
// Need to disable compression to avoid a data-race.
99+
CompressionMode: websocket.CompressionDisabled,
100+
})
101+
if err != nil {
102+
if res == nil {
103+
return nil, err
104+
}
105+
return nil, readBodyAsError(res)
106+
}
107+
// Align with the frame size of yamux.
108+
conn.SetReadLimit(256 * 1024)
109+
110+
config := yamux.DefaultConfig()
111+
config.LogOutput = io.Discard
112+
session, err := yamux.Client(websocket.NetConn(ctx, conn, websocket.MessageBinary), config)
113+
if err != nil {
114+
return nil, xerrors.Errorf("multiplex client: %w", err)
115+
}
116+
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(session)), nil
117+
}
118+
85119
// provisionerJobLogsBefore provides log output that occurred before a time.
86120
// This is abstracted from a specific job type to provide consistency between
87121
// APIs. Logs is the only shared route between jobs.

0 commit comments

Comments
 (0)