[core] allow reporter agent to get pid via rpc to raylet #57004
Conversation
Code Review
This pull request adds a new gRPC endpoint to the node manager for fetching worker and driver PIDs, which is a solid approach for discovering all worker processes. The changes to the protobuf definition and the C++ implementation are mostly correct. However, I've found a critical issue in the Python client code due to a typo that would cause the RPC call to fail. I've also included a few suggestions for improving error handling and code efficiency.
src/ray/protobuf/node_manager.proto
Outdated
// Get the worker managed by local raylet.
// Failure: Sends to local raylet, so should never fail.
we should still add error handling & retries just in case (there could be a logical bug in the raylet)
@edoakes thank you for the prompt comments; I'll fix it soon. Also, after discussing with @can-anyscale, I'll replace the python grpcio lib with a cython wrapper "RayletClient"
Let's figure out a way to test that the solution works.
)
try:
    return raylet_client.get_worker_pids(timeout=timeout)
except Exception as e:
let's not catch all exceptions; be explicit about which exceptions get_worker_pids can acceptably throw and which exceptions Ray should just fail out loud on
yes, this should catch RPC exceptions or something similar; try not to catch all exceptions if possible
src/ray/protobuf/node_manager.proto
Outdated
rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); | ||
// Get the PIDs of all workers currently alive that are managed by the local Raylet. | ||
// This includes connected driver processes. | ||
// Failure: Will retry on failure with logging |
nit: what does "with logging" mean? More useful information would be how many times it retries and what the reply looks like on failure (partial results, empty, etc.).
ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port)
    : RayletClient() {
  io_service_ = std::make_unique<instrumented_io_context>();
maybe you can just use this https://github.com/ray-project/ray/blob/master/src/ray/common/asio/asio_util.h#L53 so you don't need to maintain the thread yourself
there are also patterns here to make sure the io_context is reused across raylet clients within one process
thanks for your suggestions
I guess in the future, if the raylet client is used in multiple places, reusing the io_context is important.
But to use IOContextProvider, I have to create a "default io context" anyway. It's also manually maintained, right?
oh dang sorry forgot to include the link to the pattern; you can create a static InstrumentedIOContextWithThread and reuse it across the constructor of ThreadedRayletClient https://github.com/ray-project/ray/blob/master/src/ray/gcs_rpc_client/gcs_client.cc#L219
I ran a test creating 5 actors. The RPC reply has 12 processes, including 5 actors, 5 idle workers, the driver (python in the screenshot below), and a dashboard server head, which aligns with the dashboard.
2025-10-08 11:29:33,355 INFO reporter_agent.py:913 -- Worker PIDs from raylet: [41692, 41694, 41685, 41689, 41693, 41688, 41690, 41691, 41687, 41686, 41676, 41618]

Should the dashboard server head be here?
ah -- I don't think the dashboard server head should be there... the reason it's showing up is because it is connecting to ray with ray.init. We'll need some way of filtering it. I believe it is started in a namespace prefixed with _ray_internal. We do other such filtering here:
# This includes the _ray_internal_dashboard job that gets automatically
If the namespace is available in the raylet, we can add the filtering there and exclude any workers that are associated with a _ray_internal* namespace.
Agree, for system driver processes, we should hide them.
thanks @edoakes. I added a new option filter_system_drivers. It finds the corresponding namespace and checks its prefix. Now the dashboard server head is gone.
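For illustration, a minimal sketch of the kind of prefix check this implies; the constant and helper names here are hypothetical, not the PR's actual code:

```cpp
#include <string>

// Hypothetical constant/helper names; the raylet-side filter may differ.
constexpr char kRayInternalNamespacePrefix[] = "_ray_internal";

bool IsSystemDriverNamespace(const std::string &ray_namespace) {
  // A driver is treated as a system driver when its namespace starts with the prefix.
  return ray_namespace.rfind(kRayInternalNamespacePrefix, 0) == 0;
}
```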
rpc::ClientCallManager &client_call_manager,
std::function<void()> raylet_unavailable_timeout_callback);

RayletClient() = default;
do we need this? Ideally we shouldn't change the raylet interface; it was designed so that it always reuses the thread from its caller (so that ray logic won't compete with ray application logic)
because I need to construct the threaded raylet client. The current RayletClient constructor inits grpc_client_ and retryable_grpc_client_ right away, but ThreadedRayletClient needs to init them after getting io_service and client_call_manager. So I need a default RayletClient constructor for the ThreadedRayletClient constructor.
kk perhaps move this constructor into protected
We won't have this problem if we do wrapper instead of inheritance.
    std::shared_ptr<std::vector<int32_t>> worker_pids, int64_t timeout_ms) {
  rpc::GetWorkerPIDsRequest request;
  auto promise = std::make_shared<std::promise<Status>>();
  std::weak_ptr<std::promise<Status>> weak_promise = promise;
why do you need to use weak_ptr?
because promise is captured in the callback lambda. When GetWorkerPIDs ends, the promise is destructed, but the callback may still be called after that. weak_ptr is to avoid a use-after-free.
got it makes sense
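A generic, self-contained illustration of that pattern (not the PR's code): the async callback captures only a weak_ptr and checks lock() before touching the shared state, so a late invocation after the caller has given up is a no-op.

```cpp
#include <functional>
#include <iostream>
#include <memory>

// Returns a callback that holds a weak reference to the caller-owned result.
std::function<void(int)> MakeCallback(const std::shared_ptr<int> &result) {
  std::weak_ptr<int> weak_result = result;
  return [weak_result](int value) {
    if (auto strong = weak_result.lock()) {
      *strong = value;  // Owner still alive: safe to write.
    }
    // Otherwise the caller already returned (e.g. timed out); do nothing.
  };
}

int main() {
  auto result = std::make_shared<int>(0);
  auto callback = MakeCallback(result);
  callback(42);
  std::cout << *result << std::endl;  // 42
  result.reset();                     // Caller goes away.
  callback(7);                        // Late call is safely ignored.
  return 0;
}
```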
Kicked off full CI tests: https://buildkite.com/ray-project/premerge/builds/51531
LGTM, pending test results, thanks
/// Threaded raylet client is provided for python (e.g. ReporterAgent) to communicate with
/// raylet. It creates and manages a separate thread to run the grpc event loop
class ThreadedRayletClient : public RayletClient {
Do we really need a ThreadedRayletClient? We don't have a ThreadedGcsClient.
We can have something similar to ConnectOnSingletonIoContext:
RayletClient CreateRayletClientOnSingletonIoContext(const std::string &ip, int port) {
  static InstrumentedIOContextWithThread io_context("raylet_client_io_context");
  static ClientCallManager client_call_manager(io_context.GetIoService());
  return RayletClient(ip, port, client_call_manager);
}
or we can create a wrapper of the existing raylet client instead of inheritance:
class RayletClientWithIoContext {
  RayletClient raylet_client_;
  InstrumentedIOContextWithThread io_context_;
  ClientCallManager client_call_manager_;
};
I think the wrapper is probably cleaner.
yeah, I've considered a wrapper before. As you mentioned, RayletClient would then need to add GetWorkerPIDs and a new constructor RayletClient(ip, port, client_call_manager). Both approaches look good to me as they won't affect the cython usage, but a wrapper seems a more decoupled way.
Status GetWorkerPIDs(std::shared_ptr<std::vector<int32_t>> worker_pids,
                     int64_t timeout_ms);
This should go into the existing RayletClient.
    return Status::TimedOut("Timed out getting worker PIDs from raylet");
  }
  return future.get();
}
Bug: Conflicting Timeouts in GetWorkerPIDs Method
The GetWorkerPIDs method uses the same timeout_ms for both the RPC call and the future.wait_for. This creates competing timeouts, which can cause the method to return TimedOut prematurely, even if the RPC call is still active or would eventually succeed.
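One possible way to avoid the race, sketched generically (an illustration, not the PR's fix): treat the RPC deadline as authoritative and give the local wait a small margin beyond it, so a reply arriving near the deadline is still consumed instead of being reported as a local timeout.

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
  const auto rpc_timeout = std::chrono::milliseconds(200);
  std::promise<int> reply_promise;
  auto reply_future = reply_promise.get_future();

  // Simulated async RPC callback that fulfills the promise just before the deadline.
  std::thread rpc_thread([&reply_promise, rpc_timeout] {
    std::this_thread::sleep_for(rpc_timeout - std::chrono::milliseconds(10));
    reply_promise.set_value(42);
  });

  // Waiting with a margin beyond the RPC timeout avoids the competing-timeout race.
  const auto wait_budget = rpc_timeout + std::chrono::milliseconds(100);
  if (reply_future.wait_for(wait_budget) == std::future_status::timeout) {
    std::cout << "timed out locally" << std::endl;
  } else {
    std::cout << "got reply: " << reply_future.get() << std::endl;
  }
  rpc_thread.join();
  return 0;
}
```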
    # Get worker pids from raylet via gRPC.
    return self._raylet_client.get_worker_pids()
except TimeoutError as e:
    logger.debug(f"Failed to get worker pids from raylet: {e}")
logger.error
should use logger.exception here, not logger.error. logger.exception formats the stack trace automatically without needing to include the exception repr.
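A small, self-contained example of the difference (illustrative only):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_failure_example():
    try:
        raise TimeoutError("raylet RPC deadline exceeded")
    except TimeoutError as e:
        # logger.error records only the message you build yourself.
        logger.error("Failed to get worker pids from raylet: %s", e)
        # logger.exception records the same message plus the full traceback,
        # so there is no need to interpolate the exception repr by hand.
        logger.exception("Failed to get worker pids from raylet")

log_failure_example()
```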
try:
    proc = psutil.Process(pid)
    workers[self._generate_worker_key(proc)] = proc
except (psutil.NoSuchProcess, psutil.AccessDenied):
logger.error("...")
if status.IsTimedOut():
    raise TimeoutError(status.message())
elif not status.ok():
    raise RuntimeError(
this also raises RuntimeError; we need to catch this exception upstream
try:
    # Get worker pids from raylet via gRPC.
    return self._raylet_client.get_worker_pids()
except TimeoutError as e:
catch RuntimeError as well
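A hedged sketch of the suggested handling (the helper name is illustrative): catch exactly the TimeoutError and RuntimeError that the binding raises, log with the traceback, and let anything unexpected propagate so Ray fails loudly.

```python
import logging

logger = logging.getLogger(__name__)

def get_worker_pids_or_empty(raylet_client):
    # Illustrative helper, not the PR's code. Only the documented failure modes
    # of get_worker_pids (TimeoutError on a missed deadline, RuntimeError on a
    # non-OK status) are caught; any other exception propagates.
    try:
        return raylet_client.get_worker_pids()
    except (TimeoutError, RuntimeError):
        logger.exception("Failed to get worker pids from raylet")
        return []
```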
///
/// \param filter_dead_drivers whether or not this method will filter dead drivers
///     that are still registered.
/// \param filter_system_drivers whether or not this method will filter system
filter_system_drivers looks fine to me since it's mirroring filter_dead_drivers
PushMutableObjectReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetWorkerPIDs(GetWorkerPIDsRequest request,
let's make sure runtime env agent metrics are still reported
@jjyao: all processes are reported as expected with this PR
@can-anyscale I didn't see runtime env agent metrics in your screenshot.
@jjyao Is the runtime env agent a special driver or a core worker? Is it possible to assert it in my unit test?
def test_report_stats():
@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
def test_report_stats(mock_raylet_client):
the mock_raylet_client is not used?
it will be used in the ReporterAgent constructor to avoid creating a real grpc client
assert resp_data["rayInitCluster"] == meta["ray_init_cluster"] | ||
|
||
|
||
def test_reporter_raylet_agent(ray_start_with_dashboard): |
I think this test depends on the fact that the total CPU resource of the node is 1, so we don't create extra idle workers. Could you make it explicit by doing
@pytest.mark.parametrize(
    "ray_start_with_dashboard",
    [
        {
            "num_cpus": 1,
        }
    ],
    indirect=True,
)
src/ray/common/constants.h
Outdated
/// PID of GCS process to record metrics.
constexpr char kGcsPidKey[] = "gcs_pid";

// Please keep this in sync with the definition in ray_constants.py.
We can enforce the sync by exposing the C++ constant to Python via Cython. We have examples in common.pxi and common.pxd:
ray/python/ray/includes/common.pxi, line 161 in 9a434c7:
RAY_NODE_TPU_POD_TYPE_KEY = kLabelKeyTpuPodType.decode()
src/ray/protobuf/node_manager.proto
Outdated
// worker clients. The unavailable callback will eventually be retried so if this fails.
rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
// Get the PIDs of all workers currently alive that are managed by the local Raylet.
// This includes connected driver processes.
We should mention system drivers are excluded
std::weak_ptr<std::promise<Status>> weak_promise = promise;
std::weak_ptr<std::vector<int32_t>> weak_worker_pids = worker_pids;
why do we need weak_ptr and promise here?
def _get_worker_pids_from_raylet(self) -> List[int]:
    try:
        # Get worker pids from raylet via gRPC.
        return self._raylet_client.get_worker_pids()
this is an RPC so we should make it async and change get_worker_pids_from_raylet to async.
ah yes, @tianyi-ge, there is a pattern to turn this async grpc call into an await/future method in Python, example here https://github.com/ray-project/ray/blob/master/python/ray/includes/gcs_client.pxi#L177-L191
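The linked Cython pattern wires the C++ callback into an asyncio future directly; as a plain-Python placeholder for the same idea (names are illustrative), the blocking call can be offloaded to an executor so the agent's event loop is not blocked:

```python
import asyncio

async def get_worker_pids_async(raylet_client):
    # Illustrative sketch: run the blocking RPC in the default thread-pool
    # executor and await it, keeping the reporter agent's event loop responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, raylet_client.get_worker_pids)
```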
raylet_proc = self._get_raylet_proc()
if raylet_proc is None:
    pids = asyncio.run(self._get_worker_pids_from_raylet())
    logger.debug(f"Worker PIDs from raylet: {pids}")
Why are these changes needed?
implementation: ray/python/ray/dashboard/modules/reporter/reporter_agent.py (line 911 in 10eacfd)
Related issue number
Closes #56739
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.

Note
Reporter agent now fetches worker/driver PIDs via a new Raylet GetWorkerPIDs RPC using a new RayletClient binding, replacing psutil child-process scanning.
- Add a GetWorkerPIDs RPC in node_manager.proto and wire it into NodeManagerService.
- Implement NodeManager::HandleGetWorkerPIDs to return the PIDs of all alive workers and drivers.
- Add RayletClient (C++) with GetWorkerPIDs(timeout_ms) and an alternate ctor (ip, port); expose it to Python via Cython (includes/raylet_client.pxi, includes/common.pxd).
- Include includes/raylet_client.pxi in _raylet.pyx to expose RayletClient to Python.
- Update reporter_agent.py to use RayletClient(ip, node_manager_port).get_worker_pids(timeout) to discover workers and build psutil.Process objects from the returned PIDs (see the sketch after this note).
- Add RAYLET_RPC_TIMEOUT_SECONDS = 1 in dashboard/consts.py and use it for the RPC timeout.
- Add the new handler to the node_manager_server.h macro list.
Written by Cursor Bugbot for commit f76f633.
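To make the reporter-agent flow in the last bullets concrete, here is a hedged sketch (function and variable names are illustrative, not the PR's exact code):

```python
import psutil

def build_worker_procs(raylet_client, timeout_s=1.0):
    # PIDs come from the raylet's GetWorkerPIDs RPC instead of scanning the
    # raylet's child processes.
    workers = {}
    for pid in raylet_client.get_worker_pids(timeout=timeout_s):
        try:
            workers[pid] = psutil.Process(pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The worker may have exited between the RPC reply and this lookup.
            continue
    return workers
```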