
Commit 21b9b6f

Emit K8S events to the postgresql CR as feedback to the requestor / user (zalando#896)
* Add EventsGetter to KubeClient to enable sending K8S events
* Add eventRecorder to the controller, initialize it, and hand it down to the cluster via its constructor so it can emit events
* Add a first set of events that go to the postgresql custom resource the user interacts with, providing feedback
* Add the right to "create" events to the operator cluster role
* Adapt cluster tests to the new function signature with eventRecorder (via NewFakeRecorder)
* Get a proper reference before sending events to a resource

Co-authored-by: Christian Rohmann <christian.rohmann@inovex.de>
1 parent 3c91bde commit 21b9b6f

File tree: 11 files changed (+107 / -29 lines)


charts/postgres-operator/templates/clusterrole.yaml

Lines changed: 7 additions & 0 deletions
@@ -42,6 +42,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""

manifests/operator-service-account-rbac.yaml

Lines changed: 7 additions & 0 deletions
@@ -43,6 +43,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""
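The chart's cluster role and the plain RBAC manifest receive the same new rule, keeping the two deployment paths in sync. Once the operator runs with this right, the emitted events attach to the postgresql CR and can be inspected like any other Kubernetes events — for example (the cluster name here is illustrative):

    kubectl describe postgresql acid-minimal-cluster   # events appear in the Events section
    kubectl get events --field-selector involvedObject.name=acid-minimal-cluster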

pkg/cluster/cluster.go

Lines changed: 30 additions & 2 deletions
@@ -21,8 +21,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
 
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
@@ -81,6 +84,7 @@ type Cluster struct {
 	acidv1.Postgresql
 	Config
 	logger        *logrus.Entry
+	eventRecorder record.EventRecorder
 	patroni       patroni.Interface
 	pgUsers       map[string]spec.PgUser
 	systemUsers   map[string]spec.PgUser
@@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
 }
 
 // New creates a new cluster. This function should be called from a controller.
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
+func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
 	deletePropagationPolicy := metav1.DeletePropagationOrphan
 
 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
 	cluster.patroni = patroni.New(cluster.logger)
-
+	cluster.eventRecorder = eventRecorder
 	return cluster
 }
 
@@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
 	}
 }
 
+// GetReference returns a reference to the Postgres CR object,
+// required to emit events to this resource
+func (c *Cluster) GetReference() *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
+	}
+	return ref
+}
+
 // SetStatus of Postgres cluster
 // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
 func (c *Cluster) setStatus(status string) {
@@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
 	}()
 
 	c.setStatus(acidv1.ClusterStatusCreating)
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
 		return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s endpoint: %v", role, err)
 		}
 		c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
 	}
 
 	if c.Services[role] != nil {
@@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s service: %v", role, err)
 		}
 		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
 	}
 
 	if err = c.initUsers(); err != nil {
@@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create secrets: %v", err)
 	}
 	c.logger.Infof("secrets have been successfully created")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
 
 	if c.PodDisruptionBudget != nil {
 		return fmt.Errorf("pod disruption budget already exists in the cluster")
@@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
 
 	c.logger.Info("waiting for the cluster being ready")
 
@@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
 		return err
 	}
 	c.logger.Infof("pods are ready")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
 
 	// create database objects unless we are running without pods or disabled
 	// that feature explicitly
@@ -555,6 +575,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
 			spec.Resources.ResourceLimits.CPU = minCPULimit
 		}
 	}
@@ -567,6 +588,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
 			spec.Resources.ResourceLimits.Memory = minMemoryLimit
 		}
 	}
@@ -598,6 +620,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
 		c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
 			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
+			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
 		//we need that hack to generate statefulset with the old version
 		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
 	}
@@ -757,6 +781,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 func (c *Cluster) Delete() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
 
 	// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
 	// deleting the cron job also removes pods and batch jobs it created
@@ -1095,6 +1120,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 
 	var err error
 	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
 
 	var wg sync.WaitGroup
 
@@ -1121,6 +1147,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 
 	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
 		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
 		if err = <-podLabelErr; err != nil {
 			err = fmt.Errorf("could not get master pod label: %v", err)
 		}
@@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 	// close the label waiting channel no sooner than the waiting goroutine terminates.
 	close(podLabelErr)
 
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
 	return err
 
 }
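The controller half of this wiring is part of the same commit but not shown in this excerpt. Following client-go's usual pattern, it would look roughly like the sketch below (a hedged reconstruction, not the verbatim controller code; eventsGetter stands in for the new EventsGetter the commit adds to KubeClient). Note the recorder is built from the operator's generated scheme rather than client-go's built-in one, so that reference.GetReference in GetReference() above can resolve the postgresql kind:

    // sketch: how an EventRecorder is typically constructed in the controller
    import (
    	v1 "k8s.io/api/core/v1"
    	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    	"k8s.io/client-go/tools/record"

    	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
    )

    func newEventRecorder(eventsGetter typedcorev1.EventsGetter) record.EventRecorder {
    	broadcaster := record.NewBroadcaster()
    	// ship recorded events to the Kubernetes events API (all namespaces)
    	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventsGetter.Events(v1.NamespaceAll)})
    	// stamp every event with the emitting component's name
    	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "postgres-operator"})
    }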

pkg/cluster/cluster_test.go

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/teams"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 )
 
 const (
@@ -21,6 +22,8 @@ const (
 )
 
 var logger = logrus.New().WithField("test", "cluster")
+var eventRecorder = record.NewFakeRecorder(1)
+
 var cl = New(
 	Config{
 		OpConfig: config.Config{
@@ -34,6 +37,7 @@ var cl = New(
 	k8sutil.NewMockKubernetesClient(),
 	acidv1.Postgresql{},
 	logger,
+	eventRecorder,
 )
 
 func TestInitRobotUsers(t *testing.T) {
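NewFakeRecorder gives the tests a recorder whose events land on a buffered channel (FakeRecorder.Events) instead of the API server, so nothing here needs a live cluster. The existing tests only thread the recorder through, but asserting on an emitted event would be straightforward — a sketch, not part of this commit:

    // FakeRecorder delivers events as preformatted strings, e.g.
    // "Normal Create Started creation of new cluster resources"
    recorder := record.NewFakeRecorder(5)
    // ... exercise code that calls eventRecorder.Event(...) ...
    select {
    case msg := <-recorder.Events:
    	t.Logf("recorded event: %s", msg)
    default:
    	t.Error("expected an event to be recorded")
    }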

pkg/cluster/k8sres_test.go

Lines changed: 18 additions & 14 deletions
@@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestGenerateSpiloConfig"
 	tests := []struct {
@@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestCreateLoadBalancerLogic"
 	tests := []struct {
@@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "postgres-myapp-database-pdb",
@@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "postgres-myapp-database-pdb",
@@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "postgres-myapp-database-pdb",
@@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "postgres-myapp-database-databass-budget",
@@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	var clusterNoDefaultRes = New(
 		Config{
@@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 			},
 			ConnectionPooler: config.ConnectionPooler{},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
 
@@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -990,7 +994,7 @@ func TestTLS(t *testing.T) {
 				SpiloFSGroup: &spiloFSGroup,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
 	s, err := cluster.generateStatefulSet(&spec)
 	if err != nil {
@@ -1112,7 +1116,7 @@ func TestAdditionalVolume(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		// Test with additional volume mounted in all containers

pkg/cluster/resources_test.go

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
@@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Spec = acidv1.PostgresSpec{
 		ConnectionPooler: &acidv1.ConnectionPooler{},

pkg/cluster/sync.go

Lines changed: 2 additions & 0 deletions
@@ -343,10 +343,12 @@ func (c *Cluster) syncStatefulSet() error {
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if podsRollingUpdateRequired {
 		c.logger.Debugln("performing rolling update")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
 		c.logger.Infof("pods have been recreated")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
 			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
 		}
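Event and Eventf used throughout this change are the two core methods of client-go's recorder; both take the object the event should attach to (here the reference returned by GetReference()), an event type (v1.EventTypeNormal or v1.EventTypeWarning), a machine-readable reason, and a human-readable message. Abridged from k8s.io/client-go/tools/record:

    type EventRecorder interface {
    	// Event posts a fixed message
    	Event(object runtime.Object, eventtype, reason, message string)
    	// Eventf is Event with fmt.Sprintf-style formatting
    	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
    }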

pkg/cluster/sync_test.go

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 				NumberOfInstances: int32ToPointer(1),
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
