
Commit e884d97

chenwany committed
Clustermgtd refactor

Signed-off-by: chenwany <[email protected]>
1 parent af299c9 · commit e884d97

File tree: 17 files changed, +3974 −3968 lines


src/common/schedulers/slurm_commands.py

Lines changed: 28 additions & 195 deletions
@@ -9,14 +9,21 @@
 # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-import collections
 import logging
 import re
-from enum import Enum
 
-from common.utils import check_command_output, convert_range_to_list, grouper, run_command
+from common.utils import check_command_output, grouper, run_command
 from retrying import retry
+from slurm_plugin.slurm_resources import (
+    DynamicNode,
+    InvalidNodenameError,
+    PartitionStatus,
+    SlurmPartition,
+    StaticNode,
+    parse_nodename,
+)
+
+log = logging.getLogger(__name__)
 
 PENDING_RESOURCES_REASONS = [
     "Resources",
@@ -46,144 +53,12 @@
 SCONTROL = "sudo /opt/slurm/bin/scontrol"
 SINFO = "/opt/slurm/bin/sinfo"
 
-SlurmPartition = collections.namedtuple("SlurmPartition", ["name", "nodes", "state"])
-NodeType = collections.namedtuple("NodeType", ["instance_type", "partition"])
-
 # Set default timeouts for running different slurm commands.
 # These timeouts might be needed when running on large scale
 DEFAULT_GET_INFO_COMMAND_TIMEOUT = 30
 DEFAULT_UPDATE_COMMAND_TIMEOUT = 60
 
 
-class PartitionStatus(Enum):
-    UP = "UP"
-    DOWN = "DOWN"
-    INACTIVE = "INACTIVE"
-    DRAIN = "DRAIN"
-
-    def __str__(self):
-        return str(self.value)
-
-
-class SlurmNode:
-    SLURM_SCONTROL_BUSY_STATES = {"MIXED", "ALLOCATED", "COMPLETING"}
-    SLURM_SCONTROL_IDLE_STATE = "IDLE"
-    SLURM_SCONTROL_DOWN_STATE = "DOWN"
-    SLURM_SCONTROL_DRAIN_STATE = "DRAIN"
-    SLURM_SCONTROL_POWERING_DOWN_STATE = "POWERING_DOWN"
-    SLURM_SCONTROL_POWER_STATE = "IDLE+CLOUD+POWER"
-    SLURM_SCONTROL_POWER_UP_STATE = "#"
-    SLURM_SCONTROL_ONLINE_STATES = {"IDLE+CLOUD", "MIXED+CLOUD", "ALLOCATED+CLOUD", "COMPLETING+CLOUD"}
-    SLURM_SCONTROL_POWER_WITH_JOB_STATE = "MIXED+CLOUD+POWER"
-    SLURM_SCONTROL_RESUME_FAILED_STATE = "DOWN*+CLOUD+POWER"
-
-    def __init__(self, name, nodeaddr, nodehostname, state, partitions=None):
-        """Initialize slurm node with attributes."""
-        self.name = name
-        self.is_static = is_static_node(name)
-        self.nodeaddr = nodeaddr
-        self.nodehostname = nodehostname
-        self.state = state
-        self.partitions = partitions.strip().split(",") if partitions else None
-
-    def is_nodeaddr_set(self):
-        """Check if nodeaddr(private ip) for the node is set."""
-        return self.nodeaddr != self.name
-
-    def has_job(self):
-        """Check if slurm node is in a working state."""
-        return any(working_state in self.state for working_state in self.SLURM_SCONTROL_BUSY_STATES)
-
-    def _is_drain(self):
-        """Check if slurm node is in any drain(draining, drained) states."""
-        return self.SLURM_SCONTROL_DRAIN_STATE in self.state
-
-    def is_drained(self):
-        """
-        Check if slurm node is in drained state.
-
-        drained(sinfo) is equivalent to IDLE+DRAIN(scontrol) or DOWN+DRAIN(scontrol)
-        """
-        return self._is_drain() and (
-            self.SLURM_SCONTROL_IDLE_STATE in self.state or self.SLURM_SCONTROL_DOWN_STATE in self.state
-        )
-
-    def is_powering_down(self):
-        """Check if slurm node is in powering down state."""
-        return self.SLURM_SCONTROL_POWERING_DOWN_STATE in self.state
-
-    def is_power(self):
-        """Check if slurm node is in power state."""
-        return self.SLURM_SCONTROL_POWER_STATE == self.state
-
-    def is_down(self):
-        """Check if slurm node is in a down state."""
-        return self.SLURM_SCONTROL_DOWN_STATE in self.state and not self.is_powering_down()
-
-    def is_up(self):
-        """Check if slurm node is in a healthy state."""
-        return not self._is_drain() and not self.is_down() and not self.is_powering_down()
-
-    def is_powering_up(self):
-        """Check if slurm node is in powering up state."""
-        return self.SLURM_SCONTROL_POWER_UP_STATE in self.state
-
-    def is_online(self):
-        """Check if slurm node is online with backing instance."""
-        return self.state in self.SLURM_SCONTROL_ONLINE_STATES
-
-    def is_configuring_job(self):
-        """Check if slurm node is configuring with job and hasn't begun to run a job."""
-        return self.is_powering_up() and self.has_job()
-
-    def is_power_with_job(self):
-        """Check if node is a dynamic node allocated a job whose power up process has not started yet."""
-        return self.state == self.SLURM_SCONTROL_POWER_WITH_JOB_STATE
-
-    def is_running_job(self):
-        """Check if slurm node is running a job but not in configuring job state."""
-        return not self.is_powering_up() and self.has_job() and not self.is_power_with_job()
-
-    def is_resume_failed(self):
-        """Check if node resume timeout expires."""
-        return self.state == self.SLURM_SCONTROL_RESUME_FAILED_STATE
-
-    def __eq__(self, other):
-        """Compare 2 SlurmNode objects."""
-        if isinstance(other, SlurmNode):
-            return self.__dict__ == other.__dict__
-        return False
-
-    def __repr__(self):
-        attrs = ", ".join(["{key}={value}".format(key=key, value=repr(value)) for key, value in self.__dict__.items()])
-        return "{class_name}({attrs})".format(class_name=self.__class__.__name__, attrs=attrs)
-
-    def __str__(self):
-        return f"{self.name}({self.nodeaddr})"
-
-
-class InvalidNodenameError(ValueError):
-    r"""
-    Exception raised when encountering a NodeName that is invalid/incorrectly formatted.
-
-    Valid NodeName format: {queue-name}-{st/dy}-{instancetype}-{number}
-    And match: ^([a-z0-9\-]+)-(st|dy)-([a-z0-9-]+)-\d+$
-    Sample NodeName: queue-1-st-c5xlarge-2
-    """
-
-    pass
-
-
-def parse_nodename(nodename):
-    """Parse queue_name, node_type (st vs dy) and instance_type from nodename."""
-    nodename_capture = re.match(r"^([a-z0-9\-]+)-(st|dy)-([a-z0-9]+)-\d+$", nodename)
-    if not nodename_capture:
-        raise InvalidNodenameError
-
-    queue_name, node_type, instance_name = nodename_capture.groups()
-    return queue_name, node_type, instance_name
-
-
 def is_static_node(nodename):
     """
     Check if the node is static or dynamic.
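
The deleted parse_nodename (moved by this commit to slurm_plugin.slurm_resources) encodes the NodeName convention {queue-name}-{st/dy}-{instancetype}-{number}. A standalone sketch reusing the same regex as the deleted code; the plain ValueError here stands in for InvalidNodenameError:

    import re

    def parse_nodename(nodename):
        # NodeName format: {queue-name}-{st|dy}-{instancetype}-{number}
        match = re.match(r"^([a-z0-9\-]+)-(st|dy)-([a-z0-9]+)-\d+$", nodename)
        if not match:
            raise ValueError(f"Invalid NodeName: {nodename}")
        return match.groups()  # (queue_name, node_type, instance_name)

    print(parse_nodename("queue1-st-c5xlarge-2"))          # ('queue1', 'st', 'c5xlarge')
    print(parse_nodename("gpu-queue-dy-p3dn24xlarge-10"))  # ('gpu-queue', 'dy', 'p3dn24xlarge')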
@@ -227,7 +102,6 @@ def update_nodes(
         update_cmd += f" state={state}"
     if reason:
         update_cmd += f' reason="{reason}"'
-
     for nodenames, addrs, hostnames in batched_node_info:
         node_info = f"nodename={nodenames}"
         if addrs:
@@ -248,7 +122,7 @@ def update_partitions(partitions, state):
             )
             succeeded_partitions.append(partition)
         except Exception as e:
-            logging.error("Failed when setting partition %s to %s with error %s", partition, state, e)
+            log.error("Failed when setting partition %s to %s with error %s", partition, state, e)
 
     return succeeded_partitions
 

@@ -261,15 +135,15 @@ def update_all_partitions(state, reset_node_addrs_hostname):
         partition_to_update = []
         for part in partitions:
             if PartitionStatus(part.state) != PartitionStatus(state):
-                logging.info(f"Setting partition {part.name} state from {part.state} to {state}")
+                log.info(f"Setting partition {part.name} state from {part.state} to {state}")
                 if reset_node_addrs_hostname:
-                    logging.info(f"Resetting partition nodes {part.nodes}")
-                    reset_nodes(part.nodes, state="power_down", reason="stopping cluster")
+                    log.info(f"Resetting partition nodes {part.nodenames}")
+                    reset_nodes(part.nodenames, state="power_down", reason="stopping cluster")
                 partition_to_update.append(part.name)
         succeeded_partitions = update_partitions(partition_to_update, state)
         return succeeded_partitions == partition_to_update
     except Exception as e:
-        logging.error("Failed when updating partitions with error %s", e)
+        log.error("Failed when updating partitions with error %s", e)
         return False
 
 
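The comparison PartitionStatus(part.state) != PartitionStatus(state) in the hunk above relies on Enum lookup-by-value to normalize the raw state strings coming from sinfo. The PartitionStatus definition (relocated by this commit to slurm_plugin.slurm_resources) makes that concrete:

    from enum import Enum

    class PartitionStatus(Enum):
        UP = "UP"
        DOWN = "DOWN"
        INACTIVE = "INACTIVE"
        DRAIN = "DRAIN"

        def __str__(self):
            return str(self.value)

    # Lookup-by-value turns both sides into enum members before comparing.
    print(PartitionStatus("UP") != PartitionStatus("INACTIVE"))  # True
    print(str(PartitionStatus("DRAIN")))                         # DRAIN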
@@ -296,15 +170,13 @@ def _batch_node_info(nodenames, nodeaddrs, nodehostnames, batch_size):
         try:
             nodeaddrs_batch = _batch_attribute(nodeaddrs, batch_size, expected_length=len(nodenames))
         except ValueError:
-            logging.error("Nodename %s and NodeAddr %s contain different number of entries", nodenames, nodeaddrs)
+            log.error("Nodename %s and NodeAddr %s contain different number of entries", nodenames, nodeaddrs)
             raise
     if nodehostnames:
         try:
             nodehostnames_batch = _batch_attribute(nodehostnames, batch_size, expected_length=len(nodenames))
         except ValueError:
-            logging.error(
-                "Nodename %s and NodeHostname %s contain different number of entries", nodenames, nodehostnames
-            )
+            log.error("Nodename %s and NodeHostname %s contain different number of entries", nodenames, nodehostnames)
             raise
 
     return zip(nodename_batch, nodeaddrs_batch, nodehostnames_batch)
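
_batch_attribute itself is untouched by this hunk and not shown. Under the assumption that it splits comma-separated attributes into scontrol-sized batches and validates the entry count against the node list, a hypothetical re-implementation for illustration:

    from itertools import islice

    def batch_attribute(attribute, batch_size, expected_length=None):
        # Hypothetical stand-in for _batch_attribute; the real helper lives
        # elsewhere in this module and may differ in detail.
        if isinstance(attribute, str):
            attribute = attribute.split(",")
        if expected_length is not None and len(attribute) != expected_length:
            raise ValueError  # triggers the log.error/raise path shown above
        it = iter(attribute)
        return [",".join(batch) for batch in iter(lambda: list(islice(it, batch_size)), [])]

    print(batch_attribute("10.0.0.1,10.0.0.2,10.0.0.3", 2, expected_length=3))
    # ['10.0.0.1,10.0.0.2', '10.0.0.3']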
@@ -467,53 +339,14 @@ def _parse_nodes_info(slurm_node_info):
     for node in node_info:
         lines = node.strip().splitlines()
         if lines:
-            slurm_nodes.append(SlurmNode(*lines))
-    return slurm_nodes
-
-
-def get_nodes_type(nodes):
-    """Get node type given nodename. Node type format: NodeType(instancetype, queue_name)."""
-    nodes_types = set()
-    for node in nodes:
-        queue_name, _, instance_name = parse_nodename(node.name)
-        nodes_types.add(NodeType(instance_name, queue_name))
-    return nodes_types
+            try:
+                if is_static_node(lines[0]):
+                    node = StaticNode(*lines)
+                    slurm_nodes.append(node)
+                else:
+                    node = DynamicNode(*lines)
+                    slurm_nodes.append(node)
+            except InvalidNodenameError:
+                log.warning("Ignoring node %s because it has an invalid name", lines[0])
 
-
-def get_inactive_partition_names(partitions):
-    """Get the name of all inactive partitions from partitions and name mapping."""
-    inactive_partitions_name = []
-    for partition_name, partition in partitions.items():
-        if partition.state == "INACTIVE":
-            inactive_partitions_name.append(partition_name)
-    return inactive_partitions_name
-
-
-def retrieve_partitions_from_node_types(node_types):
-    """Retrieve partitions for node types."""
-    return {node_type.partition for node_type in node_types}
-
-
-def get_partition_node_names(nodenames):
-    """
-    Convert partition nodenames read from partition to a list of nodes.
-
-    Example input nodenames: "queue1-st-c5xlarge-[1,3,4-5],queue1-st-c5large-20"
-    Example output: [queue1-st-c5xlarge-1, queue1-st-c5xlarge-3, queue1-st-c5xlarge-4, queue1-st-c5xlarge-5,
-    queue1-st-c5large-20]
-    """
-    if type(nodenames) is str:
-        matches = re.findall(r"((([a-z0-9\-]+)-(st|dy)-([a-z0-9]+)-)(\[[\d+,-]+\]|\d+))", nodenames)
-        # [('queue1-st-c5xlarge-[1,3,4-5]', 'queue1-st-c5xlarge-', 'queue1', 'st', 'c5xlarge', '[1,3,4-5]'),
-        #  ('queue1-st-c5large-20', 'queue1-st-c5large-', 'queue1', 'st', 'c5large', '20')]
-        partition_node_names = []
-        for match in matches:
-            nodename, prefix, _, _, _, nodes = match
-            if "[" not in nodes:
-                # Single nodename
-                partition_node_names.append(nodename)
-            else:
-                # Multiple nodenames
-                node_list = convert_range_to_list(nodes.strip("[]"))
-                partition_node_names += [prefix + str(n) for n in node_list]
-        return partition_node_names
+    return slurm_nodes
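
The refactored _parse_nodes_info now dispatches on node type instead of building a single SlurmNode. A sketch of that dispatch with stand-in classes; the real StaticNode/DynamicNode live in slurm_plugin.slurm_resources and are assumed here to keep the old SlurmNode constructor signature:

    class StaticNode:
        # Stand-in; constructor mirrors the deleted SlurmNode.__init__.
        def __init__(self, name, nodeaddr, nodehostname, state, partitions=None):
            self.name, self.nodeaddr, self.state = name, nodeaddr, state

    class DynamicNode(StaticNode):
        pass

    def is_static_node(nodename):
        # Simplified: the real code parses the full NodeName; "st" marks static.
        return "-st-" in nodename

    def parse_node(lines):
        cls = StaticNode if is_static_node(lines[0]) else DynamicNode
        return cls(*lines)

    node = parse_node(["queue1-dy-c5xlarge-3", "10.0.0.7", "queue1-dy-c5xlarge-3", "IDLE+CLOUD+POWER"])
    print(type(node).__name__, node.state)  # DynamicNode IDLE+CLOUD+POWER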

src/common/utils.py

Lines changed: 19 additions & 0 deletions
@@ -259,3 +259,22 @@ def convert_range_to_list(node_range):
         ),
         [],
     )
+
+
+def time_is_up(initial_time, current_time, grace_time):
+    """Check if timeout is exceeded."""
+    # Localize datetime objects to UTC if not previously localized
+    # All timestamps used in this function should be already localized
+    # Assume timestamp was taken from system local time if there is no localization info
+    if not initial_time.tzinfo:
+        logging.warning(
+            "Timestamp %s is not localized. Please double check that this is expected, localizing to UTC.", initial_time
+        )
+        initial_time = initial_time.astimezone(tz=timezone.utc)
+    if not current_time.tzinfo:
+        logging.warning(
+            "Timestamp %s is not localized. Please double check that this is expected, localizing to UTC", current_time
+        )
+        current_time = current_time.astimezone(tz=timezone.utc)
+    time_diff = (current_time - initial_time).total_seconds()
+    return time_diff >= grace_time
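
A quick usage sketch for the new helper (grace_time is in seconds; assumes time_is_up is importable from common.utils):

    from datetime import datetime, timedelta, timezone
    from common.utils import time_is_up

    start = datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    now = start + timedelta(seconds=90)

    print(time_is_up(start, now, grace_time=60))   # True: 90s elapsed >= 60s
    print(time_is_up(start, now, grace_time=120))  # False: 90s elapsed < 120s

    # A naive timestamp is localized (with a warning) before the comparison,
    # so mixing naive and timezone-aware inputs still yields a defined result.
    print(time_is_up(start, datetime.now(), grace_time=60))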
