@@ -17,6 +17,8 @@ limitations under the License.
1717package detector
1818
1919import (
20+ pq "github.com/emirpasic/gods/queues/priorityqueue"
21+ godsutils "github.com/emirpasic/gods/utils"
2022 corev1 "k8s.io/api/core/v1"
2123 apierrors "k8s.io/apimachinery/pkg/api/errors"
2224 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -33,6 +35,13 @@ import (
3335 "github.com/karmada-io/karmada/pkg/util/names"
3436)
3537
38+ // PriorityKey is the unique propagation policy key with priority.
39+ type PriorityKey struct {
40+ util.QueueKey
41+ // Priority is the priority of the propagation policy.
42+ Priority int32
43+ }
44+
3645// preemptionEnabled checks if preemption is enabled.
3746func preemptionEnabled (preemption policyv1alpha1.PreemptionBehavior ) bool {
3847 if preemption != policyv1alpha1 .PreemptAlways {
@@ -257,10 +266,10 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
257266 return
258267 }
259268
260- // TODO(@RainbowMango): Should sort the listed policies to ensure the
269+ // Use the priority queue to sort the listed policies to ensure the
261270 // higher priority PropagationPolicy be process first to avoid possible
262271 // multiple preemption.
263-
272+ var sortedPotentialKeys * pq. Queue
264273 for i := range policies {
265274 var potentialPolicy policyv1alpha1.PropagationPolicy
266275 if err = helper .ConvertToTypedObject (policies [i ], & potentialPolicy ); err != nil {
@@ -277,35 +286,43 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
277286 potentialPolicy .Spec .Preemption == policyv1alpha1 .PreemptAlways &&
278287 potentialPolicy .ExplicitPriority () > newPolicy .ExplicitPriority () &&
279288 potentialPolicy .ExplicitPriority () < oldPolicy .ExplicitPriority () {
280- var potentialKey util.QueueKey
281- potentialKey , err = ClusterWideKeyFunc (& potentialPolicy )
289+ potentialKey , err := ClusterWideKeyFunc (& potentialPolicy )
282290 if err != nil {
283- return
291+ klog .Errorf ("Failed to convert PropagationPolicy to queued key: %v" , err )
292+ continue
284293 }
294+
285295 klog .Infof ("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes" , potentialPolicy .GetNamespace (), potentialPolicy .GetName (), newPolicy .GetNamespace (), newPolicy .GetName ())
286- d .policyReconcileWorker .Add (potentialKey )
296+ if sortedPotentialKeys == nil {
297+ sortedPotentialKeys = pq .NewWith (priorityDescendingComparator )
298+ }
299+
300+ sortedPotentialKeys .Enqueue (& PriorityKey {
301+ QueueKey : potentialKey ,
302+ Priority : potentialPolicy .ExplicitPriority (),
303+ })
287304 }
288305 }
306+ requeuePotentialKeys (sortedPotentialKeys , d .policyReconcileWorker )
289307}
290308
291309// HandleDeprioritizedClusterPropagationPolicy responses to priority change of a ClusterPropagationPolicy,
292310// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
293311// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
294312// and put the ClusterPropagationPolicy in the queue to trigger preemption.
295313func (d * ResourceDetector ) HandleDeprioritizedClusterPropagationPolicy (oldPolicy policyv1alpha1.ClusterPropagationPolicy , newPolicy policyv1alpha1.ClusterPropagationPolicy ) {
296- klog .Infof ("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d" ,
297- newPolicy .GetNamespace (), newPolicy .GetName (), * oldPolicy .Spec .Priority , * newPolicy .Spec .Priority )
298-
299- policies , err := d .clusterPropagationPolicyLister .ByNamespace (newPolicy .GetNamespace ()).List (labels .Everything ())
314+ klog .Infof ("ClusterPropagationPolicy(%s) priority changed from %d to %d" ,
315+ newPolicy .GetName (), * oldPolicy .Spec .Priority , * newPolicy .Spec .Priority )
316+ policies , err := d .clusterPropagationPolicyLister .List (labels .Everything ())
300317 if err != nil {
301- klog .Errorf ("Failed to list ClusterPropagationPolicy from namespace: %s , error: %v" , newPolicy . GetNamespace () , err )
318+ klog .Errorf ("Failed to list ClusterPropagationPolicy, error: %v" , err )
302319 return
303320 }
304321
305- // TODO(@RainbowMango): Should sort the listed policies to ensure the
322+ // Use the priority queue to sort the listed policies to ensure the
306323 // higher priority ClusterPropagationPolicy be process first to avoid possible
307324 // multiple preemption.
308-
325+ var sortedPotentialKeys * pq. Queue
309326 for i := range policies {
310327 var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
311328 if err = helper .ConvertToTypedObject (policies [i ], & potentialPolicy ); err != nil {
@@ -322,14 +339,47 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy
322339 potentialPolicy .Spec .Preemption == policyv1alpha1 .PreemptAlways &&
323340 potentialPolicy .ExplicitPriority () > newPolicy .ExplicitPriority () &&
324341 potentialPolicy .ExplicitPriority () < oldPolicy .ExplicitPriority () {
325- var potentialKey util.QueueKey
326- potentialKey , err = ClusterWideKeyFunc (& potentialPolicy )
342+ potentialKey , err := ClusterWideKeyFunc (& potentialPolicy )
327343 if err != nil {
328- return
344+ klog .Errorf ("Failed to convert ClusterPropagationPolicy to queued key: %v" , err )
345+ continue
329346 }
330- klog .Infof ("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes" ,
331- potentialPolicy .GetNamespace (), potentialPolicy .GetName (), newPolicy .GetNamespace (), newPolicy .GetName ())
332- d .clusterPolicyReconcileWorker .Add (potentialKey )
347+
348+ klog .Infof ("Enqueuing ClusterPropagationPolicy(%s) in case of ClusterPropagationPolicy(%s) priority changes" ,
349+ potentialPolicy .GetName (), newPolicy .GetName ())
350+ if sortedPotentialKeys == nil {
351+ sortedPotentialKeys = pq .NewWith (priorityDescendingComparator )
352+ }
353+
354+ sortedPotentialKeys .Enqueue (& PriorityKey {
355+ QueueKey : potentialKey ,
356+ Priority : potentialPolicy .ExplicitPriority (),
357+ })
333358 }
334359 }
360+ requeuePotentialKeys (sortedPotentialKeys , d .clusterPolicyReconcileWorker )
361+ }
362+
363+ // requeuePotentialKeys re-queues potential policy keys.
364+ func requeuePotentialKeys (sortedPotentialKeys * pq.Queue , worker util.AsyncWorker ) {
365+ // No suitable policy key to re-queue.
366+ if sortedPotentialKeys == nil {
367+ return
368+ }
369+
370+ for {
371+ key , ok := sortedPotentialKeys .Dequeue ()
372+ if ! ok {
373+ break
374+ }
375+
376+ worker .Add (key .(* PriorityKey ).QueueKey )
377+ }
378+ }
379+
380+ // priorityDescendingComparator provides a basic descending comparison on policy priority.
381+ func priorityDescendingComparator (a , b interface {}) int {
382+ aPriority := a .(* PriorityKey ).Priority
383+ bPriority := b .(* PriorityKey ).Priority
384+ return godsutils .Int32Comparator (bPriority , aPriority )
335385}
0 commit comments