@@ -15,15 +15,16 @@ import (
15
15
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
16
)
17
17
18
- func (c * Cluster ) createStreams (appId string ) error {
18
+ func (c * Cluster ) createStreams (appId string ) ( * zalandov1. FabricEventStream , error ) {
19
19
c .setProcessName ("creating streams" )
20
20
21
21
fes := c .generateFabricEventStream (appId )
22
- if _ , err := c .KubeClient .FabricEventStreams (c .Namespace ).Create (context .TODO (), fes , metav1.CreateOptions {}); err != nil {
23
- return err
22
+ streamCRD , err := c .KubeClient .FabricEventStreams (c .Namespace ).Create (context .TODO (), fes , metav1.CreateOptions {})
23
+ if err != nil {
24
+ return nil , err
24
25
}
25
26
26
- return nil
27
+ return streamCRD , nil
27
28
}
28
29
29
30
func (c * Cluster ) updateStreams (newEventStreams * zalandov1.FabricEventStream ) error {
@@ -46,11 +47,17 @@ func (c *Cluster) deleteStreams() error {
46
47
}
47
48
48
49
errors := make ([]string , 0 )
49
- for _ , appId := range c .streamApplications {
50
- fesName := fmt .Sprintf ("%s-%s" , c .Name , appId )
51
- err = c .KubeClient .FabricEventStreams (c .Namespace ).Delete (context .TODO (), fesName , metav1.DeleteOptions {})
50
+ listOptions := metav1.ListOptions {
51
+ LabelSelector : c .labelsSet (true ).String (),
52
+ }
53
+ streams , err := c .KubeClient .FabricEventStreams (c .Namespace ).List (context .TODO (), listOptions )
54
+ if err != nil {
55
+ return fmt .Errorf ("could not list of FabricEventStreams: %v" , err )
56
+ }
57
+ for _ , stream := range streams .Items {
58
+ err = c .KubeClient .FabricEventStreams (stream .Namespace ).Delete (context .TODO (), stream .Name , metav1.DeleteOptions {})
52
59
if err != nil {
53
- errors = append (errors , fmt .Sprintf ("could not delete event stream %q: %v" , fesName , err ))
60
+ errors = append (errors , fmt .Sprintf ("could not delete event stream %q: %v" , stream . Name , err ))
54
61
}
55
62
}
56
63
@@ -184,8 +191,10 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
184
191
Kind : constants .EventStreamCRDKind ,
185
192
},
186
193
ObjectMeta : metav1.ObjectMeta {
187
- Name : fmt .Sprintf ("%s-%s" , c .Name , appId ),
194
+ // max length for cluster name is 58 so we can only add 5 more characters / numbers
195
+ Name : fmt .Sprintf ("%s-%s" , c .Name , util .RandomPassword (5 )),
188
196
Namespace : c .Namespace ,
197
+ Labels : c .labelsSet (true ),
189
198
Annotations : c .AnnotationsToPropagate (c .annotationsSet (nil )),
190
199
// make cluster StatefulSet the owner (like with connection pooler objects)
191
200
OwnerReferences : c .ownerReferences (),
@@ -284,11 +293,6 @@ func (c *Cluster) syncStreams() error {
284
293
return nil
285
294
}
286
295
287
- // fetch different application IDs from streams section
288
- // there will be a separate event stream resource for each ID
289
- appIds := gatherApplicationIds (c .Spec .Streams )
290
- c .streamApplications = appIds
291
-
292
296
slots := make (map [string ]map [string ]string )
293
297
slotsToSync := make (map [string ]map [string ]string )
294
298
publications := make (map [string ]map [string ]acidv1.StreamTable )
@@ -355,32 +359,43 @@ func (c *Cluster) syncStreams() error {
355
359
}
356
360
357
361
func (c * Cluster ) createOrUpdateStreams () error {
358
- for _ , appId := range c .streamApplications {
359
- fesName := fmt .Sprintf ("%s-%s" , c .Name , appId )
360
- effectiveStreams , err := c .KubeClient .FabricEventStreams (c .Namespace ).Get (context .TODO (), fesName , metav1.GetOptions {})
361
- if err != nil {
362
- if ! k8sutil .ResourceNotFound (err ) {
363
- return fmt .Errorf ("failed reading event stream %s: %v" , fesName , err )
364
- }
365
362
366
- c .logger .Infof ("event streams do not exist, create it" )
367
- err = c .createStreams (appId )
368
- if err != nil {
369
- return fmt .Errorf ("failed creating event stream %s: %v" , fesName , err )
370
- }
371
- c .logger .Infof ("event stream %q has been successfully created" , fesName )
372
- } else {
373
- desiredStreams := c .generateFabricEventStream (appId )
374
- if match , reason := sameStreams (effectiveStreams .Spec .EventStreams , desiredStreams .Spec .EventStreams ); ! match {
375
- c .logger .Debugf ("updating event streams: %s" , reason )
376
- desiredStreams .ObjectMeta .ResourceVersion = effectiveStreams .ObjectMeta .ResourceVersion
377
- err = c .updateStreams (desiredStreams )
378
- if err != nil {
379
- return fmt .Errorf ("failed updating event stream %s: %v" , fesName , err )
363
+ // fetch different application IDs from streams section
364
+ // there will be a separate event stream resource for each ID
365
+ appIds := gatherApplicationIds (c .Spec .Streams )
366
+
367
+ // list all existing stream CRDs
368
+ listOptions := metav1.ListOptions {
369
+ LabelSelector : c .labelsSet (true ).String (),
370
+ }
371
+ streams , err := c .KubeClient .FabricEventStreams (c .Namespace ).List (context .TODO (), listOptions )
372
+ if err != nil {
373
+ return fmt .Errorf ("could not list of FabricEventStreams: %v" , err )
374
+ }
375
+
376
+ for _ , appId := range appIds {
377
+ // update stream when it exists and EventStreams array differs
378
+ for _ , stream := range streams .Items {
379
+ if appId == stream .Spec .ApplicationId {
380
+ desiredStreams := c .generateFabricEventStream (appId )
381
+ if match , reason := sameStreams (stream .Spec .EventStreams , desiredStreams .Spec .EventStreams ); ! match {
382
+ c .logger .Debugf ("updating event streams: %s" , reason )
383
+ desiredStreams .ObjectMeta .ResourceVersion = stream .ObjectMeta .ResourceVersion
384
+ err = c .updateStreams (desiredStreams )
385
+ if err != nil {
386
+ return fmt .Errorf ("failed updating event stream %s: %v" , stream .Name , err )
387
+ }
388
+ c .logger .Infof ("event stream %q has been successfully updated" , stream .Name )
380
389
}
381
- c . logger . Infof ( "event stream %q has been successfully updated" , fesName )
390
+ continue
382
391
}
383
392
}
393
+ c .logger .Infof ("event streams with applicationId %s do not exist, create it" , appId )
394
+ streamCRD , err := c .createStreams (appId )
395
+ if err != nil {
396
+ return fmt .Errorf ("failed creating event streams with applicationId %s: %v" , appId , err )
397
+ }
398
+ c .logger .Infof ("event streams %q have been successfully created" , streamCRD .Name )
384
399
}
385
400
386
401
return nil
0 commit comments