Skip to content

Commit

Permalink
Merge pull request #11 from emdgroup/feat/multiple_fixes
Browse files Browse the repository at this point in the history
cleanup and fixes
  • Loading branch information
jonas-w authored Apr 5, 2023
2 parents 09b4eb0 + d41d5f2 commit ed96e4b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 33 deletions.
20 changes: 19 additions & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog],
and this project adheres to [Semantic Versioning].

## [1.0.7] - 2023-04-05

### Added

- skip_instance_cache kwarg to FoundryFileSystem, which gets passed to fsspec.
  It should be set to True in a multithreaded environment (e.g. Streamlit),
  because fsspec reuses the same filesystem instance by default,
  which led to state leaking between threads and unexpected results. (#11)
- an extra check, which may prevent sending a bad request to foundry
when skip_instance_cache is not used (#11)

### Changed

- remove pandas from top level import, which should speed up the initial import (#11)
- restrict pandas to version less than 2,
as pyspark is currently not compatible with version 2 (#11)

## [1.0.6] - 2023-03-31

### Changed
Expand Down Expand Up @@ -63,10 +80,11 @@ and this project adheres to [Semantic Versioning].

[Keep a Changelog]: https://keepachangelog.com/en/1.0.0/
[Semantic Versioning]: https://semver.org/spec/v2.0.0.html
[1.0.7]: https://github.com/emdgroup/foundry-dev-tools/compare/v1.0.6...v1.0.7
[1.0.6]: https://github.com/emdgroup/foundry-dev-tools/compare/v1.0.5...v1.0.6
[1.0.5]: https://github.com/emdgroup/foundry-dev-tools/compare/v1.0.4...v1.0.5
[1.0.4]: https://github.com/emdgroup/foundry-dev-tools/compare/v1.0.3...v1.0.4
[1.0.3]: https://github.com/emdgroup/foundry-dev-tools/compare/v1.0.2...v1.0.3
[1.0.2]: https://github.com/emdgroup/foundry-dev-tools/releases/tag/v1.0.2
[1.0.1]: https://github.com/emdgroup/foundry-dev-tools
[1.0]: https://github.com/emdgroup/foundry-dev-tools
[1.0]: https://github.com/emdgroup/foundry-dev-tools
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ package_dir =
=src
install_requires =
pyarrow
pandas
pandas < 2 # until pyspark supports v2 or also sets this limitation
requests
fs
backoff
Expand Down Expand Up @@ -79,7 +79,7 @@ extras = True

[tool:pytest]
addopts =
--cov=src --cov-report term --cov-report xml:coverage.xml --cov-report html:htmlcov
# --cov=src --cov-report term --cov-report xml:coverage.xml --cov-report html:htmlcov
--verbose
norecursedirs =
dist
Expand Down
9 changes: 5 additions & 4 deletions src/foundry_dev_tools/cached_foundry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import time
from typing import Tuple, Union

import pandas as pd

import foundry_dev_tools
from foundry_dev_tools.foundry_api_client import (
BranchNotFoundError,
Expand Down Expand Up @@ -175,13 +173,13 @@ def _download_dataset_to_cache_dir(self, dataset_identity, branch, foundry_schem

def save_dataset(
self,
df: Union[pd.DataFrame, "pyspark.sql.DataFrame"],
df: Union["pd.DataFrame", "pyspark.sql.DataFrame"],
dataset_path_or_rid: str,
branch: str = "master",
exists_ok: bool = False,
mode: str = "SNAPSHOT",
) -> Tuple[str, str]:
# pylint: disable=invalid-name,too-many-arguments
# pylint: disable=invalid-name,too-many-arguments,too-many-locals
"""Saves a dataframe to Foundry. If the dataset in Foundry does not exist it is created.
If the branch does not exist, it is created. If the dataset exists, an exception is thrown.
Expand Down Expand Up @@ -218,6 +216,9 @@ def save_dataset(
raise ValueError("Please provide a dataset branch with parameter 'branch'")

with tempfile.TemporaryDirectory() as path:
# pylint: disable=import-outside-toplevel
import pandas as pd

if isinstance(df, pd.DataFrame):
df.to_parquet(
os.sep.join([path + "/dataset.parquet"]),
Expand Down
23 changes: 0 additions & 23 deletions src/foundry_dev_tools/foundry_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,8 +1324,6 @@ def query_foundry_sql(
"""
assert return_type in {"pandas", "arrow", "spark"}
_assert_pyarrow_packages_available()
_assert_pandas_packages_available()
foundry_sql_client = FoundrySqlClient(config=self._config, branch=branch)
try:
return foundry_sql_client.query(query=query, return_type=return_type)
Expand Down Expand Up @@ -1696,7 +1694,6 @@ def __init__(self, config: dict = None, branch="master"):
branch (str): default = master, all queries will be executed against this default branch
"""
_assert_pyarrow_packages_available()
self._config = foundry_dev_tools.Configuration.get_config(config)
self._requests_verify_value = _determine_requests_verify_value(self._config)
self.foundry_sql_server_api = (
Expand Down Expand Up @@ -1899,26 +1896,6 @@ def _transform_bad_request_response_to_exception(response):
raise DatasetNotFoundError("SQL")


def _assert_pyarrow_packages_available():
try:
# pylint: disable=import-outside-toplevel,unused-import
import pyarrow
except ImportError as err:
raise ValueError(
"Please install package 'pyarrow' to use SQL functionality."
) from err


def _assert_pandas_packages_available():
try:
# pylint: disable=import-outside-toplevel,unused-import
import pandas as pd
except ImportError as err:
raise ValueError(
"Please install package 'pandas' to use SQL functionality."
) from err


def _is_palantir_oauth_client_installed():
try:
# pylint: disable=import-outside-toplevel,unused-import,import-error
Expand Down
10 changes: 8 additions & 2 deletions src/foundry_dev_tools/fsspec_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class FoundryFileSystem(AbstractFileSystem): # noqa
or using SSO.
transaction_backoff (bool): if FoundryFileSystem should retry to open a transaction, in case one is already
open. The default value is 'True' and FoundryFileSystem will retry 60 seconds to open a transaction.
skip_instance_cache (bool): gets passed to the underlying fsspec AbstractFileSystem;
set this to True in multithreaded environments to avoid reusing a cached instance.
**kwargs: passed to underlying fsspec.AbstractFileSystem
"""

Expand All @@ -36,9 +38,11 @@ def __init__(
branch: str = DEFAULT_BRANCH,
token: str = None,
transaction_backoff: bool = True,
skip_instance_cache: bool = False,
**kwargs,
):
super().__init__(**kwargs)
# pylint: disable=too-many-arguments
super().__init__(skip_instance_cache=skip_instance_cache, **kwargs)
self.token = token
self.dataset_identity = self.api.get_dataset_identity(
dataset_path_or_rid=dataset, branch=branch
Expand Down Expand Up @@ -310,7 +314,9 @@ def _upload_chunk(self, final=False):
"""Internal function to add a chunk of data to a started upload."""
assert final is True, "chunked uploading not supported"
self.buffer.seek(0)

assert (
self.fs._transaction is not None # pylint: disable=protected-access
), "multiple threads, same dataset? use skip_instance_cache"
self.fs.api.upload_dataset_file(
dataset_rid=self.fs.dataset_identity["dataset_rid"],
transaction_rid=self.fs._transaction.transaction_rid, # pylint: disable=protected-access
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/api/_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typing import Callable, Dict, Optional

import fs
import pandas as pd
import pyspark

from transforms.api import Input, Output
Expand Down Expand Up @@ -121,6 +120,7 @@ def _compute_pandas(self): # pylint: disable=arguments-differ
kwargs["ctx"] = TransformContext()

output_df = self(**kwargs)
import pandas as pd # pylint: disable=import-outside-toplevel

if not isinstance(output_df, pd.DataFrame):
raise ValueError(
Expand Down
40 changes: 40 additions & 0 deletions tests/test_fsspec_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,43 @@ def test_ls_trailing_slash_empty_folder(random_file, fsspec_write_test_folder):
ls_result_no_slash = fs.ls(random_folder, detail=False)
ls_result_with_slash = fs.ls(random_folder + "/", detail=False)
assert ls_result_no_slash == ls_result_with_slash


@patch(
    "foundry_dev_tools.FoundryRestClient.get_dataset_identity",
    MagicMock(),
)
def test_skip_instance_cache():
    """Verify fsspec instance caching is bypassed when skip_instance_cache=True.

    With skip_instance_cache=True each constructor call yields a distinct
    filesystem object; with False, fsspec hands back the same cached instance
    for identical constructor arguments.
    """
    common = {
        "dataset": "ri.foundry.main.dataset.fake1bb5-be92-4ad9-aa3e-07c161751234",
        "branch": "master",
        "token": "super-secret-token",
    }
    uncached_a = FoundryFileSystem(skip_instance_cache=True, **common)
    uncached_b = FoundryFileSystem(skip_instance_cache=True, **common)

    cached_a = FoundryFileSystem(skip_instance_cache=False, **common)
    cached_b = FoundryFileSystem(skip_instance_cache=False, **common)

    uncached_a.x = 123
    uncached_b.x = 4567
    cached_a.x = 8901
    cached_b.x = 2345

    # distinct instances keep their own attribute values
    assert uncached_a.x == 123
    assert uncached_b.x == 4567
    # cached instances are the same object, so the last write wins
    assert cached_a.x == 2345
    assert cached_b.x == 2345

0 comments on commit ed96e4b

Please sign in to comment.