@@ -13,10 +13,12 @@ import (
13
13
"time"
14
14
15
15
"github.com/google/uuid"
16
+ "github.com/hashicorp/yamux"
16
17
"github.com/moby/moby/pkg/namesgenerator"
17
18
"github.com/tabbed/pqtype"
18
19
"golang.org/x/xerrors"
19
20
protobuf "google.golang.org/protobuf/proto"
21
+ "nhooyr.io/websocket"
20
22
"storj.io/drpc/drpcmux"
21
23
"storj.io/drpc/drpcserver"
22
24
@@ -62,6 +64,78 @@ func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) {
62
64
httpapi .Write (rw , http .StatusOK , daemons )
63
65
}
64
66
67
+ // Serves the provisioner daemon protobuf API over a WebSocket.
68
+ func (api * API ) provisionerDaemonsListen (rw http.ResponseWriter , r * http.Request ) {
69
+ api .websocketWaitMutex .Lock ()
70
+ api .websocketWaitGroup .Add (1 )
71
+ api .websocketWaitMutex .Unlock ()
72
+ defer api .websocketWaitGroup .Done ()
73
+
74
+ conn , err := websocket .Accept (rw , r , & websocket.AcceptOptions {
75
+ // Need to disable compression to avoid a data-race.
76
+ CompressionMode : websocket .CompressionDisabled ,
77
+ })
78
+ if err != nil {
79
+ httpapi .Write (rw , http .StatusBadRequest , codersdk.Response {
80
+ Message : "Internal error accepting websocket connection." ,
81
+ Detail : err .Error (),
82
+ })
83
+ return
84
+ }
85
+ // Align with the frame size of yamux.
86
+ conn .SetReadLimit (256 * 1024 )
87
+
88
+ daemon , err := api .Database .InsertProvisionerDaemon (r .Context (), database.InsertProvisionerDaemonParams {
89
+ ID : uuid .New (),
90
+ CreatedAt : database .Now (),
91
+ Name : namesgenerator .GetRandomName (1 ),
92
+ Provisioners : []database.ProvisionerType {database .ProvisionerTypeEcho , database .ProvisionerTypeTerraform },
93
+ })
94
+ if err != nil {
95
+ _ = conn .Close (websocket .StatusInternalError , httpapi .WebsocketCloseSprintf ("insert provisioner daemon: %s" , err ))
96
+ return
97
+ }
98
+
99
+ // Multiplexes the incoming connection using yamux.
100
+ // This allows multiple function calls to occur over
101
+ // the same connection.
102
+ config := yamux .DefaultConfig ()
103
+ config .LogOutput = io .Discard
104
+ session , err := yamux .Server (websocket .NetConn (r .Context (), conn , websocket .MessageBinary ), config )
105
+ if err != nil {
106
+ _ = conn .Close (websocket .StatusInternalError , httpapi .WebsocketCloseSprintf ("multiplex server: %s" , err ))
107
+ return
108
+ }
109
+ mux := drpcmux .New ()
110
+ err = proto .DRPCRegisterProvisionerDaemon (mux , & provisionerdServer {
111
+ AccessURL : api .AccessURL ,
112
+ ID : daemon .ID ,
113
+ Database : api .Database ,
114
+ Pubsub : api .Pubsub ,
115
+ Provisioners : daemon .Provisioners ,
116
+ Logger : api .Logger .Named (fmt .Sprintf ("provisionerd-%s" , daemon .Name )),
117
+ })
118
+ if err != nil {
119
+ _ = conn .Close (websocket .StatusInternalError , httpapi .WebsocketCloseSprintf ("drpc register provisioner daemon: %s" , err ))
120
+ return
121
+ }
122
+ server := drpcserver .NewWithOptions (mux , drpcserver.Options {
123
+ Log : func (err error ) {
124
+ if xerrors .Is (err , io .EOF ) {
125
+ return
126
+ }
127
+ api .Logger .Debug (r .Context (), "drpc server error" , slog .Error (err ))
128
+ },
129
+ })
130
+ err = server .Serve (r .Context (), session )
131
+ if err != nil && ! xerrors .Is (err , io .EOF ) {
132
+ api .Logger .Debug (r .Context (), "provisioner daemon disconnected" , slog .Error (err ))
133
+ _ = conn .Close (websocket .StatusInternalError , httpapi .WebsocketCloseSprintf ("serve: %s" , err ))
134
+ return
135
+ }
136
+ _ = conn .Close (websocket .StatusGoingAway , "" )
137
+ }
138
+
65
139
// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
66
140
// in the same process.
67
141
func (api * API ) ListenProvisionerDaemon (ctx context.Context ) (client proto.DRPCProvisionerDaemonClient , err error ) {
0 commit comments