Issue #720/#402 Introduce StacResource/SaveResult returned from `save_result`
soxofaan committed Feb 7, 2025
1 parent 29a3276 commit 291c89a
Showing 8 changed files with 249 additions and 196 deletions.
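In short: `DataCube.save_result()` now returns a `SaveResult` (a subclass of the new `StacResource`) instead of a `DataCube`, so STAC-level processes such as `export_workspace` can be chained after it. A minimal usage sketch (the backend URL, collection id, and workspace name are illustrative assumptions, not part of this commit):

    import openeo

    # Assumed placeholders: backend URL and collection id.
    connection = openeo.connect("https://openeo.example").authenticate_oidc()
    cube = connection.load_collection("SENTINEL2_L2A")

    # save_result now returns a SaveResult (a StacResource) instead of a DataCube,
    # so STAC-level processes like export_workspace can be chained onto it.
    result = cube.save_result(format="GTiff")
    exported = result.export_workspace(workspace="my-workspace")

    # SaveResult/StacResource still offer download/create_job/execute_batch.
    exported.download("result.tif")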
27 changes: 17 additions & 10 deletions openeo/rest/_datacube.py
@@ -10,6 +10,7 @@
 
 import requests
 
+import openeo
 from openeo.internal.graph_building import FlatGraphableMixin, PGNode, _FromNodeMixin
 from openeo.internal.jupyter import render_component
 from openeo.internal.processes.builder import (
@@ -23,6 +24,8 @@
 if typing.TYPE_CHECKING:
     # Imports for type checking only (circular import issue at runtime).
     from openeo.rest.connection import Connection
+    from openeo.rest.result import SaveResult
+    from openeo.rest.stac_resource import StacResource
 
 log = logging.getLogger(__name__)
 
@@ -331,7 +334,7 @@ def _ensure_save_result(
     weak_format: Optional[str] = None,
     default_format: str,
     method: str,
-) -> _ProcessGraphAbstraction:
+) -> Union[SaveResult, StacResource]:
     """
     Make sure there is a `save_result` node in the process graph.
@@ -346,13 +349,17 @@
 
     if not save_result_nodes:
         # No `save_result` node yet: automatically add it.
-        # TODO: the `save_result` method is not defined on _ProcessGraphAbstraction, but it is on DataCube and VectorCube
-        cube = cube.save_result(format=format or weak_format or default_format, options=options)
-    elif format or options:
-        raise OpenEoClientException(
-            f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
-            f" but the process graph already has `save_result` node(s)"
-            f" which is ambiguous and should not be combined."
-        )
+        if isinstance(cube, (openeo.DataCube, openeo.VectorCube)):
+            pg_with_save_result = cube.save_result(format=format or weak_format or default_format, options=options)
+        else:
+            raise OpenEoClientException(f"No support to add `save_result` on {cube!r}.")
+    else:
+        if format or options:
+            raise OpenEoClientException(
+                f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
+                f" but the process graph already has `save_result` node(s)"
+                f" which is ambiguous and should not be combined."
+            )
+        pg_with_save_result = cube
 
-    return cube
+    return pg_with_save_result
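To illustrate the two branches of the updated `_ensure_save_result` (a hypothetical sketch; `connection` and the collection id are assumed):

    cube = connection.load_collection("SENTINEL2_L2A")

    # Branch 1: no save_result node yet, and cube is a DataCube/VectorCube,
    # so download() appends save_result automatically, guessing the format
    # from the file extension (the "weak" format).
    cube.download("out.nc")

    # Branch 2: the graph already contains a save_result node; combining it
    # with an explicit format/options is ambiguous and now raises.
    flat = cube.save_result(format="GTiff").flat_graph()
    cube2 = connection.datacube_from_flat_graph(flat)
    cube2.download("out.nc", format="netCDF")  # raises OpenEoClientException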
46 changes: 28 additions & 18 deletions openeo/rest/datacube.py
@@ -57,7 +57,9 @@
 from openeo.rest.graph_building import CollectionProperty
 from openeo.rest.job import BatchJob, RESTJob
 from openeo.rest.mlmodel import MlModel
+from openeo.rest.result import SaveResult
 from openeo.rest.service import Service
+from openeo.rest.stac_resource import StacResource
 from openeo.rest.udp import RESTUserDefinedProcess
 from openeo.rest.vectorcube import VectorCube
 from openeo.util import dict_no_none, guess_format, load_json, normalize_crs, rfc3339
@@ -2326,21 +2328,23 @@ def save_result(
         self,
         format: str = _DEFAULT_RASTER_FORMAT,
         options: Optional[dict] = None,
-    ) -> DataCube:
+    ) -> SaveResult:
         if self._connection:
             formats = set(self._connection.list_output_formats().keys())
             # TODO: map format to correct casing too?
             if format.lower() not in {f.lower() for f in formats}:
                 raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
-        return self.process(
+
+        pg = self._build_pgnode(
             process_id="save_result",
             arguments={
-                "data": THIS,
+                "data": self,
                 "format": format,
                 # TODO: leave out options if unset?
-                "options": options or {}
-            }
+                "options": options or {},
+            },
         )
+        return SaveResult(pg, connection=self._connection)
 
     def download(
         self,
@@ -2378,18 +2382,19 @@ def download(
         Added arguments ``additional`` and ``job_options``.
         """
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=format,
                 options=options,
                 weak_format=guess_format(outputfile) if outputfile else None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.download()",
             )
+        else:
+            res = self
         return self._connection.download(
-            cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options
+            res.flat_graph(), outputfile=outputfile, validate=validate, additional=additional, job_options=job_options
         )
 
     def validate(self) -> List[dict]:
@@ -2538,27 +2543,30 @@ def execute_batch(
             out_format = format_options["format"]  # align with 'download' call arg name
 
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=out_format,
                 options=format_options,
                 weak_format=guess_format(outputfile) if outputfile else None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.execute_batch()",
             )
+            create_kwargs = {}
+        else:
+            res = self
+            create_kwargs = {"auto_add_save_result": False}
 
-        job = cube.create_job(
+        job = res.create_job(
             title=title,
             description=description,
             plan=plan,
             budget=budget,
             additional=additional,
             job_options=job_options,
             validate=validate,
-            auto_add_save_result=False,
             log_level=log_level,
+            **create_kwargs,
         )
         return job.run_synchronous(
             outputfile=outputfile,
@@ -2621,17 +2629,19 @@ def create_job(
         # TODO: add option to also automatically start the job?
         # TODO: avoid using all kwargs as format_options
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=out_format,
                 options=format_options or None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.create_job()",
             )
+        else:
+            res = self
 
         return self._connection.create_job(
-            process_graph=cube.flat_graph(),
+            process_graph=res.flat_graph(),
             title=title,
             description=description,
             plan=plan,
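The `create_kwargs` indirection in `execute_batch()` exists because the two branches yield different types: with automatic `save_result`, `res` is a `SaveResult` whose `create_job()` has no `auto_add_save_result` parameter, while otherwise `res` is still this `DataCube`, which must be told not to append another `save_result`. A hypothetical sketch (collection id assumed):

    cube = connection.load_collection("SENTINEL2_L2A")

    # res becomes a SaveResult; nothing extra is forwarded to its create_job().
    job = cube.execute_batch(outputfile="out.tif", out_format="GTiff")

    # res stays a DataCube; auto_add_save_result=False is forwarded so that
    # DataCube.create_job() does not try to append save_result again.
    job = cube.execute_batch(auto_add_save_result=False)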
7 changes: 7 additions & 0 deletions openeo/rest/result.py
@@ -0,0 +1,7 @@
+from openeo.rest.stac_resource import StacResource
+
+
+class SaveResult(StacResource):
+    """TODO"""
+
+    pass
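`SaveResult` is an empty subclass for now; it mainly gives the direct result of `save_result` its own type, distinguishable from other STAC resources via `isinstance` (illustrative check; `cube` assumed):

    from openeo.rest.result import SaveResult
    from openeo.rest.stac_resource import StacResource

    res = cube.save_result(format="GTiff")
    assert isinstance(res, SaveResult)
    assert isinstance(res, StacResource)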
127 changes: 127 additions & 0 deletions openeo/rest/stac_resource.py
@@ -0,0 +1,127 @@
+from __future__ import annotations
+
+import typing
+from pathlib import Path
+from typing import Optional, Union
+
+from openeo.internal.documentation import openeo_process
+from openeo.internal.graph_building import PGNode
+from openeo.rest._datacube import _ProcessGraphAbstraction
+from openeo.rest.job import BatchJob
+
+if typing.TYPE_CHECKING:
+    from openeo.rest.connection import Connection
+
+
+class StacResource(_ProcessGraphAbstraction):
+    """
+    Handle for a process graph node that represents a STAC resource (object with subtype "stac"),
+    e.g. as returned by `save_result`, or handled by `export_workspace`/`stac_modify`.
+
+    Refers to a STAC resource of any type (Catalog, Collection, or Item).
+    It can refer to:
+
+    - static STAC resources, e.g. hosted on cloud storage
+    - dynamic STAC resources made available via a STAC API
+    - a STAC JSON representation embedded as an argument into an openEO user-defined process
+    """
+
+    def __init__(self, graph: PGNode, connection: Optional[Connection] = None):
+        super().__init__(pgnode=graph, connection=connection)
+
+    @openeo_process
+    def export_workspace(self, workspace: str, merge: Union[str, None] = None) -> StacResource:
+        """
+        Export data to a cloud user workspace.
+
+        Exports the given processing results made available through a STAC resource
+        (e.g., a STAC Collection) to the given user workspace.
+        The STAC resource itself is exported with all STAC resources and assets underneath.
+
+        :return: the potentially updated STAC resource.
+        """
+        return StacResource(
+            graph=self._build_pgnode(
+                process_id="export_workspace", arguments={"data": self, "workspace": workspace, "merge": merge}
+            ),
+            connection=self._connection,
+        )
+
+    def download(
+        self,
+        outputfile: Optional[Union[str, Path]] = None,
+        *,
+        validate: Optional[bool] = None,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+    ):
+        """TODO"""
+        return self._connection.download(
+            graph=self.flat_graph(),
+            outputfile=outputfile,
+            validate=validate,
+            additional=additional,
+            job_options=job_options,
+        )
+
+    def create_job(
+        self,
+        *,
+        title: Optional[str] = None,
+        description: Optional[str] = None,
+        plan: Optional[str] = None,
+        budget: Optional[float] = None,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+        validate: Optional[bool] = None,
+        log_level: Optional[str] = None,
+    ) -> BatchJob:
+        """TODO"""
+        return self._connection.create_job(
+            process_graph=self.flat_graph(),
+            title=title,
+            description=description,
+            plan=plan,
+            budget=budget,
+            validate=validate,
+            additional=additional,
+            job_options=job_options,
+            log_level=log_level,
+        )
+
+    def execute_batch(
+        self,
+        outputfile: Optional[Union[str, Path]] = None,
+        *,
+        title: Optional[str] = None,
+        description: Optional[str] = None,
+        plan: Optional[str] = None,
+        budget: Optional[float] = None,
+        print: typing.Callable[[str], None] = print,
+        max_poll_interval: float = 60,
+        connection_retry_interval: float = 30,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+        validate: Optional[bool] = None,
+        auto_add_save_result: bool = True,
+        show_error_logs: bool = True,
+        log_level: Optional[str] = None,
+    ):
+        """TODO"""
+        job = self.create_job(
+            title=title,
+            description=description,
+            plan=plan,
+            budget=budget,
+            additional=additional,
+            job_options=job_options,
+            validate=validate,
+            log_level=log_level,
+        )
+        return job.run_synchronous(
+            outputfile=outputfile,
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
+        )