
feat: pubsub reports dropped messages #7660

Merged (4 commits) on May 25, 2023
Changes from 1 commit
Pubsub with errors tests and fixes
Signed-off-by: Spike Curtis <spike@coder.com>
spikecurtis committed May 24, 2023
commit ede14157c94e679ead5c3105f50639169ccca613
22 changes: 15 additions & 7 deletions coderd/database/postgres/postgres.go
@@ -22,7 +22,8 @@ import (
// Super unlikely, but it happened. See: https://github.com/coder/coder/runs/5375197003
var openPortMutex sync.Mutex

// Open creates a new PostgreSQL server using a Docker container.
// Open creates a new PostgreSQL database instance. With DB_FROM environment variable set, it clones a database
// from the provided template. With the environment variable unset, it creates a new Docker container running postgres.
func Open() (string, func(), error) {
if os.Getenv("DB_FROM") != "" {
// In CI, creating a Docker container for each test is slow.
@@ -51,7 +52,12 @@ func Open() (string, func(), error) {
// so cleaning up the container will clean up the database.
}, nil
}
return OpenContainerized(0)
}

// OpenContainerized creates a new PostgreSQL server using a Docker container. If port is nonzero, forward host traffic
// to that port to the database. If port is zero, allocate a free port from the OS.
func OpenContainerized(port int) (string, func(), error) {
pool, err := dockertest.NewPool("")
if err != nil {
return "", nil, xerrors.Errorf("create pool: %w", err)
@@ -63,12 +69,14 @@ func Open() (string, func(), error) {
}

openPortMutex.Lock()
// Pick an explicit port on the host to connect to 5432.
// This is necessary so we can configure the port to only use ipv4.
port, err := getFreePort()
if err != nil {
openPortMutex.Unlock()
return "", nil, xerrors.Errorf("get free port: %w", err)
if port == 0 {
// Pick an explicit port on the host to connect to 5432.
// This is necessary so we can configure the port to only use ipv4.
port, err = getFreePort()
if err != nil {
openPortMutex.Unlock()
return "", nil, xerrors.Errorf("get free port: %w", err)
}
}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
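
For orientation, here is a minimal sketch (not part of the diff) of how a caller might use the refactored entry points in this file. The import path and the lib/pq driver are assumptions inferred from the file paths above; treat the connection details as illustrative.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres driver

	"github.com/coder/coder/coderd/database/postgres"
)

func main() {
	// Passing 0 lets OpenContainerized pick a free host port, which is what
	// Open() does when DB_FROM is unset. A nonzero port forwards host traffic
	// on that port to the container's database instead.
	connectionURL, closeFn, err := postgres.OpenContainerized(0)
	if err != nil {
		log.Fatalf("start postgres container: %v", err)
	}
	defer closeFn()

	db, err := sql.Open("postgres", connectionURL)
	if err != nil {
		log.Fatalf("open database: %v", err)
	}
	defer db.Close()

	if err := db.Ping(); err != nil {
		log.Fatalf("ping database: %v", err)
	}
	log.Println("containerized postgres is up at", connectionURL)
}
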
42 changes: 23 additions & 19 deletions coderd/database/pubsub.go
@@ -48,9 +48,9 @@ type msgOrErr struct {
type msgQueue struct {
ctx context.Context
cond *sync.Cond
q [messageBufferSize]msgOrErr
q [PubsubBufferSize]msgOrErr
front int
back int
size int
closed bool
l Listener
le ListenerWithErr
@@ -74,15 +74,16 @@ func (q *msgQueue) run() {
for {
// wait until there is something on the queue or we are closed
q.cond.L.Lock()
for q.front == q.back && !q.closed {
for q.size == 0 && !q.closed {
q.cond.Wait()
}
if q.closed {
q.cond.L.Unlock()
return
}
item := q.q[q.front]
q.front = (q.front + 1) % messageBufferSize
q.front = (q.front + 1) % PubsubBufferSize
q.size--
q.cond.L.Unlock()

// process item without holding lock
@@ -110,22 +111,23 @@ func (q *msgQueue) enqueue(msg []byte) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

next := (q.back + 1) % messageBufferSize
if next == q.front {
if q.size == PubsubBufferSize {
// queue is full, so we're going to drop the msg we got called with.
// We also need to record that messages are being dropped, which we
// do at the last message in the queue. This potentially makes us
// lose 2 messages instead of one, but it's more important at this
// point to warn the subscriber that they're losing messages so they
// can do something about it.
q.q[q.back].msg = nil
q.q[q.back].err = ErrDroppedMessages
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the message
q.back = next
next := (q.front + q.size) % PubsubBufferSize
q.q[next].msg = msg
q.q[next].err = nil
q.size++
q.cond.Broadcast()
}

@@ -141,19 +143,20 @@ func (q *msgQueue) dropped() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

next := (q.back + 1) % messageBufferSize
if next == q.front {
if q.size == PubsubBufferSize {
// queue is full, but we need to record that messages are being dropped,
// which we do at the last message in the queue. This potentially drops
// another message, but it's more important for the subscriber to know.
q.q[q.back].msg = nil
q.q[q.back].err = ErrDroppedMessages
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the error
q.back = next
next := (q.front + q.size) % PubsubBufferSize
q.q[next].msg = nil
q.q[next].err = ErrDroppedMessages
q.size++
q.cond.Broadcast()
}

@@ -166,20 +169,20 @@ type pgPubsub struct {
queues map[string]map[uuid.UUID]*msgQueue
}

// messageBufferSize is the maximum number of unhandled messages we will buffer
// PubsubBufferSize is the maximum number of unhandled messages we will buffer
// for a subscriber before dropping messages.
const messageBufferSize = 2048
const PubsubBufferSize = 2048

// Subscribe calls the listener when an event matching the name is received.
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return p.subscribe(event, newMsgQueue(p.ctx, listener, nil))
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil))
}

func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return p.subscribe(event, newMsgQueue(p.ctx, nil, listener))
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener))
}

func (p *pgPubsub) subscribe(event string, newQ *msgQueue) (cancel func(), err error) {
func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
p.mut.Lock()
defer p.mut.Unlock()

@@ -249,6 +252,7 @@ func (p *pgPubsub) listen(ctx context.Context) {
// A nil notification can be dispatched on reconnect.
if notif == nil {
p.recordReconnect()
continue
}
p.listenReceive(notif)
}
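
To make the new (front, size) bookkeeping easier to follow, here is a small self-contained sketch of the same circular-buffer pattern, using a tiny buffer and a string marker standing in for ErrDroppedMessages. It mirrors the index arithmetic above but is not the PR's code.

package main

import "fmt"

const bufferSize = 4 // stand-in for PubsubBufferSize

// ring illustrates the front/size bookkeeping used by msgQueue above.
type ring struct {
	q     [bufferSize]string
	front int
	size  int
}

func (r *ring) enqueue(msg string) {
	if r.size == bufferSize {
		// Full: overwrite the newest slot with a drop marker, mirroring how
		// msgQueue records ErrDroppedMessages in the last queued position.
		back := (r.front + bufferSize - 1) % bufferSize
		r.q[back] = "<dropped>"
		return
	}
	next := (r.front + r.size) % bufferSize
	r.q[next] = msg
	r.size++
}

func (r *ring) dequeue() (string, bool) {
	if r.size == 0 {
		return "", false
	}
	msg := r.q[r.front]
	r.front = (r.front + 1) % bufferSize
	r.size--
	return msg, true
}

func main() {
	r := &ring{}
	for i := 0; i < bufferSize+2; i++ {
		r.enqueue(fmt.Sprintf("msg-%d", i))
	}
	for {
		msg, ok := r.dequeue()
		if !ok {
			break
		}
		fmt.Println(msg) // msg-0, msg-1, msg-2, <dropped>
	}
}
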
140 changes: 140 additions & 0 deletions coderd/database/pubsub_internal_test.go
@@ -0,0 +1,140 @@
package database

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/coder/coder/testutil"
)

func Test_msgQueue_ListenerWithError(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
m := make(chan string)
e := make(chan error)
uut := newMsgQueue(ctx, nil, func(ctx context.Context, msg []byte, err error) {
m <- string(msg)
e <- err
})
defer uut.close()

// We're going to enqueue 4 messages and an error in a loop -- that is, a cycle of 5.
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
}
uut.dropped()
for i := 0; i < 4; i++ {
select {
case <-ctx.Done():
t.Fatal("timed out")
case msg := <-m:
require.Equal(t, fmt.Sprintf("%d%d", j, i), msg)
}
select {
case <-ctx.Done():
t.Fatal("timed out")
case err := <-e:
require.NoError(t, err)
}
}
select {
case <-ctx.Done():
t.Fatal("timed out")
case msg := <-m:
require.Equal(t, "", msg)
}
select {
case <-ctx.Done():
t.Fatal("timed out")
case err := <-e:
require.ErrorIs(t, err, ErrDroppedMessages)
}
}
}

func Test_msgQueue_Listener(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
m := make(chan string)
uut := newMsgQueue(ctx, func(ctx context.Context, msg []byte) {
m <- string(msg)
}, nil)
defer uut.close()

// We're going to enqueue 4 messages and an error in a loop -- that is, a cycle of 5.
// PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned
// when we wrap around the end of the circular buffer. This tests that we correctly handle
// the wrapping and aren't dequeueing misaligned data.
cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring
for j := 0; j < cycles; j++ {
for i := 0; i < 4; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i)))
}
uut.dropped()
for i := 0; i < 4; i++ {
select {
case <-ctx.Done():
t.Fatal("timed out")
case msg := <-m:
require.Equal(t, fmt.Sprintf("%d%d", j, i), msg)
}
}
// Listener skips over errors, so we only read out the 4 real messages.
}
}

func Test_msgQueue_Full(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()

firstDequeue := make(chan struct{})
allowRead := make(chan struct{})
n := 0
errors := make(chan error)
uut := newMsgQueue(ctx, nil, func(ctx context.Context, msg []byte, err error) {
if n == 0 {
close(firstDequeue)
}
<-allowRead
if err == nil {
require.Equal(t, fmt.Sprintf("%d", n), string(msg))
n++
return
}
errors <- err
})
defer uut.close()

// we send 2 more than the capacity. One extra because the call to the ListenerFunc blocks
// but only after we've dequeued a message, and then another extra because we want to exceed
// the capacity, not just reach it.
for i := 0; i < PubsubBufferSize+2; i++ {
uut.enqueue([]byte(fmt.Sprintf("%d", i)))
// ensure the first dequeue has happened before proceeding, so that this function isn't racing
// against the goroutine that dequeues items.
<-firstDequeue
}
close(allowRead)

select {
case <-ctx.Done():
t.Fatal("timed out")
case err := <-errors:
require.ErrorIs(t, err, ErrDroppedMessages)
}
// Ok, so we sent 2 more than capacity, but we only read the capacity, that's because the last
// message we send doesn't get queued, AND, it bumps a message out of the queue to make room
// for the error, so we read 2 less than we sent.
require.Equal(t, PubsubBufferSize, n)
}
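
As a quick sanity check on the off-by-two reasoning in the comments above, a sketch of the arithmetic behind the final assertion, with the buffer size assumed to match PubsubBufferSize:

package main

import "fmt"

func main() {
	const bufferSize = 2048 // stands in for PubsubBufferSize

	// One message is in flight in the blocked listener and the buffer holds
	// another bufferSize, so the test sends bufferSize+2 to force an overflow.
	sent := bufferSize + 2

	// The overflowing message is dropped outright, and recording the drop
	// overwrites the newest queued message with ErrDroppedMessages, so two of
	// the sent messages never reach the listener.
	delivered := sent - 2

	fmt.Println(delivered == bufferSize) // true: matches require.Equal(t, PubsubBufferSize, n)
}
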
22 changes: 13 additions & 9 deletions coderd/database/pubsub_memory.go
@@ -13,21 +13,30 @@ type genericListener struct {
le ListenerWithErr
}

func (g genericListener) send(ctx context.Context, message []byte) {
if g.l != nil {
g.l(ctx, message)
}
if g.le != nil {
g.le(ctx, message, nil)
}
}

// memoryPubsub is an in-memory Pubsub implementation.
type memoryPubsub struct {
mut sync.RWMutex
listeners map[string]map[uuid.UUID]genericListener
}

func (m *memoryPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return m.subscribe(event, genericListener{l: listener})
return m.subscribeGeneric(event, genericListener{l: listener})
}

func (m *memoryPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return m.subscribe(event, genericListener{le: listener})
return m.subscribeGeneric(event, genericListener{le: listener})
}

func (m *memoryPubsub) subscribe(event string, listener genericListener) (cancel func(), err error) {
func (m *memoryPubsub) subscribeGeneric(event string, listener genericListener) (cancel func(), err error) {
m.mut.Lock()
defer m.mut.Unlock()

@@ -66,12 +75,7 @@ func (m *memoryPubsub) Publish(event string, message []byte) error {
listener := listener
go func() {
defer wg.Done()
if listener.l != nil {
listener.l(context.Background(), message)
}
if listener.le != nil {
listener.le(context.Background(), message, nil)
}
listener.send(context.Background(), message)
}()
}
wg.Wait()
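
Finally, a hedged sketch of how a subscriber might consume the new error-reporting path. SubscribeWithErr, ListenerWithErr, and ErrDroppedMessages come from the diff above; the database.Pubsub interface name and import path are assumptions inferred from the file layout.

package example

import (
	"context"
	"log"

	"golang.org/x/xerrors"

	"github.com/coder/coder/coderd/database"
)

// watchWithDropHandling subscribes with the error-aware listener so the caller
// can detect dropped notifications and fall back to a full resync instead of
// trusting incremental updates.
func watchWithDropHandling(ps database.Pubsub, event string) (cancel func(), err error) {
	return ps.SubscribeWithErr(event, func(ctx context.Context, msg []byte, err error) {
		switch {
		case xerrors.Is(err, database.ErrDroppedMessages):
			log.Printf("pubsub dropped messages on %q; resync required", event)
		case err != nil:
			log.Printf("pubsub listener error on %q: %v", event, err)
		default:
			log.Printf("received %q on %q", msg, event)
		}
	})
}
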