Skip to content

chore: move drpc transport tools to codersdk/drpc #11224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import (
stringutil "github.com/coder/coder/v2/coderd/util/strings"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisioner/terraform"
Expand Down Expand Up @@ -1298,7 +1299,7 @@ func newProvisionerDaemon(

connector := provisionerd.LocalProvisioners{}
if cfg.Provisioner.DaemonsEcho {
echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -1332,7 +1333,7 @@ func newProvisionerDaemon(
}

tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/coder/coder/v2/coderd/wsconncache"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/site"
Expand Down Expand Up @@ -1159,7 +1160,7 @@ func compressHandler(h http.Handler) http.Handler {
// Useful when starting coderd and provisionerd in the same process.
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string) (client proto.DRPCProvisionerDaemonClient, err error) {
tracer := api.TracerProvider.Tracer(tracing.TracerName)
clientSession, serverSession := provisionersdk.MemTransportPipe()
clientSession, serverSession := drpc.MemTransportPipe()
defer func() {
if err != nil {
_ = clientSession.Close()
Expand Down
5 changes: 3 additions & 2 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisionerd"
Expand Down Expand Up @@ -512,7 +513,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
// seems t.TempDir() is not safe to call from a different goroutine
workDir := t.TempDir()

echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
_ = echoClient.Close()
Expand Down Expand Up @@ -547,7 +548,7 @@ func NewProvisionerDaemon(t testing.TB, coderAPI *coderd.API) io.Closer {
}

func NewExternalProvisionerDaemon(t testing.TB, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
echoClient, echoServer := provisionersdk.MemTransportPipe()
echoClient, echoServer := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
serveDone := make(chan struct{})
t.Cleanup(func() {
Expand Down
5 changes: 3 additions & 2 deletions coderd/provisionerdserver/provisionerdserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
Expand Down Expand Up @@ -542,8 +543,8 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
default:
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
}
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
if protobuf.Size(protoJob) > drpc.MaxMessageSize {
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), drpc.MaxMessageSize))
}

return protoJob, err
Expand Down
2 changes: 1 addition & 1 deletion provisionersdk/transport.go → codersdk/drpc/transport.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package provisionersdk
package drpc

import (
"context"
Expand Down
4 changes: 2 additions & 2 deletions codersdk/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"golang.org/x/xerrors"
"nhooyr.io/websocket"

"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner"
"github.com/coder/coder/v2/provisionersdk"
)

type LogSource string
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Client) ServeProvisionerDaemon(ctx context.Context, req ServeProvisione
_ = wsNetConn.Close()
return nil, xerrors.Errorf("multiplex client: %w", err)
}
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.MultiplexedConn(session)), nil
return proto.NewDRPCProvisionerDaemonClient(drpc.MultiplexedConn(session)), nil
}

// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
Expand Down
3 changes: 2 additions & 1 deletion enterprise/cli/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/coder/coder/v2/cli/cliutil"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/terraform"
"github.com/coder/coder/v2/provisionerd"
provisionerdproto "github.com/coder/coder/v2/provisionerd/proto"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (r *RootCmd) provisionerDaemonStart() *clibase.Cmd {
return err
}

terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
go func() {
<-ctx.Done()
_ = terraformClient.Close()
Expand Down
3 changes: 2 additions & 1 deletion enterprise/coderd/provisionerdaemons_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/coder/v2/provisioner/echo"
Expand Down Expand Up @@ -228,7 +229,7 @@ func TestProvisionerDaemonServe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

terraformClient, terraformServer := provisionersdk.MemTransportPipe()
terraformClient, terraformServer := drpc.MemTransportPipe()
go func() {
<-ctx.Done()
_ = terraformClient.Close()
Expand Down
3 changes: 2 additions & 1 deletion provisioner/echo/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
Expand All @@ -19,7 +20,7 @@ func TestEcho(t *testing.T) {
workdir := t.TempDir()

// Create an in-memory provisioner to communicate with.
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
_ = client.Close()
Expand Down
3 changes: 2 additions & 1 deletion provisioner/terraform/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisioner/terraform"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
Expand All @@ -38,7 +39,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
}
cachePath := t.TempDir()
workDir := t.TempDir()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
ctx, cancelFunc := context.WithCancel(context.Background())
serverErr := make(chan error, 1)
t.Cleanup(func() {
Expand Down
5 changes: 3 additions & 2 deletions provisionerd/provisionerd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionersdk"
Expand Down Expand Up @@ -1093,7 +1094,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
return &proto.Empty{}, nil
}
}
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
clientPipe, serverPipe := drpc.MemTransportPipe()
t.Cleanup(func() {
_ = clientPipe.Close()
_ = serverPipe.Close()
Expand Down Expand Up @@ -1129,7 +1130,7 @@ func createProvisionerDaemonClient(t *testing.T, done <-chan struct{}, server pr
// to the server implementation provided.
func createProvisionerClient(t *testing.T, done <-chan struct{}, server provisionerTestServer) sdkproto.DRPCProvisionerClient {
t.Helper()
clientPipe, serverPipe := provisionersdk.MemTransportPipe()
clientPipe, serverPipe := drpc.MemTransportPipe()
t.Cleanup(func() {
_ = clientPipe.Close()
_ = serverPipe.Close()
Expand Down
5 changes: 3 additions & 2 deletions provisionersdk/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/goleak"
"storj.io/drpc/drpcconn"

"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/testutil"
Expand All @@ -23,7 +24,7 @@ func TestProvisionerSDK(t *testing.T) {
t.Parallel()
t.Run("ServeListener", func(t *testing.T) {
t.Parallel()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
defer client.Close()
defer server.Close()

Expand Down Expand Up @@ -65,7 +66,7 @@ func TestProvisionerSDK(t *testing.T) {

t.Run("ServeClosedPipe", func(t *testing.T) {
t.Parallel()
client, server := provisionersdk.MemTransportPipe()
client, server := drpc.MemTransportPipe()
_ = client.Close()
_ = server.Close()

Expand Down