From b89ff23d1fbcf036b722022c37c5a61b3ad4b5b3 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 13 Aug 2024 17:02:43 +0000 Subject: [PATCH 01/22] fix: use authenticated urls for pubsub --- coderd/database/awsiamrds/awsiamrds.go | 47 ++++++++++++++++++++++++++ coderd/database/connector.go | 17 ++++++++++ coderd/database/pubsub/pubsub.go | 27 ++++++++++++++- go.mod | 5 +++ go.sum | 4 +-- 5 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 coderd/database/connector.go diff --git a/coderd/database/awsiamrds/awsiamrds.go b/coderd/database/awsiamrds/awsiamrds.go index 1d4ded8ac2ea2..b6542810ceb36 100644 --- a/coderd/database/awsiamrds/awsiamrds.go +++ b/coderd/database/awsiamrds/awsiamrds.go @@ -10,7 +10,10 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/rds/auth" + "github.com/lib/pq" "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database" ) type awsIamRdsDriver struct { @@ -19,6 +22,7 @@ type awsIamRdsDriver struct { } var _ driver.Driver = &awsIamRdsDriver{} +var _ database.ConnectorCreator = &awsIamRdsDriver{} // Register initializes and registers our aws iam rds wrapped database driver. func Register(ctx context.Context, parentName string) (string, error) { @@ -65,6 +69,15 @@ func (d *awsIamRdsDriver) Open(name string) (driver.Conn, error) { return conn, nil } +func (d *awsIamRdsDriver) Connector(name string) (driver.Connector, error) { + connector := &connector{ + url: name, + cfg: d.cfg, + } + + return connector, nil +} + func getAuthenticatedURL(cfg aws.Config, dbURL string) (string, error) { nURL, err := url.Parse(dbURL) if err != nil { @@ -82,3 +95,37 @@ func getAuthenticatedURL(cfg aws.Config, dbURL string) (string, error) { return nURL.String(), nil } + +type connector struct { + url string + cfg aws.Config + dialer pq.Dialer +} + +var _ database.DialerConnector = &connector{} + +func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { + nURL, err := getAuthenticatedURL(c.cfg, c.url) + if err != nil { + return nil, xerrors.Errorf("assigning authentication token to url: %w", err) + } + + nc, err := pq.NewConnector(nURL) + if err != nil { + return nil, xerrors.Errorf("creating new connector: %w", err) + } + + if c.dialer != nil { + nc.Dialer(c.dialer) + } + + return nc.Connect(ctx) +} + +func (c *connector) Driver() driver.Driver { + return &pq.Driver{} +} + +func (c *connector) Dialer(dialer pq.Dialer) { + c.dialer = dialer +} diff --git a/coderd/database/connector.go b/coderd/database/connector.go new file mode 100644 index 0000000000000..3a4d30d1239fa --- /dev/null +++ b/coderd/database/connector.go @@ -0,0 +1,17 @@ +package database + +import ( + "context" + "database/sql/driver" + + "github.com/lib/pq" +) + +type ConnectorCreator interface { + Connector(name string) (driver.Connector, error) +} + +type DialerConnector interface { + Connect(context.Context) (driver.Conn, error) + Dialer(dialer pq.Dialer) +} diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index c391a7c3eaf66..3680d0a26e678 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "database/sql" + "database/sql/driver" "errors" "io" "net" @@ -15,6 +16,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "golang.org/x/xerrors" + "github.com/coder/coder/v2/coderd/database" + "cdr.dev/slog" ) @@ -432,9 +435,31 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { // pq.defaultDialer uses a zero net.Dialer as well. d: net.Dialer{}, } + connector driver.Connector + err error ) + + // Create a custom connector if the database driver supports it. + connectorCreator, ok := p.db.Driver().(database.ConnectorCreator) + if !ok { + connector, err = pq.NewConnector(connectURL) + if err != nil { + return xerrors.Errorf("create pq connector: %w", err) + } + } else { + connector, err = connectorCreator.Connector(connectURL) + if err != nil { + return xerrors.Errorf("create custom connector: %w", err) + } + } + + // Set the dialer if the connector supports it. + if dc, ok := connector.(database.DialerConnector); ok { + dc.Dialer(dialer) + } + p.pgListener = pqListenerShim{ - Listener: pq.NewDialListener(dialer, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { + Listener: pq.NewConnectorListener(connector, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { switch t { case pq.ListenerEventConnected: p.logger.Info(ctx, "pubsub connected to postgres") diff --git a/go.mod b/go.mod index 9b2dfcd4f4b46..a71aeacc17cab 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,11 @@ replace github.com/imulab/go-scim/pkg/v2 => github.com/coder/go-scim/pkg/v2 v2.0 // Fixes https://github.com/coder/coder/issues/6685 replace github.com/pkg/sftp => github.com/mafredri/sftp v1.13.6-0.20231212144145-8218e927edb0 +// Adds support for a new Listener from a driver.Connector +// This lets us use rotating authentication tokens for passwords in connection strings +// which we use in the awsiamrds package. +replace github.com/lib/pq => github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d + require ( cdr.dev/slog v1.6.2-0.20240126064726-20367d4aede6 cloud.google.com/go/compute/metadata v0.5.0 diff --git a/go.sum b/go.sum index ce4034a7da5a5..3027257a7bb61 100644 --- a/go.sum +++ b/go.sum @@ -215,6 +215,8 @@ github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322 h1:m0lPZjlQ7vdVp github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322/go.mod h1:rOLFDDVKVFiDqZFXoteXc97YXx7kFi9kYqR+2ETPkLQ= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136 h1:0RgB61LcNs24WOxc3PBvygSNTQurm0PYPujJjLLOzs0= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136/go.mod h1:VkD1P761nykiq75dz+4iFqIQIZka189tx1BQLOp0Skc= +github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d h1:pv+JacyCHoHAr2kh6HltHdFlWqVeWHCvaQDqra5Aff4= +github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc= github.com/coder/quartz v0.1.0 h1:cLL+0g5l7xTf6ordRnUMMiZtRE8Sq5LxpghS63vEXrQ= @@ -670,8 +672,6 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1 github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mafredri/sftp v1.13.6-0.20231212144145-8218e927edb0 h1:lG2o/EWMEOlV/RfQrf3zYfQStjnUj0Mg2gmbcBcoxFI= From 88d50b85e937bbfa777b5660cddd00b6dd87d0b8 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 13 Aug 2024 17:08:10 +0000 Subject: [PATCH 02/22] comments --- coderd/database/awsiamrds/awsiamrds.go | 3 ++- coderd/database/connector.go | 2 ++ coderd/database/pubsub/pubsub.go | 11 ++++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/coderd/database/awsiamrds/awsiamrds.go b/coderd/database/awsiamrds/awsiamrds.go index b6542810ceb36..eb9a29dcc6a87 100644 --- a/coderd/database/awsiamrds/awsiamrds.go +++ b/coderd/database/awsiamrds/awsiamrds.go @@ -69,6 +69,7 @@ func (d *awsIamRdsDriver) Open(name string) (driver.Conn, error) { return conn, nil } +// Connector returns a driver.Connector that fetches a new authentication token for each connection. func (d *awsIamRdsDriver) Connector(name string) (driver.Connector, error) { connector := &connector{ url: name, @@ -122,7 +123,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { return nc.Connect(ctx) } -func (c *connector) Driver() driver.Driver { +func (*connector) Driver() driver.Driver { return &pq.Driver{} } diff --git a/coderd/database/connector.go b/coderd/database/connector.go index 3a4d30d1239fa..a022ea033dade 100644 --- a/coderd/database/connector.go +++ b/coderd/database/connector.go @@ -7,10 +7,12 @@ import ( "github.com/lib/pq" ) +// ConnectorCreator can create a driver.Connector. type ConnectorCreator interface { Connector(name string) (driver.Connector, error) } +// DialerConnector can create a driver.Connector and set a pq.Dialer. type DialerConnector interface { Connect(context.Context) (driver.Conn, error) Dialer(dialer pq.Dialer) diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 3680d0a26e678..74bd46b361cfe 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -441,15 +441,16 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { // Create a custom connector if the database driver supports it. connectorCreator, ok := p.db.Driver().(database.ConnectorCreator) - if !ok { - connector, err = pq.NewConnector(connectURL) + if ok { + connector, err = connectorCreator.Connector(connectURL) if err != nil { - return xerrors.Errorf("create pq connector: %w", err) + return xerrors.Errorf("create custom connector: %w", err) } } else { - connector, err = connectorCreator.Connector(connectURL) + // use the default pq connector otherwise + connector, err = pq.NewConnector(connectURL) if err != nil { - return xerrors.Errorf("create custom connector: %w", err) + return xerrors.Errorf("create pq connector: %w", err) } } From 8be052406d52dbca98721de8e1faa69ab63a1783 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 13 Aug 2024 17:18:01 +0000 Subject: [PATCH 03/22] flake --- coderd/database/pubsub/pubsub.go | 8 ++++---- flake.nix | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 74bd46b361cfe..388cfc69a0991 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -609,8 +609,8 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) { } // New creates a new Pubsub implementation using a PostgreSQL connection. -func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) { - p := newWithoutListener(logger, database) +func New(startCtx context.Context, logger slog.Logger, db *sql.DB, connectURL string) (*PGPubsub, error) { + p := newWithoutListener(logger, db) if err := p.startListener(startCtx, connectURL); err != nil { return nil, err } @@ -620,11 +620,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect } // newWithoutListener creates a new PGPubsub without creating the pqListener. -func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub { +func newWithoutListener(logger slog.Logger, db *sql.DB) *PGPubsub { return &PGPubsub{ logger: logger, listenDone: make(chan struct{}), - db: database, + db: db, queues: make(map[string]map[uuid.UUID]*msgQueue), latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")), diff --git a/flake.nix b/flake.nix index 079d895e482a9..3bb9e1e6c1329 100644 --- a/flake.nix +++ b/flake.nix @@ -117,7 +117,7 @@ name = "coder-${osArch}"; # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! - vendorHash = "sha256-AZ0qzh7H+UwnZNyg2iaNMSUWlGgomI/mo70T+FdF7ws="; + vendorHash = "sha256-AWWSj+tNYEzvf4lTgdUJMi2bY6rXxZHGh50P4WY2BUo="; proxyVendor = true; src = ./.; nativeBuildInputs = with pkgs; [ getopt openssl zstd ]; From 3be05c2c028a7e0d5d203a24838550645fec4fbd Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 13 Aug 2024 17:26:43 +0000 Subject: [PATCH 04/22] fmt --- coderd/database/awsiamrds/awsiamrds.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/coderd/database/awsiamrds/awsiamrds.go b/coderd/database/awsiamrds/awsiamrds.go index eb9a29dcc6a87..a8cd6ab495b55 100644 --- a/coderd/database/awsiamrds/awsiamrds.go +++ b/coderd/database/awsiamrds/awsiamrds.go @@ -21,8 +21,10 @@ type awsIamRdsDriver struct { cfg aws.Config } -var _ driver.Driver = &awsIamRdsDriver{} -var _ database.ConnectorCreator = &awsIamRdsDriver{} +var ( + _ driver.Driver = &awsIamRdsDriver{} + _ database.ConnectorCreator = &awsIamRdsDriver{} +) // Register initializes and registers our aws iam rds wrapped database driver. func Register(ctx context.Context, parentName string) (string, error) { From c7964e406a5f76c4f3b092a359745604d2213546 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 13 Aug 2024 18:37:56 +0000 Subject: [PATCH 05/22] temp add personal fork --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a71aeacc17cab..a29a504375f48 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ replace github.com/pkg/sftp => github.com/mafredri/sftp v1.13.6-0.20231212144145 // Adds support for a new Listener from a driver.Connector // This lets us use rotating authentication tokens for passwords in connection strings // which we use in the awsiamrds package. -replace github.com/lib/pq => github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d +replace github.com/lib/pq => github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048 require ( cdr.dev/slog v1.6.2-0.20240126064726-20367d4aede6 diff --git a/go.sum b/go.sum index 3027257a7bb61..e94d0a19c342d 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,6 @@ github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322 h1:m0lPZjlQ7vdVp github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322/go.mod h1:rOLFDDVKVFiDqZFXoteXc97YXx7kFi9kYqR+2ETPkLQ= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136 h1:0RgB61LcNs24WOxc3PBvygSNTQurm0PYPujJjLLOzs0= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136/go.mod h1:VkD1P761nykiq75dz+4iFqIQIZka189tx1BQLOp0Skc= -github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d h1:pv+JacyCHoHAr2kh6HltHdFlWqVeWHCvaQDqra5Aff4= -github.com/coder/pq v1.10.5-0.20240813145306-1ce661cfa68d/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc= github.com/coder/quartz v0.1.0 h1:cLL+0g5l7xTf6ordRnUMMiZtRE8Sq5LxpghS63vEXrQ= @@ -302,6 +300,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanw/esbuild v0.23.0 h1:PLUwTn2pzQfIBRrMKcD3M0g1ALOKIHMDefdFCk7avwM= github.com/evanw/esbuild v0.23.0/go.mod h1:D2vIQZqV/vIf/VRHtViaUtViZmG7o+kKmlBfVQuRi48= +github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048 h1:deEfYCBk8DLTgP+vRn7lh/6Qkc8WIWzQivMciYszoU4= +github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= From 6ff2ec7edf0fb3c49207b0549ca1c1b246310806 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 14 Aug 2024 17:03:32 +0000 Subject: [PATCH 06/22] update coder fork --- flake.nix | 2 +- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flake.nix b/flake.nix index 3bb9e1e6c1329..c21de7c021fe5 100644 --- a/flake.nix +++ b/flake.nix @@ -117,7 +117,7 @@ name = "coder-${osArch}"; # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! - vendorHash = "sha256-AWWSj+tNYEzvf4lTgdUJMi2bY6rXxZHGh50P4WY2BUo="; + vendorHash = "sha256-0tsJY4/afWbsaFgq4j0F4kUSVXsWCQOwCk2VeU32vfM="; proxyVendor = true; src = ./.; nativeBuildInputs = with pkgs; [ getopt openssl zstd ]; diff --git a/go.mod b/go.mod index a29a504375f48..c644bf22195fd 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ replace github.com/pkg/sftp => github.com/mafredri/sftp v1.13.6-0.20231212144145 // Adds support for a new Listener from a driver.Connector // This lets us use rotating authentication tokens for passwords in connection strings // which we use in the awsiamrds package. -replace github.com/lib/pq => github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048 +replace github.com/lib/pq => github.com/coder/pq v1.10.5-0.20240813183442-0c420cb5a048 require ( cdr.dev/slog v1.6.2-0.20240126064726-20367d4aede6 diff --git a/go.sum b/go.sum index e94d0a19c342d..3893c3bcb9eea 100644 --- a/go.sum +++ b/go.sum @@ -215,6 +215,8 @@ github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322 h1:m0lPZjlQ7vdVp github.com/coder/go-httpstat v0.0.0-20230801153223-321c88088322/go.mod h1:rOLFDDVKVFiDqZFXoteXc97YXx7kFi9kYqR+2ETPkLQ= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136 h1:0RgB61LcNs24WOxc3PBvygSNTQurm0PYPujJjLLOzs0= github.com/coder/go-scim/pkg/v2 v2.0.0-20230221055123-1d63c1222136/go.mod h1:VkD1P761nykiq75dz+4iFqIQIZka189tx1BQLOp0Skc= +github.com/coder/pq v1.10.5-0.20240813183442-0c420cb5a048 h1:3jzYUlGH7ZELIH4XggXhnTnP05FCYiAFeQpoN+gNR5I= +github.com/coder/pq v1.10.5-0.20240813183442-0c420cb5a048/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs= github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc= github.com/coder/quartz v0.1.0 h1:cLL+0g5l7xTf6ordRnUMMiZtRE8Sq5LxpghS63vEXrQ= @@ -300,8 +302,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanw/esbuild v0.23.0 h1:PLUwTn2pzQfIBRrMKcD3M0g1ALOKIHMDefdFCk7avwM= github.com/evanw/esbuild v0.23.0/go.mod h1:D2vIQZqV/vIf/VRHtViaUtViZmG7o+kKmlBfVQuRi48= -github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048 h1:deEfYCBk8DLTgP+vRn7lh/6Qkc8WIWzQivMciYszoU4= -github.com/f0ssel/pq v1.10.10-0.20240813183442-0c420cb5a048/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= From 02258ee89c5b1b15b22393a10d2aeace5295c168 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 14 Aug 2024 17:29:53 +0000 Subject: [PATCH 07/22] test pubsub --- coderd/database/awsiamrds/awsiamrds_test.go | 23 ++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/coderd/database/awsiamrds/awsiamrds_test.go b/coderd/database/awsiamrds/awsiamrds_test.go index d4a1ce193016e..d2fdd2a769174 100644 --- a/coderd/database/awsiamrds/awsiamrds_test.go +++ b/coderd/database/awsiamrds/awsiamrds_test.go @@ -7,10 +7,11 @@ import ( "github.com/stretchr/testify/require" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/cli" - awsrdsiam "github.com/coder/coder/v2/coderd/database/awsiamrds" + "github.com/coder/coder/v2/coderd/database/awsiamrds" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/testutil" ) @@ -25,10 +26,11 @@ func TestDriver(t *testing.T) { t.Skip() } + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) defer cancel() - sqlDriver, err := awsrdsiam.Register(ctx, "postgres") + sqlDriver, err := awsiamrds.Register(ctx, "postgres") require.NoError(t, err) db, err := cli.ConnectToPostgres(ctx, slogtest.Make(t, nil), sqlDriver, url) @@ -47,4 +49,19 @@ func TestDriver(t *testing.T) { var one int require.NoError(t, i.Scan(&one)) require.Equal(t, 1, one) + + ps, err := pubsub.New(ctx, logger, db, url) + require.NoError(t, err) + + gotChan := make(chan struct{}) + subCancel, err := ps.Subscribe("test", func(_ context.Context, _ []byte) { + close(gotChan) + }) + defer subCancel() + require.NoError(t, err) + + err = ps.Publish("test", []byte("hello")) + require.NoError(t, err) + + <-gotChan } From 9d8681b668c2400f0ef91ad90ff62fb1f7280084 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 14 Aug 2024 17:31:46 +0000 Subject: [PATCH 08/22] test pubsub --- coderd/database/awsiamrds/awsiamrds_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/coderd/database/awsiamrds/awsiamrds_test.go b/coderd/database/awsiamrds/awsiamrds_test.go index d2fdd2a769174..8e326382785b9 100644 --- a/coderd/database/awsiamrds/awsiamrds_test.go +++ b/coderd/database/awsiamrds/awsiamrds_test.go @@ -63,5 +63,9 @@ func TestDriver(t *testing.T) { err = ps.Publish("test", []byte("hello")) require.NoError(t, err) - <-gotChan + select { + case <-gotChan: + case <-ctx.Done(): + require.Fail(t, "timed out waiting for message") + } } From 85f6cfe5157f617d6ed1d04789c3b0f8ae318e12 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 14 Aug 2024 17:43:33 +0000 Subject: [PATCH 09/22] t log --- coderd/database/awsiamrds/awsiamrds_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/coderd/database/awsiamrds/awsiamrds_test.go b/coderd/database/awsiamrds/awsiamrds_test.go index 8e326382785b9..36f4ea4d8f6b2 100644 --- a/coderd/database/awsiamrds/awsiamrds_test.go +++ b/coderd/database/awsiamrds/awsiamrds_test.go @@ -23,6 +23,7 @@ func TestDriver(t *testing.T) { // export DBAWSIAMRDS_TEST_URL="postgres://user@host:5432/dbname"; url := os.Getenv("DBAWSIAMRDS_TEST_URL") if url == "" { + t.Log("skipping test; no DBAWSIAMRDS_TEST_URL set") t.Skip() } From dae69c715d750a4c1b9bb7bb421582bd52f4f419 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 21 Aug 2024 16:11:03 +0000 Subject: [PATCH 10/22] add customer driver tests --- coderd/database/connector.go | 8 +- coderd/database/dbtestutil/driver.go | 128 ++++++++++++++++++++++++++ coderd/database/pubsub/pubsub.go | 5 +- coderd/database/pubsub/pubsub_test.go | 79 +++++++++++++++- 4 files changed, 214 insertions(+), 6 deletions(-) create mode 100644 coderd/database/dbtestutil/driver.go diff --git a/coderd/database/connector.go b/coderd/database/connector.go index a022ea033dade..5ade33ed18233 100644 --- a/coderd/database/connector.go +++ b/coderd/database/connector.go @@ -1,19 +1,19 @@ package database import ( - "context" "database/sql/driver" "github.com/lib/pq" ) -// ConnectorCreator can create a driver.Connector. +// ConnectorCreator is a driver.Driver that can create a driver.Connector. type ConnectorCreator interface { + driver.Driver Connector(name string) (driver.Connector, error) } -// DialerConnector can create a driver.Connector and set a pq.Dialer. +// DialerConnector is a driver.Connector that can set a pq.Dialer. type DialerConnector interface { - Connect(context.Context) (driver.Conn, error) + driver.Connector Dialer(dialer pq.Dialer) } diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go new file mode 100644 index 0000000000000..5e097580efc63 --- /dev/null +++ b/coderd/database/dbtestutil/driver.go @@ -0,0 +1,128 @@ +package dbtestutil + +import ( + "context" + "database/sql" + "database/sql/driver" + "fmt" + + "github.com/lib/pq" + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/cryptorand" +) + +var ( + _ driver.Driver = &Driver{} + _ database.ConnectorCreator = &Driver{} + _ database.DialerConnector = &Connector{} +) + +type Driver struct { + name string + inner driver.Driver + connections []driver.Conn + listeners map[chan struct{}]chan struct{} +} + +func Register() (*Driver, error) { + db, err := sql.Open("postgres", "") + if err != nil { + return nil, xerrors.Errorf("failed to open database: %w", err) + } + + su, err := cryptorand.StringCharset(cryptorand.Alpha, 10) + if err != nil { + return nil, xerrors.Errorf("failed to generate random string: %w", err) + } + + d := &Driver{ + name: fmt.Sprintf("postgres-test-%s", su), + inner: db.Driver(), + listeners: make(map[chan struct{}]chan struct{}), + } + + sql.Register(d.name, d) + + return d, nil +} + +func (d *Driver) Open(name string) (driver.Conn, error) { + conn, err := d.inner.Open(name) + if err != nil { + return nil, xerrors.Errorf("failed to open connection: %w", err) + } + + d.AddConnection(conn) + + return conn, nil +} + +func (d *Driver) Connector(name string) (driver.Connector, error) { + return &Connector{ + name: name, + driver: d, + }, nil +} + +func (d *Driver) Name() string { + return d.name +} + +func (d *Driver) AddConnection(conn driver.Conn) { + d.connections = append(d.connections, conn) + + for listener := range d.listeners { + d.listeners[listener] <- struct{}{} + } + +} + +func (d *Driver) WaitForConnection() { + ch := make(chan struct{}) + d.listeners[ch] = ch + <-ch + delete(d.listeners, ch) +} + +func (d *Driver) DropConnections() { + for _, conn := range d.connections { + _ = conn.Close() + } + d.connections = nil +} + +type Connector struct { + name string + driver *Driver + dialer pq.Dialer +} + +func (c *Connector) Connect(_ context.Context) (driver.Conn, error) { + if c.dialer != nil { + conn, err := pq.DialOpen(c.dialer, c.name) + if err != nil { + return nil, xerrors.Errorf("failed to dial open connection: %w", err) + } + + c.driver.AddConnection(conn) + + return conn, nil + } + + conn, err := c.driver.Open(c.name) + if err != nil { + return nil, xerrors.Errorf("failed to open connection: %w", err) + } + + return conn, nil +} + +func (c *Connector) Driver() driver.Driver { + return c.driver +} + +func (c *Connector) Dialer(dialer pq.Dialer) { + c.dialer = dialer +} diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 388cfc69a0991..79be4bd602032 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -455,7 +455,10 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { } // Set the dialer if the connector supports it. - if dc, ok := connector.(database.DialerConnector); ok { + dc, ok := connector.(database.DialerConnector) + if !ok { + p.logger.Critical(ctx, "connector does not support setting log dialer, database connection debug logs will be missing") + } else { dc.Dialer(dialer) } diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index d36298bb3221d..ce114ce0e6b05 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -86,7 +87,7 @@ func TestPGPubsub_Metrics(t *testing.T) { for i := range colossalData { colossalData[i] = 'q' } - unsub1, err := uut.Subscribe(event, func(ctx context.Context, message []byte) { + unsub1, err := uut.Subscribe(event, func(_ context.Context, message []byte) { messageChannel <- message }) require.NoError(t, err) @@ -119,3 +120,79 @@ func TestPGPubsub_Metrics(t *testing.T) { !testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total") }, testutil.WaitShort, testutil.IntervalFast) } + +func TestPGPubsubDriver(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("test only with postgres") + } + + ctx := testutil.Context(t, testutil.WaitLong) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }).Leveled(slog.LevelDebug) + + connectionURL, closePg, err := dbtestutil.Open() + require.NoError(t, err) + defer closePg() + + // wrap the pg driver with one we can control + d, err := dbtestutil.Register() + require.NoError(t, err) + + db, err := sql.Open(d.Name(), connectionURL) + require.NoError(t, err) + defer db.Close() + + ps, err := pubsub.New(ctx, logger, db, connectionURL) + require.NoError(t, err) + defer ps.Close() + + // test that we can publish and subscribe + gotChan := make(chan struct{}) + defer close(gotChan) + subCancel, err := ps.Subscribe("test", func(_ context.Context, _ []byte) { + gotChan <- struct{}{} + }) + require.NoError(t, err) + defer subCancel() + + err = ps.Publish("test", []byte("hello")) + require.NoError(t, err) + + select { + case <-gotChan: + case <-ctx.Done(): + t.Fatal("timeout waiting for message") + } + + reconnectChan := make(chan struct{}) + go func() { + d.WaitForConnection() + // wait a bit to make sure the pubsub has reestablished it's connection + // if we don't wait, the publish may be dropped because the pubsub hasn't initialized yet. + time.Sleep(1 * time.Second) + reconnectChan <- struct{}{} + }() + + // drop the underlying connection being used by the pubsub + // the pq.Listener should reconnect and repopulate it's listeners + // so old subscriptions should still work + d.DropConnections() + + select { + case <-reconnectChan: + case <-ctx.Done(): + t.Fatal("timeout waiting for reconnect") + } + + // ensure our old subscription still fires + err = ps.Publish("test", []byte("hello-again")) + require.NoError(t, err) + + select { + case <-gotChan: + case <-ctx.Done(): + t.Fatal("timeout waiting for message after reconnect") + } +} From 2019fecd3b489e69312770f6c8be02286d1aa12c Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 21 Aug 2024 16:15:53 +0000 Subject: [PATCH 11/22] update flake --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 7b2c6644b89ed..4dc2b2b330969 100644 --- a/flake.nix +++ b/flake.nix @@ -117,7 +117,7 @@ name = "coder-${osArch}"; # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! - vendorHash = "sha256-I2YMiYXq8XrJd6jw8JZnOL5wugPTLsip9JGELEy5Uao="; + vendorHash = "sha256-wC1c8MK+rP/oP+CTbjCNDvCAvXZRqizxbBJN6v5bQ9A="; proxyVendor = true; src = ./.; nativeBuildInputs = with pkgs; [ getopt openssl zstd ]; From ff2784cd399b059b5f16dd9871398ae357dfd434 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 21 Aug 2024 16:20:57 +0000 Subject: [PATCH 12/22] lint --- coderd/database/dbtestutil/driver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index 5e097580efc63..7f750b35daf35 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -72,11 +72,9 @@ func (d *Driver) Name() string { func (d *Driver) AddConnection(conn driver.Conn) { d.connections = append(d.connections, conn) - for listener := range d.listeners { d.listeners[listener] <- struct{}{} } - } func (d *Driver) WaitForConnection() { From c18963bd8922ec6df8da2a6038edb19b010c2beb Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 21 Aug 2024 16:22:16 +0000 Subject: [PATCH 13/22] move to defer --- coderd/database/dbtestutil/driver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index 7f750b35daf35..b21ee1d2d9b54 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -79,9 +79,10 @@ func (d *Driver) AddConnection(conn driver.Conn) { func (d *Driver) WaitForConnection() { ch := make(chan struct{}) + defer close(ch) + defer delete(d.listeners, ch) d.listeners[ch] = ch <-ch - delete(d.listeners, ch) } func (d *Driver) DropConnections() { From 1600abff933930520fbe39ee2254a360bea8a699 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 22 Aug 2024 20:11:44 +0000 Subject: [PATCH 14/22] fix tests --- coderd/database/dbtestutil/driver.go | 111 +++++++------------------- coderd/database/pubsub/pubsub_test.go | 62 ++++++-------- 2 files changed, 57 insertions(+), 116 deletions(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index b21ee1d2d9b54..6fb5396869abc 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -2,96 +2,19 @@ package dbtestutil import ( "context" - "database/sql" + "database/sql/driver" - "fmt" "github.com/lib/pq" "golang.org/x/xerrors" "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/cryptorand" ) var ( - _ driver.Driver = &Driver{} - _ database.ConnectorCreator = &Driver{} - _ database.DialerConnector = &Connector{} + _ database.DialerConnector = &Connector{} ) -type Driver struct { - name string - inner driver.Driver - connections []driver.Conn - listeners map[chan struct{}]chan struct{} -} - -func Register() (*Driver, error) { - db, err := sql.Open("postgres", "") - if err != nil { - return nil, xerrors.Errorf("failed to open database: %w", err) - } - - su, err := cryptorand.StringCharset(cryptorand.Alpha, 10) - if err != nil { - return nil, xerrors.Errorf("failed to generate random string: %w", err) - } - - d := &Driver{ - name: fmt.Sprintf("postgres-test-%s", su), - inner: db.Driver(), - listeners: make(map[chan struct{}]chan struct{}), - } - - sql.Register(d.name, d) - - return d, nil -} - -func (d *Driver) Open(name string) (driver.Conn, error) { - conn, err := d.inner.Open(name) - if err != nil { - return nil, xerrors.Errorf("failed to open connection: %w", err) - } - - d.AddConnection(conn) - - return conn, nil -} - -func (d *Driver) Connector(name string) (driver.Connector, error) { - return &Connector{ - name: name, - driver: d, - }, nil -} - -func (d *Driver) Name() string { - return d.name -} - -func (d *Driver) AddConnection(conn driver.Conn) { - d.connections = append(d.connections, conn) - for listener := range d.listeners { - d.listeners[listener] <- struct{}{} - } -} - -func (d *Driver) WaitForConnection() { - ch := make(chan struct{}) - defer close(ch) - defer delete(d.listeners, ch) - d.listeners[ch] = ch - <-ch -} - -func (d *Driver) DropConnections() { - for _, conn := range d.connections { - _ = conn.Close() - } - d.connections = nil -} - type Connector struct { name string driver *Driver @@ -105,7 +28,7 @@ func (c *Connector) Connect(_ context.Context) (driver.Conn, error) { return nil, xerrors.Errorf("failed to dial open connection: %w", err) } - c.driver.AddConnection(conn) + c.driver.Connections <- conn return conn, nil } @@ -115,6 +38,8 @@ func (c *Connector) Connect(_ context.Context) (driver.Conn, error) { return nil, xerrors.Errorf("failed to open connection: %w", err) } + c.driver.Connections <- conn + return conn, nil } @@ -125,3 +50,29 @@ func (c *Connector) Driver() driver.Driver { func (c *Connector) Dialer(dialer pq.Dialer) { c.dialer = dialer } + +type Driver struct { + Connections chan driver.Conn +} + +func NewDriver() *Driver { + return &Driver{ + Connections: make(chan driver.Conn, 1), + } +} + +func (d *Driver) Connector(name string) (driver.Connector, error) { + return &Connector{ + name: name, + driver: d, + }, nil +} + +func (d *Driver) Open(name string) (driver.Conn, error) { + c, err := d.Connector(name) + if err != nil { + return nil, err + } + + return c.Connect(context.Background()) +} diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index ce114ce0e6b05..fcc0b98e73986 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "testing" - "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -136,63 +135,54 @@ func TestPGPubsubDriver(t *testing.T) { require.NoError(t, err) defer closePg() - // wrap the pg driver with one we can control - d, err := dbtestutil.Register() + // use a separate subber and pubber so we can keep track of listener connections + db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) - - db, err := sql.Open(d.Name(), connectionURL) + pubber, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) - defer db.Close() - ps, err := pubsub.New(ctx, logger, db, connectionURL) + // use a connector that sends us the connections for the subber + subDriver := dbtestutil.NewDriver() + tconn, err := subDriver.Connector(connectionURL) require.NoError(t, err) - defer ps.Close() + tcdb := sql.OpenDB(tconn) + subber, err := pubsub.New(ctx, logger, tcdb, connectionURL) + require.NoError(t, err) + defer subber.Close() // test that we can publish and subscribe - gotChan := make(chan struct{}) + gotChan := make(chan struct{}, 1) defer close(gotChan) - subCancel, err := ps.Subscribe("test", func(_ context.Context, _ []byte) { + subCancel, err := subber.Subscribe("test", func(_ context.Context, _ []byte) { gotChan <- struct{}{} }) require.NoError(t, err) defer subCancel() - err = ps.Publish("test", []byte("hello")) + t.Log("publishing message") + // send a message + err = pubber.Publish("test", []byte("hello")) require.NoError(t, err) - select { - case <-gotChan: - case <-ctx.Done(): - t.Fatal("timeout waiting for message") - } + // wait for the message + _ = testutil.RequireRecvCtx(ctx, t, gotChan) - reconnectChan := make(chan struct{}) - go func() { - d.WaitForConnection() - // wait a bit to make sure the pubsub has reestablished it's connection - // if we don't wait, the publish may be dropped because the pubsub hasn't initialized yet. - time.Sleep(1 * time.Second) - reconnectChan <- struct{}{} - }() + // read out first connection + firstConn := testutil.RequireRecvCtx(ctx, t, subDriver.Connections) // drop the underlying connection being used by the pubsub // the pq.Listener should reconnect and repopulate it's listeners // so old subscriptions should still work - d.DropConnections() + err = firstConn.Close() + require.NoError(t, err) - select { - case <-reconnectChan: - case <-ctx.Done(): - t.Fatal("timeout waiting for reconnect") - } + // wait for the reconnect + _ = testutil.RequireRecvCtx(ctx, t, subDriver.Connections) // ensure our old subscription still fires - err = ps.Publish("test", []byte("hello-again")) + err = pubber.Publish("test", []byte("hello-again")) require.NoError(t, err) - select { - case <-gotChan: - case <-ctx.Done(): - t.Fatal("timeout waiting for message after reconnect") - } + // wait for the message on the old subscription + _ = testutil.RequireRecvCtx(ctx, t, gotChan) } From 839a52e86a86a01dd5d908572706f023521357a3 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 22 Aug 2024 20:12:02 +0000 Subject: [PATCH 15/22] remove log --- coderd/database/pubsub/pubsub_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index fcc0b98e73986..96c7439ef510e 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -159,7 +159,6 @@ func TestPGPubsubDriver(t *testing.T) { require.NoError(t, err) defer subCancel() - t.Log("publishing message") // send a message err = pubber.Publish("test", []byte("hello")) require.NoError(t, err) From cd61cc7dc7883d887547588296d251b0e671bb82 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 22 Aug 2024 20:14:29 +0000 Subject: [PATCH 16/22] update flake --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index bd012d1f51371..7e96e3dc04b0f 100644 --- a/flake.nix +++ b/flake.nix @@ -117,7 +117,7 @@ name = "coder-${osArch}"; # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! - vendorHash = "sha256-lkjb9a40kzi7c3JbVb+5wT1JYTaobLl4lIq11FIvxuQ="; + vendorHash = "sha256-+vj01/Krgy2SjUny88cRKALrQ3bIVvOlq87acIS6gBw="; proxyVendor = true; src = ./.; nativeBuildInputs = with pkgs; [ getopt openssl zstd ]; From c946cdddb7fb61363549cc17484fd75d9922d934 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 22 Aug 2024 20:17:20 +0000 Subject: [PATCH 17/22] fmt --- coderd/database/dbtestutil/driver.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index 6fb5396869abc..1fd237e8260b5 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -2,7 +2,6 @@ package dbtestutil import ( "context" - "database/sql/driver" "github.com/lib/pq" @@ -11,9 +10,7 @@ import ( "github.com/coder/coder/v2/coderd/database" ) -var ( - _ database.DialerConnector = &Connector{} -) +var _ database.DialerConnector = &Connector{} type Connector struct { name string From 1b139e4697e3ed58ed1698658dcdac181f8dcba4 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 22 Aug 2024 20:30:13 +0000 Subject: [PATCH 18/22] close chan --- coderd/database/dbtestutil/driver.go | 4 ++++ coderd/database/pubsub/pubsub_test.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index 1fd237e8260b5..f1150d990e0fb 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -73,3 +73,7 @@ func (d *Driver) Open(name string) (driver.Conn, error) { return c.Connect(context.Background()) } + +func (d *Driver) Close() { + close(d.Connections) +} diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index 96c7439ef510e..bd3a3be831fc0 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -51,7 +51,7 @@ func TestPGPubsub_Metrics(t *testing.T) { event := "test" data := "testing" messageChannel := make(chan []byte) - unsub0, err := uut.Subscribe(event, func(ctx context.Context, message []byte) { + unsub0, err := uut.Subscribe(event, func(_ context.Context, message []byte) { messageChannel <- message }) require.NoError(t, err) @@ -140,9 +140,11 @@ func TestPGPubsubDriver(t *testing.T) { require.NoError(t, err) pubber, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) + defer pubber.Close() // use a connector that sends us the connections for the subber subDriver := dbtestutil.NewDriver() + defer subDriver.Close() tconn, err := subDriver.Connector(connectionURL) require.NoError(t, err) tcdb := sql.OpenDB(tconn) From 8c96f6ed1c2d4dc6fd5df436f4fa4d5ac0463dfd Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Fri, 23 Aug 2024 18:25:15 +0000 Subject: [PATCH 19/22] use pq --- coderd/database/dbtestutil/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/database/dbtestutil/driver.go b/coderd/database/dbtestutil/driver.go index f1150d990e0fb..cb2e05af78617 100644 --- a/coderd/database/dbtestutil/driver.go +++ b/coderd/database/dbtestutil/driver.go @@ -30,7 +30,7 @@ func (c *Connector) Connect(_ context.Context) (driver.Conn, error) { return conn, nil } - conn, err := c.driver.Open(c.name) + conn, err := pq.Driver{}.Open(c.name) if err != nil { return nil, xerrors.Errorf("failed to open connection: %w", err) } From ed009f7a5f0e7006d0de444990b9dc5c153fa772 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Fri, 23 Aug 2024 18:27:03 +0000 Subject: [PATCH 20/22] update flake --- flake.nix | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flake.nix b/flake.nix index e75652fec209e..0a38d3955924e 100644 --- a/flake.nix +++ b/flake.nix @@ -118,9 +118,9 @@ # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! <<<<<<< HEAD - vendorHash = "sha256-+vj01/Krgy2SjUny88cRKALrQ3bIVvOlq87acIS6gBw="; + vendorHash = "sha256-fQsVoD/aRjVXmvQ/Pg4O9tpJCPlf3eC2uo0z0TU7AX8="; ======= - vendorHash = "sha256-I/FcLT6N7Nz21QptkvCcs/SpMJFH0B5xVzIZNrEqVGo="; + vendorHash = "sha256-fQsVoD/aRjVXmvQ/Pg4O9tpJCPlf3eC2uo0z0TU7AX8="; >>>>>>> main proxyVendor = true; src = ./.; From c8a7c9ae95209a4abd1c360ef2461c57524a2f95 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Fri, 23 Aug 2024 18:30:01 +0000 Subject: [PATCH 21/22] fix flake --- flake.nix | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flake.nix b/flake.nix index 0a38d3955924e..15ce314c7b427 100644 --- a/flake.nix +++ b/flake.nix @@ -117,11 +117,7 @@ name = "coder-${osArch}"; # Updated with ./scripts/update-flake.sh`. # This should be updated whenever go.mod changes! -<<<<<<< HEAD vendorHash = "sha256-fQsVoD/aRjVXmvQ/Pg4O9tpJCPlf3eC2uo0z0TU7AX8="; -======= - vendorHash = "sha256-fQsVoD/aRjVXmvQ/Pg4O9tpJCPlf3eC2uo0z0TU7AX8="; ->>>>>>> main proxyVendor = true; src = ./.; nativeBuildInputs = with pkgs; [ getopt openssl zstd ]; From 05cbc3c903a1bb10cf6fc3ca6d562c775c4f649e Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Mon, 26 Aug 2024 14:52:49 +0000 Subject: [PATCH 22/22] sleep to prevent flake --- coderd/database/pubsub/pubsub_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index bd3a3be831fc0..6059b0cecbd97 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -179,6 +180,9 @@ func TestPGPubsubDriver(t *testing.T) { // wait for the reconnect _ = testutil.RequireRecvCtx(ctx, t, subDriver.Connections) + // we need to sleep because the raw connection notification + // is sent before the pq.Listener can reestablish it's listeners + time.Sleep(1 * time.Second) // ensure our old subscription still fires err = pubber.Publish("test", []byte("hello-again"))