from enum import Enum
from logging.config import fileConfig
from subprocess import CalledProcessError
+from typing import Dict, List

from botocore.config import Config
from common.schedulers.slurm_commands import (
from retrying import retry
from slurm_plugin.common import TIMESTAMP_FORMAT, log_exception, print_with_count, read_json
from slurm_plugin.instance_manager import InstanceManager
-from slurm_plugin.slurm_resources import CONFIG_FILE_DIR, EC2InstanceHealthState, PartitionStatus, StaticNode
+from slurm_plugin.slurm_resources import (
+    CONFIG_FILE_DIR,
+    ComputeResourceFailureEvent,
+    DynamicNode,
+    EC2InstanceHealthState,
+    PartitionStatus,
+    SlurmNode,
+    StaticNode,
+)

LOOP_TIME = 60
log = logging.getLogger(__name__)
@@ -300,6 +309,7 @@ def __init__(self, config):
        self.static_nodes_in_replacement is persistent across multiple iterations of manage_cluster
        This state is required because we need to ignore static nodes that might have a long bootstrap time
        """
+        self._insufficient_capacity_compute_resources = {}
        self._static_nodes_in_replacement = set()
        self._partitions_protected_failure_count_map = {}
        self._compute_fleet_status = ComputeFleetStatus.RUNNING
@@ -380,7 +390,7 @@ def manage_cluster(self):
            log.info("Retrieving nodes info from the scheduler")
            nodes = self._get_node_info_with_retry()
            log.debug("Nodes: %s", nodes)
-            partitions_name_map = self._retrieve_scheduler_partitions(nodes)
+            partitions_name_map, compute_resource_nodes_map = self._retrieve_scheduler_partitions(nodes)
        except Exception as e:
            log.error(
                "Unable to get partition/node info from slurm, no other action can be performed. Sleeping... "
@@ -404,7 +414,7 @@ def manage_cluster(self):
            if not self._config.disable_all_health_checks:
                self._perform_health_check_actions(partitions)
            # Maintain slurm nodes
-            self._maintain_nodes(partitions_name_map)
+            self._maintain_nodes(partitions_name_map, compute_resource_nodes_map)
            # Clean up orphaned instances
            self._terminate_orphaned_instances(cluster_instances)
        elif self._compute_fleet_status in {
@@ -608,7 +618,7 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes):
    def _increase_partitions_protected_failure_count(self, bootstrap_failure_nodes):
        """Keep count of bootstrap failures."""
        for node in bootstrap_failure_nodes:
-            compute_resource = node.get_compute_resource_name()
+            compute_resource = node.compute_resource_name
            for p in node.partitions:
                if p in self._partitions_protected_failure_count_map:
                    self._partitions_protected_failure_count_map[p][compute_resource] = (
@@ -701,7 +711,7 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes):
        )

    @log_exception(log, "maintaining slurm nodes", catch_exception=Exception, raise_on_error=False)
-    def _maintain_nodes(self, partitions_name_map):
+    def _maintain_nodes(self, partitions_name_map, compute_resource_nodes_map):
        """
        Call functions to maintain unhealthy nodes.

@@ -725,6 +735,8 @@ def _maintain_nodes(self, partitions_name_map):
            self._handle_unhealthy_static_nodes(unhealthy_static_nodes)
        if self._is_protected_mode_enabled():
            self._handle_protected_mode_process(active_nodes, partitions_name_map)
+        if self._config.disable_nodes_on_insufficient_capacity:
+            self._handle_ice_nodes(unhealthy_dynamic_nodes, compute_resource_nodes_map)
        self._handle_failed_health_check_nodes_in_replacement(active_nodes)

    @log_exception(log, "terminating orphaned instances", catch_exception=Exception, raise_on_error=False)
@@ -877,6 +889,7 @@ def _handle_health_check(self, unhealthy_instances_status, instance_id_to_active
    def _retrieve_scheduler_partitions(nodes):
        try:
            ignored_nodes = []
+            compute_resource_nodes_map = {}
            partitions_name_map = ClusterManager._get_partition_info_with_retry()
            log.debug("Partitions: %s", partitions_name_map)
            for node in nodes:
@@ -886,9 +899,12 @@ def _retrieve_scheduler_partitions(nodes):
                else:
                    for p in node.partitions:
                        partitions_name_map[p].slurm_nodes.append(node)
+                    compute_resource_nodes_map.setdefault(node.queue_name, {}).setdefault(
+                        node.compute_resource_name, []
+                    ).append(node)
            if ignored_nodes:
                log.warning("Ignoring following nodes because they do not belong to any partition: %s", ignored_nodes)
-            return partitions_name_map
+            return partitions_name_map, compute_resource_nodes_map
        except Exception as e:
            log.error("Failed when getting partition/node states from scheduler with exception %s", e)
            raise
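The hunk above threads a second return value, `compute_resource_nodes_map`, through `_retrieve_scheduler_partitions`: a nested dict keyed first by queue name and then by compute resource name. Below is a minimal, self-contained sketch of that grouping pattern; `FakeNode` is a hypothetical stand-in for a `SlurmNode`, carrying only the two attributes the mapping needs.

# Minimal sketch of the nested-setdefault grouping used above (stand-in objects, not real SlurmNodes).
from collections import namedtuple

FakeNode = namedtuple("FakeNode", ["name", "queue_name", "compute_resource_name"])

nodes = [
    FakeNode("q1-dy-c1-1", "q1", "c1"),
    FakeNode("q1-dy-c1-2", "q1", "c1"),
    FakeNode("q1-dy-c2-1", "q1", "c2"),
]

compute_resource_nodes_map = {}
for node in nodes:
    # The first setdefault creates the per-queue dict, the second the per-compute-resource list.
    compute_resource_nodes_map.setdefault(node.queue_name, {}).setdefault(
        node.compute_resource_name, []
    ).append(node.name)

print(compute_resource_nodes_map)
# {'q1': {'c1': ['q1-dy-c1-1', 'q1-dy-c1-2'], 'c2': ['q1-dy-c2-1']}}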
@@ -950,6 +966,17 @@ def _find_active_nodes(partitions_name_map):
            active_nodes += partition.slurm_nodes
        return active_nodes

+    @staticmethod
+    def _get_unhealthy_ice_nodes(unhealthy_dynamic_nodes: List[DynamicNode]) -> Dict[str, Dict[str, List[DynamicNode]]]:
+        """Get the mapping of insufficient-capacity (ICE) compute resources to their failed nodes, grouped by queue."""
+        ice_compute_resources_and_nodes_map = {}
+        for node in unhealthy_dynamic_nodes:
+            if node.is_ice():
+                ice_compute_resources_and_nodes_map.setdefault(node.queue_name, {}).setdefault(
+                    node.compute_resource_name, []
+                ).append(node)
+        return ice_compute_resources_and_nodes_map
+
    def _is_node_in_replacement_valid(self, node, check_node_is_valid):
        """
        Check node is replacement timeout or in replacement.
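The hunk below adds the insufficient-capacity handling itself. One detail worth calling out is `_set_ice_compute_resources_to_down`, which groups node names by the stored error code so that each `set_nodes_down` call carries that code in its reason string. Here is a rough standalone sketch of that grouping, with a stubbed `set_nodes_down` and hypothetical node tuples in place of the plugin's real objects.

# Rough sketch only: set_nodes_down is stubbed, and FailedNode is a hypothetical stand-in
# for a powered-down node tracked under an insufficient-capacity compute resource.
from collections import namedtuple

FailedNode = namedtuple("FailedNode", ["name", "error_code"])

def set_nodes_down(node_list, reason):
    # Stand-in for the Slurm command helper the diff calls; just prints what would happen.
    print(f"DOWN {node_list} reason={reason!r}")

tracked_nodes = [
    FailedNode("q1-dy-c1-3", "InsufficientInstanceCapacity"),
    FailedNode("q1-dy-c1-4", "InsufficientInstanceCapacity"),
]

nodes_to_down = {}
for node in tracked_nodes:
    nodes_to_down.setdefault(node.error_code, []).append(node.name)

for error_code, node_list in nodes_to_down.items():
    set_nodes_down(
        node_list, reason=f"(Code:{error_code})Temporarily disabling node due to insufficient capacity"
    )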
@@ -964,6 +991,116 @@ def _is_node_in_replacement_valid(self, node, check_node_is_valid):
            return not time_is_expired if check_node_is_valid else time_is_expired
        return False

+    @log_exception(
+        log, "handling nodes failed due to insufficient capacity", catch_exception=Exception, raise_on_error=False
+    )
+    def _handle_ice_nodes(
+        self,
+        unhealthy_dynamic_nodes: List[DynamicNode],
+        compute_resource_nodes_map: Dict[str, Dict[str, List[SlurmNode]]],
+    ):
+        """Handle nodes that failed due to insufficient capacity."""
+        # get insufficient capacity compute resource and nodes mapping
+        ice_compute_resources_and_nodes_map = self._get_unhealthy_ice_nodes(unhealthy_dynamic_nodes)
+        if ice_compute_resources_and_nodes_map:
+            self._update_insufficient_capacity_compute_resources(ice_compute_resources_and_nodes_map)
+            self._reset_timeout_expired_compute_resources(ice_compute_resources_and_nodes_map)
+            self._set_ice_compute_resources_to_down(compute_resource_nodes_map)
+
+    def _update_insufficient_capacity_compute_resources(
+        self, ice_compute_resources_and_nodes_map: Dict[str, Dict[str, List[SlurmNode]]]
+    ):
+        """Add a compute resource to insufficient_capacity_compute_resources when its nodes fail with insufficient capacity."""
+        for queue_name, compute_resources in ice_compute_resources_and_nodes_map.items():
+            for compute_resource, nodes in compute_resources.items():
+                if not self._insufficient_capacity_compute_resources.get(queue_name, {}).get(compute_resource):
+                    self._insufficient_capacity_compute_resources.setdefault(queue_name, {})[
+                        compute_resource
+                    ] = ComputeResourceFailureEvent(self._current_time, nodes[0].error_code)
+
+    def _reset_timeout_expired_compute_resources(
+        self, ice_compute_resources_and_nodes_map: Dict[str, Dict[str, List[SlurmNode]]]
+    ):
+        """Reset compute resources whose insufficient_capacity_timeout has expired."""
+        # Find compute resources whose insufficient_capacity_timeout has expired
+        if not self._insufficient_capacity_compute_resources:
+            return
+        log.info(
+            "The following compute resources are in down state due to insufficient capacity: %s, "
+            "compute resources will be reset after insufficient capacity timeout (%s seconds) expired",
+            self._insufficient_capacity_compute_resources,
+            self._config.insufficient_capacity_timeout,
+        )
+        timeout_expired_compute_resources = self._find_insufficient_capacity_timeout_expired_compute_resources()
+
+        # Reset nodes whose insufficient capacity timeout has expired
+        if timeout_expired_compute_resources:
+            self._reset_insufficient_capacity_timeout_expired_nodes(
+                timeout_expired_compute_resources, ice_compute_resources_and_nodes_map
+            )
+
+    def _set_ice_compute_resources_to_down(self, compute_resource_nodes_map: Dict[str, Dict[str, List[SlurmNode]]]):
+        """Set powered_down nodes that belong to insufficient capacity compute resources to DOWN."""
+        if not self._insufficient_capacity_compute_resources:
+            return
+        nodes_to_down = {}
+        for queue_name, compute_resources in self._insufficient_capacity_compute_resources.items():
+            for compute_resource, event in compute_resources.items():
+                nodes = compute_resource_nodes_map.get(queue_name, {}).get(compute_resource, [])
+                for node in nodes:
+                    if not node.is_ice() and node.is_power() and not node.is_nodeaddr_set():
+                        error_code = event.error_code
+                        nodes_to_down.setdefault(error_code, []).append(node.name)
+        if nodes_to_down:
+            for error_code, node_list in nodes_to_down.items():
+                log.info(
+                    "Setting following nodes into DOWN state due to insufficient capacity: %s",
+                    print_with_count(node_list),
+                )
+                set_nodes_down(
+                    node_list, reason=f"(Code:{error_code})Temporarily disabling node due to insufficient capacity"
+                )
+
+    def _find_insufficient_capacity_timeout_expired_compute_resources(
+        self,
+    ) -> Dict[str, List[str]]:
+        """Find compute resources whose insufficient_capacity_timeout has expired."""
+        timeout_expired_cr = dict()
+        for queue_name, compute_resources in self._insufficient_capacity_compute_resources.copy().items():
+            for compute_resource, event in compute_resources.copy().items():
+                if time_is_up(event.timestamp, self._current_time, self._config.insufficient_capacity_timeout):
+                    self._insufficient_capacity_compute_resources[queue_name].pop(compute_resource)
+                    timeout_expired_cr.setdefault(queue_name, []).append(compute_resource)
+            if not self._insufficient_capacity_compute_resources.get(queue_name):
+                self._insufficient_capacity_compute_resources.pop(queue_name)
+        return timeout_expired_cr
+
+    def _reset_insufficient_capacity_timeout_expired_nodes(
+        self,
+        timeout_expired_cr: Dict[str, List[str]],
+        ice_compute_resources_and_nodes_map: Dict[str, Dict[str, List[SlurmNode]]],
+    ):
+        """Reset nodes in compute resources whose insufficient_capacity_timeout has expired."""
+        logging.info(
+            f"Reset the following compute resources because insufficient capacity timeout expired: {timeout_expired_cr}"
+        )
+        nodes_to_power_down = []
+        for queue, compute_resources in timeout_expired_cr.items():
+            for compute_resource in compute_resources:
+                nodes = ice_compute_resources_and_nodes_map.get(queue, {}).get(compute_resource, [])
+                nodes_to_power_down += nodes
+
+        if nodes_to_power_down:
+            node_names = [node.name for node in nodes_to_power_down]
+            log.info(
+                "Enabling the following nodes because insufficient capacity timeout expired: %s",
+                print_with_count(node_names),
+            )
+            set_nodes_power_down(
+                node_names,
+                reason="Enabling node since insufficient capacity timeout expired",
+            )
+

def _run_clustermgtd(config_file):
    """Run clustermgtd actions."""