14
14
import json
15
15
import logging
16
16
import traceback
17
+ from typing import List
17
18
18
19
import yaml
19
20
20
21
log = logging .getLogger ()
21
22
22
23
24
# Maps the CapacityType values read from the cluster configuration to the
# capacity type strings written into the generated fleet configuration.
CAPACITY_TYPE_MAP = {
    "ONDEMAND": "on-demand",
    "SPOT": "spot",
    "CAPACITY_BLOCK": "capacity-block",
}
29
+
30
+
23
31
class CriticalError (Exception ):
24
32
"""Critical error for the script."""
25
33
@@ -32,7 +40,7 @@ class ConfigurationFieldNotFoundError(Exception):
32
40
pass
33
41
34
42
35
- def generate_fleet_config_file (output_file , input_file ):
43
+ def generate_fleet_config_file (output_file : str , input_file : str ):
36
44
"""
37
45
Generate configuration file used by Fleet Manager in node daemon package.
38
46
@@ -64,21 +72,15 @@ def generate_fleet_config_file(output_file, input_file):
64
72
}
65
73
}
66
74
"""
67
- capacity_type_map = {
68
- "ONDEMAND" : "on-demand" ,
69
- "SPOT" : "spot" ,
70
- "CAPACITY_BLOCK" : "capacity-block" ,
71
- }
72
-
73
75
cluster_config = _load_cluster_config (input_file )
74
- queue , compute_resource = None , None
76
+ queue_name , compute_resource_name = None , None
75
77
try :
76
78
fleet_config = {}
77
79
for queue_config in cluster_config ["Scheduling" ]["SlurmQueues" ]:
78
- queue = queue_config ["Name" ]
80
+ queue_name = queue_config ["Name" ]
79
81
80
- # Retrieve capacity info from the queue , if there
81
- queue_capacity_type = capacity_type_map .get (queue_config .get ("CapacityType" , "ONDEMAND" ))
82
+ # Retrieve capacity info from the queue, if present
83
+ queue_capacity_type = CAPACITY_TYPE_MAP .get (queue_config .get ("CapacityType" , "ONDEMAND" ))
82
84
queue_allocation_strategy = queue_config .get ("AllocationStrategy" )
83
85
queue_capacity_reservation_target = queue_config .get ("CapacityReservationTarget" , {})
84
86
queue_capacity_reservation = (
@@ -87,66 +89,25 @@ def generate_fleet_config_file(output_file, input_file):
87
89
else None
88
90
)
89
91
90
- fleet_config [queue ] = {}
92
+ fleet_config [queue_name ] = {}
91
93
92
94
for compute_resource_config in queue_config ["ComputeResources" ]:
93
- compute_resource = compute_resource_config ["Name" ]
94
-
95
- # Override capacity info from the compute resource.
96
- # CapacityReservationTarget can be specified on both queue and compute resource level.
97
- # CapacityType and AllocationStrategy are not yet supported at compute resource level from the CLI,
98
- # but this code is ready to use them.
99
- capacity_type = capacity_type_map .get (compute_resource_config .get ("CapacityType" ), queue_capacity_type )
100
- allocation_strategy = compute_resource_config .get ("AllocationStrategy" , queue_allocation_strategy )
101
- capacity_reservation_target = compute_resource_config .get ("CapacityReservationTarget" , {})
102
- capacity_reservation = (
103
- capacity_reservation_target .get ("CapacityReservationId" , queue_capacity_reservation )
104
- if capacity_reservation_target
105
- else queue_capacity_reservation
95
+ compute_resource_name , config_for_fleet = _generate_compute_resource_fleet_config (
96
+ compute_resource_config = compute_resource_config ,
97
+ queue_name = queue_name ,
98
+ queue_allocation_strategy = queue_allocation_strategy ,
99
+ queue_capacity_reservation = queue_capacity_reservation ,
100
+ queue_capacity_type = queue_capacity_type ,
101
+ queue_subnets = queue_config ["Networking" ]["SubnetIds" ],
106
102
)
107
-
108
- config_for_fleet = {"CapacityType" : capacity_type }
109
- if capacity_reservation :
110
- config_for_fleet .update ({"CapacityReservationId" : capacity_reservation })
111
-
112
- if compute_resource_config .get ("Instances" ):
113
- # multiple instance types, create-fleet api
114
- config_for_fleet .update (
115
- {
116
- "Api" : "create-fleet" ,
117
- "Instances" : copy .deepcopy (compute_resource_config ["Instances" ]),
118
- "Networking" : {"SubnetIds" : queue_config ["Networking" ]["SubnetIds" ]},
119
- }
120
- )
121
- if allocation_strategy :
122
- config_for_fleet .update ({"AllocationStrategy" : allocation_strategy })
123
- if capacity_type == "spot" and compute_resource_config ["SpotPrice" ]:
124
- config_for_fleet .update ({"MaxPrice" : compute_resource_config ["SpotPrice" ]})
125
-
126
- elif compute_resource_config .get ("InstanceType" ):
127
- # single instance type, run-instances api
128
- config_for_fleet .update (
129
- {
130
- "Api" : "run-instances" ,
131
- "Instances" : [{"InstanceType" : compute_resource_config ["InstanceType" ]}],
132
- }
133
- )
134
-
135
- else :
136
- raise ConfigurationFieldNotFoundError (
137
- "Instances or InstanceType field not found "
138
- f"in queue: { queue } , compute resource: { compute_resource } configuration"
139
- )
140
-
141
- fleet_config [queue ][compute_resource ] = config_for_fleet
103
+ fleet_config [queue_name ][compute_resource_name ] = config_for_fleet
142
104
143
105
except (KeyError , AttributeError ) as e :
144
106
if isinstance (e , KeyError ):
145
107
message = f"Unable to find key { e } in the configuration file."
146
108
else :
147
109
message = f"Error parsing configuration file. { e } . { traceback .format_exc ()} ."
148
- message += f" Queue: { queue } " if queue else ""
149
- message += f" Compute resource: { compute_resource } " if compute_resource else ""
110
+ message += f" Queue: { queue_name } " if queue_name else ""
150
111
log .error (message )
151
112
raise CriticalError (message )
152
113
@@ -157,6 +118,79 @@ def generate_fleet_config_file(output_file, input_file):
157
118
log .info ("Finished." )
158
119
159
120
121
def _generate_compute_resource_fleet_config(
    compute_resource_config: dict,
    queue_name: str,
    queue_allocation_strategy: str,
    queue_capacity_reservation: str,
    queue_capacity_type: str,
    queue_subnets: List,
):
    """
    Generate compute resource config to add in the fleet-config.json, overriding values from the queue.

    CapacityReservationTarget can be specified on both queue and compute resource level.
    CapacityType and AllocationStrategy are not yet supported at compute resource level from the CLI,
    but this code is ready to use them.

    The queue_* arguments are the queue level values used as fallback when the compute resource
    does not define its own; queue_allocation_strategy and queue_capacity_reservation may be None.
    queue_name is only used to build error messages.

    Returns compute_resource name and fleet-config section for the given compute resource.

    Raises ConfigurationFieldNotFoundError if neither Instances nor InstanceType is set.
    Raises CriticalError (chained to the original exception) if a KeyError or AttributeError
    occurs while reading the compute resource configuration.
    A missing "Name" key is looked up before the try block, so that KeyError propagates to the caller.
    """
    compute_resource_name = compute_resource_config["Name"]

    try:
        # Compute resource level CapacityType wins; an absent or unknown value falls back to the queue one.
        capacity_type = CAPACITY_TYPE_MAP.get(compute_resource_config.get("CapacityType"), queue_capacity_type)
        config_for_fleet = {"CapacityType": capacity_type}

        capacity_reservation_target = compute_resource_config.get("CapacityReservationTarget", {})
        capacity_reservation = (
            capacity_reservation_target.get("CapacityReservationId", queue_capacity_reservation)
            if capacity_reservation_target
            else queue_capacity_reservation
        )
        if capacity_reservation:
            config_for_fleet.update({"CapacityReservationId": capacity_reservation})

        if compute_resource_config.get("Instances"):
            # multiple instance types, create-fleet api
            config_for_fleet.update(
                {
                    "Api": "create-fleet",
                    # Deep copy so the fleet config does not share nested objects with the cluster config.
                    "Instances": copy.deepcopy(compute_resource_config["Instances"]),
                    "Networking": {"SubnetIds": queue_subnets},
                }
            )
            allocation_strategy = compute_resource_config.get("AllocationStrategy", queue_allocation_strategy)
            if allocation_strategy:
                config_for_fleet.update({"AllocationStrategy": allocation_strategy})
            # NOTE(review): SpotPrice is accessed strictly, so a spot compute resource without the key
            # ends up in the KeyError handler below - confirm the key is always rendered in the config.
            if capacity_type == "spot" and compute_resource_config["SpotPrice"]:
                config_for_fleet.update({"MaxPrice": compute_resource_config["SpotPrice"]})

        elif compute_resource_config.get("InstanceType"):
            # single instance type, run-instances api
            config_for_fleet.update(
                {
                    "Api": "run-instances",
                    "Instances": [{"InstanceType": compute_resource_config["InstanceType"]}],
                }
            )

        else:
            # Not a KeyError/AttributeError: deliberately left to propagate unchanged to the caller.
            raise ConfigurationFieldNotFoundError(
                "Instances or InstanceType field not found "
                f"in queue: {queue_name}, compute resource: {compute_resource_name} configuration"
            )
    except (KeyError, AttributeError) as e:
        if isinstance(e, KeyError):
            message = f"Unable to find key {e} in the configuration file."
        else:
            message = f"Error parsing configuration file. {e}. {traceback.format_exc()}."
        message += f" Queue: {queue_name}, Compute resource: {compute_resource_name}"
        log.error(message)
        # Chain explicitly so the original exception is recorded as the cause of the CriticalError.
        raise CriticalError(message) from e

    return compute_resource_name, config_for_fleet
192
+
193
+
160
194
def _load_cluster_config (input_file_path ):
161
195
"""Load cluster config file."""
162
196
with open (input_file_path , encoding = "utf-8" ) as input_file :
0 commit comments