[DP] Create placement groups by ray_device_key #25026
Changes from all commits
```diff
@@ -334,20 +334,22 @@ def create_dp_placement_groups(
         "No nodes with resources found in Ray cluster.")
     assert dp_master_ip_key in nodes[0], (
         "The DP master node (ip: %s) is missing or dead", dp_master_ip)
+    device_str = current_platform.ray_device_key
     for node_resources in nodes:
-        if "GPU" not in node_resources:
+        if device_str not in node_resources:
             continue
         # For now, each DP rank can only be assigned to one node
         # TODO(rui): support allocating a single DP rank
         # to multiple nodes
-        available_engine_count = int(node_resources["GPU"]) // world_size
+        available_engine_count = int(
+            node_resources[device_str]) // world_size
         if dp_master_ip_key in node_resources:
             assert available_engine_count >= local_engine_count, (
                 "Not enough resources to allocate DP ranks "
                 f"on DP master node {dp_master_ip}")
             for i in range(local_engine_count):
                 bundles = [{
-                    "GPU": 1.0,
+                    device_str: 1.0,
                     "node:" + dp_master_ip: 0.001
                 }] * world_size + [{
                     "CPU": 1.0
@@ -363,7 +365,7 @@ def create_dp_placement_groups(
         for i in range(available_engine_count):
             if len(placement_groups) == num_pg_to_create:
                 break
-            bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
+            bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]
             pg = ray.util.placement_group(
                 name=f"dp_rank_{len(placement_groups)}",
                 strategy="STRICT_PACK",
```
```diff
@@ -415,17 +417,18 @@ def add_dp_placement_groups(
     local_dp_ranks = []
     num_pg_created = 0

+    device_str = current_platform.ray_device_key
     for node in nodes:
         if num_pg_created >= num_pg_to_create:
             break

         node_ip = node.node_ip
         node_id = node.node_id
-        available_gpus = int(available_resources[node_id]["GPU"])
+        available_gpus = int(available_resources[node_id][device_str])

         # Get total GPUs on this node from the node's resources
         # Ray stores node resources with node ID as key
-        total_gpus = int(total_resources[node_id]["GPU"])
+        total_gpus = int(total_resources[node_id][device_str])

         # Calculate used GPUs and used engines on this node
         used_gpus = max(0, total_gpus - available_gpus)
```

Review comment on the added `device_str = current_platform.ray_device_key` line: similar to the comment on `create_dp_placement_groups`, an assertion should be added here as well:

```python
device_str = current_platform.ray_device_key
assert device_str, (
    "current_platform.ray_device_key is empty, indicating that data "
    "parallelism with Ray is not supported on this platform.")
```
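As a rough illustration of the accounting in this hunk, the sketch below uses toy resource maps keyed by node ID (the dicts and values are made up, not real Ray output; in the actual code they come from Ray's per-node resource state):

```python
# Toy per-node resource maps, mimicking the shape read in the hunk above.
device_str = "GPU"
world_size = 2
available_resources = {"node-abc": {device_str: 3.0, "CPU": 32.0}}
total_resources = {"node-abc": {device_str: 8.0, "CPU": 32.0}}

node_id = "node-abc"
available_gpus = int(available_resources[node_id][device_str])  # 3
total_gpus = int(total_resources[node_id][device_str])          # 8
used_gpus = max(0, total_gpus - available_gpus)                 # 5
# Each engine (DP rank) needs world_size devices, so at most
# available_gpus // world_size additional engines fit on this node.
print(used_gpus, available_gpus // world_size)  # 5 1
```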
```diff
@@ -444,13 +447,13 @@ def add_dp_placement_groups(
         # Create bundles with node constraint for master node
         if node_ip == dp_master_ip:
             bundles = [{
-                "GPU": 1.0,
+                device_str: 1.0,
                 "node:" + dp_master_ip: 0.001
             }] * world_size + [{
                 "CPU": 1.0
             }]
         else:
-            bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
+            bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]

         pg = ray.util.placement_group(
             name=f"dp_rank_{rank}",
```
Review comment: The `ray_device_key` can be an empty string for platforms that do not support Ray. If `device_str` is empty, it will cause a `KeyError` when used to access `node_resources` on line 345. It's better to add an assertion to ensure `device_str` is not empty, which provides an early and clear error message if data parallelism is attempted on an unsupported platform.
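A minimal sketch of the failure mode the reviewer describes, assuming a platform whose `ray_device_key` is the empty string (the resource dict below is made up):

```python
# device_str stands in for ray_device_key on an unsupported platform.
device_str = ""
node_resources = {"GPU": 8.0, "CPU": 32.0}

# A membership check silently skips every node...
print(device_str in node_resources)  # False

# ...while a direct lookup raises an unhelpful KeyError.
try:
    int(node_resources[device_str])
except KeyError as e:
    print(f"KeyError: {e}")

# The suggested assertion fails fast with a descriptive message instead.
try:
    assert device_str, (
        "current_platform.ray_device_key is empty, indicating that data "
        "parallelism with Ray is not supported on this platform.")
except AssertionError as e:
    print(e)
```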