[REP] Ray Export API. #55

Open
wants to merge 8 commits into
base: main
153 changes: 153 additions & 0 deletions reps/2024-06-13-export-api.md
@@ -0,0 +1,153 @@
## Summary
### General Motivation

In the current design of Ray, the ways to export various states in the Ray cluster are inconsistent.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the State API is typically recommended for end users to fetch current state info of a running Ray cluster, but the state API gets data from multiple sources depending on the resource type.

The export API will provide another way to fetch state data that works well for large scale clusters or can be saved to query after the cluster is terminated, but I don’t see disjoint interfaces for state data as the main motivation for this feature.

Author

Makes sense. I mentioned it in the Key requirements:
To obtain the current status of the Ray cluster at a single point in time, you need to use the State API.

For example, Actor state is broadcast through GCS pubsub, whereas obtaining node state changes
requires querying an RPC service (NodeInfoGcsService). For jobs submitted through job submission,
there is no way to expose the state. The high-level libraries on top of Ray also lack a unified way to export state.
E.g. RayServe and RayData each collect state through their own StateActor and report it to the Dashboard separately.

It is very difficult to obtain these basic Ray states from outside the Ray cluster. This issue shows up in two ways:
1. The scale of the data exceeds current dashboard API limits.
2. After the Ray cluster terminates, all state data is lost.
If a unified export API is defined,
we can provide observability that is independent of the Ray cluster. The most typical scenario is building a Ray history server.
Comment on lines +4 to +14


I would re-organize this to something like

Suggested change
Users have requested several major improvements to the Ray Dashboard:
1. Currently, all state data is lost when the Ray cluster terminates and users are looking for persistence of this data.
2. The Ray dashboard has scalability limits on the amount of data stored and returned. Advanced users would benefit from greater scalability.
3. There is no unified and accurate way to get state events pushed on state change across various resource types. The State and dashboard APIs support a pull model, where it is possible to miss some events. There is also not a consistent way across resource types to get this information directly from components that generate it.
A unified export API could serve as a base to address these issues by allowing state data to be exported and processed separately. This would ensure that observability features remain functional independent of the Ray cluster.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to obtain the state change of the node, it is necessary to query rpc service (NodeInfoGcsService)

Just curious, do you or other users already query these GCS services directly to get data?


#### Key requirements:
- Need to expose all Ray states necessary for basic observability: tasks, actors, jobs, and nodes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good initial use case of this data is rebuilding the dashboard APIs, so we should make sure the schema for each resource contains all required fields. Timestamps should also be included so the reader can reconstruct a history of state changes.

Author

Let me try to understand. Do you mean that we can use the data format of the dashboard API to check the completeness of the data exported by the export API? For example, the exported state is published to GCS, and the dashboard gets its data from GCS.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we don’t need to exactly use the dashboard API schemas because that would require some post processing which we can leave to the users for flexibility. We should be able to postprocess the export API data to match the dashboard API responses though (which is needed for applications like history server)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Need to expose necessary Ray state information which is currently returned in the dashboard APIs for tasks, actors, jobs, and nodes.

- Exporting states may add a light load, but we should be able to limit this impact (e.g. cap the data stored in memory, never block any control logic).
The export API should not put too much load on the Ray cluster.
Comment on lines +18 to +19


Suggested change
- We should be able to limit any additional load from exporting state information (eg: max data stored in memory). Default configurations for the export API should have minimal impact on the Ray cluster and we will run performance tests to determine this.

- Export states as a stream rather than fetching them directly from the Ray cluster. To obtain the current status of the Ray cluster at a single point in time,
use [the state observability api](https://github.com/ray-project/enhancements/blob/main/reps/2022-04-21-state-observability-apis.md "the state observability api").
Comment on lines +20 to +21


Suggested change
- Export events should be emitted on state change, rather than being pulled from the current status of the ray cluster (as done with [the state observability api](https://github.com/ray-project/enhancements/blob/main/reps/2022-04-21-state-observability-apis.md)).

This was how I interpreted it, please let me know if it refers to something else

- States can be exported selectively by category: RayCore (actors/tasks/nodes), RayData, and RayServe.
For example, a user who is only interested in RayServe state can disable the export of RayCore state to reduce overhead.
Comment on lines +22 to +23


Suggested change
- [P1] Allow users to selectively enable exporting RayCore (actors/tasks/jobs/nodes), RayData, or RayServe-related states

Let’s label this P1 and move to the bottom of the list because it will be more relevant after we finish implementing the library events

- Friendly to all types of users (especially cloud vendors), easy to deploy and use, without modifying Ray.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this point because it doesn’t add a specific requirement. Was there something in particular you wanted to call out here?

- It should be possible to disable the export API for security and performance.
- The data should be exported from as close as possible to the source where it is generated. The export API should have minimal overhead and we should not need to unnecessarily move data to other nodes before it is exported.

MissiontoMars marked this conversation as resolved.
### Should this change be within ray or outside?

Within Ray: the states we want to export belong to Ray's internal components.

## Stewardship
### Required Reviewers
@nikitavemuri

### Shepherd of the Proposal (should be a senior committer)

## Design and Architecture
### Event
`Event` represents a state change in the Ray cluster: it indicates that
the state of a certain resource in the Ray cluster has changed, such as the starting
and stopping of a job, the construction and destruction of an Actor, etc. An Event is structured data,
and each resource corresponds to a specified event type. All types of events follow a generally consistent format,
but the event fields of different types can be customized. E.g.:
nikitavemuri marked this conversation as resolved.
Comment on lines +43 to +44


Suggested change
and each resource corresponds to a specified event type. All events will follow a consistent schema with the `event_data` following the schema of the specific resource type. Event data schemas are in the below table for each resource
message ExportEvent {
  enum SourceType {
    EXPORT_TASK = 0;
    EXPORT_NODE = 1;
    EXPORT_ACTOR = 2;
    EXPORT_DRIVER_JOB = 3;
    EXPORT_SUBMISSION_JOB = 4;
  }
  // event_id is the unique ID of this event
  string event_id = 1;
  // source type is the type of the source
  SourceType source_type = 2;
  // source_hostname is the hostname of the source
  // timestamp is the report milliseconds since 00:00, Jan 1 1970 UTC
  int64 timestamp = 3;
  // event_data follows the schema associated with the source_type
  oneof event_data {
    ExportTaskEventData task_event_data = 4;
    ExportNodeData node_event_data = 5;
    ExportActorData actor_event_data = 6;
    ExportDriverJobEventData driver_job_event_data = 7;
    ExportSubmissionJobEventData submission_job_event_data = 8;
  }
}
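
To illustrate how the envelope and the `oneof event_data` fit together, here is a minimal Python sketch that mirrors the `ExportEvent` message as a plain dictionary. It is not generated protobuf code: `ExportEventRecord` and `EVENT_DATA_FIELD` are hypothetical names, and the keys inside `event_data` are placeholders because the per-resource schemas are not shown here.

```python
from dataclasses import dataclass
from typing import Any, Dict

# Maps each SourceType name to the matching `oneof event_data` field name
# from the ExportEvent message above.
EVENT_DATA_FIELD = {
    "EXPORT_TASK": "task_event_data",
    "EXPORT_NODE": "node_event_data",
    "EXPORT_ACTOR": "actor_event_data",
    "EXPORT_DRIVER_JOB": "driver_job_event_data",
    "EXPORT_SUBMISSION_JOB": "submission_job_event_data",
}


@dataclass(frozen=True)
class ExportEventRecord:
    """Hypothetical in-memory stand-in for the ExportEvent envelope."""

    event_id: str
    source_type: str
    timestamp: int  # milliseconds since 00:00, Jan 1 1970 UTC
    event_data: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        # Exactly one event_data field is set, selected by source_type.
        if self.source_type not in EVENT_DATA_FIELD:
            raise ValueError(f"unknown source_type: {self.source_type}")
        return {
            "event_id": self.event_id,
            "source_type": self.source_type,
            "timestamp": self.timestamp,
            EVENT_DATA_FIELD[self.source_type]: self.event_data,
        }


record = ExportEventRecord(
    event_id="e-0001",
    source_type="EXPORT_ACTOR",
    timestamp=1692004435106,
    # Placeholder payload; the real fields come from ExportActorData.
    event_data={"actor_id": "efc87749b2e337d7872dcf3802000000", "state": "PENDING_CREATION"},
)
payload = record.to_dict()
```

A reader can dispatch on `source_type` first and then parse only the one `*_event_data` field that is present.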


```json
{"event_type": "JOB", "job": {"submission_id": "raysubmit_WBMEB9nTaMKmrKrN", "job_info": {"status": "RUNNING", "entrypoint": "python3 my_script.py", "message": null, "error_type": null, "start_time": 1692004435106, "end_time": null, "metadata": null, "entrypoint_num_cpus": null, "entrypoint_num_gpus": null, "entrypoint_resources": null, "driver_agent_http_address": null, "driver_node_id": null}}}

{"event_type": "ACTOR", "actor": {"actor_id": "efc87749b2e337d7872dcf3802000000", "actor_info": {"actorId": "efc87749b2e337d7872dcf3802000000", "jobId": "02000000", "address": {"rayletId": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "workerId": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "ipAddress": "", "port": 0}, "name": "_ray_internal_job_actor_raysubmit_WBMEB9nTaMKmrKrN", "className": "JobSupervisor", "state": "PENDING_CREATION", "numRestarts": "0", "timestamp": 0.0, "pid": 0, "startTime": 0, "endTime": 0, "actorClass": "JobSupervisor", "exitDetail": "-", "requiredResources": {}}}}

```
Comment on lines +46 to +51


We can remove this because the schemas are added already


By aggregating all the Events along the time dimension, we can obtain the historical state of the Ray cluster,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean By viewing all the Events?

and the consumers of the events can aggregate the final state from a series of events. The export API
does not guarantee that every intermediate event is delivered, which is acceptable in production practice.
The loss of some intermediate events will not cause the final state to be incorrect. For example,
if the event for an actor's creation is lost but the consumer can read the event for the actor exiting,
then the consumer still knows that the actor was created and ran.
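
A consumer can tolerate lost intermediate events by keeping only the newest event per resource. The sketch below is one possible consumer-side fold, not part of the proposed API: the `(actor_id, state, timestamp_ms)` tuple shape and the `latest_states` helper are assumptions made for illustration.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical simplified event: (actor_id, state, timestamp_ms).
Event = Tuple[str, str, int]


def latest_states(events: Iterable[Event]) -> Dict[str, str]:
    """Return the most recent known state per actor.

    Events may arrive out of order or with gaps; only the event with the
    largest timestamp per actor determines the result.
    """
    latest: Dict[str, Tuple[int, str]] = {}
    for actor_id, state, ts in events:
        seen = latest.get(actor_id)
        if seen is None or ts >= seen[0]:
            latest[actor_id] = (ts, state)
    return {actor_id: state for actor_id, (_, state) in latest.items()}


events = [
    ("a1", "DEAD", 300),              # delivered out of order
    ("a1", "PENDING_CREATION", 100),
    ("a1", "ALIVE", 200),
    ("a2", "DEAD", 250),              # a2's creation event was lost
]
final = latest_states(events)
```

Here `a2` still resolves to `DEAD` even though its creation event never arrived, which is the behavior the paragraph above describes.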
Comment on lines +54 to +58


Suggested change
and the consumers of the events can aggregate the final state from a series of events. The export API doesn’t guarantee every state transition is logged; however, it should be possible to reconstruct the current state even if intermediate states are missing.



The various components of Ray, including GCS, Dashboard, raylet, and worker, can all serve as event generators.
However, these sources don't need to concern themselves with the details of how events are stored, transmitted, or published.

### Events to Export and How
#### Events to Export
- Core
Tasks/Actors/Objects/Jobs/Nodes/Placement groups, including their metadata and current status at the time the event is emitted.

| Event | Event Source | When to export | Format example (may be a file link) |
| ---- | ---- | ---- | ---- |
| Tasks | CoreWorker and raylet | When coreworker adds task event to<br>ray::core::worker::TaskEventBuffer | https://docs.google.com/document/d/1fMt9Rf7dDTKA6AP62hatr7ovChqng_hTt2WAuhCqWko/edit |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| Tasks | CoreWorker and raylet | When events in `ray::core::worker::TaskEventBuffer` are flushed. This is not on the control path for tasks, and task data to proto and exporting is done in a separate thread. | https://docs.google.com/document/d/1fMt9Rf7dDTKA6AP62hatr7ovChqng_hTt2WAuhCqWko/edit |

| Actors | GCS | GcsPublisher::PublishActor<br>publish actor status through<br>GCS_ACTOR_CHANNEL | https://docs.google.com/document/d/1_7Z81KtmjA3IAEeZqEuxvw6ur5acyU-CrofQ6rFEmTg/edit |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| Actors | GCS | When actor status is published through `GCS_ACTOR_CHANNEL` and as callback to `gcs_table_storage_->ActorTable().Put()` when GCS actor table data is modified | https://docs.google.com/document/d/1_7Z81KtmjA3IAEeZqEuxvw6ur5acyU-CrofQ6rFEmTg/edit |

| Objects | CoreWorker and raylet | None | |
@nikitavemuri Sep 5, 2024

Let's remove objects from the table or mark them as P1.

| Jobs | JobManager | When JobInfoStorageClient.put_status called | https://docs.google.com/document/d/1upQRU-f8WgVH_NWBmeJyegyOSJwNDPPqnl1cCGpmiGo/edit?usp=sharing |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| Jobs | JobManager | Job submission events will be exported when `JobInfoStorageClient.put_info` is called. Job driver events are exported as a callback to `gcs_table_storage_->JobTable().Put()` when the GCS job table data is modified. Both types of job events are needed to reconstruct the job dashboard APIs | https://docs.google.com/document/d/1upQRU-f8WgVH_NWBmeJyegyOSJwNDPPqnl1cCGpmiGo/edit?usp=sharing |

| Nodes | GCS | GcsNodeManager::HandleXXXNode | https://docs.google.com/document/d/1qjoF51h2oUN2sr_MtPnovbNFZYZrh3WLNR_P0HrUuOI/edit?usp=sharing |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| Nodes | GCS | As callback to gcs_table_storage_->NodeTable().Put() | https://docs.google.com/document/d/1qjoF51h2oUN2sr_MtPnovbNFZYZrh3WLNR_P0HrUuOI/edit?usp=sharing |

| Placement groups | GCS | GcsPlacementGroupManager::HandleXXXPlacementGroup | |
@nikitavemuri Sep 5, 2024

Do you have an example schema for placement groups? Otherwise let’s remove from this table or mark as P1



- RayServe
MissiontoMars marked this conversation as resolved.
State change events for replicas, deployments, applications.

| Event | Event Source | When to export | Format example (may be a file link) |
| ---- | ---- | ---- | ---- |
| Serve App | ServeController Actor | None | None |

- RayData
All datasets, the DAG, and execution progress.

| Event | Event Source | When to export | Format example (may be a file link) |
| ---- | ---- | ---- | ---- |
| Datasets | ray.data.internal.stats._StatsActor | When _StatsActor's update function is called. | None |
Comment on lines +79 to +91


Let’s actually remove the library events info for now because we don’t have schemas for these yet. Can just say We are planning to emit events for Ray Serve and Ray Data in the future after validating implementation and use cases of the core events.


Priorities:
- Actor/Task/Job/Node states (P0)
- Objects/PlacementGroups/Ray Serve/RayData states (P1)

#### How to export
- Generate event

We propose a filesystem-based export interface. The event source just writes the
event to a file on its node and then returns. Exporting the event is the responsibility of the agent on the node. Each Ray node runs multiple worker processes, each running multiple actors or tasks. Events generated by different worker processes are written to separate event files rather than a single shared one, to avoid contention among worker processes and possible file-writing errors.
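
As a rough illustration of the generate step, the sketch below appends one JSON line per event to a file whose name includes the writer's process ID, so no two processes share a file. The file naming scheme, the `write_event` helper, and the JSON record layout are assumptions for this example, not the format Ray uses.

```python
import json
import os
import tempfile
import time
import uuid
from typing import Any, Dict


def event_file_path(event_dir: str, source_type: str) -> str:
    # One file per (source type, process) so writers never contend.
    return os.path.join(event_dir, f"event_{source_type}_{os.getpid()}.log")


def write_event(event_dir: str, source_type: str, event_data: Dict[str, Any]) -> str:
    """Append a single event as one JSON line and return the file path."""
    os.makedirs(event_dir, exist_ok=True)
    record = {
        "event_id": uuid.uuid4().hex,
        "source_type": source_type,
        "timestamp": int(time.time() * 1000),  # milliseconds since epoch
        "event_data": event_data,
    }
    path = event_file_path(event_dir, source_type)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return path


demo_dir = tempfile.mkdtemp()
path = write_event(demo_dir, "EXPORT_ACTOR", {"actor_id": "abc", "state": "PENDING_CREATION"})
path = write_event(demo_dir, "EXPORT_ACTOR", {"actor_id": "abc", "state": "ALIVE"})
```

The event source only performs a local append and returns; it never waits on the network.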


- Export event

We propose to use pubsub for event export, because pubsub makes it easier to decouple systems,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion is we can limit this REP to generating the events and let users implement exporting the events to an external system. We can share recommendations on how to do this (eg: log ingestion using Vector which can send data to various sinks per the use case), but I think the pub sub event export isn’t necessary for this version to keep the interface simple and flexible

Author

Or instead, we give some possible export methods (such as gcs or vector) as a suggestion, not a proposal?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it’s fine to describe some export options as a suggestion, but this shouldn’t be the key part of this REP. Could you condense and move this to a “How to use Export API” section, so it’s clear these are ideas for the users?

and it is also pluggable. Any component that supports pubsub can be used,
such as GCS in the Ray cluster or an external system such as Kafka.
The agent monitors the event files generated on its node and publishes events to the backend.
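
The agent's role can be sketched as a poller that remembers a byte offset per file and hands each complete line to a pluggable publish callback. `EventFileAgent` and its `publish` parameter are hypothetical; a real backend (GCS pubsub, Kafka, or another sink) would be plugged in where the example uses a plain list.

```python
import json
import os
import tempfile
from typing import Callable, Dict


class EventFileAgent:
    """Hypothetical agent that tails JSON-lines event files in a directory."""

    def __init__(self, event_dir: str, publish: Callable[[dict], None]):
        self.event_dir = event_dir
        self.publish = publish  # pluggable backend
        self.offsets: Dict[str, int] = {}  # bytes already consumed per file

    def poll_once(self) -> int:
        """Publish every complete new line and return how many were sent."""
        published = 0
        for name in sorted(os.listdir(self.event_dir)):
            path = os.path.join(self.event_dir, name)
            if not os.path.isfile(path):
                continue
            with open(path, "rb") as f:
                f.seek(self.offsets.get(path, 0))
                while True:
                    line = f.readline()
                    if not line.endswith(b"\n"):
                        break  # EOF or a half-written line; retry on next poll
                    self.offsets[path] = f.tell()
                    if line.strip():
                        self.publish(json.loads(line))
                        published += 1
        return published


demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "event_EXPORT_ACTOR_1.log")
with open(demo_path, "w") as f:
    # Two complete events followed by one that is still being written.
    f.write('{"event_id": "e1"}\n{"event_id": "e2"}\n{"event_id":')

received = []
agent = EventFileAgent(demo_dir, received.append)
first = agent.poll_once()
second = agent.poll_once()
```

Because the offset only advances past complete lines, a partially written event is picked up on a later poll instead of being dropped or parsed early.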

<br>

<img src="./2024-06-13-export-api-figures/export-api-1.png" alt="alt" title="title" width="600">

<br>


Advantages:
1. For the event sources (GCS/worker/raylet), the export interface is very lightweight and stable,
and will not encounter errors such as network timeouts.
2. It decouples event generation at the source from the actual export (publishing to GCS or Kafka).
3. It is easy to control and configure: the agent's configuration determines whether to export,
which event types to export, and the export rate.
4. With the pubsub mechanism, events are exported as a stream, and the publisher and subscriber
can each handle events at their own speed or ignore them.
5. Mainstream pubsub systems (e.g. Kafka) support repeated subscription and consumption, which is friendly to subscribers.


#### Usage
Based on the flexible solution above, users with different needs can choose the most suitable deployment method.
1. Basic use case: for ordinary users, the scale of the Ray cluster and jobs is limited, so it is acceptable
to use the default GCS pubsub. For cloud vendors, being able to adopt open-source
components easily is important, and Kafka is a very good choice.
2. Advanced use case: users with the capability and demand for customized development, such as larger companies
that run many large Ray clusters on their own infrastructure,
need to adapt Ray to their internal publish/subscribe system when deploying it. With the export
solution we propose, only a small modification to the agent is needed. For example, they can set up log
ingestion (e.g. through Vector) on each worker and then export the events for their own use cases.


### Status quo
At present, Ray already has an events mechanism: components such as GCS, raylet, and workers
print structured logs (JSON) to the tmp_dir/log/events/ directory, and the event agent on each node
monitors these files and reports the file content to the event head.
This is also the source of the Events shown on the Overview page of the dashboard.

<br>

<img src="./2024-06-13-export-api-figures/export-api-2.png" alt="alt" title="title" width="600">

<br>

We propose to reuse the existing Ray event mechanism and build on it.