Skip to content

chore: separate pubsub into a new package #8017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cli/server.go
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ import (
"github.com/coder/coder/coderd/database/dbmetrics"
"github.com/coder/coder/coderd/database/dbpurge"
"github.com/coder/coder/coderd/database/migrations"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/devtunnel"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
@@ -463,7 +464,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
Logger: logger.Named("coderd"),
Database: dbfake.New(),
DERPMap: derpMap,
Pubsub: database.NewPubsubInMemory(),
Pubsub: pubsub.NewInMemory(),
CacheDir: cacheDir,
GoogleTokenValidator: googleTokenValidator,
GitAuthConfigs: gitAuthConfigs,
@@ -589,7 +590,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
if cfg.InMemoryDatabase {
// This is only used for testing.
options.Database = dbmetrics.New(dbfake.New(), options.PrometheusRegistry)
options.Pubsub = database.NewPubsubInMemory()
options.Pubsub = pubsub.NewInMemory()
} else {
sqlDB, err := connectToPostgres(ctx, logger, sqlDriver, cfg.PostgresURL.String())
if err != nil {
@@ -600,7 +601,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}()

options.Database = dbmetrics.New(database.New(sqlDB), options.PrometheusRegistry)
options.Pubsub, err = database.NewPubsub(ctx, sqlDB, cfg.PostgresURL.String())
options.Pubsub, err = pubsub.New(ctx, sqlDB, cfg.PostgresURL.String())
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
3 changes: 2 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/dbmetrics"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
"github.com/coder/coder/coderd/healthcheck"
@@ -95,7 +96,7 @@ type Options struct {
AppHostnameRegex *regexp.Regexp
Logger slog.Logger
Database database.Store
Pubsub database.Pubsub
Pubsub pubsub.Pubsub

// CacheDir is used for caching files served by the API.
CacheDir string
3 changes: 2 additions & 1 deletion coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/coderd/database/dbtestutil"
"github.com/coder/coder/coderd/database/pubsub"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/coderd/gitsshkey"
"github.com/coder/coder/coderd/healthcheck"
@@ -130,7 +131,7 @@ type Options struct {
// It should only be used in cases where multiple Coder
// test instances are running against the same database.
Database database.Store
Pubsub database.Pubsub
Pubsub pubsub.Pubsub

ConfigSSH codersdk.SSHConfigResponse

11 changes: 6 additions & 5 deletions coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
@@ -11,13 +11,14 @@ import (
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbfake"
"github.com/coder/coder/coderd/database/postgres"
"github.com/coder/coder/coderd/database/pubsub"
)

func NewDB(t testing.TB) (database.Store, database.Pubsub) {
func NewDB(t testing.TB) (database.Store, pubsub.Pubsub) {
t.Helper()

db := dbfake.New()
pubsub := database.NewPubsubInMemory()
ps := pubsub.NewInMemory()
if os.Getenv("DB") != "" {
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
if connectionURL == "" {
@@ -36,12 +37,12 @@ func NewDB(t testing.TB) (database.Store, database.Pubsub) {
})
db = database.New(sqlDB)

pubsub, err = database.NewPubsub(context.Background(), sqlDB, connectionURL)
ps, err = pubsub.New(context.Background(), sqlDB, connectionURL)
require.NoError(t, err)
t.Cleanup(func() {
_ = pubsub.Close()
_ = ps.Close()
})
}

return db, pubsub
return db, ps
}
26 changes: 13 additions & 13 deletions coderd/database/pubsub.go → coderd/database/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
@@ -48,7 +48,7 @@ type msgOrErr struct {
type msgQueue struct {
ctx context.Context
cond *sync.Cond
q [PubsubBufferSize]msgOrErr
q [BufferSize]msgOrErr
front int
size int
closed bool
@@ -82,7 +82,7 @@ func (q *msgQueue) run() {
return
}
item := q.q[q.front]
q.front = (q.front + 1) % PubsubBufferSize
q.front = (q.front + 1) % BufferSize
q.size--
q.cond.L.Unlock()

@@ -111,20 +111,20 @@ func (q *msgQueue) enqueue(msg []byte) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
if q.size == BufferSize {
// queue is full, so we're going to drop the msg we got called with.
// We also need to record that messages are being dropped, which we
// do at the last message in the queue. This potentially makes us
// lose 2 messages instead of one, but it's more important at this
// point to warn the subscriber that they're losing messages so they
// can do something about it.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
back := (q.front + BufferSize - 1) % BufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the message
next := (q.front + q.size) % PubsubBufferSize
next := (q.front + q.size) % BufferSize
q.q[next].msg = msg
q.q[next].err = nil
q.size++
@@ -143,17 +143,17 @@ func (q *msgQueue) dropped() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
if q.size == BufferSize {
// queue is full, but we need to record that messages are being dropped,
// which we do at the last message in the queue. This potentially drops
// another message, but it's more important for the subscriber to know.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
back := (q.front + BufferSize - 1) % BufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the error
next := (q.front + q.size) % PubsubBufferSize
next := (q.front + q.size) % BufferSize
q.q[next].msg = nil
q.q[next].err = ErrDroppedMessages
q.size++
@@ -171,9 +171,9 @@ type pgPubsub struct {
queues map[string]map[uuid.UUID]*msgQueue
}

// PubsubBufferSize is the maximum number of unhandled messages we will buffer
// BufferSize is the maximum number of unhandled messages we will buffer
// for a subscriber before dropping messages.
const PubsubBufferSize = 2048
const BufferSize = 2048

// Subscribe calls the listener when an event matching the name is received.
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
@@ -295,8 +295,8 @@ func (p *pgPubsub) recordReconnect() {
}
}

// NewPubsub creates a new Pubsub implementation using a PostgreSQL connection.
func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
@@ -26,7 +26,7 @@ func Test_msgQueue_ListenerWithError(t *testing.T) {
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
cycles := (BufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
@@ -75,7 +75,7 @@ func Test_msgQueue_Listener(t *testing.T) {
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
cycles := (BufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
@@ -119,7 +119,7 @@ func Test_msgQueue_Full(t *testing.T) {
// we send 2 more than the capacity. One extra because the call to the ListenerFunc blocks
// but only after we've dequeued a message, and then another extra because we want to exceed
// the capacity, not just reach it.
for i := 0; i < PubsubBufferSize+2; i++ {
for i := 0; i < BufferSize+2; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d", i)))
// ensure the first dequeue has happened before proceeding, so that this function isn't racing
// against the goroutine that dequeues items.
@@ -136,5 +136,5 @@ func Test_msgQueue_Full(t *testing.T) {
// Ok, so we sent 2 more than capacity, but we only read the capacity, that's because the last
// message we send doesn't get queued, AND, it bumps a message out of the queue to make room
// for the error, so we read 2 less than we sent.
require.Equal(t, PubsubBufferSize, n)
require.Equal(t, BufferSize, n)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package pubsub

import (
"context"
@@ -87,7 +87,7 @@ func (*memoryPubsub) Close() error {
return nil
}

func NewPubsubInMemory() Pubsub {
func NewInMemory() Pubsub {
return &memoryPubsub{
listeners: make(map[string]map[uuid.UUID]genericListener),
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database_test
package pubsub_test

import (
"context"
@@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/pubsub"
)

func TestPubsubMemory(t *testing.T) {
@@ -16,7 +16,7 @@ func TestPubsubMemory(t *testing.T) {
t.Run("Legacy", func(t *testing.T) {
t.Parallel()

pubsub := database.NewPubsubInMemory()
pubsub := pubsub.NewInMemory()
event := "test"
data := "testing"
messageChannel := make(chan []byte)
@@ -36,7 +36,7 @@ func TestPubsubMemory(t *testing.T) {
t.Run("WithErr", func(t *testing.T) {
t.Parallel()

pubsub := database.NewPubsubInMemory()
pubsub := pubsub.NewInMemory()
event := "test"
data := "testing"
messageChannel := make(chan []byte)
Loading