Skip to content

Commit

Permalink
[push_manager] change fulfillment of push request from round robin to…
Browse files Browse the repository at this point in the history
… FIFO
  • Loading branch information
Saurabh Vishwas Joshi authored and Saurabh Vishwas Joshi committed Oct 31, 2024
1 parent 6d879cd commit 47adfbc
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/ray/object_manager/push_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,19 @@ void PushManager::OnChunkComplete(const NodeID &dest_id, const ObjectID &obj_id)

void PushManager::ScheduleRemainingPushes() {
bool keep_looping = true;
// Loop over all active pushes for approximate round-robin prioritization.
// TODO(ekl) this isn't the best implementation of round robin, we should
// consider tracking the number of chunks active per-push and balancing those.

// Keep looping while we have capacity and are making progress
while (chunks_in_flight_ < max_chunks_in_flight_ && keep_looping) {
// Loop over each active push and try to send another chunk.
// Loop over the list of active pushes and try to send another chunk
auto it = push_requests_with_chunks_to_send_.begin();
keep_looping = false;
// Walk through each push request in the list
while (it != push_requests_with_chunks_to_send_.end() &&
chunks_in_flight_ < max_chunks_in_flight_) {
auto push_id = it->first;
auto &info = it->second;
if (info->SendOneChunk()) {
while (chunks_in_flight_ < max_chunks_in_flight_ && info->SendOneChunk()) {
// finish as many chunks of the head request as possible
chunks_in_flight_ += 1;
keep_looping = true;
RAY_LOG(DEBUG) << "Sending chunk " << info->next_chunk_id << " of "
Expand All @@ -83,6 +84,7 @@ void PushManager::ScheduleRemainingPushes() {
<< " / " << max_chunks_in_flight_
<< " max, remaining chunks: " << NumChunksRemaining();
}
// If all chunks are sent for the current request, remove it from the list
if (info->NoChunksToSend()) {
it = push_requests_with_chunks_to_send_.erase(it);
} else {
Expand Down

0 comments on commit 47adfbc

Please sign in to comment.