@@ -129,25 +129,25 @@ type Manager struct {
129
129
}
130
130
131
131
// loop runs the replica update sequence on an update interval.
132
- func (s * Manager ) loop (ctx context.Context ) {
133
- defer s .closeWait .Done ()
134
- ticker := time .NewTicker (s .options .UpdateInterval )
132
+ func (m * Manager ) loop (ctx context.Context ) {
133
+ defer m .closeWait .Done ()
134
+ ticker := time .NewTicker (m .options .UpdateInterval )
135
135
defer ticker .Stop ()
136
136
for {
137
137
select {
138
138
case <- ctx .Done ():
139
139
return
140
140
case <- ticker .C :
141
141
}
142
- err := s .run (ctx )
142
+ err := m .run (ctx )
143
143
if err != nil && ! errors .Is (err , context .Canceled ) {
144
- s .logger .Warn (ctx , "run replica update loop" , slog .Error (err ))
144
+ m .logger .Warn (ctx , "run replica update loop" , slog .Error (err ))
145
145
}
146
146
}
147
147
}
148
148
149
149
// subscribe listens for new replica information!
150
- func (s * Manager ) subscribe (ctx context.Context ) error {
150
+ func (m * Manager ) subscribe (ctx context.Context ) error {
151
151
needsUpdate := false
152
152
updating := false
153
153
updateMutex := sync.Mutex {}
@@ -158,9 +158,9 @@ func (s *Manager) subscribe(ctx context.Context) error {
158
158
// it will reprocess afterwards.
159
159
var update func ()
160
160
update = func () {
161
- err := s .run (ctx )
161
+ err := m .run (ctx )
162
162
if err != nil && ! errors .Is (err , context .Canceled ) {
163
- s .logger .Error (ctx , "run replica from subscribe" , slog .Error (err ))
163
+ m .logger .Error (ctx , "run replica from subscribe" , slog .Error (err ))
164
164
}
165
165
updateMutex .Lock ()
166
166
if needsUpdate {
@@ -172,15 +172,15 @@ func (s *Manager) subscribe(ctx context.Context) error {
172
172
updating = false
173
173
updateMutex .Unlock ()
174
174
}
175
- cancelFunc , err := s .pubsub .Subscribe (PubsubEvent , func (ctx context.Context , message []byte ) {
175
+ cancelFunc , err := m .pubsub .Subscribe (PubsubEvent , func (ctx context.Context , message []byte ) {
176
176
updateMutex .Lock ()
177
177
defer updateMutex .Unlock ()
178
178
id , err := uuid .Parse (string (message ))
179
179
if err != nil {
180
180
return
181
181
}
182
182
// Don't process updates for ourself!
183
- if id == s .options .ID {
183
+ if id == m .options .ID {
184
184
return
185
185
}
186
186
if updating {
@@ -200,46 +200,46 @@ func (s *Manager) subscribe(ctx context.Context) error {
200
200
return nil
201
201
}
202
202
203
- func (s * Manager ) run (ctx context.Context ) error {
204
- s .closeMutex .Lock ()
205
- s .closeWait .Add (1 )
206
- s .closeMutex .Unlock ()
203
+ func (m * Manager ) run (ctx context.Context ) error {
204
+ m .closeMutex .Lock ()
205
+ m .closeWait .Add (1 )
206
+ m .closeMutex .Unlock ()
207
207
go func () {
208
- s .closeWait .Done ()
208
+ m .closeWait .Done ()
209
209
}()
210
210
// Expect replicas to update once every three times the interval...
211
211
// If they don't, assume death!
212
- replicas , err := s .db .GetReplicasUpdatedAfter (ctx , database .Now ().Add (- 3 * s .options .UpdateInterval ))
212
+ replicas , err := m .db .GetReplicasUpdatedAfter (ctx , database .Now ().Add (- 3 * m .options .UpdateInterval ))
213
213
if err != nil {
214
214
return xerrors .Errorf ("get replicas: %w" , err )
215
215
}
216
216
217
- s .mutex .Lock ()
218
- s .peers = make ([]database.Replica , 0 , len (replicas ))
217
+ m .mutex .Lock ()
218
+ m .peers = make ([]database.Replica , 0 , len (replicas ))
219
219
for _ , replica := range replicas {
220
- if replica .ID == s .options .ID {
220
+ if replica .ID == m .options .ID {
221
221
continue
222
222
}
223
- s .peers = append (s .peers , replica )
223
+ m .peers = append (m .peers , replica )
224
224
}
225
- s .mutex .Unlock ()
225
+ m .mutex .Unlock ()
226
226
227
227
var wg sync.WaitGroup
228
228
var mu sync.Mutex
229
229
failed := make ([]string , 0 )
230
- for _ , peer := range s .Regional () {
230
+ for _ , peer := range m .Regional () {
231
231
wg .Add (1 )
232
232
peer := peer
233
233
go func () {
234
234
defer wg .Done ()
235
235
req , err := http .NewRequestWithContext (ctx , http .MethodGet , peer .RelayAddress , nil )
236
236
if err != nil {
237
- s .logger .Error (ctx , "create http request for relay probe" ,
237
+ m .logger .Error (ctx , "create http request for relay probe" ,
238
238
slog .F ("relay_address" , peer .RelayAddress ), slog .Error (err ))
239
239
return
240
240
}
241
241
client := http.Client {
242
- Timeout : s .options .PeerTimeout ,
242
+ Timeout : m .options .PeerTimeout ,
243
243
}
244
244
res , err := client .Do (req )
245
245
if err != nil {
@@ -260,58 +260,58 @@ func (s *Manager) run(ctx context.Context) error {
260
260
}
261
261
}
262
262
263
- replica , err := s .db .UpdateReplica (ctx , database.UpdateReplicaParams {
264
- ID : s .self .ID ,
263
+ replica , err := m .db .UpdateReplica (ctx , database.UpdateReplicaParams {
264
+ ID : m .self .ID ,
265
265
UpdatedAt : database .Now (),
266
- StartedAt : s .self .StartedAt ,
267
- StoppedAt : s .self .StoppedAt ,
268
- RelayAddress : s .self .RelayAddress ,
269
- RegionID : s .self .RegionID ,
270
- Hostname : s .self .Hostname ,
271
- Version : s .self .Version ,
266
+ StartedAt : m .self .StartedAt ,
267
+ StoppedAt : m .self .StoppedAt ,
268
+ RelayAddress : m .self .RelayAddress ,
269
+ RegionID : m .self .RegionID ,
270
+ Hostname : m .self .Hostname ,
271
+ Version : m .self .Version ,
272
272
Error : replicaError ,
273
273
})
274
274
if err != nil {
275
275
return xerrors .Errorf ("update replica: %w" , err )
276
276
}
277
- s .mutex .Lock ()
278
- if s .self .Error .String != replica .Error .String {
277
+ m .mutex .Lock ()
278
+ if m .self .Error .String != replica .Error .String {
279
279
// Publish an update occurred!
280
- err = s .pubsub .Publish (PubsubEvent , []byte (s .self .ID .String ()))
280
+ err = m .pubsub .Publish (PubsubEvent , []byte (m .self .ID .String ()))
281
281
if err != nil {
282
- s .mutex .Unlock ()
282
+ m .mutex .Unlock ()
283
283
return xerrors .Errorf ("publish replica update: %w" , err )
284
284
}
285
285
}
286
- s .self = replica
287
- if s .callback != nil {
288
- go s .callback ()
286
+ m .self = replica
287
+ if m .callback != nil {
288
+ go m .callback ()
289
289
}
290
- s .mutex .Unlock ()
290
+ m .mutex .Unlock ()
291
291
return nil
292
292
}
293
293
294
294
// Self represents the current replica.
295
- func (s * Manager ) Self () database.Replica {
296
- s .mutex .Lock ()
297
- defer s .mutex .Unlock ()
298
- return s .self
295
+ func (m * Manager ) Self () database.Replica {
296
+ m .mutex .Lock ()
297
+ defer m .mutex .Unlock ()
298
+ return m .self
299
299
}
300
300
301
301
// All returns every replica, including itself.
302
- func (s * Manager ) All () []database.Replica {
303
- s .mutex .Lock ()
304
- defer s .mutex .Unlock ()
305
- return append (s .peers , s .self )
302
+ func (m * Manager ) All () []database.Replica {
303
+ m .mutex .Lock ()
304
+ defer m .mutex .Unlock ()
305
+ return append (m .peers , m .self )
306
306
}
307
307
308
308
// Regional returns all replicas in the same region excluding itself.
309
- func (s * Manager ) Regional () []database.Replica {
310
- s .mutex .Lock ()
311
- defer s .mutex .Unlock ()
309
+ func (m * Manager ) Regional () []database.Replica {
310
+ m .mutex .Lock ()
311
+ defer m .mutex .Unlock ()
312
312
replicas := make ([]database.Replica , 0 )
313
- for _ , replica := range s .peers {
314
- if replica .RegionID != s .self .RegionID {
313
+ for _ , replica := range m .peers {
314
+ if replica .RegionID != m .self .RegionID {
315
315
continue
316
316
}
317
317
replicas = append (replicas , replica )
@@ -321,47 +321,47 @@ func (s *Manager) Regional() []database.Replica {
321
321
322
322
// SetCallback sets a function to execute whenever new peers
323
323
// are refreshed or updated.
324
- func (s * Manager ) SetCallback (callback func ()) {
325
- s .mutex .Lock ()
326
- defer s .mutex .Unlock ()
327
- s .callback = callback
324
+ func (m * Manager ) SetCallback (callback func ()) {
325
+ m .mutex .Lock ()
326
+ defer m .mutex .Unlock ()
327
+ m .callback = callback
328
328
// Instantly call the callback to inform replicas!
329
329
go callback ()
330
330
}
331
331
332
- func (s * Manager ) Close () error {
333
- s .closeMutex .Lock ()
332
+ func (m * Manager ) Close () error {
333
+ m .closeMutex .Lock ()
334
334
select {
335
- case <- s .closed :
336
- s .closeMutex .Unlock ()
335
+ case <- m .closed :
336
+ m .closeMutex .Unlock ()
337
337
return nil
338
338
default :
339
339
}
340
- close (s .closed )
341
- s .closeCancel ()
342
- s .closeWait .Wait ()
343
- s .closeMutex .Unlock ()
340
+ close (m .closed )
341
+ m .closeCancel ()
342
+ m .closeWait .Wait ()
343
+ m .closeMutex .Unlock ()
344
344
345
345
ctx , cancelFunc := context .WithTimeout (context .Background (), 5 * time .Second )
346
346
defer cancelFunc ()
347
- _ , err := s .db .UpdateReplica (ctx , database.UpdateReplicaParams {
348
- ID : s .self .ID ,
347
+ _ , err := m .db .UpdateReplica (ctx , database.UpdateReplicaParams {
348
+ ID : m .self .ID ,
349
349
UpdatedAt : database .Now (),
350
- StartedAt : s .self .StartedAt ,
350
+ StartedAt : m .self .StartedAt ,
351
351
StoppedAt : sql.NullTime {
352
352
Time : database .Now (),
353
353
Valid : true ,
354
354
},
355
- RelayAddress : s .self .RelayAddress ,
356
- RegionID : s .self .RegionID ,
357
- Hostname : s .self .Hostname ,
358
- Version : s .self .Version ,
359
- Error : s .self .Error ,
355
+ RelayAddress : m .self .RelayAddress ,
356
+ RegionID : m .self .RegionID ,
357
+ Hostname : m .self .Hostname ,
358
+ Version : m .self .Version ,
359
+ Error : m .self .Error ,
360
360
})
361
361
if err != nil {
362
362
return xerrors .Errorf ("update replica: %w" , err )
363
363
}
364
- err = s .pubsub .Publish (PubsubEvent , []byte (s .self .ID .String ()))
364
+ err = m .pubsub .Publish (PubsubEvent , []byte (m .self .ID .String ()))
365
365
if err != nil {
366
366
return xerrors .Errorf ("publish replica update: %w" , err )
367
367
}
0 commit comments