Skip to content

fix: use authenticated urls for pubsub #14261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Aug 26, 2024
Merged

fix: use authenticated urls for pubsub #14261

merged 25 commits into from
Aug 26, 2024

Conversation

f0ssel
Copy link
Contributor

@f0ssel f0ssel commented Aug 13, 2024

Fix for https://github.com/coder/customers/issues/639

The pq.Listener we use for PubSub does not expose a way to hook into the lifecycle of reconnecting and that prevents us from supporting rotating passwords on reconnection. Previously, we've used a fork coder/pq that exposes a way to create listeners with a given connector. The new v2 branch of coder/pq is a copy of the 1.10.9 tag from lib/pq with our patch on top to ensure no other package changes are brought in besides ours.

This now allows us to create a *sql.DB that uses awsiamrds under the hood and allows us to make a new connector that will generate new credentials on PubSub reconnect.

I've updated the awsiamrds tests to now include testing pubsub. I've tested this on an EC2 instance with the correct roles for connecting to a test RDS instance:

~/coder$ export AWS_DEFAULT_REGION=us-east-2; export DBAWSIAMRDS_TEST_URL=postgres://coder@f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432/coder; go test -v ./coderd/database/awsiamrds/awsiamrds_test.go
=== RUN   TestDriver
=== PAUSE TestDriver
=== CONT  TestDriver
    t.go:99: 2024-08-14 17:46:23.235 [info]  pubsub dialing postgres  network=tcp  address=f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432  timeout_ms=0
    t.go:99: 2024-08-14 17:46:23.237 [info]  pubsub postgres TCP connection established  network=tcp  address=f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432  timeout_ms=0  elapsed_ms=2
    t.go:99: 2024-08-14 17:46:23.261 [info]  pubsub connected to postgres
    t.go:99: 2024-08-14 17:46:23.261 [info]  pubsub has started
    t.go:99: 2024-08-14 17:46:23.262 [debu]  started listening to event channel  event=test
    t.go:99: 2024-08-14 17:46:23.263 [debu]  publish  event=test  message_len=5
    t.go:99: 2024-08-14 17:46:23.293 [debu]  stopped listening to event channel  event=test
--- PASS: TestDriver (0.16s)
PASS
ok      command-line-arguments  0.189s

Pubsub reconnect tests:

➜  coder git:(f0ssel/pubsub-conn-2) DB=ci go test ./coderd/database/pubsub/ -v -run TestPGPubsubDriver
=== RUN   TestPGPubsubDriver
=== PAUSE TestPGPubsubDriver
=== CONT  TestPGPubsubDriver
    t.go:99: 2024-08-21 16:16:55.761 [info]  pubsub dialing postgres  network=tcp  address=localhost:32779  timeout_ms=0
    t.go:99: 2024-08-21 16:16:55.762 [info]  pubsub postgres TCP connection established  network=tcp  address=localhost:32779  timeout_ms=0  elapsed_ms=0
    t.go:99: 2024-08-21 16:16:55.766 [info]  pubsub connected to postgres
    t.go:99: 2024-08-21 16:16:55.766 [info]  pubsub has started
    t.go:99: 2024-08-21 16:16:55.767 [debu]  started listening to event channel  event=test
    t.go:99: 2024-08-21 16:16:55.767 [debu]  publish  event=test  message_len=5
    t.go:102: 2024-08-21 16:16:55.774 [erro]  pubsub disconnected from postgres  error="read tcp 127.0.0.1:53352->127.0.0.1:32779: use of closed network connection"
    t.go:99: 2024-08-21 16:16:56.766 [info]  pubsub dialing postgres  network=tcp  address=localhost:32779  timeout_ms=0
    t.go:99: 2024-08-21 16:16:56.767 [info]  pubsub postgres TCP connection established  network=tcp  address=localhost:32779  timeout_ms=0  elapsed_ms=0
    t.go:99: 2024-08-21 16:16:56.772 [info]  pubsub reconnected to postgres
    t.go:99: 2024-08-21 16:16:56.772 [debu]  notifying subscribers of a reconnection
    t.go:99: 2024-08-21 16:16:57.772 [debu]  publish  event=test  message_len=11
    t.go:99: 2024-08-21 16:16:57.779 [debu]  stopped listening to event channel  event=test
    t.go:99: 2024-08-21 16:16:57.779 [info]  pubsub is closing
    t.go:99: 2024-08-21 16:16:57.779 [info]  pubsub listen stopped receiving notify
    t.go:99: 2024-08-21 16:16:57.779 [debu]  pubsub closed
--- PASS: TestPGPubsubDriver (5.04s)
PASS
ok      github.com/coder/coder/v2/coderd/database/pubsub        5.062s

@f0ssel f0ssel force-pushed the f0ssel/pubsub-conn-2 branch from 39d9b47 to 6beb8a0 Compare August 13, 2024 18:38
@f0ssel f0ssel marked this pull request as ready for review August 14, 2024 17:15
@f0ssel f0ssel force-pushed the f0ssel/pubsub-conn-2 branch from 119b440 to 85f6cfe Compare August 14, 2024 17:45
@f0ssel f0ssel requested a review from spikecurtis August 14, 2024 17:49
}

// Set the dialer if the connector supports it.
if dc, ok := connector.(database.DialerConnector); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to drop a CRITICAL log if this type assertion fails, since it means we've introduced a new driver that doesn't support a Dialer and our logs will be incomplete.

// Create a custom connector if the database driver supports it.
connectorCreator, ok := p.db.Driver().(database.ConnectorCreator)
if ok {
connector, err = connectorCreator.Connector(connectURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn't get hit in the package unit tests. A good way to test it would be to create a pq Driver wrapper that we can control.

I'd like to see a test where we start pubsub with the wrapped driver, do some pub'ing and sub'ing, then kill the connection and verify that the pubsub / pq.Listener reconnects automatically. That would give a nice test of the pq changes you made as well.

}

// DialerConnector can create a driver.Connector and set a pq.Dialer.
type DialerConnector interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@f0ssel f0ssel requested a review from spikecurtis August 21, 2024 16:30
name string
inner driver.Driver
connections []driver.Conn
listeners map[chan struct{}]chan struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This connections and listeners business is not threadsafe, and so can cause races in our tests.

A better design would be to just have a chan driver.Conn that we pass each new connection thru. Tests will have to know how many connections they expect, and can read from the channel. For the PubSub, it's the listener we want to test, which should use a connection when it first connects, then another one when it reconnects.

That way you don't need methods like AddConnection, WaitForConnection, or DropConnections -- the test code reads from the channel to wait for the connection, and can directly close the connection when it wants to interrupt it.

There is some complexity around the sql.DB, which we use for publishing and has a pool of connections. I suggest you sidestep that complexity by just using a second PubSub for publishing with a regular pq driver.

}

func Register() (*Driver, error) {
db, err := sql.Open("postgres", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a roundabout way to get a pq driver, which is just pq.Driver{}

listeners: make(map[chan struct{}]chan struct{}),
}

sql.Register(d.name, d)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you're doing this so that you can get a sql.DB to pass to the PubSub with this instrumented driver. But, registering it and using sql.Open() with the name is more complex than it needs to be.

If you allow test code to directly instantiate the instrumented connector, then you can get a sql.DB as:

db := sql.ConnectDB(connector)

That avoids registering the driver and worrying about a unique name.

case <-gotChan:
case <-ctx.Done():
t.Fatal("timeout waiting for message")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This construction comes up a lot, so we have

_ = testutil.RequireRecvContext(ctx, t, gotChan)

for this purpose.

case <-reconnectChan:
case <-ctx.Done():
t.Fatal("timeout waiting for reconnect")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ = testutil.RequireRecvChan(ctx, t, reconnectChan)

case <-gotChan:
case <-ctx.Done():
t.Fatal("timeout waiting for message after reconnect")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ = testutil.RequireRecvCtx(ctx, t, gotChan)

@@ -119,3 +120,79 @@ func TestPGPubsub_Metrics(t *testing.T) {
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
}

func TestPGPubsubDriver(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good test, but the design of the instrumented driver is racy -- see comments on dbtestutil/driver.go for how to address.

@f0ssel
Copy link
Contributor Author

f0ssel commented Aug 22, 2024

@spikecurtis Thanks for the feedback on the test structure, I've updated it with your suggestions and I think things are looking good.

@f0ssel f0ssel requested a review from spikecurtis August 22, 2024 20:40
Copy link
Contributor

@spikecurtis spikecurtis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One problem inline that needs fixing, but I don't need to review again.

return conn, nil
}

conn, err := c.driver.Open(c.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is circular. Driver.Open() creates a new Connector and calls into it. Fortunately, the pubsub creates a dialer, but anyone else who uses this might not and will overflow their stack.

You need to use pq.Driver{} directly here.

@f0ssel f0ssel enabled auto-merge (squash) August 23, 2024 18:31
@f0ssel f0ssel merged commit ded612d into main Aug 26, 2024
29 checks passed
@f0ssel f0ssel deleted the f0ssel/pubsub-conn-2 branch August 26, 2024 15:04
@github-actions github-actions bot locked and limited conversation to collaborators Aug 26, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants