Skip to content

Commit 9720ac1

Browse files
committed
WIP: Hold the proper locks while examining the list of databases.
Introduce a new lock called specMu to protect the cluster spec. This lock is held on update and sync, and when retrieving the spec in the API code. There is no need to acquire it for cluster creation and deletion: creation assigns the spec to the cluster before linking it to the controller, and deletion just removes the cluster from the list in the controller; both operations hold the global clustersMu lock.
1 parent 011458f commit 9720ac1

File tree

5 files changed

+49
-9
lines changed

5 files changed

+49
-9
lines changed

pkg/apiserver/apiserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ type controllerInformer interface {
3333
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
3434
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
3535
ClusterHistory(team, cluster string) ([]*spec.Diff, error)
36+
ClusterDatabasesMap() map[string][]string
3637
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
3738
ListQueue(workerID uint32) (*spec.QueueDump, error)
3839
GetWorkersCnt() uint32
3940
WorkerStatus(workerID uint32) (*spec.WorkerStatus, error)
40-
GetClusterDatabasesMap() map[string][]string
4141
}
4242

4343
// Server describes HTTP API server
@@ -226,7 +226,7 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
226226

227227
func (s *Server) databases(w http.ResponseWriter, req *http.Request) {
228228

229-
databaseNamesPerCluster := s.controller.GetClusterDatabasesMap()
229+
databaseNamesPerCluster := s.controller.ClusterDatabasesMap()
230230
s.respond(databaseNamesPerCluster, nil, w)
231231
return
232232

pkg/cluster/cluster.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ type Cluster struct {
7575
oauthTokenGetter OAuthTokenGetter
7676
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
7777
currentProcess spec.Process
78-
processMu sync.RWMutex
78+
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
79+
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
7980
}
8081

8182
type compareStatefulsetResult struct {
@@ -437,7 +438,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
437438
defer c.mu.Unlock()
438439

439440
c.setStatus(spec.ClusterStatusUpdating)
440-
c.Postgresql = *newSpec
441+
c.setSpec(newSpec)
441442

442443
defer func() {
443444
if updateFailed {

pkg/cluster/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
2020
c.mu.Lock()
2121
defer c.mu.Unlock()
2222

23-
c.Postgresql = *newSpec
23+
c.setSpec(newSpec)
2424

2525
defer func() {
2626
if err != nil {

pkg/cluster/util.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package cluster
22

33
import (
4+
"bytes"
5+
"encoding/gob"
46
"encoding/json"
57
"fmt"
68
"math/rand"
9+
"sort"
710
"strings"
811
"time"
912

@@ -18,7 +21,6 @@ import (
1821
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
1922
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
2023
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
21-
"sort"
2224
)
2325

2426
// OAuthTokenGetter provides the method for fetching OAuth tokens
@@ -386,3 +388,32 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
386388
// masterCandidate returns one pseudo-randomly chosen element of replicas.
// replicas must be non-empty: rand.Intn panics when its argument is <= 0.
func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
387389
return replicas[rand.Intn(len(replicas))]
388390
}
391+
392+
// cloneSpec returns an independent copy of the spec pointed to by from. The
// copy is produced by gob-encoding *from into an in-memory buffer and decoding
// that buffer into a newly allocated spec.Postgresql. from must be non-nil,
// since it is dereferenced before encoding. An error is returned if either the
// encoding or the decoding step fails.
//
// NOTE(review): encoding/gob only transmits exported fields, so any unexported
// state inside spec.Postgresql would not survive the round trip - confirm this
// is acceptable for all callers.
func cloneSpec(from *spec.Postgresql) (*spec.Postgresql, error) {
393+
var (
394+
buf bytes.Buffer
395+
result *spec.Postgresql
396+
err error
397+
)
398+
enc := gob.NewEncoder(&buf)
399+
if err = enc.Encode(*from); err != nil {
400+
return nil, fmt.Errorf("could not encode the spec: %v", err)
401+
}
402+
dec := gob.NewDecoder(&buf)
403+
// result is still nil here; decoding through &result lets gob allocate the target value.
if err = dec.Decode(&result); err != nil {
404+
return nil, fmt.Errorf("could not decode the spec: %v", err)
405+
}
406+
return result, nil
407+
}
408+
409+
// setSpec overwrites the cluster's embedded Postgresql spec with the value
// pointed to by newSpec, holding specMu for writing for the duration of the
// assignment. newSpec must be non-nil, since it is dereferenced. The assignment
// is a plain struct copy of *newSpec.
func (c *Cluster) setSpec(newSpec *spec.Postgresql) {
410+
c.specMu.Lock()
411+
c.Postgresql = *newSpec
412+
c.specMu.Unlock()
413+
}
415+
// GetSpec returns a copy of the cluster's current Postgresql spec, made with
// cloneSpec while specMu is held for reading. It returns the error from
// cloneSpec if the copy could not be made.
func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
416+
c.specMu.RLock()
417+
defer c.specMu.RUnlock()
418+
return cloneSpec(&c.Postgresql)
419+
}

pkg/controller/status.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,21 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e
3333
}
3434

3535
// ClusterDatabasesMap returns, for each cluster, the list of databases running there
36-
func (c *Controller) GetClusterDatabasesMap() map[string][]string {
36+
func (c *Controller) ClusterDatabasesMap() map[string][]string {
3737

3838
m := make(map[string][]string)
3939

40+
// avoid modifying the cluster list while we are fetching each one of them.
41+
c.clustersMu.RLock()
42+
defer c.clustersMu.RUnlock()
4043
for _, cluster := range c.clusters {
41-
for database := range cluster.Postgresql.Spec.Databases {
42-
m[cluster.Name] = append(m[cluster.Name], database)
44+
// GetSpec holds the specMu lock of a cluster
45+
if spec, err := cluster.GetSpec(); err == nil {
46+
for database := range spec.Spec.Databases {
47+
m[cluster.Name] = append(m[cluster.Name], database)
48+
}
49+
} else {
50+
c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err)
4351
}
4452
}
4553

0 commit comments

Comments
 (0)