
Commit fd40d68

Merge pull request kubernetes#123238 from MirrorShih/master
Fix infinite loop and replace channel with queue
2 parents 0e39ca8 + 62f8997 commit fd40d68

2 files changed: +135 -106 lines changed
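
Context for the diff below: the old range allocator worker read reserved-CIDR items from a channel and, on failure, pushed the item straight back onto that same channel, which could spin or block indefinitely. The new code enqueues node keys on a client-go rate-limited workqueue and requeues failures with back-off. A minimal sketch of that worker-loop pattern, using the same client-go packages the diff imports (the processNextItem and syncKey names are illustrative, not from this PR):

package example

import (
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"
)

// processNextItem drains one key from a rate-limited workqueue. On a sync
// error the key is re-added with back-off (AddRateLimited) rather than being
// pushed back immediately, which is what the old channel-based worker did.
func processNextItem(queue workqueue.RateLimitingInterface, syncKey func(string) error) bool {
    obj, shutdown := queue.Get()
    if shutdown {
        return false
    }
    defer queue.Done(obj)

    key, ok := obj.(string)
    if !ok {
        // Invalid item: Forget it so the queue does not retry it forever.
        queue.Forget(obj)
        utilruntime.HandleError(fmt.Errorf("expected string in queue but got %#v", obj))
        return true
    }
    if err := syncKey(key); err != nil {
        // Transient failure: requeue with the limiter's back-off.
        queue.AddRateLimited(key)
        utilruntime.HandleError(fmt.Errorf("error syncing %q: %w, requeuing", key, err))
        return true
    }
    // Success: reset the rate limiter's failure count for this key.
    queue.Forget(obj)
    return true
}

A caller would build the queue the same way the constructor hunk below does, with workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cidrallocator_node"), and run this loop from several goroutines. The workqueue also de-duplicates keys, which is what lets the PR drop the nodesInProcessing set and its lock.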

pkg/controller/nodeipam/ipam/cidr_allocator.go

Lines changed: 0 additions & 7 deletions
@@ -106,13 +106,6 @@ type CIDRAllocatorParams struct {
 	NodeCIDRMaskSizes []int
 }
 
-// CIDRs are reserved, then node resource is patched with them.
-// nodeReservedCIDRs holds the reservation info for a node.
-type nodeReservedCIDRs struct {
-	allocatedCIDRs []*net.IPNet
-	nodeName       string
-}
-
 // New creates a new CIDR range allocator.
 func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
 	nodeList, err := listNodes(ctx, kubeClient)

pkg/controller/nodeipam/ipam/range_allocator.go

Lines changed: 135 additions & 99 deletions
@@ -20,23 +20,24 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"sync"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
 	netutils "k8s.io/utils/net"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
 	informers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
 	nodeutil "k8s.io/component-helpers/node/util"
 	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
 	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
@@ -52,14 +53,12 @@ type rangeAllocator struct {
 	nodeLister corelisters.NodeLister
 	// nodesSynced returns true if the node shared informer has been synced at least once.
 	nodesSynced cache.InformerSynced
-	// Channel that is used to pass updating Nodes and their reserved CIDRs to the background
-	// This increases a throughput of CIDR assignment by not blocking on long operations.
-	nodeCIDRUpdateChannel chan nodeReservedCIDRs
-	broadcaster           record.EventBroadcaster
-	recorder              record.EventRecorder
-	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
-	lock              sync.Mutex
-	nodesInProcessing sets.String
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
+
+	// queues are where incoming work is placed to de-dup and to allow "easy"
+	// rate limited requeues on errors
+	queue workqueue.RateLimitingInterface
 }
 
 // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs)
@@ -89,15 +88,14 @@ func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, node
 	}
 
 	ra := &rangeAllocator{
-		client:                client,
-		clusterCIDRs:          allocatorParams.ClusterCIDRs,
-		cidrSets:              cidrSets,
-		nodeLister:            nodeInformer.Lister(),
-		nodesSynced:           nodeInformer.Informer().HasSynced,
-		nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
-		broadcaster:           eventBroadcaster,
-		recorder:              recorder,
-		nodesInProcessing:     sets.NewString(),
+		client:       client,
+		clusterCIDRs: allocatorParams.ClusterCIDRs,
+		cidrSets:     cidrSets,
+		nodeLister:   nodeInformer.Lister(),
+		nodesSynced:  nodeInformer.Informer().HasSynced,
+		broadcaster:  eventBroadcaster,
+		recorder:     recorder,
+		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cidrallocator_node"),
 	}
 
 	if allocatorParams.ServiceCIDR != nil {
@@ -130,37 +128,33 @@ func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, node
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
-			return ra.AllocateOrOccupyCIDR(logger, node)
-		}),
-		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-			// If the PodCIDRs list is not empty we either:
-			// - already processed a Node that already had CIDRs after NC restarted
-			//   (cidr is marked as used),
-			// - already processed a Node successfully and allocated CIDRs for it
-			//   (cidr is marked as used),
-			// - already processed a Node but we did saw a "timeout" response and
-			//   request eventually got through in this case we haven't released
-			//   the allocated CIDRs (cidr is still marked as used).
-			// There's a possible error here:
-			// - NC sees a new Node and assigns CIDRs X,Y.. to it,
-			// - Update Node call fails with a timeout,
-			// - Node is updated by some other component, NC sees an update and
-			//   assigns CIDRs A,B.. to the Node,
-			// - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache,
-			//   even though Node sees only CIDR A,B..
-			// The problem here is that in in-memory cache we see CIDR X,Y.. as marked,
-			// which prevents it from being assigned to any new node. The cluster
-			// state is correct.
-			// Restart of NC fixes the issue.
-			if len(newNode.Spec.PodCIDRs) == 0 {
-				return ra.AllocateOrOccupyCIDR(logger, newNode)
+		AddFunc: func(obj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
 			}
-			return nil
-		}),
-		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
-			return ra.ReleaseCIDR(logger, node)
-		}),
+		},
+		UpdateFunc: func(old, new interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(new)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// The informer cache no longer has the object, and since Node doesn't have a finalizer,
+			// we don't see the Update with DeletionTimestamp != 0.
+			// TODO: instead of executing the operation directly in the handler, build a small cache with key node.Name
+			// and value PodCIDRs use ReleaseCIDR on the reconcile loop so we can retry on `ReleaseCIDR` failures.
+			if err := ra.ReleaseCIDR(logger, obj.(*v1.Node)); err != nil {
+				utilruntime.HandleError(fmt.Errorf("error while processing CIDR Release: %w", err))
+			}
+			// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
+			// key function.
+			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+			if err == nil {
+				ra.queue.Add(key)
+			}
+		},
 	})
 
 	return ra, nil
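
One subtlety in the new DeleteFunc above: the direct obj.(*v1.Node) assertion appears to assume the handler never receives a cache.DeletedFinalStateUnknown tombstone, which informers can deliver when a delete event was missed (that is also why the deletion-aware key function is used for the enqueue). A hedged sketch, not part of this PR, of unwrapping such a tombstone before the assertion:

package example

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

// nodeFromDeleteEvent returns the deleted Node, unwrapping a
// DeletedFinalStateUnknown tombstone if the informer missed the delete event.
func nodeFromDeleteEvent(obj interface{}) (*v1.Node, bool) {
    if node, ok := obj.(*v1.Node); ok {
        return node, true
    }
    if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
        node, ok := tombstone.Obj.(*v1.Node)
        return node, ok
    }
    return nil, false
}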
@@ -176,6 +170,8 @@ func (r *rangeAllocator) Run(ctx context.Context) {
 	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
 	defer r.broadcaster.Shutdown()
 
+	defer r.queue.ShutDown()
+
 	logger.Info("Starting range CIDR allocator")
 	defer logger.Info("Shutting down range CIDR allocator")
 
@@ -184,50 +180,100 @@ func (r *rangeAllocator) Run(ctx context.Context) {
 	}
 
 	for i := 0; i < cidrUpdateWorkers; i++ {
-		go r.worker(ctx)
+		go wait.UntilWithContext(ctx, r.runWorker, time.Second)
 	}
 
 	<-ctx.Done()
 }
 
-func (r *rangeAllocator) worker(ctx context.Context) {
-	logger := klog.FromContext(ctx)
-	for {
-		select {
-		case workItem, ok := <-r.nodeCIDRUpdateChannel:
-			if !ok {
-				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
-				return
-			}
-			if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
-				// Requeue the failed node for update again.
-				r.nodeCIDRUpdateChannel <- workItem
-			}
-		case <-ctx.Done():
-			return
-		}
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// queue.
+func (r *rangeAllocator) runWorker(ctx context.Context) {
+	for r.processNextNodeWorkItem(ctx) {
 	}
 }
 
-func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	if r.nodesInProcessing.Has(nodeName) {
+// processNextWorkItem will read a single work item off the queue and
+// attempt to process it, by calling the syncHandler.
+func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
+	obj, shutdown := r.queue.Get()
+	if shutdown {
 		return false
 	}
-	r.nodesInProcessing.Insert(nodeName)
+
+	// We wrap this block in a func so we can defer r.queue.Done.
+	err := func(logger klog.Logger, obj interface{}) error {
+		// We call Done here so the workNodeQueue knows we have finished
+		// processing this item. We also must remember to call Forget if we
+		// do not want this work item being re-queued. For example, we do
+		// not call Forget if a transient error occurs, instead the item is
+		// put back on the queue and attempted again after a back-off
+		// period.
+		defer r.queue.Done(obj)
+		var key string
+		var ok bool
+		// We expect strings to come off the workNodeQueue. These are of the
+		// form namespace/name. We do this as the delayed nature of the
+		// workNodeQueue means the items in the informer cache may actually be
+		// more up to date that when the item was initially put onto the
+		// workNodeQueue.
+		if key, ok = obj.(string); !ok {
+			// As the item in the workNodeQueue is actually invalid, we call
+			// Forget here else we'd go into a loop of attempting to
+			// process a work item that is invalid.
+			r.queue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in workNodeQueue but got %#v", obj))
+			return nil
+		}
+		// Run the syncHandler, passing it the namespace/name string of the
+		// Foo resource to be synced.
+		if err := r.syncNode(logger, key); err != nil {
+			// Put the item back on the queue to handle any transient errors.
+			r.queue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queue again until another change happens.
+		r.queue.Forget(obj)
+		logger.Info("Successfully synced", "key", key)
+		return nil
+	}(klog.FromContext(ctx), obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
 	return true
 }
 
-func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-	r.nodesInProcessing.Delete(nodeName)
+func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error {
+	startTime := time.Now()
+	defer func() {
+		logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
+	}()
+
+	node, err := r.nodeLister.Get(key)
+	if apierrors.IsNotFound(err) {
+		logger.V(3).Info("node has been deleted", "node", key)
+		// TODO: obtain the node object information to call ReleaseCIDR from here
+		// and retry if there is an error.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Check the DeletionTimestamp to determine if object is under deletion.
+	if !node.DeletionTimestamp.IsZero() {
+		logger.V(3).Info("node is being deleted", "node", key)
+		return r.ReleaseCIDR(logger, node)
+	}
+	return r.AllocateOrOccupyCIDR(logger, node)
 }
 
 // marks node.PodCIDRs[...] as used in allocator's tracked cidrSet
 func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
-	defer r.removeNodeFromProcessing(node.Name)
 	if len(node.Spec.PodCIDRs) == 0 {
 		return nil
 	}
@@ -257,34 +303,25 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node)
 	if node == nil {
 		return nil
 	}
-	if !r.insertNodeToProcessing(node.Name) {
-		logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
-		return nil
-	}
 
 	if len(node.Spec.PodCIDRs) > 0 {
 		return r.occupyCIDRs(node)
 	}
-	// allocate and queue the assignment
-	allocated := nodeReservedCIDRs{
-		nodeName:       node.Name,
-		allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
-	}
+
+	allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets))
 
 	for idx := range r.cidrSets {
 		podCIDR, err := r.cidrSets[idx].AllocateNext()
 		if err != nil {
-			r.removeNodeFromProcessing(node.Name)
 			controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
 			return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
 		}
-		allocated.allocatedCIDRs[idx] = podCIDR
+		allocatedCIDRs[idx] = podCIDR
 	}
 
 	//queue the assignment
-	logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocated.allocatedCIDRs)
-	r.nodeCIDRUpdateChannel <- allocated
-	return nil
+	logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
+	return r.updateCIDRsAllocation(logger, node.Name, allocatedCIDRs)
 }
 
 // ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets
@@ -336,21 +373,20 @@ func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *
 }
 
 // updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
-func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
+func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName string, allocatedCIDRs []*net.IPNet) error {
 	var err error
 	var node *v1.Node
-	defer r.removeNodeFromProcessing(data.nodeName)
-	cidrsString := ipnetToStringList(data.allocatedCIDRs)
-	node, err = r.nodeLister.Get(data.nodeName)
+	cidrsString := ipnetToStringList(allocatedCIDRs)
+	node, err = r.nodeLister.Get(nodeName)
 	if err != nil {
-		logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", data.nodeName))
+		logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", nodeName))
 		return err
 	}
 
 	// if cidr list matches the proposed.
 	// then we possibly updated this node
 	// and just failed to ack the success.
-	if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
+	if len(node.Spec.PodCIDRs) == len(allocatedCIDRs) {
 		match := true
 		for idx, cidr := range cidrsString {
 			if node.Spec.PodCIDRs[idx] != cidr {
@@ -359,15 +395,15 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
 			}
 		}
 		if match {
-			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", data.allocatedCIDRs)
+			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
 			return nil
 		}
 	}
 
 	// node has cidrs, release the reserved
 	if len(node.Spec.PodCIDRs) != 0 {
 		logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
-		for idx, cidr := range data.allocatedCIDRs {
+		for idx, cidr := range allocatedCIDRs {
 			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
 				logger.Error(releaseErr, "Error when releasing CIDR", "index", idx, "CIDR", cidr)
 			}
@@ -390,7 +426,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeRese
 	// NodeController restart will return all falsely allocated CIDRs to the pool.
 	if !apierrors.IsServerTimeout(err) {
 		logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
-		for idx, cidr := range data.allocatedCIDRs {
+		for idx, cidr := range allocatedCIDRs {
 			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
 				logger.Error(releaseErr, "Error releasing allocated CIDR for node", "node", klog.KObj(node))
 			}
