
Commit 0015071

Configure load balancer on a per-cluster and operator-wide level (#57)
* Deny all requests to the load balancer by default.
* Add an operator-wide toggle for the load balancer.
* Define a per-cluster useLoadBalancer option. If useLoadBalancer is not set, the operator-wide default applies. If it is true, a load balancer is created; otherwise a service of type ClusterIP is created. Internally, we have to completely replace the service when the service type changes. We cannot patch, since some fields of the old service that would survive the patch are incompatible with the new type, and handling them explicitly when updating the service is ugly and error-prone. We cannot update the service either, because of its immutable fields; that leaves deleting the old service and creating a new one as the only option. Unfortunately, this still causes an unnecessary removal of the endpoints associated with the service; it will be addressed in future commits.
* Revert the unintended effect of go fmt.
* Recreate endpoints on service update. When the service type is changed, the service is deleted and one with the new type is created. Unfortunately, the endpoints are deleted as well. Re-create them afterwards, preserving the original addresses stored in them.
* Improve error messages and comments. Use "generate" instead of "gen" in names.
1 parent ba6529b commit 0015071
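
Taken together, the new per-cluster option and the operator-wide toggle reduce to a simple precedence rule. Below is a minimal standalone sketch of that rule (simplified types and a hypothetical resolveServiceType helper; the operator's actual implementation is generateService in pkg/cluster/k8sres.go further down):

package main

import "fmt"

// resolveServiceType picks the Kubernetes service type for a cluster.
// useLoadBalancer mirrors the per-cluster manifest setting (nil when omitted);
// operatorDefault mirrors the operator-wide enable_load_balancer option.
func resolveServiceType(useLoadBalancer *bool, operatorDefault bool) string {
    if useLoadBalancer != nil {
        if *useLoadBalancer {
            return "LoadBalancer"
        }
        return "ClusterIP"
    }
    // per-cluster option omitted: fall back to the operator-wide default
    if operatorDefault {
        return "LoadBalancer"
    }
    return "ClusterIP"
}

func main() {
    t, f := true, false
    fmt.Println(resolveServiceType(&t, false)) // LoadBalancer (explicit per-cluster opt-in)
    fmt.Println(resolveServiceType(&f, true))  // ClusterIP (explicit per-cluster opt-out)
    fmt.Println(resolveServiceType(nil, true)) // LoadBalancer (operator-wide default applies)
}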

File tree: 11 files changed (+120 lines, -46 lines)


manifests/configmap.yaml (1 addition, 0 deletions)

@@ -28,3 +28,4 @@ data:
 super_username: postgres
 teams_api_url: http://fake-teams-api.default.svc.cluster.local
 workers: "4"
+enable_load_balancer: "true"

manifests/testpostgresql.yaml (1 addition, 0 deletions)

@@ -41,6 +41,7 @@ spec:
 loop_wait: &loop_wait 10
 retry_timeout: 10
 maximum_lag_on_failover: 33554432
+useLoadBalancer: true
 maintenanceWindows:
 - 01:00-06:00 #UTC
 - Sat:00:00-04:00

pkg/cluster/cluster.go (7 additions, 3 deletions)

@@ -242,6 +242,10 @@ func (c *Cluster) Create() error {
 func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) {
     //TODO: improve comparison
     match = true
+    if c.Service[role].Spec.Type != service.Spec.Type {
+        return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s",
+            role, service.Spec.Type, c.Service[role].Spec.Type)
+    }
     oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges
     newSourceRanges := service.Spec.LoadBalancerSourceRanges
     /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
@@ -292,7 +296,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
     // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
     // and the combined effect of all the changes should be applied.
     // TODO: log all reasons for changing the statefulset, not just the last one.
-    // TODO: make sure this is in sync with genPodTemplate, ideally by using the same list of fields to generate
+    // TODO: make sure this is in sync with generatePodTemplate, ideally by using the same list of fields to generate
     // the template and the diff
     if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
         needsReplace = true
@@ -435,7 +439,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
                 continue
             }
         }
-        newService := c.genService(role, newSpec.Spec.AllowedSourceRanges)
+        newService := c.generateService(role, &newSpec.Spec)
         if match, reason := c.sameServiceWith(role, newService); !match {
             c.logServiceChanges(role, c.Service[role], newService, true, reason)
             if err := c.updateService(role, newService); err != nil {
@@ -446,7 +450,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         }
     }

-    newStatefulSet, err := c.genStatefulSet(newSpec.Spec)
+    newStatefulSet, err := c.generateStatefulSet(newSpec.Spec)
     if err != nil {
         return fmt.Errorf("could not generate statefulset: %v", err)
     }
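
The "[] as nil" workaround referenced in sameServiceWith above exists because an empty slice and a nil slice are distinct values to Go's deep comparison, while Kubernetes 1.6 may serialize one as the other. A small standalone illustration (assuming the comparison relies on something like reflect.DeepEqual):

package main

import (
    "fmt"
    "reflect"
)

func main() {
    var nilRanges []string    // what the API server may hand back
    emptyRanges := []string{} // what the operator generated

    // Without normalization the two compare as different...
    fmt.Println(reflect.DeepEqual(nilRanges, emptyRanges)) // false

    // ...so both sides are normalized to an empty slice before comparing,
    // which is the point of the workaround mentioned in sameServiceWith.
    if nilRanges == nil {
        nilRanges = []string{}
    }
    fmt.Println(reflect.DeepEqual(nilRanges, emptyRanges)) // true
}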

pkg/cluster/k8sres.go (52 additions, 26 deletions)

@@ -18,6 +18,7 @@ const (
     pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin"
     patroniPGBinariesParameterName = "bin_dir"
     patroniPGParametersParameterName = "parameters"
+    localHost = "127.0.0.1/32"
 )

 type pgUser struct {
@@ -203,7 +204,7 @@ PATRONI_INITDB_PARAMS:
     return string(result)
 }

-func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec {
+func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec {
     spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters)

     envVars := []v1.EnvVar{
@@ -323,14 +324,14 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
     return &template
 }

-func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
+func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
     resourceRequirements, err := c.resourceRequirements(spec.Resources)
     if err != nil {
         return nil, err
     }

-    podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni)
-    volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
+    podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni)
+    volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
     if err != nil {
         return nil, err
     }
@@ -352,7 +353,7 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet,
     return statefulSet, nil
 }

-func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
+func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
     metadata := v1.ObjectMeta{
         Name: constants.DataVolumeName,
     }
@@ -383,19 +384,19 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.P
     return volumeClaim, nil
 }

-func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) {
+func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
     secrets = make(map[string]*v1.Secret, len(c.pgUsers))
     namespace := c.Metadata.Namespace
     for username, pgUser := range c.pgUsers {
         //Skip users with no password i.e. human users (they'll be authenticated using pam)
-        secret := c.genSingleUserSecret(namespace, pgUser)
+        secret := c.generateSingleUserSecret(namespace, pgUser)
         if secret != nil {
             secrets[username] = secret
         }
     }
     /* special case for the system user */
     for _, systemUser := range c.systemUsers {
-        secret := c.genSingleUserSecret(namespace, systemUser)
+        secret := c.generateSingleUserSecret(namespace, systemUser)
         if secret != nil {
             secrets[systemUser.Name] = secret
         }
@@ -404,7 +405,7 @@ func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) {
     return
 }

-func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
+func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
     //Skip users with no password i.e. human users (they'll be authenticated using pam)
     if pgUser.Password == "" {
         return nil
@@ -425,7 +426,7 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.
     return &secret
 }

-func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service {
+func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service {

     dnsNameFunction := c.masterDnsName
     name := c.Metadata.Name
@@ -434,37 +435,62 @@ func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v
         name = name + "-repl"
     }

+    serviceSpec := v1.ServiceSpec{
+        Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
+        Type: v1.ServiceTypeClusterIP,
+    }
+
+    if role == Replica {
+        serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)}
+    }
+
+    var annotations map[string]string
+
+    // Examine the per-cluster load balancer setting; if it is not defined, check the operator configuration.
+    if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) ||
+        (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
+
+        // safe default value: lock the load balancer to the local address only, unless overridden explicitly.
+        sourceRanges := []string{localHost}
+        allowedSourceRanges := newSpec.AllowedSourceRanges
+        if len(allowedSourceRanges) >= 0 {
+            sourceRanges = allowedSourceRanges
+        }
+
+        serviceSpec.Type = v1.ServiceTypeLoadBalancer
+        serviceSpec.LoadBalancerSourceRanges = sourceRanges
+
+        annotations = map[string]string{
+            constants.ZalandoDNSNameAnnotation: dnsNameFunction(),
+            constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
+        }
+
+    }
+
     service := &v1.Service{
         ObjectMeta: v1.ObjectMeta{
-            Name: name,
-            Namespace: c.Metadata.Namespace,
-            Labels: c.roleLabelsSet(role),
-            Annotations: map[string]string{
-                constants.ZalandoDNSNameAnnotation: dnsNameFunction(),
-                constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
-            },
-        },
-        Spec: v1.ServiceSpec{
-            Type: v1.ServiceTypeLoadBalancer,
-            Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
-            LoadBalancerSourceRanges: allowedSourceRanges,
+            Name: name,
+            Namespace: c.Metadata.Namespace,
+            Labels: c.roleLabelsSet(role),
+            Annotations: annotations,
         },
-    }
-    if role == Replica {
-        service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)}
+        Spec: serviceSpec,
     }

     return service
 }

-func (c *Cluster) genMasterEndpoints() *v1.Endpoints {
+func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
     endpoints := &v1.Endpoints{
         ObjectMeta: v1.ObjectMeta{
             Name: c.Metadata.Name,
             Namespace: c.Metadata.Namespace,
             Labels: c.roleLabelsSet(Master),
         },
     }
+    if len(subsets) > 0 {
+        endpoints.Subsets = subsets
+    }

     return endpoints
 }

pkg/cluster/resources.go (43 additions, 5 deletions)

@@ -119,7 +119,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
     if c.Statefulset != nil {
         return nil, fmt.Errorf("statefulset already exists in the cluster")
     }
-    statefulSetSpec, err := c.genStatefulSet(c.Spec)
+    statefulSetSpec, err := c.generateStatefulSet(c.Spec)
     if err != nil {
         return nil, fmt.Errorf("could not generate statefulset: %v", err)
     }
@@ -233,7 +233,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
     if c.Service[role] != nil {
         return nil, fmt.Errorf("service already exists in the cluster")
     }
-    serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges)
+    serviceSpec := c.generateService(role, &c.Spec)

     service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec)
     if err != nil {
@@ -249,9 +249,47 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
         return fmt.Errorf("there is no service in the cluster")
     }
     serviceName := util.NameFromMeta(c.Service[role].ObjectMeta)
+    endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
+    // TODO: check if it is possible to change the service type with a patch in future versions of Kubernetes
+    if newService.Spec.Type != c.Service[role].Spec.Type {
+        // service type has changed, need to replace the service completely.
+        // we cannot just patch the current service, since it may contain attributes incompatible with the new type.
+        var (
+            currentEndpoint *v1.Endpoints
+            err error
+        )
+
+        if role == Master {
+            // for the master service we need to re-create the endpoint as well. Get the up-to-date version of
+            // the addresses stored in it before the service is deleted (deletion of the service removes the endpoint)
+            currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name)
+            if err != nil {
+                return fmt.Errorf("could not get current cluster endpoints: %v", err)
+            }
+        }
+        err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions)
+        if err != nil {
+            return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err)
+        }
+        c.Endpoint = nil
+        svc, err := c.KubeClient.Services(newService.Namespace).Create(newService)
+        if err != nil {
+            return fmt.Errorf("could not create service '%s': '%v'", serviceName, err)
+        }
+        c.Service[role] = svc
+        if role == Master {
+            // create the new endpoint using the addresses obtained from the previous one
+            endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
+            ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
+            if err != nil {
+                return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err)
+            }
+            c.Endpoint = ep
+        }
+        return nil
+    }

     if len(newService.ObjectMeta.Annotations) > 0 {
-
         annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)

         _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
@@ -300,7 +338,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
     if c.Endpoint != nil {
         return nil, fmt.Errorf("endpoint already exists in the cluster")
     }
-    endpointsSpec := c.genMasterEndpoints()
+    endpointsSpec := c.generateMasterEndpoints(nil)

     endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
     if err != nil {
@@ -327,7 +365,7 @@ func (c *Cluster) deleteEndpoint() error {
 }

 func (c *Cluster) applySecrets() error {
-    secrets := c.genUserSecrets()
+    secrets := c.generateUserSecrets()

     for secretUsername, secretSpec := range secrets {
         secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)

pkg/cluster/sync.go (2 additions, 2 deletions)

@@ -100,7 +100,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
         return nil
     }

-    desiredSvc := c.genService(role, cSpec.AllowedSourceRanges)
+    desiredSvc := c.generateService(role, &cSpec)
     match, reason := c.sameServiceWith(role, desiredSvc)
     if match {
         return nil
@@ -158,7 +158,7 @@ func (c *Cluster) syncStatefulSet() error {
     }
     /* TODO: should check that we need to replace the statefulset */
     if !rollUpdate {
-        desiredSS, err := c.genStatefulSet(cSpec)
+        desiredSS, err := c.generateStatefulSet(cSpec)
         if err != nil {
             return fmt.Errorf("could not generate statefulset: %v", err)
         }

pkg/cluster/util.go (2 additions, 2 deletions)

@@ -70,8 +70,8 @@ func metadataAnnotationsPatch(annotations map[string]string) string {
         annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value))
     }
     annotationsString := strings.Join(annotationsList, ",")
-    // TODO: perhaps use patchStrategy:"replace" json annotation instead of constructing the patch literally.
-    return fmt.Sprintf(constants.ServiceMetadataAnnotationFormat, annotationsString)
+    // TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally.
+    return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString)
 }

 func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
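
For reference, the patch string built by metadataAnnotationsPatch with ServiceMetadataAnnotationReplaceFormat looks like the output below. This is a standalone sketch that copies the format string and loop from this commit, with a made-up hostname value; the "$patch":"replace" directive makes the strategic merge patch replace the annotations map wholesale rather than merging into it.

package main

import (
    "fmt"
    "strings"
)

// Same format string as constants.ServiceMetadataAnnotationReplaceFormat in this commit.
const serviceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`

func metadataAnnotationsPatch(annotations map[string]string) string {
    annotationsList := make([]string, 0, len(annotations))
    for name, value := range annotations {
        annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value))
    }
    return fmt.Sprintf(serviceMetadataAnnotationReplaceFormat, strings.Join(annotationsList, ","))
}

func main() {
    patch := metadataAnnotationsPatch(map[string]string{
        // example annotation; the hostname value is invented for illustration
        "external-dns.alpha.kubernetes.io/hostname": "acid-test.myteam.example.com",
    })
    // {"metadata":{"annotations": {"$patch":"replace", "external-dns.alpha.kubernetes.io/hostname":"acid-test.myteam.example.com"}}}
    fmt.Println(patch)
}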

pkg/spec/postgresql.go (4 additions, 2 deletions)

@@ -86,8 +86,10 @@ type PostgresSpec struct {
     Patroni `json:"patroni,omitempty"`
     Resources `json:"resources,omitempty"`

-    TeamID string `json:"teamId"`
-    AllowedSourceRanges []string `json:"allowedSourceRanges"`
+    TeamID string `json:"teamId"`
+    AllowedSourceRanges []string `json:"allowedSourceRanges"`
+    // UseLoadBalancer is a pointer, since it is important to know whether the parameter was omitted from the manifest
+    UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"`
     ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"`
     NumberOfInstances int32 `json:"numberOfInstances"`
     Users map[string]userFlags `json:"users"`
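
UseLoadBalancer is deliberately a *bool so that "set to true", "set to false", and "omitted" remain distinguishable after the manifest is decoded. A small standalone illustration using encoding/json on a trimmed-down stand-in struct (not the operator's own spec type):

package main

import (
    "encoding/json"
    "fmt"
)

// miniSpec is a trimmed-down stand-in for PostgresSpec, enough to show the *bool semantics.
type miniSpec struct {
    UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"`
}

func describe(doc string) {
    var s miniSpec
    if err := json.Unmarshal([]byte(doc), &s); err != nil {
        panic(err)
    }
    switch {
    case s.UseLoadBalancer == nil:
        fmt.Println(doc, "-> not set, operator-wide enable_load_balancer decides")
    case *s.UseLoadBalancer:
        fmt.Println(doc, "-> load balancer explicitly requested")
    default:
        fmt.Println(doc, "-> load balancer explicitly disabled")
    }
}

func main() {
    describe(`{"useLoadBalancer": true}`)
    describe(`{"useLoadBalancer": false}`)
    describe(`{}`) // key omitted: the pointer stays nil
}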

pkg/spec/types.go (1 addition, 0 deletions)

@@ -3,6 +3,7 @@ package spec
 import (
     "fmt"
     "strings"
+
     "database/sql"

     "k8s.io/client-go/pkg/api/v1"

pkg/util/config/config.go (1 addition, 0 deletions)

@@ -53,6 +53,7 @@ type Config struct {
     DebugLogging bool `name:"debug_logging" default:"true"`
     EnableDBAccess bool `name:"enable_database_access" default:"true"`
     EnableTeamsAPI bool `name:"enable_teams_api" default:"true"`
+    EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"`
     MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
     ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
     Workers uint32 `name:"workers" default:"4"`
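
The config loader itself is not part of this diff; presumably it maps ConfigMap keys (such as enable_load_balancer from manifests/configmap.yaml above) onto Config fields via the name struct tag, roughly as sketched below. This is a hypothetical reflection-based sketch, not the operator's real loader; default-tag handling is omitted.

package main

import (
    "fmt"
    "reflect"
    "strconv"
)

// Config mirrors two fields of the real struct; the `default` tags are shown for parity only.
type Config struct {
    EnableLoadBalancer bool   `name:"enable_load_balancer" default:"true"`
    Workers            uint32 `name:"workers" default:"4"`
}

// applyConfigMap copies string values from a ConfigMap-like map onto fields whose
// `name` tag matches the key. Hypothetical sketch: only the two kinds used here are handled.
func applyConfigMap(cfg *Config, data map[string]string) error {
    v := reflect.ValueOf(cfg).Elem()
    t := v.Type()
    for i := 0; i < t.NumField(); i++ {
        raw, ok := data[t.Field(i).Tag.Get("name")]
        if !ok {
            continue
        }
        switch field := v.Field(i); field.Kind() {
        case reflect.Bool:
            b, err := strconv.ParseBool(raw)
            if err != nil {
                return err
            }
            field.SetBool(b)
        case reflect.Uint32:
            u, err := strconv.ParseUint(raw, 10, 32)
            if err != nil {
                return err
            }
            field.SetUint(u)
        }
    }
    return nil
}

func main() {
    cfg := Config{EnableLoadBalancer: true, Workers: 4} // defaults
    _ = applyConfigMap(&cfg, map[string]string{"enable_load_balancer": "false"})
    fmt.Printf("%+v\n", cfg) // {EnableLoadBalancer:false Workers:4}
}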

pkg/util/constants/annotations.go (6 additions, 6 deletions)

@@ -2,10 +2,10 @@ package constants

 // Names and values in Kubernetes annotation for services, statefulsets and volumes
 const (
-    ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname"
-    ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
-    ElbTimeoutAnnotationValue = "3600"
-    KubeIAmAnnotation = "iam.amazonaws.com/role"
-    VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
-    ServiceMetadataAnnotationFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`
+    ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname"
+    ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
+    ElbTimeoutAnnotationValue = "3600"
+    KubeIAmAnnotation = "iam.amazonaws.com/role"
+    VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
+    ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`
 )
