diff --git a/charts/postgres-operator-ui/values.yaml b/charts/postgres-operator-ui/values.yaml index da3c4baaf..9923ff023 100644 --- a/charts/postgres-operator-ui/values.yaml +++ b/charts/postgres-operator-ui/values.yaml @@ -62,8 +62,6 @@ podAnnotations: extraEnvs: [] # Exemple of settings to make snapshot view working in the ui when using AWS - # - name: WALE_S3_ENDPOINT - # value: https+path://s3.us-east-1.amazonaws.com:443 # - name: SPILO_S3_BACKUP_PREFIX # value: spilo/ # - name: AWS_ACCESS_KEY_ID @@ -83,8 +81,6 @@ extraEnvs: # key: AWS_DEFAULT_REGION # - name: SPILO_S3_BACKUP_BUCKET # value: - # - name: "USE_AWS_INSTANCE_PROFILE" - # value: "true" # configure UI service service: diff --git a/docs/administrator.md b/docs/administrator.md index 9f8e86575..f394b70ab 100644 --- a/docs/administrator.md +++ b/docs/administrator.md @@ -208,9 +208,6 @@ Note that, changes in `SPILO_CONFIGURATION` env variable under `bootstrap.dcs` path are ignored for the diff. They will be applied through Patroni's rest api interface, following a restart of all instances. -Rolling update is postponed until the next maintenance window if any is defined -under the `maintenanceWindows` cluster manifest parameter. - The operator also support lazy updates of the Spilo image. In this case the StatefulSet is only updated, but no rolling update follows. This feature saves you a switchover - and hence downtime - when you know pods are re-started later @@ -387,7 +384,7 @@ exceptions: The interval of days can be set with `password_rotation_interval` (default `90` = 90 days, minimum 1). On each rotation the user name and password values are replaced in the K8s secret. They belong to a newly created user named after -the original role plus rotation date in YYMMDD format. All priviliges are +the original role plus rotation date in YYMMDD format. All privileges are inherited meaning that migration scripts should still grant and revoke rights against the original role. The timestamp of the next rotation (in RFC 3339 format, UTC timezone) is written to the secret as well. Note, if the rotation @@ -567,7 +564,7 @@ manifest affinity. ``` If `node_readiness_label_merge` is set to `"OR"` (default) the readiness label -affinty will be appended with its own expressions block: +affinity will be appended with its own expressions block: ```yaml affinity: @@ -623,22 +620,34 @@ By default the topology key for the pod anti affinity is set to `kubernetes.io/hostname`, you can set another topology key e.g. `failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys. -## Pod Disruption Budget +## Pod Disruption Budgets + +By default the operator creates two PodDisruptionBudgets (PDB) to protect the cluster +from voluntarily disruptions and hence unwanted DB downtime: so-called primary PDB and +and PDB for critical operations. + +### Primary PDB +The `MinAvailable` parameter of this PDB is set to `1` and, if `pdb_master_label_selector` +is enabled, label selector includes `spilo-role=master` condition, which prevents killing +masters in single-node clusters and/or the last remaining running instance in a multi-node +cluster. -By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster -from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable` -parameter of the PDB is set to `1` which prevents killing masters in single-node -clusters and/or the last remaining running instance in a multi-node cluster. +## PDB for critical operations +The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the +cluster manifest, while label selector includes `critical-operation=true` condition. This +allows to protect all pods of a cluster, given they are labeled accordingly. +For example, Operator labels all Spilo pods with `critical-operation=true` during the major +version upgrade run. You may want to protect cluster pods during other critical operations +by assigning the label to pods yourself or using other means of automation. The PDB is only relaxed in two scenarios: * If a cluster is scaled down to `0` instances (e.g. for draining nodes) * If the PDB is disabled in the configuration (`enable_pod_disruption_budget`) -The PDB is still in place having `MinAvailable` set to `0`. If enabled it will -be automatically set to `1` on scale up. Disabling PDBs helps avoiding blocking -Kubernetes upgrades in managed K8s environments at the cost of prolonged DB -downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384) +The PDBs are still in place having `MinAvailable` set to `0`. Disabling PDBs +helps avoiding blocking Kubernetes upgrades in managed K8s environments at the +cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384) for the use case. ## Add cluster-specific labels @@ -1131,7 +1140,7 @@ metadata: iam.gke.io/gcp-service-account: @.iam.gserviceaccount.com ``` -2. Specify the new custom service account in your [operator paramaters](./reference/operator_parameters.md) +2. Specify the new custom service account in your [operator parameters](./reference/operator_parameters.md) If using manual deployment or kustomize, this is done by setting `pod_service_account_name` in your configuration file specified in the diff --git a/docs/quickstart.md b/docs/quickstart.md index f080bd567..2d6742354 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster ``` This should remove the associated StatefulSet, database Pods, Services and -Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is +Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are deleted. Secrets however are not deleted and backups will remain in place. When deleting a cluster while it is still starting up or got stuck during that diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index d45bc0948..ab0353202 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -116,7 +116,7 @@ These parameters are grouped directly under the `spec` key in the manifest. * **maintenanceWindows** a list which defines specific time frames when certain maintenance operations - such as automatic major upgrades or rolling updates are allowed. Accepted formats + such as automatic major upgrades or master pod migration. Accepted formats are "01:00-06:00" for daily maintenance windows or "Sat:00:00-04:00" for specific days, with all times in UTC. @@ -247,7 +247,7 @@ These parameters are grouped directly under the `spec` key in the manifest. [kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource). It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet. Also an `emptyDir` volume can be shared between initContainer and statefulSet. - Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example). + Additionally, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example). Set `isSubPathExpr` to true if you want to include [API environment variables](https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment). You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option. If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container. @@ -257,7 +257,7 @@ These parameters are grouped directly under the `spec` key in the manifest. ## Prepared Databases The operator can create databases with default owner, reader and writer roles -without the need to specifiy them under `users` or `databases` sections. Those +without the need to specify them under `users` or `databases` sections. Those parameters are grouped under the `preparedDatabases` top-level key. For more information, see [user docs](../user.md#prepared-databases-with-roles-and-default-privileges). diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 3bd9e44f7..95bfb4cf3 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -209,7 +209,7 @@ under the `users` key. For all `LOGIN` roles that are not database owners the operator can rotate credentials in the corresponding K8s secrets by replacing the username and password. This means, new users will be added on each rotation inheriting - all priviliges from the original roles. The rotation date (in YYMMDD format) + all privileges from the original roles. The rotation date (in YYMMDD format) is appended to the names of the new user. The timestamp of the next rotation is written to the secret. The default is `false`. @@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key. pod namespace). * **pdb_name_format** - defines the template for PDB (Pod Disruption Budget) names created by the + defines the template for primary PDB (Pod Disruption Budget) name created by the operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is replaced by the cluster name. Only the `{cluster}` placeholders is allowed in the template. * **pdb_master_label_selector** - By default the PDB will match the master role hence preventing nodes to be + By default the primary PDB will match the master role hence preventing nodes to be drained if the node_readiness_label is not used. If this option if set to `false` the `spilo-role=master` selector will not be added to the PDB. @@ -552,7 +552,7 @@ configuration they are grouped under the `kubernetes` key. pods with `InitialDelaySeconds: 6`, `PeriodSeconds: 10`, `TimeoutSeconds: 5`, `SuccessThreshold: 1` and `FailureThreshold: 3`. When enabling readiness probes it is recommended to switch the `pod_management_policy` to `parallel` - to avoid unneccesary waiting times in case of multiple instances failing. + to avoid unnecessary waiting times in case of multiple instances failing. The default is `false`. * **storage_resize_mode** @@ -701,7 +701,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key. replaced by the cluster name, `{namespace}` is replaced with the namespace and `{hostedzone}` is replaced with the hosted zone (the value of the `db_hosted_zone` parameter). The `{team}` placeholder can still be used, - although it is not recommened because the team of a cluster can change. + although it is not recommended because the team of a cluster can change. If the cluster name starts with the `teamId` it will also be part of the DNS, aynway. No other placeholders are allowed! @@ -720,7 +720,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key. is replaced by the cluster name, `{namespace}` is replaced with the namespace and `{hostedzone}` is replaced with the hosted zone (the value of the `db_hosted_zone` parameter). The `{team}` placeholder can still be used, - although it is not recommened because the team of a cluster can change. + although it is not recommended because the team of a cluster can change. If the cluster name starts with the `teamId` it will also be part of the DNS, aynway. No other placeholders are allowed! diff --git a/docs/user.md b/docs/user.md index c63e43f57..c1a7c7d45 100644 --- a/docs/user.md +++ b/docs/user.md @@ -900,7 +900,7 @@ the PostgreSQL version between source and target cluster has to be the same. To start a cluster as standby, add the following `standby` section in the YAML file. You can stream changes from archived WAL files (AWS S3 or Google Cloud -Storage) or from a remote primary. Only one option can be specfied in the +Storage) or from a remote primary. Only one option can be specified in the manifest: ```yaml @@ -911,7 +911,7 @@ spec: For GCS, you have to define STANDBY_GOOGLE_APPLICATION_CREDENTIALS as a [custom pod environment variable](administrator.md#custom-pod-environment-variables). -It is not set from the config to allow for overridding. +It is not set from the config to allow for overriding. ```yaml spec: @@ -1282,7 +1282,7 @@ minutes if the certificates have changed and reloads postgres accordingly. ### TLS certificates for connection pooler By default, the pgBouncer image generates its own TLS certificate like Spilo. -When the `tls` section is specfied in the manifest it will be used for the +When the `tls` section is specified in the manifest it will be used for the connection pooler pod(s) as well. The security context options are hard coded to `runAsUser: 100` and `runAsGroup: 101`. The `fsGroup` will be the same like for Spilo. diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 4743bb4b3..b9a2a27d4 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1187,7 +1187,7 @@ def test_major_version_upgrade(self): Test major version upgrade: with full upgrade, maintenance window, and annotation """ def check_version(): - p = k8s.patroni_rest("acid-upgrade-test-0", "") + p = k8s.patroni_rest("acid-upgrade-test-0", "") or {} version = p.get("server_version", 0) // 10000 return version @@ -1237,7 +1237,7 @@ def get_annotations(): # should not upgrade because current time is not in maintenanceWindow current_time = datetime.now() maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}" - pg_patch_version_15 = { + pg_patch_version_15_outside_mw = { "spec": { "postgresql": { "version": "15" @@ -1248,7 +1248,7 @@ def get_annotations(): } } k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_outside_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") # no pod replacement outside of the maintenance window @@ -1259,12 +1259,12 @@ def get_annotations(): second_annotations = get_annotations() self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set") - # change the version again to trigger operator sync + # change maintenanceWindows to current maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}" - pg_patch_version_16 = { + pg_patch_version_15_in_mw = { "spec": { "postgresql": { - "version": "16" + "version": "15" }, "maintenanceWindows": [ maintenance_window_current @@ -1273,13 +1273,13 @@ def get_annotations(): } k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16") + self.eventuallyEqual(check_version, 15, "Version should be upgraded from 14 to 15") # check if annotation for last upgrade's success is updated after second upgrade third_annotations = get_annotations() @@ -1306,16 +1306,17 @@ def get_annotations(): k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set") + self.eventuallyEqual(check_version, 15, "Version should not be upgraded because annotation for last upgrade's failure is set") # change the version back to 15 and should remove failure annotation k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15) + "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + self.eventuallyEqual(check_version, 15, "Version should not be upgraded from 15") fourth_annotations = get_annotations() self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed") @@ -1751,9 +1752,13 @@ def test_password_rotation(self): Test password rotation and removal of users due to retention policy ''' k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' leader = k8s.get_cluster_leader_pod() today = date.today() + # remember number of secrets to make sure it stays the same + secret_count = k8s.count_secrets_with_label(cluster_label) + # enable password rotation for owner of foo database pg_patch_rotation_single_users = { "spec": { @@ -1809,6 +1814,7 @@ def test_password_rotation(self): enable_password_rotation = { "data": { "enable_password_rotation": "true", + "inherited_annotations": "environment", "password_rotation_interval": "30", "password_rotation_user_retention": "30", # should be set to 60 }, @@ -1855,13 +1861,29 @@ def test_password_rotation(self): self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1, "Could not connect to the database with rotation user {}".format(rotation_user), 10, 5) + # add annotation which triggers syncSecrets call + pg_annotation_patch = { + "metadata": { + "annotations": { + "environment": "test", + } + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_annotation_patch) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + time.sleep(10) + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), secret_count, "Unexpected number of secrets") + # check if rotation has been ignored for user from test_cross_namespace_secrets test db_user_secret = k8s.get_secret(username="test.db_user", namespace="test") secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8') - self.assertEqual("test.db_user", secret_username, "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username)) + # check if annotation for secret has been updated + self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret") + # disable password rotation for all other users (foo_user) # and pick smaller intervals to see if the third fake rotation user is dropped enable_password_rotation = { @@ -2099,7 +2121,7 @@ def test_statefulset_annotation_propagation(self): patch_sset_propagate_annotations = { "data": { "downscaler_annotations": "deployment-time,downscaler/*", - "inherited_annotations": "owned-by", + "inherited_annotations": "environment,owned-by", } } k8s.update_config(patch_sset_propagate_annotations) @@ -2546,7 +2568,10 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed") + + pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed") pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f835eaa00..e9a691faa 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,17 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset *appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - PodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } @@ -105,10 +106,17 @@ type Cluster struct { } type compareStatefulsetResult struct { - match bool - replace bool - rollingUpdate bool - reasons []string + match bool + replace bool + rollingUpdate bool + reasons []string + deletedPodAnnotations []string +} + +type compareLogicalBackupJobResult struct { + match bool + reasons []string + deletedPodAnnotations []string } // New creates a new cluster. This function should be called from a controller. @@ -336,14 +344,10 @@ func (c *Cluster) Create() (err error) { c.logger.Infof("secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") - if c.PodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget already exists in the cluster") + if err = c.createPodDisruptionBudgets(); err != nil { + return fmt.Errorf("could not create pod disruption budgets: %v", err) } - pdb, err := c.createPodDisruptionBudget() - if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) - } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta)) + c.logger.Info("pod disruption budgets have been successfully created") if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") @@ -431,6 +435,7 @@ func (c *Cluster) Create() (err error) { } func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult { + deletedPodAnnotations := []string{} reasons := make([]string, 0) var match, needsRollUpdate, needsReplace bool @@ -445,7 +450,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa needsReplace = true reasons = append(reasons, "new statefulset's ownerReferences do not match") } - if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed { match = false needsReplace = true reasons = append(reasons, "new statefulset's annotations do not match: "+reason) @@ -519,7 +524,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa } } - if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed { match = false needsReplace = true reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason) @@ -541,7 +546,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i)) continue } - if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed { + if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed { needsReplace = true reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason)) } @@ -579,7 +584,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa match = false } - return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} + return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations} } type containerCondition func(a, b v1.Container) bool @@ -781,7 +786,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool { return false } -func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) { +func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) { reason := "" ignoredAnnotations := make(map[string]bool) for _, ignore := range c.OpConfig.IgnoredAnnotations { @@ -794,6 +799,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) } if _, ok := new[key]; !ok { reason += fmt.Sprintf(" Removed %q.", key) + if removedList != nil { + *removedList = append(*removedList, key) + } } } @@ -836,41 +844,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { return true, "" } -func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) { +func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult { + deletedPodAnnotations := []string{} + reasons := make([]string, 0) + match := true if cur.Spec.Schedule != new.Spec.Schedule { - return false, fmt.Sprintf("new job's schedule %q does not match the current one %q", - new.Spec.Schedule, cur.Spec.Schedule) + match = false + reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule)) } newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image if newImage != curImage { - return false, fmt.Sprintf("new job's image %q does not match the current one %q", - newImage, curImage) + match = false + reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage)) } newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations - if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { - return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) + if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed { + match = false + reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason)) } newPgVersion := getPgVersion(new) curPgVersion := getPgVersion(cur) if newPgVersion != curPgVersion { - return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", - newPgVersion, curPgVersion) + match = false + reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion)) } needsReplace := false - reasons := make([]string, 0) - needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons) + contReasons := make([]string, 0) + needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons) if needsReplace { - return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`)) + match = false + reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`))) } - return true, "" + return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations} } func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { @@ -881,7 +894,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) { return false, "new PDB's owner references do not match the current ones" } - if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed { + if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed { return false, "new PDB's annotations do not match the current ones:" + reason } return true, "" @@ -957,6 +970,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { defer c.mu.Unlock() c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating) + + if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { + // do not apply any major version related changes yet + newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion + } c.setSpec(newSpec) defer func() { @@ -1016,10 +1034,18 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // only when streams were not specified in oldSpec but in newSpec needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 - annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations) - initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser - if initUsers { + + // if inherited annotations differ secrets have to be synced on update + newAnnotations := c.annotationsSet(nil) + oldAnnotations := make(map[string]string) + for _, secret := range c.Secrets { + oldAnnotations = secret.ObjectMeta.Annotations + break + } + annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil) + + if initUsers || annotationsChanged { c.logger.Debug("initialize users") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err) @@ -1027,8 +1053,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true return } - } - if initUsers || annotationsChanged { + c.logger.Debug("syncing secrets") //TODO: mind the secrets of the deleted/new users if err := c.syncSecrets(); err != nil { @@ -1060,9 +1085,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - // pod disruption budget - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) + // pod disruption budgets + if err := c.syncPodDisruptionBudgets(true); err != nil { + c.logger.Errorf("could not sync pod disruption budgets: %v", err) updateFailed = true } @@ -1135,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // streams if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { + c.logger.Debug("syncing streams") if err := c.syncStreams(); err != nil { c.logger.Errorf("could not sync streams: %v", err) updateFailed = true @@ -1207,10 +1233,10 @@ func (c *Cluster) Delete() error { c.logger.Info("not deleting secrets because disabled in configuration") } - if err := c.deletePodDisruptionBudget(); err != nil { + if err := c.deletePodDisruptionBudgets(); err != nil { anyErrors = true - c.logger.Warningf("could not delete pod disruption budget: %v", err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) + c.logger.Warningf("could not delete pod disruption budgets: %v", err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err) } for _, role := range []PostgresRole{Master, Replica} { @@ -1709,16 +1735,17 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - PodDisruptionBudget: c.GetPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(), + CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } @@ -1761,28 +1788,28 @@ func (c *Cluster) GetSwitchoverSchedule() string { } // Switchover does a switchover (via Patroni) to a candidate pod -func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error { +func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduled bool) error { var err error - c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) - - if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) { - c.logger.Infof("postponing switchover, not in maintenance window") - schedule := c.GetSwitchoverSchedule() - - if err := c.patroni.Switchover(curMaster, candidate.Name, schedule); err != nil { - return fmt.Errorf("could not schedule switchover: %v", err) - } - c.logger.Infof("switchover is scheduled at %s", schedule) - return nil - } - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) stopCh := make(chan struct{}) ch := c.registerPodSubscriber(candidate) defer c.unregisterPodSubscriber(candidate) defer close(stopCh) - if err = c.patroni.Switchover(curMaster, candidate.Name, ""); err == nil { + var scheduled_at string + if scheduled { + scheduled_at = c.GetSwitchoverSchedule() + } else { + c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) + scheduled_at = "" + } + + if err = c.patroni.Switchover(curMaster, candidate.Name, scheduled_at); err == nil { + if scheduled { + c.logger.Infof("switchover from %q to %q is scheduled at %s", curMaster.Name, candidate, scheduled_at) + return nil + } c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) _, err = c.waitForPodLabel(ch, stopCh, nil) @@ -1790,6 +1817,9 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e err = fmt.Errorf("could not get master pod label: %v", err) } } else { + if scheduled { + return fmt.Errorf("could not schedule switchover: %v", err) + } err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9fb7f348e..09d9df972 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) { } } - match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob) - if match != tt.match { - t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob) - } else { - if !strings.HasPrefix(reason, tt.reason) { - t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason) + cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob) + if cmp.match != tt.match { + t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob) + } else if !cmp.match { + found := false + for _, reason := range cmp.reasons { + if strings.HasPrefix(reason, tt.reason) { + found = true + break + } + found = false + } + if !found { + t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons) } } }) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 6cd46f745..ac4ce67d8 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "encoding/json" "fmt" "reflect" "strings" @@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql err error ) + updatedPodAnnotations := map[string]*string{} syncReason := make([]string, 0) deployment, err = c.KubeClient. Deployments(c.Namespace). @@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) - if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { + deletedPodAnnotations := []string{} + if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed { specSync = true syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...) + + for _, anno := range deletedPodAnnotations { + updatedPodAnnotations[anno] = nil + } + templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{ + "spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}} + patch, err := json.Marshal(templateMetadataReq) + if err != nil { + return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err) + } + deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(), + deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err) + return nil, err + } + deployment.Spec.Template.Annotations = newPodAnnotations } @@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations - if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { + if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed { deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations) if err != nil { return nil, err @@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } - } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed { + metadataReq := map[string]map[string]map[string]*string{"metadata": {}} + + for anno, val := range deployment.Spec.Template.Annotations { + updatedPodAnnotations[anno] = &val + } + metadataReq["metadata"]["annotations"] = updatedPodAnnotations + patch, err := json.Marshal(metadataReq) if err != nil { - return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err) } - _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) if err != nil { - return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err) } } } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index ff5536303..fedd6a917 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) podDisruptionBudgetName() string { +func (c *Cluster) PrimaryPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } +func (c *Cluster) criticalOpPodDisruptionBudgetName() string { + pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb") + return pdbTemplate.Format("cluster", c.Name) +} + func makeDefaultResources(config *config.Config) acidv1.Resources { defaultRequests := acidv1.ResourceDescription{ @@ -1005,6 +1010,9 @@ func (c *Cluster) generateSpiloPodEnvVars( if c.patroniUsesKubernetes() { envVars = append(envVars, v1.EnvVar{Name: "DCS_ENABLE_KUBERNETES_API", Value: "true"}) + if c.OpConfig.EnablePodDisruptionBudget != nil && *c.OpConfig.EnablePodDisruptionBudget { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_BOOTSTRAP_LABELS", Value: "{\"critical-operation\":\"true\"}"}) + } } else { envVars = append(envVars, v1.EnvVar{Name: "ETCD_HOST", Value: c.OpConfig.EtcdHost}) } @@ -2207,7 +2215,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2225,7 +2233,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.podDisruptionBudgetName(), + Name: c.PrimaryPodDisruptionBudgetName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: c.annotationsSet(nil), + OwnerReferences: c.ownerReferences(), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + }, + } +} + +func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances) + pdbEnabled := c.OpConfig.EnablePodDisruptionBudget + + // if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0. + if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 { + minAvailable = intstr.FromInt(0) + } + + labels := c.labelsSet(false) + labels["critical-operation"] = "true" + + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.criticalOpPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: c.annotationsSet(nil), diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 612e4525a..137c24081 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } - testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { - masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector - if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { - return fmt.Errorf("Object Namespace incorrect.") - } - if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { - return fmt.Errorf("Labels incorrect.") - } - if !masterLabelSelectorDisabled && - !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ - MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { + testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector + if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { + return fmt.Errorf("Object Namespace incorrect.") + } + expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"} + if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) { + return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels) + } + if !masterLabelSelectorDisabled { + if isPrimary { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } else { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } + } - return fmt.Errorf("MatchLabels incorrect.") + return nil } - - return nil } testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { @@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-databass-budget"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, } for _, tt := range tests { - result := tt.spec.generatePodDisruptionBudget() + result := tt.spec.generatePrimaryPodDisruptionBudget() + for _, check := range tt.check { + err := check(tt.spec, result) + if err != nil { + t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v", + testName, tt.scenario, err) + } + } + } + + testCriticalOp := []struct { + scenario string + spec *Cluster + check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error + }{ + { + scenario: "With multiple instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With zero instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With PodDisruptionBudget disabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With OwnerReference enabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-op-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + } + + for _, tt := range testCriticalOp { + result := tt.spec.generateCriticalOpPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { diff --git a/pkg/cluster/majorversionupgrade.go b/pkg/cluster/majorversionupgrade.go index b75702bcd..d8a1fb917 100644 --- a/pkg/cluster/majorversionupgrade.go +++ b/pkg/cluster/majorversionupgrade.go @@ -106,6 +106,22 @@ func (c *Cluster) removeFailuresAnnotation() error { return nil } +func (c *Cluster) criticalOperationLabel(pods []v1.Pod, value *string) error { + metadataReq := map[string]map[string]map[string]*string{"metadata": {"labels": {"critical-operation": value}}} + + patchReq, err := json.Marshal(metadataReq) + if err != nil { + return fmt.Errorf("could not marshal ObjectMeta: %v", err) + } + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchReq, metav1.PatchOptions{}) + if err != nil { + return err + } + } + return nil +} + /* Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off"). @@ -224,6 +240,17 @@ func (c *Cluster) majorVersionUpgrade() error { if allRunning && masterPod != nil { c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion) if c.currentMajorVersion < desiredVersion { + defer func() error { + if err = c.criticalOperationLabel(pods, nil); err != nil { + return fmt.Errorf("failed to remove critical-operation label: %s", err) + } + return nil + }() + val := "true" + if err = c.criticalOperationLabel(pods, &val); err != nil { + return fmt.Errorf("failed to assign critical-operation label: %s", err) + } + podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name} c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index bd2172c18..7fc95090e 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -280,11 +280,16 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { return fmt.Errorf("could not move pod: %v", err) } + scheduleSwitchover := false + if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) { + c.logger.Infof("postponing switchover, not in maintenance window") + scheduleSwitchover = true + } err = retryutil.Retry(1*time.Minute, 5*time.Minute, func() (bool, error) { - err := c.Switchover(oldMaster, masterCandidateName) + err := c.Switchover(oldMaster, masterCandidateName, scheduleSwitchover) if err != nil { - c.logger.Errorf("could not failover to pod %q: %v", masterCandidateName, err) + c.logger.Errorf("could not switchover to pod %q: %v", masterCandidateName, err) return false, nil } return true, nil @@ -445,7 +450,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp // do not recreate master now so it will keep the update flag and switchover will be retried on next sync return fmt.Errorf("skipping switchover: %v", err) } - if err := c.Switchover(masterPod, masterCandidate); err != nil { + if err := c.Switchover(masterPod, masterCandidate, false); err != nil { return fmt.Errorf("could not perform switch over: %v", err) } } else if newMasterPod == nil && len(replicas) == 0 { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 85711dbd1..2c87efe47 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,13 @@ const ( ) func (c *Cluster) listResources() error { - if c.PodDisruptionBudget != nil { - c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) + if c.PrimaryPodDisruptionBudget != nil { + c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID) + } + + if c.CriticalOpPodDisruptionBudget != nil { + c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID) + } if c.Statefulset != nil { @@ -329,7 +334,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe } } - if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed { + if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(newService.Annotations) if err != nil { return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err) @@ -417,59 +422,166 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset return result } -func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudgetSpec := c.generatePodDisruptionBudget() +func (c *Cluster) createPrimaryPodDisruptionBudget() error { + c.logger.Debug("creating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget != nil { + c.logger.Warning("primary pod disruption budget already exists in the cluster") + return nil + } + + podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) if err != nil { - return nil, err + return err + } + c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createCriticalOpPodDisruptionBudget() error { + c.logger.Debug("creating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget != nil { + c.logger.Warning("pod disruption budget for critical operations already exists in the cluster") + return nil + } + + podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget() + podDisruptionBudget, err := c.KubeClient. + PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). + Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) + + if err != nil { + return err + } + c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createPodDisruptionBudgets() error { + errors := make([]string, 0) + + err := c.createPrimaryPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err)) + } + + err = c.createCriticalOpPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + +func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + return fmt.Errorf("there is no primary pod disruption budget in the cluster") + } + + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) + } + + newPdb, err := c.KubeClient. + PodDisruptionBudgets(pdb.Namespace). + Create(context.TODO(), pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create primary pod disruption budget: %v", err) } - c.PodDisruptionBudget = podDisruptionBudget + c.PrimaryPodDisruptionBudget = newPdb - return podDisruptionBudget, nil + return nil } -func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - if c.PodDisruptionBudget == nil { - return fmt.Errorf("there is no pod disruption budget in the cluster") +func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster") } - if err := c.deletePodDisruptionBudget(); err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.CriticalOpPodDisruptionBudget = newPdb + + return nil +} + +func (c *Cluster) deletePrimaryPodDisruptionBudget() error { + c.logger.Debug("deleting primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + c.logger.Debug("there is no primary pod disruption budget in the cluster") + return nil + } + + pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta) + err := c.KubeClient. + PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) + } else if err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) + } + + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = nil + + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } + return false, err2 + }) + if err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) } - c.PodDisruptionBudget = newPdb return nil } -func (c *Cluster) deletePodDisruptionBudget() error { - c.logger.Debug("deleting pod disruption budget") - if c.PodDisruptionBudget == nil { - c.logger.Debug("there is no pod disruption budget in the cluster") +func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error { + c.logger.Debug("deleting pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + c.logger.Debug("there is no pod disruption budget for critical operations in the cluster") return nil } - pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) - c.PodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -483,12 +595,29 @@ func (c *Cluster) deletePodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) } return nil } +func (c *Cluster) deletePodDisruptionBudgets() error { + errors := make([]string, 0) + + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) deleteEndpoint(role PostgresRole) error { c.setProcessName("deleting endpoint") c.logger.Debugf("deleting %s endpoint", role) @@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { return c.Statefulset } -// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget -func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.PodDisruptionBudget +// GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget +func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.PrimaryPodDisruptionBudget +} + +// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations +func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.CriticalOpPodDisruptionBudget } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 9e2c7482a..bf9be3fb4 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za } for slotName, slotAndPublication := range databaseSlotsList { - tables := slotAndPublication.Publication - tableNames := make([]string, len(tables)) + newTables := slotAndPublication.Publication + tableNames := make([]string, len(newTables)) i := 0 - for t := range tables { + for t := range newTables { tableName, schemaName := getTableSchema(t) tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) i++ @@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za tableList := strings.Join(tableNames, ", ") currentTables, exists := currentPublications[slotName] + // if newTables is empty it means that it's definition was removed from streams section + // but when slot is defined in manifest we should sync publications, too + // by reusing current tables we make sure it is not + if len(newTables) == 0 { + tableList = currentTables + } if !exists { createPublications[slotName] = tableList } else if currentTables != tableList { @@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error { return nil } - databaseSlots := make(map[string]map[string]zalandov1.Slot) - slotsToSync := make(map[string]map[string]string) - requiredPatroniConfig := c.Spec.Patroni - - if len(requiredPatroniConfig.Slots) > 0 { - for slotName, slotConfig := range requiredPatroniConfig.Slots { - slotsToSync[slotName] = slotConfig - } - } - + // create map with every database and empty slot defintion + // we need it to detect removal of streams from databases if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } @@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error { if err != nil { return fmt.Errorf("could not get list of databases: %v", err) } - // get database name with empty list of slot, except template0 and template1 + databaseSlots := make(map[string]map[string]zalandov1.Slot) for dbName := range listDatabases { if dbName != "template0" && dbName != "template1" { databaseSlots[dbName] = map[string]zalandov1.Slot{} } } + // need to take explicitly defined slots into account whey syncing Patroni config + slotsToSync := make(map[string]map[string]string) + requiredPatroniConfig := c.Spec.Patroni + if len(requiredPatroniConfig.Slots) > 0 { + for slotName, slotConfig := range requiredPatroniConfig.Slots { + slotsToSync[slotName] = slotConfig + if _, exists := databaseSlots[slotConfig["database"]]; exists { + databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{ + Slot: slotConfig, + Publication: make(map[string]acidv1.StreamTable), + } + } + } + } + // get list of required slots and publications, group by database for _, stream := range c.Spec.Streams { if _, exists := databaseSlots[stream.Database]; !exists { @@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error { "type": "logical", } slotName := getSlotName(stream.Database, stream.ApplicationId) - if _, exists := databaseSlots[stream.Database][slotName]; !exists { + slotAndPublication, exists := databaseSlots[stream.Database][slotName] + if !exists { databaseSlots[stream.Database][slotName] = zalandov1.Slot{ Slot: slot, Publication: stream.Tables, } } else { - slotAndPublication := databaseSlots[stream.Database][slotName] streamTables := slotAndPublication.Publication for tableName, table := range stream.Tables { if _, exists := streamTables[tableName]; !exists { @@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error { continue } streamExists = true + c.Streams[appId] = &stream desiredStreams := c.generateFabricEventStream(appId) if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences c.setProcessName("updating event streams with applicationId %s", appId) - stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) + updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) } - c.Streams[appId] = stream + c.Streams[appId] = updatedStream } if match, reason := c.compareStreams(&stream, desiredStreams); !match { c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) @@ -545,7 +559,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab for newKey, newValue := range newEventStreams.Annotations { desiredAnnotations[newKey] = newValue } - if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed { + if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed { match = false reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 32aae605b..797e7a5aa 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -97,6 +97,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { + // do not apply any major version related changes yet + newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion + } + if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) @@ -112,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing pod disruption budgets") - if err = c.syncPodDisruptionBudget(false); err != nil { - err = fmt.Errorf("could not sync pod disruption budget: %v", err) + if err = c.syncPodDisruptionBudgets(false); err != nil { + err = fmt.Errorf("could not sync pod disruption budgets: %v", err) return err } @@ -148,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return fmt.Errorf("could not sync connection pooler: %v", err) } - if len(c.Spec.Streams) > 0 { + // sync if manifest stream count is different from stream CR count + // it can be that they are always different due to grouping of manifest streams + // but we would catch missed removals on update + if len(c.Spec.Streams) != len(c.Streams) { c.logger.Debug("syncing streams") if err = c.syncStreams(); err != nil { err = fmt.Errorf("could not sync streams: %v", err) @@ -230,7 +238,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { maps.Copy(annotations, cm.Annotations) // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(cm.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err) @@ -275,7 +283,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { maps.Copy(annotations, ep.Annotations) // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(ep.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err) @@ -320,7 +328,7 @@ func (c *Cluster) syncPatroniService() error { maps.Copy(annotations, svc.Annotations) // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(svc.Annotations) - if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { + if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) if err != nil { return fmt.Errorf("could not form patch for %s service: %v", serviceName, err) @@ -412,7 +420,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return fmt.Errorf("could not update %s endpoint: %v", role, err) } } else { - if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { + if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredEp.Annotations) if err != nil { return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) @@ -447,22 +455,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.PodDisruptionBudget = pdb - newPDB := c.generatePodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.PrimaryPodDisruptionBudget = pdb + newPDB := c.generatePrimaryPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updatePodDisruptionBudget(newPDB); err != nil { + if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil { return err } } else { - c.PodDisruptionBudget = pdb + c.PrimaryPodDisruptionBudget = pdb } return nil @@ -471,24 +479,77 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could not find the cluster's pod disruption budget") + c.logger.Infof("could not find the primary pod disruption budget") - if pdb, err = c.createPodDisruptionBudget(); err != nil { + if err = c.createPrimaryPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create primary pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PodDisruptionBudget = pdb + return nil +} + +func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { + var ( + pdb *policyv1.PodDisruptionBudget + err error + ) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.CriticalOpPodDisruptionBudget = pdb + newPDB := c.generateCriticalOpPodDisruptionBudget() + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) + if !match { + c.logPDBChanges(pdb, newPDB, isUpdate, reason) + if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil { + return err + } + } else { + c.CriticalOpPodDisruptionBudget = pdb + } + return nil + + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get pod disruption budget: %v", err) + } + // no existing pod disruption budget, create new one + c.logger.Infof("could not find pod disruption budget for critical operations") + + if err = c.createCriticalOpPodDisruptionBudget(); err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) + } + } return nil } +func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { + errors := make([]string, 0) + + if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) syncStatefulSet() error { var ( restartWait uint32 @@ -562,13 +623,22 @@ func (c *Cluster) syncStatefulSet() error { cmp := c.compareStatefulSetWith(desiredSts) if !cmp.rollingUpdate { + updatedPodAnnotations := map[string]*string{} + for _, anno := range cmp.deletedPodAnnotations { + updatedPodAnnotations[anno] = nil + } + for anno, val := range desiredSts.Spec.Template.Annotations { + updatedPodAnnotations[anno] = &val + } + metadataReq := map[string]map[string]map[string]*string{"metadata": {"annotations": updatedPodAnnotations}} + patch, err := json.Marshal(metadataReq) + if err != nil { + return fmt.Errorf("could not form patch for pod annotations: %v", err) + } + for _, pod := range pods { - if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations) - if err != nil { - return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err) - } - _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations, nil); changed { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}) if err != nil { return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err) } @@ -658,11 +728,6 @@ func (c *Cluster) syncStatefulSet() error { isSafeToRecreatePods = false } - if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) { - postponeReasons = append(postponeReasons, "not in maintenance window") - isSafeToRecreatePods = false - } - // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) if len(podsToRecreate) > 0 { @@ -1150,7 +1215,7 @@ func (c *Cluster) updateSecret( c.Secrets[secret.UID] = secret } - if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { + if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) if err != nil { return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) @@ -1595,19 +1660,38 @@ func (c *Cluster) syncLogicalBackupJob() error { } c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName()) } - if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match { + if cmp := c.compareLogicalBackupJob(job, desiredJob); !cmp.match { c.logger.Infof("logical job %s is not in the desired state and needs to be updated", c.getLogicalBackupJobName(), ) - if reason != "" { - c.logger.Infof("reason: %s", reason) + if len(cmp.reasons) != 0 { + for _, reason := range cmp.reasons { + c.logger.Infof("reason: %s", reason) + } + } + if len(cmp.deletedPodAnnotations) != 0 { + templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{ + "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}} + for _, anno := range cmp.deletedPodAnnotations { + templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil + } + patch, err := json.Marshal(templateMetadataReq) + if err != nil { + return fmt.Errorf("could not marshal ObjectMeta for logical backup job %q pod template: %v", jobName, err) + } + + job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err) + return err + } } if err = c.patchLogicalBackupJob(desiredJob); err != nil { return fmt.Errorf("could not update logical backup job to match desired state: %v", err) } c.logger.Info("the logical backup job is synced") } - if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed { + if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations, nil); changed { patchData, err := metaAnnotationsPatch(desiredJob.Annotations) if err != nil { return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err) diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d45a193cb..f9d1d7873 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -142,6 +142,181 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } } +func TestPodAnnotationsSync(t *testing.T) { + clusterName := "acid-test-cluster-2" + namespace := "default" + podAnnotation := "no-scale-down" + podAnnotations := map[string]string{podAnnotation: "true"} + customPodAnnotation := "foo" + customPodAnnotations := map[string]string{customPodAnnotation: "true"} + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockHTTPClient(ctrl) + client, _ := newFakeK8sAnnotationsClient() + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Volume: acidv1.Volume{ + Size: "1Gi", + }, + EnableConnectionPooler: boolToPointer(true), + EnableLogicalBackup: true, + EnableReplicaConnectionPooler: boolToPointer(true), + PodAnnotations: podAnnotations, + NumberOfInstances: 2, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), + PodManagementPolicy: "ordered_ready", + CustomPodAnnotations: customPodAnnotations, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + MaxInstances: -1, + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(3), + ResourceCheckTimeout: time.Duration(10), + }, + }, + }, client, pg, logger, eventRecorder) + + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), + } + + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + cluster.Name = clusterName + cluster.Namespace = namespace + clusterOptions := clusterLabelsOptions(cluster) + + // create a statefulset + _, err := cluster.createStatefulSet() + assert.NoError(t, err) + // create a pods + podsList := createPods(cluster) + for _, pod := range podsList { + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + // create connection pooler + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + assert.NoError(t, err) + + // create cron job + err = cluster.createLogicalBackupJob() + assert.NoError(t, err) + + annotateResources(cluster) + err = cluster.Sync(&cluster.Postgresql) + assert.NoError(t, err) + + // 1. PodAnnotations set + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, sts.Spec.Template.Annotations, annotation) + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, deploy.Spec.Template.Annotations, annotation, + fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v", + deploy.Name, annotation, deploy.Spec.Template.Annotations)) + } + } + + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, pod.Annotations, annotation, + fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations)) + } + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation, + fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v", + annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + } + + // 2 PodAnnotations removed + newSpec := cluster.Postgresql.DeepCopy() + newSpec.Spec.PodAnnotations = nil + cluster.OpConfig.CustomPodAnnotations = nil + err = cluster.Sync(newSpec) + assert.NoError(t, err) + + stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, sts.Spec.Template.Annotations, annotation) + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, deploy.Spec.Template.Annotations, annotation, + fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v", + deploy.Name, annotation, deploy.Spec.Template.Annotations)) + } + } + + podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, pod.Annotations, annotation, + fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations)) + } + } + + cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + for _, annotation := range []string{podAnnotation, customPodAnnotation} { + assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation, + fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v", + annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + } +} + func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { testName := "test config comparison" client, _ := newFakeK8sSyncClient() diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 8e9263d49..17c4e705e 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,16 @@ type WorkerStatus struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - PodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index e245389af..9cd7dc7e9 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -247,18 +247,18 @@ func createPods(cluster *Cluster) []v1.Pod { for i, role := range []PostgresRole{Master, Replica} { podsList = append(podsList, v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%d", clusterName, i), + Name: fmt.Sprintf("%s-%d", cluster.Name, i), Namespace: namespace, Labels: map[string]string{ "application": "spilo", - "cluster-name": clusterName, + "cluster-name": cluster.Name, "spilo-role": string(role), }, }, }) podsList = append(podsList, v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-pooler-%s", clusterName, role), + Name: fmt.Sprintf("%s-pooler-%s", cluster.Name, role), Namespace: namespace, Labels: cluster.connectionPoolerLabels(role, true).MatchLabels, }, @@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, if err != nil { return nil, err } - _, err = cluster.createPodDisruptionBudget() + err = cluster.createPodDisruptionBudgets() if err != nil { return nil, err } @@ -650,7 +650,7 @@ func Test_trimCronjobName(t *testing.T) { } } -func TestisInMaintenanceWindow(t *testing.T) { +func TestIsInMaintenanceWindow(t *testing.T) { now := time.Now() futureTimeStart := now.Add(1 * time.Hour) futureTimeStartFormatted := futureTimeStart.Format("15:04") diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 240220ccf..fee18beaf 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -225,7 +225,7 @@ func (c *Cluster) syncVolumeClaims() error { } newAnnotations := c.annotationsSet(nil) - if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { + if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations, nil); changed { patchData, err := metaAnnotationsPatch(newAnnotations) if err != nil { return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err) diff --git a/ui/manifests/deployment.yaml b/ui/manifests/deployment.yaml index e09dd1e4f..3b3097416 100644 --- a/ui/manifests/deployment.yaml +++ b/ui/manifests/deployment.yaml @@ -81,8 +81,6 @@ spec: ] } # Exemple of settings to make snapshot view working in the ui when using AWS - # - name: WALE_S3_ENDPOINT - # value: https+path://s3.us-east-1.amazonaws.com:443 # - name: SPILO_S3_BACKUP_PREFIX # value: spilo/ # - name: AWS_ACCESS_KEY_ID @@ -102,5 +100,3 @@ spec: # key: AWS_DEFAULT_REGION # - name: SPILO_S3_BACKUP_BUCKET # value: - # - name: "USE_AWS_INSTANCE_PROFILE" - # value: "true" diff --git a/ui/operator_ui/main.py b/ui/operator_ui/main.py index e02c2995c..bf28df6eb 100644 --- a/ui/operator_ui/main.py +++ b/ui/operator_ui/main.py @@ -95,14 +95,6 @@ DEFAULT_CPU = getenv('DEFAULT_CPU', '10m') DEFAULT_CPU_LIMIT = getenv('DEFAULT_CPU_LIMIT', '300m') -WALE_S3_ENDPOINT = getenv( - 'WALE_S3_ENDPOINT', - 'https+path://s3.eu-central-1.amazonaws.com:443', -) - -USE_AWS_INSTANCE_PROFILE = ( - getenv('USE_AWS_INSTANCE_PROFILE', 'false').lower() != 'false' -) AWS_ENDPOINT = getenv('AWS_ENDPOINT') @@ -784,8 +776,6 @@ def get_versions(pg_cluster: str): bucket=SPILO_S3_BACKUP_BUCKET, pg_cluster=pg_cluster, prefix=SPILO_S3_BACKUP_PREFIX, - s3_endpoint=WALE_S3_ENDPOINT, - use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE, ), ) @@ -797,9 +787,8 @@ def get_basebackups(pg_cluster: str, uid: str): bucket=SPILO_S3_BACKUP_BUCKET, pg_cluster=pg_cluster, prefix=SPILO_S3_BACKUP_PREFIX, - s3_endpoint=WALE_S3_ENDPOINT, uid=uid, - use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE, + postgresql_versions=OPERATOR_UI_CONFIG.get('postgresql_versions', DEFAULT_UI_CONFIG['postgresql_versions']), ), ) @@ -991,8 +980,6 @@ def main(port, debug, clusters: list): logger.info(f'Superuser team: {SUPERUSER_TEAM}') logger.info(f'Target namespace: {TARGET_NAMESPACE}') logger.info(f'Teamservice URL: {TEAM_SERVICE_URL}') - logger.info(f'Use AWS instance_profile: {USE_AWS_INSTANCE_PROFILE}') - logger.info(f'WAL-E S3 endpoint: {WALE_S3_ENDPOINT}') logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}') if TARGET_NAMESPACE is None: diff --git a/ui/operator_ui/spiloutils.py b/ui/operator_ui/spiloutils.py index f715430a1..6a2f03bb2 100644 --- a/ui/operator_ui/spiloutils.py +++ b/ui/operator_ui/spiloutils.py @@ -6,9 +6,8 @@ from requests import Session from urllib.parse import urljoin from uuid import UUID -from wal_e.cmd import configure_backup_cxt -from .utils import Attrs, defaulting, these +from .utils import defaulting, these from operator_ui.adapters.logger import logger session = Session() @@ -284,10 +283,8 @@ def read_stored_clusters(bucket, prefix, delimiter='/'): def read_versions( pg_cluster, bucket, - s3_endpoint, prefix, delimiter='/', - use_aws_instance_profile=False, ): return [ 'base' if uid == 'wal' else uid @@ -305,35 +302,72 @@ def read_versions( if uid == 'wal' or defaulting(lambda: UUID(uid)) ] -BACKUP_VERSION_PREFIXES = ['', '10/', '11/', '12/', '13/', '14/', '15/', '16/', '17/'] +def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=16 * 1024 * 1024): + timeline = int(start_segment[:8], 16) + log_id = finish_lsn >> 32 + seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size + return f"{timeline:08X}{log_id:08X}{seg_id:08X}" + +def lsn_to_offset_hex(lsn, wal_segment_size=16 * 1024 * 1024): + return f"{lsn % wal_segment_size:08X}" def read_basebackups( pg_cluster, uid, bucket, - s3_endpoint, prefix, - delimiter='/', - use_aws_instance_profile=False, + postgresql_versions, ): - environ['WALE_S3_ENDPOINT'] = s3_endpoint suffix = '' if uid == 'base' else '/' + uid backups = [] - for vp in BACKUP_VERSION_PREFIXES: - - backups = backups + [ - { - key: value - for key, value in basebackup.__dict__.items() - if isinstance(value, str) or isinstance(value, int) - } - for basebackup in Attrs.call( - f=configure_backup_cxt, - aws_instance_profile=use_aws_instance_profile, - s3_prefix=f's3://{bucket}/{prefix}{pg_cluster}{suffix}/wal/{vp}', - )._backup_list(detail=True) - ] + for vp in postgresql_versions: + backup_prefix = f'{prefix}{pg_cluster}{suffix}/wal/{vp}/basebackups_005/' + logger.info(f"{bucket}/{backup_prefix}") + + paginator = client('s3').get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=backup_prefix) + + for page in pages: + for obj in page.get("Contents", []): + key = obj["Key"] + if not key.endswith("backup_stop_sentinel.json"): + continue + + response = client('s3').get_object(Bucket=bucket, Key=key) + backup_info = loads(response["Body"].read().decode("utf-8")) + last_modified = response["LastModified"].astimezone(timezone.utc).isoformat() + + backup_name = key.split("/")[-1].replace("_backup_stop_sentinel.json", "") + start_seg, start_offset = backup_name.split("_")[1], backup_name.split("_")[-1] if "_" in backup_name else None + + if "LSN" in backup_info and "FinishLSN" in backup_info: + # WAL-G + lsn = backup_info["LSN"] + finish_lsn = backup_info["FinishLSN"] + backups.append({ + "expanded_size_bytes": backup_info.get("UncompressedSize"), + "last_modified": last_modified, + "name": backup_name, + "wal_segment_backup_start": start_seg, + "wal_segment_backup_stop": lsn_to_wal_segment_stop(finish_lsn, start_seg), + "wal_segment_offset_backup_start": lsn_to_offset_hex(lsn), + "wal_segment_offset_backup_stop": lsn_to_offset_hex(finish_lsn), + }) + elif "wal_segment_backup_stop" in backup_info: + # WAL-E + stop_seg = backup_info["wal_segment_backup_stop"] + stop_offset = backup_info["wal_segment_offset_backup_stop"] + + backups.append({ + "expanded_size_bytes": backup_info.get("expanded_size_bytes"), + "last_modified": last_modified, + "name": backup_name, + "wal_segment_backup_start": start_seg, + "wal_segment_backup_stop": stop_seg, + "wal_segment_offset_backup_start": start_offset, + "wal_segment_offset_backup_stop": stop_offset, + }) return backups diff --git a/ui/requirements.txt b/ui/requirements.txt index d3318ceec..783c0aac3 100644 --- a/ui/requirements.txt +++ b/ui/requirements.txt @@ -11,5 +11,4 @@ kubernetes==11.0.0 python-json-logger==2.0.7 requests==2.32.2 stups-tokens>=1.1.19 -wal_e==1.1.1 werkzeug==3.0.6