Skip to content

Commit fdb3557

Browse files
committed
Rename to replicasync
1 parent 585bc1d commit fdb3557

File tree

4 files changed

+33
-31
lines changed

4 files changed

+33
-31
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
"ptytest",
8686
"quickstart",
8787
"reconfig",
88+
"replicasync",
8889
"retrier",
8990
"rpty",
9091
"sdkproto",

enterprise/coderd/coderd.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/coder/coder/enterprise/coderd/license"
2727
"github.com/coder/coder/enterprise/highavailability"
2828
"github.com/coder/coder/enterprise/highavailability/derpmesh"
29-
"github.com/coder/coder/enterprise/highavailability/replica"
29+
"github.com/coder/coder/enterprise/highavailability/replicasync"
3030
"github.com/coder/coder/tailnet"
3131
)
3232

@@ -127,7 +127,7 @@ func New(ctx context.Context, options *Options) (*API, error) {
127127
})
128128

129129
var err error
130-
api.replica, err = replica.New(ctx, options.Logger, options.Database, options.Pubsub, replica.Options{
130+
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, replicasync.Options{
131131
ID: options.ReplicaID,
132132
RelayAddress: options.DERPServerRelayAddress,
133133
RegionID: int32(options.DERPServerRegionID),
@@ -171,7 +171,7 @@ type API struct {
171171
*Options
172172

173173
// Detects multiple Coder replicas running at the same time.
174-
replica *replica.Server
174+
replicaManager *replicasync.Manager
175175
// Meshes DERP connections from multiple replicas.
176176
derpMesh *derpmesh.Mesh
177177

@@ -182,7 +182,7 @@ type API struct {
182182

183183
func (api *API) Close() error {
184184
api.cancelEntitlementsLoop()
185-
_ = api.replica.Close()
185+
_ = api.replicaManager.Close()
186186
_ = api.derpMesh.Close()
187187
return api.AGPL.Close()
188188
}
@@ -256,9 +256,9 @@ func (api *API) updateEntitlements(ctx context.Context) error {
256256
coordinator = haCoordinator
257257
}
258258

259-
api.replica.SetCallback(func() {
259+
api.replicaManager.SetCallback(func() {
260260
addresses := make([]string, 0)
261-
for _, replica := range api.replica.Regional() {
261+
for _, replica := range api.replicaManager.Regional() {
262262
addresses = append(addresses, replica.RelayAddress)
263263
}
264264
api.derpMesh.SetAddresses(addresses)

enterprise/highavailability/replica/replica.go renamed to enterprise/highavailability/replicasync/replicasync.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replica
1+
package replicasync
22

33
import (
44
"context"
@@ -34,7 +34,7 @@ type Options struct {
3434

3535
// New registers the replica with the database and periodically updates to ensure
3636
// it's healthy. It contacts all other alive replicas to ensure they are reachable.
37-
func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub database.Pubsub, options Options) (*Server, error) {
37+
func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub database.Pubsub, options Options) (*Manager, error) {
3838
if options.ID == uuid.Nil {
3939
panic("An ID must be provided!")
4040
}
@@ -88,7 +88,7 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data
8888
return nil, xerrors.Errorf("publish new replica: %w", err)
8989
}
9090
ctx, cancelFunc := context.WithCancel(ctx)
91-
server := &Server{
91+
server := &Manager{
9292
options: &options,
9393
db: db,
9494
pubsub: pubsub,
@@ -110,7 +110,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data
110110
return server, nil
111111
}
112112

113-
type Server struct {
113+
// Manager keeps the replica up to date and in sync with other replicas.
114+
type Manager struct {
114115
options *Options
115116
db database.Store
116117
pubsub database.Pubsub
@@ -128,7 +129,7 @@ type Server struct {
128129
}
129130

130131
// loop runs the replica update sequence on an update interval.
131-
func (s *Server) loop(ctx context.Context) {
132+
func (s *Manager) loop(ctx context.Context) {
132133
defer s.closeWait.Done()
133134
ticker := time.NewTicker(s.options.UpdateInterval)
134135
defer ticker.Stop()
@@ -146,7 +147,7 @@ func (s *Server) loop(ctx context.Context) {
146147
}
147148

148149
// subscribe listens for new replica information!
149-
func (s *Server) subscribe(ctx context.Context) error {
150+
func (s *Manager) subscribe(ctx context.Context) error {
150151
needsUpdate := false
151152
updating := false
152153
updateMutex := sync.Mutex{}
@@ -199,7 +200,7 @@ func (s *Server) subscribe(ctx context.Context) error {
199200
return nil
200201
}
201202

202-
func (s *Server) run(ctx context.Context) error {
203+
func (s *Manager) run(ctx context.Context) error {
203204
s.closeMutex.Lock()
204205
s.closeWait.Add(1)
205206
s.closeMutex.Unlock()
@@ -291,21 +292,21 @@ func (s *Server) run(ctx context.Context) error {
291292
}
292293

293294
// Self represents the current replica.
294-
func (s *Server) Self() database.Replica {
295+
func (s *Manager) Self() database.Replica {
295296
s.mutex.Lock()
296297
defer s.mutex.Unlock()
297298
return s.self
298299
}
299300

300301
// All returns every replica, including itself.
301-
func (s *Server) All() []database.Replica {
302+
func (s *Manager) All() []database.Replica {
302303
s.mutex.Lock()
303304
defer s.mutex.Unlock()
304305
return append(s.peers, s.self)
305306
}
306307

307308
// Regional returns all replicas in the same region excluding itself.
308-
func (s *Server) Regional() []database.Replica {
309+
func (s *Manager) Regional() []database.Replica {
309310
s.mutex.Lock()
310311
defer s.mutex.Unlock()
311312
replicas := make([]database.Replica, 0)
@@ -320,15 +321,15 @@ func (s *Server) Regional() []database.Replica {
320321

321322
// SetCallback sets a function to execute whenever new peers
322323
// are refreshed or updated.
323-
func (s *Server) SetCallback(callback func()) {
324+
func (s *Manager) SetCallback(callback func()) {
324325
s.mutex.Lock()
325326
defer s.mutex.Unlock()
326327
s.callback = callback
327328
// Instantly call the callback to inform replicas!
328329
go callback()
329330
}
330331

331-
func (s *Server) Close() error {
332+
func (s *Manager) Close() error {
332333
s.closeMutex.Lock()
333334
select {
334335
case <-s.closed:

enterprise/highavailability/replica/replica_test.go renamed to enterprise/highavailability/replicasync/replicasync_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replica_test
1+
package replicasync_test
22

33
import (
44
"context"
@@ -17,7 +17,7 @@ import (
1717
"cdr.dev/slog/sloggers/slogtest"
1818
"github.com/coder/coder/coderd/database"
1919
"github.com/coder/coder/coderd/database/dbtestutil"
20-
"github.com/coder/coder/enterprise/highavailability/replica"
20+
"github.com/coder/coder/enterprise/highavailability/replicasync"
2121
"github.com/coder/coder/testutil"
2222
)
2323

@@ -32,12 +32,12 @@ func TestReplica(t *testing.T) {
3232
t.Parallel()
3333
db, pubsub := dbtestutil.NewDB(t)
3434
id := uuid.New()
35-
cancel, err := pubsub.Subscribe(replica.PubsubEvent, func(ctx context.Context, message []byte) {
35+
cancel, err := pubsub.Subscribe(replicasync.PubsubEvent, func(ctx context.Context, message []byte) {
3636
assert.Equal(t, []byte(id.String()), message)
3737
})
3838
require.NoError(t, err)
3939
defer cancel()
40-
server, err := replica.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replica.Options{
40+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
4141
ID: id,
4242
})
4343
require.NoError(t, err)
@@ -54,12 +54,12 @@ func TestReplica(t *testing.T) {
5454
ID: id,
5555
})
5656
require.NoError(t, err)
57-
cancel, err := pubsub.Subscribe(replica.PubsubEvent, func(ctx context.Context, message []byte) {
57+
cancel, err := pubsub.Subscribe(replicasync.PubsubEvent, func(ctx context.Context, message []byte) {
5858
assert.Equal(t, []byte(id.String()), message)
5959
})
6060
require.NoError(t, err)
6161
defer cancel()
62-
server, err := replica.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replica.Options{
62+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
6363
ID: id,
6464
})
6565
require.NoError(t, err)
@@ -84,7 +84,7 @@ func TestReplica(t *testing.T) {
8484
RelayAddress: srv.URL,
8585
})
8686
require.NoError(t, err)
87-
server, err := replica.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replica.Options{
87+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
8888
ID: uuid.New(),
8989
})
9090
require.NoError(t, err)
@@ -97,7 +97,7 @@ func TestReplica(t *testing.T) {
9797
t.Parallel()
9898
db, pubsub := dbtestutil.NewDB(t)
9999
var count atomic.Int32
100-
cancel, err := pubsub.Subscribe(replica.PubsubEvent, func(ctx context.Context, message []byte) {
100+
cancel, err := pubsub.Subscribe(replicasync.PubsubEvent, func(ctx context.Context, message []byte) {
101101
count.Add(1)
102102
})
103103
require.NoError(t, err)
@@ -112,7 +112,7 @@ func TestReplica(t *testing.T) {
112112
RelayAddress: "http://169.254.169.254",
113113
})
114114
require.NoError(t, err)
115-
server, err := replica.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replica.Options{
115+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
116116
ID: uuid.New(),
117117
PeerTimeout: 1 * time.Millisecond,
118118
})
@@ -130,7 +130,7 @@ func TestReplica(t *testing.T) {
130130
t.Parallel()
131131
db, pubsub := dbtestutil.NewDB(t)
132132
id := uuid.New()
133-
server, err := replica.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replica.Options{
133+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
134134
ID: id,
135135
})
136136
require.NoError(t, err)
@@ -145,9 +145,9 @@ func TestReplica(t *testing.T) {
145145
})
146146
require.NoError(t, err)
147147
// Publish multiple times to ensure it can handle that case.
148-
err = pubsub.Publish(replica.PubsubEvent, []byte(peer.ID.String()))
148+
err = pubsub.Publish(replicasync.PubsubEvent, []byte(peer.ID.String()))
149149
require.NoError(t, err)
150-
err = pubsub.Publish(replica.PubsubEvent, []byte(peer.ID.String()))
150+
err = pubsub.Publish(replicasync.PubsubEvent, []byte(peer.ID.String()))
151151
require.NoError(t, err)
152152
require.Eventually(t, func() bool {
153153
return len(server.Regional()) == 1
@@ -168,7 +168,7 @@ func TestReplica(t *testing.T) {
168168
count := 20
169169
wg.Add(count)
170170
for i := 0; i < count; i++ {
171-
server, err := replica.New(context.Background(), logger, db, pubsub, replica.Options{
171+
server, err := replicasync.New(context.Background(), logger, db, pubsub, replicasync.Options{
172172
ID: uuid.New(),
173173
RelayAddress: srv.URL,
174174
})

0 commit comments

Comments
 (0)