Skip to content

Commit 957be91

Browse files
committed
Fix controller reconcile concurrent
Signed-off-by: pigletfly <[email protected]>
1 parent 2bd15c9 commit 957be91

File tree

9 files changed

+147
-49
lines changed

9 files changed

+147
-49
lines changed

cmd/agent/app/agent.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import (
1515
"k8s.io/client-go/tools/clientcmd"
1616
"k8s.io/klog/v2"
1717
controllerruntime "sigs.k8s.io/controller-runtime"
18+
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
1819

1920
"github.com/karmada-io/karmada/cmd/agent/app/options"
2021
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
22+
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
2123
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
2224
"github.com/karmada-io/karmada/pkg/controllers/execution"
2325
"github.com/karmada-io/karmada/pkg/controllers/mcs"
@@ -126,6 +128,12 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti
126128
LeaderElectionID: fmt.Sprintf("karmada-agent-%s", opts.ClusterName),
127129
LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
128130
LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
131+
Controller: v1alpha1.ControllerConfigurationSpec{
132+
GroupKindConcurrency: map[string]int{
133+
workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
134+
clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
135+
},
136+
},
129137
})
130138
if err != nil {
131139
return fmt.Errorf("failed to build controller manager: %w", err)
@@ -165,6 +173,8 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
165173
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
166174
ClusterAPIQPS: opts.ClusterAPIQPS,
167175
ClusterAPIBurst: opts.ClusterAPIBurst,
176+
ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs,
177+
ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs,
168178
},
169179
StopChan: stopChan,
170180
}
@@ -222,16 +232,16 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) {
222232

223233
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
224234
workStatusController := &status.WorkStatusController{
225-
Client: ctx.Mgr.GetClient(),
226-
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
227-
RESTMapper: ctx.Mgr.GetRESTMapper(),
228-
InformerManager: informermanager.GetInstance(),
229-
StopChan: ctx.StopChan,
230-
WorkerNumber: 1,
231-
ObjectWatcher: ctx.ObjectWatcher,
232-
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
233-
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
234-
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
235+
Client: ctx.Mgr.GetClient(),
236+
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
237+
RESTMapper: ctx.Mgr.GetRESTMapper(),
238+
InformerManager: informermanager.GetInstance(),
239+
StopChan: ctx.StopChan,
240+
ObjectWatcher: ctx.ObjectWatcher,
241+
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
242+
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
243+
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
244+
ConcurrentWorkReconciles: ctx.Opts.ConcurrentWorkReconciles,
235245
}
236246
workStatusController.RunWorkQueue()
237247
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
@@ -247,7 +257,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
247257
RESTMapper: ctx.Mgr.GetRESTMapper(),
248258
InformerManager: informermanager.GetInstance(),
249259
StopChan: ctx.StopChan,
250-
WorkerNumber: 1,
260+
WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles,
251261
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
252262
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
253263
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,

cmd/agent/app/options/options.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,27 @@ type Options struct {
5555
ClusterAPIEndpoint string
5656
// ProxyServerAddress holds the proxy server address that is used to proxy to the cluster.
5757
ProxyServerAddress string
58+
// concurrentClusterSyncs is the number of cluster objects that are
59+
// allowed to sync concurrently.
60+
ConcurrentClusterSyncs int
61+
// concurrentClusterAPISyncs is the number of clusterapi controller workers.
62+
ConcurrentClusterAPISyncs int
63+
// ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are
64+
// allowed to sync concurrently.
65+
ConcurrentClusterResourceBindingSyncs int
66+
// ConcurrentWorkSyncs is the number of work objects that are
67+
// allowed to sync concurrently.
68+
ConcurrentWorkSyncs int
69+
// ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are
70+
// allowed to sync concurrently.
71+
ConcurrentResourceBindingSyncs int
72+
// ConcurrentNamespaceSyncs is the number of name objects that are
73+
// allowed to sync concurrently.
74+
ConcurrentNamespaceSyncs int
75+
// ConcurrentDetectorSyncs is the number of detector workers.
76+
ConcurrentDetectorSyncs int
77+
// ConcurrentServiceExportSyncs is the number of resource workers.
78+
ConcurrentServiceExportSyncs int
5879
}
5980

6081
// NewOptions builds an default scheduler options.
@@ -97,4 +118,12 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
97118
fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.")
98119
fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.")
99120
fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
121+
fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)")
122+
fs.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)")
123+
fs.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)")
124+
fs.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)")
125+
fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)")
126+
fs.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)")
127+
fs.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)")
128+
fs.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. (default 5)")
100129
}

cmd/controller-manager/app/controllermanager.go

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@ import (
88
"strconv"
99

1010
"github.com/spf13/cobra"
11+
"k8s.io/apimachinery/pkg/runtime/schema"
1112
"k8s.io/client-go/discovery"
1213
"k8s.io/client-go/dynamic"
1314
kubeclientset "k8s.io/client-go/kubernetes"
1415
"k8s.io/client-go/tools/clientcmd"
1516
"k8s.io/klog/v2"
17+
1618
controllerruntime "sigs.k8s.io/controller-runtime"
19+
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
1720
"sigs.k8s.io/controller-runtime/pkg/event"
1821
"sigs.k8s.io/controller-runtime/pkg/healthz"
1922
"sigs.k8s.io/controller-runtime/pkg/predicate"
2023

2124
"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
2225
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
26+
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
2327
"github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
2428
"github.com/karmada-io/karmada/pkg/controllers/binding"
2529
"github.com/karmada-io/karmada/pkg/controllers/cluster"
@@ -88,6 +92,15 @@ func Run(ctx context.Context, opts *options.Options) error {
8892
HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
8993
LivenessEndpointName: "/healthz",
9094
MetricsBindAddress: opts.MetricsBindAddress,
95+
Controller: v1alpha1.ControllerConfigurationSpec{
96+
GroupKindConcurrency: map[string]int{
97+
workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
98+
workv1alpha1.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs,
99+
workv1alpha1.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs,
100+
clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
101+
schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs,
102+
},
103+
},
91104
})
92105
if err != nil {
93106
klog.Errorf("failed to build controller manager: %v", err)
@@ -247,19 +260,20 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
247260
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
248261
opts := ctx.Opts
249262
workStatusController := &status.WorkStatusController{
250-
Client: ctx.Mgr.GetClient(),
251-
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
252-
RESTMapper: ctx.Mgr.GetRESTMapper(),
253-
InformerManager: informermanager.GetInstance(),
254-
StopChan: ctx.StopChan,
255-
WorkerNumber: 1,
256-
ObjectWatcher: ctx.ObjectWatcher,
257-
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
258-
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
259-
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
263+
Client: ctx.Mgr.GetClient(),
264+
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
265+
RESTMapper: ctx.Mgr.GetRESTMapper(),
266+
InformerManager: informermanager.GetInstance(),
267+
StopChan: ctx.StopChan,
268+
ObjectWatcher: ctx.ObjectWatcher,
269+
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
270+
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
271+
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
272+
ConcurrentWorkReconciles: opts.ConcurrentWorkReconciles,
260273
}
261274
workStatusController.RunWorkQueue()
262275
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
276+
klog.Fatalf("Failed to setup work status controller: %v", err)
263277
return false, err
264278
}
265279
return true, nil
@@ -289,7 +303,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
289303
RESTMapper: ctx.Mgr.GetRESTMapper(),
290304
InformerManager: informermanager.GetInstance(),
291305
StopChan: ctx.StopChan,
292-
WorkerNumber: 1,
306+
WorkerNumber: ctx.Opts.ConcurrentServiceExportReconciles,
293307
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
294308
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
295309
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
@@ -364,15 +378,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
364378
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
365379

366380
resourceDetector := &detector.ResourceDetector{
367-
DiscoveryClientSet: discoverClientSet,
368-
Client: mgr.GetClient(),
369-
InformerManager: controlPlaneInformerManager,
370-
RESTMapper: mgr.GetRESTMapper(),
371-
DynamicClient: dynamicClientSet,
372-
SkippedResourceConfig: skippedResourceConfig,
373-
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
374-
ResourceInterpreter: resourceInterpreter,
375-
EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
381+
DiscoveryClientSet: discoverClientSet,
382+
Client: mgr.GetClient(),
383+
InformerManager: controlPlaneInformerManager,
384+
RESTMapper: mgr.GetRESTMapper(),
385+
DynamicClient: dynamicClientSet,
386+
SkippedResourceConfig: skippedResourceConfig,
387+
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
388+
ResourceInterpreter: resourceInterpreter,
389+
EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
390+
ConcurrentResourceTemplateSyncs: opts.ConcurrentDetectorSyncs,
391+
ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs,
376392
}
377393
if err := mgr.Add(resourceDetector); err != nil {
378394
klog.Fatalf("Failed to setup resource detector: %v", err)
@@ -394,6 +410,8 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
394410
ClusterAPIQPS: opts.ClusterAPIQPS,
395411
ClusterAPIBurst: opts.ClusterAPIBurst,
396412
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
413+
ConcurrentWorkReconciles: opts.ConcurrentWorkSyncs,
414+
ConcurrentServiceExportReconciles: opts.ConcurrentServiceExportSyncs,
397415
},
398416
StopChan: stopChan,
399417
DynamicClientSet: dynamicClientSet,
@@ -438,6 +456,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options
438456
ClusterAPIConfig: clusterAPIRestConfig,
439457
ClusterAPIClient: clusterAPIClient,
440458
InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
459+
ConcurrentReconciles: opts.ConcurrentClusterAPISyncs,
441460
}
442461
if err := mgr.Add(clusterAPIClusterDetector); err != nil {
443462
klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)

cmd/controller-manager/app/options/options.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,27 @@ type Options struct {
7979
// It can be set to "0" to disable the metrics serving.
8080
// Defaults to ":8080".
8181
MetricsBindAddress string
82+
// concurrentClusterSyncs is the number of cluster objects that are
83+
// allowed to sync concurrently.
84+
ConcurrentClusterSyncs int
85+
// concurrentClusterAPISyncs is the number of clusterapi controller workers.
86+
ConcurrentClusterAPISyncs int
87+
// ConcurrentClusterResourceBindingSyncs is the number of clusterresourcebinding objects that are
88+
// allowed to sync concurrently.
89+
ConcurrentClusterResourceBindingSyncs int
90+
// ConcurrentWorkSyncs is the number of work objects that are
91+
// allowed to sync concurrently.
92+
ConcurrentWorkSyncs int
93+
// ConcurrentResourceBindingSyncs is the number of resourcebinding objects that are
94+
// allowed to sync concurrently.
95+
ConcurrentResourceBindingSyncs int
96+
// ConcurrentNamespaceSyncs is the number of name objects that are
97+
// allowed to sync concurrently.
98+
ConcurrentNamespaceSyncs int
99+
// ConcurrentDetectorSyncs is the number of detector workers.
100+
ConcurrentDetectorSyncs int
101+
// ConcurrentServiceExportSyncs is the number of resource workers.
102+
ConcurrentServiceExportSyncs int
82103
}
83104

84105
// NewOptions builds an empty options.
@@ -134,4 +155,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) {
134155
flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
135156
flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
136157
flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8088, :8088)")
158+
flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)")
159+
flags.IntVar(&o.ConcurrentClusterAPISyncs, "concurrent-clusterapi-syncs", 5, "The number of cluster objects that are allowed to sync concurrently. (default 5)")
160+
flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of clusterresourcebinding objects that are allowed to sync concurrently. (default 5)")
161+
flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of resourcebinding objects that are allowed to sync concurrently. (default 5)")
162+
flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of work objects that are allowed to sync concurrently. (default 5)")
163+
flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 5, "The number of namespace objects that are allowed to sync concurrently. (default 5)")
164+
flags.IntVar(&o.ConcurrentDetectorSyncs, "concurrent-resource-template-syncs", 5, "The number of resource template workers that are allowed to sync concurrently. (default 5)")
165+
flags.IntVar(&o.ConcurrentServiceExportSyncs, "concurrent-serviceexport-syncs", 5, "The number of serviceexport workers that are allowed to sync concurrently. (default 5)")
137166
}

pkg/clusterdiscovery/clusterapi/clusterapi.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type ClusterDetector struct {
4747
InformerManager informermanager.SingleClusterInformerManager
4848
EventHandler cache.ResourceEventHandler
4949
Processor util.AsyncWorker
50+
ConcurrentReconciles int
5051

5152
stopCh <-chan struct{}
5253
}
@@ -58,7 +59,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
5859

5960
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
6061
d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile)
61-
d.Processor.Run(1, d.stopCh)
62+
d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
6263
d.discoveryCluster()
6364

6465
<-d.stopCh

pkg/controllers/context/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type Options struct {
4747
SkippedPropagatingNamespaces []string
4848
// ClusterName is the name of cluster.
4949
ClusterName string
50+
// ConcurrentWorkReconciles is the number of workstatus workers.
51+
ConcurrentWorkReconciles int
52+
// ConcurrentServiceExportReconciles is the number of resource workers.
53+
ConcurrentServiceExportReconciles int
5054
}
5155

5256
// Context defines the context object for controller.

0 commit comments

Comments
 (0)