Skip to content

Commit d44bfab

Browse files
authored
do not use extra labels to list stream CRDs (#2803)
* do not use extra labels to list stream CRDs * add diff on labels for streams + unit test coverage
1 parent 80ef38f commit d44bfab

File tree

2 files changed

+58
-9
lines changed

2 files changed

+58
-9
lines changed

pkg/cluster/streams.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,9 @@ func (c *Cluster) syncStream(appId string) error {
467467
c.setProcessName("syncing stream with applicationId %s", appId)
468468
c.logger.Debugf("syncing stream with applicationId %s", appId)
469469

470-
listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()}
470+
listOptions := metav1.ListOptions{
471+
LabelSelector: c.labelsSet(false).String(),
472+
}
471473
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
472474
if err != nil {
473475
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
@@ -492,7 +494,8 @@ func (c *Cluster) syncStream(appId string) error {
492494
}
493495
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
494496
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
495-
desiredStreams.ObjectMeta = stream.ObjectMeta
497+
// make sure to keep the old name with randomly generated suffix
498+
desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name
496499
updatedStream, err := c.updateStreams(desiredStreams)
497500
if err != nil {
498501
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
@@ -527,6 +530,11 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
527530
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
528531
}
529532

533+
if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) {
534+
match = false
535+
reasons = append(reasons, "new streams labels do not match the current ones")
536+
}
537+
530538
if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
531539
match = false
532540
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))

pkg/cluster/streams_test.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
490490
}
491491

492492
listOptions := metav1.ListOptions{
493-
LabelSelector: cluster.labelsSet(true).String(),
493+
LabelSelector: cluster.labelsSet(false).String(),
494494
}
495495
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
496496
assert.NoError(t, err)
@@ -529,7 +529,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin
529529
}
530530

531531
func TestSyncStreams(t *testing.T) {
532-
pg.Name = fmt.Sprintf("%s-2", pg.Name)
532+
newClusterName := fmt.Sprintf("%s-2", pg.Name)
533+
pg.Name = newClusterName
533534
var cluster = New(
534535
Config{
535536
OpConfig: config.Config{
@@ -560,7 +561,7 @@ func TestSyncStreams(t *testing.T) {
560561

561562
// check that only one stream remains after sync
562563
listOptions := metav1.ListOptions{
563-
LabelSelector: cluster.labelsSet(true).String(),
564+
LabelSelector: cluster.labelsSet(false).String(),
564565
}
565566
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
566567
assert.NoError(t, err)
@@ -812,6 +813,49 @@ func TestDeleteStreams(t *testing.T) {
812813
err = cluster.syncStream(appId)
813814
assert.NoError(t, err)
814815

816+
// change specs of streams and patch CRD
817+
for i, stream := range pg.Spec.Streams {
818+
if stream.ApplicationId == appId {
819+
streamTable := stream.Tables["data.bar"]
820+
streamTable.EventType = "stream-type-c"
821+
stream.Tables["data.bar"] = streamTable
822+
stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250))
823+
pg.Spec.Streams[i] = stream
824+
}
825+
}
826+
827+
// compare stream returned from API with expected stream
828+
listOptions := metav1.ListOptions{
829+
LabelSelector: cluster.labelsSet(false).String(),
830+
}
831+
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
832+
result := cluster.generateFabricEventStream(appId)
833+
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
834+
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
835+
}
836+
837+
// change teamId and check that stream is updated
838+
pg.Spec.TeamID = "new-team"
839+
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
840+
result = cluster.generateFabricEventStream(appId)
841+
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
842+
t.Errorf("Malformed FabricEventStream after updating teamId, expected %#v, got %#v", streams.Items[0].ObjectMeta.Labels, result.ObjectMeta.Labels)
843+
}
844+
845+
// disable recovery
846+
for idx, stream := range pg.Spec.Streams {
847+
if stream.ApplicationId == appId {
848+
stream.EnableRecovery = util.False()
849+
pg.Spec.Streams[idx] = stream
850+
}
851+
}
852+
853+
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
854+
result = cluster.generateFabricEventStream(appId)
855+
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
856+
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
857+
}
858+
815859
// remove streams from manifest
816860
pg.Spec.Streams = nil
817861
pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update(
@@ -822,10 +866,7 @@ func TestDeleteStreams(t *testing.T) {
822866
cluster.cleanupRemovedStreams(appIds)
823867

824868
// check that streams have been deleted
825-
listOptions := metav1.ListOptions{
826-
LabelSelector: cluster.labelsSet(true).String(),
827-
}
828-
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
869+
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
829870
assert.NoError(t, err)
830871
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
831872

0 commit comments

Comments
 (0)