fix: import constant from wrong file
PaulFarault committed Nov 7, 2024
1 parent d53347f commit 233f808
Showing 3 changed files with 183 additions and 276 deletions.
193 changes: 66 additions & 127 deletions tdp/core/collection.py
@@ -51,6 +51,23 @@ class MissingMandatoryDirectoryError(Exception):
pass


class TDPLibDagNodeModel(BaseModel):
"""Model for a TDP operation defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

name: str
depends_on: list[str] = []


class TDPLibDagModel(BaseModel):
"""Model for a TDP DAG defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

operations: list[TDPLibDagNodeModel]


class CollectionReader:
"""An enriched version of an Ansible collection.
@@ -73,7 +90,7 @@ def __init__(
MissingMandatoryDirectoryError: If the collection does not contain a mandatory directory.
"""
self._path = Path(path)
check_collection_structure(self._path)
self._check_collection_structure(self._path)
self._inventory_reader = inventory_reader or InventoryReader()

# ? Is this method really useful?
@@ -124,93 +141,63 @@ def schema_directory(self) -> Path:

def read_dag_nodes(self) -> Generator[TDPLibDagNodeModel, None, None]:
"""Read the DAG nodes stored in the dag_directory."""
return read_dag_directory(self.dag_directory)
for dag_file in (self.dag_directory).glob("*" + YML_EXTENSION):
with dag_file.open("r") as operations_file:
file_content = yaml.load(operations_file, Loader=Loader)

try:
tdp_lib_dag = TDPLibDagModel(operations=file_content)
for operation in tdp_lib_dag.operations:
yield operation
except ValidationError as e:
logger.error(f"Error while parsing tdp_lib_dag file {dag_file}: {e}")
raise

def read_playbooks(self) -> dict[str, Playbook]:
"""Read the playbooks stored in the playbooks_directory."""
return read_playbooks_directory(
self.playbooks_directory,
self.name,
inventory_reader=self._inventory_reader,
)

def read_schemas(self) -> list[ServiceCollectionSchema]:
"""Read the schemas stored in the schema_directory."""
return read_schema_directory(self.schema_directory)


def check_collection_structure(path: Path) -> None:
"""Check the structure of a collection.
Args:
path: Path to the collection.
Raises:
PathDoesNotExistsError: If the path does not exists.
PathIsNotADirectoryError: If the path is not a directory.
MissingMandatoryDirectoryError: If the collection does not contain a mandatory directory.
"""
if not path.exists():
raise PathDoesNotExistsError(f"{path} does not exists.")
if not path.is_dir():
raise PathIsNotADirectoryError(f"{path} is not a directory.")
for mandatory_directory in MANDATORY_DIRECTORIES:
mandatory_path = path / mandatory_directory
if not mandatory_path.exists() or not mandatory_path.is_dir():
raise MissingMandatoryDirectoryError(
f"{path} does not contain the mandatory directory {mandatory_directory}.",
return {
playbook_path.stem: Playbook(
path=playbook_path,
collection_name=self.name,
hosts=read_hosts_from_playbook(playbook_path, self._inventory_reader),
)
for playbook_path in (self.playbooks_directory).glob("*" + YML_EXTENSION)
}

def read_schemas(self) -> list[ServiceCollectionSchema]:
"""Read the schemas stored in the schema_directory.
def read_schema_directory(directory_path: Path) -> list[ServiceCollectionSchema]:
"""Read the schemas from a directory.
This function is meant to be used only once during the initialization of a
collection object.
Invalid schemas are ignored.
Args:
directory_path: Path to the schema directory.
Returns:
Dictionary of schemas.
"""
schemas: list[ServiceCollectionSchema] = []
for schema_path in (directory_path).glob("*" + JSON_EXTENSION):
try:
schemas.append(ServiceCollectionSchema.from_path(schema_path))
except InvalidSchemaError as e:
logger.warning(f"{e}. Ignoring schema.")
return schemas


def read_playbooks_directory(
directory_path: Path,
collection_name: str,
inventory_reader: Optional[InventoryReader] = None,
) -> dict[str, Playbook]:
"""Read the playbooks from a directory.
Invalid schemas are ignored.
"""
schemas: list[ServiceCollectionSchema] = []
for schema_path in (self.schema_directory).glob("*" + JSON_EXTENSION):
try:
schemas.append(ServiceCollectionSchema.from_path(schema_path))
except InvalidSchemaError as e:
logger.warning(f"{e}. Ignoring schema.")
return schemas

This function is meant to be used only once during the initialization of a
collection object.
def _check_collection_structure(self, path: Path) -> None:
"""Check the structure of a collection.
Args:
directory_path: Path to the playbooks directory.
collection_name: Name of the collection.
inventory_reader: Inventory reader.
Args:
path: Path to the collection.
Returns:
Dictionary of playbooks.
"""
return {
playbook_path.stem: Playbook(
playbook_path,
collection_name,
read_hosts_from_playbook(playbook_path, inventory_reader),
)
for playbook_path in (directory_path).glob("*" + YML_EXTENSION)
}
Raises:
PathDoesNotExistsError: If the path does not exists.
PathIsNotADirectoryError: If the path is not a directory.
MissingMandatoryDirectoryError: If the collection does not contain a mandatory directory.
"""
if not path.exists():
raise PathDoesNotExistsError(f"{path} does not exists.")
if not path.is_dir():
raise PathIsNotADirectoryError(f"{path} is not a directory.")
for mandatory_directory in MANDATORY_DIRECTORIES:
mandatory_path = path / mandatory_directory
if not mandatory_path.exists() or not mandatory_path.is_dir():
raise MissingMandatoryDirectoryError(
f"{path} does not contain the mandatory directory {mandatory_directory}.",
)


def read_hosts_from_playbook(
Expand All @@ -232,51 +219,3 @@ def read_hosts_from_playbook(
return inventory_reader.get_hosts_from_playbook(fd)
except Exception as e:
raise ValueError(f"Can't parse playbook {playbook_path}.") from e


def read_dag_directory(
directory_path: Path,
) -> Generator[TDPLibDagNodeModel, None, None]:
"""Read the DAG files from a directory.
Args:
directory_path: Path to the DAG directory.
Returns:
List of DAG nodes.
"""
for dag_file in (directory_path).glob("*" + YML_EXTENSION):
yield from read_dag_file(dag_file)


class TDPLibDagNodeModel(BaseModel):
"""Model for a TDP operation defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

name: str
depends_on: list[str] = []


class TDPLibDagModel(BaseModel):
"""Model for a TDP DAG defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

operations: list[TDPLibDagNodeModel]


def read_dag_file(
dag_file_path: Path,
) -> Generator[TDPLibDagNodeModel, None, None]:
"""Read a tdp_lib_dag file and return a list of DAG operations."""
with dag_file_path.open("r") as operations_file:
file_content = yaml.load(operations_file, Loader=Loader)

try:
tdp_lib_dag = TDPLibDagModel(operations=file_content)
for operation in tdp_lib_dag.operations:
yield operation
except ValidationError as e:
logger.error(f"Error while parsing tdp_lib_dag file {dag_file_path}: {e}")
raise
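
For context on the collection.py changes above: the two Pydantic models that moved to the top of the file are what the now-inlined read_dag_nodes validates each tdp_lib_dag file against. The following is a minimal, self-contained sketch of that validation flow, not code from the commit; it assumes Pydantic v2 and PyYAML, and the YAML content and operation names are invented for illustration.

import yaml
from pydantic import BaseModel, ConfigDict, ValidationError


class TDPLibDagNodeModel(BaseModel):
    """Model for a TDP operation defined in a tdp_lib_dag file."""

    model_config = ConfigDict(extra="ignore")

    name: str
    depends_on: list[str] = []


class TDPLibDagModel(BaseModel):
    """Model for a TDP DAG defined in a tdp_lib_dag file."""

    model_config = ConfigDict(extra="ignore")

    operations: list[TDPLibDagNodeModel]


# A tdp_lib_dag file is a YAML list of operations; this content is a made-up example.
EXAMPLE_DAG_FILE = """
- name: service_install
  depends_on: []
- name: service_config
  depends_on:
    - service_install
"""

try:
    # Validate the parsed YAML against the models, as read_dag_nodes does for each file.
    tdp_lib_dag = TDPLibDagModel(operations=yaml.safe_load(EXAMPLE_DAG_FILE))
    for operation in tdp_lib_dag.operations:
        print(operation.name, operation.depends_on)
except ValidationError as e:
    # The real method logs the error for the offending file and re-raises.
    print(f"Error while parsing tdp_lib_dag content: {e}")

The method in the diff differs only in that it iterates over the YML_EXTENSION files in the collection's dag_directory, loads each with yaml.load(..., Loader=Loader), and logs the ValidationError with the offending file's path before re-raising.
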
3 changes: 1 addition & 2 deletions tdp/core/variables/service_variables.py
@@ -10,8 +10,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Optional

from tdp.core.collection import YML_EXTENSION
from tdp.core.constants import SERVICE_NAME_MAX_LENGTH
from tdp.core.constants import SERVICE_NAME_MAX_LENGTH, YML_EXTENSION
from tdp.core.types import PathLike
from tdp.core.variables.schema.exceptions import SchemaValidationError
from tdp.core.variables.variables import (