Introduce StacResource/SaveResult returned from save_result #725

Merged (6 commits) on Feb 12, 2025
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,8 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add support for `export_workspace` process ([#720](https://github.com/Open-EO/openeo-python-client/issues/720))

### Changed

- `DataCube.save_result()` (and related methods) now return a `SaveResult`/`StacResource` object instead of another `DataCube` object to be more in line with the official `save_result` specification ([#402](https://github.com/Open-EO/openeo-python-client/issues/402), [#720](https://github.com/Open-EO/openeo-python-client/issues/720))

### Removed

### Fixed
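The changelog entry above describes the key behavior change: `save_result()` now returns a `SaveResult`/`StacResource`, on which processes like `export_workspace` can be chained. A sketch of the kind of flat process graph such a chain produces (node ids, collection id, and workspace name are illustrative placeholders, not actual client output):

```python
import json

# Hypothetical flat graph for:
#   cube.save_result(format="GTiff").export_workspace(workspace="my-workspace")
flat_graph = {
    "loadcollection1": {
        "process_id": "load_collection",
        "arguments": {"id": "SENTINEL2_L2A", "spatial_extent": None, "temporal_extent": None},
    },
    "saveresult1": {
        "process_id": "save_result",
        "arguments": {"data": {"from_node": "loadcollection1"}, "format": "GTiff", "options": {}},
    },
    "exportworkspace1": {
        # export_workspace consumes the "STAC resource" returned by save_result
        "process_id": "export_workspace",
        "arguments": {"data": {"from_node": "saveresult1"}, "workspace": "my-workspace"},
        "result": True,
    },
}
print(json.dumps(flat_graph["exportworkspace1"]["arguments"]["data"], sort_keys=True))
# prints {"from_node": "saveresult1"}
```

The point of the new return type is exactly this chaining: `export_workspace` takes the output of `save_result` as its `data` argument instead of the raw cube.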
14 changes: 12 additions & 2 deletions docs/api.rst
@@ -47,8 +47,18 @@ openeo.rest.mlmodel
:inherited-members:


openeo.rest.multiresult
-----------------------


Results
--------

.. automodule:: openeo.rest.result
:members:
:inherited-members:

.. automodule:: openeo.rest.stac_resource
:members:


.. automodule:: openeo.rest.multiresult
:members: MultiResult
2 changes: 2 additions & 0 deletions docs/process_mapping.rst
@@ -144,6 +144,8 @@ method or function in the openEO Python Client Library.
- :py:meth:`ProcessBuilder.__eq__() <openeo.processes.ProcessBuilder.__eq__>`, :py:meth:`ProcessBuilder.eq() <openeo.processes.ProcessBuilder.eq>`, :py:meth:`eq() <openeo.processes.eq>`, :py:meth:`DataCube.__eq__() <openeo.rest.datacube.DataCube.__eq__>`
* - `exp <https://processes.openeo.org/#exp>`_
- :py:meth:`ProcessBuilder.exp() <openeo.processes.ProcessBuilder.exp>`, :py:meth:`exp() <openeo.processes.exp>`
* - `export_workspace <https://processes.openeo.org/#export_workspace>`_
- :py:meth:`StacResource.export_workspace() <openeo.rest.stac_resource.StacResource.export_workspace>`
* - `extrema <https://processes.openeo.org/#extrema>`_
- :py:meth:`ProcessBuilder.extrema() <openeo.processes.ProcessBuilder.extrema>`, :py:meth:`extrema() <openeo.processes.extrema>`
* - `filter_bands <https://processes.openeo.org/#filter_bands>`_
38 changes: 3 additions & 35 deletions openeo/rest/_datacube.py
@@ -10,6 +10,7 @@

import requests

import openeo
from openeo.internal.graph_building import FlatGraphableMixin, PGNode, _FromNodeMixin
from openeo.internal.jupyter import render_component
from openeo.internal.processes.builder import (
@@ -23,6 +24,8 @@
if typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime).
from openeo.rest.connection import Connection
from openeo.rest.result import SaveResult
from openeo.rest.stac_resource import StacResource

log = logging.getLogger(__name__)

@@ -321,38 +324,3 @@ def build_child_callback(
raise ValueError(process)

return PGNode.to_process_graph_argument(pg)


def _ensure_save_result(
cube: _ProcessGraphAbstraction,
*,
format: Optional[str] = None,
options: Optional[dict] = None,
weak_format: Optional[str] = None,
default_format: str,
method: str,
) -> _ProcessGraphAbstraction:
"""
Make sure there is a `save_result` node in the process graph.

:param format: (optional) desired `save_result` file format
:param options: (optional) desired `save_result` file format parameters
:param weak_format: (optional) weak format indicator guessed from file name
:param default_format: default format for data type to use when no format is specified by user
:return:
"""
# TODO #278 instead of standalone helper function, move this to common base class for raster cubes, vector cubes, ...
save_result_nodes = [n for n in cube.result_node().walk_nodes() if n.process_id == "save_result"]

if not save_result_nodes:
# No `save_result` node yet: automatically add it.
# TODO: the `save_result` method is not defined on _ProcessGraphAbstraction, but it is on DataCube and VectorCube
cube = cube.save_result(format=format or weak_format or default_format, options=options)
elif format or options:
raise OpenEoClientException(
f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
f" but the process graph already has `save_result` node(s)"
f" which is ambiguous and should not be combined."
)

return cube
132 changes: 80 additions & 52 deletions openeo/rest/datacube.py
@@ -56,13 +56,13 @@
from openeo.rest._datacube import (
THIS,
UDF,
_ensure_save_result,
_ProcessGraphAbstraction,
build_child_callback,
)
from openeo.rest.graph_building import CollectionProperty
from openeo.rest.job import BatchJob, RESTJob
from openeo.rest.mlmodel import MlModel
from openeo.rest.result import SaveResult
from openeo.rest.service import Service
from openeo.rest.udp import RESTUserDefinedProcess
from openeo.rest.vectorcube import VectorCube
@@ -2330,22 +2330,46 @@ def atmospheric_correction(self, method: str = None, elevation_model: str = None
@openeo_process
def save_result(
self,
# TODO: does it make sense for the client to define a (hard coded) default format here?
format: str = _DEFAULT_RASTER_FORMAT,
options: Optional[dict] = None,
) -> DataCube:
) -> SaveResult:
"""
Materialize the processed data to the given file format.

:param format: an output format supported by the backend.
:param options: file format options

.. versionchanged:: 0.39.0
returns a :py:class:`~openeo.rest.result.SaveResult` instance instead
of another :py:class:`~openeo.rest.datacube.DataCube` instance.
"""
if self._connection:
formats = set(self._connection.list_output_formats().keys())
# TODO: map format to correct casing too?
if format.lower() not in {f.lower() for f in formats}:
raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
return self.process(

pg = self._build_pgnode(
process_id="save_result",
arguments={
"data": THIS,
"data": self,
"format": format,
# TODO: leave out options if unset?
"options": options or {}
}
"options": options or {},
},
)
return SaveResult(pg, connection=self._connection)

def _auto_save_result(
self,
format: Optional[str] = None,
outputfile: Optional[Union[str, pathlib.Path]] = None,
options: Optional[dict] = None,
) -> SaveResult:
return self.save_result(
format=format or (guess_format(outputfile) if outputfile else None) or self._DEFAULT_RASTER_FORMAT,
options=options,
)

def download(
@@ -2365,12 +2389,12 @@ def download(
If outputfile is provided, the result is stored on disk locally, otherwise, a bytes object is returned.
The bytes object can be passed on to a suitable decoder for decoding.

:param outputfile: Optional, an output file if the result needs to be stored on disk.
:param outputfile: Optional, output path to download to.
:param format: Optional, an output format supported by the backend.
:param options: Optional, file format options
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
@@ -2384,18 +2408,12 @@
Added arguments ``additional`` and ``job_options``.
"""
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
res = self._auto_save_result(format=format, outputfile=outputfile, options=options)
else:
res = self
return self._connection.download(
cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options
res.flat_graph(), outputfile=outputfile, validate=validate, additional=additional, job_options=job_options
)

def validate(self) -> List[dict]:
@@ -2510,19 +2528,35 @@ def execute_batch(
**format_options,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
Execute the underlying process graph at the backend in batch job mode:

For very long-running jobs, you probably do not want to keep the client running.
- create the job (like :py:meth:`create_job`)
- start the job (like :py:meth:`BatchJob.start() <openeo.rest.job.BatchJob.start>`)
- track the job's progress with an active polling loop
(like :py:meth:`BatchJob.run_synchronous() <openeo.rest.job.BatchJob.run_synchronous>`)
- optionally (if ``outputfile`` is specified) download the job's results
when the job finished successfully

:param outputfile: The path of a file to which a result can be written
.. note::
Because of the active polling loop,
which blocks any further progress of your script or application,
this :py:meth:`execute_batch` method is mainly recommended
for batch jobs that are expected to complete
in a time that is reasonable for your use case.

:param outputfile: Optional, output path to download to.
:param out_format: (optional) File format to use for the job result.
:param title: job title.
:param description: job description.
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph.
:param show_error_logs: whether to automatically print error logs when the batch job failed.
:param log_level: Optional minimum severity level for log entries that the back-end should keep track of.
One of "error" (highest severity), "warning", "info", and "debug" (lowest severity).
@@ -2546,27 +2580,23 @@
out_format = format_options["format"] # align with 'download' call arg name

# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.execute_batch()",
)
res = self._auto_save_result(format=out_format, outputfile=outputfile, options=format_options)
create_kwargs = {}
else:
res = self
create_kwargs = {"auto_add_save_result": False}

job = cube.create_job(
job = res.create_job(
title=title,
description=description,
plan=plan,
budget=budget,
additional=additional,
job_options=job_options,
validate=validate,
auto_add_save_result=False,
log_level=log_level,
**create_kwargs,
)
return job.run_synchronous(
outputfile=outputfile,
@@ -2593,25 +2623,27 @@ def create_job(
**format_options,
) -> BatchJob:
"""
Sends the datacube's process graph as a batch job to the back-end
and return a :py:class:`~openeo.rest.job.BatchJob` instance.
Send the underlying process graph to the backend
to create an openEO batch job
and return a corresponding :py:class:`~openeo.rest.job.BatchJob` instance.

Note that the batch job will just be created at the back-end,
it still needs to be started and tracked explicitly.
Use :py:meth:`execute_batch` instead to have the openEO Python client take care of that job management.
Note that this method only *creates* the openEO batch job at the backend,
but it does not *start* it.
Use :py:meth:`execute_batch` instead to let the openEO Python client
take care of the full job life cycle: create, start and track its progress until completion.

:param out_format: output file format.
:param title: job title
:param description: job description
:param plan: The billing plan to process and charge the job with
:param title: job title.
:param description: job description.
:param plan: The billing plan to process and charge the job with.
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph.
:param log_level: Optional minimum severity level for log entries that the back-end should keep track of.
One of "error" (highest severity), "warning", "info", and "debug" (lowest severity).

@@ -2629,17 +2661,13 @@
# TODO: add option to also automatically start the job?
# TODO: avoid using all kwargs as format_options
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.create_job()",
)
res = self._auto_save_result(format=out_format, options=format_options)
else:
res = self

return self._connection.create_job(
process_graph=cube.flat_graph(),
process_graph=res.flat_graph(),
title=title,
description=description,
plan=plan,
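The new `_auto_save_result` helper in `datacube.py` resolves the output format with a simple fallback chain: an explicit `format` argument wins, otherwise a format guessed from the output filename, otherwise the cube's default raster format. A standalone sketch of that chain (the extension map here is illustrative; the real client delegates guessing to `openeo.util.guess_format`):

```python
import pathlib
from typing import Optional, Union

# Illustrative extension map; the real guess_format covers more formats/aliases.
_EXTENSION_FORMATS = {"tif": "GTiff", "tiff": "GTiff", "nc": "netCDF", "json": "JSON"}


def pick_format(
    format: Optional[str] = None,
    outputfile: Optional[Union[str, pathlib.Path]] = None,
    default: str = "GTiff",
) -> str:
    """Mimic the fallback chain: explicit format > guess from filename > default."""

    def guess(path) -> Optional[str]:
        # Look up the (lowercased) file extension, e.g. "result.nc" -> "netCDF".
        ext = pathlib.Path(path).suffix.lstrip(".").lower()
        return _EXTENSION_FORMATS.get(ext)

    return format or (guess(outputfile) if outputfile else None) or default


assert pick_format() == "GTiff"
assert pick_format(outputfile="result.nc") == "netCDF"
assert pick_format(format="PNG", outputfile="result.nc") == "PNG"
```

Note how this reproduces the behavior change from the removed `_ensure_save_result`: instead of raising when a format conflicts with an existing `save_result` node, the caller now simply decides between `_auto_save_result` and using the cube as-is via the `auto_add_save_result` flag.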
37 changes: 37 additions & 0 deletions openeo/rest/result.py
@@ -0,0 +1,37 @@
from openeo.rest.stac_resource import StacResource


class SaveResult(StacResource):
"""
Handle for a process graph that represents the return value
of the openEO process ``save_result``,
as returned by methods like
:py:meth:`DataCube.save_result() <openeo.rest.datacube.DataCube.save_result>`
and :py:meth:`VectorCube.save_result() <openeo.rest.vectorcube.VectorCube.save_result>`.

.. note::
This class is practically just a direct alias for
:py:class:`~openeo.rest.stac_resource.StacResource`,
but with a more self-explanatory name.

Moreover, this additional abstraction layer also acts somewhat as an adapter between
the incompatible return values from the ``save_result`` process
in different versions of the official openeo-processes definitions:

- in openeo-processes 1.x: ``save_result`` just returned a boolean,
but that was not really useful to further build upon
and was never properly exposed in the openEO Python client.
- in openeo-processes 2.x: ``save_result`` returns a new concept:
a "STAC resource" (object with subtype "stac")
which is a more useful and flexible representation of an openEO result,
allowing additional operations.

The openEO Python client returns the same :py:class:`SaveResult` object
in both cases however.
It does that not only for simplicity,
but also because it seems more useful (even in legacy openeo-processes 1.x use cases)
to follow the new STAC resource based usage patterns
than to strictly return some boolean wrapper nobody has use for.

.. versionadded:: 0.39.0
"""