From 8699f3c0b92034142bb6d4b4dc42a56ccb76a4be Mon Sep 17 00:00:00 2001 From: Elien Vandermaesen Date: Wed, 8 Jan 2025 15:46:48 +0100 Subject: [PATCH] issue #693 use openeo.internal.process.parse intead of new functions --- openeo/rest/connection.py | 90 +++++++---------------------------- openeo/rest/datacube.py | 2 +- tests/rest/test_connection.py | 25 ++++++++++ 3 files changed, 43 insertions(+), 74 deletions(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index abedac28b..9ef4238cf 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -38,6 +38,7 @@ from openeo.internal.graph_building import FlatGraphableMixin, PGNode, as_flat_graph from openeo.internal.jupyter import VisualDict, VisualList from openeo.internal.processes.builder import ProcessBuilderBase +from openeo.internal.processes.parse import Process from openeo.internal.warnings import deprecated, legacy_alias from openeo.metadata import ( Band, @@ -1065,22 +1066,10 @@ def describe_process(self, id: str, namespace: Optional[str] = None) -> dict: raise OpenEoClientException("Process does not exist.") - """ - Loads all available processes of the back end. - - :param namespace: The namespace for which to list processes. - - :return: Dictionary of All available processes of the back end. - """ - - def dict_processes(self, namespace: Optional[str] = None) -> dict: - processes = self.list_processes(namespace) - result = {} - for process in processes: - result[process["id"]] = process - return result - - """ + def get_schema_from_process_parameter( + self, process_id: str, parameter_id: str, namespace: Optional[str] = None + ) -> dict: + """ Returns schema of the parameter of the process from the back end. :param process_id: The id of the process. @@ -1088,13 +1077,15 @@ def dict_processes(self, namespace: Optional[str] = None) -> dict: :param namespace: The namespace of the process. :return: schema of the parameter in the process. - """ - - def get_schema(self, process_id: str, parameter_id: str, namespace: Optional[str] = None) -> Union[dict, list]: - process = self._capabilities_cache.get(("processes_dict", "backend"), lambda: self.dict_processes(namespace))[ - process_id - ] - return extract_process_argument(process, ["parameters", parameter_id, "schema"]) + """ + processes = self.list_processes(namespace) + for process in processes: + if process["id"] == process_id: + schema = Process.from_dict(process) + for parameter in schema.parameters: + if parameter.name == parameter_id: + return parameter.schema.schema + raise OpenEoClientException("Process does not exist.") def list_jobs(self, limit: Union[int, None] = None) -> List[dict]: """ @@ -2115,36 +2106,15 @@ def extract_connections( return connections -""" - Extract element of the information using the parameters. - - :param process: The dict or of a process. - :param parameters: list of the parameters to extract. - - :return: arguments of process -""" - - -def extract_process_argument(information: Union[dict, list], parameters: list[str]) -> Union[dict, list]: - for parameter in parameters: - if isinstance(information, dict): - information = information[parameter] - elif isinstance(information, list): - information = search_dict_in_list(information, parameter) - return information - - -""" +def search_list_for_dict_key(lst: list, key: str) -> Union[dict, list]: + """ Searches a value of the dict that matches with the key in the list. :param lst: list with dictionaries :param parameters: list of the :return: value that matches key -""" - - -def search_list_for_dict_key(lst: list, key: str) -> Union[dict, list]: + """ result = None for item in lst: if key in item: @@ -2155,29 +2125,3 @@ def search_list_for_dict_key(lst: list, key: str) -> Union[dict, list]: if result is None: raise OpenEoClientException("No dictionary found with the key {k}.".format(k=key)) return result - - -""" - Searches a dictionary that contains the key-value pair in the list - - :param lst: list with dictionaries - :param value: the value to search for - :param key: the key to search for - - :return: dictionary containing key-value pair -""" - - -def search_dict_in_list(lst: list, value: str, key: str = "name") -> Union[dict, list]: - result = None - for item in lst: - if item[key] == value: - if result is None: - result = item - else: - raise OpenEoClientException( - "Multiple dictionaries found with the key-value pair ({k},{v}).".format(k=key, v=value) - ) - if result is None: - raise OpenEoClientException("No dictionary found with the key-value pair ({k},{v}).".format(k=key, v=value)) - return result diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index cb38fb490..4fd0431d1 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -2724,7 +2724,7 @@ def sar_backscatter( .. versionadded:: 0.4.9 .. versionchanged:: 0.4.10 replace `orthorectify` and `rtc` arguments with `coefficient`. """ - schema = self.connection.get_schema("sar_backscatter", "coefficient") + schema = self.connection.get_schema_from_process_parameter("sar_backscatter", "coefficient") coefficient_options = search_list_for_dict_key(schema, "enum") if coefficient not in coefficient_options: raise OpenEoClientException("Invalid `sar_backscatter` coefficient {c!r}. Should be one of {o}".format( diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 0bf36d2d7..5ceb38edc 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -2874,6 +2874,31 @@ def test_list_processes_namespace(requests_mock): assert m.call_count == 1 +def test_get_schema_from_process_parameter(requests_mock): + requests_mock.get(API_URL, json={"api_version": "1.0.0"}) + processes = [ + { + "id": "incr", + "description": "Increment a value", + "summary": "Increment a value", + "parameters": [{"name": "x", "description": "value", "schema": {"type": "integer"}}], + "returns": {"description": "incremented value", "schema": {"type": "integer"}}, + }, + { + "id": "pi", + "description": "Pi", + "summary": "Pi", + "parameters": [], + "returns": {"description": "value of pi", "schema": {"type": "number"}}, + }, + ] + m = requests_mock.get(API_URL + "processes", json={"processes": processes}) + conn = Connection(API_URL) + assert conn.list_processes() == processes + schema = conn.get_schema_from_process_parameter("incr", "x") + assert schema == {"type": "integer"} + + def test_get_job(requests_mock): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) conn = Connection(API_URL)