Skip to content

Commit

Permalink
utils: add sysdeps_parser utility module
Browse files Browse the repository at this point in the history
This functionality is used by the update manager to parse
the result from system dependency files.  By moving it
out of the update manager it becomes easier to test,
extend, and maintain.  Additionally it can be embedded
in the install script.

Signed-off-by: Eric Callahan <[email protected]>
  • Loading branch information
Arksine committed Feb 21, 2025
1 parent 6205110 commit 4b4ca76
Showing 1 changed file with 171 additions and 0 deletions.
171 changes: 171 additions & 0 deletions moonraker/utils/sysdeps_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# Helpers for parsing system dependencies
#
# Copyright (C) 2025 Eric Callahan <[email protected]>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import shlex
import re
import pathlib
import logging

from typing import Tuple, Dict, List, Any

def _get_distro_info() -> Dict[str, Any]:
try:
# try importing the distro module first. It can detect
# old/obscure releases that do not have the standard
# os-release fle.
import distro
except ModuleNotFoundError:
pass
else:
return dict(
distro_id=distro.id(),
distro_version=distro.version(),
aliases=distro.like().split()
)
# fall back to manual parsing of /etc/os-release
release_file = pathlib.Path("/etc/os-release")
release_info: Dict[str, str] = {}
with release_file.open("r") as f:
lexer = shlex.shlex(f, posix=True)
lexer.whitespace_split = True
for item in list(lexer):
if "=" in item:
key, val = item.split("=", maxsplit=1)
release_info[key] = val
return dict(
distro_id=release_info.get("ID", ""),
distro_version=release_info.get("VERSION_ID", ""),
aliases=release_info.get("ID_LIKE", "").split()
)

def _convert_version(version: str) -> Tuple[str | int, ...]:
version = version.strip()
ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", version)
if ver_match is not None:
return tuple([
int(part) if part.isdigit() else part
for part in re.split(r"\.|-", version)
])
return (version,)

class SysDepsParser:
def __init__(self, distro_info: Dict[str, Any] | None = None) -> None:
if distro_info is None:
distro_info = _get_distro_info()
self.distro_id: str = distro_info.get("distro_id", "")
self.aliases: List[str] = distro_info.get("aliases", [])
self.distro_version: Tuple[int | str, ...] = tuple()
version = distro_info.get("distro_version")
if version:
self.distro_version = _convert_version(version)

def _parse_spec(self, full_spec: str) -> str | None:
parts = full_spec.split(";", maxsplit=1)
if len(parts) == 1:
return full_spec
pkg_name = parts[0].strip()
expressions = re.split(r"( and | or )", parts[1].strip())
if not len(expressions) & 1:
# There should always be an odd number of expressions. Each
# expression is separated by an "and" or "or" operator
logging.info(
f"Requirement specifier is missing an expression "
f"between logical operators : {full_spec}"
)
return None
last_result: bool = True
last_logical_op: str | None = "and"
for idx, exp in enumerate(expressions):
if idx & 1:
if last_logical_op is not None:
logging.info(
"Requirement specifier contains sequential logical "
f"operators: {full_spec}"
)
return None
logical_op = exp.strip()
if logical_op not in ("and", "or"):
logging.info(
f"Invalid logical operator {logical_op} in requirement "
f"specifier: {full_spec}")
return None
last_logical_op = logical_op
continue
elif last_logical_op is None:
logging.info(
f"Requirement specifier contains two seqential expressions "
f"without a logical operator: {full_spec}")
return None
dep_parts = re.split(r"(==|!=|<=|>=|<|>)", exp.strip())
req_var = dep_parts[0].strip().lower()
if len(dep_parts) != 3:
logging.info(f"Invalid comparison, must be 3 parts: {full_spec}")
return None
elif req_var == "distro_id":
left_op: str | Tuple[int | str, ...] = self.distro_id
right_op = dep_parts[2].strip().strip("\"'")
elif req_var == "distro_version":
if not self.distro_version:
logging.info(
"Distro Version not detected, cannot satisfy requirement: "
f"{full_spec}"
)
return None
left_op = self.distro_version
right_op = _convert_version(dep_parts[2].strip().strip("\"'"))
else:
logging.info(f"Invalid requirement specifier: {full_spec}")
return None
operator = dep_parts[1].strip()
try:
compfunc = {
"<": lambda x, y: x < y,
">": lambda x, y: x > y,
"==": lambda x, y: x == y,
"!=": lambda x, y: x != y,
">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y
}.get(operator, lambda x, y: False)
result = compfunc(left_op, right_op)
if last_logical_op == "and":
last_result &= result
else:
last_result |= result
last_logical_op = None
except Exception:
logging.exception(f"Error comparing requirements: {full_spec}")
return None
if last_result:
return pkg_name
return None

def parse_dependencies(self, sys_deps: Dict[str, List[str]]) -> List[str]:
if not self.distro_id:
logging.info(
"Failed to detect current distro ID, cannot parse dependencies"
)
return []
all_ids = [self.distro_id] + self.aliases
for distro_id in all_ids:
if distro_id in sys_deps:
if not sys_deps[distro_id]:
logging.info(
f"Dependency data contains an empty package definition "
f"for linux distro '{distro_id}'"
)
continue
processed_deps: List[str] = []
for dep in sys_deps[distro_id]:
parsed_dep = self._parse_spec(dep)
if parsed_dep is not None:
processed_deps.append(parsed_dep)
return processed_deps
else:
logging.info(
f"Dependency data has no package definition for linux "
f"distro '{self.distro_id}'"
)
return []

0 comments on commit 4b4ca76

Please sign in to comment.