Skip to content

Commit e80cccb

Browse files
authored
use random short name for stream CRDs (#2137)
* use random short name for stream CRDs
1 parent 3e148ea commit e80cccb

File tree

4 files changed

+74
-55
lines changed

4 files changed

+74
-55
lines changed

e2e/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ clean:
3232

3333
copy: clean
3434
mkdir manifests
35-
cp ../manifests -r .
35+
cp -r ../manifests .
3636

3737
docker: scm-source.json
3838
docker build -t "$(IMAGE):$(TAG)" .

pkg/cluster/cluster.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ type Cluster struct {
9191
currentProcess Process
9292
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
9393
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
94-
streamApplications []string
9594
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
9695
EBSVolumes map[string]volumes.VolumeProperties
9796
VolumeResizer volumes.VolumeResizer

pkg/cluster/streams.go

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ import (
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
)
1717

18-
func (c *Cluster) createStreams(appId string) error {
18+
func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) {
1919
c.setProcessName("creating streams")
2020

2121
fes := c.generateFabricEventStream(appId)
22-
if _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}); err != nil {
23-
return err
22+
streamCRD, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
23+
if err != nil {
24+
return nil, err
2425
}
2526

26-
return nil
27+
return streamCRD, nil
2728
}
2829

2930
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
@@ -46,11 +47,17 @@ func (c *Cluster) deleteStreams() error {
4647
}
4748

4849
errors := make([]string, 0)
49-
for _, appId := range c.streamApplications {
50-
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
51-
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{})
50+
listOptions := metav1.ListOptions{
51+
LabelSelector: c.labelsSet(true).String(),
52+
}
53+
streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
54+
if err != nil {
55+
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
56+
}
57+
for _, stream := range streams.Items {
58+
err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{})
5259
if err != nil {
53-
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err))
60+
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err))
5461
}
5562
}
5663

@@ -184,8 +191,10 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
184191
Kind: constants.EventStreamCRDKind,
185192
},
186193
ObjectMeta: metav1.ObjectMeta{
187-
Name: fmt.Sprintf("%s-%s", c.Name, appId),
194+
// max length for cluster name is 58 so we can only add 5 more characters / numbers
195+
Name: fmt.Sprintf("%s-%s", c.Name, util.RandomPassword(5)),
188196
Namespace: c.Namespace,
197+
Labels: c.labelsSet(true),
189198
Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)),
190199
// make cluster StatefulSet the owner (like with connection pooler objects)
191200
OwnerReferences: c.ownerReferences(),
@@ -284,11 +293,6 @@ func (c *Cluster) syncStreams() error {
284293
return nil
285294
}
286295

287-
// fetch different application IDs from streams section
288-
// there will be a separate event stream resource for each ID
289-
appIds := gatherApplicationIds(c.Spec.Streams)
290-
c.streamApplications = appIds
291-
292296
slots := make(map[string]map[string]string)
293297
slotsToSync := make(map[string]map[string]string)
294298
publications := make(map[string]map[string]acidv1.StreamTable)
@@ -355,32 +359,43 @@ func (c *Cluster) syncStreams() error {
355359
}
356360

357361
func (c *Cluster) createOrUpdateStreams() error {
358-
for _, appId := range c.streamApplications {
359-
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
360-
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
361-
if err != nil {
362-
if !k8sutil.ResourceNotFound(err) {
363-
return fmt.Errorf("failed reading event stream %s: %v", fesName, err)
364-
}
365362

366-
c.logger.Infof("event streams do not exist, create it")
367-
err = c.createStreams(appId)
368-
if err != nil {
369-
return fmt.Errorf("failed creating event stream %s: %v", fesName, err)
370-
}
371-
c.logger.Infof("event stream %q has been successfully created", fesName)
372-
} else {
373-
desiredStreams := c.generateFabricEventStream(appId)
374-
if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
375-
c.logger.Debugf("updating event streams: %s", reason)
376-
desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion
377-
err = c.updateStreams(desiredStreams)
378-
if err != nil {
379-
return fmt.Errorf("failed updating event stream %s: %v", fesName, err)
363+
// fetch different application IDs from streams section
364+
// there will be a separate event stream resource for each ID
365+
appIds := gatherApplicationIds(c.Spec.Streams)
366+
367+
// list all existing stream CRDs
368+
listOptions := metav1.ListOptions{
369+
LabelSelector: c.labelsSet(true).String(),
370+
}
371+
streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
372+
if err != nil {
373+
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
374+
}
375+
376+
for _, appId := range appIds {
377+
// update stream when it exists and EventStreams array differs
378+
for _, stream := range streams.Items {
379+
if appId == stream.Spec.ApplicationId {
380+
desiredStreams := c.generateFabricEventStream(appId)
381+
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
382+
c.logger.Debugf("updating event streams: %s", reason)
383+
desiredStreams.ObjectMeta.ResourceVersion = stream.ObjectMeta.ResourceVersion
384+
err = c.updateStreams(desiredStreams)
385+
if err != nil {
386+
return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err)
387+
}
388+
c.logger.Infof("event stream %q has been successfully updated", stream.Name)
380389
}
381-
c.logger.Infof("event stream %q has been successfully updated", fesName)
390+
continue
382391
}
383392
}
393+
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
394+
streamCRD, err := c.createStreams(appId)
395+
if err != nil {
396+
return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err)
397+
}
398+
c.logger.Infof("event streams %q have been successfully created", streamCRD.Name)
384399
}
385400

386401
return nil

pkg/cluster/streams_test.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ var (
4141
appId string = "test-app"
4242
dbName string = "foo"
4343
fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
44-
fesName string = fmt.Sprintf("%s-%s", clusterName, appId)
4544
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))
4645

4746
pg = acidv1.Postgresql{
@@ -77,6 +76,7 @@ var (
7776
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
7877
},
7978
},
79+
TeamID: "acid",
8080
Volume: acidv1.Volume{
8181
Size: "1Gi",
8282
},
@@ -89,7 +89,7 @@ var (
8989
Kind: constants.EventStreamCRDKind,
9090
},
9191
ObjectMeta: metav1.ObjectMeta{
92-
Name: fesName,
92+
Name: fmt.Sprintf("%s-12345", clusterName),
9393
Namespace: namespace,
9494
OwnerReferences: []metav1.OwnerReference{
9595
metav1.OwnerReference{
@@ -196,9 +196,6 @@ func TestGenerateFabricEventStream(t *testing.T) {
196196
_, err := cluster.createStatefulSet()
197197
assert.NoError(t, err)
198198

199-
// createOrUpdateStreams will loop over existing apps
200-
cluster.streamApplications = []string{appId}
201-
202199
// create the streams
203200
err = cluster.createOrUpdateStreams()
204201
assert.NoError(t, err)
@@ -209,22 +206,25 @@ func TestGenerateFabricEventStream(t *testing.T) {
209206
t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result)
210207
}
211208

212-
// compare stream resturned from API with expected stream
213-
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
209+
// compare stream returned from API with expected stream
210+
listOptions := metav1.ListOptions{
211+
LabelSelector: cluster.labelsSet(true).String(),
212+
}
213+
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
214214
assert.NoError(t, err)
215-
if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match {
216-
t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD)
215+
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, fes.Spec.EventStreams); !match {
216+
t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streams.Items[0])
217217
}
218218

219219
// sync streams once again
220220
err = cluster.createOrUpdateStreams()
221221
assert.NoError(t, err)
222222

223223
// compare stream resturned from API with generated stream
224-
streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
224+
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
225225
assert.NoError(t, err)
226-
if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match {
227-
t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD)
226+
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
227+
t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streams.Items[0])
228228
}
229229
}
230230

@@ -331,8 +331,9 @@ func TestUpdateFabricEventStream(t *testing.T) {
331331
context.TODO(), &pg, metav1.CreateOptions{})
332332
assert.NoError(t, err)
333333

334-
// createOrUpdateStreams will loop over existing apps
335-
cluster.streamApplications = []string{appId}
334+
// create statefulset to have ownerReference for streams
335+
_, err = cluster.createStatefulSet()
336+
assert.NoError(t, err)
336337

337338
err = cluster.createOrUpdateStreams()
338339
assert.NoError(t, err)
@@ -365,11 +366,15 @@ func TestUpdateFabricEventStream(t *testing.T) {
365366
err = cluster.createOrUpdateStreams()
366367
assert.NoError(t, err)
367368

368-
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
369+
// compare stream returned from API with expected stream
370+
listOptions := metav1.ListOptions{
371+
LabelSelector: cluster.labelsSet(true).String(),
372+
}
373+
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
369374
assert.NoError(t, err)
370375

371376
result := cluster.generateFabricEventStream(appId)
372-
if !reflect.DeepEqual(result, streamCRD) {
373-
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result)
377+
if !reflect.DeepEqual(result.Spec.EventStreams, streams.Items[0].Spec.EventStreams) {
378+
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streams.Items[0], result)
374379
}
375380
}

0 commit comments

Comments
 (0)