diff --git a/modin/core/io/column_stores/parquet_dispatcher.py b/modin/core/io/column_stores/parquet_dispatcher.py index e9e95d842a2..e5e3478ec45 100644 --- a/modin/core/io/column_stores/parquet_dispatcher.py +++ b/modin/core/io/column_stores/parquet_dispatcher.py @@ -22,6 +22,8 @@ from fsspec.spec import AbstractBufferedFile import numpy as np from packaging import version +from pandas.api.types import is_list_like +import s3fs from modin.core.storage_formats.pandas.utils import compute_chunksize from modin.config import NPartitions @@ -40,7 +42,7 @@ class ColumnStoreDataset: Attributes ---------- - path : str, path object or file-like object + paths : list-like of str, path object or file-like object The filepath of the parquet file in local filesystem or hdfs. storage_options : dict Parameters for specific storage engine. @@ -60,10 +62,13 @@ class ColumnStoreDataset: List that contains the full paths of the parquet files in the dataset. """ - def __init__(self, path, storage_options): # noqa : PR01 - self.path = path.__fspath__() if isinstance(path, os.PathLike) else path + def __init__(self, paths, storage_options): # noqa : PR01 + self.paths = [ + path.__fspath__() if isinstance(path, os.PathLike) else path + for path in paths + ] self.storage_options = storage_options - self._fs_path = None + self._fs_paths = None self._fs = None self.dataset = self._init_dataset() self._row_groups_per_file = None @@ -107,14 +112,18 @@ def fs(self): Filesystem object. """ if self._fs is None: - if isinstance(self.path, AbstractBufferedFile): - self._fs = self.path.fs + if isinstance(self.paths[0], AbstractBufferedFile): + self._fs = self.paths[0].fs else: - self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options) + self._fs_paths = [ + url_to_fs(path, **self.storage_options)[1] for path in self.paths + ] + # If given list of paths, can assume they are all on the same filesystem + self._fs = url_to_fs(self.paths[0])[0] return self._fs @property - def fs_path(self): + def fs_paths(self): """ Return the filesystem-specific path or file handle. @@ -123,12 +132,18 @@ def fs_path(self): fs_path : str, path object or file-like object String path specific to filesystem or a file handle. """ - if self._fs_path is None: - if isinstance(self.path, AbstractBufferedFile): - self._fs_path = self.path + if self._fs_paths is None: + if len(self.paths) > 1: + self._fs_paths = self.paths + elif isinstance(self.paths[0], AbstractBufferedFile): + self._fs_paths = [self.paths[0]] else: - self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options) - return self._fs_path + self._fs_paths = [ + url_to_fs(path, **self.storage_options)[1] for path in self.paths + ] + # If given list of paths, can assume they are all on the same filesystem + self._fs = url_to_fs(self.paths[0])[0] + return self._fs_paths def to_pandas_dataframe(self, columns): """ @@ -166,8 +181,8 @@ def _unstrip_protocol(protocol, path): return path return f"{protos[0]}://{path}" - if isinstance(self.path, AbstractBufferedFile): - return [self.path] + if len(self.paths) == 1 and isinstance(self.paths[0], AbstractBufferedFile): + return [self.paths[0]] # version.parse() is expensive, so we can split this into two separate loops if version.parse(fsspec.__version__) < version.parse("2022.5.0"): fs_files = [_unstrip_protocol(self.fs.protocol, fpath) for fpath in files] @@ -183,7 +198,7 @@ def _init_dataset(self): # noqa: GL08 from pyarrow.parquet import ParquetDataset return ParquetDataset( - self.fs_path, filesystem=self.fs, use_legacy_dataset=False + self.fs_paths, filesystem=self.fs, use_legacy_dataset=False ) @property @@ -231,7 +246,7 @@ def to_pandas_dataframe( from pyarrow.parquet import read_table return read_table( - self._fs_path, columns=columns, filesystem=self.fs + self._fs_paths, columns=columns, filesystem=self.fs ).to_pandas() @@ -240,7 +255,7 @@ class FastParquetDataset(ColumnStoreDataset): def _init_dataset(self): # noqa: GL08 from fastparquet import ParquetFile - return ParquetFile(self.fs_path, fs=self.fs) + return ParquetFile(self.fs_paths, fs=self.fs) @property def pandas_metadata(self): @@ -285,12 +300,13 @@ def _get_fastparquet_files(self): # noqa: GL08 # have to copy some of their logic here while we work on getting # an easier method to get a list of valid files. # See: https://github.com/dask/fastparquet/issues/795 - if "*" in self.path: + if "*" in self.paths: files = self.fs.glob(self.path) else: files = [ f - for f in self.fs.find(self.path) + for path in self.paths + for f in self.fs.find(path) if f.endswith(".parquet") or f.endswith(".parq") ] return files @@ -612,41 +628,49 @@ def _read(cls, path, engine, columns, **kwargs): ParquetFile API is used. Please refer to the documentation here https://arrow.apache.org/docs/python/parquet.html """ - if isinstance(path, str): - if os.path.isdir(path): - path_generator = os.walk(path) - else: - storage_options = kwargs.get("storage_options") - if storage_options is not None: - fs, fs_path = url_to_fs(path, **storage_options) + # s3fs.core.S3File is considered list-like + path_list = ( + path + if is_list_like(path) and not isinstance(path, s3fs.core.S3File) + else [path] + ) + for path in path_list: + if isinstance(path, str): + if os.path.isdir(path): + path_generator = os.walk(path) else: - fs, fs_path = url_to_fs(path) - path_generator = fs.walk(fs_path) - partitioned_columns = set() - # We do a tree walk of the path directory because partitioned - # parquet directories have a unique column at each directory level. - # Thus, we can use os.walk(), which does a dfs search, to walk - # through the different columns that the data is partitioned on - for _, dir_names, files in path_generator: - if dir_names: - partitioned_columns.add(dir_names[0].split("=")[0]) - if files: - # Metadata files, git files, .DSStore - # TODO: fix conditional for column partitioning, see issue #4637 - if len(files[0]) > 0 and files[0][0] == ".": - continue - break - partitioned_columns = list(partitioned_columns) - if len(partitioned_columns): - return cls.single_worker_read( - path, - engine=engine, - columns=columns, - reason="Mixed partitioning columns in Parquet", - **kwargs, - ) - - dataset = cls.get_dataset(path, engine, kwargs.get("storage_options") or {}) + storage_options = kwargs.get("storage_options") + if storage_options is not None: + fs, fs_path = url_to_fs(path, **storage_options) + else: + fs, fs_path = url_to_fs(path) + path_generator = fs.walk(fs_path) + partitioned_columns = set() + # We do a tree walk of the path directory because partitioned + # parquet directories have a unique column at each directory level. + # Thus, we can use os.walk(), which does a dfs search, to walk + # through the different columns that the data is partitioned on + for _, dir_names, files in path_generator: + if dir_names: + partitioned_columns.add(dir_names[0].split("=")[0]) + if files: + # Metadata files, git files, .DSStore + # TODO: fix conditional for column partitioning, see issue #4637 + if len(files[0]) > 0 and files[0][0] == ".": + continue + break + partitioned_columns = list(partitioned_columns) + if len(partitioned_columns): + return cls.single_worker_read( + path, + engine=engine, + columns=columns, + reason="Mixed partitioning columns in Parquet", + **kwargs, + ) + dataset = cls.get_dataset( + path_list, engine, kwargs.get("storage_options") or {} + ) index_columns = ( dataset.pandas_metadata.get("index_columns", []) if dataset.pandas_metadata @@ -659,5 +683,4 @@ def _read(cls, path, engine, columns, **kwargs): for c in column_names if c not in index_columns and not cls.index_regex.match(c) ] - return cls.build_query_compiler(dataset, columns, index_columns, **kwargs) diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 6e20e3b18bb..0035d655e6f 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -1356,6 +1356,20 @@ def test_read_parquet( columns=columns, ) + def test_read_parquet_list_of_files_5698(self, engine, make_parquet_file): + with ensure_clean(".parquet") as f1, ensure_clean( + ".parquet" + ) as f2, ensure_clean(".parquet") as f3: + for f in [f1, f2, f3]: + make_parquet_file(filename=f) + eval_io(fn_name="read_parquet", path=[f1, f2, f3], engine=engine) + + def test_empty_list(self, engine): + eval_io(fn_name="read_parquet", path=[], engine=engine) + + def test_list_of_s3_file(self, engine): + raise NotImplementedError + @pytest.mark.xfail( condition="config.getoption('--simulate-cloud').lower() != 'off'", reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",