
Conversation

Contributor

@tianyi-ge tianyi-ge commented Sep 29, 2025

Why are these changes needed?

  1. Currently, the reporter agent is spawned by the raylet process. It assumes that all core workers are direct children of the raylet (it iterates for proc in raylet_proc.children()), but that is no longer the case with new features (uv, image_url). The reporter agent needs another way to find all core workers.
  2. The driver is not spawned by the raylet, so it is never monitored.

Implementation:

  1. Add a gRPC endpoint to the raylet process (node manager) and allow the reporter agent to connect to it.
  2. The reporter agent fetches the worker list, including the driver, via the gRPC reply. It creates a raylet client with a dedicated thread. A rough sketch of the Python side is shown below.
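A minimal, illustration-only sketch of that Python side, assuming the RayletClient binding and get_worker_pids(timeout) call described in the note below; the helper name and the empty-dict fallback here are mine, not the actual diff:

import psutil

def get_worker_processes(raylet_client, timeout_s=1):
    # Ask the local raylet for the PIDs of all alive workers and drivers
    # instead of scanning the raylet's child processes with psutil.
    pids = raylet_client.get_worker_pids(timeout=timeout_s)
    workers = {}
    for pid in pids:
        try:
            # A process may exit between the RPC reply and this lookup.
            workers[pid] = psutil.Process(pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return workers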

Related issue number

Closes #56739

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Reporter agent now fetches worker/driver PIDs via a new Raylet GetWorkerPIDs RPC using a new RayletClient binding, replacing psutil child-process scanning.

  • Backend (Raylet RPC):
    • Add GetWorkerPIDs RPC in node_manager.proto and wire it into NodeManagerService.
    • Implement NodeManager::HandleGetWorkerPIDs to return PIDs of all alive workers and drivers.
    • Extend RayletClient (C++) with GetWorkerPIDs(timeout_ms) and an alternate ctor (ip, port); expose to Python via Cython (includes/raylet_client.pxi, includes/common.pxd).
  • Python/Cython plumbing:
    • Include includes/raylet_client.pxi in _raylet.pyx to expose RayletClient to Python.
  • Dashboard Reporter:
    • Update reporter_agent.py to use RayletClient(ip, node_manager_port).get_worker_pids(timeout) to discover workers; build psutil.Process objects from returned PIDs.
    • Add RAYLET_RPC_TIMEOUT_SECONDS = 1 in dashboard/consts.py and use it for RPC timeout.
  • Server registration:
    • Register new handler in node_manager_server.h macro list.

Written by Cursor Bugbot for commit f76f633. This will update automatically on new commits.

@tianyi-ge tianyi-ge requested a review from a team as a code owner September 29, 2025 15:33
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds a new gRPC endpoint to the node manager for fetching worker and driver PIDs, which is a solid approach for discovering all worker processes. The changes to the protobuf definition and the C++ implementation are mostly correct. However, I've found a critical issue in the Python client code due to a typo that would cause the RPC call to fail. I've also included a few suggestions for improving error handling and code efficiency.

@tianyi-ge tianyi-ge changed the title [core] add get pid rpc to node manager [core] allow reporter agent to get pid via rpc to raylet Sep 29, 2025

@ray-gardener ray-gardener bot added the core (Issues that should be addressed in Ray Core), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling), and community-contribution (Contributed by the community) labels Sep 29, 2025
Comment on lines 523 to 524
// Get the worker managed by local raylet.
// Failure: Sends to local raylet, so should never fail.
Collaborator

we should still add error handling & retries just in case (there could be a logical bug in the raylet)

@tianyi-ge
Contributor Author

@edoakes thank you for the prompt comments; I'll fix it soon. Also, after discussing with @can-anyscale, I'll replace the Python grpcio lib with a Cython wrapper, "RayletClient"


@can-anyscale can-anyscale self-assigned this Oct 1, 2025
Contributor

@can-anyscale can-anyscale left a comment

Let's figure out a way to test that the solution works

)
try:
    return raylet_client.get_worker_pids(timeout=timeout)
except Exception as e:
Contributor

let's not catch all exceptions; be explicit about which exceptions are acceptable to be thrown from get_worker_pids and for which exceptions Ray should just fail out loud

Contributor

yes, these should be RPC exceptions or something; try not to catch all exceptions if possible
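For example, a minimal sketch of what being explicit could look like, assuming the binding raises TimeoutError on RPC timeouts and RuntimeError on other non-OK statuses (as discussed further down); the helper name and the empty-list fallback are illustrative:

import logging

logger = logging.getLogger(__name__)

def get_worker_pids_or_empty(raylet_client, timeout_s):
    try:
        return raylet_client.get_worker_pids(timeout=timeout_s)
    except TimeoutError:
        # Expected when the raylet is slow or busy; degrade to an empty result.
        logger.exception("Timed out getting worker PIDs from raylet")
        return []
    except RuntimeError:
        # Non-OK RPC status surfaced by the binding; log with a traceback.
        logger.exception("Raylet returned an error for GetWorkerPIDs")
        return []
    # Any other exception propagates so that programming errors fail loudly.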

rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
// Get the PIDs of all workers currently alive that are managed by the local Raylet.
// This includes connected driver processes.
// Failure: Will retry on failure with logging
Contributor

nit: what "with logging" means? more useful information would be to retry how many time; what will the reply look like on failures (partial results, empty) etc.



ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port)
    : RayletClient() {
  io_service_ = std::make_unique<instrumented_io_context>();
Contributor

maybe you can just use this https://github.com/ray-project/ray/blob/master/src/ray/common/asio/asio_util.h#L53 so you don't need to maintain the thread yourself

Contributor

there are also patterns to make sure the io_context is reused across raylet clients within one process

Contributor Author

thanks for your suggestions

Contributor Author

I guess in the future, if the raylet client is used in multiple places, reusing the io_context will be important.
But to use IOContextProvider, I have to create a "default io context" anyway. It's also manually maintained, right?

Contributor

oh dang, sorry, I forgot to include the link to the pattern; you can create a static InstrumentedIOContextWithThread and reuse it across the constructor of ThreadedRayletClient: https://github.com/ray-project/ray/blob/master/src/ray/gcs_rpc_client/gcs_client.cc#L219

Contributor Author

I ran a test creating 5 actors. The RPC reply has 12 processes, including the 5 actors, 5 idle workers, the driver (python in the following screenshot), and a dashboard server head, which aligns with the dashboard.

2025-10-08 11:29:33,355	INFO reporter_agent.py:913 -- Worker PIDs from raylet: [41692, 41694, 41685, 41689, 41693, 41688, 41690, 41691, 41687, 41686, 41676, 41618]

should the dashboard server head be here?

Collaborator

ah -- I don't think the dashboard server head should be there... the reason it's showing up is that it connects to Ray with ray.init. We'll need some way of filtering it. I believe it is started in a namespace prefixed with _ray_internal. We do other such filtering here:

# This includes the _ray_internal_dashboard job that gets automatically

If the namespace is available in the raylet, we can add the filtering there and exclude any workers that are associated with a _ray_internal* namespace
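For reference, the filtering boils down to a namespace prefix check; a minimal sketch of the idea (the function is hypothetical, and in this PR the actual filtering ends up in the raylet):

def is_system_driver(namespace: str) -> bool:
    # Drivers started by Ray itself (e.g. the dashboard head connecting via
    # ray.init) use namespaces prefixed with _ray_internal.
    return namespace.startswith("_ray_internal")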

Collaborator

Agree, for system driver processes, we should hide them.

Contributor Author

thanks @edoakes. I added a new option, filter_system_drivers. It finds the corresponding namespace and checks its prefix. Now the dashboard server head is gone.



rpc::ClientCallManager &client_call_manager,
std::function<void()> raylet_unavailable_timeout_callback);

RayletClient() = default;
Contributor

do we need this? Ideally we shouldn't change the raylet interface; it was designed so that it always reuses the thread from its caller (so that Ray logic won't compete with Ray application logic)

Contributor Author

Because I need to construct the threaded raylet client.

The current RayletClient constructor inits grpc_client_ and retryable_grpc_client_ right away, but ThreadedRayletClient needs to init them after getting the io_service and client_call_manager, so I need a default RayletClient constructor for the ThreadedRayletClient constructor.

Contributor

kk perhaps move this constructor into protected

Collaborator

We won't have this problem if we do wrapper instead of inheritance.

std::shared_ptr<std::vector<int32_t>> worker_pids, int64_t timeout_ms) {
rpc::GetWorkerPIDsRequest request;
auto promise = std::make_shared<std::promise<Status>>();
std::weak_ptr<std::promise<Status>> weak_promise = promise;
Contributor

why do you need to use weak_ptr?

Contributor Author

Because the promise is captured in the callback lambda. When GetWorkerPIDs ends, the promise is destructed, but the callback may still be called after that; the weak_ptr avoids a use-after-free.

Contributor

got it makes sense

Signed-off-by: tianyi-ge <[email protected]>
@edoakes edoakes added the go (add ONLY when ready to merge, run all tests) label Oct 14, 2025
@edoakes
Collaborator

edoakes commented Oct 14, 2025

Kicked off full CI tests: https://buildkite.com/ray-project/premerge/builds/51531

@can-anyscale
Contributor

LGTM, pending for test results, thanks


/// Threaded raylet client is provided for python (e.g. ReporterAgent) to communicate with
/// raylet. It creates and manages a separate thread to run the grpc event loop
class ThreadedRayletClient : public RayletClient {
Collaborator

@jjyao jjyao Oct 14, 2025

Do we really need a ThreadedRayletClient? We don't have ThreadedGcsClient.

We can have something similar to ConnectOnSingletonIoContext

CreateRayletClientOnSingletonIoContext(ip, port) {
static InstrumentedIOContextWithThread io_context("raylet_client_io_context");
static ClientCallManager client_call_manager();
return RayletClient(ip, port, client_call_manager);
}

or we can create a wrapper of the existing raylet client instead of inheritance:

class RayletClientWithIoContext {
 raylet_client_;
 io_context_;
 client_call_manager_;
}

I think wrapper is probably cleaner.

Contributor Author

yeah, I've considered a wrapper before. As you mentioned, RayletClient would then need to add GetWorkerPIDs and a new constructor Raylet(ip, port, client_call_manager). Both approaches look good to me, as they won't affect the Cython usage, but the wrapper seems like a more decoupled way.

Comment on lines 38 to 39
Status GetWorkerPIDs(std::shared_ptr<std::vector<int32_t>> worker_pids,
int64_t timeout_ms);
Collaborator

This should go into the existing RayletClient.


return Status::TimedOut("Timed out getting worker PIDs from raylet");
}
return future.get();
}

Bug: Conflicting Timeouts in GetWorkerPIDs Method

The GetWorkerPIDs method uses the same timeout_ms for both the RPC call and the future.wait_for. This creates competing timeouts, which can cause the method to return TimedOut prematurely, even if the RPC call is still active or would eventually succeed.


Contributor

@can-anyscale can-anyscale left a comment

There are several nits, but overall LGTM, thank you.

I'll leave some time for @jjyao @edoakes to take a look too before merging.

    # Get worker pids from raylet via gRPC.
    return self._raylet_client.get_worker_pids()
except TimeoutError as e:
    logger.debug(f"Failed to get worker pids from raylet: {e}")
Contributor

logger.error

Collaborator

should use logger.exception here, not logger.error

logger.exception formats the stack trace automatically, without needing to include the exception repr

try:
    proc = psutil.Process(pid)
    workers[self._generate_worker_key(proc)] = proc
except (psutil.NoSuchProcess, psutil.AccessDenied):
Contributor

logger.error("...")

if status.IsTimedOut():
    raise TimeoutError(status.message())
elif not status.ok():
    raise RuntimeError(
Contributor

this also raises RuntimeError, need to catch this exception upstream

try:
    # Get worker pids from raylet via gRPC.
    return self._raylet_client.get_worker_pids()
except TimeoutError as e:
Contributor

catch RuntimeError as well

///
/// \param filter_dead_drivers whether or not if this method will filter dead drivers
/// that are still registered.
/// \param filter_system_drivers whether or not if this method will filter system
Contributor

should this be called filter_ray_internal_processes? @jjyao, @edoakes, I don't have the context on whether we already have "system driver" as a concept, or whether this PR is introducing a new one

Collaborator

filter_system_drivers looks fine to me since it's mirroring filter_dead_drivers

@can-anyscale
Contributor

Also kicking off a release test to check that the processes reported by the metrics do not regress. Here is the list of processes reported on master:

Screenshot 2025-10-15 at 12 13 50 PM

PushMutableObjectReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetWorkerPIDs(GetWorkerPIDsRequest request,
Collaborator

let's make sure runtime env agent metrics are still reported

@can-anyscale
Contributor

processes reported by this PR

Screenshot 2025-10-15 at 3 12 48 PM

@can-anyscale
Contributor

@jjyao: all processes are reported as expected with this PR

@jjyao
Collaborator

jjyao commented Oct 16, 2025

@jjyao: all processes are reported as expected with this PR

@can-anyscale I didn't see runtime env agent metrics from your screenshot.

@tianyi-ge
Contributor Author

@jjyao Is the runtime env agent a special driver or core worker? Is it possible to assert on it in my unit test?

return Status::TimedOut("Timed out getting worker PIDs from raylet");
}
return future.get();
}

Bug: Race Condition in Dual Timeout Handling

The GetWorkerPIDs method has a race condition due to dual timeout handling. Both the RPC call and future.wait_for use the same timeout_ms, which can cause future.wait_for to incorrectly report a timeout even if the RPC successfully completed.



def test_report_stats():
@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
def test_report_stats(mock_raylet_client):
Collaborator

the mock_raylet_client is not used?

Contributor Author

it will be used in the ReporterAgent constructor to avoid creating a real gRPC client
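For reference, a rough sketch of that pattern; the return-value wiring below is illustrative, not the actual test body:

from unittest.mock import patch

@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
def test_report_stats(mock_raylet_client):
    # ReporterAgent instantiates the patched class, so it receives a MagicMock
    # instead of opening a real gRPC connection to the raylet.
    mock_raylet_client.return_value.get_worker_pids.return_value = [123, 456]
    # ... construct the ReporterAgent and assert on the reported stats ...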

assert resp_data["rayInitCluster"] == meta["ray_init_cluster"]


def test_reporter_raylet_agent(ray_start_with_dashboard):
Collaborator

I think this test depends on the fact that the total CPU resource of the node is 1, so we don't create extra idle workers. Could you make it explicit by doing:

@pytest.mark.parametrize(
    "ray_start_with_dashboard",
    [
        {
            "num_cpus": 1,
        }
    ],
    indirect=True,
)

/// PID of GCS process to record metrics.
constexpr char kGcsPidKey[] = "gcs_pid";

// Please keep this in sync with the definition in ray_constants.py.
Collaborator

We can enforce the sync by exposing the C++ constant to Python via Cython. We have examples in common.pxi and common.pxd:

RAY_NODE_TPU_POD_TYPE_KEY = kLabelKeyTpuPodType.decode()

// worker clients. The unavailable callback will eventually be retried so if this fails.
rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
// Get the PIDs of all workers currently alive that are managed by the local Raylet.
// This includes connected driver processes.
Collaborator

We should mention system drivers are excluded

Comment on lines 474 to 475
std::weak_ptr<std::promise<Status>> weak_promise = promise;
std::weak_ptr<std::vector<int32_t>> weak_worker_pids = worker_pids;
Collaborator

why do we need weak_ptr and promise here?

def _get_worker_pids_from_raylet(self) -> List[int]:
    try:
        # Get worker pids from raylet via gRPC.
        return self._raylet_client.get_worker_pids()
Collaborator

this is an RPC, so we should make it async and change get_worker_pids_from_raylet to async.

Contributor

ah yes, @tianyi-ge, there is a pattern to turn this async gRPC call into an await/future method in Python; example here: https://github.com/ray-project/ray/blob/master/python/ray/includes/gcs_client.pxi#L177-L191
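Roughly, the linked pattern completes an asyncio future from the RPC callback instead of blocking a thread. A simplified, self-contained sketch of the idea (the real version is written in Cython against the C++ callback; names here are made up):

import asyncio
import threading

def call_rpc_with_callback(callback):
    # Stand-in for the C++ client: delivers the reply on another thread.
    threading.Thread(target=lambda: callback([123, 456]), daemon=True).start()

async def get_worker_pids_async():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_reply(pids):
        # The reply arrives off the asyncio thread, so hand it back thread-safely.
        loop.call_soon_threadsafe(fut.set_result, pids)

    call_rpc_with_callback(on_reply)
    return await fut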

raylet_proc = self._get_raylet_proc()
if raylet_proc is None:
pids = asyncio.run(self._get_worker_pids_from_raylet())
logger.debug(f"Worker PIDs from raylet: {pids}")

Bug: Asyncio Loop Conflict in Worker Process Retrieval

The _get_worker_processes method uses asyncio.run() to execute _get_worker_pids_from_raylet(). Since the ReporterAgent runs within an existing asyncio event loop, calling asyncio.run() from it raises a RuntimeError and crashes the application.
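A minimal sketch of two ways to avoid the nested event loop, assuming the coroutine from the surrounding code; function and parameter names here are illustrative:

import asyncio

async def fetch_pids(get_worker_pids_from_raylet):
    # When the caller already runs on the agent's event loop, just await the
    # coroutine instead of calling asyncio.run().
    return await get_worker_pids_from_raylet()

def fetch_pids_from_sync_code(get_worker_pids_from_raylet, loop):
    # If the caller must stay synchronous and runs off the loop thread,
    # schedule the coroutine onto the agent's loop and wait for the result.
    future = asyncio.run_coroutine_threadsafe(get_worker_pids_from_raylet(), loop)
    return future.result(timeout=1)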


Signed-off-by: tianyi-ge <[email protected]>

Labels

  • community-contribution: Contributed by the community
  • core: Issues that should be addressed in Ray Core
  • go: add ONLY when ready to merge, run all tests
  • observability: Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling


Development

Successfully merging this pull request may close these issues.

[Observability] Improve ray system metrics

4 participants