Skip to content

Commit 2333d53

Browse files
authored
Fix deletion of event streams resources (zalando#1831)
* fix deletion of event streams * create cluster field to store stream application ids
1 parent 60e0685 commit 2333d53

File tree

3 files changed

+25
-6
lines changed

3 files changed

+25
-6
lines changed

pkg/cluster/cluster.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type Cluster struct {
9191
currentProcess Process
9292
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
9393
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
94+
streamApplications []string
9495
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
9596
EBSVolumes map[string]volumes.VolumeProperties
9697
VolumeResizer volumes.VolumeResizer

pkg/cluster/streams.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,17 @@ func (c *Cluster) deleteStreams() error {
4545
return nil
4646
}
4747

48-
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
49-
if err != nil {
50-
return fmt.Errorf("could not delete event stream custom resource: %v", err)
48+
errors := make([]string, 0)
49+
for _, appId := range c.streamApplications {
50+
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
51+
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{})
52+
if err != nil {
53+
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err))
54+
}
55+
}
56+
57+
if len(errors) > 0 {
58+
return fmt.Errorf("could not delete all event stream custom resources: %v", strings.Join(errors, `', '`))
5159
}
5260

5361
return nil
@@ -265,6 +273,11 @@ func (c *Cluster) syncStreams() error {
265273
return nil
266274
}
267275

276+
// fetch different application IDs from streams section
277+
// there will be a separate event stream resource for each ID
278+
appIds := gatherApplicationIds(c.Spec.Streams)
279+
c.streamApplications = appIds
280+
268281
slots := make(map[string]map[string]string)
269282
publications := make(map[string]map[string]acidv1.StreamTable)
270283

@@ -329,9 +342,7 @@ func (c *Cluster) syncStreams() error {
329342
}
330343

331344
func (c *Cluster) createOrUpdateStreams() error {
332-
333-
appIds := gatherApplicationIds(c.Spec.Streams)
334-
for _, appId := range appIds {
345+
for _, appId := range c.streamApplications {
335346
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
336347
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
337348
if err != nil {

pkg/cluster/streams_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ func TestGenerateFabricEventStream(t *testing.T) {
196196
_, err := cluster.createStatefulSet()
197197
assert.NoError(t, err)
198198

199+
// createOrUpdateStreams will loop over existing apps
200+
cluster.streamApplications = []string{appId}
201+
199202
// create the streams
200203
err = cluster.createOrUpdateStreams()
201204
assert.NoError(t, err)
@@ -327,6 +330,10 @@ func TestUpdateFabricEventStream(t *testing.T) {
327330
_, err := cluster.KubeClient.Postgresqls(namespace).Create(
328331
context.TODO(), &pg, metav1.CreateOptions{})
329332
assert.NoError(t, err)
333+
334+
// createOrUpdateStreams will loop over existing apps
335+
cluster.streamApplications = []string{appId}
336+
330337
err = cluster.createOrUpdateStreams()
331338
assert.NoError(t, err)
332339

0 commit comments

Comments
 (0)