4
4
"context"
5
5
"crypto/tls"
6
6
"crypto/x509"
7
+ "encoding/json"
7
8
"fmt"
8
9
"io"
9
10
"net/http"
@@ -19,10 +20,13 @@ import (
19
20
"github.com/go-chi/chi/v5/middleware"
20
21
"github.com/google/uuid"
21
22
"github.com/klauspost/compress/zstd"
23
+ "github.com/moby/moby/pkg/namesgenerator"
22
24
"github.com/prometheus/client_golang/prometheus"
23
25
"go.opentelemetry.io/otel/trace"
24
26
"golang.org/x/xerrors"
25
27
"google.golang.org/api/idtoken"
28
+ "storj.io/drpc/drpcmux"
29
+ "storj.io/drpc/drpcserver"
26
30
"tailscale.com/derp"
27
31
"tailscale.com/derp/derphttp"
28
32
"tailscale.com/tailcfg"
@@ -33,17 +37,20 @@ import (
33
37
"github.com/coder/coder/coderd/audit"
34
38
"github.com/coder/coder/coderd/awsidentity"
35
39
"github.com/coder/coder/coderd/database"
40
+ "github.com/coder/coder/coderd/database/dbtype"
36
41
"github.com/coder/coder/coderd/gitauth"
37
42
"github.com/coder/coder/coderd/gitsshkey"
38
43
"github.com/coder/coder/coderd/httpapi"
39
44
"github.com/coder/coder/coderd/httpmw"
40
45
"github.com/coder/coder/coderd/metricscache"
46
+ "github.com/coder/coder/coderd/provisionerdserver"
41
47
"github.com/coder/coder/coderd/rbac"
42
48
"github.com/coder/coder/coderd/telemetry"
43
49
"github.com/coder/coder/coderd/tracing"
44
50
"github.com/coder/coder/coderd/wsconncache"
45
51
"github.com/coder/coder/codersdk"
46
52
"github.com/coder/coder/provisionerd/proto"
53
+ "github.com/coder/coder/provisionersdk"
47
54
"github.com/coder/coder/site"
48
55
"github.com/coder/coder/tailnet"
49
56
)
@@ -324,13 +331,6 @@ func New(options *Options) *API {
324
331
r .Get ("/{fileID}" , api .fileByID )
325
332
r .Post ("/" , api .postFile )
326
333
})
327
-
328
- r .Route ("/provisionerdaemons" , func (r chi.Router ) {
329
- r .Use (
330
- apiKeyMiddleware ,
331
- )
332
- r .Get ("/" , api .provisionerDaemons )
333
- })
334
334
r .Route ("/organizations" , func (r chi.Router ) {
335
335
r .Use (
336
336
apiKeyMiddleware ,
@@ -596,18 +596,20 @@ type API struct {
596
596
// RootHandler serves "/"
597
597
RootHandler chi.Router
598
598
599
- metricsCache * metricscache.Cache
600
- siteHandler http.Handler
601
- websocketWaitMutex sync.Mutex
602
- websocketWaitGroup sync.WaitGroup
599
+ metricsCache * metricscache.Cache
600
+ siteHandler http.Handler
601
+
602
+ WebsocketWaitMutex sync.Mutex
603
+ WebsocketWaitGroup sync.WaitGroup
604
+
603
605
workspaceAgentCache * wsconncache.Cache
604
606
}
605
607
606
608
// Close waits for all WebSocket connections to drain before returning.
607
609
func (api * API ) Close () error {
608
- api .websocketWaitMutex .Lock ()
609
- api .websocketWaitGroup .Wait ()
610
- api .websocketWaitMutex .Unlock ()
610
+ api .WebsocketWaitMutex .Lock ()
611
+ api .WebsocketWaitGroup .Wait ()
612
+ api .WebsocketWaitMutex .Unlock ()
611
613
612
614
api .metricsCache .Close ()
613
615
coordinator := api .TailnetCoordinator .Load ()
@@ -636,3 +638,70 @@ func compressHandler(h http.Handler) http.Handler {
636
638
637
639
return cmp .Handler (h )
638
640
}
641
+
642
+ // CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
643
+ // in the same process.
644
+ func (api * API ) CreateInMemoryProvisionerDaemon (ctx context.Context , debounce time.Duration ) (client proto.DRPCProvisionerDaemonClient , err error ) {
645
+ clientSession , serverSession := provisionersdk .TransportPipe ()
646
+ defer func () {
647
+ if err != nil {
648
+ _ = clientSession .Close ()
649
+ _ = serverSession .Close ()
650
+ }
651
+ }()
652
+
653
+ name := namesgenerator .GetRandomName (1 )
654
+ daemon , err := api .Database .InsertProvisionerDaemon (ctx , database.InsertProvisionerDaemonParams {
655
+ ID : uuid .New (),
656
+ CreatedAt : database .Now (),
657
+ Name : name ,
658
+ Provisioners : []database.ProvisionerType {database .ProvisionerTypeEcho , database .ProvisionerTypeTerraform },
659
+ Tags : dbtype.StringMap {
660
+ provisionerdserver .TagScope : provisionerdserver .ScopeOrganization ,
661
+ },
662
+ })
663
+ if err != nil {
664
+ return nil , xerrors .Errorf ("insert provisioner daemon %q: %w" , name , err )
665
+ }
666
+
667
+ tags , err := json .Marshal (daemon .Tags )
668
+ if err != nil {
669
+ return nil , xerrors .Errorf ("marshal tags: %w" , err )
670
+ }
671
+
672
+ mux := drpcmux .New ()
673
+ err = proto .DRPCRegisterProvisionerDaemon (mux , & provisionerdserver.Server {
674
+ AccessURL : api .AccessURL ,
675
+ ID : daemon .ID ,
676
+ Database : api .Database ,
677
+ Pubsub : api .Pubsub ,
678
+ Provisioners : daemon .Provisioners ,
679
+ Telemetry : api .Telemetry ,
680
+ Tags : tags ,
681
+ QuotaCommitter : & api .QuotaCommitter ,
682
+ AcquireJobDebounce : debounce ,
683
+ Logger : api .Logger .Named (fmt .Sprintf ("provisionerd-%s" , daemon .Name )),
684
+ })
685
+ if err != nil {
686
+ return nil , err
687
+ }
688
+ server := drpcserver .NewWithOptions (mux , drpcserver.Options {
689
+ Log : func (err error ) {
690
+ if xerrors .Is (err , io .EOF ) {
691
+ return
692
+ }
693
+ api .Logger .Debug (ctx , "drpc server error" , slog .Error (err ))
694
+ },
695
+ })
696
+ go func () {
697
+ err := server .Serve (ctx , serverSession )
698
+ if err != nil && ! xerrors .Is (err , io .EOF ) {
699
+ api .Logger .Debug (ctx , "provisioner daemon disconnected" , slog .Error (err ))
700
+ }
701
+ // close the sessions so we don't leak goroutines serving them.
702
+ _ = clientSession .Close ()
703
+ _ = serverSession .Close ()
704
+ }()
705
+
706
+ return proto .NewDRPCProvisionerDaemonClient (provisionersdk .Conn (clientSession )), nil
707
+ }
0 commit comments