@@ -490,7 +490,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
490
490
}
491
491
492
492
listOptions := metav1.ListOptions {
493
- LabelSelector : cluster .labelsSet (true ).String (),
493
+ LabelSelector : cluster .labelsSet (false ).String (),
494
494
}
495
495
streams , err := cluster .KubeClient .FabricEventStreams (namespace ).List (context .TODO (), listOptions )
496
496
assert .NoError (t , err )
@@ -529,7 +529,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin
529
529
}
530
530
531
531
func TestSyncStreams (t * testing.T ) {
532
- pg .Name = fmt .Sprintf ("%s-2" , pg .Name )
532
+ newClusterName := fmt .Sprintf ("%s-2" , pg .Name )
533
+ pg .Name = newClusterName
533
534
var cluster = New (
534
535
Config {
535
536
OpConfig : config.Config {
@@ -560,7 +561,7 @@ func TestSyncStreams(t *testing.T) {
560
561
561
562
// check that only one stream remains after sync
562
563
listOptions := metav1.ListOptions {
563
- LabelSelector : cluster .labelsSet (true ).String (),
564
+ LabelSelector : cluster .labelsSet (false ).String (),
564
565
}
565
566
streams , err := cluster .KubeClient .FabricEventStreams (namespace ).List (context .TODO (), listOptions )
566
567
assert .NoError (t , err )
@@ -812,6 +813,49 @@ func TestDeleteStreams(t *testing.T) {
812
813
err = cluster .syncStream (appId )
813
814
assert .NoError (t , err )
814
815
816
+ // change specs of streams and patch CRD
817
+ for i , stream := range pg .Spec .Streams {
818
+ if stream .ApplicationId == appId {
819
+ streamTable := stream .Tables ["data.bar" ]
820
+ streamTable .EventType = "stream-type-c"
821
+ stream .Tables ["data.bar" ] = streamTable
822
+ stream .BatchSize = k8sutil .UInt32ToPointer (uint32 (250 ))
823
+ pg .Spec .Streams [i ] = stream
824
+ }
825
+ }
826
+
827
+ // compare stream returned from API with expected stream
828
+ listOptions := metav1.ListOptions {
829
+ LabelSelector : cluster .labelsSet (false ).String (),
830
+ }
831
+ streams := patchPostgresqlStreams (t , cluster , & pg .Spec , listOptions )
832
+ result := cluster .generateFabricEventStream (appId )
833
+ if match , _ := cluster .compareStreams (& streams .Items [0 ], result ); ! match {
834
+ t .Errorf ("Malformed FabricEventStream after updating manifest, expected %#v, got %#v" , streams .Items [0 ], result )
835
+ }
836
+
837
+ // change teamId and check that stream is updated
838
+ pg .Spec .TeamID = "new-team"
839
+ streams = patchPostgresqlStreams (t , cluster , & pg .Spec , listOptions )
840
+ result = cluster .generateFabricEventStream (appId )
841
+ if match , _ := cluster .compareStreams (& streams .Items [0 ], result ); ! match {
842
+ t .Errorf ("Malformed FabricEventStream after updating teamId, expected %#v, got %#v" , streams .Items [0 ].ObjectMeta .Labels , result .ObjectMeta .Labels )
843
+ }
844
+
845
+ // disable recovery
846
+ for idx , stream := range pg .Spec .Streams {
847
+ if stream .ApplicationId == appId {
848
+ stream .EnableRecovery = util .False ()
849
+ pg .Spec .Streams [idx ] = stream
850
+ }
851
+ }
852
+
853
+ streams = patchPostgresqlStreams (t , cluster , & pg .Spec , listOptions )
854
+ result = cluster .generateFabricEventStream (appId )
855
+ if match , _ := cluster .compareStreams (& streams .Items [0 ], result ); ! match {
856
+ t .Errorf ("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v" , streams .Items [0 ], result )
857
+ }
858
+
815
859
// remove streams from manifest
816
860
pg .Spec .Streams = nil
817
861
pgUpdated , err := cluster .KubeClient .Postgresqls (namespace ).Update (
@@ -822,10 +866,7 @@ func TestDeleteStreams(t *testing.T) {
822
866
cluster .cleanupRemovedStreams (appIds )
823
867
824
868
// check that streams have been deleted
825
- listOptions := metav1.ListOptions {
826
- LabelSelector : cluster .labelsSet (true ).String (),
827
- }
828
- streams , err := cluster .KubeClient .FabricEventStreams (namespace ).List (context .TODO (), listOptions )
869
+ streams , err = cluster .KubeClient .FabricEventStreams (namespace ).List (context .TODO (), listOptions )
829
870
assert .NoError (t , err )
830
871
assert .Equalf (t , 0 , len (streams .Items ), "unexpected number of streams found: got %d, but expected none" , len (streams .Items ))
831
872
0 commit comments