Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add git to filesystem source 301 devel #893

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -113,11 +113,12 @@ def protocol(self) -> str:

def on_resolved(self) -> None:
url = urlparse(self.bucket_url)
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
" must contain valid url with a path or host:password component."
)
if not url.scheme in ('gitpythonfs', 'github', 'git', "s3"):
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
" must contain valid url with a path or host:password component."
)
# this is just a path in a local file system
if url.path == self.bucket_url:
url = url._replace(scheme="file")
32 changes: 22 additions & 10 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ class FileItem(TypedDict, total=False):
"gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
"gitpythonfs": lambda f: ensure_pendulum_datetime(f["committed_date"]),
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
@@ -118,36 +119,42 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
"filesystem", [f"{version.DLT_PKG_NAME}[{config.protocol}]"]
) from e


class FileItemDict(DictStrAny):
"""A FileItem dictionary with additional methods to get fsspec filesystem, open and read files."""
"""A FileItem dictionary with additional methods to get fsspec filesystem,
open and read files.

It retains fsspec context while being passed between resources and transformers.
"""

def __init__(
self,
mapping: FileItem,
credentials: Optional[Union[FileSystemCredentials, AbstractFileSystem]] = None,
fsspec_instance_or_credentials: Optional[Union[AbstractFileSystem, FileSystemCredentials]] = None,
):
"""Create a dictionary with the filesystem client.

Args:
mapping (FileItem): The file item TypedDict.
credentials (Optional[FileSystemCredentials], optional): The credentials to the
filesystem. Defaults to None.
fsspec_instance_or_credentials (Optional[AbstractFileSystem, FileSystemCredentials], optional):
To help construct an fsspec filesystem instance. Defaults to None. If the fssspec implementation
requires arguments either parsed from the url or as keywords, it is safest to provide this argument, as supplying
only the `FileItem.file_url` (in mapping arg) could result in unexpected behaviour of the filesytem
client due to missing parameters unexpectedly taking on default values.
"""
self.credentials = credentials
self.fsspec_instance_or_credentials = fsspec_instance_or_credentials
super().__init__(**mapping)

@property
def fsspec(self) -> AbstractFileSystem:
"""The filesystem client is based on the given credentials.
"""The filesystem client is based on the available instance, or url and credentials.

Returns:
AbstractFileSystem: The fsspec client.
"""
if isinstance(self.credentials, AbstractFileSystem):
return self.credentials
if isinstance(self.fsspec_instance_or_credentials, AbstractFileSystem):
return self.fsspec_instance_or_credentials
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]
return fsspec_filesystem(self["file_url"], self.fsspec_instance_or_credentials)[0]

def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
"""Open the file as a fsspec file.
@@ -215,7 +222,12 @@ def glob_files(
"""
import os

protocol = urlparse(bucket_url).scheme
fsspec_path = fs_client._strip_protocol(bucket_url)
bucket_url = f"{protocol}://{fsspec_path}"

bucket_url_parsed = urlparse(bucket_url)

# if this is a file path without a scheme
if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url):
# this is a file so create a proper file url