Skip to content

Commit 74ba5d7

Browse files
committed
fix: set partialmetadata gvk in list/watch funcs to avoid data race in cache
Signed-off-by: Joe Lanford <[email protected]>
1 parent 1730628 commit 74ba5d7

File tree

3 files changed

+90
-95
lines changed

3 files changed

+90
-95
lines changed

pkg/builder/controller_test.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ var _ = Describe("application", func() {
307307

308308
ctx, cancel := context.WithCancel(context.Background())
309309
defer cancel()
310-
doReconcileTest(ctx, "3", bldr, m, false)
310+
doReconcileTest(ctx, "3", m, false, bldr)
311311
}, 10)
312312

313313
It("should Reconcile Watches objects", func() {
@@ -322,7 +322,7 @@ var _ = Describe("application", func() {
322322

323323
ctx, cancel := context.WithCancel(context.Background())
324324
defer cancel()
325-
doReconcileTest(ctx, "4", bldr, m, true)
325+
doReconcileTest(ctx, "4", m, true, bldr)
326326
}, 10)
327327
})
328328

@@ -378,7 +378,7 @@ var _ = Describe("application", func() {
378378

379379
ctx, cancel := context.WithCancel(context.Background())
380380
defer cancel()
381-
doReconcileTest(ctx, "5", bldr, m, true)
381+
doReconcileTest(ctx, "5", m, true, bldr)
382382

383383
Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
384384
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
@@ -396,6 +396,16 @@ var _ = Describe("application", func() {
396396
Expect(err).NotTo(HaveOccurred())
397397
})
398398

399+
It("should support multiple controllers watching the same metadata kind", func() {
400+
bldr1 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)
401+
bldr2 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)
402+
403+
ctx, cancel := context.WithCancel(context.Background())
404+
defer cancel()
405+
406+
doReconcileTest(ctx, "6", mgr, true, bldr1, bldr2)
407+
})
408+
399409
It("should support watching For, Owns, and Watch as metadata", func() {
400410
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)
401411

@@ -421,7 +431,7 @@ var _ = Describe("application", func() {
421431

422432
ctx, cancel := context.WithCancel(context.Background())
423433
defer cancel()
424-
doReconcileTest(ctx, "8", bldr, mgr, true)
434+
doReconcileTest(ctx, "8", mgr, true, bldr)
425435

426436
By("Creating a new stateful set")
427437
set := &appsv1.StatefulSet{
@@ -496,7 +506,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G
496506

497507
// TODO(directxman12): this function has too many arguments, and the whole
498508
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time.
499-
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
509+
func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) {
500510
deployName := "deploy-name-" + nameSuffix
501511
rsName := "rs-name-" + nameSuffix
502512

@@ -512,15 +522,17 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
512522
return reconcile.Result{}, nil
513523
})
514524

515-
if complete {
516-
err := blder.Complete(fn)
517-
Expect(err).NotTo(HaveOccurred())
518-
} else {
519-
var err error
520-
var c controller.Controller
521-
c, err = blder.Build(fn)
522-
Expect(err).NotTo(HaveOccurred())
523-
Expect(c).NotTo(BeNil())
525+
for _, blder := range blders {
526+
if complete {
527+
err := blder.Complete(fn)
528+
Expect(err).NotTo(HaveOccurred())
529+
} else {
530+
var err error
531+
var c controller.Controller
532+
c, err = blder.Build(fn)
533+
Expect(err).NotTo(HaveOccurred())
534+
Expect(c).NotTo(BeNil())
535+
}
524536
}
525537

526538
By("Starting the application")

pkg/cache/internal/informers_map.go

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/client-go/metadata"
3535
"k8s.io/client-go/rest"
3636
"k8s.io/client-go/tools/cache"
37+
3738
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3839
)
3940

@@ -231,12 +232,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
231232
return nil, false, err
232233
}
233234

234-
switch obj.(type) {
235-
case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
236-
ni = metadataSharedIndexInformerPreserveGVK(gvk, ni)
237-
default:
238-
}
239-
240235
i := &MapEntry{
241236
Informer: ni,
242237
Reader: CacheReader{
@@ -372,26 +367,85 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
372367
return &cache.ListWatch{
373368
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
374369
ip.selectors[gvk].ApplyToList(&opts)
370+
371+
var (
372+
list *metav1.PartialObjectMetadataList
373+
err error
374+
)
375375
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
376376
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
377-
return client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
377+
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
378+
} else {
379+
list, err = client.Resource(mapping.Resource).List(ctx, opts)
380+
}
381+
if list != nil {
382+
for i := range list.Items {
383+
list.Items[i].SetGroupVersionKind(gvk)
384+
}
378385
}
379-
return client.Resource(mapping.Resource).List(ctx, opts)
386+
return list, err
380387
},
381388
// Setup the watch function
382389
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
383390
ip.selectors[gvk].ApplyToList(&opts)
384391
// Watch needs to be set to true separately
385392
opts.Watch = true
393+
394+
var (
395+
watcher watch.Interface
396+
err error
397+
)
386398
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
387399
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
388-
return client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
400+
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
401+
} else {
402+
watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
403+
}
404+
if watcher != nil {
405+
watcher = newGVKFixupWatcher(gvk, watcher)
389406
}
390-
return client.Resource(mapping.Resource).Watch(ctx, opts)
407+
return watcher, err
391408
},
392409
}, nil
393410
}
394411

412+
type gvkFixupWatcher struct {
413+
watcher watch.Interface
414+
ch chan watch.Event
415+
gvk schema.GroupVersionKind
416+
wg sync.WaitGroup
417+
}
418+
419+
func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
420+
ch := make(chan watch.Event)
421+
w := &gvkFixupWatcher{
422+
gvk: gvk,
423+
watcher: watcher,
424+
ch: ch,
425+
}
426+
w.wg.Add(1)
427+
go w.run()
428+
return w
429+
}
430+
431+
func (w *gvkFixupWatcher) run() {
432+
for e := range w.watcher.ResultChan() {
433+
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
434+
w.ch <- e
435+
}
436+
w.wg.Done()
437+
}
438+
439+
func (w *gvkFixupWatcher) Stop() {
440+
w.watcher.Stop()
441+
w.wg.Wait()
442+
close(w.ch)
443+
}
444+
445+
func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
446+
return w.ch
447+
}
448+
395449
// resyncPeriod returns a function which generates a duration each time it is
396450
// invoked; this is so that multiple controllers don't get into lock-step and all
397451
// hammer the apiserver with list requests simultaneously.

pkg/cache/internal/metadata_infomer_wrapper.go

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)