Skip to content

Commit 9a4e110

Browse files
authored
chore: move drpc transport tools to codersdk/drpc (#11224)
Part of #10532 DRPC transport over yamux and in-mem pipes was previously only used on the provisioner APIs, but now will also be used in tailnet. Moved to subpackage of codersdk to avoid import loops.
1 parent b36071c commit 9a4e110

File tree

12 files changed

+28
-18
lines changed

12 files changed

+28
-18
lines changed

cli/server.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ import (
9090
stringutil "github.com/coder/coder/v2/coderd/util/strings"
9191
"github.com/coder/coder/v2/coderd/workspaceapps"
9292
"github.com/coder/coder/v2/codersdk"
93+
"github.com/coder/coder/v2/codersdk/drpc"
9394
"github.com/coder/coder/v2/cryptorand"
9495
"github.com/coder/coder/v2/provisioner/echo"
9596
"github.com/coder/coder/v2/provisioner/terraform"
@@ -1298,7 +1299,7 @@ func newProvisionerDaemon(
12981299

12991300
connector := provisionerd.LocalProvisioners{}
13001301
if cfg.Provisioner.DaemonsEcho {
1301-
echoClient, echoServer := provisionersdk.MemTransportPipe()
1302+
echoClient, echoServer := drpc.MemTransportPipe()
13021303
wg.Add(1)
13031304
go func() {
13041305
defer wg.Done()
@@ -1332,7 +1333,7 @@ func newProvisionerDaemon(
13321333
}
13331334

13341335
tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
1335-
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
1336+
terraformClient, terraformServer := drpc.MemTransportPipe()
13361337
wg.Add(1)
13371338
go func() {
13381339
defer wg.Done()

coderd/coderd.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import (
6666
"github.com/coder/coder/v2/coderd/wsconncache"
6767
"github.com/coder/coder/v2/codersdk"
6868
"github.com/coder/coder/v2/codersdk/agentsdk"
69+
"github.com/coder/coder/v2/codersdk/drpc"
6970
"github.com/coder/coder/v2/provisionerd/proto"
7071
"github.com/coder/coder/v2/provisionersdk"
7172
"github.com/coder/coder/v2/site"
@@ -1159,7 +1160,7 @@ func compressHandler(h http.Handler) http.Handler {
11591160
// Useful when starting coderd and provisionerd in the same process.
11601161
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string) (client proto.DRPCProvisionerDaemonClient, err error) {
11611162
tracer := api.TracerProvider.Tracer(tracing.TracerName)
1162-
clientSession, serverSession := provisionersdk.MemTransportPipe()
1163+
clientSession, serverSession := drpc.MemTransportPipe()
11631164
defer func() {
11641165
if err != nil {
11651166
_ = clientSession.Close()

coderd/coderdtest/coderdtest.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ import (
7373
"github.com/coder/coder/v2/coderd/workspaceapps"
7474
"github.com/coder/coder/v2/codersdk"
7575
"github.com/coder/coder/v2/codersdk/agentsdk"
76+
"github.com/coder/coder/v2/codersdk/drpc"
7677
"github.com/coder/coder/v2/cryptorand"
7778
"github.com/coder/coder/v2/provisioner/echo"
7879
"github.com/coder/coder/v2/provisionerd"
@@ -512,7 +513,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
512513
// seems t.TempDir() is not safe to call from a different goroutine
513514
workDir := t.TempDir()
514515

515-
echoClient, echoServer := provisionersdk.MemTransportPipe()
516+
echoClient, echoServer := drpc.MemTransportPipe()
516517
ctx, cancelFunc := context.WithCancel(context.Background())
517518
t.Cleanup(func() {
518519
_ = echoClient.Close()
@@ -547,7 +548,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
547548
}
548549

549550
func NewExternalProvisionerDaemon(t testing.TB, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
550-
echoClient, echoServer := provisionersdk.MemTransportPipe()
551+
echoClient, echoServer := drpc.MemTransportPipe()
551552
ctx, cancelFunc := context.WithCancel(context.Background())
552553
serveDone := make(chan struct{})
553554
t.Cleanup(func() {

coderd/provisionerdserver/provisionerdserver.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/coder/coder/v2/coderd/telemetry"
3838
"github.com/coder/coder/v2/coderd/tracing"
3939
"github.com/coder/coder/v2/codersdk"
40+
"github.com/coder/coder/v2/codersdk/drpc"
4041
"github.com/coder/coder/v2/provisioner"
4142
"github.com/coder/coder/v2/provisionerd/proto"
4243
"github.com/coder/coder/v2/provisionersdk"
@@ -542,8 +543,8 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
542543
default:
543544
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
544545
}
545-
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
546-
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
546+
if protobuf.Size(protoJob) > drpc.MaxMessageSize {
547+
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), drpc.MaxMessageSize))
547548
}
548549

549550
return protoJob, err

provisionersdk/transport.go renamed to codersdk/drpc/transport.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package provisionersdk
1+
package drpc
22

33
import (
44
"context"

codersdk/provisionerdaemons.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import (
1515
"golang.org/x/xerrors"
1616
"nhooyr.io/websocket"
1717

18+
"github.com/coder/coder/v2/codersdk/drpc"
1819
"github.com/coder/coder/v2/provisionerd/proto"
1920
"github.com/coder/coder/v2/provisionerd/runner"
20-
"github.com/coder/coder/v2/provisionersdk"
2121
)
2222

2323
type LogSource string
@@ -252,7 +252,7 @@ func (c *Client) ServeProvisionerDaemon(ctx context.Context, req ServeProvisione
252252
_ = wsNetConn.Close()
253253
return nil, xerrors.Errorf("multiplex client: %w", err)
254254
}
255-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.MultiplexedConn(session)), nil
255+
return proto.NewDRPCProvisionerDaemonClient(drpc.MultiplexedConn(session)), nil
256256
}
257257

258258
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func

enterprise/cli/provisionerdaemons.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/coder/coder/v2/cli/cliutil"
2121
"github.com/coder/coder/v2/coderd/database"
2222
"github.com/coder/coder/v2/codersdk"
23+
"github.com/coder/coder/v2/codersdk/drpc"
2324
"github.com/coder/coder/v2/provisioner/terraform"
2425
"github.com/coder/coder/v2/provisionerd"
2526
provisionerdproto "github.com/coder/coder/v2/provisionerd/proto"
@@ -115,7 +116,7 @@ func (r *RootCmd) provisionerDaemonStart() *clibase.Cmd {
115116
return err
116117
}
117118

118-
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
119+
terraformClient, terraformServer := drpc.MemTransportPipe()
119120
go func() {
120121
<-ctx.Done()
121122
_ = terraformClient.Close()

enterprise/coderd/provisionerdaemons_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/coder/coder/v2/coderd/rbac"
1717
"github.com/coder/coder/v2/coderd/util/ptr"
1818
"github.com/coder/coder/v2/codersdk"
19+
"github.com/coder/coder/v2/codersdk/drpc"
1920
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
2021
"github.com/coder/coder/v2/enterprise/coderd/license"
2122
"github.com/coder/coder/v2/provisioner/echo"
@@ -228,7 +229,7 @@ func TestProvisionerDaemonServe(t *testing.T) {
228229
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
229230
defer cancel()
230231

231-
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
232+
terraformClient, terraformServer := drpc.MemTransportPipe()
232233
go func() {
233234
<-ctx.Done()
234235
_ = terraformClient.Close()

provisioner/echo/serve_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
99

10+
"github.com/coder/coder/v2/codersdk/drpc"
1011
"github.com/coder/coder/v2/provisioner/echo"
1112
"github.com/coder/coder/v2/provisionersdk"
1213
"github.com/coder/coder/v2/provisionersdk/proto"
@@ -19,7 +20,7 @@ func TestEcho(t *testing.T) {
1920
workdir := t.TempDir()
2021

2122
// Create an in-memory provisioner to communicate with.
22-
client, server := provisionersdk.MemTransportPipe()
23+
client, server := drpc.MemTransportPipe()
2324
ctx, cancelFunc := context.WithCancel(context.Background())
2425
t.Cleanup(func() {
2526
_ = client.Close()

provisioner/terraform/provision_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"cdr.dev/slog"
2424
"cdr.dev/slog/sloggers/slogtest"
25+
"github.com/coder/coder/v2/codersdk/drpc"
2526
"github.com/coder/coder/v2/provisioner/terraform"
2627
"github.com/coder/coder/v2/provisionersdk"
2728
"github.com/coder/coder/v2/provisionersdk/proto"
@@ -38,7 +39,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
3839
}
3940
cachePath := t.TempDir()
4041
workDir := t.TempDir()
41-
client, server := provisionersdk.MemTransportPipe()
42+
client, server := drpc.MemTransportPipe()
4243
ctx, cancelFunc := context.WithCancel(context.Background())
4344
serverErr := make(chan error, 1)
4445
t.Cleanup(func() {

provisionerd/provisionerd_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"cdr.dev/slog"
2525
"cdr.dev/slog/sloggers/slogtest"
26+
"github.com/coder/coder/v2/codersdk/drpc"
2627
"github.com/coder/coder/v2/provisionerd"
2728
"github.com/coder/coder/v2/provisionerd/proto"
2829
"github.com/coder/coder/v2/provisionersdk"
@@ -1093,7 +1094,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
10931094
return &proto.Empty{}, nil
10941095
}
10951096
}
1096-
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
1097+
clientPipe, serverPipe := drpc.MemTransportPipe()
10971098
t.Cleanup(func() {
10981099
_ = clientPipe.Close()
10991100
_ = serverPipe.Close()
@@ -1129,7 +1130,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
11291130
// to the server implementation provided.
11301131
func createProvisionerClient(t *testing.T, done <-chan struct{}, server provisionerTestServer) sdkproto.DRPCProvisionerClient {
11311132
t.Helper()
1132-
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
1133+
clientPipe, serverPipe := drpc.MemTransportPipe()
11331134
t.Cleanup(func() {
11341135
_ = clientPipe.Close()
11351136
_ = serverPipe.Close()

provisionersdk/serve_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.uber.org/goleak"
1111
"storj.io/drpc/drpcconn"
1212

13+
"github.com/coder/coder/v2/codersdk/drpc"
1314
"github.com/coder/coder/v2/provisionersdk"
1415
"github.com/coder/coder/v2/provisionersdk/proto"
1516
"github.com/coder/coder/v2/testutil"
@@ -23,7 +24,7 @@ func TestProvisionerSDK(t *testing.T) {
2324
t.Parallel()
2425
t.Run("ServeListener", func(t *testing.T) {
2526
t.Parallel()
26-
client, server := provisionersdk.MemTransportPipe()
27+
client, server := drpc.MemTransportPipe()
2728
defer client.Close()
2829
defer server.Close()
2930

@@ -65,7 +66,7 @@ func TestProvisionerSDK(t *testing.T) {
6566

6667
t.Run("ServeClosedPipe", func(t *testing.T) {
6768
t.Parallel()
68-
client, server := provisionersdk.MemTransportPipe()
69+
client, server := drpc.MemTransportPipe()
6970
_ = client.Close()
7071
_ = server.Close()
7172

0 commit comments

Comments
 (0)