Skip to content

chore: separate pubsub into a new package #8017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/coder/coder/coderd/database/dbmetrics"
"github.com/coder/coder/coderd/database/dbpurge"
"github.com/coder/coder/coderd/database/migrations"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/devtunnel"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
Expand Down Expand Up @@ -463,7 +464,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
Logger: logger.Named("coderd"),
Database: dbfake.New(),
DERPMap: derpMap,
Pubsub: database.NewPubsubInMemory(),
Pubsub: pubsub.NewInMemory(),
CacheDir: cacheDir,
GoogleTokenValidator: googleTokenValidator,
GitAuthConfigs: gitAuthConfigs,
Expand Down Expand Up @@ -589,7 +590,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
if cfg.InMemoryDatabase {
// This is only used for testing.
options.Database = dbmetrics.New(dbfake.New(), options.PrometheusRegistry)
options.Pubsub = database.NewPubsubInMemory()
options.Pubsub = pubsub.NewInMemory()
} else {
sqlDB, err := connectToPostgres(ctx, logger, sqlDriver, cfg.PostgresURL.String())
if err != nil {
Expand All @@ -600,7 +601,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}()

options.Database = dbmetrics.New(database.New(sqlDB), options.PrometheusRegistry)
options.Pubsub, err = database.NewPubsub(ctx, sqlDB, cfg.PostgresURL.String())
options.Pubsub, err = pubsub.New(ctx, sqlDB, cfg.PostgresURL.String())
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/dbmetrics"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
"github.com/coder/coder/coderd/healthcheck"
Expand Down Expand Up @@ -95,7 +96,7 @@ type Options struct {
AppHostnameRegex *regexp.Regexp
Logger slog.Logger
Database database.Store
Pubsub database.Pubsub
Pubsub pubsub.Pubsub

// CacheDir is used for caching files served by the API.
CacheDir string
Expand Down
3 changes: 2 additions & 1 deletion coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/dbtestutil"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
"github.com/coder/coder/coderd/healthcheck"
Expand Down Expand Up @@ -130,7 +131,7 @@ type Options struct {
// It should only be used in cases where multiple Coder
// test instances are running against the same database.
Database database.Store
Pubsub database.Pubsub
Pubsub pubsub.Pubsub

ConfigSSH codersdk.SSHConfigResponse

Expand Down
11 changes: 6 additions & 5 deletions coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbfake"
"github.com/coder/coder/coderd/database/postgres"
"github.com/coder/coder/coderd/database/pubsub"
)

func NewDB(t testing.TB) (database.Store, database.Pubsub) {
func NewDB(t testing.TB) (database.Store, pubsub.Pubsub) {
t.Helper()

db := dbfake.New()
pubsub := database.NewPubsubInMemory()
ps := pubsub.NewInMemory()
if os.Getenv("DB") != "" {
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
if connectionURL == "" {
Expand All @@ -36,12 +37,12 @@ func NewDB(t testing.TB) (database.Store, database.Pubsub) {
})
db = database.New(sqlDB)

pubsub, err = database.NewPubsub(context.Background(), sqlDB, connectionURL)
ps, err = pubsub.New(context.Background(), sqlDB, connectionURL)
require.NoError(t, err)
t.Cleanup(func() {
_ = pubsub.Close()
_ = ps.Close()
})
}

return db, pubsub
return db, ps
}
26 changes: 13 additions & 13 deletions coderd/database/pubsub.go → coderd/database/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
Expand Down Expand Up @@ -48,7 +48,7 @@ type msgOrErr struct {
type msgQueue struct {
ctx context.Context
cond *sync.Cond
q [PubsubBufferSize]msgOrErr
q [BufferSize]msgOrErr
front int
size int
closed bool
Expand Down Expand Up @@ -82,7 +82,7 @@ func (q *msgQueue) run() {
return
}
item := q.q[q.front]
q.front = (q.front + 1) % PubsubBufferSize
q.front = (q.front + 1) % BufferSize
q.size--
q.cond.L.Unlock()

Expand Down Expand Up @@ -111,20 +111,20 @@ func (q *msgQueue) enqueue(msg []byte) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
if q.size == BufferSize {
// queue is full, so we're going to drop the msg we got called with.
// We also need to record that messages are being dropped, which we
// do at the last message in the queue. This potentially makes us
// lose 2 messages instead of one, but it's more important at this
// point to warn the subscriber that they're losing messages so they
// can do something about it.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
back := (q.front + BufferSize - 1) % BufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the message
next := (q.front + q.size) % PubsubBufferSize
next := (q.front + q.size) % BufferSize
q.q[next].msg = msg
q.q[next].err = nil
q.size++
Expand All @@ -143,17 +143,17 @@ func (q *msgQueue) dropped() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
if q.size == BufferSize {
// queue is full, but we need to record that messages are being dropped,
// which we do at the last message in the queue. This potentially drops
// another message, but it's more important for the subscriber to know.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
back := (q.front + BufferSize - 1) % BufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the error
next := (q.front + q.size) % PubsubBufferSize
next := (q.front + q.size) % BufferSize
q.q[next].msg = nil
q.q[next].err = ErrDroppedMessages
q.size++
Expand All @@ -171,9 +171,9 @@ type pgPubsub struct {
queues map[string]map[uuid.UUID]*msgQueue
}

// PubsubBufferSize is the maximum number of unhandled messages we will buffer
// BufferSize is the maximum number of unhandled messages we will buffer
// for a subscriber before dropping messages.
const PubsubBufferSize = 2048
const BufferSize = 2048

// Subscribe calls the listener when an event matching the name is received.
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
Expand Down Expand Up @@ -295,8 +295,8 @@ func (p *pgPubsub) recordReconnect() {
}
}

// NewPubsub creates a new Pubsub implementation using a PostgreSQL connection.
func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
Expand Down Expand Up @@ -26,7 +26,7 @@ func Test_msgQueue_ListenerWithError(t *testing.T) {
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
cycles := (BufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
Expand Down Expand Up @@ -75,7 +75,7 @@ func Test_msgQueue_Listener(t *testing.T) {
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
cycles := (BufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
Expand Down Expand Up @@ -119,7 +119,7 @@ func Test_msgQueue_Full(t *testing.T) {
// we send 2 more than the capacity. One extra because the call to the ListenerFunc blocks
// but only after we've dequeued a message, and then another extra because we want to exceed
// the capacity, not just reach it.
for i := 0; i < PubsubBufferSize+2; i++ {
for i := 0; i < BufferSize+2; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d", i)))
// ensure the first dequeue has happened before proceeding, so that this function isn't racing
// against the goroutine that dequeues items.
Expand All @@ -136,5 +136,5 @@ func Test_msgQueue_Full(t *testing.T) {
// Ok, so we sent 2 more than capacity, but we only read the capacity, that's because the last
// message we send doesn't get queued, AND, it bumps a message out of the queue to make room
// for the error, so we read 2 less than we sent.
require.Equal(t, PubsubBufferSize, n)
require.Equal(t, BufferSize, n)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
Expand Down Expand Up @@ -87,7 +87,7 @@ func (*memoryPubsub) Close() error {
return nil
}

func NewPubsubInMemory() Pubsub {
func NewInMemory() Pubsub {
return &memoryPubsub{
listeners: make(map[string]map[uuid.UUID]genericListener),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database_test
package pubsub_test

import (
"context"
Expand All @@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/pubsub"
)

func TestPubsubMemory(t *testing.T) {
Expand All @@ -16,7 +16,7 @@ func TestPubsubMemory(t *testing.T) {
t.Run("Legacy", func(t *testing.T) {
t.Parallel()

pubsub := database.NewPubsubInMemory()
pubsub := pubsub.NewInMemory()
event := "test"
data := "testing"
messageChannel := make(chan []byte)
Expand All @@ -36,7 +36,7 @@ func TestPubsubMemory(t *testing.T) {
t.Run("WithErr", func(t *testing.T) {
t.Parallel()

pubsub := database.NewPubsubInMemory()
pubsub := pubsub.NewInMemory()
event := "test"
data := "testing"
messageChannel := make(chan []byte)
Expand Down
Loading