Skip to content

chore: fix concurrent CommitQuota transactions for unrelated users/orgs #15261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixups
  • Loading branch information
Emyrk committed Oct 29, 2024
commit c0d05db8a170a77632419a541f206492076ddcfe
18 changes: 14 additions & 4 deletions coderd/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,20 @@
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
}

// WithSerialRetryCount is a functional option for New that sets the
// number of times a serializable transaction is retried when it fails
// with a serialization error.
func WithSerialRetryCount(count int) func(*sqlQuerier) {
	return func(q *sqlQuerier) {
		q.serialRetryCount = count
	}
}

// New creates a new database store using a SQL database connection.
func New(sdb *sql.DB) Store {
func New(sdb *sql.DB, opts ...func(*sqlQuerier)) Store {

Check failure on line 58 in coderd/database/db.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'opts' seems to be unused, consider removing or renaming it as _ (revive)
dbx := sqlx.NewDb(sdb, "postgres")
return &sqlQuerier{
db: dbx,
sdb: dbx,
// This is an arbitrary number.
serialRetryCount: 3,
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this configurable so unit tests should not fail if they hit serial errors. We are not doing much concurrency in our tests, and if we are hitting these failures, we should understand them better and solve them case by case.

We can change this if we get too many flakes.


Expand Down Expand Up @@ -104,6 +112,10 @@
// sqlQuerier implements Store on top of a sqlx-wrapped *sql.DB.
type sqlQuerier struct {
	// sdb is the root database handle.
	sdb *sqlx.DB
	// db is the current query target; presumably set to the
	// transaction handle while inside InTx — confirm against runTx.
	db DBTX

	// serialRetryCount is the number of times to retry a transaction
	// if it fails with a serialization error.
	serialRetryCount int
}

func (*sqlQuerier) Wrappers() []string {
Expand Down Expand Up @@ -143,11 +155,9 @@
// If we are in a transaction already, the parent InTx call will handle the retry.
// We do not want to duplicate those retries.
if !inTx && sqlOpts.Isolation == sql.LevelSerializable {
// This is an arbitrarily chosen number.
const retryAmount = 1
var err error
attempts := 0
for attempts = 0; attempts < retryAmount; attempts++ {
for attempts = 0; attempts < q.serialRetryCount; attempts++ {
txOpts.executionCount++
err = q.runTx(function, sqlOpts)
if err == nil {
Expand Down
1 change: 1 addition & 0 deletions coderd/database/dbfake/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
}

func (b OrganizationBuilder) EveryoneAllowance(allowance int) OrganizationBuilder {
b.allUsersAllowance = int32(allowance)

Check failure on line 42 in coderd/database/dbfake/builder.go

View workflow job for this annotation

GitHub Actions / lint

modifies-value-receiver: suspicious assignment to a by-value method receiver (revive)
return b
}

func (b OrganizationBuilder) Seed(seed database.Organization) OrganizationBuilder {
b.seed = seed

Check failure on line 47 in coderd/database/dbfake/builder.go

View workflow job for this annotation

GitHub Actions / lint

modifies-value-receiver: suspicious assignment to a by-value method receiver (revive)
return b
}

Expand All @@ -67,6 +67,7 @@
org := dbgen.Organization(b.t, b.db, b.seed)

ctx := testutil.Context(b.t, testutil.WaitShort)
//nolint:gocritic // builder code needs perms
ctx = dbauthz.AsSystemRestricted(ctx)
everyone, err := b.db.InsertAllUsersGroup(ctx, org.ID)
require.NoError(b.t, err)
Expand Down
3 changes: 2 additions & 1 deletion coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
if o.dumpOnFailure {
t.Cleanup(func() { DumpOnFailure(t, connectionURL) })
}
db = database.New(sqlDB)
// Unit tests should not retry serial transaction failures.
db = database.New(sqlDB, database.WithSerialRetryCount(1))

ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
require.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions coderd/database/dbtestutil/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//
// require.NoError(t, a.Done()
func StartTx(t *testing.T, db database.Store, opts *database.TxOptions) *DBTx {
errC := make(chan error)
done := make(chan error)
finalErr := make(chan error)
txC := make(chan database.Store)

Expand All @@ -44,12 +44,10 @@
if count > 1 {
// If you recursively call InTx, then don't use this.
t.Logf("InTx called more than once: %d", count)
t.Fatal("InTx called more than once, this is not allowed with the StartTx helper")

Check failure on line 47 in coderd/database/dbtestutil/tx.go

View workflow job for this annotation

GitHub Actions / lint

testinggoroutine: call to (*T).Fatal from a non-test goroutine (govet)
}

select {
case _, _ = <-errC:
}
<-done
// Just return nil. The caller should be checking their own errors.
return nil
}, opts)
Expand All @@ -59,7 +57,7 @@
txStore := <-txC
close(txC)

return &DBTx{Store: txStore, done: errC, finalErr: finalErr}
return &DBTx{Store: txStore, done: done, finalErr: finalErr}
}

// Done can only be called once. If you call it twice, it will panic.
Expand Down
3 changes: 3 additions & 0 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions coderd/database/queries/quotas.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ JOIN latest_builds ON
latest_builds.workspace_id = workspaces.id
WHERE
NOT deleted AND
-- These conditions are likely redundant with the filtering done above,
-- but we keep them as a defensive measure so that future changes to the
-- query cannot accidentally widen the result set.
workspaces.owner_id = @owner_id AND
workspaces.organization_id = @organization_id
;
15 changes: 10 additions & 5 deletions enterprise/coderd/workspacequota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,12 @@
})
}

// DB=ci DB_FROM=cikggwjxbths
// nolint:paralleltest // Tests must run serially
func TestWorkspaceSerialization(t *testing.T) {

Check failure on line 310 in enterprise/coderd/workspacequota_test.go

View workflow job for this annotation

GitHub Actions / lint

TestWorkspaceSerialization's subtests should call t.Parallel (tparallel)
t.Parallel()

if !dbtestutil.WillUsePostgres() {
panic("We should only run this test with postgres")
t.Skip("Serialization errors only occur in postgres")
}

db, ps := dbtestutil.NewDB(t)
Expand Down Expand Up @@ -342,8 +342,6 @@
}, user).
Do()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure the query planner uses the cardinality of different columns relative to the table size to determine how to run its query. That means to get reasonable assurance about whether different transactions interfere, we need a realistic number of organizations, users, workspaces, and workspace builds.

PG Docs have this to say

Predicate locks in PostgreSQL, like in most other database systems, are based on data actually accessed by a transaction. These will show up in the pg_locks system view with a mode of SIReadLock. The particular locks acquired during execution of a query will depend on the plan used by the query, and multiple finer-grained locks (e.g., tuple locks) may be combined into fewer coarser-grained locks (e.g., page locks) during the course of the transaction to prevent exhaustion of the memory used to track the locks.

If any page locks are held by our transactions, either as a result of bitmap scans (if that's a thing), or as a result of coalescing finer grained locks, then whether or not transactions serialize without errors could depend on factors not controlled by this test (PostgreSQL config, version, insert timing, etc). That is to say, flaky.

Really, what we want is a realistic amount of data, and then to run repeated transactions, with overlapping pairs chosen randomly, and then to gather statistics on how many pairs interfered with one another. I don't think it makes sense to run these on every CI run. We could even extend to an arbitrary number of simultaneous transactions on different workspaces --- since as deployments grow in size the number of simultaneous builds increases.

Copy link
Member Author

@Emyrk Emyrk Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. So this does not put anything to bed, however this is a step in the right direction. I would prefer to iterate on this with more PRs.

I added metrics to track and log serialization failures: #15215

So at the very least, in the next release we can begin collecting better frequency metrics on these events.

I can spin up some larger datasets. I think we should take the easy index optimizations you pointed out above.


var _ = otherOrg

// TX mixing tests. **DO NOT** run these in parallel.
// The goal here is to mess around with different ordering of
// transactions and queries.
Expand Down Expand Up @@ -373,6 +371,7 @@
// +------------------------------+------------------+
// pq: could not serialize access due to concurrent update
ctx := testutil.Context(t, testutil.WaitLong)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand All @@ -380,7 +379,7 @@
OwnerID: user.ID,
}).Do()

bumpDeadline := func() {

Check failure on line 382 in enterprise/coderd/workspacequota_test.go

View workflow job for this annotation

GitHub Actions / lint

empty-lines: extra empty line at the end of a block (revive)
err := db.InTx(func(db database.Store) error {
err := db.UpdateWorkspaceBuildDeadlineByID(ctx, database.UpdateWorkspaceBuildDeadlineByIDParams{
Deadline: dbtime.Now(),
Expand Down Expand Up @@ -426,6 +425,7 @@
// +------------------------------+------------------+
// Works!
ctx := testutil.Context(t, testutil.WaitLong)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -456,7 +456,6 @@
Isolation: sql.LevelSerializable,
})
assert.NoError(t, err)

}

// Start TX
Expand Down Expand Up @@ -491,6 +490,7 @@
// +---------------------+----------------------------------+
// pq: could not serialize access due to concurrent update
ctx := testutil.Context(t, testutil.WaitShort)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -540,6 +540,7 @@
// +---------------------+----------------------------------+
// pq: could not serialize access due to concurrent update
ctx := testutil.Context(t, testutil.WaitShort)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -583,6 +584,7 @@
// +---------------------+----------------------------------+
// Works!
ctx := testutil.Context(t, testutil.WaitShort)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)
var err error

Expand Down Expand Up @@ -637,6 +639,7 @@
// | | CommitTx() |
// +---------------------+---------------------+
ctx := testutil.Context(t, testutil.WaitLong)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -695,6 +698,7 @@
// | | CommitTx() |
// +---------------------+---------------------+
ctx := testutil.Context(t, testutil.WaitLong)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -756,6 +760,7 @@
// +---------------------+---------------------+
// pq: could not serialize access due to read/write dependencies among transactions
ctx := testutil.Context(t, testutil.WaitLong)
//nolint:gocritic // testing
ctx = dbauthz.AsSystemRestricted(ctx)

myWorkspace := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
Expand Down Expand Up @@ -891,7 +896,7 @@
return c.DBTx.Done()
}

func noReturn[T any](f func(context.Context, *testing.T) T) func(context.Context, *testing.T) {

Check failure on line 899 in enterprise/coderd/workspacequota_test.go

View workflow job for this annotation

GitHub Actions / lint

func `noReturn` is unused (unused)
return func(ctx context.Context, t *testing.T) {
f(ctx, t)
}
Expand Down
Loading