Skip to content

Commit 3b10dc6

Browse files
authored
patch/update services on type change (zalando#824)
* use Update when disabling LoadBalancer + added e2e test
1 parent 744c71d commit 3b10dc6

File tree

4 files changed

+86
-76
lines changed

4 files changed

+86
-76
lines changed

e2e/tests/test_e2e.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,55 @@ def setUpClass(cls):
5858
k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
5959
k8s.wait_for_pod_start('spilo-role=master')
6060

61+
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
62+
def test_enable_load_balancer(self):
63+
'''
64+
Test if services are updated when enabling/disabling load balancers
65+
'''
66+
67+
k8s = self.k8s
68+
cluster_label = 'version=acid-minimal-cluster'
69+
70+
# enable load balancer services
71+
pg_patch_enable_lbs = {
72+
"spec": {
73+
"enableMasterLoadBalancer": True,
74+
"enableReplicaLoadBalancer": True
75+
}
76+
}
77+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
78+
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs)
79+
# wait for service recreation
80+
time.sleep(60)
81+
82+
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
83+
self.assertEqual(master_svc_type, 'LoadBalancer',
84+
"Expected LoadBalancer service type for master, found {}".format(master_svc_type))
85+
86+
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
87+
self.assertEqual(repl_svc_type, 'LoadBalancer',
88+
"Expected LoadBalancer service type for replica, found {}".format(repl_svc_type))
89+
90+
# disable load balancer services again
91+
pg_patch_disable_lbs = {
92+
"spec": {
93+
"enableMasterLoadBalancer": False,
94+
"enableReplicaLoadBalancer": False
95+
}
96+
}
97+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
98+
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs)
99+
# wait for service recreation
100+
time.sleep(60)
101+
102+
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
103+
self.assertEqual(master_svc_type, 'ClusterIP',
104+
"Expected ClusterIP service type for master, found {}".format(master_svc_type))
105+
106+
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
107+
self.assertEqual(repl_svc_type, 'ClusterIP',
108+
"Expected ClusterIP service type for replica, found {}".format(repl_svc_type))
109+
61110
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
62111
def test_min_resource_limits(self):
63112
'''
@@ -362,6 +411,13 @@ def wait_for_pod_start(self, pod_labels, namespace='default'):
362411
pod_phase = pods[0].status.phase
363412
time.sleep(self.RETRY_TIMEOUT_SEC)
364413

414+
def get_service_type(self, svc_labels, namespace='default'):
415+
svc_type = ''
416+
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
417+
for svc in svcs:
418+
svc_type = svc.spec.type
419+
return svc_type
420+
365421
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
366422
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
367423
for svc in svcs:

manifests/operator-service-account-rbac.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ rules:
114114
- delete
115115
- get
116116
- patch
117+
- update
117118
# to CRUD the StatefulSet which controls the Postgres cluster instances
118119
- apiGroups:
119120
- apps

pkg/cluster/resources.go

Lines changed: 28 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -366,77 +366,18 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
366366
}
367367

368368
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
369+
var (
370+
svc *v1.Service
371+
err error
372+
)
373+
369374
c.setProcessName("updating %v service", role)
370375

371376
if c.Services[role] == nil {
372377
return fmt.Errorf("there is no service in the cluster")
373378
}
374379

375380
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
376-
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
377-
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
378-
if newService.Spec.Type != c.Services[role].Spec.Type {
379-
// service type has changed, need to replace the service completely.
380-
// we cannot use just patch the current service, since it may contain attributes incompatible with the new type.
381-
var (
382-
currentEndpoint *v1.Endpoints
383-
err error
384-
)
385-
386-
if role == Master {
387-
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
388-
// the addresses stored in it before the service is deleted (deletion of the service removes the endpoint)
389-
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
390-
if err != nil {
391-
return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
392-
}
393-
}
394-
err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions)
395-
if err != nil {
396-
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
397-
}
398-
399-
// wait until the service is truly deleted
400-
c.logger.Debugf("waiting for service to be deleted")
401-
402-
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
403-
func() (bool, error) {
404-
_, err2 := c.KubeClient.Services(serviceName.Namespace).Get(serviceName.Name, metav1.GetOptions{})
405-
if err2 == nil {
406-
return false, nil
407-
}
408-
if k8sutil.ResourceNotFound(err2) {
409-
return true, nil
410-
}
411-
return false, err2
412-
})
413-
if err != nil {
414-
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
415-
}
416-
417-
// make sure we clear the stored service and endpoint status if the subsequent create fails.
418-
c.Services[role] = nil
419-
c.Endpoints[role] = nil
420-
if role == Master {
421-
// create the new endpoint using the addresses obtained from the previous one
422-
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
423-
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
424-
if err != nil {
425-
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
426-
}
427-
428-
c.Endpoints[role] = ep
429-
}
430-
431-
svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
432-
if err != nil {
433-
return fmt.Errorf("could not create service %q: %v", serviceName, err)
434-
}
435-
436-
c.Services[role] = svc
437-
438-
return nil
439-
}
440381

441382
// update the service annotation in order to propagate ELB notation.
442383
if len(newService.ObjectMeta.Annotations) > 0 {
@@ -454,18 +395,30 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
454395
}
455396
}
456397

457-
patchData, err := specPatch(newService.Spec)
458-
if err != nil {
459-
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
460-
}
398+
// now, patch the service spec, but when disabling LoadBalancers do update instead
399+
// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
400+
oldServiceType := c.Services[role].Spec.Type
401+
newServiceType := newService.Spec.Type
402+
if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
403+
newService.ResourceVersion = c.Services[role].ResourceVersion
404+
newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP
405+
svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService)
406+
if err != nil {
407+
return fmt.Errorf("could not update service %q: %v", serviceName, err)
408+
}
409+
} else {
410+
patchData, err := specPatch(newService.Spec)
411+
if err != nil {
412+
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
413+
}
461414

462-
// update the service spec
463-
svc, err := c.KubeClient.Services(serviceName.Namespace).Patch(
464-
serviceName.Name,
465-
types.MergePatchType,
466-
patchData, "")
467-
if err != nil {
468-
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
415+
svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
416+
serviceName.Name,
417+
types.MergePatchType,
418+
patchData, "")
419+
if err != nil {
420+
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
421+
}
469422
}
470423
c.Services[role] = svc
471424

pkg/cluster/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (c *Cluster) syncServices() error {
116116
c.logger.Debugf("syncing %s service", role)
117117

118118
if err := c.syncEndpoint(role); err != nil {
119-
return fmt.Errorf("could not sync %s endpont: %v", role, err)
119+
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
120120
}
121121

122122
if err := c.syncService(role); err != nil {

0 commit comments

Comments
 (0)