Skip to content

Commit 557b390

Browse files
committed
Rename function pointer
1 parent ff5968b commit 557b390

File tree

2 files changed

+74
-96
lines changed

2 files changed

+74
-96
lines changed

codersdk/replicas.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

enterprise/replicasync/replicasync.go

Lines changed: 74 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -129,25 +129,25 @@ type Manager struct {
129129
}
130130

131131
// loop runs the replica update sequence on an update interval.
132-
func (s *Manager) loop(ctx context.Context) {
133-
defer s.closeWait.Done()
134-
ticker := time.NewTicker(s.options.UpdateInterval)
132+
func (m *Manager) loop(ctx context.Context) {
133+
defer m.closeWait.Done()
134+
ticker := time.NewTicker(m.options.UpdateInterval)
135135
defer ticker.Stop()
136136
for {
137137
select {
138138
case <-ctx.Done():
139139
return
140140
case <-ticker.C:
141141
}
142-
err := s.run(ctx)
142+
err := m.run(ctx)
143143
if err != nil && !errors.Is(err, context.Canceled) {
144-
s.logger.Warn(ctx, "run replica update loop", slog.Error(err))
144+
m.logger.Warn(ctx, "run replica update loop", slog.Error(err))
145145
}
146146
}
147147
}
148148

149149
// subscribe listens for new replica information!
150-
func (s *Manager) subscribe(ctx context.Context) error {
150+
func (m *Manager) subscribe(ctx context.Context) error {
151151
needsUpdate := false
152152
updating := false
153153
updateMutex := sync.Mutex{}
@@ -158,9 +158,9 @@ func (s *Manager) subscribe(ctx context.Context) error {
158158
// it will reprocess afterwards.
159159
var update func()
160160
update = func() {
161-
err := s.run(ctx)
161+
err := m.run(ctx)
162162
if err != nil && !errors.Is(err, context.Canceled) {
163-
s.logger.Error(ctx, "run replica from subscribe", slog.Error(err))
163+
m.logger.Error(ctx, "run replica from subscribe", slog.Error(err))
164164
}
165165
updateMutex.Lock()
166166
if needsUpdate {
@@ -172,15 +172,15 @@ func (s *Manager) subscribe(ctx context.Context) error {
172172
updating = false
173173
updateMutex.Unlock()
174174
}
175-
cancelFunc, err := s.pubsub.Subscribe(PubsubEvent, func(ctx context.Context, message []byte) {
175+
cancelFunc, err := m.pubsub.Subscribe(PubsubEvent, func(ctx context.Context, message []byte) {
176176
updateMutex.Lock()
177177
defer updateMutex.Unlock()
178178
id, err := uuid.Parse(string(message))
179179
if err != nil {
180180
return
181181
}
182182
// Don't process updates for ourself!
183-
if id == s.options.ID {
183+
if id == m.options.ID {
184184
return
185185
}
186186
if updating {
@@ -200,46 +200,46 @@ func (s *Manager) subscribe(ctx context.Context) error {
200200
return nil
201201
}
202202

203-
func (s *Manager) run(ctx context.Context) error {
204-
s.closeMutex.Lock()
205-
s.closeWait.Add(1)
206-
s.closeMutex.Unlock()
203+
func (m *Manager) run(ctx context.Context) error {
204+
m.closeMutex.Lock()
205+
m.closeWait.Add(1)
206+
m.closeMutex.Unlock()
207207
go func() {
208-
s.closeWait.Done()
208+
m.closeWait.Done()
209209
}()
210210
// Expect replicas to update once every three times the interval...
211211
// If they don't, assume death!
212-
replicas, err := s.db.GetReplicasUpdatedAfter(ctx, database.Now().Add(-3*s.options.UpdateInterval))
212+
replicas, err := m.db.GetReplicasUpdatedAfter(ctx, database.Now().Add(-3*m.options.UpdateInterval))
213213
if err != nil {
214214
return xerrors.Errorf("get replicas: %w", err)
215215
}
216216

217-
s.mutex.Lock()
218-
s.peers = make([]database.Replica, 0, len(replicas))
217+
m.mutex.Lock()
218+
m.peers = make([]database.Replica, 0, len(replicas))
219219
for _, replica := range replicas {
220-
if replica.ID == s.options.ID {
220+
if replica.ID == m.options.ID {
221221
continue
222222
}
223-
s.peers = append(s.peers, replica)
223+
m.peers = append(m.peers, replica)
224224
}
225-
s.mutex.Unlock()
225+
m.mutex.Unlock()
226226

227227
var wg sync.WaitGroup
228228
var mu sync.Mutex
229229
failed := make([]string, 0)
230-
for _, peer := range s.Regional() {
230+
for _, peer := range m.Regional() {
231231
wg.Add(1)
232232
peer := peer
233233
go func() {
234234
defer wg.Done()
235235
req, err := http.NewRequestWithContext(ctx, http.MethodGet, peer.RelayAddress, nil)
236236
if err != nil {
237-
s.logger.Error(ctx, "create http request for relay probe",
237+
m.logger.Error(ctx, "create http request for relay probe",
238238
slog.F("relay_address", peer.RelayAddress), slog.Error(err))
239239
return
240240
}
241241
client := http.Client{
242-
Timeout: s.options.PeerTimeout,
242+
Timeout: m.options.PeerTimeout,
243243
}
244244
res, err := client.Do(req)
245245
if err != nil {
@@ -260,58 +260,58 @@ func (s *Manager) run(ctx context.Context) error {
260260
}
261261
}
262262

263-
replica, err := s.db.UpdateReplica(ctx, database.UpdateReplicaParams{
264-
ID: s.self.ID,
263+
replica, err := m.db.UpdateReplica(ctx, database.UpdateReplicaParams{
264+
ID: m.self.ID,
265265
UpdatedAt: database.Now(),
266-
StartedAt: s.self.StartedAt,
267-
StoppedAt: s.self.StoppedAt,
268-
RelayAddress: s.self.RelayAddress,
269-
RegionID: s.self.RegionID,
270-
Hostname: s.self.Hostname,
271-
Version: s.self.Version,
266+
StartedAt: m.self.StartedAt,
267+
StoppedAt: m.self.StoppedAt,
268+
RelayAddress: m.self.RelayAddress,
269+
RegionID: m.self.RegionID,
270+
Hostname: m.self.Hostname,
271+
Version: m.self.Version,
272272
Error: replicaError,
273273
})
274274
if err != nil {
275275
return xerrors.Errorf("update replica: %w", err)
276276
}
277-
s.mutex.Lock()
278-
if s.self.Error.String != replica.Error.String {
277+
m.mutex.Lock()
278+
if m.self.Error.String != replica.Error.String {
279279
// Publish an update occurred!
280-
err = s.pubsub.Publish(PubsubEvent, []byte(s.self.ID.String()))
280+
err = m.pubsub.Publish(PubsubEvent, []byte(m.self.ID.String()))
281281
if err != nil {
282-
s.mutex.Unlock()
282+
m.mutex.Unlock()
283283
return xerrors.Errorf("publish replica update: %w", err)
284284
}
285285
}
286-
s.self = replica
287-
if s.callback != nil {
288-
go s.callback()
286+
m.self = replica
287+
if m.callback != nil {
288+
go m.callback()
289289
}
290-
s.mutex.Unlock()
290+
m.mutex.Unlock()
291291
return nil
292292
}
293293

294294
// Self represents the current replica.
295-
func (s *Manager) Self() database.Replica {
296-
s.mutex.Lock()
297-
defer s.mutex.Unlock()
298-
return s.self
295+
func (m *Manager) Self() database.Replica {
296+
m.mutex.Lock()
297+
defer m.mutex.Unlock()
298+
return m.self
299299
}
300300

301301
// All returns every replica, including itself.
302-
func (s *Manager) All() []database.Replica {
303-
s.mutex.Lock()
304-
defer s.mutex.Unlock()
305-
return append(s.peers, s.self)
302+
func (m *Manager) All() []database.Replica {
303+
m.mutex.Lock()
304+
defer m.mutex.Unlock()
305+
return append(m.peers, m.self)
306306
}
307307

308308
// Regional returns all replicas in the same region excluding itself.
309-
func (s *Manager) Regional() []database.Replica {
310-
s.mutex.Lock()
311-
defer s.mutex.Unlock()
309+
func (m *Manager) Regional() []database.Replica {
310+
m.mutex.Lock()
311+
defer m.mutex.Unlock()
312312
replicas := make([]database.Replica, 0)
313-
for _, replica := range s.peers {
314-
if replica.RegionID != s.self.RegionID {
313+
for _, replica := range m.peers {
314+
if replica.RegionID != m.self.RegionID {
315315
continue
316316
}
317317
replicas = append(replicas, replica)
@@ -321,47 +321,47 @@ func (s *Manager) Regional() []database.Replica {
321321

322322
// SetCallback sets a function to execute whenever new peers
323323
// are refreshed or updated.
324-
func (s *Manager) SetCallback(callback func()) {
325-
s.mutex.Lock()
326-
defer s.mutex.Unlock()
327-
s.callback = callback
324+
func (m *Manager) SetCallback(callback func()) {
325+
m.mutex.Lock()
326+
defer m.mutex.Unlock()
327+
m.callback = callback
328328
// Instantly call the callback to inform replicas!
329329
go callback()
330330
}
331331

332-
func (s *Manager) Close() error {
333-
s.closeMutex.Lock()
332+
func (m *Manager) Close() error {
333+
m.closeMutex.Lock()
334334
select {
335-
case <-s.closed:
336-
s.closeMutex.Unlock()
335+
case <-m.closed:
336+
m.closeMutex.Unlock()
337337
return nil
338338
default:
339339
}
340-
close(s.closed)
341-
s.closeCancel()
342-
s.closeWait.Wait()
343-
s.closeMutex.Unlock()
340+
close(m.closed)
341+
m.closeCancel()
342+
m.closeWait.Wait()
343+
m.closeMutex.Unlock()
344344

345345
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
346346
defer cancelFunc()
347-
_, err := s.db.UpdateReplica(ctx, database.UpdateReplicaParams{
348-
ID: s.self.ID,
347+
_, err := m.db.UpdateReplica(ctx, database.UpdateReplicaParams{
348+
ID: m.self.ID,
349349
UpdatedAt: database.Now(),
350-
StartedAt: s.self.StartedAt,
350+
StartedAt: m.self.StartedAt,
351351
StoppedAt: sql.NullTime{
352352
Time: database.Now(),
353353
Valid: true,
354354
},
355-
RelayAddress: s.self.RelayAddress,
356-
RegionID: s.self.RegionID,
357-
Hostname: s.self.Hostname,
358-
Version: s.self.Version,
359-
Error: s.self.Error,
355+
RelayAddress: m.self.RelayAddress,
356+
RegionID: m.self.RegionID,
357+
Hostname: m.self.Hostname,
358+
Version: m.self.Version,
359+
Error: m.self.Error,
360360
})
361361
if err != nil {
362362
return xerrors.Errorf("update replica: %w", err)
363363
}
364-
err = s.pubsub.Publish(PubsubEvent, []byte(s.self.ID.String()))
364+
err = m.pubsub.Publish(PubsubEvent, []byte(m.self.ID.String()))
365365
if err != nil {
366366
return xerrors.Errorf("publish replica update: %w", err)
367367
}

0 commit comments

Comments
 (0)