-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: extract parsing logic from collection module
- Loading branch information
1 parent
bbf82f3
commit b3c28bb
Showing
7 changed files
with
174 additions
and
143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from .collection import ( | ||
Collection, | ||
MissingMandatoryDirectoryError, | ||
PathDoesNotExistsError, | ||
PathIsNotADirectoryError, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
from collections.abc import Generator | ||
from pathlib import Path | ||
|
||
import yaml | ||
from pydantic import BaseModel, ConfigDict, ValidationError | ||
|
||
from tdp.core.constants import ( | ||
YML_EXTENSION, | ||
) | ||
|
||
try: | ||
from yaml import CLoader as Loader | ||
except ImportError: | ||
from yaml import Loader | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def parse_dag_directory( | ||
dag_directory_path: Path, | ||
) -> Generator[TDPLibDagNodeModel, None, None]: | ||
"""Get the DAG nodes of a collection. | ||
Args: | ||
dag_directory_path: Path to the DAG directory. | ||
Returns: | ||
List of DAG nodes. | ||
""" | ||
for dag_file in (dag_directory_path).glob("*" + YML_EXTENSION): | ||
yield from parse_dag_file(dag_file) | ||
|
||
|
||
class TDPLibDagNodeModel(BaseModel): | ||
"""Model for a TDP operation defined in a tdp_lib_dag file.""" | ||
|
||
model_config = ConfigDict(extra="ignore") | ||
|
||
name: str | ||
depends_on: list[str] = [] | ||
|
||
|
||
class TDPLibDagModel(BaseModel): | ||
"""Model for a TDP DAG defined in a tdp_lib_dag file.""" | ||
|
||
model_config = ConfigDict(extra="ignore") | ||
|
||
operations: list[TDPLibDagNodeModel] | ||
|
||
|
||
def parse_dag_file( | ||
dag_file_path: Path, | ||
) -> Generator[TDPLibDagNodeModel, None, None]: | ||
"""Read a tdp_lib_dag file and return a list of DAG operations.""" | ||
with dag_file_path.open("r") as operations_file: | ||
file_content = yaml.load(operations_file, Loader=Loader) | ||
|
||
try: | ||
tdp_lib_dag = TDPLibDagModel(operations=file_content) | ||
for operation in tdp_lib_dag.operations: | ||
yield operation | ||
except ValidationError as e: | ||
logger.error(f"Error while parsing tdp_lib_dag file {dag_file_path}: {e}") | ||
raise |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from pathlib import Path | ||
|
||
import pytest | ||
from pydantic import ValidationError | ||
|
||
from tdp.core.collection.parse_dag import parse_dag_directory, parse_dag_file | ||
|
||
|
||
def test_read_dag_file(tmp_path: Path): | ||
dag_file_path = tmp_path / "dag_file.yml" | ||
dag_file_path.write_text( | ||
"""--- | ||
- name: s1_c1_a | ||
depends_on: | ||
- sx_cx_a | ||
- name: s2_c2_a | ||
depends_on: | ||
- s1_c1_a | ||
- name: s3_c3_a | ||
depends_on: | ||
- sx_cx_a | ||
- sy_cy_a | ||
""" | ||
) | ||
operations = list(parse_dag_file(dag_file_path)) | ||
assert len(operations) == 3 | ||
assert operations[0].name == "s1_c1_a" | ||
assert operations[0].depends_on == ["sx_cx_a"] | ||
assert operations[1].name == "s2_c2_a" | ||
assert operations[1].depends_on == ["s1_c1_a"] | ||
assert operations[2].name == "s3_c3_a" | ||
assert operations[2].depends_on == ["sx_cx_a", "sy_cy_a"] | ||
|
||
|
||
def test_read_dag_file_empty(tmp_path: Path): | ||
dag_file_path = tmp_path / "dag_file.yml" | ||
dag_file_path.write_text("") | ||
with pytest.raises(ValidationError): | ||
list(parse_dag_file(dag_file_path)) | ||
|
||
|
||
def test_read_dag_file_with_additional_props(tmp_path: Path): | ||
dag_file_path = tmp_path / "dag_file.yml" | ||
dag_file_path.write_text( | ||
"""--- | ||
- name: s1_c1_a | ||
depends_on: | ||
- sx_cx_a | ||
foo: bar | ||
""" | ||
) | ||
operations = list(parse_dag_file(dag_file_path)) | ||
assert len(operations) == 1 | ||
assert operations[0].name == "s1_c1_a" | ||
assert operations[0].depends_on == ["sx_cx_a"] | ||
|
||
|
||
def test_get_collection_dag_nodes(tmp_path: Path): | ||
collection_path = tmp_path / "collection" | ||
dag_directory = "dag" | ||
(dag_directory_path := collection_path / dag_directory).mkdir( | ||
parents=True, exist_ok=True | ||
) | ||
dag_file_1 = dag_directory_path / "dag1.yml" | ||
dag_file_2 = dag_directory_path / "dag2.yml" | ||
dag_file_1.write_text( | ||
"""--- | ||
- name: s1_c1_a | ||
depends_on: | ||
- sx_cx_a | ||
""" | ||
) | ||
dag_file_2.write_text( | ||
"""--- | ||
- name: s2_c2_a | ||
depends_on: | ||
- s1_c1_a | ||
""" | ||
) | ||
dag_nodes = list(parse_dag_directory(dag_directory_path)) | ||
assert len(dag_nodes) == 2 | ||
assert any( | ||
node.name == "s1_c1_a" and node.depends_on == ["sx_cx_a"] for node in dag_nodes | ||
) | ||
assert any( | ||
node.name == "s2_c2_a" and node.depends_on == ["s1_c1_a"] for node in dag_nodes | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters