@@ -15,15 +15,16 @@ import (
1515 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616)
1717
18- func (c * Cluster ) createStreams (appId string ) error {
18+ func (c * Cluster ) createStreams (appId string ) ( * zalandov1. FabricEventStream , error ) {
1919 c .setProcessName ("creating streams" )
2020
2121 fes := c .generateFabricEventStream (appId )
22- if _ , err := c .KubeClient .FabricEventStreams (c .Namespace ).Create (context .TODO (), fes , metav1.CreateOptions {}); err != nil {
23- return err
22+ streamCRD , err := c .KubeClient .FabricEventStreams (c .Namespace ).Create (context .TODO (), fes , metav1.CreateOptions {})
23+ if err != nil {
24+ return nil , err
2425 }
2526
26- return nil
27+ return streamCRD , nil
2728}
2829
2930func (c * Cluster ) updateStreams (newEventStreams * zalandov1.FabricEventStream ) error {
@@ -46,11 +47,17 @@ func (c *Cluster) deleteStreams() error {
4647 }
4748
4849 errors := make ([]string , 0 )
49- for _ , appId := range c .streamApplications {
50- fesName := fmt .Sprintf ("%s-%s" , c .Name , appId )
51- err = c .KubeClient .FabricEventStreams (c .Namespace ).Delete (context .TODO (), fesName , metav1.DeleteOptions {})
50+ listOptions := metav1.ListOptions {
51+ LabelSelector : c .labelsSet (true ).String (),
52+ }
53+ streams , err := c .KubeClient .FabricEventStreams (c .Namespace ).List (context .TODO (), listOptions )
54+ if err != nil {
55+ return fmt .Errorf ("could not list of FabricEventStreams: %v" , err )
56+ }
57+ for _ , stream := range streams .Items {
58+ err = c .KubeClient .FabricEventStreams (stream .Namespace ).Delete (context .TODO (), stream .Name , metav1.DeleteOptions {})
5259 if err != nil {
53- errors = append (errors , fmt .Sprintf ("could not delete event stream %q: %v" , fesName , err ))
60+ errors = append (errors , fmt .Sprintf ("could not delete event stream %q: %v" , stream . Name , err ))
5461 }
5562 }
5663
@@ -184,8 +191,10 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
184191 Kind : constants .EventStreamCRDKind ,
185192 },
186193 ObjectMeta : metav1.ObjectMeta {
187- Name : fmt .Sprintf ("%s-%s" , c .Name , appId ),
194+ // max length for cluster name is 58 so we can only add 5 more characters / numbers
195+ Name : fmt .Sprintf ("%s-%s" , c .Name , util .RandomPassword (5 )),
188196 Namespace : c .Namespace ,
197+ Labels : c .labelsSet (true ),
189198 Annotations : c .AnnotationsToPropagate (c .annotationsSet (nil )),
190199 // make cluster StatefulSet the owner (like with connection pooler objects)
191200 OwnerReferences : c .ownerReferences (),
@@ -284,11 +293,6 @@ func (c *Cluster) syncStreams() error {
284293 return nil
285294 }
286295
287- // fetch different application IDs from streams section
288- // there will be a separate event stream resource for each ID
289- appIds := gatherApplicationIds (c .Spec .Streams )
290- c .streamApplications = appIds
291-
292296 slots := make (map [string ]map [string ]string )
293297 slotsToSync := make (map [string ]map [string ]string )
294298 publications := make (map [string ]map [string ]acidv1.StreamTable )
@@ -355,32 +359,43 @@ func (c *Cluster) syncStreams() error {
355359}
356360
357361func (c * Cluster ) createOrUpdateStreams () error {
358- for _ , appId := range c .streamApplications {
359- fesName := fmt .Sprintf ("%s-%s" , c .Name , appId )
360- effectiveStreams , err := c .KubeClient .FabricEventStreams (c .Namespace ).Get (context .TODO (), fesName , metav1.GetOptions {})
361- if err != nil {
362- if ! k8sutil .ResourceNotFound (err ) {
363- return fmt .Errorf ("failed reading event stream %s: %v" , fesName , err )
364- }
365362
366- c .logger .Infof ("event streams do not exist, create it" )
367- err = c .createStreams (appId )
368- if err != nil {
369- return fmt .Errorf ("failed creating event stream %s: %v" , fesName , err )
370- }
371- c .logger .Infof ("event stream %q has been successfully created" , fesName )
372- } else {
373- desiredStreams := c .generateFabricEventStream (appId )
374- if match , reason := sameStreams (effectiveStreams .Spec .EventStreams , desiredStreams .Spec .EventStreams ); ! match {
375- c .logger .Debugf ("updating event streams: %s" , reason )
376- desiredStreams .ObjectMeta .ResourceVersion = effectiveStreams .ObjectMeta .ResourceVersion
377- err = c .updateStreams (desiredStreams )
378- if err != nil {
379- return fmt .Errorf ("failed updating event stream %s: %v" , fesName , err )
363+ // fetch different application IDs from streams section
364+ // there will be a separate event stream resource for each ID
365+ appIds := gatherApplicationIds (c .Spec .Streams )
366+
367+ // list all existing stream CRDs
368+ listOptions := metav1.ListOptions {
369+ LabelSelector : c .labelsSet (true ).String (),
370+ }
371+ streams , err := c .KubeClient .FabricEventStreams (c .Namespace ).List (context .TODO (), listOptions )
372+ if err != nil {
373+ return fmt .Errorf ("could not list of FabricEventStreams: %v" , err )
374+ }
375+
376+ for _ , appId := range appIds {
377+ // update stream when it exists and EventStreams array differs
378+ for _ , stream := range streams .Items {
379+ if appId == stream .Spec .ApplicationId {
380+ desiredStreams := c .generateFabricEventStream (appId )
381+ if match , reason := sameStreams (stream .Spec .EventStreams , desiredStreams .Spec .EventStreams ); ! match {
382+ c .logger .Debugf ("updating event streams: %s" , reason )
383+ desiredStreams .ObjectMeta .ResourceVersion = stream .ObjectMeta .ResourceVersion
384+ err = c .updateStreams (desiredStreams )
385+ if err != nil {
386+ return fmt .Errorf ("failed updating event stream %s: %v" , stream .Name , err )
387+ }
388+ c .logger .Infof ("event stream %q has been successfully updated" , stream .Name )
380389 }
381- c . logger . Infof ( "event stream %q has been successfully updated" , fesName )
390+ continue
382391 }
383392 }
393+ c .logger .Infof ("event streams with applicationId %s do not exist, create it" , appId )
394+ streamCRD , err := c .createStreams (appId )
395+ if err != nil {
396+ return fmt .Errorf ("failed creating event streams with applicationId %s: %v" , appId , err )
397+ }
398+ c .logger .Infof ("event streams %q have been successfully created" , streamCRD .Name )
384399 }
385400
386401 return nil
0 commit comments