
Add repartition by maximum number of rows per block #49748

Closed
srinathk10 wants to merge 200 commits

Conversation

srinathk10
Contributor

Why are these changes needed?

Add ability to repartition based on maximum number of rows per block.
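
As an illustration only, a hedged usage sketch of what this capability could enable. The keyword argument name below is an assumption based on this PR's description, not a confirmed API.

```python
import ray

# Hedged sketch: cap every output block at 1000 rows. The keyword
# `target_num_rows_per_block` is illustrative; the final argument name may differ.
ds = ray.data.range(10_000)
ds = ds.repartition(target_num_rows_per_block=1000)
print(ds.materialize().num_blocks())  # expect roughly 10_000 / 1000 = 10 blocks
```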

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

srinathk10 requested a review from a team as a code owner January 9, 2025 20:31
Signed-off-by: Srinath Krishnamachari <[email protected]>
yield block


class StreamingRepartitionOperator(OneToOneOperator):
Contributor

@raulchen @srinathk10 I'm still thinking that making this a MapOperator is the better approach:

  • It drastically reduces the amount of code we need
  • Operator fusion will still work out of the box

Contributor Author

@alexeykudinkin Yes, the resolution is to use MapOperator here.
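
For context, a minimal sketch of what the MapOperator-based approach could look like: the splitting logic becomes a plain block transform (a generator) that the existing map machinery can run and fuse. The function name `split_blocks` and the exact wiring into MapOperator are assumptions for illustration, not the final implementation.

```python
from typing import Iterable

from ray.data.block import Block, BlockAccessor
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder


def split_blocks(
    blocks: Iterable[Block], max_num_rows_per_block: int
) -> Iterable[Block]:
    """Yield blocks with at most `max_num_rows_per_block` rows each (sketch)."""
    builder = DelegatingBlockBuilder()
    for block in blocks:
        accessor = BlockAccessor.for_block(block)
        start = 0
        while start < accessor.num_rows():
            # Take as many rows as still fit into the current output block.
            num_to_take = min(
                accessor.num_rows() - start,
                max_num_rows_per_block - builder.num_rows(),
            )
            builder.add_block(accessor.slice(start, start + num_to_take, copy=True))
            start += num_to_take
            if builder.num_rows() == max_num_rows_per_block:
                yield builder.build()
                builder = DelegatingBlockBuilder()
    # Flush any trailing partial block.
    if builder.num_rows() > 0:
        yield builder.build()
```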

Comment on lines 46 to 54
# Initialize lists to store metadata and partitioned blocks
metadata_list: List[BlockMetadata] = []
block_list: List[Block] = []

# Initialize a builder to accumulate rows into new blocks
cur_block_builder = DelegatingBlockBuilder()

# Iterate over each input block and partition based on max_num_rows_per_block
for block in input_blocks:
Contributor

I appreciate the thoroughness, but I think we should reserve comments for logic that is complicated and needs context, and avoid annotating trivial steps (like initialization, etc.)

# Iterate over each input block and partition based on max_num_rows_per_block
for block in input_blocks:

# Create an accessor to interact with the current block data
Contributor

See the comment above; please clean these up across the board.

Comment on lines 76 to 109
if remaining_rows_in_block + num_rows_in_cur_block < max_num_rows_per_block:

# Add remaining rows to the current block builder
end_idx_in_block = start_idx_in_block + remaining_rows_in_block
cur_block_builder.add_block(
accessor.slice(start_idx_in_block, end_idx_in_block, copy=True)
)
break # Exit the while loop as no partitioning is needed yet

# If the current block can exactly match the max row limit
elif (
remaining_rows_in_block + num_rows_in_cur_block
== max_num_rows_per_block
):

# Partition the current block and create a new one
end_idx_in_block = start_idx_in_block + remaining_rows_in_block
cur_block_builder.add_block(
accessor.slice(start_idx_in_block, end_idx_in_block, copy=True)
)

# Finalize the partitioned block and store its metadata and content
partitioned_block = cur_block_builder.build()
metadata = BlockAccessor.for_block(partitioned_block).get_metadata()
metadata_list.append(metadata)
block_list.append(partitioned_block)

# Update the starting index for the next partition
start_idx_in_block += remaining_rows_in_block

# Reset the builder for the next partition
cur_block_builder = DelegatingBlockBuilder()

# If the current block exceeds the max row limit, partition it
Contributor

We can unify these branches

Comment on lines 112 to 122
# Partition the block into smaller pieces based on the max row limit
end_idx_in_block = start_idx_in_block + max_num_rows_per_block
cur_block_builder.add_block(
accessor.slice(start_idx_in_block, end_idx_in_block, copy=True)
)

# Finalize the partitioned block and store its metadata and content
partitioned_block = cur_block_builder.build()
metadata = BlockAccessor.for_block(partitioned_block).get_metadata()
metadata_list.append(metadata)
block_list.append(partitioned_block)
Contributor

Let's avoid duplicating the code (for example, metadata could be collected at the end, we could lift the slicing up, etc.)
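
A minimal sketch of that suggestion, reusing the names from the snippet above (illustrative only): finalize blocks in one place inside the loop, then derive all of the metadata in a single pass at the end, so neither the slicing nor the metadata code is repeated per branch.

```python
from ray.data.block import BlockAccessor

# Hedged sketch: `block_list` mirrors the variable in the snippet above.
metadata_list = [
    BlockAccessor.for_block(block).get_metadata() for block in block_list
]
```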

dentiny and others added 25 commits February 2, 2025 07:19
I mistakenly approved #49513
before making copy edits. This PR is a mulligan.

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: angelinalg <[email protected]>
Signed-off-by: Sven Mika <[email protected]>
Signed-off-by: angelinalg <[email protected]>
Co-authored-by: Sven Mika <[email protected]>
Probably not perfect, but a small patch fix...

Without this:
```
> serve run main:app FOO='abc=def'
Error: Invalid key-value string 'FOO=abc=def'. Must be of the form 'key=value'
```
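
For illustration, a hedged sketch of the kind of fix implied by the example above: split only on the first `=`, so values may themselves contain `=`. The helper name is hypothetical, not the actual Serve CLI function.

```python
def parse_key_value(arg: str) -> tuple[str, str]:
    # Split on the first '=' only, so "FOO=abc=def" parses as ("FOO", "abc=def").
    key, sep, value = arg.partition("=")
    if not sep or not key:
        raise ValueError(
            f"Invalid key-value string '{arg}'. Must be of the form 'key=value'"
        )
    return key, value


print(parse_key_value("FOO=abc=def"))  # -> ('FOO', 'abc=def')
```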

---------

Signed-off-by: Edward Oakes <[email protected]>
A rewrite of #48788

---------

Signed-off-by: dentiny <[email protected]>
The `read_parquet` implementation might infer the schema from the first file. This PR adds a test to ensure the implementation handles the case where the first file has no data and the inferred type is null.
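
A hedged sketch of the scenario the new test exercises (file paths are illustrative): the first file is empty, so its column type may be inferred as null, and `read_parquet` should still produce the correct schema and rows from the remaining files.

```python
import pyarrow as pa
import pyarrow.parquet as pq
import ray

# First file has zero rows, so its "x" column is typed null.
pq.write_table(pa.table({"x": pa.array([], type=pa.null())}), "/tmp/empty.parquet")
# Second file holds real data.
pq.write_table(pa.table({"x": pa.array([1, 2, 3])}), "/tmp/data.parquet")

ds = ray.data.read_parquet(["/tmp/empty.parquet", "/tmp/data.parquet"])
print(ds.take_all())  # expect the three rows; the schema should not remain null
```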


---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: dentiny <[email protected]>
## Why are these changes needed?

Tunes the Python garbage collector to reduce how frequently it runs in
the proxy by default. A feature flag is introduced to disable this
behavior and/or tune the threshold.
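
For context, a hedged sketch of the kind of tuning described above (illustrative; not the exact Serve code or flag names): freezing startup objects and raising the generation-0 threshold both reduce how often the collector runs under steady request load.

```python
import gc

gc.freeze()               # exclude objects alive at startup from future collections
gc.set_threshold(10_000)  # raise the gen-0 allocation threshold (PR default per description)
```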

## Benchmarks

```python
from ray import serve

@serve.deployment(
    max_ongoing_requests=100,
    num_replicas=16,
    ray_actor_options={"num_cpus": 0},
)
class A:
    async def __call__(self):
        return b"hi"

app = A.bind()
```

```
ab -n 10000 -c 100 http://127.0.0.1:8000/
```

No optimization:
```
Concurrency Level:      100
Time taken for tests:   11.985 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1910000 bytes
HTML transferred:       120000 bytes
Requests per second:    834.34 [#/sec] (mean)
Time per request:       119.855 [ms] (mean)
Time per request:       1.199 [ms] (mean, across all concurrent requests)
Transfer rate:          155.62 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.6      0      28
Processing:     5  119  30.3    121     227
Waiting:        3  118  30.2    120     225
Total:          5  120  30.2    121     227

Percentage of the requests served within a certain time (ms)
  50%    121
  66%    128
  75%    135
  80%    141
  90%    158
  95%    173
  98%    189
  99%    196
 100%    227 (longest request)
```

GC freeze only:
```
Concurrency Level:      100
Time taken for tests:   11.838 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1910000 bytes
HTML transferred:       120000 bytes
Requests per second:    844.72 [#/sec] (mean)
Time per request:       118.383 [ms] (mean)
Time per request:       1.184 [ms] (mean, across all concurrent requests)
Transfer rate:          157.56 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.7      0      28
Processing:     5  117  31.5    119     302
Waiting:        3  116  31.5    118     300
Total:          5  118  31.5    119     303

Percentage of the requests served within a certain time (ms)
  50%    119
  66%    127
  75%    134
  80%    138
  90%    151
  95%    165
  98%    184
  99%    230
 100%    303 (longest request)
```

GC threshold set to `10_000` (*default after this PR*):
```
Concurrency Level:      100
Time taken for tests:   11.223 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1910000 bytes
HTML transferred:       120000 bytes
Requests per second:    891.00 [#/sec] (mean)
Time per request:       112.233 [ms] (mean)
Time per request:       1.122 [ms] (mean, across all concurrent requests)
Transfer rate:          166.19 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.5      0      23
Processing:     5  111  26.9    116     202
Waiting:        2  110  27.0    115     199
Total:          5  112  26.8    116     202

Percentage of the requests served within a certain time (ms)
  50%    116
  66%    124
  75%    128
  80%    132
  90%    146
  95%    154
  98%    164
  99%    169
 100%    202 (longest request)
```

GC threshold set to `100_000`:
```
Concurrency Level:      100
Time taken for tests:   11.481 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1910000 bytes
HTML transferred:       120000 bytes
Requests per second:    870.98 [#/sec] (mean)
Time per request:       114.813 [ms] (mean)
Time per request:       1.148 [ms] (mean, across all concurrent requests)
Transfer rate:          162.46 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.3      0       3
Processing:     5  114  25.0    112     256
Waiting:        2  113  25.0    111     254
Total:          5  114  24.9    112     256

Percentage of the requests served within a certain time (ms)
  50%    112
  66%    116
  75%    119
  80%    123
  90%    150
  95%    159
  98%    185
  99%    201
 100%    256 (longest request)
```

---------

Signed-off-by: Edward Oakes <[email protected]>
The iter_batches release test doesn't actually execute the dataset.

---------

Signed-off-by: Balaji Veeramani <[email protected]>
I just realized I made a mistake:
- In the previous cleanup PR (#49739), I unified `cc_*` to `ray_cc_*` with `sed`, but I forgot to remove the copts.
- Default copts have already been applied, so there is no need to re-apply them in the BUILD rule:
https://github.com/ray-project/ray/blob/33a61aaa9f50be4e1572e732dfe3e92640a056cf/bazel/ray.bzl#L169-L176

Signed-off-by: dentiny <[email protected]>
I forgot to check the result of writing the PID into cgroup files.

---------

Signed-off-by: dentiny <[email protected]>
…ter]` in `read` / `write` function (#49737)

1. Most channel read/write functions don't call
`ensure_registered_as_[reader|writer]`. This PR ensures that
`ensure_registered_as_[reader|writer]` is called if it hasn't already
been called.

2. Fixed a bug in `ensure_registered_as_reader` for
`TorchTensorNcclChannel`. It is unnecessary to call
`ensure_registered_as_reader` for the internal GPU channel if the reader
reads from the local channel."

After #49711 is merged, we can have a central place to verify whether
`ensure_registered_as_reader` is called.

---------

Signed-off-by: Kai-Hsun Chen <[email protected]>
…remove redundant struct statement (#49766)

This PR includes the following refactoring:  
- Updated the function signatures of the `CallbackReply` constructor,
`ParseAsStringArrayOrScanArray`, and `ParseAsStringArray` to accept
const references instead of pointers as input.
- Removed the redundant `struct` statement in `RedisResponseFn`.

Signed-off-by: Chi-Sheng Liu <[email protected]>
…rrying additional headers to Prometheus (#49700)

Signed-off-by: Rueian <[email protected]>
…Iterator (#49724)

Adds the `DataIterator.iter_rows` method to the public API and removes the
beta tag from the DataIterator API.
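
A hedged usage sketch of the newly public method; the iterator comes from `Dataset.iterator()`, and the exact row format depends on the dataset.

```python
import ray

ds = ray.data.range(5)
it = ds.iterator()         # DataIterator
for row in it.iter_rows():
    print(row)             # e.g. {'id': 0}, {'id': 1}, ...
```
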
---------

Signed-off-by: Matthew Owen <[email protected]>