Skip to content

Commit 28e02ec

Browse files
authored
Fixed watcher getting into error loop after timeouts (oxia-db#306)
The watcher client gets automatically closed after 30min. It needs to be reopened after that.
1 parent 5055a62 commit 28e02ec

File tree

2 files changed

+51
-21
lines changed

2 files changed

+51
-21
lines changed

controller/watcher.go

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,73 @@ package controller
1616

1717
import (
1818
"context"
19+
"github.com/rs/zerolog/log"
1920
"io"
2021
"k8s.io/apimachinery/pkg/apis/meta/v1"
21-
"k8s.io/apimachinery/pkg/watch"
22+
"oxia/common"
2223
oxia "oxia/pkg/generated/clientset/versioned"
24+
"time"
2325
)
2426

2527
type Watcher io.Closer
2628

2729
type watcher struct {
28-
watch watch.Interface
29-
closeCh chan bool
30+
ctx context.Context
31+
cancel context.CancelFunc
32+
33+
initWg common.WaitGroup
3034
}
3135

3236
func (w *watcher) Close() error {
33-
w.watch.Stop()
34-
close(w.closeCh)
37+
w.cancel()
3538
return nil
3639
}
3740

3841
func newWatcher(client oxia.Interface, reconciler Reconciler) (Watcher, error) {
39-
ctx := context.Background()
40-
_watch, err := client.OxiaV1alpha1().OxiaClusters("").Watch(ctx, v1.ListOptions{})
41-
if err != nil {
42-
return nil, err
43-
}
4442
w := &watcher{
45-
watch: _watch,
46-
closeCh: make(chan bool),
43+
initWg: common.NewWaitGroup(1),
44+
}
45+
46+
w.ctx, w.cancel = context.WithCancel(context.Background())
47+
go w.runWithRetries(client, reconciler)
48+
49+
// Wait until fully initialized
50+
if err := w.initWg.Wait(w.ctx); err != nil {
51+
return nil, err
4752
}
48-
go w.run(reconciler)
4953
return w, nil
5054
}
5155

52-
func (w *watcher) run(reconciler Reconciler) {
56+
func (w *watcher) runWithRetries(client oxia.Interface, reconciler Reconciler) {
57+
for w.ctx.Err() == nil {
58+
w.run(client, reconciler)
59+
time.Sleep(5 * time.Second)
60+
}
61+
}
62+
63+
func (w *watcher) run(client oxia.Interface, reconciler Reconciler) {
64+
watch, err := client.OxiaV1alpha1().
65+
OxiaClusters("").
66+
Watch(w.ctx, v1.ListOptions{})
67+
if err != nil {
68+
log.Warn().Err(err).Msg("Failed to create watcher")
69+
return
70+
}
71+
72+
w.initWg.Done()
73+
74+
defer watch.Stop()
75+
5376
for {
5477
select {
55-
case event := <-w.watch.ResultChan():
78+
case event, ok := <-watch.ResultChan():
79+
if !ok {
80+
// Watcher was already close, reopen it
81+
return
82+
}
5683
reconciler.Reconcile(event)
57-
case <-w.closeCh:
58-
break
84+
case <-w.ctx.Done():
85+
return
5986
}
6087
}
6188
}

controller/watcher_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,16 @@ func TestWatcher(t *testing.T) {
4848
assert.NoError(t, err)
4949

5050
assert.Eventually(t, func() bool {
51-
get, err := _kubernetes.AppsV1().StatefulSets(namespace).
51+
_, err := _kubernetes.AppsV1().StatefulSets(namespace).
5252
Get(context.Background(), name, v1.GetOptions{})
53-
assert.NoError(t, err)
54-
assert.Equal(t, int32(3), *get.Spec.Replicas)
55-
return true
53+
return err == nil
5654
}, 10*time.Second, 100*time.Millisecond)
5755

56+
get, err := _kubernetes.AppsV1().StatefulSets(namespace).
57+
Get(context.Background(), name, v1.GetOptions{})
58+
assert.NoError(t, err)
59+
assert.Equal(t, int32(3), *get.Spec.Replicas)
60+
5861
err = _oxia.OxiaV1alpha1().OxiaClusters(namespace).
5962
Delete(context.Background(), name, v1.DeleteOptions{})
6063
assert.NoError(t, err)

0 commit comments

Comments
 (0)