@@ -25,12 +25,17 @@ type Pubsub interface {
25
25
26
26
// Pubsub implementation using PostgreSQL.
27
27
type pgPubsub struct {
28
+ ctx context.Context
28
29
pgListener * pq.Listener
29
30
db * sql.DB
30
31
mut sync.Mutex
31
- listeners map [string ]map [uuid.UUID ]Listener
32
+ listeners map [string ]map [uuid.UUID ]chan <- [] byte
32
33
}
33
34
35
+ // messageBufferSize is the maximum number of unhandled messages we will buffer
36
+ // for a subscriber before dropping messages.
37
+ const messageBufferSize = 2048
38
+
34
39
// Subscribe calls the listener when an event matching the name is received.
35
40
func (p * pgPubsub ) Subscribe (event string , listener Listener ) (cancel func (), err error ) {
36
41
p .mut .Lock ()
@@ -45,25 +50,22 @@ func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), er
45
50
return nil , xerrors .Errorf ("listen: %w" , err )
46
51
}
47
52
48
- var eventListeners map [uuid.UUID ]Listener
53
+ var eventListeners map [uuid.UUID ]chan <- [] byte
49
54
var ok bool
50
55
if eventListeners , ok = p .listeners [event ]; ! ok {
51
- eventListeners = map [uuid.UUID ]Listener {}
56
+ eventListeners = make ( map [uuid.UUID ]chan <- [] byte )
52
57
p .listeners [event ] = eventListeners
53
58
}
54
59
55
- var id uuid.UUID
56
- for {
57
- id = uuid .New ()
58
- if _ , ok = eventListeners [id ]; ! ok {
59
- break
60
- }
61
- }
62
-
63
- eventListeners [id ] = listener
60
+ ctx , cancelCallbacks := context .WithCancel (p .ctx )
61
+ messages := make (chan []byte , messageBufferSize )
62
+ go messagesToListener (ctx , messages , listener )
63
+ id := uuid .New ()
64
+ eventListeners [id ] = messages
64
65
return func () {
65
66
p .mut .Lock ()
66
67
defer p .mut .Unlock ()
68
+ cancelCallbacks ()
67
69
listeners := p .listeners [event ]
68
70
delete (listeners , id )
69
71
@@ -109,11 +111,11 @@ func (p *pgPubsub) listen(ctx context.Context) {
109
111
if notif == nil {
110
112
continue
111
113
}
112
- p .listenReceive (ctx , notif )
114
+ p .listenReceive (notif )
113
115
}
114
116
}
115
117
116
- func (p * pgPubsub ) listenReceive (ctx context. Context , notif * pq.Notification ) {
118
+ func (p * pgPubsub ) listenReceive (notif * pq.Notification ) {
117
119
p .mut .Lock ()
118
120
defer p .mut .Unlock ()
119
121
listeners , ok := p .listeners [notif .Channel ]
@@ -122,7 +124,14 @@ func (p *pgPubsub) listenReceive(ctx context.Context, notif *pq.Notification) {
122
124
}
123
125
extra := []byte (notif .Extra )
124
126
for _ , listener := range listeners {
125
- go listener (ctx , extra )
127
+ select {
128
+ case listener <- extra :
129
+ // ok!
130
+ default :
131
+ // bad news, we dropped the event because the listener isn't
132
+ // keeping up
133
+ // TODO (spike): figure out a way to communicate this to the Listener
134
+ }
126
135
}
127
136
}
128
137
@@ -150,11 +159,23 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
150
159
return nil , ctx .Err ()
151
160
}
152
161
pgPubsub := & pgPubsub {
162
+ ctx : ctx ,
153
163
db : database ,
154
164
pgListener : listener ,
155
- listeners : make (map [string ]map [uuid.UUID ]Listener ),
165
+ listeners : make (map [string ]map [uuid.UUID ]chan <- [] byte ),
156
166
}
157
167
go pgPubsub .listen (ctx )
158
168
159
169
return pgPubsub , nil
160
170
}
171
+
172
+ func messagesToListener (ctx context.Context , messages <- chan []byte , listener Listener ) {
173
+ for {
174
+ select {
175
+ case <- ctx .Done ():
176
+ return
177
+ case m := <- messages :
178
+ listener (ctx , m )
179
+ }
180
+ }
181
+ }
0 commit comments