diff --git a/charts/postgres-operator-ui/values.yaml b/charts/postgres-operator-ui/values.yaml
index da3c4baaf..9923ff023 100644
--- a/charts/postgres-operator-ui/values.yaml
+++ b/charts/postgres-operator-ui/values.yaml
@@ -62,8 +62,6 @@ podAnnotations:
 extraEnvs:
   []
   # Exemple of settings to make snapshot view working in the ui when using AWS
-  # - name: WALE_S3_ENDPOINT
-  #   value: https+path://s3.us-east-1.amazonaws.com:443
   # - name: SPILO_S3_BACKUP_PREFIX
   #   value: spilo/
   # - name: AWS_ACCESS_KEY_ID
@@ -83,8 +81,6 @@ extraEnvs:
   #       key: AWS_DEFAULT_REGION
   # - name: SPILO_S3_BACKUP_BUCKET
   #   value:
-  # - name: "USE_AWS_INSTANCE_PROFILE"
-  #   value: "true"
 
 # configure UI service
 service:
diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml
index 2511a09d3..bf94b63d0 100644
--- a/charts/postgres-operator/values.yaml
+++ b/charts/postgres-operator/values.yaml
@@ -364,7 +364,7 @@ configLogicalBackup:
   # logical_backup_memory_request: ""
 
   # image for pods of the logical backup job (example runs pg_dumpall)
-  logical_backup_docker_image: "ghcr.io/zalando/postgres-operator/logical-backup:v1.13.0"
+  logical_backup_docker_image: "ghcr.io/zalando/postgres-operator/logical-backup:v1.14.0"
   # path of google cloud service account json file
   # logical_backup_google_application_credentials: ""
diff --git a/docs/administrator.md b/docs/administrator.md
index 55abebc8b..f394b70ab 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -384,7 +384,7 @@ exceptions:
 The interval of days can be set with `password_rotation_interval` (default
 `90` = 90 days, minimum 1). On each rotation the user name and password values
 are replaced in the K8s secret. They belong to a newly created user named after
-the original role plus rotation date in YYMMDD format. All priviliges are
+the original role plus rotation date in YYMMDD format. All privileges are
 inherited meaning that migration scripts should still grant and revoke rights
 against the original role. The timestamp of the next rotation (in RFC 3339
 format, UTC timezone) is written to the secret as well. Note, if the rotation
@@ -564,7 +564,7 @@ manifest affinity.
 ```
 
 If `node_readiness_label_merge` is set to `"OR"` (default) the readiness label
-affinty will be appended with its own expressions block:
+affinity will be appended with its own expressions block:
 
 ```yaml
  affinity:
@@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to
 `kubernetes.io/hostname`, you can set another topology key e.g.
 `failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys.
 
-## Pod Disruption Budget
+## Pod Disruption Budgets
 
-By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster
-from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable`
-parameter of the PDB is set to `1` which prevents killing masters in single-node
-clusters and/or the last remaining running instance in a multi-node cluster.
+By default the operator creates two PodDisruptionBudgets (PDBs) to protect the
+cluster from voluntary disruptions and hence unwanted DB downtime: the so-called
+primary PDB and the PDB for critical operations.
+
+### Primary PDB
+The `MinAvailable` parameter of this PDB is set to `1` and, if
+`pdb_master_label_selector` is enabled, the label selector includes the
+`spilo-role=master` condition, which prevents killing masters in single-node
+clusters and/or the last remaining running instance in a multi-node cluster.
+
+### PDB for critical operations
+The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances`
+set in the cluster manifest, while the label selector includes the
+`critical-operation=true` condition. This allows protecting all pods of a
+cluster, provided they are labeled accordingly. For example, the operator
+labels all Spilo pods with `critical-operation=true` during a major version
+upgrade run. You may want to protect cluster pods during other critical
+operations by assigning the label to pods yourself or using other means of
+automation.
 
 The PDB is only relaxed in two scenarios:
 
 * If a cluster is scaled down to `0` instances (e.g. for draining nodes)
 * If the PDB is disabled in the configuration (`enable_pod_disruption_budget`)
 
-The PDB is still in place having `MinAvailable` set to `0`. If enabled it will
-be automatically set to `1` on scale up. Disabling PDBs helps avoiding blocking
-Kubernetes upgrades in managed K8s environments at the cost of prolonged DB
-downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
+The PDBs are still in place, with `MinAvailable` set to `0`. Disabling PDBs
+helps avoid blocking Kubernetes upgrades in managed K8s environments at the
+cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
 for the use case.
 
 ## Add cluster-specific labels
@@ -1128,7 +1140,7 @@ metadata:
     iam.gke.io/gcp-service-account: @.iam.gserviceaccount.com
 ```
 
-2. Specify the new custom service account in your [operator paramaters](./reference/operator_parameters.md)
+2. Specify the new custom service account in your [operator parameters](./reference/operator_parameters.md)
 
 If using manual deployment or kustomize, this is done by setting
 `pod_service_account_name` in your configuration file specified in the
diff --git a/docs/quickstart.md b/docs/quickstart.md
index f080bd567..2d6742354 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster
 ```
 
 This should remove the associated StatefulSet, database Pods, Services and
-Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is
+Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are
 deleted. Secrets however are not deleted and backups will remain in place.
 
 When deleting a cluster while it is still starting up or got stuck during that
diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index 8d02ee7d8..ab0353202 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -116,9 +116,9 @@ These parameters are grouped directly under the `spec` key in the manifest.
 
 * **maintenanceWindows**
   a list which defines specific time frames when certain maintenance operations
-  are allowed. So far, it is only implemented for automatic major version
-  upgrades. Accepted formats are "01:00-06:00" for daily maintenance windows or
-  "Sat:00:00-04:00" for specific days, with all times in UTC.
+  such as automatic major upgrades or master pod migration are allowed. Accepted
+  formats are "01:00-06:00" for daily maintenance windows or "Sat:00:00-04:00"
+  for specific days, with all times in UTC.
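
For illustration, a cluster manifest using such maintenance windows could look like the following sketch (cluster name, team, version and volume size are made up; the `maintenanceWindows` time formats are the ones documented above):

```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-example-cluster
spec:
  teamId: "acid"
  numberOfInstances: 2
  postgresql:
    version: "17"
  volume:
    size: 1Gi
  # operations gated on maintenance windows (e.g. an automatic major version
  # upgrade or a postponed master pod switchover) only run inside these slots
  maintenanceWindows:
    - "01:00-06:00"        # every day between 01:00 and 06:00 UTC
    - "Sat:00:00-04:00"    # additionally on Saturdays between 00:00 and 04:00 UTC
```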
 
 * **users**
   a map of usernames to user flags for the users that should be created in the
@@ -247,7 +247,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
   [kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
   It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
   Also an `emptyDir` volume can be shared between initContainer and statefulSet.
-  Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
+  Additionally, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
   Set `isSubPathExpr` to true if you want to include [API environment variables](https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment).
   You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option.
   If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container.
@@ -257,7 +257,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
 
 ## Prepared Databases
 
 The operator can create databases with default owner, reader and writer roles
-without the need to specifiy them under `users` or `databases` sections. Those
+without the need to specify them under `users` or `databases` sections. Those
 parameters are grouped under the `preparedDatabases` top-level key. For more
 information, see [user docs](../user.md#prepared-databases-with-roles-and-default-privileges).
 
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index 3bd9e44f7..95bfb4cf3 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -209,7 +209,7 @@ under the `users` key.
   For all `LOGIN` roles that are not database owners the operator can rotate
   credentials in the corresponding K8s secrets by replacing the username and
   password. This means, new users will be added on each rotation inheriting
-  all priviliges from the original roles. The rotation date (in YYMMDD format)
+  all privileges from the original roles. The rotation date (in YYMMDD format)
   is appended to the names of the new user. The timestamp of the next rotation
   is written to the secret. The default is `false`.
 
@@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key.
   pod namespace).
 
 * **pdb_name_format**
-  defines the template for PDB (Pod Disruption Budget) names created by the
+  defines the template for the primary PDB (Pod Disruption Budget) name created by the
   operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is
   replaced by the cluster name. Only the `{cluster}` placeholders is allowed in
   the template.
 
 * **pdb_master_label_selector**
-  By default the PDB will match the master role hence preventing nodes to be
+  By default the primary PDB will match the master role hence preventing nodes to be
   drained if the node_readiness_label is not used. If this option if set to
   `false` the `spilo-role=master` selector will not be added to the PDB.
 
@@ -552,7 +552,7 @@ configuration they are grouped under the `kubernetes` key.
   pods with `InitialDelaySeconds: 6`, `PeriodSeconds: 10`, `TimeoutSeconds: 5`,
   `SuccessThreshold: 1` and `FailureThreshold: 3`. When enabling readiness
   probes it is recommended to switch the `pod_management_policy` to `parallel`
-  to avoid unneccesary waiting times in case of multiple instances failing.
+  to avoid unnecessary waiting times in case of multiple instances failing.
   The default is `false`.
 
 * **storage_resize_mode**
@@ -701,7 +701,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
   replaced by the cluster name, `{namespace}` is replaced with the namespace
   and `{hostedzone}` is replaced with the hosted zone (the value of the
   `db_hosted_zone` parameter). The `{team}` placeholder can still be used,
-  although it is not recommened because the team of a cluster can change.
+  although it is not recommended because the team of a cluster can change.
   If the cluster name starts with the `teamId` it will also be part of the
   DNS, aynway. No other placeholders are allowed!
 
@@ -720,7 +720,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
   is replaced by the cluster name, `{namespace}` is replaced with the namespace
   and `{hostedzone}` is replaced with the hosted zone (the value of the
   `db_hosted_zone` parameter). The `{team}` placeholder can still be used,
-  although it is not recommened because the team of a cluster can change.
+  although it is not recommended because the team of a cluster can change.
   If the cluster name starts with the `teamId` it will also be part of the
   DNS, aynway. No other placeholders are allowed!
 
diff --git a/docs/user.md b/docs/user.md
index c63e43f57..c1a7c7d45 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -900,7 +900,7 @@ the PostgreSQL version between source and target cluster has to be the same.
 
 To start a cluster as standby, add the following `standby` section in the YAML
 file. You can stream changes from archived WAL files (AWS S3 or Google Cloud
-Storage) or from a remote primary. Only one option can be specfied in the
+Storage) or from a remote primary. Only one option can be specified in the
 manifest:
 
 ```yaml
 spec:
@@ -911,7 +911,7 @@ spec:
 
 For GCS, you have to define STANDBY_GOOGLE_APPLICATION_CREDENTIALS as a
 [custom pod environment variable](administrator.md#custom-pod-environment-variables).
-It is not set from the config to allow for overridding.
+It is not set from the config to allow for overriding.
 
 ```yaml
 spec:
@@ -1282,7 +1282,7 @@ minutes if the certificates have changed and reloads postgres accordingly.
 ### TLS certificates for connection pooler
 
 By default, the pgBouncer image generates its own TLS certificate like Spilo.
-When the `tls` section is specfied in the manifest it will be used for the
+When the `tls` section is specified in the manifest it will be used for the
 connection pooler pod(s) as well. The security context options are hard coded
 to `runAsUser: 100` and `runAsGroup: 101`. The `fsGroup` will be the same like
 for Spilo.
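
For reference, a minimal `tls` section in the cluster manifest looks roughly like this (a sketch; the secret name and CA file name are placeholders, and the full list of options is in the cluster manifest reference):

```yaml
spec:
  tls:
    secretName: "pg-tls"   # K8s secret containing tls.crt and tls.key
    caFile: "ca.crt"       # optional CA file within the secret
```

With this in place, both the Spilo pods and, as described above, the connection pooler pods use the same certificate.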
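
Since both the documentation and the code changes below revolve around the two PodDisruptionBudgets, here is a rough sketch of what the operator generates for a three-instance cluster named `acid-example-cluster` (object names follow the default `postgres-{cluster}-pdb` and `postgres-{cluster}-critical-op-pdb` templates; the label keys depend on the configured cluster name and pod role labels and are only illustrative):

```yaml
# primary PDB: always keeps the pod holding the master role
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: postgres-acid-example-cluster-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      cluster-name: acid-example-cluster
      spilo-role: master            # only added while pdb_master_label_selector is enabled
---
# PDB for critical operations: protects every labeled pod,
# e.g. while a major version upgrade is running
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: postgres-acid-example-cluster-critical-op-pdb
spec:
  minAvailable: 3                   # equals numberOfInstances from the manifest
  selector:
    matchLabels:
      cluster-name: acid-example-cluster
      critical-operation: "true"
```

Pods can also be pulled under the second budget manually by labeling them with `critical-operation=true` before other disruptive maintenance.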
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 04c6465c9..b9a2a27d4 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1187,7 +1187,7 @@ def test_major_version_upgrade(self): Test major version upgrade: with full upgrade, maintenance window, and annotation """ def check_version(): - p = k8s.patroni_rest("acid-upgrade-test-0", "") + p = k8s.patroni_rest("acid-upgrade-test-0", "") or {} version = p.get("server_version", 0) // 10000 return version @@ -1237,7 +1237,7 @@ def get_annotations(): # should not upgrade because current time is not in maintenanceWindow current_time = datetime.now() maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}" - pg_patch_version_15 = { + pg_patch_version_15_outside_mw = { "spec": { "postgresql": { "version": "15" @@ -1248,10 +1248,10 @@ def get_annotations(): } } k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_outside_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) + # no pod replacement outside of the maintenance window k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) self.eventuallyEqual(check_version, 14, "Version should not be upgraded") @@ -1259,12 +1259,12 @@ def get_annotations(): second_annotations = get_annotations() self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set") - # change the version again to trigger operator sync + # change maintenanceWindows to current maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}" - pg_patch_version_16 = { + pg_patch_version_15_in_mw = { "spec": { "postgresql": { - "version": "16" + "version": "15" }, "maintenanceWindows": [ maintenance_window_current @@ -1273,13 +1273,13 @@ def get_annotations(): } k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) + k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16") + self.eventuallyEqual(check_version, 15, "Version should be upgraded from 14 to 15") # check if annotation for last upgrade's success is updated after second upgrade third_annotations = get_annotations() @@ -1303,20 +1303,20 @@ def get_annotations(): "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_17) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) 
+ k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set") + self.eventuallyEqual(check_version, 15, "Version should not be upgraded because annotation for last upgrade's failure is set") # change the version back to 15 and should remove failure annotation k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + self.eventuallyEqual(check_version, 15, "Version should not be upgraded from 15") fourth_annotations = get_annotations() self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed") @@ -1752,9 +1752,13 @@ def test_password_rotation(self): Test password rotation and removal of users due to retention policy ''' k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' leader = k8s.get_cluster_leader_pod() today = date.today() + # remember number of secrets to make sure it stays the same + secret_count = k8s.count_secrets_with_label(cluster_label) + # enable password rotation for owner of foo database pg_patch_rotation_single_users = { "spec": { @@ -1810,6 +1814,7 @@ def test_password_rotation(self): enable_password_rotation = { "data": { "enable_password_rotation": "true", + "inherited_annotations": "environment", "password_rotation_interval": "30", "password_rotation_user_retention": "30", # should be set to 60 }, @@ -1856,13 +1861,29 @@ def test_password_rotation(self): self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1, "Could not connect to the database with rotation user {}".format(rotation_user), 10, 5) + # add annotation which triggers syncSecrets call + pg_annotation_patch = { + "metadata": { + "annotations": { + "environment": "test", + } + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_annotation_patch) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + time.sleep(10) + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), secret_count, "Unexpected number of secrets") + # check if rotation has been ignored for user from test_cross_namespace_secrets test db_user_secret = k8s.get_secret(username="test.db_user", namespace="test") secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8') - self.assertEqual("test.db_user", secret_username, "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username)) + # check if annotation for secret has been updated + self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret") + # disable password rotation for all 
other users (foo_user) # and pick smaller intervals to see if the third fake rotation user is dropped enable_password_rotation = { @@ -2100,7 +2121,7 @@ def test_statefulset_annotation_propagation(self): patch_sset_propagate_annotations = { "data": { "downscaler_annotations": "deployment-time,downscaler/*", - "inherited_annotations": "owned-by", + "inherited_annotations": "environment,owned-by", } } k8s.update_config(patch_sset_propagate_annotations) @@ -2547,7 +2568,10 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed") + + pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed") pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index ec2d359c8..5d0a5b341 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -2,7 +2,7 @@ // +build !ignore_autogenerated /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1a8d6f762..e9a691faa 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,17 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset *appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - PodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } @@ -105,10 +106,17 @@ type Cluster struct { } type compareStatefulsetResult struct { - match 
bool - replace bool - rollingUpdate bool - reasons []string + match bool + replace bool + rollingUpdate bool + reasons []string + deletedPodAnnotations []string +} + +type compareLogicalBackupJobResult struct { + match bool + reasons []string + deletedPodAnnotations []string } // New creates a new cluster. This function should be called from a controller. @@ -336,14 +344,10 @@ func (c *Cluster) Create() (err error) { c.logger.Infof("secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") - if c.PodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget already exists in the cluster") + if err = c.createPodDisruptionBudgets(); err != nil { + return fmt.Errorf("could not create pod disruption budgets: %v", err) } - pdb, err := c.createPodDisruptionBudget() - if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) - } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta)) + c.logger.Info("pod disruption budgets have been successfully created") if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") @@ -431,6 +435,7 @@ func (c *Cluster) Create() (err error) { } func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult { + deletedPodAnnotations := []string{} reasons := make([]string, 0) var match, needsRollUpdate, needsReplace bool @@ -445,7 +450,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa needsReplace = true reasons = append(reasons, "new statefulset's ownerReferences do not match") } - if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed { match = false needsReplace = true reasons = append(reasons, "new statefulset's annotations do not match: "+reason) @@ -519,7 +524,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa } } - if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed { match = false needsReplace = true reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason) @@ -541,7 +546,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i)) continue } - if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed { needsReplace = true reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason)) } @@ -579,7 +584,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa match = false } - return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} 
+ return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations} } type containerCondition func(a, b v1.Container) bool @@ -781,7 +786,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool { return false } -func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) { +func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) { reason := "" ignoredAnnotations := make(map[string]bool) for _, ignore := range c.OpConfig.IgnoredAnnotations { @@ -794,6 +799,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) } if _, ok := new[key]; !ok { reason += fmt.Sprintf(" Removed %q.", key) + if removedList != nil { + *removedList = append(*removedList, key) + } } } @@ -836,41 +844,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { return true, "" } -func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) { +func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult { + deletedPodAnnotations := []string{} + reasons := make([]string, 0) + match := true if cur.Spec.Schedule != new.Spec.Schedule { - return false, fmt.Sprintf("new job's schedule %q does not match the current one %q", - new.Spec.Schedule, cur.Spec.Schedule) + match = false + reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule)) } newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image if newImage != curImage { - return false, fmt.Sprintf("new job's image %q does not match the current one %q", - newImage, curImage) + match = false + reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage)) } newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations - if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { - return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) + if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed { + match = false + reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason)) } newPgVersion := getPgVersion(new) curPgVersion := getPgVersion(cur) if newPgVersion != curPgVersion { - return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", - newPgVersion, curPgVersion) + match = false + reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion)) } needsReplace := false - reasons := make([]string, 0) - needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons) + contReasons := make([]string, 0) + needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons) if needsReplace { - return false, fmt.Sprintf("logical backup container specs do not match: %v", 
strings.Join(reasons, `', '`)) + match = false + reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`))) } - return true, "" + return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations} } func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { @@ -881,7 +894,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) { return false, "new PDB's owner references do not match the current ones" } - if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed { + if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed { return false, "new PDB's annotations do not match the current ones:" + reason } return true, "" @@ -957,6 +970,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { defer c.mu.Unlock() c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating) + + if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { + // do not apply any major version related changes yet + newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion + } c.setSpec(newSpec) defer func() { @@ -1016,10 +1034,18 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // only when streams were not specified in oldSpec but in newSpec needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 - annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations) - initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser - if initUsers { + + // if inherited annotations differ secrets have to be synced on update + newAnnotations := c.annotationsSet(nil) + oldAnnotations := make(map[string]string) + for _, secret := range c.Secrets { + oldAnnotations = secret.ObjectMeta.Annotations + break + } + annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil) + + if initUsers || annotationsChanged { c.logger.Debug("initialize users") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err) @@ -1027,8 +1053,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true return } - } - if initUsers || annotationsChanged { + c.logger.Debug("syncing secrets") //TODO: mind the secrets of the deleted/new users if err := c.syncSecrets(); err != nil { @@ -1060,9 +1085,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - // pod disruption budget - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) + // pod disruption budgets + if err := c.syncPodDisruptionBudgets(true); err != nil { + c.logger.Errorf("could not sync pod disruption budgets: %v", err) updateFailed = true } @@ -1135,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // streams if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { + c.logger.Debug("syncing streams") if err := c.syncStreams(); err != nil { c.logger.Errorf("could not sync streams: %v", err) updateFailed = true @@ -1207,10 +1233,10 @@ func (c *Cluster) Delete() error { c.logger.Info("not deleting secrets because disabled in 
configuration") } - if err := c.deletePodDisruptionBudget(); err != nil { + if err := c.deletePodDisruptionBudgets(); err != nil { anyErrors = true - c.logger.Warningf("could not delete pod disruption budget: %v", err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) + c.logger.Warningf("could not delete pod disruption budgets: %v", err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err) } for _, role := range []PostgresRole{Master, Replica} { @@ -1709,16 +1735,17 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - PodDisruptionBudget: c.GetPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(), + CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } @@ -1731,18 +1758,58 @@ func (c *Cluster) GetStatus() *ClusterStatus { return status } -// Switchover does a switchover (via Patroni) to a candidate pod -func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error { +func (c *Cluster) GetSwitchoverSchedule() string { + var possibleSwitchover, schedule time.Time + + now := time.Now().UTC() + for _, window := range c.Spec.MaintenanceWindows { + // in the best case it is possible today + possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC) + if window.Everyday { + if now.After(possibleSwitchover) { + // we are already past the time for today, try tomorrow + possibleSwitchover = possibleSwitchover.AddDate(0, 0, 1) + } + } else { + if now.Weekday() != window.Weekday { + // get closest possible time for this window + possibleSwitchover = possibleSwitchover.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7)) + } else if now.After(possibleSwitchover) { + // we are already past the time for today, try next week + possibleSwitchover = possibleSwitchover.AddDate(0, 0, 7) + } + } + + if (schedule == time.Time{}) || possibleSwitchover.Before(schedule) { + schedule = possibleSwitchover + } + } + return schedule.Format("2006-01-02T15:04+00") +} +// Switchover does a switchover (via Patroni) to a candidate pod +func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduled bool) error { var err error - c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) + stopCh := make(chan struct{}) ch := c.registerPodSubscriber(candidate) defer c.unregisterPodSubscriber(candidate) defer close(stopCh) - if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { + var scheduled_at string + if scheduled { + scheduled_at = c.GetSwitchoverSchedule() + } else { + 
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) + scheduled_at = "" + } + + if err = c.patroni.Switchover(curMaster, candidate.Name, scheduled_at); err == nil { + if scheduled { + c.logger.Infof("switchover from %q to %q is scheduled at %s", curMaster.Name, candidate, scheduled_at) + return nil + } c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) _, err = c.waitForPodLabel(ch, stopCh, nil) @@ -1750,6 +1817,9 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e err = fmt.Errorf("could not get master pod label: %v", err) } } else { + if scheduled { + return fmt.Errorf("could not schedule switchover: %v", err) + } err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 897ed6c0d..09d9df972 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) { } } - match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob) - if match != tt.match { - t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob) - } else { - if !strings.HasPrefix(reason, tt.reason) { - t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason) + cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob) + if cmp.match != tt.match { + t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob) + } else if !cmp.match { + found := false + for _, reason := range cmp.reasons { + if strings.HasPrefix(reason, tt.reason) { + found = true + break + } + found = false + } + if !found { + t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons) } } }) @@ -2057,3 +2065,91 @@ func TestCompareVolumeMounts(t *testing.T) { }) } } + +func TestGetSwitchoverSchedule(t *testing.T) { + now := time.Now() + + futureTimeStart := now.Add(1 * time.Hour) + futureWindowTimeStart := futureTimeStart.Format("15:04") + futureWindowTimeEnd := now.Add(2 * time.Hour).Format("15:04") + pastTimeStart := now.Add(-2 * time.Hour) + pastWindowTimeStart := pastTimeStart.Format("15:04") + pastWindowTimeEnd := now.Add(-1 * time.Hour).Format("15:04") + + tests := []struct { + name string + windows []acidv1.MaintenanceWindow + expected string + }{ + { + name: "everyday maintenance windows is later today", + windows: []acidv1.MaintenanceWindow{ + { + Everyday: true, + StartTime: mustParseTime(futureWindowTimeStart), + EndTime: mustParseTime(futureWindowTimeEnd), + }, + }, + expected: futureTimeStart.Format("2006-01-02T15:04+00"), + }, + { + name: "everyday maintenance window is tomorrow", + windows: []acidv1.MaintenanceWindow{ + { + Everyday: true, + StartTime: mustParseTime(pastWindowTimeStart), + EndTime: mustParseTime(pastWindowTimeEnd), + }, + }, + expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"), + 
}, + { + name: "weekday maintenance windows is later today", + windows: []acidv1.MaintenanceWindow{ + { + Weekday: now.Weekday(), + StartTime: mustParseTime(futureWindowTimeStart), + EndTime: mustParseTime(futureWindowTimeEnd), + }, + }, + expected: futureTimeStart.Format("2006-01-02T15:04+00"), + }, + { + name: "weekday maintenance windows is passed for today", + windows: []acidv1.MaintenanceWindow{ + { + Weekday: now.Weekday(), + StartTime: mustParseTime(pastWindowTimeStart), + EndTime: mustParseTime(pastWindowTimeEnd), + }, + }, + expected: pastTimeStart.AddDate(0, 0, 7).Format("2006-01-02T15:04+00"), + }, + { + name: "choose the earliest window", + windows: []acidv1.MaintenanceWindow{ + { + Weekday: now.AddDate(0, 0, 2).Weekday(), + StartTime: mustParseTime(futureWindowTimeStart), + EndTime: mustParseTime(futureWindowTimeEnd), + }, + { + Everyday: true, + StartTime: mustParseTime(pastWindowTimeStart), + EndTime: mustParseTime(pastWindowTimeEnd), + }, + }, + expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cluster.Spec.MaintenanceWindows = tt.windows + schedule := cluster.GetSwitchoverSchedule() + if schedule != tt.expected { + t.Errorf("Expected GetSwitchoverSchedule to return %s, returned: %s", tt.expected, schedule) + } + }) + } +} diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 6cd46f745..ac4ce67d8 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "encoding/json" "fmt" "reflect" "strings" @@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql err error ) + updatedPodAnnotations := map[string]*string{} syncReason := make([]string, 0) deployment, err = c.KubeClient. Deployments(c.Namespace). @@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) - if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { + deletedPodAnnotations := []string{} + if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed { specSync = true syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...) 
+ + for _, anno := range deletedPodAnnotations { + updatedPodAnnotations[anno] = nil + } + templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{ + "spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}} + patch, err := json.Marshal(templateMetadataReq) + if err != nil { + return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err) + } + deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(), + deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err) + return nil, err + } + deployment.Spec.Template.Annotations = newPodAnnotations } @@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations - if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { + if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed { deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations) if err != nil { return nil, err @@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } - } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed { + metadataReq := map[string]map[string]map[string]*string{"metadata": {}} + + for anno, val := range deployment.Spec.Template.Annotations { + updatedPodAnnotations[anno] = &val + } + metadataReq["metadata"]["annotations"] = updatedPodAnnotations + patch, err := json.Marshal(metadataReq) if err != nil { - return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err) } - _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) if err != nil { - return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err) } } } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index ff5536303..fedd6a917 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) podDisruptionBudgetName() string { +func (c *Cluster) PrimaryPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } +func (c *Cluster) criticalOpPodDisruptionBudgetName() string { + pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb") + return pdbTemplate.Format("cluster", c.Name) +} + func makeDefaultResources(config *config.Config) acidv1.Resources { defaultRequests := acidv1.ResourceDescription{ @@ 
-1005,6 +1010,9 @@ func (c *Cluster) generateSpiloPodEnvVars( if c.patroniUsesKubernetes() { envVars = append(envVars, v1.EnvVar{Name: "DCS_ENABLE_KUBERNETES_API", Value: "true"}) + if c.OpConfig.EnablePodDisruptionBudget != nil && *c.OpConfig.EnablePodDisruptionBudget { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_BOOTSTRAP_LABELS", Value: "{\"critical-operation\":\"true\"}"}) + } } else { envVars = append(envVars, v1.EnvVar{Name: "ETCD_HOST", Value: c.OpConfig.EtcdHost}) } @@ -2207,7 +2215,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2225,7 +2233,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.podDisruptionBudgetName(), + Name: c.PrimaryPodDisruptionBudgetName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: c.annotationsSet(nil), + OwnerReferences: c.ownerReferences(), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + }, + } +} + +func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances) + pdbEnabled := c.OpConfig.EnablePodDisruptionBudget + + // if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0. + if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 { + minAvailable = intstr.FromInt(0) + } + + labels := c.labelsSet(false) + labels["critical-operation"] = "true" + + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.criticalOpPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: c.annotationsSet(nil), diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 612e4525a..137c24081 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } - testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { - masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector - if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { - return fmt.Errorf("Object Namespace incorrect.") - } - if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { - return fmt.Errorf("Labels incorrect.") - } - if !masterLabelSelectorDisabled && - !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ - MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { + testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector + if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { + return 
fmt.Errorf("Object Namespace incorrect.") + } + expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"} + if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) { + return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels) + } + if !masterLabelSelectorDisabled { + if isPrimary { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } else { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } + } - return fmt.Errorf("MatchLabels incorrect.") + return nil } - - return nil } testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { @@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-databass-budget"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, } for _, tt := range tests { - result := tt.spec.generatePodDisruptionBudget() + result := tt.spec.generatePrimaryPodDisruptionBudget() + for _, check := range tt.check { + err := check(tt.spec, result) + if err != nil { + t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v", + testName, tt.scenario, err) + } + } + } + + testCriticalOp := []struct { + scenario string + spec *Cluster + check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error + }{ + { + scenario: "With multiple instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: 
"myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With zero instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With PodDisruptionBudget disabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With OwnerReference enabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + } + + for _, tt := range testCriticalOp { + result := tt.spec.generateCriticalOpPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { diff --git a/pkg/cluster/majorversionupgrade.go b/pkg/cluster/majorversionupgrade.go index a4ae5f81b..d8a1fb917 100644 --- a/pkg/cluster/majorversionupgrade.go +++ b/pkg/cluster/majorversionupgrade.go @@ -106,6 +106,22 @@ func (c *Cluster) removeFailuresAnnotation() error { return nil } +func (c *Cluster) criticalOperationLabel(pods []v1.Pod, value *string) error { + metadataReq := map[string]map[string]map[string]*string{"metadata": {"labels": {"critical-operation": value}}} + + patchReq, err := json.Marshal(metadataReq) + if err != nil { + return fmt.Errorf("could not marshal ObjectMeta: %v", err) + } + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchReq, metav1.PatchOptions{}) + if 
err != nil { + return err + } + } + return nil +} + /* Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off"). @@ -129,17 +145,13 @@ func (c *Cluster) majorVersionUpgrade() error { return nil } - if !isInMainternanceWindow(c.Spec.MaintenanceWindows) { - c.logger.Infof("skipping major version upgrade, not in maintenance window") - return nil - } - pods, err := c.listPods() if err != nil { return err } allRunning := true + isStandbyCluster := false var masterPod *v1.Pod @@ -147,8 +159,9 @@ func (c *Cluster) majorVersionUpgrade() error { ps, _ := c.patroni.GetMemberData(&pod) if ps.Role == "standby_leader" { - c.logger.Errorf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name) - return nil + isStandbyCluster = true + c.currentMajorVersion = ps.ServerVersion + break } if ps.State != "running" { @@ -175,6 +188,9 @@ func (c *Cluster) majorVersionUpgrade() error { } c.logger.Infof("recheck cluster version is already up to date. current: %d, min desired: %d", c.currentMajorVersion, desiredVersion) return nil + } else if isStandbyCluster { + c.logger.Warnf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name) + return nil } if _, exists := c.ObjectMeta.Annotations[majorVersionUpgradeFailureAnnotation]; exists { @@ -182,6 +198,11 @@ func (c *Cluster) majorVersionUpgrade() error { return nil } + if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) { + c.logger.Infof("skipping major version upgrade, not in maintenance window") + return nil + } + members, err := c.patroni.GetClusterMembers(masterPod) if err != nil { c.logger.Error("could not get cluster members data from Patroni API, skipping major version upgrade") @@ -219,6 +240,17 @@ func (c *Cluster) majorVersionUpgrade() error { if allRunning && masterPod != nil { c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion) if c.currentMajorVersion < desiredVersion { + defer func() error { + if err = c.criticalOperationLabel(pods, nil); err != nil { + return fmt.Errorf("failed to remove critical-operation label: %s", err) + } + return nil + }() + val := "true" + if err = c.criticalOperationLabel(pods, &val); err != nil { + return fmt.Errorf("failed to assign critical-operation label: %s", err) + } + podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name} c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index bd2172c18..7fc95090e 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -280,11 +280,16 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { return fmt.Errorf("could not move pod: %v", err) } + scheduleSwitchover := false + if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) { + c.logger.Infof("postponing switchover, not in maintenance window") + scheduleSwitchover = true + } err = retryutil.Retry(1*time.Minute, 5*time.Minute, func() (bool, error) { - err := c.Switchover(oldMaster, masterCandidateName) + err := c.Switchover(oldMaster, masterCandidateName, scheduleSwitchover) if err != nil { - 
c.logger.Errorf("could not failover to pod %q: %v", masterCandidateName, err) + c.logger.Errorf("could not switchover to pod %q: %v", masterCandidateName, err) return false, nil } return true, nil @@ -445,7 +450,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp // do not recreate master now so it will keep the update flag and switchover will be retried on next sync return fmt.Errorf("skipping switchover: %v", err) } - if err := c.Switchover(masterPod, masterCandidate); err != nil { + if err := c.Switchover(masterPod, masterCandidate, false); err != nil { return fmt.Errorf("could not perform switch over: %v", err) } } else if newMasterPod == nil && len(replicas) == 0 { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 3f47328ee..2c87efe47 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,13 @@ const ( ) func (c *Cluster) listResources() error { - if c.PodDisruptionBudget != nil { - c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) + if c.PrimaryPodDisruptionBudget != nil { + c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID) + } + + if c.CriticalOpPodDisruptionBudget != nil { + c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID) + } if c.Statefulset != nil { @@ -162,8 +167,8 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error { return fmt.Errorf("pod %q does not belong to cluster", podName) } - if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil { - return fmt.Errorf("could not failover: %v", err) + if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name, ""); err != nil { + return fmt.Errorf("could not switchover: %v", err) } return nil @@ -329,7 +334,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe } } - if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed { + if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(newService.Annotations) if err != nil { return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err) @@ -417,59 +422,166 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset return result } -func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudgetSpec := c.generatePodDisruptionBudget() +func (c *Cluster) createPrimaryPodDisruptionBudget() error { + c.logger.Debug("creating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget != nil { + c.logger.Warning("primary pod disruption budget already exists in the cluster") + return nil + } + + podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). 
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) if err != nil { - return nil, err + return err + } + c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createCriticalOpPodDisruptionBudget() error { + c.logger.Debug("creating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget != nil { + c.logger.Warning("pod disruption budget for critical operations already exists in the cluster") + return nil + } + + podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget() + podDisruptionBudget, err := c.KubeClient. + PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). + Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) + + if err != nil { + return err + } + c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createPodDisruptionBudgets() error { + errors := make([]string, 0) + + err := c.createPrimaryPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err)) + } + + err = c.createCriticalOpPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + +func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + return fmt.Errorf("there is no primary pod disruption budget in the cluster") + } + + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) + } + + newPdb, err := c.KubeClient. + PodDisruptionBudgets(pdb.Namespace). + Create(context.TODO(), pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create primary pod disruption budget: %v", err) } - c.PodDisruptionBudget = podDisruptionBudget + c.PrimaryPodDisruptionBudget = newPdb - return podDisruptionBudget, nil + return nil } -func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - if c.PodDisruptionBudget == nil { - return fmt.Errorf("there is no pod disruption budget in the cluster") +func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster") } - if err := c.deletePodDisruptionBudget(); err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). 
Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.CriticalOpPodDisruptionBudget = newPdb + + return nil +} + +func (c *Cluster) deletePrimaryPodDisruptionBudget() error { + c.logger.Debug("deleting primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + c.logger.Debug("there is no primary pod disruption budget in the cluster") + return nil + } + + pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta) + err := c.KubeClient. + PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) + } else if err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) + } + + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = nil + + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } + return false, err2 + }) + if err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) } - c.PodDisruptionBudget = newPdb return nil } -func (c *Cluster) deletePodDisruptionBudget() error { - c.logger.Debug("deleting pod disruption budget") - if c.PodDisruptionBudget == nil { - c.logger.Debug("there is no pod disruption budget in the cluster") +func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error { + c.logger.Debug("deleting pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + c.logger.Debug("there is no pod disruption budget for critical operations in the cluster") return nil } - pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace). 
+ Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) - c.PodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -483,12 +595,29 @@ func (c *Cluster) deletePodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } return nil } +func (c *Cluster) deletePodDisruptionBudgets() error { + errors := make([]string, 0) + + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) deleteEndpoint(role PostgresRole) error { c.setProcessName("deleting endpoint") c.logger.Debugf("deleting %s endpoint", role) @@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { return c.Statefulset } -// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget -func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.PodDisruptionBudget +// GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget +func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.PrimaryPodDisruptionBudget +} + +// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations +func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.CriticalOpPodDisruptionBudget } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 9e2c7482a..bf9be3fb4 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za } for slotName, slotAndPublication := range databaseSlotsList { - tables := slotAndPublication.Publication - tableNames := make([]string, len(tables)) + newTables := slotAndPublication.Publication + tableNames := make([]string, len(newTables)) i := 0 - for t := range tables { + for t := range newTables { tableName, schemaName := getTableSchema(t) tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) i++ @@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za tableList := strings.Join(tableNames, ", ") currentTables, exists := currentPublications[slotName] + // if newTables is empty it means that its definition was removed from streams section + //
but when slot is defined in manifest we should sync publications, too + // by reusing current tables we make sure it is not altered + if len(newTables) == 0 { + tableList = currentTables + } if !exists { createPublications[slotName] = tableList } else if currentTables != tableList { @@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error { return nil } - databaseSlots := make(map[string]map[string]zalandov1.Slot) - slotsToSync := make(map[string]map[string]string) - requiredPatroniConfig := c.Spec.Patroni - - if len(requiredPatroniConfig.Slots) > 0 { - for slotName, slotConfig := range requiredPatroniConfig.Slots { - slotsToSync[slotName] = slotConfig - } - } - + // create map with every database and empty slot definition + // we need it to detect removal of streams from databases if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } @@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error { if err != nil { return fmt.Errorf("could not get list of databases: %v", err) } - // get database name with empty list of slot, except template0 and template1 + databaseSlots := make(map[string]map[string]zalandov1.Slot) for dbName := range listDatabases { if dbName != "template0" && dbName != "template1" { databaseSlots[dbName] = map[string]zalandov1.Slot{} } } + // need to take explicitly defined slots into account when syncing Patroni config + slotsToSync := make(map[string]map[string]string) + requiredPatroniConfig := c.Spec.Patroni + if len(requiredPatroniConfig.Slots) > 0 { + for slotName, slotConfig := range requiredPatroniConfig.Slots { + slotsToSync[slotName] = slotConfig + if _, exists := databaseSlots[slotConfig["database"]]; exists { + databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{ + Slot: slotConfig, + Publication: make(map[string]acidv1.StreamTable), + } + } + } + } + // get list of required slots and publications, group by database for _, stream := range c.Spec.Streams { if _, exists := databaseSlots[stream.Database]; !exists { @@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error { "type": "logical", } slotName := getSlotName(stream.Database, stream.ApplicationId) - if _, exists := databaseSlots[stream.Database][slotName]; !exists { + slotAndPublication, exists := databaseSlots[stream.Database][slotName] + if !exists { databaseSlots[stream.Database][slotName] = zalandov1.Slot{ Slot: slot, Publication: stream.Tables, } } else { - slotAndPublication := databaseSlots[stream.Database][slotName] streamTables := slotAndPublication.Publication for tableName, table := range stream.Tables { if _, exists := streamTables[tableName]; !exists { @@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error { continue } streamExists = true + c.Streams[appId] = &stream desiredStreams := c.generateFabricEventStream(appId) if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences c.setProcessName("updating event streams with applicationId %s", appId) - stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) + updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
} - c.Streams[appId] = stream + c.Streams[appId] = updatedStream } if match, reason := c.compareStreams(&stream, desiredStreams); !match { c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) @@ -545,7 +559,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab for newKey, newValue := range newEventStreams.Annotations { desiredAnnotations[newKey] = newValue } - if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed { + if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed { match = false reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d1a339001..797e7a5aa 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -97,6 +97,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { + // do not apply any major version related changes yet + newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion + } + if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) @@ -112,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing pod disruption budgets") - if err = c.syncPodDisruptionBudget(false); err != nil { - err = fmt.Errorf("could not sync pod disruption budget: %v", err) + if err = c.syncPodDisruptionBudgets(false); err != nil { + err = fmt.Errorf("could not sync pod disruption budgets: %v", err) return err } @@ -148,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return fmt.Errorf("could not sync connection pooler: %v", err) } - if len(c.Spec.Streams) > 0 { + // sync if manifest stream count is different from stream CR count + // it can be that they are always different due to grouping of manifest streams + // but we would catch missed removals on update + if len(c.Spec.Streams) != len(c.Streams) { c.logger.Debug("syncing streams") if err = c.syncStreams(); err != nil { err = fmt.Errorf("could not sync streams: %v", err) @@ -230,7 +238,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { maps.Copy(annotations, cm.Annotations) // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(cm.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err) @@ -275,7 +283,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { maps.Copy(annotations, ep.Annotations) // Patroni can add extra annotations so incl. 
current annotations in desired annotations desiredAnnotations := c.annotationsSet(ep.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err) @@ -320,7 +328,7 @@ func (c *Cluster) syncPatroniService() error { maps.Copy(annotations, svc.Annotations) // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(svc.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s service: %v", serviceName, err) @@ -412,7 +420,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return fmt.Errorf("could not update %s endpoint: %v", role, err) } } else { - if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { + if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredEp.Annotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) @@ -447,22 +455,61 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error { + var ( + pdb *policyv1.PodDisruptionBudget + err error + ) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.PrimaryPodDisruptionBudget = pdb + newPDB := c.generatePrimaryPodDisruptionBudget() + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) + if !match { + c.logPDBChanges(pdb, newPDB, isUpdate, reason) + if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil { + return err + } + } else { + c.PrimaryPodDisruptionBudget = pdb + } + return nil + + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get pod disruption budget: %v", err) + } + // no existing pod disruption budget, create new one + c.logger.Infof("could not find the primary pod disruption budget") + + if err = c.createPrimaryPodDisruptionBudget(); err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("could not create primary pod disruption budget: %v", err) + } + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) + } + } + + return nil +} + +func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.PodDisruptionBudget = pdb - newPDB := c.generatePodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), 
c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.CriticalOpPodDisruptionBudget = pdb + newPDB := c.generateCriticalOpPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updatePodDisruptionBudget(newPDB); err != nil { + if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil { return err } } else { - c.PodDisruptionBudget = pdb + c.CriticalOpPodDisruptionBudget = pdb } return nil @@ -471,21 +518,35 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could not find the cluster's pod disruption budget") + c.logger.Infof("could not find pod disruption budget for critical operations") - if pdb, err = c.createPodDisruptionBudget(); err != nil { + if err = c.createCriticalOpPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PodDisruptionBudget = pdb + return nil +} + +func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { + errors := make([]string, 0) + if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } return nil } @@ -497,6 +558,7 @@ func (c *Cluster) syncStatefulSet() error { ) podsToRecreate := make([]v1.Pod, 0) isSafeToRecreatePods := true + postponeReasons := make([]string, 0) switchoverCandidates := make([]spec.NamespacedName, 0) pods, err := c.listPods() @@ -561,13 +623,22 @@ func (c *Cluster) syncStatefulSet() error { cmp := c.compareStatefulSetWith(desiredSts) if !cmp.rollingUpdate { + updatedPodAnnotations := map[string]*string{} + for _, anno := range cmp.deletedPodAnnotations { + updatedPodAnnotations[anno] = nil + } + for anno, val := range desiredSts.Spec.Template.Annotations { + updatedPodAnnotations[anno] = &val + } + metadataReq := map[string]map[string]map[string]*string{"metadata": {"annotations": updatedPodAnnotations}} + patch, err := json.Marshal(metadataReq) + if err != nil { + return fmt.Errorf("could not form patch for pod annotations: %v", err) + } + for _, pod := range pods { - if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations) - if err != nil { - return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err) - } - _, err = 
c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations, nil); changed { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) if err != nil { return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err) } @@ -646,12 +717,14 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Debug("syncing Patroni config") if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil { c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err) + postponeReasons = append(postponeReasons, "errors during Patroni config sync") isSafeToRecreatePods = false } // restart Postgres where it is still pending if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil { c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err) + postponeReasons = append(postponeReasons, "errors while restarting Postgres via Patroni API") isSafeToRecreatePods = false } @@ -666,7 +739,7 @@ func (c *Cluster) syncStatefulSet() error { } c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated") } else { - c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync") + c.logger.Warningf("postpone pod recreation until next sync - reason: %s", strings.Join(postponeReasons, `', '`)) } } @@ -1142,7 +1215,7 @@ func (c *Cluster) updateSecret( c.Secrets[secret.UID] = secret } - if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { + if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) if err != nil { return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) @@ -1587,19 +1660,38 @@ func (c *Cluster) syncLogicalBackupJob() error { } c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName()) } - if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match { + if cmp := c.compareLogicalBackupJob(job, desiredJob); !cmp.match { c.logger.Infof("logical job %s is not in the desired state and needs to be updated", c.getLogicalBackupJobName(), ) - if reason != "" { - c.logger.Infof("reason: %s", reason) + if len(cmp.reasons) != 0 { + for _, reason := range cmp.reasons { + c.logger.Infof("reason: %s", reason) + } + } + if len(cmp.deletedPodAnnotations) != 0 { + templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{ + "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}} + for _, anno := range cmp.deletedPodAnnotations { + templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil + } + patch, err := json.Marshal(templateMetadataReq) + if err != nil { + return fmt.Errorf("could not marshal ObjectMeta for logical backup job %q pod template: %v", jobName, err) + } + + job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to remove annotations from the logical backup job 
%q pod template: %v", jobName, err) + return err + } } if err = c.patchLogicalBackupJob(desiredJob); err != nil { return fmt.Errorf("could not update logical backup job to match desired state: %v", err) } c.logger.Info("the logical backup job is synced") } - if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed { + if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredJob.Annotations) if err != nil { return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err) diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d45a193cb..f9d1d7873 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -142,6 +142,181 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } } +func TestPodAnnotationsSync(t *testing.T) { + clusterName := "acid-test-cluster-2" + namespace := "default" + podAnnotation := "no-scale-down" + podAnnotations := map[string]string{podAnnotation: "true"} + customPodAnnotation := "foo" + customPodAnnotations := map[string]string{customPodAnnotation: "true"} + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockHTTPClient(ctrl) + client, _ := newFakeK8sAnnotationsClient() + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Volume: acidv1.Volume{ + Size: "1Gi", + }, + EnableConnectionPooler: boolToPointer(true), + EnableLogicalBackup: true, + EnableReplicaConnectionPooler: boolToPointer(true), + PodAnnotations: podAnnotations, + NumberOfInstances: 2, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), + PodManagementPolicy: "ordered_ready", + CustomPodAnnotations: customPodAnnotations, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + MaxInstances: -1, + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(3), + ResourceCheckTimeout: time.Duration(10), + }, + }, + }, client, pg, logger, eventRecorder) + + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), + } + + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + cluster.Name = clusterName + cluster.Namespace = namespace + clusterOptions := clusterLabelsOptions(cluster) + + // create a statefulset + _, err := cluster.createStatefulSet() + assert.NoError(t, err) + // create a pods + podsList := createPods(cluster) + for _, pod := range podsList { + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + // create connection pooler + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + assert.NoError(t, 
err) + + // create cron job + err = cluster.createLogicalBackupJob() + assert.NoError(t, err) + + annotateResources(cluster) + err = cluster.Sync(&cluster.Postgresql) + assert.NoError(t, err) + + // 1. PodAnnotations set + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, sts.Spec.Template.Annotations, annotation) + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, deploy.Spec.Template.Annotations, annotation, + fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v", + deploy.Name, annotation, deploy.Spec.Template.Annotations)) + } + } + + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, pod.Annotations, annotation, + fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations)) + } + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation, + fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v", + annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + } + + // 2 PodAnnotations removed + newSpec := cluster.Postgresql.DeepCopy() + newSpec.Spec.PodAnnotations = nil + cluster.OpConfig.CustomPodAnnotations = nil + err = cluster.Sync(newSpec) + assert.NoError(t, err) + + stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, sts.Spec.Template.Annotations, annotation) + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, deploy.Spec.Template.Annotations, annotation, + fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v", + deploy.Name, annotation, deploy.Spec.Template.Annotations)) + } + } + + podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, pod.Annotations, annotation, + fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations)) + } + } + + cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range 
cronJobList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation, + fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v", + annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + } +} + func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { testName := "test config comparison" client, _ := newFakeK8sSyncClient() diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 8e9263d49..17c4e705e 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,16 @@ type WorkerStatus struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - PodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index c570fcc3a..0e31ecc32 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -663,7 +663,7 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac return resources, nil } -func isInMainternanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool { +func isInMaintenanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool { if len(specMaintenanceWindows) == 0 { return true } diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 2cb755c6c..9cd7dc7e9 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -247,18 +247,18 @@ func createPods(cluster *Cluster) []v1.Pod { for i, role := range []PostgresRole{Master, Replica} { podsList = append(podsList, v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%d", clusterName, i), + Name: fmt.Sprintf("%s-%d", cluster.Name, i), Namespace: namespace, Labels: map[string]string{ "application": "spilo", - "cluster-name": clusterName, + "cluster-name": cluster.Name, "spilo-role": string(role), }, }, }) podsList = append(podsList, v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-pooler-%s", clusterName, role), + Name: fmt.Sprintf("%s-pooler-%s", cluster.Name, role), Namespace: namespace, Labels: cluster.connectionPoolerLabels(role, true).MatchLabels, }, @@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, if err != nil { return nil, err } - _, err = cluster.createPodDisruptionBudget() + err = cluster.createPodDisruptionBudgets() if err != nil { return nil, err } @@ -705,8 +705,8 @@ func TestIsInMaintenanceWindow(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cluster.Spec.MaintenanceWindows = tt.windows - if isInMainternanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected { - t.Errorf("Expected isInMainternanceWindow to return %t", tt.expected) + if isInMaintenanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected { + t.Errorf("Expected isInMaintenanceWindow to return %t", tt.expected) } }) } diff --git a/pkg/cluster/volumes.go 
b/pkg/cluster/volumes.go index 240220ccf..fee18beaf 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -225,7 +225,7 @@ func (c *Cluster) syncVolumeClaims() error { } newAnnotations := c.annotationsSet(nil) - if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { + if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(newAnnotations) if err != nil { return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err) diff --git a/pkg/generated/clientset/versioned/clientset.go b/pkg/generated/clientset/versioned/clientset.go index 741b3e21f..69725a952 100644 --- a/pkg/generated/clientset/versioned/clientset.go +++ b/pkg/generated/clientset/versioned/clientset.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/doc.go b/pkg/generated/clientset/versioned/doc.go index 4c3683194..34b48f910 100644 --- a/pkg/generated/clientset/versioned/doc.go +++ b/pkg/generated/clientset/versioned/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/fake/clientset_generated.go b/pkg/generated/clientset/versioned/fake/clientset_generated.go index 5250dd68b..c85ad76f9 100644 --- a/pkg/generated/clientset/versioned/fake/clientset_generated.go +++ b/pkg/generated/clientset/versioned/fake/clientset_generated.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/fake/doc.go b/pkg/generated/clientset/versioned/fake/doc.go index 78f1ed834..7548400fa 100644 --- a/pkg/generated/clientset/versioned/fake/doc.go +++ b/pkg/generated/clientset/versioned/fake/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/fake/register.go b/pkg/generated/clientset/versioned/fake/register.go index 9939eaa93..225705881 100644 --- a/pkg/generated/clientset/versioned/fake/register.go +++ b/pkg/generated/clientset/versioned/fake/register.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/scheme/doc.go b/pkg/generated/clientset/versioned/scheme/doc.go index 1aa580cc1..1f79f0496 100644 --- a/pkg/generated/clientset/versioned/scheme/doc.go +++ b/pkg/generated/clientset/versioned/scheme/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated 
documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/scheme/register.go b/pkg/generated/clientset/versioned/scheme/register.go index 0256820e2..6bbec0e5e 100644 --- a/pkg/generated/clientset/versioned/scheme/register.go +++ b/pkg/generated/clientset/versioned/scheme/register.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/acid.zalan.do_client.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/acid.zalan.do_client.go index cef5d984a..e070c7098 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/acid.zalan.do_client.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/acid.zalan.do_client.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/doc.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/doc.go index 34e16f7ad..5c6f06565 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/doc.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/doc.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/doc.go index a5ceefe98..63b4b5b8f 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/doc.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_acid.zalan.do_client.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_acid.zalan.do_client.go index c786701b9..d45375335 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_acid.zalan.do_client.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_acid.zalan.do_client.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_operatorconfiguration.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_operatorconfiguration.go index 5b0c852d8..de1b9a0e3 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_operatorconfiguration.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_operatorconfiguration.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby 
granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresql.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresql.go index edc3578b7..b472c6057 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresql.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresql.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresteam.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresteam.go index 423483119..5801666c8 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresteam.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake/fake_postgresteam.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/generated_expansion.go index ba0d6503a..8a5e126d7 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/generated_expansion.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/generated_expansion.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/operatorconfiguration.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/operatorconfiguration.go index c2e39dd9d..c941551ca 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/operatorconfiguration.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/operatorconfiguration.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresql.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresql.go index cab484ec0..23133d22a 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresql.go +++ b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresql.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresteam.go b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresteam.go index 132eac654..c62f6c9d7 100644 --- a/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresteam.go +++ 
b/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/postgresteam.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go index 34e16f7ad..5c6f06565 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go index 1e0db5ff4..ae4a267d3 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go index a5ceefe98..63b4b5b8f 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go index b6eaa80e0..9885d8755 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go index bfc56cce5..049cc72b2 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go index 
8d52d5161..4d1d3e37e 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go index 7a5fc24b0..a14c4dee3 100644 --- a/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/acid.zalan.do/interface.go b/pkg/generated/informers/externalversions/acid.zalan.do/interface.go index d60c3a005..74f5b0458 100644 --- a/pkg/generated/informers/externalversions/acid.zalan.do/interface.go +++ b/pkg/generated/informers/externalversions/acid.zalan.do/interface.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/acid.zalan.do/v1/interface.go b/pkg/generated/informers/externalversions/acid.zalan.do/v1/interface.go index 630d8155f..24950b6fd 100644 --- a/pkg/generated/informers/externalversions/acid.zalan.do/v1/interface.go +++ b/pkg/generated/informers/externalversions/acid.zalan.do/v1/interface.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresql.go b/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresql.go index 6324c6a47..179562e4c 100644 --- a/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresql.go +++ b/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresql.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresteam.go b/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresteam.go index 4835da430..79e6e872a 100644 --- a/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresteam.go +++ b/pkg/generated/informers/externalversions/acid.zalan.do/v1/postgresteam.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/factory.go b/pkg/generated/informers/externalversions/factory.go index 
feed75ec0..2169366b5 100644 --- a/pkg/generated/informers/externalversions/factory.go +++ b/pkg/generated/informers/externalversions/factory.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index f0d1921f1..66d94b2a2 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go b/pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go index 520853242..a5d7b2299 100644 --- a/pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go +++ b/pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/zalando.org/interface.go b/pkg/generated/informers/externalversions/zalando.org/interface.go index ceddb918f..aab6846cb 100644 --- a/pkg/generated/informers/externalversions/zalando.org/interface.go +++ b/pkg/generated/informers/externalversions/zalando.org/interface.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go b/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go index 07a8d2a2c..2e767f426 100644 --- a/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go +++ b/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/informers/externalversions/zalando.org/v1/interface.go b/pkg/generated/informers/externalversions/zalando.org/v1/interface.go index 0a47f9132..3b61f68a1 100644 --- a/pkg/generated/informers/externalversions/zalando.org/v1/interface.go +++ b/pkg/generated/informers/externalversions/zalando.org/v1/interface.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/listers/acid.zalan.do/v1/expansion_generated.go b/pkg/generated/listers/acid.zalan.do/v1/expansion_generated.go index e7eefa957..dff5ce3f1 100644 --- a/pkg/generated/listers/acid.zalan.do/v1/expansion_generated.go +++ 
b/pkg/generated/listers/acid.zalan.do/v1/expansion_generated.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/listers/acid.zalan.do/v1/postgresql.go b/pkg/generated/listers/acid.zalan.do/v1/postgresql.go index 8f4d441d7..de713421f 100644 --- a/pkg/generated/listers/acid.zalan.do/v1/postgresql.go +++ b/pkg/generated/listers/acid.zalan.do/v1/postgresql.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/listers/acid.zalan.do/v1/postgresteam.go b/pkg/generated/listers/acid.zalan.do/v1/postgresteam.go index 565167127..52256d158 100644 --- a/pkg/generated/listers/acid.zalan.do/v1/postgresteam.go +++ b/pkg/generated/listers/acid.zalan.do/v1/postgresteam.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/listers/zalando.org/v1/expansion_generated.go b/pkg/generated/listers/zalando.org/v1/expansion_generated.go index ea9d331ff..201fa4ecf 100644 --- a/pkg/generated/listers/zalando.org/v1/expansion_generated.go +++ b/pkg/generated/listers/zalando.org/v1/expansion_generated.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/generated/listers/zalando.org/v1/fabriceventstream.go b/pkg/generated/listers/zalando.org/v1/fabriceventstream.go index fe29c44d4..7c04027bf 100644 --- a/pkg/generated/listers/zalando.org/v1/fabriceventstream.go +++ b/pkg/generated/listers/zalando.org/v1/fabriceventstream.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Compose, Zalando SE +Copyright 2025 Compose, Zalando SE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 4d580f1c2..2129f1acc 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -20,19 +20,19 @@ import ( ) const ( - failoverPath = "/failover" - configPath = "/config" - clusterPath = "/cluster" - statusPath = "/patroni" - restartPath = "/restart" - ApiPort = 8008 - timeout = 30 * time.Second + switchoverPath = "/switchover" + configPath = "/config" + clusterPath = "/cluster" + statusPath = "/patroni" + restartPath = "/restart" + ApiPort = 8008 + timeout = 30 * time.Second ) // Interface describe patroni methods type Interface interface { GetClusterMembers(master *v1.Pod) ([]ClusterMember, error) - Switchover(master *v1.Pod, candidate string) error + Switchover(master *v1.Pod, candidate string, scheduled_at string) error SetPostgresParameters(server *v1.Pod, options map[string]string) error SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error GetMemberData(server *v1.Pod) (MemberData, error) @@ -103,7 +103,7 @@ func (p *Patroni) httpPostOrPatch(method string, url string, 
body *bytes.Buffer) } }() - if resp.StatusCode != http.StatusOK { + if resp.StatusCode < http.StatusOK || resp.StatusCode >= 300 { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("could not read response: %v", err) @@ -128,7 +128,7 @@ func (p *Patroni) httpGet(url string) (string, error) { return "", fmt.Errorf("could not read response: %v", err) } - if response.StatusCode != http.StatusOK { + if response.StatusCode < http.StatusOK || response.StatusCode >= 300 { return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode) } @@ -136,9 +136,9 @@ func (p *Patroni) httpGet(url string) (string, error) { } // Switchover by calling Patroni REST API -func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { +func (p *Patroni) Switchover(master *v1.Pod, candidate string, scheduled_at string) error { buf := &bytes.Buffer{} - err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate}) + err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate, "scheduled_at": scheduled_at}) if err != nil { return fmt.Errorf("could not encode json: %v", err) } @@ -146,7 +146,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { if err != nil { return err } - return p.httpPostOrPatch(http.MethodPost, apiURLString+failoverPath, buf) + return p.httpPostOrPatch(http.MethodPost, apiURLString+switchoverPath, buf) } //TODO: add an option call /patroni to check if it is necessary to restart the server diff --git a/ui/manifests/deployment.yaml b/ui/manifests/deployment.yaml index e09dd1e4f..3b3097416 100644 --- a/ui/manifests/deployment.yaml +++ b/ui/manifests/deployment.yaml @@ -81,8 +81,6 @@ spec: ] } # Exemple of settings to make snapshot view working in the ui when using AWS - # - name: WALE_S3_ENDPOINT - # value: https+path://s3.us-east-1.amazonaws.com:443 # - name: SPILO_S3_BACKUP_PREFIX # value: spilo/ # - name: AWS_ACCESS_KEY_ID @@ -102,5 +100,3 @@ spec: # key: AWS_DEFAULT_REGION # - name: SPILO_S3_BACKUP_BUCKET # value: - # - name: "USE_AWS_INSTANCE_PROFILE" - # value: "true" diff --git a/ui/operator_ui/main.py b/ui/operator_ui/main.py index e02c2995c..bf28df6eb 100644 --- a/ui/operator_ui/main.py +++ b/ui/operator_ui/main.py @@ -95,14 +95,6 @@ DEFAULT_CPU = getenv('DEFAULT_CPU', '10m') DEFAULT_CPU_LIMIT = getenv('DEFAULT_CPU_LIMIT', '300m') -WALE_S3_ENDPOINT = getenv( - 'WALE_S3_ENDPOINT', - 'https+path://s3.eu-central-1.amazonaws.com:443', -) - -USE_AWS_INSTANCE_PROFILE = ( - getenv('USE_AWS_INSTANCE_PROFILE', 'false').lower() != 'false' -) AWS_ENDPOINT = getenv('AWS_ENDPOINT') @@ -784,8 +776,6 @@ def get_versions(pg_cluster: str): bucket=SPILO_S3_BACKUP_BUCKET, pg_cluster=pg_cluster, prefix=SPILO_S3_BACKUP_PREFIX, - s3_endpoint=WALE_S3_ENDPOINT, - use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE, ), ) @@ -797,9 +787,8 @@ def get_basebackups(pg_cluster: str, uid: str): bucket=SPILO_S3_BACKUP_BUCKET, pg_cluster=pg_cluster, prefix=SPILO_S3_BACKUP_PREFIX, - s3_endpoint=WALE_S3_ENDPOINT, uid=uid, - use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE, + postgresql_versions=OPERATOR_UI_CONFIG.get('postgresql_versions', DEFAULT_UI_CONFIG['postgresql_versions']), ), ) @@ -991,8 +980,6 @@ def main(port, debug, clusters: list): logger.info(f'Superuser team: {SUPERUSER_TEAM}') logger.info(f'Target namespace: {TARGET_NAMESPACE}') logger.info(f'Teamservice URL: {TEAM_SERVICE_URL}') - logger.info(f'Use AWS instance_profile: {USE_AWS_INSTANCE_PROFILE}') - 
logger.info(f'WAL-E S3 endpoint: {WALE_S3_ENDPOINT}') logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}') if TARGET_NAMESPACE is None: diff --git a/ui/operator_ui/spiloutils.py b/ui/operator_ui/spiloutils.py index f715430a1..6a2f03bb2 100644 --- a/ui/operator_ui/spiloutils.py +++ b/ui/operator_ui/spiloutils.py @@ -6,9 +6,8 @@ from requests import Session from urllib.parse import urljoin from uuid import UUID -from wal_e.cmd import configure_backup_cxt -from .utils import Attrs, defaulting, these +from .utils import defaulting, these from operator_ui.adapters.logger import logger session = Session() @@ -284,10 +283,8 @@ def read_stored_clusters(bucket, prefix, delimiter='/'): def read_versions( pg_cluster, bucket, - s3_endpoint, prefix, delimiter='/', - use_aws_instance_profile=False, ): return [ 'base' if uid == 'wal' else uid @@ -305,35 +302,72 @@ def read_versions( if uid == 'wal' or defaulting(lambda: UUID(uid)) ] -BACKUP_VERSION_PREFIXES = ['', '10/', '11/', '12/', '13/', '14/', '15/', '16/', '17/'] +def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=16 * 1024 * 1024): + timeline = int(start_segment[:8], 16) + log_id = finish_lsn >> 32 + seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size + return f"{timeline:08X}{log_id:08X}{seg_id:08X}" + +def lsn_to_offset_hex(lsn, wal_segment_size=16 * 1024 * 1024): + return f"{lsn % wal_segment_size:08X}" def read_basebackups( pg_cluster, uid, bucket, - s3_endpoint, prefix, - delimiter='/', - use_aws_instance_profile=False, + postgresql_versions, ): - environ['WALE_S3_ENDPOINT'] = s3_endpoint suffix = '' if uid == 'base' else '/' + uid backups = [] - for vp in BACKUP_VERSION_PREFIXES: - - backups = backups + [ - { - key: value - for key, value in basebackup.__dict__.items() - if isinstance(value, str) or isinstance(value, int) - } - for basebackup in Attrs.call( - f=configure_backup_cxt, - aws_instance_profile=use_aws_instance_profile, - s3_prefix=f's3://{bucket}/{prefix}{pg_cluster}{suffix}/wal/{vp}', - )._backup_list(detail=True) - ] + for vp in postgresql_versions: + backup_prefix = f'{prefix}{pg_cluster}{suffix}/wal/{vp}/basebackups_005/' + logger.info(f"{bucket}/{backup_prefix}") + + paginator = client('s3').get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=backup_prefix) + + for page in pages: + for obj in page.get("Contents", []): + key = obj["Key"] + if not key.endswith("backup_stop_sentinel.json"): + continue + + response = client('s3').get_object(Bucket=bucket, Key=key) + backup_info = loads(response["Body"].read().decode("utf-8")) + last_modified = response["LastModified"].astimezone(timezone.utc).isoformat() + + backup_name = key.split("/")[-1].replace("_backup_stop_sentinel.json", "") + start_seg, start_offset = backup_name.split("_")[1], backup_name.split("_")[-1] if "_" in backup_name else None + + if "LSN" in backup_info and "FinishLSN" in backup_info: + # WAL-G + lsn = backup_info["LSN"] + finish_lsn = backup_info["FinishLSN"] + backups.append({ + "expanded_size_bytes": backup_info.get("UncompressedSize"), + "last_modified": last_modified, + "name": backup_name, + "wal_segment_backup_start": start_seg, + "wal_segment_backup_stop": lsn_to_wal_segment_stop(finish_lsn, start_seg), + "wal_segment_offset_backup_start": lsn_to_offset_hex(lsn), + "wal_segment_offset_backup_stop": lsn_to_offset_hex(finish_lsn), + }) + elif "wal_segment_backup_stop" in backup_info: + # WAL-E + stop_seg = backup_info["wal_segment_backup_stop"] + stop_offset = backup_info["wal_segment_offset_backup_stop"] 
+
+                    backups.append({
+                        "expanded_size_bytes": backup_info.get("expanded_size_bytes"),
+                        "last_modified": last_modified,
+                        "name": backup_name,
+                        "wal_segment_backup_start": start_seg,
+                        "wal_segment_backup_stop": stop_seg,
+                        "wal_segment_offset_backup_start": start_offset,
+                        "wal_segment_offset_backup_stop": stop_offset,
+                    })
 
     return backups
diff --git a/ui/requirements.txt b/ui/requirements.txt
index d3318ceec..783c0aac3 100644
--- a/ui/requirements.txt
+++ b/ui/requirements.txt
@@ -11,5 +11,4 @@ kubernetes==11.0.0
 python-json-logger==2.0.7
 requests==2.32.2
 stups-tokens>=1.1.19
-wal_e==1.1.1
 werkzeug==3.0.6
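The `pkg/util/patroni` changes above move the operator from Patroni's `/failover` endpoint to `/switchover`, thread an extra `scheduled_at` argument through `Switchover()`, and accept any 2xx response instead of only 200. Below is a minimal Python sketch of an equivalent REST call, handy for reproducing the request by hand; the pod address handling, the use of `requests`, and the error handling are assumptions for illustration, while the port (8008), the 30-second timeout, the endpoint path, and the JSON fields come from the diff.

```python
import requests

PATRONI_API_PORT = 8008  # ApiPort in pkg/util/patroni/patroni.go


def switchover(master_ip: str, leader: str, candidate: str, scheduled_at: str = "") -> None:
    """Ask Patroni to switch over from `leader` to `candidate`.

    `scheduled_at` may be an ISO 8601 / RFC 3339 timestamp for a deferred
    switchover, or empty for an immediate one (the Go code above always
    sends the field, empty or not).
    """
    body = {"leader": leader, "member": candidate, "scheduled_at": scheduled_at}
    resp = requests.post(
        f"http://{master_ip}:{PATRONI_API_PORT}/switchover",
        json=body,
        timeout=30,  # mirrors the 30s timeout constant in the Go client
    )
    # Accept any 2xx code, as the updated httpPostOrPatch() does; Patroni
    # may answer 202 Accepted rather than 200 OK for a scheduled switchover.
    if not (200 <= resp.status_code < 300):
        raise RuntimeError(f"patroni returned '{resp.status_code}': {resp.text}")
```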
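`ui/operator_ui/spiloutils.py` now derives the stop WAL segment name and the start/stop offsets from the numeric LSNs stored in WAL-G's `*_backup_stop_sentinel.json` files instead of obtaining them through `wal_e`. A short worked example of that arithmetic, restating the two helpers from the diff; the concrete LSN and start segment below are made up for illustration.

```python
WAL_SEGMENT_SIZE = 16 * 1024 * 1024  # 0x1000000, the PostgreSQL default


def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=WAL_SEGMENT_SIZE):
    # Same arithmetic as the helper added above: the timeline is taken from
    # the backup's start segment name, the high 32 bits of the LSN are the
    # WAL "log id", and the low 32 bits select the 16 MiB segment within it.
    timeline = int(start_segment[:8], 16)
    log_id = finish_lsn >> 32
    seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size
    return f"{timeline:08X}{log_id:08X}{seg_id:08X}"


def lsn_to_offset_hex(lsn, wal_segment_size=WAL_SEGMENT_SIZE):
    # Byte offset of the LSN within its segment, zero-padded to 8 hex digits.
    return f"{lsn % wal_segment_size:08X}"


# pg_lsn '5/2000428' corresponds to (5 << 32) + 0x02000428
finish_lsn = (5 << 32) + 0x02000428
start_segment = "000000010000000500000001"  # timeline 1, log 5, segment 1

assert lsn_to_wal_segment_stop(finish_lsn, start_segment) == "000000010000000500000002"
assert lsn_to_offset_hex(finish_lsn) == "00000428"
```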
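Backup discovery itself no longer goes through `wal_e`: `read_basebackups` lists `*_backup_stop_sentinel.json` objects per PostgreSQL major version with an S3 paginator and branches on the sentinel's fields (`LSN`/`FinishLSN` for WAL-G, `wal_segment_backup_stop` and the offset fields for WAL-E). The sketch below shows that listing pattern in a self-contained form; the bucket, prefix, and cluster names are placeholders, the generator is illustrative rather than the UI's actual helper, and it calls `boto3.client` directly where the file uses its existing `client('s3')` helper.

```python
from json import loads

import boto3


def list_backup_sentinels(bucket, prefix, pg_cluster, pg_version):
    """Yield (backup_name, parsed sentinel JSON) for one PostgreSQL version."""
    backup_prefix = f"{prefix}{pg_cluster}/wal/{pg_version}/basebackups_005/"
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=backup_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith("backup_stop_sentinel.json"):
                continue  # skip WAL segments and base backup tarballs
            body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            name = key.rsplit("/", 1)[-1].replace("_backup_stop_sentinel.json", "")
            yield name, loads(body)


# Placeholder usage: tell WAL-G sentinels apart from WAL-E ones.
# for name, sentinel in list_backup_sentinels("my-spilo-bucket", "spilo/",
#                                             "acid-minimal-cluster", "16"):
#     print(name, "WAL-G" if "FinishLSN" in sentinel else "WAL-E")
```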