19 changes: 11 additions & 8 deletions vllm/v1/engine/utils.py
@@ -334,20 +334,22 @@ def create_dp_placement_groups(
             "No nodes with resources found in Ray cluster.")
         assert dp_master_ip_key in nodes[0], (
             "The DP master node (ip: %s) is missing or dead", dp_master_ip)
+        device_str = current_platform.ray_device_key
Contributor

high

ray_device_key can be an empty string on platforms that do not support Ray. If device_str is empty, the lookup into node_resources on line 345 will raise a KeyError. Add an assertion that device_str is non-empty, so that attempting data parallelism with Ray on an unsupported platform fails early with a clear message:
        device_str = current_platform.ray_device_key
        assert device_str, (
            "current_platform.ray_device_key is empty, indicating that data "
            "parallelism with Ray is not supported on this platform.")

         for node_resources in nodes:
-            if "GPU" not in node_resources:
+            if device_str not in node_resources:
                 continue
             # For now, each DP rank can only be assigned to one node
             # TODO(rui): support allocating a single DP rank
             # to multiple nodes
-            available_engine_count = int(node_resources["GPU"]) // world_size
+            available_engine_count = int(
+                node_resources[device_str]) // world_size
             if dp_master_ip_key in node_resources:
                 assert available_engine_count >= local_engine_count, (
                     "Not enough resources to allocate DP ranks "
                     f"on DP master node {dp_master_ip}")
                 for i in range(local_engine_count):
                     bundles = [{
-                        "GPU": 1.0,
+                        device_str: 1.0,
                         "node:" + dp_master_ip: 0.001
                     }] * world_size + [{
                         "CPU": 1.0
@@ -363,7 +365,7 @@ def create_dp_placement_groups(
             for i in range(available_engine_count):
                 if len(placement_groups) == num_pg_to_create:
                     break
-                bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
+                bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]
                 pg = ray.util.placement_group(
                     name=f"dp_rank_{len(placement_groups)}",
                     strategy="STRICT_PACK",
@@ -415,17 +417,18 @@ def add_dp_placement_groups(
         local_dp_ranks = []
         num_pg_created = 0
 
+        device_str = current_platform.ray_device_key
Contributor

high

As in create_dp_placement_groups, ray_device_key can be an empty string here. If device_str is empty, the lookups into available_resources and total_resources on lines 427 and 431 will raise a KeyError. Add the same non-empty assertion so unsupported platforms fail early:
        device_str = current_platform.ray_device_key
        assert device_str, (
            "current_platform.ray_device_key is empty, indicating that data "
            "parallelism with Ray is not supported on this platform.")

         for node in nodes:
             if num_pg_created >= num_pg_to_create:
                 break
 
             node_ip = node.node_ip
             node_id = node.node_id
-            available_gpus = int(available_resources[node_id]["GPU"])
+            available_gpus = int(available_resources[node_id][device_str])
 
             # Get total GPUs on this node from the node's resources
             # Ray stores node resources with node ID as key
-            total_gpus = int(total_resources[node_id]["GPU"])
+            total_gpus = int(total_resources[node_id][device_str])
 
             # Calculate used GPUs and used engines on this node
             used_gpus = max(0, total_gpus - available_gpus)
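
As a quick worked example of this bookkeeping (the node ID and counts below are made up for illustration): if a node reports 8 total devices but only 3 available, 5 are already in use, so with world_size = 2 at most one more engine fits on that node.

    # Hypothetical per-node resource maps, keyed by Ray node ID.
    device_str = "GPU"
    node_id = "node-abc"
    available_resources = {node_id: {"GPU": 3.0, "CPU": 16.0}}
    total_resources = {node_id: {"GPU": 8.0, "CPU": 32.0}}
    world_size = 2

    available_gpus = int(available_resources[node_id][device_str])  # 3
    total_gpus = int(total_resources[node_id][device_str])          # 8
    used_gpus = max(0, total_gpus - available_gpus)                 # 5
    print(available_gpus // world_size)  # 1 -> one more full engine fits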
@@ -444,13 +447,13 @@ def add_dp_placement_groups(
             # Create bundles with node constraint for master node
             if node_ip == dp_master_ip:
                 bundles = [{
-                    "GPU": 1.0,
+                    device_str: 1.0,
                     "node:" + dp_master_ip: 0.001
                 }] * world_size + [{
                     "CPU": 1.0
                 }]
             else:
-                bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
+                bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]
 
             pg = ray.util.placement_group(
                 name=f"dp_rank_{rank}",
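
For context on the bundle layout these hunks modify, here is a minimal standalone sketch (assuming a running Ray cluster with GPU resources; world_size and the device key are placeholder values):

    import ray
    from ray.util.placement_group import placement_group

    ray.init()

    world_size = 2
    device_str = "GPU"  # stand-in for current_platform.ray_device_key

    # One device bundle per rank plus a CPU bundle, mirroring the layout
    # built in create_dp_placement_groups and add_dp_placement_groups.
    bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]

    # STRICT_PACK places every bundle on a single node, matching the
    # "each DP rank can only be assigned to one node" constraint above.
    pg = placement_group(bundles, strategy="STRICT_PACK", name="dp_rank_0")
    ray.get(pg.ready())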