Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wip
Browse files Browse the repository at this point in the history
wip

wip

wip
msto committed May 5, 2024
1 parent dd82a34 commit 1ce2978
Showing 3 changed files with 604 additions and 2 deletions.
8 changes: 6 additions & 2 deletions fgpyo/io/__init__.py
Original file line number Diff line number Diff line change
@@ -56,11 +56,15 @@
from typing import Iterator
from typing import Set
from typing import TextIO
from typing import TypeAlias
from typing import Union
from typing import cast

# File extensions that mark a path as compressed.
# NOTE(review): presumably consulted by `to_reader`/`to_writer` when choosing between `open()`
# and `gzip.open()` — confirm against their bodies.
COMPRESSED_FILE_EXTENSIONS: Set[str] = {".gz", ".bgz"}

# The type of file handle returned by `to_reader`.
ReadableFileHandle: TypeAlias = Union[io.TextIOWrapper, TextIO, IO[Any]]
# The type of file handle returned by `to_writer`.
WritableFileHandle: TypeAlias = Union[IO[Any], io.TextIOWrapper]


def assert_path_is_readable(path: Path) -> None:
"""Checks that file exists and returns True, else raises AssertionError
@@ -129,7 +133,7 @@ def assert_path_is_writeable(path: Path, parent_must_exist: bool = True) -> None
raise AssertionError(f"No parent directories exist for: {path}")


def to_reader(path: Path) -> Union[io.TextIOWrapper, TextIO, IO[Any]]:
def to_reader(path: Path) -> ReadableFileHandle:
"""Opens a Path for reading and based on extension uses open() or gzip.open()
Args:
@@ -147,7 +151,7 @@ def to_reader(path: Path) -> Union[io.TextIOWrapper, TextIO, IO[Any]]:
return path.open(mode="r")


def to_writer(path: Path, append: bool = False) -> Union[IO[Any], io.TextIOWrapper]:
def to_writer(path: Path, append: bool = False) -> WritableFileHandle:
"""Opens a Path for writing (or appending) and based on extension uses open() or gzip.open()
Args:
339 changes: 339 additions & 0 deletions fgpyo/util/metric.py
Original file line number Diff line number Diff line change
@@ -116,17 +116,27 @@
"""

import dataclasses
from abc import ABC
from csv import DictWriter
from dataclasses import dataclass
from enum import Enum
from inspect import isclass
from pathlib import Path
from types import TracebackType
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar

import attr

from fgpyo import io
from fgpyo.util import inspect

@@ -334,3 +344,332 @@ def fast_concat(*inputs: Path, output: Path) -> None:
io.write_lines(
path=output, lines_to_write=list(io.read_lines(input_path))[1:], append=True
)


def is_metric(cls: Any) -> bool:
    """
    Report whether the given object is a usable Metric class.

    To qualify, the object must be a class, must subclass `Metric`, and must be decorated
    as either a dataclass or an attr class.

    Args:
        cls: The object to inspect.

    Returns:
        True if every condition above holds, False otherwise.
    """
    # Check order matters: `issubclass` raises when handed something that is not a class.
    if not isclass(cls):
        return False

    if not issubclass(cls, Metric):
        return False

    return dataclasses.is_dataclass(cls) or attr.has(cls)


@dataclass(kw_only=True)
class MetricFileFormat:
"""
Parameters describing the format and configuration of a delimited Metric file.
Most of these parameters, if specified, are passed through to `csv.DictReader`/`csv.DictWriter`.
"""

delimiter: str = "\t"
comment: str = "#"


@dataclass(frozen=True, kw_only=True)
class MetricFileHeader:
"""
Header of a file.
A file's header contains an optional preface, consisting of lines prefixed by a comment
character and/or empty lines, and a required row of fieldnames before the data rows begin.
Attributes:
preface: A list of any lines preceding the fieldnames.
fieldnames: The field names specified in the final line of the header.
"""

preface: list[str]
fieldnames: list[str]


def get_header(
    reader: io.ReadableFileHandle,
    file_format: MetricFileFormat,
) -> Optional[MetricFileHeader]:
    """
    Consume and return the header of an open file.

    Lines are read until the first one that is neither a comment nor blank; that line is
    split on the delimiter to give the fieldnames. Every line read before it is collected
    (with surrounding whitespace stripped) into the preface.

    NB: `None` is returned rather than raising because the name of the source file is not
    known here, so a helpful error message cannot be built. The caller is responsible for
    raising if no header is found.

    See original proof-of-concept here: https://github.com/fulcrumgenomics/fgpyo/pull/103

    Args:
        reader: An open, readable file handle.
        file_format: The file's format; its `comment` prefix and `delimiter` are used.

    Returns:
        A `MetricFileHeader` holding the fieldnames and any preceding lines, or `None` if the
        file was empty or contained only comment and/or empty lines.
    """
    preface_lines: list[str] = []
    fieldnames_line: Optional[str] = None

    for raw_line in reader:
        stripped_line = raw_line.strip()
        is_comment_or_blank = raw_line.startswith(file_format.comment) or stripped_line == ""

        if not is_comment_or_blank:
            # First real line: this is the fieldnames row, so stop consuming the reader.
            fieldnames_line = stripped_line
            break

        preface_lines.append(stripped_line)

    if fieldnames_line is None:
        return None

    return MetricFileHeader(
        preface=preface_lines,
        fieldnames=fieldnames_line.split(file_format.delimiter),
    )


class MetricWriter:
    """
    Write `Metric` instances to a delimited text file using `csv.DictWriter`.

    The writer can be used as a context manager, in which case the underlying file handle is
    closed on exit.

    Attributes:
        _metric_class: The Metric class whose instances this writer accepts.
        _fieldnames: The names of the fields written to file, in output order.
        _fout: The open file handle being written to.
        _writer: The `csv.DictWriter` that serializes rows to `_fout`.
    """

    _metric_class: type[Metric]
    _fieldnames: list[str]
    _fout: io.WritableFileHandle
    _writer: DictWriter

    def __init__(
        self,
        filename: Path | str,
        metric_class: type[Metric],
        append: bool = False,
        delimiter: str = "\t",
        include_fields: list[str] | None = None,
        exclude_fields: list[str] | None = None,
        lineterminator: str = "\n",
        **kwds: Any,
    ) -> None:
        """
        Args:
            filename: Path to the file to write.
            metric_class: Metric class.
            append: If `True`, the file will be appended to. Otherwise, the specified file will be
                overwritten.
            delimiter: The output file delimiter.
            include_fields: If specified, only the listed fieldnames will be included when writing
                records to file. Fields will be written in the order provided.
                May not be used together with `exclude_fields`.
            exclude_fields: If specified, any listed fieldnames will be excluded when writing
                records to file.
                May not be used together with `include_fields`.
            lineterminator: The string written at the end of every row. Defaults to a single
                newline (LF) rather than the `csv` module's own default of CRLF.
            **kwds: Accepted but currently unused.

        Raises:
            AssertionError: If the provided metric class is not a dataclass- or attr-decorated
                subclass of `Metric`.
            AssertionError: If the provided filepath is not writable. (Or readable, if
                `append=True`.)
            ValueError: If `append=True` and the existing file has no header, or its header does
                not match the fields of the Metric.
            ValueError: If `append=True` and `include_fields`/`exclude_fields` select fields that
                differ from the existing file's header.
            ValueError: If both `include_fields` and `exclude_fields` are given, or if either
                names a field that is not an attribute of the Metric.
        """

        filepath: Path = filename if isinstance(filename, Path) else Path(filename)
        file_format = MetricFileFormat(delimiter=delimiter)

        # Raise explicitly instead of using `assert`, which is stripped when running with `-O`.
        if not is_metric(metric_class):
            raise AssertionError(
                "Metric class must be a dataclass- or attr-decorated subclass of `Metric`."
            )
        io.assert_path_is_writeable(filepath)
        if append:
            io.assert_path_is_readable(filepath)
            assert_file_header_matches_metric(filepath, metric_class, file_format)

        fieldnames = _validate_output_fieldnames(
            metric_class=metric_class,
            include_fields=include_fields,
            exclude_fields=exclude_fields,
        )

        # The header check above established that the existing file's columns are exactly the
        # Metric's fields, in order. Appending any other selection would write rows whose
        # columns do not line up with that header.
        if append and fieldnames != get_fieldnames(metric_class):
            raise ValueError(
                "When appending, the output fields must match the header of the existing "
                f"file: {filepath}\n"
                f"\tFile fields: {', '.join(get_fieldnames(metric_class))}\n"
                f"\tOutput fields: {', '.join(fieldnames)}\n"
            )

        self._metric_class = metric_class
        self._fieldnames = fieldnames
        self._fout = io.to_writer(filepath, append=append)
        self._writer = DictWriter(
            f=self._fout,
            fieldnames=self._fieldnames,
            delimiter=delimiter,
            # `csv` defaults to "\r\n"; be explicit so rows end the same way on every platform
            # and appended rows do not mix line endings with an existing file.
            lineterminator=lineterminator,
        )

        # If we aren't appending to an existing file, write the header before any rows
        if not append:
            self._writer.writeheader()

    def __enter__(self) -> "MetricWriter":
        """Return this writer so it can be bound by a `with` statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the underlying file handle; any in-flight exception is not suppressed."""
        self.close()

    def close(self) -> None:
        """Close the underlying file handle."""
        self._fout.close()

    def write(self, metric: Metric) -> None:
        """
        Write a single Metric instance to file.

        The Metric is converted to a dictionary and then written using the underlying
        `csv.DictWriter`. If the `MetricWriter` was created using the `include_fields` or
        `exclude_fields` arguments, the attributes of the dataclass are subset and/or reordered
        accordingly before writing.

        Args:
            metric: An instance of the specified Metric.

        Raises:
            ValueError: If `metric` is not an instance of the writer's Metric class.
        """
        if not isinstance(metric, self._metric_class):
            raise ValueError(f"Must provide instances of {self._metric_class.__name__}")

        # Serialize the Metric to a dict for writing by the underlying `DictWriter`
        row = asdict(metric)

        # Filter and/or re-order output fields if necessary
        row = {fieldname: row[fieldname] for fieldname in self._fieldnames}

        self._writer.writerow(row)

    def writeall(self, metrics: Iterable[Metric]) -> None:
        """
        Write multiple Metric instances to file.

        Each Metric is written with `write()`, in iteration order.

        Args:
            metrics: An iterable of instances of the specified Metric.

        Raises:
            ValueError: If any element is not an instance of the writer's Metric class.
        """
        for metric in metrics:
            self.write(metric)


def assert_is_metric(cls: type[Metric]) -> None:
    """
    Validate that the given class is a Metric, raising if it is not.

    Args:
        cls: A class object.

    Raises:
        TypeError: If `is_metric` rejects the given class.
    """
    if is_metric(cls):
        return

    raise TypeError(f"Not a dataclass or attr decorated Metric: {cls}")


def asdict(metric: Metric) -> dict[str, Any]:
    """
    Serialize a Metric instance to a dictionary.

    Dataclass Metrics are converted with `dataclasses.asdict`; attr Metrics with `attr.asdict`.

    Args:
        metric: An instance of a Metric.

    Returns:
        The dictionary produced by the matching `asdict` function.

    Raises:
        TypeError: If the type of `metric` is not a Metric.
    """
    assert_is_metric(type(metric))

    if dataclasses.is_dataclass(metric):
        return dataclasses.asdict(metric)

    if attr.has(metric):
        return attr.asdict(metric)

    # `assert_is_metric` guarantees that one of the two branches above was taken.
    assert False, "Unreachable"


def get_fieldnames(metric_class: type[Metric]) -> list[str]:
    """
    List the names of the fields declared on a Metric class.

    Args:
        metric_class: A Metric class.

    Returns:
        The field names, in the order reported by `dataclasses.fields` or `attr.fields`.

    Raises:
        TypeError: If the given class is not a Metric.
    """
    assert_is_metric(metric_class)

    if dataclasses.is_dataclass(metric_class):
        declared_fields = dataclasses.fields(metric_class)
    elif attr.has(metric_class):
        declared_fields = attr.fields(metric_class)
    else:
        # `assert_is_metric` guarantees that one of the two branches above was taken.
        assert False, "Unreachable"

    return [declared_field.name for declared_field in declared_fields]


def assert_file_header_matches_metric(
    path: Path,
    metric_class: type[Metric],
    file_format: MetricFileFormat,
) -> None:
    """
    Check that the specified file has a header and its fields match those of the provided Metric.

    Args:
        path: Path to the existing file whose header should be checked.
        metric_class: The Metric class the header is compared against.
        file_format: The file's format, used to locate and split the header.

    Raises:
        ValueError: If no header could be found in the file.
        ValueError: If the header's fieldnames differ (in content or order) from the fields of
            the Metric class.
        TypeError: If `metric_class` is not a Metric.
    """
    # Open via `io.to_reader` rather than `path.open` so that the file is read the same way it
    # will be appended to: `to_reader` picks `open()` or `gzip.open()` from the extension.
    with io.to_reader(path) as fin:
        header: Optional[MetricFileHeader] = get_header(fin, file_format=file_format)

    if header is None:
        raise ValueError(f"Could not find a header in the provided file: {path}")

    expected_fieldnames = get_fieldnames(metric_class)

    if header.fieldnames != expected_fieldnames:
        raise ValueError(
            "The provided file does not have the same field names as the provided dataclass:\n"
            f"\tDataclass: {metric_class.__name__}\n"
            f"\tFile: {path}\n"
            f"\tDataclass fields: {', '.join(expected_fieldnames)}\n"
            f"\tFile fields: {', '.join(header.fieldnames)}\n"
        )


def assert_fieldnames_are_metric_attributes(
    specified_fieldnames: list[str],
    metric_class: type[MetricType],
) -> None:
    """
    Check that all of the specified fields are attributes on the given Metric.

    Args:
        specified_fieldnames: The fieldnames to validate.
        metric_class: The Metric class the fieldnames must belong to.

    Raises:
        ValueError: if any of the specified fieldnames are not an attribute on the given Metric.
        TypeError: If `metric_class` is not a Metric.
    """
    # Look the Metric's fields up once, instead of once per specified fieldname.
    valid_fieldnames = set(get_fieldnames(metric_class))

    # Keep the caller's ordering so the error message lists the fields as they were given.
    invalid_fieldnames = [f for f in specified_fieldnames if f not in valid_fieldnames]

    if invalid_fieldnames:
        raise ValueError(
            "One or more of the specified fields are not attributes on the Metric "
            + f"{metric_class.__name__}: "
            + ", ".join(invalid_fieldnames)
        )


def _validate_output_fieldnames(
    metric_class: type[MetricType],
    include_fields: list[str] | None = None,
    exclude_fields: list[str] | None = None,
) -> list[str]:
    """
    Subset and/or re-order the Metric's fieldnames based on the specified include/exclude lists.

    * Only one of `include_fields` and `exclude_fields` may be specified.
    * All fieldnames specified in `include_fields` must be fields on `metric_class`. If this
      argument is specified, fields will be returned in the order they appear in the list.
    * All fieldnames specified in `exclude_fields` must be fields on `metric_class`. (This is
      technically unnecessary, but is a safeguard against passing an incorrect list.)
    * If neither `include_fields` or `exclude_fields` are specified, return the `metric_class`'s
      fieldnames.

    Args:
        metric_class: The Metric class whose fields are being selected.
        include_fields: The fieldnames to output, in output order.
        exclude_fields: The fieldnames to leave out of the output.

    Returns:
        A new list of the fieldnames to output, in output order.

    Raises:
        ValueError: If both `include_fields` and `exclude_fields` are specified.
        ValueError: If any specified fieldname is not an attribute of `metric_class`.
        TypeError: If `metric_class` is not a Metric.
    """

    if include_fields is not None and exclude_fields is not None:
        raise ValueError(
            "Only one of `include_fields` and `exclude_fields` may be specified, not both."
        )

    if exclude_fields is not None:
        assert_fieldnames_are_metric_attributes(exclude_fields, metric_class)
        return [f for f in get_fieldnames(metric_class) if f not in exclude_fields]

    if include_fields is not None:
        assert_fieldnames_are_metric_attributes(include_fields, metric_class)
        # Return a copy: handing back the caller's own list would let a later mutation of it
        # silently change the columns of whoever stored the result.
        return list(include_fields)

    return get_fieldnames(metric_class)
259 changes: 259 additions & 0 deletions fgpyo/util/tests/test_metric.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import enum
import gzip
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Callable
@@ -29,6 +30,10 @@
from fgpyo.util.inspect import is_attr_class
from fgpyo.util.inspect import is_dataclasses_class
from fgpyo.util.metric import Metric
from fgpyo.util.metric import MetricWriter
from fgpyo.util.metric import asdict
from fgpyo.util.metric import assert_is_metric
from fgpyo.util.metric import get_fieldnames


class EnumTest(enum.Enum):
@@ -519,3 +524,257 @@ def test_metric_columns_out_of_order(tmp_path: Path, data_and_classes: DataBuild
names = list(NameMetric.read(path=path))
assert len(names) == 1
assert names[0] == name


@dataclass
class FakeMetric(Metric["FakeMetric"]):
    """A minimal two-field dataclass Metric used as a fixture by the writer tests."""

    foo: str
    bar: int


def test_writer(tmp_path: Path) -> None:
    """Test that we can write metrics one at a time, preceded by a header."""
    fpath = tmp_path / "test.txt"

    with MetricWriter(filename=fpath, append=False, metric_class=FakeMetric) as writer:
        for metric in (FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)):
            writer.write(metric)

    with fpath.open("r") as f:
        lines = list(f)

    assert lines == ["foo\tbar\n", "abc\t1\n", "def\t2\n"]


def test_writer_from_str(tmp_path: Path) -> None:
    """Test that we can create a writer when `filename` is a `str`."""
    fpath = tmp_path / "test.txt"
    filename = str(fpath)

    with MetricWriter(filename=filename, append=False, metric_class=FakeMetric) as writer:
        writer.write(FakeMetric(foo="abc", bar=1))

    with fpath.open("r") as f:
        lines = list(f)

    assert lines == ["foo\tbar\n", "abc\t1\n"]


def test_writer_writeall(tmp_path: Path) -> None:
    """Test that we can write several metrics with a single call."""
    fpath = tmp_path / "test.txt"
    metrics = [FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)]

    with MetricWriter(filename=fpath, append=False, metric_class=FakeMetric) as writer:
        writer.writeall(metrics)

    with fpath.open("r") as f:
        lines = list(f)

    assert lines == ["foo\tbar\n", "abc\t1\n", "def\t2\n"]


def test_writer_append(tmp_path: Path) -> None:
    """Test that we can append to a file."""
    fpath = tmp_path / "test.txt"

    # Seed the file with a header only, as if a previous writer had created it.
    with fpath.open("w") as fout:
        fout.write("foo\tbar\n")

    with MetricWriter(filename=fpath, append=True, metric_class=FakeMetric) as writer:
        for metric in (FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)):
            writer.write(metric)

    with fpath.open("r") as f:
        lines = list(f)

    # The header must appear exactly once, followed by the appended rows.
    assert lines == ["foo\tbar\n", "abc\t1\n", "def\t2\n"]


def test_writer_append_raises_if_empty(tmp_path: Path) -> None:
    """Test that we raise an error if we try to append to an empty file."""
    fpath = tmp_path / "test.txt"
    fpath.touch()

    expect_missing_header = pytest.raises(ValueError, match="Could not find a header")
    with expect_missing_header, MetricWriter(
        filename=fpath, append=True, metric_class=FakeMetric
    ) as writer:
        writer.write(FakeMetric(foo="abc", bar=1))


def test_writer_append_raises_if_no_header(tmp_path: Path) -> None:
    """Test that we raise an error if we try to append to a file with no header."""
    fpath = tmp_path / "test.txt"

    # The first line is a data row, so it is read as (mismatching) fieldnames.
    with fpath.open("w") as fout:
        fout.write("abc\t1\n")

    expect_mismatch = pytest.raises(
        ValueError, match="The provided file does not have the same field names"
    )
    with expect_mismatch, MetricWriter(
        filename=fpath, append=True, metric_class=FakeMetric
    ) as writer:
        writer.write(FakeMetric(foo="abc", bar=1))


def test_writer_append_raises_if_header_does_not_match(tmp_path: Path) -> None:
    """
    Test that we raise an error if we try to append to a file whose header doesn't match our
    dataclass.
    """
    fpath = tmp_path / "test.txt"

    # The header carries an extra `baz` column that `FakeMetric` does not declare.
    with fpath.open("w") as fout:
        fout.write("foo\tbar\tbaz\n")

    expect_mismatch = pytest.raises(
        ValueError, match="The provided file does not have the same field names"
    )
    with expect_mismatch, MetricWriter(
        filename=fpath, append=True, metric_class=FakeMetric
    ) as writer:
        writer.write(FakeMetric(foo="abc", bar=1))


def test_writer_include_fields(tmp_path: Path) -> None:
    """Test that we can include only a subset of fields."""
    fpath = tmp_path / "test.txt"
    metrics = [FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)]

    writer = MetricWriter(
        filename=fpath,
        append=False,
        metric_class=FakeMetric,
        include_fields=["foo"],
    )
    with writer:
        writer.writeall(metrics)

    with fpath.open("r") as f:
        lines = list(f)

    assert lines == ["foo\n", "abc\n", "def\n"]


def test_writer_include_fields_reorders(tmp_path: Path) -> None:
    """Test that we can reorder the output fields."""
    fpath = tmp_path / "test.txt"
    metrics = [FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)]

    writer = MetricWriter(
        filename=fpath,
        append=False,
        metric_class=FakeMetric,
        include_fields=["bar", "foo"],
    )
    with writer:
        writer.writeall(metrics)

    with fpath.open("r") as f:
        lines = list(f)

    # Columns follow the order of `include_fields`, not the order of declaration.
    assert lines == ["bar\tfoo\n", "1\tabc\n", "2\tdef\n"]


def test_writer_exclude_fields(tmp_path: Path) -> None:
    """Test that we can exclude fields from being written."""
    fpath = tmp_path / "test.txt"
    metrics = [FakeMetric(foo="abc", bar=1), FakeMetric(foo="def", bar=2)]

    writer = MetricWriter(
        filename=fpath,
        append=False,
        metric_class=FakeMetric,
        exclude_fields=["bar"],
    )
    with writer:
        writer.writeall(metrics)

    with fpath.open("r") as f:
        lines = list(f)

    assert lines == ["foo\n", "abc\n", "def\n"]


def test_fieldnames() -> None:
    """Test we can get the fieldnames of a metric."""
    expected_fieldnames = ["foo", "bar"]

    assert get_fieldnames(FakeMetric) == expected_fieldnames


def test_fieldnames_raises_if_not_a_metric() -> None:
    """Test we raise if we get a non-metric."""

    # A dataclass that does not subclass `Metric`.
    @dataclass
    class NotAMetric:
        foo: str
        bar: int

    expect_rejection = pytest.raises(TypeError, match="Not a dataclass or attr decorated Metric")
    with expect_rejection:
        get_fieldnames(NotAMetric)  # type: ignore[arg-type]


def test_asdict() -> None:
    """Test that we can get a dict representation of a metric."""
    expected_row = {"foo": "abc", "bar": 1}

    assert asdict(FakeMetric(foo="abc", bar=1)) == expected_row


def test_assert_is_metric() -> None:
    """
    Test that we can validate if a class is a Metric.
    """
    try:
        assert_is_metric(FakeMetric)
    except TypeError:
        accepted = False
    else:
        accepted = True

    # Raised outside the `except` block so the `TypeError` is not attached as context.
    if not accepted:
        raise AssertionError("Failed to validate a valid Metric")


def test_assert_is_metric_raises_if_not_decorated() -> None:
    """
    Test that we raise an error if the provided type is a Metric subclass but not decorated as a
    dataclass or attr.
    """

    # Subclasses `Metric` but is neither a dataclass nor an attr class.
    class UndecoratedMetric(Metric["UndecoratedMetric"]):
        foo: str
        bar: int

    expect_rejection = pytest.raises(TypeError, match="Not a dataclass or attr decorated Metric")
    with expect_rejection:
        assert_is_metric(UndecoratedMetric)


def test_assert_metric_is_valid_raises_if_not_a_metric() -> None:
    """
    Test that we raise an error if the provided type is decorated as a
    dataclass or attr but does not subclass Metric.
    """

    @dataclass
    class DataclassNonMetric:
        foo: str
        bar: int

    @attr.s
    class AttrNonMetric:
        foo: str
        bar: int

    # Both decorators must be rejected when the class does not subclass `Metric`.
    for non_metric in (DataclassNonMetric, AttrNonMetric):
        with pytest.raises(TypeError, match="Not a dataclass or attr decorated Metric"):
            assert_is_metric(non_metric)

0 comments on commit 1ce2978

Please sign in to comment.