Skip to content

Commit 590f0f8

Browse files
committed
Remove excessive locking
1 parent ec2c1f1 commit 590f0f8

File tree

3 files changed

+33
-45
lines changed

3 files changed

+33
-45
lines changed

enterprise/replicasync/replicasync.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,11 @@ func (m *Manager) loop(ctx context.Context) {
143143

144144
// subscribe listens for new replica information!
145145
func (m *Manager) subscribe(ctx context.Context) error {
146-
needsUpdate := false
147-
updating := false
148-
updateMutex := sync.Mutex{}
146+
var (
147+
needsUpdate = false
148+
updating = false
149+
updateMutex = sync.Mutex{}
150+
)
149151

150152
// This loop will continually update nodes as updates are processed.
151153
// The intent is to always be up to date without spamming the run
@@ -199,9 +201,7 @@ func (m *Manager) run(ctx context.Context) error {
199201
m.closeMutex.Lock()
200202
m.closeWait.Add(1)
201203
m.closeMutex.Unlock()
202-
go func() {
203-
m.closeWait.Done()
204-
}()
204+
defer m.closeWait.Done()
205205
// Expect replicas to update once every three times the interval...
206206
// If they don't, assume death!
207207
replicas, err := m.db.GetReplicasUpdatedAfter(ctx, database.Now().Add(-3*m.options.UpdateInterval))
@@ -224,8 +224,7 @@ func (m *Manager) run(ctx context.Context) error {
224224
failed := make([]string, 0)
225225
for _, peer := range m.Regional() {
226226
wg.Add(1)
227-
peer := peer
228-
go func() {
227+
go func(peer database.Replica) {
229228
defer wg.Done()
230229
req, err := http.NewRequestWithContext(ctx, http.MethodGet, peer.RelayAddress, nil)
231230
if err != nil {
@@ -247,7 +246,7 @@ func (m *Manager) run(ctx context.Context) error {
247246
return
248247
}
249248
_ = res.Body.Close()
250-
}()
249+
}(peer)
251250
}
252251
wg.Wait()
253252
replicaError := sql.NullString{}
@@ -279,19 +278,18 @@ func (m *Manager) run(ctx context.Context) error {
279278
return xerrors.Errorf("update replica: %w", err)
280279
}
281280
m.mutex.Lock()
281+
defer m.mutex.Unlock()
282282
if m.self.Error.String != replica.Error.String {
283283
// Publish an update occurred!
284284
err = m.pubsub.Publish(PubsubEvent, []byte(m.self.ID.String()))
285285
if err != nil {
286-
m.mutex.Unlock()
287286
return xerrors.Errorf("publish replica update: %w", err)
288287
}
289288
}
290289
m.self = replica
291290
if m.callback != nil {
292291
go m.callback()
293292
}
294-
m.mutex.Unlock()
295293
return nil
296294
}
297295

@@ -306,7 +304,7 @@ func (m *Manager) Self() database.Replica {
306304
func (m *Manager) All() []database.Replica {
307305
m.mutex.Lock()
308306
defer m.mutex.Unlock()
309-
return append(m.peers, m.self)
307+
return append(m.peers[:], m.self)
310308
}
311309

312310
// Regional returns all replicas in the same region excluding itself.

enterprise/tailnet/coordinator.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,19 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
6969
// When a new connection is requested, we update it with the latest
7070
// node of the agent. This allows the connection to establish.
7171
node, ok := c.nodes[agent]
72+
c.mutex.Unlock()
7273
if ok {
7374
data, err := json.Marshal([]*agpl.Node{node})
7475
if err != nil {
75-
c.mutex.Unlock()
7676
return xerrors.Errorf("marshal node: %w", err)
7777
}
7878
_, err = conn.Write(data)
7979
if err != nil {
80-
c.mutex.Unlock()
8180
return xerrors.Errorf("write nodes: %w", err)
8281
}
8382
}
8483

84+
c.mutex.Lock()
8585
connectionSockets, ok := c.agentToConnectionSockets[agent]
8686
if !ok {
8787
connectionSockets = map[uuid.UUID]net.Conn{}
@@ -129,28 +129,17 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
129129
}
130130

131131
c.mutex.Lock()
132-
defer c.mutex.Unlock()
133-
134132
// Update the node of this client in our in-memory map. If an agent entirely
135133
// shuts down and reconnects, it needs to be aware of all clients attempting
136134
// to establish connections.
137135
c.nodes[id] = &node
138-
139136
// Write the new node from this client to the actively connected agent.
140-
err = c.writeNodeToAgent(agent, &node)
141-
if err != nil {
142-
return xerrors.Errorf("write node to agent: %w", err)
143-
}
144-
145-
return nil
146-
}
147-
148-
func (c *haCoordinator) writeNodeToAgent(agent uuid.UUID, node *agpl.Node) error {
149137
agentSocket, ok := c.agentSockets[agent]
138+
c.mutex.Unlock()
150139
if !ok {
151140
// If we don't own the agent locally, send it over pubsub to a node that
152141
// owns the agent.
153-
err := c.publishNodesToAgent(agent, []*agpl.Node{node})
142+
err := c.publishNodesToAgent(agent, []*agpl.Node{&node})
154143
if err != nil {
155144
return xerrors.Errorf("publish node to agent")
156145
}
@@ -159,7 +148,7 @@ func (c *haCoordinator) writeNodeToAgent(agent uuid.UUID, node *agpl.Node) error
159148

160149
// Write the new node from this client to the actively
161150
// connected agent.
162-
data, err := json.Marshal([]*agpl.Node{node})
151+
data, err := json.Marshal([]*agpl.Node{&node})
163152
if err != nil {
164153
return xerrors.Errorf("marshal nodes: %w", err)
165154
}
@@ -171,14 +160,13 @@ func (c *haCoordinator) writeNodeToAgent(agent uuid.UUID, node *agpl.Node) error
171160
}
172161
return xerrors.Errorf("write json: %w", err)
173162
}
163+
174164
return nil
175165
}
176166

177167
// ServeAgent accepts a WebSocket connection to an agent that listens to
178168
// incoming connections and publishes node updates.
179169
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
180-
c.mutex.Lock()
181-
182170
// Tell clients on other instances to send a callmemaybe to us.
183171
err := c.publishAgentHello(id)
184172
if err != nil {
@@ -203,6 +191,7 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
203191
// If an old agent socket is connected, we close it
204192
// to avoid any leaks. This shouldn't ever occur because
205193
// we expect one agent to be running.
194+
c.mutex.Lock()
206195
oldAgentSocket, ok := c.agentSockets[id]
207196
if ok {
208197
_ = oldAgentSocket.Close()
@@ -234,6 +223,8 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
234223
}
235224

236225
func (c *haCoordinator) nodesSubscribedToAgent(agentID uuid.UUID) []*agpl.Node {
226+
c.mutex.Lock()
227+
defer c.mutex.Unlock()
237228
sockets, ok := c.agentToConnectionSockets[agentID]
238229
if !ok {
239230
return nil
@@ -279,12 +270,11 @@ func (c *haCoordinator) hangleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
279270
for _, connectionSocket := range connectionSockets {
280271
connectionSocket := connectionSocket
281272
go func() {
273+
defer wg.Done()
282274
_ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second))
283275
_, _ = connectionSocket.Write(data)
284-
wg.Done()
285276
}()
286277
}
287-
288278
wg.Wait()
289279
return &node, nil
290280
}
@@ -428,9 +418,7 @@ func (c *haCoordinator) runPubsub() error {
428418
return
429419
}
430420

431-
c.mutex.Lock()
432421
nodes := c.nodesSubscribedToAgent(agentUUID)
433-
c.mutex.Unlock()
434422
if len(nodes) > 0 {
435423
err := c.publishNodesToAgent(agentUUID, nodes)
436424
if err != nil {

tailnet/coordinator.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,25 +127,26 @@ type coordinator struct {
127127
}
128128

129129
// Node returns an in-memory node by ID.
130+
// If the node does not exist, nil is returned.
130131
func (c *coordinator) Node(id uuid.UUID) *Node {
131132
c.mutex.Lock()
132133
defer c.mutex.Unlock()
133-
node := c.nodes[id]
134-
return node
134+
return c.nodes[id]
135135
}
136136

137137
// ServeClient accepts a WebSocket connection that wants to connect to an agent
138138
// with the specified ID.
139139
func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
140140
c.mutex.Lock()
141-
142141
if c.closed {
142+
c.mutex.Unlock()
143143
return xerrors.New("coordinator is closed")
144144
}
145145

146146
// When a new connection is requested, we update it with the latest
147147
// node of the agent. This allows the connection to establish.
148148
node, ok := c.nodes[agent]
149+
c.mutex.Unlock()
149150
if ok {
150151
data, err := json.Marshal([]*Node{node})
151152
if err != nil {
@@ -158,6 +159,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
158159
return xerrors.Errorf("write nodes: %w", err)
159160
}
160161
}
162+
c.mutex.Lock()
161163
connectionSockets, ok := c.agentToConnectionSockets[agent]
162164
if !ok {
163165
connectionSockets = map[uuid.UUID]net.Conn{}
@@ -203,7 +205,6 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
203205
}
204206

205207
c.mutex.Lock()
206-
207208
// Update the node of this client in our in-memory map. If an agent entirely
208209
// shuts down and reconnects, it needs to be aware of all clients attempting
209210
// to establish connections.
@@ -237,12 +238,13 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
237238
// listens to incoming connections and publishes node updates.
238239
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
239240
c.mutex.Lock()
240-
241241
if c.closed {
242+
c.mutex.Unlock()
242243
return xerrors.New("coordinator is closed")
243244
}
244245

245246
sockets, ok := c.agentToConnectionSockets[id]
247+
c.mutex.Unlock()
246248
if ok {
247249
// Publish all nodes that want to connect to the
248250
// desired agent ID.
@@ -269,6 +271,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
269271
// If an old agent socket is connected, we close it
270272
// to avoid any leaks. This shouldn't ever occur because
271273
// we expect one agent to be running.
274+
c.mutex.Lock()
272275
oldAgentSocket, ok := c.agentSockets[id]
273276
if ok {
274277
_ = oldAgentSocket.Close()
@@ -302,17 +305,15 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
302305
}
303306

304307
c.mutex.Lock()
305-
306308
c.nodes[id] = &node
307309
connectionSockets, ok := c.agentToConnectionSockets[id]
308310
if !ok {
309311
c.mutex.Unlock()
310312
return nil
311313
}
312-
314+
c.mutex.Unlock()
313315
data, err := json.Marshal([]*Node{&node})
314316
if err != nil {
315-
c.mutex.Unlock()
316317
return xerrors.Errorf("marshal nodes: %w", err)
317318
}
318319

@@ -328,7 +329,6 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
328329
}()
329330
}
330331

331-
c.mutex.Unlock()
332332
wg.Wait()
333333
return nil
334334
}
@@ -337,9 +337,11 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
337337
// coordinator from accepting new connections.
338338
func (c *coordinator) Close() error {
339339
c.mutex.Lock()
340-
defer c.mutex.Unlock()
341-
340+
if c.closed {
341+
return nil
342+
}
342343
c.closed = true
344+
c.mutex.Unlock()
343345

344346
wg := sync.WaitGroup{}
345347

0 commit comments

Comments
 (0)