From c313a520e20b0bfeab65d83c0154fee0d6a0ca58 Mon Sep 17 00:00:00 2001 From: sjl Date: Mon, 1 Jul 2024 23:23:41 +0000 Subject: [PATCH 01/16] add iter_internal_block_refs Signed-off-by: sjl --- python/ray/data/dataset.py | 38 +++++++++++++++++++++------ python/ray/data/tests/test_formats.py | 17 ++++++++++++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 54ee2b6d62c4..99e74e2d51a1 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -12,6 +12,7 @@ Dict, Generic, Iterable, + Iterator, List, Literal, Mapping, @@ -2486,11 +2487,12 @@ def count(self) -> int: get_num_rows = cached_remote_fn(_get_num_rows) - return sum( - ray.get( - [get_num_rows.remote(block) for block in self.get_internal_block_refs()] - ) - ) + # Directly loop over the iterator of `BlockRef`s instead of first + # retrieving a list of `BlockRef`s. + total_rows = 0 + for block_ref in self.iter_internal_block_refs(): + total_rows += ray.get(get_num_rows.remote(block_ref)) + return total_rows @ConsumptionAPI( if_more_than_read=True, @@ -4563,7 +4565,29 @@ def stats(self) -> str: def _get_stats_summary(self) -> DatasetStatsSummary: return self._plan.stats_summary() - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI(pattern="") + @DeveloperAPI + def iter_internal_block_refs(self) -> Iterator[ObjectRef[Block]]: + """Get an iterator over references to the underlying blocks of this Dataset. + + This function can be used for zero-copy access to the data. It does not + keep the data materialized in-memory. + + Examples: + >>> import ray + >>> ds = ray.data.range(1) + >>> for block_ref in ds.get_internal_block_refs(): + ... block = ray.get(block_ref) + + Returns: + An iterator over references to this Dataset's blocks. + """ + iter_block_refs_md, _, _ = self._plan.execute_to_iterator() + iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md) + self._synchronize_progress_bar() + return iter_block_refs + + @ConsumptionAPI(pattern="") @DeveloperAPI def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. @@ -4577,8 +4601,6 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: >>> ds.get_internal_block_refs() [ObjectRef(...)] - Time complexity: O(1) - Returns: A list of references to this dataset's blocks. """ diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index 900c249d5030..bec9e37bf1ba 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -85,6 +85,23 @@ def test_get_internal_block_refs(ray_start_regular_shared): assert out == list(range(10)), out +def test_iter_internal_block_refs(ray_start_regular_shared): + n = 10 + iter_block_refs = ray.data.range( + n, override_num_blocks=n + ).iter_internal_block_refs() + + out = [] + block_ref_count = 0 + for block_ref in iter_block_refs: + b = ray.get(block_ref) + out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) + block_ref_count += 1 + out = sorted(out) + assert block_ref_count == n + assert out == list(range(n)), out + + def test_fsspec_filesystem(ray_start_regular_shared, tmp_path): """Same as `test_parquet_write` but using a custom, fsspec filesystem. 
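A rough consumption sketch of the API added in this patch (illustrative only, not part of the diff): `iter_internal_block_refs` yields one `ObjectRef[Block]` at a time, so a caller such as `count()` can fetch and discard blocks incrementally instead of first materializing the full list returned by `get_internal_block_refs`. The snippet assumes this patch is applied; `override_num_blocks` and `BlockAccessor` are taken from the accompanying test.

    import ray
    from ray.data.block import BlockAccessor

    ds = ray.data.range(100, override_num_blocks=10)

    total_rows = 0
    for block_ref in ds.iter_internal_block_refs():
        # Fetch one block at a time; the full list of block references is
        # never materialized up front.
        block = ray.get(block_ref)
        total_rows += BlockAccessor.for_block(block).num_rows()

    assert total_rows == 100
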
From 5ef80417f23b14d541cd1fa0ca50c81d2aed8c0e Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 04:32:26 +0000 Subject: [PATCH 02/16] return iterator over refbundles Signed-off-by: sjl --- python/ray/data/dataset.py | 53 +++++++++++++++++---------- python/ray/data/tests/test_formats.py | 20 +++++----- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 99e74e2d51a1..57c0ac38f3ae 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -78,6 +78,7 @@ VALID_BATCH_FORMATS, Block, BlockAccessor, + BlockMetadata, DataBatch, T, U, @@ -2460,12 +2461,13 @@ def show(self, limit: int = 20) -> None: @ConsumptionAPI( if_more_than_read=True, datasource_metadata="row count", - pattern="Time complexity:", + pattern="without reading in the entire data.", ) def count(self) -> int: - """Count the number of records in the dataset. - - Time complexity: O(dataset size / parallelism), O(1) for parquet + """Count the number of records in the dataset. For `Dataset`s + which only read Parquet files (created with :meth:`~ray.data.read_parquet`), + this method reads the file metadata to efficiently count the number of records + without reading in the entire data. Examples: >>> import ray @@ -2485,13 +2487,14 @@ def count(self) -> int: if meta_count is not None: return meta_count - get_num_rows = cached_remote_fn(_get_num_rows) - - # Directly loop over the iterator of `BlockRef`s instead of first - # retrieving a list of `BlockRef`s. + # Directly loop over the iterator of `RefBundle`s instead of + # retrieving a full list of `BlockRef`s. total_rows = 0 - for block_ref in self.iter_internal_block_refs(): - total_rows += ray.get(get_num_rows.remote(block_ref)) + for ref_bundle in self.iter_internal_ref_bundles(): + num_rows = ref_bundle.num_rows() + # Executing the dataset always returns blocks with valid `num_rows`. + assert num_rows is not None + total_rows += num_rows return total_rows @ConsumptionAPI( @@ -4567,25 +4570,35 @@ def _get_stats_summary(self) -> DatasetStatsSummary: @ConsumptionAPI(pattern="") @DeveloperAPI - def iter_internal_block_refs(self) -> Iterator[ObjectRef[Block]]: - """Get an iterator over references to the underlying blocks of this Dataset. - - This function can be used for zero-copy access to the data. It does not - keep the data materialized in-memory. + def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: + """Get an iterator over + :class:`~ray.data._internal.execution.interfaces.RefBundle`s + belonging to this Dataset. Calling this function doesn't keep + the data materialized in-memory. Examples: >>> import ray >>> ds = ray.data.range(1) - >>> for block_ref in ds.get_internal_block_refs(): - ... block = ray.get(block_ref) + >>> for ref_bundle in ds.get_internal_block_refs(): + ... for block_ref, block_md in ref_bundle.blocks: + ... block = ray.get(block_ref) Returns: - An iterator over references to this Dataset's blocks. + An iterator over this Dataset's + :class:`~ray.data._internal.execution.interfaces.RefBundle`s. """ + + def _build_ref_bundle( + blocks: Tuple[ObjectRef[Block], BlockMetadata], + ) -> RefBundle: + # Set `owns_blocks=True` so we can destroy the blocks eagerly + # after getting count from metadata. 
+ return RefBundle((blocks,), owns_blocks=True) + iter_block_refs_md, _, _ = self._plan.execute_to_iterator() - iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md) + iter_ref_bundles = map(_build_ref_bundle, iter_block_refs_md) self._synchronize_progress_bar() - return iter_block_refs + return iter_ref_bundles @ConsumptionAPI(pattern="") @DeveloperAPI diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index bec9e37bf1ba..a813ffeff999 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -85,20 +85,20 @@ def test_get_internal_block_refs(ray_start_regular_shared): assert out == list(range(10)), out -def test_iter_internal_block_refs(ray_start_regular_shared): +def test_iter_internal_ref_bundles(ray_start_regular_shared): n = 10 - iter_block_refs = ray.data.range( - n, override_num_blocks=n - ).iter_internal_block_refs() + ds = ray.data.range(n, override_num_blocks=n) + iter_ref_bundles = ds.iter_internal_ref_bundles() out = [] - block_ref_count = 0 - for block_ref in iter_block_refs: - b = ray.get(block_ref) - out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) - block_ref_count += 1 + ref_bundle_count = 0 + for ref_bundle in iter_ref_bundles: + for block_ref, block_md in ref_bundle.blocks: + b = ray.get(block_ref) + out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) + ref_bundle_count += 1 out = sorted(out) - assert block_ref_count == n + assert ref_bundle_count == n assert out == list(range(n)), out From 374898b80c32ef8ce69cf50f556ee51041f9dc8a Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 17:12:06 +0000 Subject: [PATCH 03/16] fix docs Signed-off-by: sjl --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 57c0ac38f3ae..86750027052a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4579,7 +4579,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: Examples: >>> import ray >>> ds = ray.data.range(1) - >>> for ref_bundle in ds.get_internal_block_refs(): + >>> for ref_bundle in ds.iter_internal_ref_bundles(): ... for block_ref, block_md in ref_bundle.blocks: ... block = ray.get(block_ref) From b0ec8943b07c51d4c406f44f5b148cb3dd3d7b51 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 11:10:34 -0700 Subject: [PATCH 04/16] update consumption api usage Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 86750027052a..b5daf4062349 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2461,7 +2461,7 @@ def show(self, limit: int = 20) -> None: @ConsumptionAPI( if_more_than_read=True, datasource_metadata="row count", - pattern="without reading in the entire data.", + pattern="Examples:", ) def count(self) -> int: """Count the number of records in the dataset. 
For `Dataset`s From f0b49f1507d344932dbbc0553808cdf96b03dcdf Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 11:43:43 -0700 Subject: [PATCH 05/16] fix Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b5daf4062349..56afd1e8efdf 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4568,11 +4568,10 @@ def stats(self) -> str: def _get_stats_summary(self) -> DatasetStatsSummary: return self._plan.stats_summary() - @ConsumptionAPI(pattern="") + @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: - """Get an iterator over - :class:`~ray.data._internal.execution.interfaces.RefBundle`s + """Get an iterator over ``RefBundle``s belonging to this Dataset. Calling this function doesn't keep the data materialized in-memory. @@ -4584,8 +4583,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: ... block = ray.get(block_ref) Returns: - An iterator over this Dataset's - :class:`~ray.data._internal.execution.interfaces.RefBundle`s. + An iterator over this Dataset's ``RefBundle``s. """ def _build_ref_bundle( From aa368d4257fcc7663a6edbb4db372d2ef63da635 Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 20:28:16 +0000 Subject: [PATCH 06/16] clean up Signed-off-by: sjl --- doc/source/data/api/dataset.rst | 1 + python/ray/data/dataset.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index c884a3aaf0a1..44d52810ee58 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -131,6 +131,7 @@ Inspecting Metadata Dataset.input_files Dataset.stats Dataset.get_internal_block_refs + Dataset.iter_internal_ref_bundles Execution --------- diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 56afd1e8efdf..950c47e31384 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4571,7 +4571,7 @@ def _get_stats_summary(self) -> DatasetStatsSummary: @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: - """Get an iterator over ``RefBundle``s + """Get an iterator over ``RefBundles`` belonging to this Dataset. Calling this function doesn't keep the data materialized in-memory. @@ -4583,7 +4583,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: ... block = ray.get(block_ref) Returns: - An iterator over this Dataset's ``RefBundle``s. + An iterator over this Dataset's ``RefBundles``. """ def _build_ref_bundle( From ab8d6ed4cf6a72d7ed753eceeb9973e0da5055b7 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 14:55:40 -0700 Subject: [PATCH 07/16] comments Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 950c47e31384..0874d9c140dc 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2464,10 +2464,11 @@ def show(self, limit: int = 20) -> None: pattern="Examples:", ) def count(self) -> int: - """Count the number of records in the dataset. For `Dataset`s - which only read Parquet files (created with :meth:`~ray.data.read_parquet`), - this method reads the file metadata to efficiently count the number of records - without reading in the entire data. 
+ """Count the number of records in the dataset. + + For Datasets which only read Parquet files (created with + :meth:`~ray.data.read_parquet`), this method reads the file metadata to + efficiently count the number of records without reading in the entire data. Examples: >>> import ray @@ -4598,7 +4599,7 @@ def _build_ref_bundle( self._synchronize_progress_bar() return iter_ref_bundles - @ConsumptionAPI(pattern="") + @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. From 128614db0b6f6bf93f6f69c255b546cd885c48c0 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 15:04:27 -0700 Subject: [PATCH 08/16] comments Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 2 ++ python/ray/data/dataset.py | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 392cfbd592bc..a46d938ee257 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -414,6 +414,8 @@ def execute_to_iterator( metrics_tag = create_dataset_tag(self._dataset_name, self._dataset_uuid) executor = StreamingExecutor(copy.deepcopy(ctx.execution_options), metrics_tag) + # TODO(scottjlee): replace with `execute_to_legacy_bundle_iterator` and + # update execute_to_iterator usages to handle RefBundles instead of Blocks block_iter = execute_to_legacy_block_iterator( executor, self, diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 0874d9c140dc..195a04845cbf 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4590,8 +4590,6 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: def _build_ref_bundle( blocks: Tuple[ObjectRef[Block], BlockMetadata], ) -> RefBundle: - # Set `owns_blocks=True` so we can destroy the blocks eagerly - # after getting count from metadata. return RefBundle((blocks,), owns_blocks=True) iter_block_refs_md, _, _ = self._plan.execute_to_iterator() @@ -4616,6 +4614,7 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ + # TODO(scottjlee): replace get_internal_block_refs() usages with iter_internal_ref_bundles() block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() return block_refs From 65c3f4ef6164ddebd4d5e39b2aed60e07a81e688 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 15:05:10 -0700 Subject: [PATCH 09/16] lint Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 195a04845cbf..f34113a47de9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4614,7 +4614,8 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. 
""" - # TODO(scottjlee): replace get_internal_block_refs() usages with iter_internal_ref_bundles() + # TODO(scottjlee): replace get_internal_block_refs() usages with + # iter_internal_ref_bundles() block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() return block_refs From 958d306a5167b1147dd1a2f8ea5c01d760491e85 Mon Sep 17 00:00:00 2001 From: sjl Date: Thu, 4 Jul 2024 00:18:11 +0000 Subject: [PATCH 10/16] fix tests Signed-off-by: sjl --- python/ray/data/dataset.py | 21 +++++++++++---------- python/ray/data/tests/test_zip.py | 12 ++++++++++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 4886509379c3..83bb31e5a563 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2464,9 +2464,9 @@ def show(self, limit: int = 20) -> None: pattern="Examples:", ) def count(self) -> int: - """Count the number of records in the dataset. For `Dataset`s + """Count the number of rows in the dataset. For `Dataset`s which only read Parquet files (created with :meth:`~ray.data.read_parquet`), - this method reads the file metadata to efficiently count the number of records + this method reads the file metadata to efficiently count the number of rows without reading in the entire data. Examples: @@ -4333,14 +4333,15 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame": ValueError: if the number of rows in the :class:`~ray.data.Dataset` exceeds ``limit``. """ - count = self.count() - if limit is not None and count > limit: - raise ValueError( - f"the dataset has more than the given limit of {limit} " - f"rows: {count}. If you are sure that a DataFrame with " - f"{count} rows will fit in local memory, set ds.to_pandas(limit=None) " - "to disable limits." - ) + if limit is not None: + count = self.count() + if count > limit: + raise ValueError( + f"the dataset has more than the given limit of {limit} " + f"rows: {count}. If you are sure that a DataFrame with " + f"{count} rows will fit in local memory, set " + "ds.to_pandas(limit=None) to disable limits." + ) blocks = self.get_internal_block_refs() output = DelegatingBlockBuilder() for block in blocks: diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index 2df33ef8bb0e..a7130722427b 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -99,17 +99,25 @@ def test_zip_arrow(ray_start_regular_shared): ds2 = ray.data.range(5).map(lambda r: {"a": r["id"] + 1, "b": r["id"] + 2}) ds = ds1.zip(ds2) assert ds.count() == 5 - assert "{id: int64, a: int64, b: int64}" in str(ds) + result = list(ds.take()) assert result[0] == {"id": 0, "a": 1, "b": 2} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{id: int64, a: int64, b: int64}" in str(ds) + # Test duplicate column names. ds = ds1.zip(ds1).zip(ds1) assert ds.count() == 5 - assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) + result = list(ds.take()) assert result[0] == {"id": 0, "id_1": 0, "id_2": 0} + # Execute the dataset to get full schema. 
+ ds = ds.materialize() + assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) + def test_zip_multiple_block_types(ray_start_regular_shared): df = pd.DataFrame({"spam": [0]}) From 87151ee207acb61c4bde86722bf7eddacacf1200 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Fri, 5 Jul 2024 10:54:53 -0700 Subject: [PATCH 11/16] update tests Signed-off-by: Scott Lee --- python/ray/data/tests/test_zip.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index a7130722427b..6c57afbb3a3b 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -82,17 +82,25 @@ def test_zip_pandas(ray_start_regular_shared): ds2 = ray.data.from_pandas(pd.DataFrame({"col3": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds2) assert ds.count() == 2 - assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) + result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col3": "a", "col4": "d"} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) + ds3 = ray.data.from_pandas(pd.DataFrame({"col2": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds3) assert ds.count() == 2 - assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) + result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col2_1": "a", "col4": "d"} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) + def test_zip_arrow(ray_start_regular_shared): ds1 = ray.data.range(5).map(lambda r: {"id": r["id"]}) From b6df226a3c8b9fc5d8d4e7d078b2638561442ae0 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 14:58:38 -0700 Subject: [PATCH 12/16] snapshot metadata only Signed-off-by: Scott Lee --- .../data/_internal/execution/legacy_compat.py | 46 ++++++++++++++++++- python/ray/data/_internal/plan.py | 9 ++++ python/ray/data/tests/test_zip.py | 24 ++-------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index b7d489efe659..d4b3261f82ca 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -3,7 +3,7 @@ It should be deleted once we fully move to the new executor backend. """ -from typing import Iterator, Tuple +from typing import Iterator, Optional, Tuple from ray.data._internal.block_list import BlockList from ray.data._internal.execution.interfaces import ( @@ -11,10 +11,12 @@ PhysicalOperator, RefBundle, ) +from ray.data._internal.execution.interfaces.executor import OutputIterator from ray.data._internal.logical.optimizers import get_execution_plan from ray.data._internal.logical.util import record_operators_usage from ray.data._internal.plan import ExecutionPlan from ray.data._internal.stats import DatasetStats +from ray.data._internal.util import unify_block_metadata_schema from ray.data.block import Block, BlockMetadata from ray.types import ObjectRef @@ -59,6 +61,48 @@ def execute_to_legacy_bundle_iterator( dag = dag_rewrite(dag) bundle_iter = executor.execute(dag, initial_stats=stats) + + class CacheMetadataIterator(OutputIterator): + """Wrapper for `bundle_iterator` above. 
+ + For a given iterator which yields output RefBundles, + cache the metadata from each output bundle, and yield + the original RefBundle.""" + + def __init__(self, base_iterator: OutputIterator): + # Note: the base_iterator should be of type StreamIterator, + # defined within `StreamingExecutor.execute()`. It must + # support the `get_next()` method. + self._base_iterator = base_iterator + + def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle: + bundle = self._base_iterator.get_next(output_split_idx) + self._cache_metadata(bundle) + return bundle + + def _cache_metadata(self, bundle: RefBundle) -> RefBundle: + """Cache the metadata from each output bundle, so we can + access important information, such as row count, schema, etc.""" + if not plan._snapshot_metadata: + # Initialize the snapshot BlockMetadata. + plan._snapshot_metadata = BlockMetadata( + num_rows=bundle.num_rows(), + size_bytes=bundle.size_bytes(), + schema=unify_block_metadata_schema(bundle.metadata), + input_files=None, + exec_stats=None, + ) + else: + # Update the snapshot BlockMetadata. + snap_md = plan._snapshot_metadata + snap_md.num_rows += bundle.num_rows() + snap_md.size_bytes += bundle.size_bytes() + snap_md.schema = unify_block_metadata_schema( + [snap_md, *bundle.metadata] + ) + return bundle + + bundle_iter = CacheMetadataIterator(bundle_iter) return bundle_iter diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index a46d938ee257..9b158b7d9009 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -72,6 +72,12 @@ def __init__( self._snapshot_operator: Optional[LogicalOperator] = None self._snapshot_stats = None self._snapshot_bundle = None + # Snapshot of only metadata corresponding to the final operator's + # output bundles, used as the source of truth for the Dataset's schema + # and count. This is calculated and cached when the plan is executed as an + # iterator (`execute_to_iterator()`), and avoids caching + # all of the output blocks in memory like in `self.snapshot_bundle`. + self._snapshot_metadata: Optional[BlockMetadata] = None # Cached schema. self._schema = None @@ -148,6 +154,9 @@ def generate_logical_plan_string( # This plan has executed some but not all operators. schema = unify_block_metadata_schema(self._snapshot_bundle.metadata) count = self._snapshot_bundle.num_rows() + elif self._snapshot_metadata is not None: + schema = self._snapshot_metadata.schema + count = self._snapshot_metadata.num_rows else: # This plan hasn't executed any operators. sources = self._logical_plan.sources() diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index 6c57afbb3a3b..2df33ef8bb0e 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -82,50 +82,34 @@ def test_zip_pandas(ray_start_regular_shared): ds2 = ray.data.from_pandas(pd.DataFrame({"col3": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds2) assert ds.count() == 2 - + assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col3": "a", "col4": "d"} - # Execute the dataset to get full schema. 
- ds = ds.materialize() - assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) - ds3 = ray.data.from_pandas(pd.DataFrame({"col2": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds3) assert ds.count() == 2 - + assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col2_1": "a", "col4": "d"} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) - def test_zip_arrow(ray_start_regular_shared): ds1 = ray.data.range(5).map(lambda r: {"id": r["id"]}) ds2 = ray.data.range(5).map(lambda r: {"a": r["id"] + 1, "b": r["id"] + 2}) ds = ds1.zip(ds2) assert ds.count() == 5 - + assert "{id: int64, a: int64, b: int64}" in str(ds) result = list(ds.take()) assert result[0] == {"id": 0, "a": 1, "b": 2} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{id: int64, a: int64, b: int64}" in str(ds) - # Test duplicate column names. ds = ds1.zip(ds1).zip(ds1) assert ds.count() == 5 - + assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) result = list(ds.take()) assert result[0] == {"id": 0, "id_1": 0, "id_2": 0} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) - def test_zip_multiple_block_types(ray_start_regular_shared): df = pd.DataFrame({"spam": [0]}) From 70f82de93a6821430c9cd5f4685760616e9e2632 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 15:06:34 -0700 Subject: [PATCH 13/16] clean up Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index c5380bb2fb16..eb5915e5f0f7 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4588,13 +4588,14 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: An iterator over this Dataset's ``RefBundles``. """ - def _build_ref_bundle( - blocks: Tuple[ObjectRef[Block], BlockMetadata], - ) -> RefBundle: - return RefBundle((blocks,), owns_blocks=True) + def _build_ref_bundles( + iter_blocks: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], + ) -> Iterator[RefBundle]: + for block in iter_blocks: + yield RefBundle((block,), owns_blocks=True) iter_block_refs_md, _, _ = self._plan.execute_to_iterator() - iter_ref_bundles = map(_build_ref_bundle, iter_block_refs_md) + iter_ref_bundles = _build_ref_bundles(iter_block_refs_md) self._synchronize_progress_bar() return iter_ref_bundles From f520c62f460e821157afb89198699e1825dfbcab Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 16:11:58 -0700 Subject: [PATCH 14/16] update parquet test Signed-off-by: Scott Lee --- python/ray/data/tests/test_parquet.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 9e92e2536b12..8266e56a3325 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -329,10 +329,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert "test2.parquet" in str(input_files) assert not ds._plan.has_started_execution - # Schema isn't available, so we do a partial read. + # Dataset.schema() calls execute_to_iterator(), which caches the metadata. + # This means the schema and num_rows are available once `ds.schema()` is called. 
assert ds.schema() is not None - assert str(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds - assert repr(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert str(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds + assert repr(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds assert ds._plan.has_started_execution assert not ds._plan.has_computed_output() From 629d6bb194ce1a097c6ff2352020b8f34b505f03 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 18:19:13 -0700 Subject: [PATCH 15/16] only cache metadata once iteration terminates Signed-off-by: Scott Lee --- .../data/_internal/execution/legacy_compat.py | 55 ++++++++++--------- python/ray/data/tests/test_consumption.py | 13 +++++ python/ray/data/tests/test_parquet.py | 7 +-- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index d4b3261f82ca..0fccf070d95d 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -66,40 +66,43 @@ class CacheMetadataIterator(OutputIterator): """Wrapper for `bundle_iterator` above. For a given iterator which yields output RefBundles, - cache the metadata from each output bundle, and yield - the original RefBundle.""" + collect the metadata from each output bundle, and yield the + original RefBundle. Only after the entire iterator is exhausted, + we cache the resulting metadata to the execution plan.""" def __init__(self, base_iterator: OutputIterator): # Note: the base_iterator should be of type StreamIterator, # defined within `StreamingExecutor.execute()`. It must # support the `get_next()` method. self._base_iterator = base_iterator + self._collected_metadata = BlockMetadata( + num_rows=0, + size_bytes=0, + schema=None, + input_files=None, + exec_stats=None, + ) def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle: - bundle = self._base_iterator.get_next(output_split_idx) - self._cache_metadata(bundle) - return bundle - - def _cache_metadata(self, bundle: RefBundle) -> RefBundle: - """Cache the metadata from each output bundle, so we can - access important information, such as row count, schema, etc.""" - if not plan._snapshot_metadata: - # Initialize the snapshot BlockMetadata. - plan._snapshot_metadata = BlockMetadata( - num_rows=bundle.num_rows(), - size_bytes=bundle.size_bytes(), - schema=unify_block_metadata_schema(bundle.metadata), - input_files=None, - exec_stats=None, - ) - else: - # Update the snapshot BlockMetadata. - snap_md = plan._snapshot_metadata - snap_md.num_rows += bundle.num_rows() - snap_md.size_bytes += bundle.size_bytes() - snap_md.schema = unify_block_metadata_schema( - [snap_md, *bundle.metadata] - ) + try: + bundle = self._base_iterator.get_next(output_split_idx) + self._collect_metadata(bundle) + return bundle + except StopIteration: + # Once the iterator is completely exhausted, we are done + # collecting metadata. We can add this cached metadata to the plan. 
+ plan._snapshot_metadata = self._collected_metadata + raise + + def _collect_metadata(self, bundle: RefBundle) -> RefBundle: + """Collect the metadata from each output bundle and accumulate + results, so we can access important information, such as + row count, schema, etc., after iteration completes.""" + self._collected_metadata.num_rows += bundle.num_rows() + self._collected_metadata.size_bytes += bundle.size_bytes() + self._collected_metadata.schema = unify_block_metadata_schema( + [self._collected_metadata, *bundle.metadata] + ) return bundle bundle_iter = CacheMetadataIterator(bundle_iter) diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index cc806d905263..2bc6760553bd 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -158,6 +158,19 @@ def test_count_edge_case(ray_start_regular): assert actual_count == 5 +def test_count_after_partial_execution(ray_start_regular): + paths = ["example://iris.csv"] * 5 + ds = ray.data.read_csv(paths, override_num_blocks=15) + for batch in ds.iter_batches(batch_size=1): + # Take one batch and break to simulate partial iteration/execution. + break + # Row count should be unknown after partial execution. + assert "num_rows=?" in str(ds) + # After calling `ds.count()`, row count should be known. + assert ds.count() == 150 * 5 + assert f"num_rows={150*5}" in str(ds) + + def test_limit_execution(ray_start_regular): last_snapshot = get_initial_core_execution_metrics_snapshot() override_num_blocks = 20 diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 8266e56a3325..9e92e2536b12 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -329,11 +329,10 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert "test2.parquet" in str(input_files) assert not ds._plan.has_started_execution - # Dataset.schema() calls execute_to_iterator(), which caches the metadata. - # This means the schema and num_rows are available once `ds.schema()` is called. + # Schema isn't available, so we do a partial read. assert ds.schema() is not None - assert str(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds - assert repr(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds + assert str(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert repr(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds assert ds._plan.has_started_execution assert not ds._plan.has_computed_output() From 4720bf6f6cdb6d6516c64a42119b40ca80879662 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 9 Jul 2024 12:08:24 -0700 Subject: [PATCH 16/16] comments Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 4 ++++ python/ray/data/tests/test_consumption.py | 10 ++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 9b158b7d9009..6c0ca99b70cc 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -77,6 +77,10 @@ def __init__( # and count. This is calculated and cached when the plan is executed as an # iterator (`execute_to_iterator()`), and avoids caching # all of the output blocks in memory like in `self.snapshot_bundle`. + # TODO(scottjlee): To keep the caching logic consistent, update `execute()` + # to also store the metadata in `_snapshot_metadata` instead of + # `_snapshot_bundle`. 
For example, we could store the blocks in + # `self._snapshot_blocks` and the metadata in `self._snapshot_metadata`. self._snapshot_metadata: Optional[BlockMetadata] = None # Cached schema. diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 2bc6760553bd..4cef09afbc24 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -160,15 +160,17 @@ def test_count_edge_case(ray_start_regular): def test_count_after_partial_execution(ray_start_regular): paths = ["example://iris.csv"] * 5 - ds = ray.data.read_csv(paths, override_num_blocks=15) - for batch in ds.iter_batches(batch_size=1): + ds = ray.data.read_csv(paths) + for batch in ds.iter_batches(): # Take one batch and break to simulate partial iteration/execution. break # Row count should be unknown after partial execution. assert "num_rows=?" in str(ds) - # After calling `ds.count()`, row count should be known. - assert ds.count() == 150 * 5 + + # After iterating over bundles and completing execution, row count should be known. + list(ds.iter_internal_ref_bundles()) assert f"num_rows={150*5}" in str(ds) + assert ds.count() == 150 * 5 def test_limit_execution(ray_start_regular):
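
Taken together, the series replaces list-based `get_internal_block_refs` consumption with a streaming `iter_internal_ref_bundles` API and, once that iterator is exhausted, caches the accumulated output metadata (row count, size, schema) on the plan. Below is a rough end-to-end usage sketch mirroring `test_iter_internal_ref_bundles` and `test_count_after_partial_execution`; it is a sketch against this branch, not an authoritative reference for the released API.

    import ray
    from ray.data.block import BlockAccessor

    ds = ray.data.range(10, override_num_blocks=5)

    rows = []
    for ref_bundle in ds.iter_internal_ref_bundles():
        # Each RefBundle holds (block_ref, block_metadata) pairs; blocks are
        # only fetched when the caller asks for them.
        for block_ref, _block_md in ref_bundle.blocks:
            block = ray.get(block_ref)
            rows.extend(BlockAccessor.for_block(block).to_pandas()["id"].tolist())

    assert sorted(rows) == list(range(10))
    # Exhausting the iterator lets the CacheMetadataIterator wrapper commit
    # the collected BlockMetadata to the plan, so str(ds) can report the row
    # count and schema without re-executing the dataset.
    assert ds.count() == 10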