Skip to content

feat: pubsub reports dropped messages #7660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions coderd/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
// Super unlikely, but it happened. See: https://github.com/coder/coder/runs/5375197003
var openPortMutex sync.Mutex

// Open creates a new PostgreSQL server using a Docker container.
// Open creates a new PostgreSQL database instance. With DB_FROM environment variable set, it clones a database
// from the provided template. With the environment variable unset, it creates a new Docker container running postgres.
func Open() (string, func(), error) {
if os.Getenv("DB_FROM") != "" {
// In CI, creating a Docker container for each test is slow.
Expand Down Expand Up @@ -51,7 +52,12 @@ func Open() (string, func(), error) {
// so cleaning up the container will clean up the database.
}, nil
}
return OpenContainerized(0)
}

// OpenContainerized creates a new PostgreSQL server using a Docker container. If port is nonzero, forward host traffic
// to that port to the database. If port is zero, allocate a free port from the OS.
func OpenContainerized(port int) (string, func(), error) {
pool, err := dockertest.NewPool("")
if err != nil {
return "", nil, xerrors.Errorf("create pool: %w", err)
Expand All @@ -63,12 +69,14 @@ func Open() (string, func(), error) {
}

openPortMutex.Lock()
// Pick an explicit port on the host to connect to 5432.
// This is necessary so we can configure the port to only use ipv4.
port, err := getFreePort()
if err != nil {
openPortMutex.Unlock()
return "", nil, xerrors.Errorf("get free port: %w", err)
if port == 0 {
// Pick an explicit port on the host to connect to 5432.
// This is necessary so we can configure the port to only use ipv4.
port, err = getFreePort()
if err != nil {
openPortMutex.Unlock()
return "", nil, xerrors.Errorf("get free port: %w", err)
}
}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Expand Down
205 changes: 170 additions & 35 deletions coderd/database/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,174 @@ import (
// Listener represents a pubsub handler.
type Listener func(ctx context.Context, message []byte)

// ListenerWithErr represents a pubsub handler that can also receive error
// indications
type ListenerWithErr func(ctx context.Context, message []byte, err error)

// ErrDroppedMessages is sent to ListenerWithErr if messages are dropped or
// might have been dropped.
var ErrDroppedMessages = xerrors.New("dropped messages")

// Pubsub is a generic interface for broadcasting and receiving messages.
// Implementors should assume high-availability with the backing implementation.
type Pubsub interface {
Subscribe(event string, listener Listener) (cancel func(), err error)
SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error)
Publish(event string, message []byte) error
Close() error
}

// msgOrErr either contains a message or an error
type msgOrErr struct {
msg []byte
err error
}
Comment on lines +35 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on moving the in-memory pubsub to its own package? I feel like I made a mistake bundling it inside database when I created it, and this seems like an opportunity to fix that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to move it, but probably will open a separate PR. Where do you think it should go? subpackage of database?

We could move the interface & in-memory pubsub to coderd/pubsub?


// msgQueue implements a fixed length queue with the ability to replace elements
// after they are queued (but before they are dequeued).
//
// The purpose of this data structure is to build something that works a bit
// like a golang channel, but if the queue is full, then we can replace the
// last element with an error so that the subscriber can get notified that some
// messages were dropped, all without blocking.
type msgQueue struct {
ctx context.Context
cond *sync.Cond
q [PubsubBufferSize]msgOrErr
front int
size int
closed bool
l Listener
le ListenerWithErr
}

func newMsgQueue(ctx context.Context, l Listener, le ListenerWithErr) *msgQueue {
if l == nil && le == nil {
panic("l or le must be non-nil")
}
q := &msgQueue{
ctx: ctx,
cond: sync.NewCond(&sync.Mutex{}),
l: l,
le: le,
}
go q.run()
return q
}

func (q *msgQueue) run() {
for {
// wait until there is something on the queue or we are closed
q.cond.L.Lock()
for q.size == 0 && !q.closed {
q.cond.Wait()
}
if q.closed {
q.cond.L.Unlock()
return
}
item := q.q[q.front]
q.front = (q.front + 1) % PubsubBufferSize
q.size--
q.cond.L.Unlock()

// process item without holding lock
if item.err == nil {
// real message
if q.l != nil {
q.l(q.ctx, item.msg)
continue
}
if q.le != nil {
q.le(q.ctx, item.msg, nil)
continue
}
// unhittable
continue
}
// if the listener wants errors, send it.
if q.le != nil {
q.le(q.ctx, nil, item.err)
}
}
}

func (q *msgQueue) enqueue(msg []byte) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
// queue is full, so we're going to drop the msg we got called with.
// We also need to record that messages are being dropped, which we
// do at the last message in the queue. This potentially makes us
// lose 2 messages instead of one, but it's more important at this
// point to warn the subscriber that they're losing messages so they
// can do something about it.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the message
next := (q.front + q.size) % PubsubBufferSize
q.q[next].msg = msg
q.q[next].err = nil
q.size++
q.cond.Broadcast()
}

func (q *msgQueue) close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
defer q.cond.Broadcast()
q.closed = true
}

// dropped records an error in the queue that messages might have been dropped
func (q *msgQueue) dropped() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.size == PubsubBufferSize {
// queue is full, but we need to record that messages are being dropped,
// which we do at the last message in the queue. This potentially drops
// another message, but it's more important for the subscriber to know.
back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize
q.q[back].msg = nil
q.q[back].err = ErrDroppedMessages
return
}
// queue is not full, insert the error
next := (q.front + q.size) % PubsubBufferSize
q.q[next].msg = nil
q.q[next].err = ErrDroppedMessages
q.size++
q.cond.Broadcast()
}

// Pubsub implementation using PostgreSQL.
type pgPubsub struct {
ctx context.Context
pgListener *pq.Listener
db *sql.DB
mut sync.Mutex
listeners map[string]map[uuid.UUID]chan<- []byte
queues map[string]map[uuid.UUID]*msgQueue
}

// messageBufferSize is the maximum number of unhandled messages we will buffer
// PubsubBufferSize is the maximum number of unhandled messages we will buffer
// for a subscriber before dropping messages.
const messageBufferSize = 2048
const PubsubBufferSize = 2048

// Subscribe calls the listener when an event matching the name is received.
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil))
}

func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener))
}

func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
p.mut.Lock()
defer p.mut.Unlock()

Expand All @@ -50,23 +195,20 @@ func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), er
return nil, xerrors.Errorf("listen: %w", err)
}

var eventListeners map[uuid.UUID]chan<- []byte
var eventQs map[uuid.UUID]*msgQueue
var ok bool
if eventListeners, ok = p.listeners[event]; !ok {
eventListeners = make(map[uuid.UUID]chan<- []byte)
p.listeners[event] = eventListeners
if eventQs, ok = p.queues[event]; !ok {
eventQs = make(map[uuid.UUID]*msgQueue)
p.queues[event] = eventQs
}

ctx, cancelCallbacks := context.WithCancel(p.ctx)
messages := make(chan []byte, messageBufferSize)
go messagesToListener(ctx, messages, listener)
id := uuid.New()
eventListeners[id] = messages
eventQs[id] = newQ
return func() {
p.mut.Lock()
defer p.mut.Unlock()
cancelCallbacks()
listeners := p.listeners[event]
listeners := p.queues[event]
q := listeners[id]
q.close()
delete(listeners, id)

if len(listeners) == 0 {
Expand Down Expand Up @@ -109,6 +251,7 @@ func (p *pgPubsub) listen(ctx context.Context) {
}
// A nil notification can be dispatched on reconnect.
if notif == nil {
p.recordReconnect()
continue
}
p.listenReceive(notif)
Expand All @@ -118,19 +261,22 @@ func (p *pgPubsub) listen(ctx context.Context) {
func (p *pgPubsub) listenReceive(notif *pq.Notification) {
p.mut.Lock()
defer p.mut.Unlock()
listeners, ok := p.listeners[notif.Channel]
queues, ok := p.queues[notif.Channel]
if !ok {
return
}
extra := []byte(notif.Extra)
for _, listener := range listeners {
select {
case listener <- extra:
// ok!
default:
// bad news, we dropped the event because the listener isn't
// keeping up
// TODO (spike): figure out a way to communicate this to the Listener
for _, q := range queues {
q.enqueue(extra)
}
}

func (p *pgPubsub) recordReconnect() {
p.mut.Lock()
defer p.mut.Unlock()
for _, listeners := range p.queues {
for _, q := range listeners {
q.dropped()
}
}
}
Expand Down Expand Up @@ -162,20 +308,9 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
ctx: ctx,
db: database,
pgListener: listener,
listeners: make(map[string]map[uuid.UUID]chan<- []byte),
queues: make(map[string]map[uuid.UUID]*msgQueue),
}
go pgPubsub.listen(ctx)

return pgPubsub, nil
}

func messagesToListener(ctx context.Context, messages <-chan []byte, listener Listener) {
for {
select {
case <-ctx.Done():
return
case m := <-messages:
listener(ctx, m)
}
}
}
Loading