
Commit b82fd2e

Authored Jun 20, 2025

scheduler: refactor cluster reconciler to avoid hidden state mutation (#26042)

The cluster reconciler code is notoriously hard to follow because most of its methods continuously mutate fields on the allocReconciler object. Even in the top-level methods this makes the code hard to trace, and it gets really gnarly in the lower-level methods, of which there are many. This changeset refactors the reconciler so that the vast majority of those methods return explicit values instead of mutating object fields.

1 parent c8dcd3c · commit b82fd2e
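To make the shape of the change easier to see before reading the diffs, here is a minimal, self-contained sketch of the pattern using toy types. The names loosely mirror the diff below (ClusterState, Compute, a result value), but this is not the actual Nomad code: the "before" style feeds inputs in and collects outputs out through fields on the reconciler object, while the "after" style passes the cluster state in explicitly and has Compute() return the results.

// Toy sketch of the refactoring direction; not the actual Nomad reconciler API.
package main

import (
	"fmt"
	"time"
)

// ---- before: hidden state mutation ----

type mutatingReconciler struct {
	tainted map[string]bool // input smuggled in as a field
	Result  struct {        // output smuggled out as a field
		Stop []string
	}
}

func (r *mutatingReconciler) Compute() {
	for node, bad := range r.tainted {
		if bad {
			r.Result.Stop = append(r.Result.Stop, node) // hidden write to a field
		}
	}
}

// ---- after: explicit inputs and outputs ----

type clusterState struct {
	TaintedNodes map[string]bool
	Now          time.Time
}

type reconcileResults struct {
	Stop []string
}

type reconciler struct {
	state clusterState
}

func (r *reconciler) Compute() reconcileResults {
	var res reconcileResults
	for node, bad := range r.state.TaintedNodes {
		if bad {
			res.Stop = append(res.Stop, node) // result is returned, not stored on r
		}
	}
	return res
}

func main() {
	r := &reconciler{state: clusterState{
		TaintedNodes: map[string]bool{"node-1": true},
		Now:          time.Now().UTC(),
	}}
	result := r.Compute()
	fmt.Println(result.Stop) // [node-1]
}

The payoff is that each call site shows exactly which state a computation reads and which results it produces; the change to generic_sched.go below applies this shape with reconciler.ClusterState and the value returned by r.Compute().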

File tree

6 files changed (+976, -937 lines)

scheduler/generic_sched.go

Lines changed: 26 additions & 22 deletions

@@ -340,51 +340,55 @@ func (s *GenericScheduler) computeJobAllocs() error {
 
 	r := reconciler.NewAllocReconciler(s.logger,
 		genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
-		s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID,
-		s.eval.Priority, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))
-	r.Compute()
-	s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", r.Result))
+		s.batch, s.eval.JobID, s.job, s.deployment, allocs, s.eval.ID,
+		s.eval.Priority, reconciler.ClusterState{
+			TaintedNodes:                tainted,
+			SupportsDisconnectedClients: s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true),
+			Now:                         time.Now().UTC(),
+		})
+	result := r.Compute()
+	s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", result))
 
 	if s.eval.AnnotatePlan {
 		s.plan.Annotations = &structs.PlanAnnotations{
-			DesiredTGUpdates: r.Result.DesiredTGUpdates,
+			DesiredTGUpdates: result.DesiredTGUpdates,
 		}
 	}
 
 	// Add the deployment changes to the plan
-	s.plan.Deployment = r.Result.Deployment
-	s.plan.DeploymentUpdates = r.Result.DeploymentUpdates
+	s.plan.Deployment = result.Deployment
+	s.plan.DeploymentUpdates = result.DeploymentUpdates
 
 	// Store all the follow up evaluations from rescheduled allocations
-	if len(r.Result.DesiredFollowupEvals) > 0 {
-		for _, evals := range r.Result.DesiredFollowupEvals {
+	if len(result.DesiredFollowupEvals) > 0 {
+		for _, evals := range result.DesiredFollowupEvals {
 			s.followUpEvals = append(s.followUpEvals, evals...)
 		}
 	}
 
 	// Update the stored deployment
-	if r.Result.Deployment != nil {
-		s.deployment = r.Result.Deployment
+	if result.Deployment != nil {
+		s.deployment = result.Deployment
 	}
 
 	// Handle the stop
-	for _, stop := range r.Result.Stop {
+	for _, stop := range result.Stop {
 		s.plan.AppendStoppedAlloc(stop.Alloc, stop.StatusDescription, stop.ClientStatus, stop.FollowupEvalID)
 	}
 
 	// Handle disconnect updates
-	for _, update := range r.Result.DisconnectUpdates {
+	for _, update := range result.DisconnectUpdates {
 		s.plan.AppendUnknownAlloc(update)
 	}
 
 	// Handle reconnect updates.
 	// Reconnected allocs have a new AllocState entry.
-	for _, update := range r.Result.ReconnectUpdates {
+	for _, update := range result.ReconnectUpdates {
 		s.ctx.Plan().AppendAlloc(update, nil)
 	}
 
 	// Handle the in-place updates
-	for _, update := range r.Result.InplaceUpdate {
+	for _, update := range result.InplaceUpdate {
 		if update.DeploymentID != s.deployment.GetID() {
 			update.DeploymentID = s.deployment.GetID()
 			update.DeploymentStatus = nil
@@ -393,12 +397,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	}
 
 	// Handle the annotation updates
-	for _, update := range r.Result.AttributeUpdates {
+	for _, update := range result.AttributeUpdates {
 		s.ctx.Plan().AppendAlloc(update, nil)
 	}
 
 	// Nothing remaining to do if placement is not required
-	if len(r.Result.Place)+len(r.Result.DestructiveUpdate) == 0 {
+	if len(result.Place)+len(result.DestructiveUpdate) == 0 {
 		// If the job has been purged we don't have access to the job. Otherwise
 		// set the queued allocs to zero. This is true if the job is being
 		// stopped as well.
@@ -411,18 +415,18 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	}
 
 	// Compute the placements
-	place := make([]reconciler.PlacementResult, 0, len(r.Result.Place))
-	for _, p := range r.Result.Place {
+	place := make([]reconciler.PlacementResult, 0, len(result.Place))
+	for _, p := range result.Place {
 		s.queuedAllocs[p.TaskGroup().Name] += 1
 		place = append(place, p)
 	}
 
-	destructive := make([]reconciler.PlacementResult, 0, len(r.Result.DestructiveUpdate))
-	for _, p := range r.Result.DestructiveUpdate {
+	destructive := make([]reconciler.PlacementResult, 0, len(result.DestructiveUpdate))
+	for _, p := range result.DestructiveUpdate {
		s.queuedAllocs[p.TaskGroup().Name] += 1
 		destructive = append(destructive, p)
 	}
-	return s.computePlacements(destructive, place, r.Result.TaskGroupAllocNameIndexes)
+	return s.computePlacements(destructive, place, result.TaskGroupAllocNameIndexes)
 }
 
 // downgradedJobForPlacement returns the previous stable version of the job for
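Condensed from the hunks above, the call shape in computeJobAllocs after this change is roughly the following excerpt; every identifier here appears in the diff, and the surrounding plan construction and error handling are elided:

r := reconciler.NewAllocReconciler(s.logger,
	genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
	s.batch, s.eval.JobID, s.job, s.deployment, allocs, s.eval.ID,
	s.eval.Priority, reconciler.ClusterState{
		TaintedNodes:                tainted,
		SupportsDisconnectedClients: s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true),
		Now:                         time.Now().UTC(),
	})

result := r.Compute()

// Downstream plan construction reads from the returned value rather than
// from fields on the reconciler, for example:
s.plan.Deployment = result.Deployment
s.plan.DeploymentUpdates = result.DeploymentUpdates

Because the cluster state is passed in as one value and the results come back as one value, nothing in the scheduler needs to know which fields Compute() touched internally.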

scheduler/reconciler/allocs.go

Lines changed: 0 additions & 255 deletions

@@ -224,235 +224,6 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
 	return from
 }
 
-// filterByTainted takes a set of tainted nodes and filters the allocation set
-// into the following groups:
-// 1. Those that exist on untainted nodes
-// 2. Those exist on nodes that are draining
-// 3. Those that exist on lost nodes or have expired
-// 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown
-// 5. Those that are on a node that has reconnected.
-// 6. Those that are in a state that results in a noop.
-func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverSupportsDisconnectedClients bool, now time.Time) (untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring allocSet) {
-	untainted = make(map[string]*structs.Allocation)
-	migrate = make(map[string]*structs.Allocation)
-	lost = make(map[string]*structs.Allocation)
-	disconnecting = make(map[string]*structs.Allocation)
-	reconnecting = make(map[string]*structs.Allocation)
-	ignore = make(map[string]*structs.Allocation)
-	expiring = make(map[string]*structs.Allocation)
-
-	for _, alloc := range a {
-		// make sure we don't apply any reconnect logic to task groups
-		// without max_client_disconnect
-		supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
-
-		reconnect := false
-
-		// Only compute reconnect for unknown, running, and failed since they
-		// need to go through the reconnect logic.
-		if supportsDisconnectedClients &&
-			(alloc.ClientStatus == structs.AllocClientStatusUnknown ||
-				alloc.ClientStatus == structs.AllocClientStatusRunning ||
-				alloc.ClientStatus == structs.AllocClientStatusFailed) {
-			reconnect = alloc.NeedsToReconnect()
-		}
-
-		// Failed allocs that need to be reconnected must be added to
-		// reconnecting so that they can be handled as a failed reconnect.
-		if supportsDisconnectedClients &&
-			reconnect &&
-			alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
-			alloc.ClientStatus == structs.AllocClientStatusFailed {
-			reconnecting[alloc.ID] = alloc
-			continue
-		}
-
-		taintedNode, nodeIsTainted := taintedNodes[alloc.NodeID]
-		if taintedNode != nil && taintedNode.Status == structs.NodeStatusDisconnected {
-			// Group disconnecting
-			if supportsDisconnectedClients {
-				// Filter running allocs on a node that is disconnected to be marked as unknown.
-				if alloc.ClientStatus == structs.AllocClientStatusRunning {
-					disconnecting[alloc.ID] = alloc
-					continue
-				}
-				// Filter pending allocs on a node that is disconnected to be marked as lost.
-				if alloc.ClientStatus == structs.AllocClientStatusPending {
-					lost[alloc.ID] = alloc
-					continue
-				}
-
-			} else {
-				if alloc.PreventReplaceOnDisconnect() {
-					if alloc.ClientStatus == structs.AllocClientStatusRunning {
-						disconnecting[alloc.ID] = alloc
-						continue
-					}
-
-					untainted[alloc.ID] = alloc
-					continue
-				}
-
-				lost[alloc.ID] = alloc
-				continue
-			}
-		}
-
-		if alloc.TerminalStatus() && !reconnect {
-			// Server-terminal allocs, if supportsDisconnectedClient and not reconnect,
-			// are probably stopped replacements and should be ignored
-			if supportsDisconnectedClients && alloc.ServerTerminalStatus() {
-				ignore[alloc.ID] = alloc
-				continue
-			}
-
-			// Terminal canaries that have been marked for migration need to be
-			// migrated, otherwise we block deployments from progressing by
-			// counting them as running canaries.
-			if alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate() {
-				migrate[alloc.ID] = alloc
-				continue
-			}
-
-			// Terminal allocs, if not reconnect, are always untainted as they
-			// should never be migrated.
-			untainted[alloc.ID] = alloc
-			continue
-		}
-
-		// Non-terminal allocs that should migrate should always migrate
-		if alloc.DesiredTransition.ShouldMigrate() {
-			migrate[alloc.ID] = alloc
-			continue
-		}
-
-		if supportsDisconnectedClients && alloc.Expired(now) {
-			expiring[alloc.ID] = alloc
-			continue
-		}
-
-		// Acknowledge unknown allocs that we want to reconnect eventually.
-		if supportsDisconnectedClients &&
-			alloc.ClientStatus == structs.AllocClientStatusUnknown &&
-			alloc.DesiredStatus == structs.AllocDesiredStatusRun {
-			untainted[alloc.ID] = alloc
-			continue
-		}
-
-		// Ignore failed allocs that need to be reconnected and that have been
-		// marked to stop by the server.
-		if supportsDisconnectedClients &&
-			reconnect &&
-			alloc.ClientStatus == structs.AllocClientStatusFailed &&
-			alloc.DesiredStatus == structs.AllocDesiredStatusStop {
-			ignore[alloc.ID] = alloc
-			continue
-		}
-
-		if !nodeIsTainted || (taintedNode != nil && taintedNode.Status == structs.NodeStatusReady) {
-			// Filter allocs on a node that is now re-connected to be resumed.
-			if reconnect {
-				// Expired unknown allocs should be processed depending on the max client disconnect
-				// and/or avoid reschedule on lost configurations, they are both treated as
-				// expiring.
-				if alloc.Expired(now) {
-					expiring[alloc.ID] = alloc
-					continue
-				}
-
-				reconnecting[alloc.ID] = alloc
-				continue
-			}
-
-			// Otherwise, Node is untainted so alloc is untainted
-			untainted[alloc.ID] = alloc
-			continue
-		}
-
-		// Allocs on GC'd (nil) or lost nodes are Lost
-		if taintedNode == nil {
-			lost[alloc.ID] = alloc
-			continue
-		}
-
-		// Allocs on terminal nodes that can't be rescheduled need to be treated
-		// differently than those that can.
-		if taintedNode.TerminalStatus() {
-			if alloc.PreventReplaceOnDisconnect() {
-				if alloc.ClientStatus == structs.AllocClientStatusUnknown {
-					untainted[alloc.ID] = alloc
-					continue
-				} else if alloc.ClientStatus == structs.AllocClientStatusRunning {
-					disconnecting[alloc.ID] = alloc
-					continue
-				}
-			}
-
-			lost[alloc.ID] = alloc
-			continue
-		}
-
-		// All other allocs are untainted
-		untainted[alloc.ID] = alloc
-	}
-
-	return
-}
-
-// filterByRescheduleable filters the allocation set to return the set of allocations that are either
-// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
-// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
-// skipped or considered untainted according to logic defined in shouldFilter method.
-func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (allocSet, allocSet, []*delayedRescheduleInfo) {
-	untainted := make(map[string]*structs.Allocation)
-	rescheduleNow := make(map[string]*structs.Allocation)
-	rescheduleLater := []*delayedRescheduleInfo{}
-
-	for _, alloc := range a {
-		// Ignore disconnecting allocs that are already unknown. This can happen
-		// in the case of canaries that are interrupted by a disconnect.
-		if isDisconnecting && alloc.ClientStatus == structs.AllocClientStatusUnknown {
-			continue
-		}
-
-		var eligibleNow, eligibleLater bool
-		var rescheduleTime time.Time
-
-		// Ignore failing allocs that have already been rescheduled.
-		// Only failed or disconnecting allocs should be rescheduled.
-		// Protects against a bug allowing rescheduling running allocs.
-		if alloc.NextAllocation != "" && alloc.TerminalStatus() {
-			continue
-		}
-
-		isUntainted, ignore := shouldFilter(alloc, isBatch)
-		if isUntainted && !isDisconnecting {
-			untainted[alloc.ID] = alloc
-			continue // these allocs can never be rescheduled, so skip checking
-		}
-
-		if ignore {
-			continue
-		}
-
-		eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting)
-		if eligibleNow {
-			rescheduleNow[alloc.ID] = alloc
-			continue
-		}
-
-		// If the failed alloc is not eligible for rescheduling now we
-		// add it to the untainted set.
-		untainted[alloc.ID] = alloc
-
-		if eligibleLater {
-			rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime})
-		}
-
-	}
-	return untainted, rescheduleNow, rescheduleLater
-}
-
 // shouldFilter returns whether the alloc should be ignored or considered untainted.
 //
 // Ignored allocs are filtered out.
@@ -550,32 +321,6 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
 	return
 }
 
-// filterByTerminal filters out terminal allocs
-func filterByTerminal(untainted allocSet) (nonTerminal allocSet) {
-	nonTerminal = make(map[string]*structs.Allocation)
-	for id, alloc := range untainted {
-		if !alloc.TerminalStatus() {
-			nonTerminal[id] = alloc
-		}
-	}
-	return
-}
-
-// filterByDeployment filters allocations into two sets, those that match the
-// given deployment ID and those that don't
-func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
-	match = make(map[string]*structs.Allocation)
-	nonmatch = make(map[string]*structs.Allocation)
-	for _, alloc := range a {
-		if alloc.DeploymentID == id {
-			match[alloc.ID] = alloc
-		} else {
-			nonmatch[alloc.ID] = alloc
-		}
-	}
-	return
-}
-
 // delayByStopAfter returns a delay for any lost allocation that's got a
 // disconnect.stop_on_client_after configured
 func (a allocSet) delayByStopAfter() (later []*delayedRescheduleInfo) {
(The remainder of the diff, covering the other 4 changed files, did not load and is not shown here.)
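Because the rest of the changeset is not visible above, it is not shown here where filterByTainted and the other removed helpers end up. Purely as a hypothetical illustration of the commit's stated direction (explicit values rather than a seven-value positional return), a bundled result type could look like the toy sketch below; none of these names or types come from this commit.

// Hypothetical sketch only; invented toy types, not code added by this commit.
package main

import "fmt"

type alloc struct {
	ID       string
	NodeDown bool
	Terminal bool
}

// clusterFilterResult bundles the groups that filterByTainted used to
// return as seven separate named sets.
type clusterFilterResult struct {
	Untainted     map[string]alloc
	Migrate       map[string]alloc
	Lost          map[string]alloc
	Disconnecting map[string]alloc
	Reconnecting  map[string]alloc
	Ignore        map[string]alloc
	Expiring      map[string]alloc
}

// classify is a drastically simplified stand-in for the real filtering
// logic; it only populates three of the groups to keep the sketch short.
func classify(allocs []alloc) clusterFilterResult {
	res := clusterFilterResult{
		Untainted: map[string]alloc{},
		Lost:      map[string]alloc{},
		Ignore:    map[string]alloc{},
	}
	for _, a := range allocs {
		switch {
		case a.Terminal:
			res.Ignore[a.ID] = a
		case a.NodeDown:
			res.Lost[a.ID] = a
		default:
			res.Untainted[a.ID] = a
		}
	}
	return res
}

func main() {
	out := classify([]alloc{{ID: "a1"}, {ID: "a2", NodeDown: true}})
	fmt.Println(len(out.Untainted), len(out.Lost)) // 1 1
}

A single result value like this keeps call sites readable and avoids the hidden-state problem the commit message describes, since callers name the group they care about instead of tracking positional return values or mutated fields.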
