@@ -29,8 +29,8 @@ func (c *Cluster) listResources() error {
29
29
c .logger .Infof ("found secret: %q (uid: %q)" , util .NameFromMeta (obj .ObjectMeta ), obj .UID )
30
30
}
31
31
32
- if c . Endpoint != nil {
33
- c .logger .Infof ("found endpoint: %q (uid: %q)" , util .NameFromMeta (c . Endpoint . ObjectMeta ), c . Endpoint .UID )
32
+ for role , endpoint := range c . Endpoints {
33
+ c .logger .Infof ("found %s endpoint: %q (uid: %q)" , role , util .NameFromMeta (endpoint . ObjectMeta ), endpoint .UID )
34
34
}
35
35
36
36
for role , service := range c .Services {
@@ -257,7 +257,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
257
257
}
258
258
259
259
serviceName := util .NameFromMeta (c .Services [role ].ObjectMeta )
260
- endpointName := util .NameFromMeta (c .Endpoint .ObjectMeta )
260
+ endpointName := util .NameFromMeta (c .Endpoints [ role ] .ObjectMeta )
261
261
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
262
262
if newService .Spec .Type != c .Services [role ].Spec .Type {
263
263
// service type has changed, need to replace the service completely.
@@ -270,17 +270,17 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
270
270
if role == Master {
271
271
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
272
272
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
273
- currentEndpoint , err = c .KubeClient .Endpoints (c .Namespace ).Get (c .endpointName (), metav1.GetOptions {})
273
+ currentEndpoint , err = c .KubeClient .Endpoints (c .Namespace ).Get (c .endpointName (role ), metav1.GetOptions {})
274
274
if err != nil {
275
- return fmt .Errorf ("could not get current cluster endpoints: %v" , err )
275
+ return fmt .Errorf ("could not get current cluster %s endpoints: %v" , role , err )
276
276
}
277
277
}
278
278
err = c .KubeClient .Services (serviceName .Namespace ).Delete (serviceName .Name , c .deleteOptions )
279
279
if err != nil {
280
280
return fmt .Errorf ("could not delete service %q: %v" , serviceName , err )
281
281
}
282
282
283
- c .Endpoint = nil
283
+ c .Endpoints [ role ] = nil
284
284
svc , err := c .KubeClient .Services (serviceName .Namespace ).Create (newService )
285
285
if err != nil {
286
286
return fmt .Errorf ("could not create service %q: %v" , serviceName , err )
@@ -289,12 +289,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
289
289
c .Services [role ] = svc
290
290
if role == Master {
291
291
// create the new endpoint using the addresses obtained from the previous one
292
- endpointSpec := c .generateMasterEndpoints ( currentEndpoint .Subsets )
292
+ endpointSpec := c .generateEndpoint ( role , currentEndpoint .Subsets )
293
293
ep , err := c .KubeClient .Endpoints (endpointSpec .Namespace ).Create (endpointSpec )
294
294
if err != nil {
295
295
return fmt .Errorf ("could not create endpoint %q: %v" , endpointName , err )
296
296
}
297
- c .Endpoint = ep
297
+
298
+ c .Endpoints [role ] = ep
298
299
}
299
300
300
301
return nil
@@ -332,31 +333,32 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
332
333
333
334
func (c * Cluster ) deleteService (role PostgresRole ) error {
334
335
c .logger .Debugf ("deleting service %s" , role )
335
- if c .Services [role ] == nil {
336
- return fmt .Errorf ("there is no %s service in the cluster" , role )
337
- }
336
+
338
337
service := c .Services [role ]
339
- err := c . KubeClient . Services ( service . Namespace ). Delete ( service . Name , c . deleteOptions )
340
- if err != nil {
338
+
339
+ if err := c . KubeClient . Services ( service . Namespace ). Delete ( service . Name , c . deleteOptions ); err != nil {
341
340
return err
342
341
}
342
+
343
343
c .logger .Infof ("%s service %q has been deleted" , role , util .NameFromMeta (service .ObjectMeta ))
344
344
c .Services [role ] = nil
345
+
345
346
return nil
346
347
}
347
348
348
- func (c * Cluster ) createEndpoint () (* v1.Endpoints , error ) {
349
+ func (c * Cluster ) createEndpoint (role PostgresRole ) (* v1.Endpoints , error ) {
349
350
c .setProcessName ("creating endpoint" )
350
- if c .Endpoint != nil {
351
- return nil , fmt .Errorf ("endpoint already exists in the cluster" )
351
+ if c .Endpoints [ role ] != nil {
352
+ return nil , fmt .Errorf ("%s endpoint already exists in the cluster" , role )
352
353
}
353
- endpointsSpec := c .generateMasterEndpoints ( nil )
354
+ endpointsSpec := c .generateEndpoint ( role , nil )
354
355
355
356
endpoints , err := c .KubeClient .Endpoints (endpointsSpec .Namespace ).Create (endpointsSpec )
356
357
if err != nil {
357
- return nil , err
358
+ return nil , fmt . Errorf ( "could not create %s endpoint: %v" , role , err )
358
359
}
359
- c .Endpoint = endpoints
360
+
361
+ c .Endpoints [role ] = endpoints
360
362
361
363
return endpoints , nil
362
364
}
@@ -379,13 +381,22 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget,
379
381
}
380
382
381
383
func (c * Cluster ) updatePodDisruptionBudget (pdb * policybeta1.PodDisruptionBudget ) error {
382
- if c .podEventsQueue == nil {
384
+ if c .PodDisruptionBudget == nil {
383
385
return fmt .Errorf ("there is no pod disruption budget in the cluster" )
384
386
}
385
387
386
- newPdb , err := c .KubeClient .PodDisruptionBudgets (pdb .Namespace ).Update (pdb )
388
+ err := c .KubeClient .
389
+ PodDisruptionBudgets (c .PodDisruptionBudget .Namespace ).
390
+ Delete (c .PodDisruptionBudget .Name , c .deleteOptions )
391
+ if err != nil {
392
+ return fmt .Errorf ("could not delete pod disruption budget: %v" , err )
393
+ }
394
+
395
+ newPdb , err := c .KubeClient .
396
+ PodDisruptionBudgets (pdb .Namespace ).
397
+ Create (pdb )
387
398
if err != nil {
388
- return fmt .Errorf ("could not update pod disruption budget: %v" , err )
399
+ return fmt .Errorf ("could not create pod disruption budget: %v" , err )
389
400
}
390
401
c .PodDisruptionBudget = newPdb
391
402
@@ -399,7 +410,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {
399
410
}
400
411
err := c .KubeClient .
401
412
PodDisruptionBudgets (c .PodDisruptionBudget .Namespace ).
402
- Delete (c .PodDisruptionBudget .Namespace , c .deleteOptions )
413
+ Delete (c .PodDisruptionBudget .Name , c .deleteOptions )
403
414
if err != nil {
404
415
return fmt .Errorf ("could not delete pod disruption budget: %v" , err )
405
416
}
@@ -409,18 +420,20 @@ func (c *Cluster) deletePodDisruptionBudget() error {
409
420
return nil
410
421
}
411
422
412
- func (c * Cluster ) deleteEndpoint () error {
423
+ func (c * Cluster ) deleteEndpoint (role PostgresRole ) error {
413
424
c .setProcessName ("deleting endpoint" )
414
425
c .logger .Debugln ("deleting endpoint" )
415
- if c .Endpoint == nil {
416
- return fmt .Errorf ("there is no endpoint in the cluster" )
426
+ if c .Endpoints [ role ] == nil {
427
+ return fmt .Errorf ("there is no %s endpoint in the cluster" , role )
417
428
}
418
- err := c . KubeClient . Endpoints ( c . Endpoint . Namespace ). Delete ( c . Endpoint . Name , c . deleteOptions )
419
- if err != nil {
429
+
430
+ if err := c . KubeClient . Endpoints ( c . Endpoints [ role ]. Namespace ). Delete ( c . Endpoints [ role ]. Name , c . deleteOptions ); err != nil {
420
431
return fmt .Errorf ("could not delete endpoint: %v" , err )
421
432
}
422
- c .logger .Infof ("endpoint %q has been deleted" , util .NameFromMeta (c .Endpoint .ObjectMeta ))
423
- c .Endpoint = nil
433
+
434
+ c .logger .Infof ("endpoint %q has been deleted" , util .NameFromMeta (c .Endpoints [role ].ObjectMeta ))
435
+
436
+ c .Endpoints [role ] = nil
424
437
425
438
return nil
426
439
}
@@ -453,9 +466,14 @@ func (c *Cluster) GetServiceReplica() *v1.Service {
453
466
return c .Services [Replica ]
454
467
}
455
468
456
- // GetEndpoint returns cluster's kubernetes Endpoint
457
- func (c * Cluster ) GetEndpoint () * v1.Endpoints {
458
- return c .Endpoint
469
+ // GetEndpointMaster returns cluster's kubernetes master Endpoint
470
+ func (c * Cluster ) GetEndpointMaster () * v1.Endpoints {
471
+ return c .Endpoints [Master ]
472
+ }
473
+
474
+ // GetEndpointReplica returns cluster's kubernetes master Endpoint
475
+ func (c * Cluster ) GetEndpointReplica () * v1.Endpoints {
476
+ return c .Endpoints [Replica ]
459
477
}
460
478
461
479
// GetStatefulSet returns cluster's kubernetes StatefulSet
0 commit comments