Skip to content

Commit 253f91a

Browse files
committed
various bug fixes
1 parent e665782 commit 253f91a

File tree

5 files changed

+200
-156
lines changed

5 files changed

+200
-156
lines changed

pkg/cluster/cluster.go

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Config struct {
4545

4646
type kubeResources struct {
4747
Services map[PostgresRole]*v1.Service
48-
Endpoint *v1.Endpoints
48+
Endpoints map[PostgresRole]*v1.Endpoints
4949
Secrets map[types.UID]*v1.Secret
5050
Statefulset *v1beta1.StatefulSet
5151
PodDisruptionBudget *policybeta1.PodDisruptionBudget
@@ -98,12 +98,15 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
9898
})
9999

100100
cluster := &Cluster{
101-
Config: cfg,
102-
Postgresql: pgSpec,
103-
pgUsers: make(map[string]spec.PgUser),
104-
systemUsers: make(map[string]spec.PgUser),
105-
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
106-
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service)},
101+
Config: cfg,
102+
Postgresql: pgSpec,
103+
pgUsers: make(map[string]spec.PgUser),
104+
systemUsers: make(map[string]spec.PgUser),
105+
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
106+
kubeResources: kubeResources{
107+
Secrets: make(map[types.UID]*v1.Secret),
108+
Services: make(map[PostgresRole]*v1.Service),
109+
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
107110
masterLess: false,
108111
userSyncStrategy: users.DefaultUserSyncStrategy{},
109112
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
@@ -202,17 +205,16 @@ func (c *Cluster) Create() error {
202205

203206
c.setStatus(spec.ClusterStatusCreating)
204207

205-
//service will create endpoint implicitly
206-
ep, err = c.createEndpoint()
207-
if err != nil {
208-
return fmt.Errorf("could not create endpoint: %v", err)
209-
}
210-
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
211-
212208
for _, role := range []PostgresRole{Master, Replica} {
213209
if role == Replica && !c.Spec.ReplicaLoadBalancer {
214210
continue
215211
}
212+
ep, err = c.createEndpoint(role)
213+
if err != nil {
214+
return fmt.Errorf("could not create %s endpoint: %v", role, err)
215+
}
216+
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
217+
216218
service, err = c.createService(role)
217219
if err != nil {
218220
return fmt.Errorf("could not create %s service: %v", role, err)
@@ -419,23 +421,11 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
419421
}
420422

421423
// Service
422-
for _, role := range []PostgresRole{Master, Replica} {
423-
if role == Replica && !c.Spec.ReplicaLoadBalancer {
424-
if c.Services[role] != nil {
425-
// delete the left over replica service
426-
if err := c.deleteService(role); err != nil {
427-
c.logger.Errorf("could not delete obsolete %s service: %v", role, err)
428-
}
429-
}
430-
continue
431-
}
432-
433-
if !reflect.DeepEqual(c.generateService(role, &oldSpec.Spec), c.generateService(role, &newSpec.Spec)) {
434-
if err := c.syncService(role); err != nil {
435-
if !k8sutil.ResourceAlreadyExists(err) {
436-
c.logger.Errorf("coud not sync %s service: %v", role, err)
437-
}
438-
}
424+
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
425+
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) ||
426+
oldSpec.Spec.ReplicaLoadBalancer != oldSpec.Spec.ReplicaLoadBalancer {
427+
if err := c.syncServices(); err != nil {
428+
c.logger.Errorf("could not sync services: %v", err)
439429
}
440430
}
441431

@@ -513,14 +503,15 @@ func (c *Cluster) Delete() error {
513503
c.mu.Lock()
514504
defer c.mu.Unlock()
515505

516-
if err := c.deleteEndpoint(); err != nil {
517-
return fmt.Errorf("could not delete endpoint: %v", err)
518-
}
519-
520506
for _, role := range []PostgresRole{Master, Replica} {
521507
if role == Replica && !c.Spec.ReplicaLoadBalancer {
522508
continue
523509
}
510+
511+
if err := c.deleteEndpoint(role); err != nil {
512+
return fmt.Errorf("could not delete %s endpoint: %v", role, err)
513+
}
514+
524515
if err := c.deleteService(role); err != nil {
525516
return fmt.Errorf("could not delete %s service: %v", role, err)
526517
}
@@ -694,7 +685,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
694685

695686
MasterService: c.GetServiceMaster(),
696687
ReplicaService: c.GetServiceReplica(),
697-
Endpoint: c.GetEndpoint(),
688+
MasterEndpoint: c.GetEndpointMaster(),
689+
ReplicaEndpoint: c.GetEndpointReplica(),
698690
StatefulSet: c.GetStatefulSet(),
699691
PodDisruptionBudget: c.GetPodDisruptionBudget(),
700692
CurrentProcess: c.GetCurrentProcess(),

pkg/cluster/k8sres.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,13 @@ func (c *Cluster) statefulSetName() string {
5555
return c.Name
5656
}
5757

58-
func (c *Cluster) endpointName() string {
59-
return c.Name
58+
func (c *Cluster) endpointName(role PostgresRole) string {
59+
name := c.Name
60+
if role == Replica {
61+
name = name + "-repl"
62+
}
63+
64+
return name
6065
}
6166

6267
func (c *Cluster) serviceName(role PostgresRole) string {
@@ -543,12 +548,12 @@ func (c *Cluster) generateService(role PostgresRole, spec *spec.PostgresSpec) *v
543548
return service
544549
}
545550

546-
func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
551+
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
547552
endpoints := &v1.Endpoints{
548553
ObjectMeta: metav1.ObjectMeta{
549-
Name: c.endpointName(),
554+
Name: c.endpointName(role),
550555
Namespace: c.Namespace,
551-
Labels: c.roleLabelsSet(Master),
556+
Labels: c.roleLabelsSet(role),
552557
},
553558
}
554559
if len(subsets) > 0 {

pkg/cluster/resources.go

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ func (c *Cluster) listResources() error {
2929
c.logger.Infof("found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
3030
}
3131

32-
if c.Endpoint != nil {
33-
c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
32+
for role, endpoint := range c.Endpoints {
33+
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
3434
}
3535

3636
for role, service := range c.Services {
@@ -257,7 +257,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
257257
}
258258

259259
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
260-
endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
260+
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
261261
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
262262
if newService.Spec.Type != c.Services[role].Spec.Type {
263263
// service type has changed, need to replace the service completely.
@@ -270,17 +270,17 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
270270
if role == Master {
271271
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
272272
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
273-
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(), metav1.GetOptions{})
273+
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
274274
if err != nil {
275-
return fmt.Errorf("could not get current cluster endpoints: %v", err)
275+
return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
276276
}
277277
}
278278
err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions)
279279
if err != nil {
280280
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
281281
}
282282

283-
c.Endpoint = nil
283+
c.Endpoints[role] = nil
284284
svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
285285
if err != nil {
286286
return fmt.Errorf("could not create service %q: %v", serviceName, err)
@@ -289,12 +289,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
289289
c.Services[role] = svc
290290
if role == Master {
291291
// create the new endpoint using the addresses obtained from the previous one
292-
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
292+
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
293293
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
294294
if err != nil {
295295
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
296296
}
297-
c.Endpoint = ep
297+
298+
c.Endpoints[role] = ep
298299
}
299300

300301
return nil
@@ -332,31 +333,32 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
332333

333334
func (c *Cluster) deleteService(role PostgresRole) error {
334335
c.logger.Debugf("deleting service %s", role)
335-
if c.Services[role] == nil {
336-
return fmt.Errorf("there is no %s service in the cluster", role)
337-
}
336+
338337
service := c.Services[role]
339-
err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions)
340-
if err != nil {
338+
339+
if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil {
341340
return err
342341
}
342+
343343
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
344344
c.Services[role] = nil
345+
345346
return nil
346347
}
347348

348-
func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
349+
func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) {
349350
c.setProcessName("creating endpoint")
350-
if c.Endpoint != nil {
351-
return nil, fmt.Errorf("endpoint already exists in the cluster")
351+
if c.Endpoints[role] != nil {
352+
return nil, fmt.Errorf("%s endpoint already exists in the cluster", role)
352353
}
353-
endpointsSpec := c.generateMasterEndpoints(nil)
354+
endpointsSpec := c.generateEndpoint(role, nil)
354355

355356
endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
356357
if err != nil {
357-
return nil, err
358+
return nil, fmt.Errorf("could not create %s endpoint: %v", role, err)
358359
}
359-
c.Endpoint = endpoints
360+
361+
c.Endpoints[role] = endpoints
360362

361363
return endpoints, nil
362364
}
@@ -379,13 +381,22 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget,
379381
}
380382

381383
func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget) error {
382-
if c.podEventsQueue == nil {
384+
if c.PodDisruptionBudget == nil {
383385
return fmt.Errorf("there is no pod disruption budget in the cluster")
384386
}
385387

386-
newPdb, err := c.KubeClient.PodDisruptionBudgets(pdb.Namespace).Update(pdb)
388+
err := c.KubeClient.
389+
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
390+
Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
391+
if err != nil {
392+
return fmt.Errorf("could not delete pod disruption budget: %v", err)
393+
}
394+
395+
newPdb, err := c.KubeClient.
396+
PodDisruptionBudgets(pdb.Namespace).
397+
Create(pdb)
387398
if err != nil {
388-
return fmt.Errorf("could not update pod disruption budget: %v", err)
399+
return fmt.Errorf("could not create pod disruption budget: %v", err)
389400
}
390401
c.PodDisruptionBudget = newPdb
391402

@@ -399,7 +410,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {
399410
}
400411
err := c.KubeClient.
401412
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
402-
Delete(c.PodDisruptionBudget.Namespace, c.deleteOptions)
413+
Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
403414
if err != nil {
404415
return fmt.Errorf("could not delete pod disruption budget: %v", err)
405416
}
@@ -409,18 +420,20 @@ func (c *Cluster) deletePodDisruptionBudget() error {
409420
return nil
410421
}
411422

412-
func (c *Cluster) deleteEndpoint() error {
423+
func (c *Cluster) deleteEndpoint(role PostgresRole) error {
413424
c.setProcessName("deleting endpoint")
414425
c.logger.Debugln("deleting endpoint")
415-
if c.Endpoint == nil {
416-
return fmt.Errorf("there is no endpoint in the cluster")
426+
if c.Endpoints[role] == nil {
427+
return fmt.Errorf("there is no %s endpoint in the cluster", role)
417428
}
418-
err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions)
419-
if err != nil {
429+
430+
if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(c.Endpoints[role].Name, c.deleteOptions); err != nil {
420431
return fmt.Errorf("could not delete endpoint: %v", err)
421432
}
422-
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
423-
c.Endpoint = nil
433+
434+
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta))
435+
436+
c.Endpoints[role] = nil
424437

425438
return nil
426439
}
@@ -453,9 +466,14 @@ func (c *Cluster) GetServiceReplica() *v1.Service {
453466
return c.Services[Replica]
454467
}
455468

456-
// GetEndpoint returns cluster's kubernetes Endpoint
457-
func (c *Cluster) GetEndpoint() *v1.Endpoints {
458-
return c.Endpoint
469+
// GetEndpointMaster returns cluster's kubernetes master Endpoint
470+
func (c *Cluster) GetEndpointMaster() *v1.Endpoints {
471+
return c.Endpoints[Master]
472+
}
473+
474+
// GetEndpointReplica returns cluster's kubernetes master Endpoint
475+
func (c *Cluster) GetEndpointReplica() *v1.Endpoints {
476+
return c.Endpoints[Replica]
459477
}
460478

461479
// GetStatefulSet returns cluster's kubernetes StatefulSet

0 commit comments

Comments
 (0)