From acffbf181bf6ddef094ced86748a39efc9ed92b1 Mon Sep 17 00:00:00 2001 From: Christoph Paulik Date: Wed, 19 Feb 2025 12:11:20 +0100 Subject: [PATCH] Add schema-map to allow validating against local copies of schemas --- README.md | 56 +++++++++---------- stac_validator/stac_validator.py | 15 +++++- stac_validator/utilities.py | 93 ++++++++++++++++++-------------- stac_validator/validate.py | 22 +++++--- 4 files changed, 112 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index 761d5ed..ed8a582 100644 --- a/README.md +++ b/README.md @@ -91,33 +91,35 @@ stac-validator --help Usage: stac-validator [OPTIONS] STAC_FILE Options: - --core Validate core stac object only without extensions. - --extensions Validate extensions only. - --links Additionally validate links. Only works with - default mode. - --assets Additionally validate assets. Only works with - default mode. - -c, --custom TEXT Validate against a custom schema (local filepath or - remote schema). - -r, --recursive Recursively validate all related stac objects. - -m, --max-depth INTEGER Maximum depth to traverse when recursing. Omit this - argument to get full recursion. Ignored if - `recursive == False`. - --collections Validate /collections response. - --item-collection Validate item collection response. Can be combined - with --pages. Defaults to one page. - --no-assets-urls Disables the opening of href links when validating - assets (enabled by default). - --header KEY VALUE HTTP header to include in the requests. Can be used - multiple times. - -p, --pages INTEGER Maximum number of pages to validate via --item- - collection. Defaults to one page. - -v, --verbose Enables verbose output for recursive mode. - --no_output Do not print output to console. - --log_file TEXT Save full recursive output to log file (local - filepath). - --version Show the version and exit. - --help Show this message and exit. + --core Validate core stac object only without + extensions. 
+ --extensions Validate extensions only. + --links Additionally validate links. Only works with + default mode. + --assets Additionally validate assets. Only works with + default mode. + -c, --custom TEXT Validate against a custom schema (local + filepath or remote schema). + --schema-map ... Schema path to be replaced by (local) schema path + during validation. Can be used multiple times. + -r, --recursive Recursively validate all related stac objects. + -m, --max-depth INTEGER Maximum depth to traverse when recursing. Omit + this argument to get full recursion. Ignored if + `recursive == False`. + --collections Validate /collections response. + --item-collection Validate item collection response. Can be + combined with --pages. Defaults to one page. + --no-assets-urls Disables the opening of href links when + validating assets (enabled by default). + --header ... HTTP header to include in the requests. Can be + used multiple times. + -p, --pages INTEGER Maximum number of pages to validate via --item- + collection. Defaults to one page. + -v, --verbose Enables verbose output for recursive mode. + --no_output Do not print output to console. + --log_file TEXT Save full recursive output to log file (local + filepath). + --help Show this message and exit. ``` --- diff --git a/stac_validator/stac_validator.py b/stac_validator/stac_validator.py index 48c69be..9ac0101 100644 --- a/stac_validator/stac_validator.py +++ b/stac_validator/stac_validator.py @@ -1,6 +1,6 @@ import json import sys -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple import click # type: ignore @@ -87,6 +87,12 @@ def collections_summary(message: List[Dict[str, Any]]) -> None: default="", help="Validate against a custom schema (local filepath or remote schema).", ) +@click.option( + "--schema-map", + type=(str, str), + multiple=True, + help="Schema path to be replaced by (local) schema path during validation. 
Can be used multiple times.", +) @click.option( "--recursive", "-r", @@ -149,6 +155,7 @@ def main( links: bool, assets: bool, custom: str, + schema_map: List[Tuple], verbose: bool, no_output: bool, log_file: str, @@ -170,6 +177,7 @@ def main( links (bool): Whether to additionally validate links. Only works with default mode. assets (bool): Whether to additionally validate assets. Only works with default mode. custom (str): Path to a custom schema file to validate against. + schema_map (list(tuple)): List of tuples each having two elements. First element is the schema path to be replaced by the path in the second element. verbose (bool): Whether to enable verbose output for recursive mode. no_output (bool): Whether to print output to console. log_file (str): Path to a log file to save full recursive output. @@ -182,6 +190,10 @@ def main( or 1 if it is invalid. """ valid = True + if schema_map == (): + schema_map_dict: Optional[Dict[str, str]] = None + else: + schema_map_dict = dict(schema_map) stac = StacValidate( stac_file=stac_file, collections=collections, @@ -196,6 +208,7 @@ def main( headers=dict(header), extensions=extensions, custom=custom, + schema_map=schema_map_dict, verbose=verbose, log=log_file, ) diff --git a/stac_validator/utilities.py b/stac_validator/utilities.py index a6b24e6..cf7b045 100644 --- a/stac_validator/utilities.py +++ b/stac_validator/utilities.py @@ -5,12 +5,10 @@ from urllib.parse import urlparse from urllib.request import Request, urlopen -import jsonschema import requests # type: ignore from jsonschema import Draft202012Validator from referencing import Registry, Resource from referencing.jsonschema import DRAFT202012 -from referencing.retrieval import to_cached_resource from referencing.typing import URI NEW_VERSIONS = [ @@ -192,12 +190,13 @@ def link_request( initial_message["format_invalid"].append(link["href"]) -def fetch_remote_schema(uri: str) -> dict: +def fetch_remote_schema(uri: str, timeout: int = 10) -> Dict: """ Fetch a 
remote schema from a URI. Args: uri (str): The URI of the schema to fetch. + timeout (int): Timeout in seconds for the HTTP request. Defaults to 10. Returns: dict: The fetched schema content as a dictionary. @@ -205,47 +204,72 @@ def fetch_remote_schema(uri: str) -> dict: Raises: requests.RequestException: If the request to fetch the schema fails. """ - response = requests.get(uri) - response.raise_for_status() - return response.json() + try: + response = requests.get(uri, timeout=timeout) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + raise requests.RequestException( + f"Failed to fetch schema from {uri}: {str(e)}" + ) from e + except Exception as e: + raise Exception( + f"Unexpected error while retrieving schema from {uri}: {str(e)}" + ) from e -@to_cached_resource() # type: ignore -def cached_retrieve(uri: URI) -> str: +def cached_retrieve(uri: URI, schema_map: Optional[Dict] = None) -> Resource[Dict]: """ Retrieve and cache a remote schema. Args: uri (str): The URI of the schema. + schema_map (dict): Override schema location to validate against local versions of a schema Returns: - str: The raw JSON string of the schema. + Resource: The parsed JSON schema wrapped in a referencing Resource. Raises: requests.RequestException: If the request to fetch the schema fails. Exception: For any other unexpected errors. 
""" - try: - response = requests.get(uri, timeout=10) # Set a timeout for robustness - response.raise_for_status() # Raise an error for HTTP response codes >= 400 - return response.text - except requests.exceptions.RequestException as e: - raise requests.RequestException( - f"Failed to fetch schema from {uri}: {str(e)}" - ) from e - except Exception as e: - raise Exception( - f"Unexpected error while retrieving schema from {uri}: {str(e)}" - ) from e + return Resource.from_contents( + fetch_schema_with_override(uri, schema_map=schema_map) + ) -def validate_with_ref_resolver(schema_path: str, content: dict) -> None: +def fetch_schema_with_override( + schema_path: str, schema_map: Optional[Dict] = None +) -> Dict: + """ + Fetch a schema, first replacing its location with the schema_map override if one exists. + + Args: + schema_path (str): Path or URI of the schema. + schema_map (dict): Override schema location to validate against local versions of a schema + + Returns: + dict: The parsed JSON dict of the schema. + """ + + if schema_map: + if schema_path in schema_map: + schema_path = schema_map[schema_path] + + # Load the schema + return fetch_and_parse_schema(schema_path) + + +def validate_with_ref_resolver( + schema_path: str, content: Dict, schema_map: Optional[Dict] = None +) -> None: """ Validate a JSON document against a JSON Schema with dynamic reference resolution. Args: schema_path (str): Path or URI of the JSON Schema. content (dict): JSON content to validate. + schema_map (dict): Override schema location to validate against local versions of a schema Raises: jsonschema.exceptions.ValidationError: If validation fails. @@ -253,27 +277,16 @@ def validate_with_ref_resolver(schema_path: str, content: dict) -> None: FileNotFoundError: If a local schema file is not found. Exception: If any other error occurs during validation. 
""" - # Load the schema - if schema_path.startswith("http"): - schema = fetch_remote_schema(schema_path) - else: - try: - with open(schema_path, "r") as f: - schema = json.load(f) - except FileNotFoundError as e: - raise FileNotFoundError(f"Schema file not found: {schema_path}") from e - + schema = fetch_schema_with_override(schema_path, schema_map=schema_map) # Set up the resource and registry for schema resolution + cached_retrieve_with_schema_map = functools.partial( + cached_retrieve, schema_map=schema_map + ) resource: Resource = Resource(contents=schema, specification=DRAFT202012) # type: ignore - registry: Registry = Registry(retrieve=cached_retrieve).with_resource( # type: ignore + registry: Registry = Registry(retrieve=cached_retrieve_with_schema_map).with_resource( # type: ignore uri=schema_path, resource=resource ) # type: ignore # Validate the content against the schema - try: - validator = Draft202012Validator(schema, registry=registry) - validator.validate(content) - except jsonschema.exceptions.ValidationError as e: - raise jsonschema.exceptions.ValidationError(f"{e.message}") from e - except Exception as e: - raise Exception(f"Unexpected error during validation: {str(e)}") from e + validator = Draft202012Validator(schema, registry=registry) + validator.validate(content) diff --git a/stac_validator/validate.py b/stac_validator/validate.py index 21d5fd5..9964c32 100644 --- a/stac_validator/validate.py +++ b/stac_validator/validate.py @@ -1,7 +1,7 @@ import json import os from json.decoder import JSONDecodeError -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Iterable from urllib.error import HTTPError, URLError import click # type: ignore @@ -61,6 +61,7 @@ def __init__( headers: dict = {}, extensions: bool = False, custom: str = "", + schema_map: Optional[Dict[str, str]] = None, verbose: bool = False, log: str = "", ): @@ -70,6 +71,7 @@ def __init__( self.pages = pages self.message: List = [] self.schema = custom + 
self.schema_map = schema_map self.links = links self.assets = assets self.assets_open_urls = assets_open_urls @@ -198,14 +200,20 @@ def custom_validator(self) -> None: None """ if is_valid_url(self.schema): - validate_with_ref_resolver(self.schema, self.stac_content) + validate_with_ref_resolver( + self.schema, self.stac_content, schema_map=self.schema_map + ) elif os.path.exists(self.schema): - validate_with_ref_resolver(self.schema, self.stac_content) + validate_with_ref_resolver( + self.schema, self.stac_content, schema_map=self.schema_map + ) else: file_directory = os.path.dirname(os.path.abspath(str(self.stac_file))) self.schema = os.path.join(file_directory, self.schema) self.schema = os.path.abspath(os.path.realpath(self.schema)) - validate_with_ref_resolver(self.schema, self.stac_content) + validate_with_ref_resolver( + self.schema, self.stac_content, schema_map=self.schema_map + ) def core_validator(self, stac_type: str) -> None: """ @@ -216,7 +224,9 @@ def core_validator(self, stac_type: str) -> None: """ stac_type = stac_type.lower() self.schema = set_schema_addr(self.version, stac_type) - validate_with_ref_resolver(self.schema, self.stac_content) + validate_with_ref_resolver( + self.schema, self.stac_content, schema_map=self.schema_map + ) def extensions_validator(self, stac_type: str) -> Dict: """ @@ -254,7 +264,7 @@ def extensions_validator(self, stac_type: str) -> Dict: message["schema"].append(extension) except jsonschema.exceptions.ValidationError as e: - e = best_match(e.context) + e = best_match(e.context) # type: ignore valid = False if e.absolute_path: err_msg = (