@@ -451,7 +451,7 @@
unit="seconds",
targets=[
Target(
expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:
W/O PR: shows the total cumulative sum of block generation time (meaningless)
W/ PR: shows the average block generation time per task output over the trailing 5-minute window
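For illustration (hypothetical numbers): if ray_data_block_generation_time grew by 120 s and ray_data_num_task_outputs_generated grew by 40 over the 5-minute window, the panel now reads 120 / 40 = 3 s of generation time per output, instead of an ever-growing cumulative total.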

Bug: Prometheus Query Division by Zero

The 'Block Generation Time' panel's Prometheus query divides by increase(ray_data_num_task_outputs_generated[5m]). This denominator can be zero when no task outputs are generated in a 5-minute window, leading to NaN/Inf values or dashboard display issues.
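One possible guard (hypothetical, not part of this PR; it reuses the metric and label names from the expression above) is to divide only where the denominator is non-zero, so an empty window produces a gap rather than NaN/Inf:

expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / (increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m]) > 0)',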


legend="Block Generation Time: {{dataset}}, {{operator}}",
)
],
@@ -466,7 +466,7 @@
unit="seconds",
targets=[
Target(
expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:
W/O PR: shows total sum of submitted tasks (could be meaningful)
W/ PR: shows current # of submitted tasks (I find this more meaningful)
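Reading the new expression directly: it divides the 5-minute increase in backpressure time by the 5-minute increase in submitted tasks, i.e., the average backpressure per submitted task over the window. For illustration (hypothetical numbers), 30 s of accumulated backpressure across 60 task submissions would read as 0.5 s per task.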


Bug: Prometheus Query Division by Zero

The Prometheus query for the "Task Submission Backpressure Time" panel can result in division by zero. If no tasks are submitted within a 5-minute window, the denominator increase(ray_data_num_tasks_submitted...[5m]) becomes zero, leading to NaN or undefined values on the dashboard.
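As with the block-generation panel, a hypothetical mitigation (not part of this PR) is to clamp the denominator to at least 1 so the ratio stays finite even when no tasks were submitted in the window:

expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / clamp_min(increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m]), 1)',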


legend="Backpressure Time: {{dataset}}, {{operator}}",
)
],
21 changes: 14 additions & 7 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -258,8 +258,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None):
stats_summary_string = self._final_stats.to_summary().to_string(
include_parent=False
)
# Reset the scheduling loop duration gauge.
self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id})
Contributor commented:
is this meant to be nuked?

@iamjustinhsu (Contributor, Author) replied:
yes, the update_metrics calls it

# Reset the scheduling loop duration gauge + resource manager budgets/usages.
self._resource_manager.update_usages()
self.update_metrics(0)
Bug: Executor Shutdown Fails to Reset Budget Gauges

During executor shutdown, update_usages() sets operator budgets to None. Since _update_budget_metrics() only updates gauges when the budget is not None, budget gauges are not reset to 0 and retain their last non-zero values.
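Note: the _update_budget_metrics hunk further down in this diff defaults each budget to 0 when budget is None and moves the gauge .set() calls outside the conditional, which appears to address this concern.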


if self._data_context.enable_auto_log_stats:
logger.info(stats_summary_string)
# Close the progress bars from top to bottom to avoid them jumping
@@ -364,7 +365,12 @@ def update_metrics(self, sched_loop_duration: int):

def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
budget = self._resource_manager.get_budget(op)
if budget is not None:
if budget is None:
cpu_budget = 0
gpu_budget = 0
memory_budget = 0
object_store_memory_budget = 0
else:
# Convert inf to -1 to represent unlimited budget in metrics
cpu_budget = -1 if math.isinf(budget.cpu) else budget.cpu
gpu_budget = -1 if math.isinf(budget.gpu) else budget.gpu
@@ -374,10 +380,11 @@ def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
if math.isinf(budget.object_store_memory)
else budget.object_store_memory
)
self._cpu_budget_gauge.set(cpu_budget, tags=tags)
self._gpu_budget_gauge.set(gpu_budget, tags=tags)
self._memory_budget_gauge.set(memory_budget, tags=tags)
self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)

self._cpu_budget_gauge.set(cpu_budget, tags=tags)
self._gpu_budget_gauge.set(gpu_budget, tags=tags)
self._memory_budget_gauge.set(memory_budget, tags=tags)
self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)

def _update_max_bytes_to_read_metric(
self, op: PhysicalOperator, tags: Dict[str, str]