
Commit

issue #693 solve merge conflicts
ElienVandermaesenVITO committed Jan 8, 2025
2 parents a6c6533 + c161f08 commit db6bbb8
Showing 10 changed files with 109 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Argument `spatial_extent` in `load_collection` supports Shapely objects and loading GeoJSON from a local path.
+- Added `show_error_logs` argument to `cube.execute_batch()`/`job.start_and_wait()`/... to toggle the automatic printing of error logs on failure ([#505](https://github.com/Open-EO/openeo-python-client/issues/505))
 
 ### Changed
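The `spatial_extent` entry above means the extent no longer has to be a bbox dict. A minimal sketch of the two new forms (backend URL and collection id are placeholders):

```python
import openeo
from shapely.geometry import box

con = openeo.connect("openeo.example.com")  # placeholder backend URL

# A Shapely geometry can now be passed directly as spatial extent:
cube = con.load_collection(
    "SENTINEL2",  # placeholder collection id
    spatial_extent=box(5.0, 51.0, 5.1, 51.1),
)

# ... or point to a GeoJSON file on the local filesystem:
cube = con.load_collection("SENTINEL2", spatial_extent="path/to/area.geojson")
```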
4 changes: 2 additions & 2 deletions docs/batch_jobs.rst
@@ -292,8 +292,8 @@ When using
 :py:meth:`job.start_and_wait() <openeo.rest.job.BatchJob.start_and_wait>`
 or :py:meth:`cube.execute_batch() <openeo.rest.datacube.DataCube.execute_batch>`
 to run a batch job and it fails,
-the openEO Python client library will automatically
-print the batch job logs and instructions to help with further investigation:
+the openEO Python client library will print (by default)
+the batch job's error logs and instructions to help with further investigation:
 
 .. code-block:: pycon
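This doc change reflects the new toggle: the printing still happens by default, but can now be switched off. A minimal sketch (`job` stands for any `BatchJob` instance):

```python
# Default behavior: on failure, error logs and investigation hints are printed.
job.start_and_wait()

# Opt out of the automatic printing, e.g. in non-interactive contexts:
job.start_and_wait(show_error_logs=False)
```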
5 changes: 4 additions & 1 deletion openeo/rest/connection.py
@@ -83,6 +83,7 @@
     ContextTimer,
     LazyLoadCache,
     dict_no_none,
+    ensure_dir,
     ensure_list,
     load_json_resource,
     repr_truncate,
@@ -1757,7 +1758,9 @@ def download(
         )
 
         if outputfile is not None:
-            with Path(outputfile).open(mode="wb") as f:
+            target = Path(outputfile)
+            ensure_dir(target.parent)
+            with target.open(mode="wb") as f:
                 for chunk in response.iter_content(chunk_size=chunk_size):
                     f.write(chunk)
         else:
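The practical effect of the `ensure_dir(target.parent)` call: downloading to a path whose parent directory does not exist yet no longer fails. A self-contained sketch of the pattern, assuming `ensure_dir` is a mkdir-with-parents style helper (the real one is imported alongside the other utilities in the hunk above):

```python
from pathlib import Path


def ensure_dir(path: Path) -> Path:
    # Assumed behavior of the helper: create the directory (and parents) if missing.
    path.mkdir(parents=True, exist_ok=True)
    return path


target = Path("results/2025/output.tif")  # hypothetical nested output path
ensure_dir(target.parent)  # creates "results/2025/" if needed
with target.open(mode="wb") as f:
    f.write(b"...")  # stand-in for the chunked response writing
```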
10 changes: 9 additions & 1 deletion openeo/rest/datacube.py
@@ -2433,6 +2433,7 @@ def execute_batch(
         job_options: Optional[dict] = None,
         validate: Optional[bool] = None,
         auto_add_save_result: bool = True,
+        show_error_logs: bool = True,
         # TODO: deprecate `format_options` as keyword arguments
         **format_options,
     ) -> BatchJob:
@@ -2450,12 +2451,16 @@
         :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
             (overruling the connection's ``auto_validate`` setting).
         :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
 
         .. versionchanged:: 0.32.0
             Added ``auto_add_save_result`` option
 
         .. versionadded:: 0.36.0
             Added argument ``additional``.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
         """
         # TODO: start showing deprecation warnings about these inconsistent argument names
         if "format" in format_options and not out_format:
@@ -2485,7 +2490,10 @@
         )
         return job.run_synchronous(
             outputfile=outputfile,
-            print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
         )
 
     def create_job(
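Typical use of the new argument from the `DataCube` side, handling errors manually instead of relying on the automatic printing (a sketch; the connection URL and collection id are placeholders, and the `JobFailedException` import location is assumed from this repository's layout):

```python
import logging

import openeo
from openeo.rest.job import JobFailedException  # assumed import location

con = openeo.connect("openeo.example.com").authenticate_oidc()  # placeholder URL
cube = con.load_collection("SENTINEL2")  # placeholder collection id

try:
    cube.execute_batch(outputfile="result.tif", show_error_logs=False)
except JobFailedException as e:
    # The exception carries the failed job (it is raised with `job=self`
    # in openeo/rest/job.py below), so the error logs can be routed to a
    # logger instead of being printed to stdout:
    for entry in e.job.logs(level=logging.ERROR):
        logging.getLogger("openeo.batch").error(entry)
```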
50 changes: 38 additions & 12 deletions openeo/rest/job.py
@@ -235,20 +235,43 @@ def logs(
         return VisualList("logs", data=entries)
 
     def run_synchronous(
-        self, outputfile: Union[str, Path, None] = None,
-        print=print, max_poll_interval=60, connection_retry_interval=30
+        self,
+        outputfile: Union[str, Path, None] = None,
+        print=print,
+        max_poll_interval=60,
+        connection_retry_interval=30,
+        show_error_logs: bool = True,
     ) -> BatchJob:
-        """Start the job, wait for it to finish and download result"""
+        """
+        Start the job, wait for it to finish and download result
+
+        :param outputfile: The path of a file to which a result can be written
+        :param print: print/logging function to show progress/status
+        :param max_poll_interval: maximum number of seconds to sleep between status polls
+        :param connection_retry_interval: how long to wait when status poll failed due to connection issue
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
+        """
         self.start_and_wait(
-            print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
         )
         # TODO #135 support multi file result sets too?
         if outputfile is not None:
             self.download_result(outputfile)
         return self
 
     def start_and_wait(
-        self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10
+        self,
+        print=print,
+        max_poll_interval: int = 60,
+        connection_retry_interval: int = 30,
+        soft_error_max=10,
+        show_error_logs: bool = True,
     ) -> BatchJob:
         """
         Start the batch job, poll its status and wait till it finishes (or fails)
@@ -257,7 +280,10 @@ def start_and_wait(
         :param max_poll_interval: maximum number of seconds to sleep between status polls
         :param connection_retry_interval: how long to wait when status poll failed due to connection issue
         :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow
-        :return:
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
         """
         # TODO rename `connection_retry_interval` to something more generic?
         start_time = time.time()
@@ -314,13 +340,13 @@ def soft_error(message: str):
             poll_interval = min(1.25 * poll_interval, max_poll_interval)
 
         if status != "finished":
-            # TODO: allow to disable this printing logs (e.g. in non-interactive contexts)?
             # TODO: render logs jupyter-aware in a notebook context?
-            print(f"Your batch job {self.job_id!r} failed. Error logs:")
-            print(self.logs(level=logging.ERROR))
-            print(
-                f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
-            )
+            if show_error_logs:
+                print(f"Your batch job {self.job_id!r} failed. Error logs:")
+                print(self.logs(level=logging.ERROR))
+                print(
+                    f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
+                )
             raise JobFailedException(
                 f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).",
                 job=self,
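For context, the `show_error_logs` check sits at the end of a polling loop that backs off exponentially (the `poll_interval = min(1.25 * poll_interval, max_poll_interval)` line above) and tolerates a limited number of transient failures (`soft_error_max`). A standalone sketch of that waiting strategy, not the library's actual code:

```python
import time


def wait_for_terminal_status(get_status, max_poll_interval: float = 60, soft_error_max: int = 10) -> str:
    """Poll `get_status()` with capped exponential backoff until a terminal state."""
    poll_interval = 1.0
    soft_errors = 0
    while True:
        try:
            status = get_status()
        except ConnectionError:
            soft_errors += 1  # "soft" error: tolerate a few temporary glitches
            if soft_errors > soft_error_max:
                raise
            status = None
        if status in {"finished", "error", "canceled"}:
            return status
        time.sleep(poll_interval)
        poll_interval = min(1.25 * poll_interval, max_poll_interval)  # same backoff rule as above
```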
12 changes: 10 additions & 2 deletions openeo/rest/mlmodel.py
@@ -71,12 +71,13 @@ def execute_batch(
         connection_retry_interval=30,
         additional: Optional[dict] = None,
         job_options: Optional[dict] = None,
+        show_error_logs: bool = True,
     ) -> BatchJob:
         """
         Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
         This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
 
-        For very long running jobs, you probably do not want to keep the client running.
+        For very long-running jobs, you probably do not want to keep the client running.
 
         :param job_options:
         :param outputfile: The path of a file to which a result can be written
@@ -85,9 +86,13 @@
         :param additional: additional (top-level) properties to set in the request body
         :param job_options: dictionary of job options to pass to the backend
             (under top-level property "job_options")
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
 
         .. versionadded:: 0.36.0
             Added argument ``additional``.
+
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
         """
         job = self.create_job(
             title=title,
@@ -100,7 +105,10 @@
         return job.run_synchronous(
             # TODO #135 support multi file result sets too
             outputfile=outputfile,
-            print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
         )
 
     def create_job(
10 changes: 9 additions & 1 deletion openeo/rest/vectorcube.py
@@ -259,6 +259,7 @@ def execute_batch(
         job_options: Optional[dict] = None,
         validate: Optional[bool] = None,
         auto_add_save_result: bool = True,
+        show_error_logs: bool = True,
         # TODO: avoid using kwargs as format options
         **format_options,
     ) -> BatchJob:
@@ -277,6 +278,7 @@
         :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
             (overruling the connection's ``auto_validate`` setting).
         :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
+        :param show_error_logs: whether to automatically print error logs when the batch job failed.
 
         .. versionchanged:: 0.21.0
             When not specified explicitly, output format is guessed from output file extension.
@@ -286,6 +288,9 @@
         .. versionadded:: 0.36.0
             Added argument ``additional``.
 
+        .. versionchanged:: 0.37.0
+            Added argument ``show_error_logs``.
+
         """
         cube = self
         if auto_add_save_result:
@@ -310,7 +315,10 @@
         return job.run_synchronous(
             # TODO #135 support multi file result sets too
             outputfile=outputfile,
-            print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
        )
 
     def create_job(
1 change: 1 addition & 0 deletions setup.py
@@ -22,6 +22,7 @@
         "mock",
         "requests-mock>=1.8.0",
         "httpretty>=1.1.4",
+        "urllib3<2.3.0",  # httpretty doesn't work properly with urllib3>=2.3.0. See #700 and https://github.com/gabrielfalcao/HTTPretty/issues/484
         "netCDF4>=1.7.0",
         "matplotlib",  # TODO: eliminate matplotlib as test dependency
         "geopandas",
9 changes: 4 additions & 5 deletions tests/extra/job_management/test_stac_job_db.py
@@ -363,8 +363,10 @@ def handle_row(series):
         job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items)
 
         # 10 items in total, 3 items per chunk, should result in 4 calls
-        assert mock_requests_post.call_count == 4
-        expected_calls = [
+        assert sorted(
+            (c.kwargs for c in mock_requests_post.call_args_list),
+            key=lambda d: sorted(d["json"]["items"].keys()),
+        ) == [
             {
                 "url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
                 "auth": None,
@@ -386,6 +388,3 @@
                 "json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[9:]}},
             },
         ]
-
-        for i, call in enumerate(mock_requests_post.call_args_list):
-            assert call[1] == expected_calls[i]
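The rewritten assertion compares all mocked POST calls as one list, sorted on a stable key, so the test no longer depends on the order in which chunks were submitted. The same pattern in isolation (a self-contained sketch, not the test itself):

```python
from unittest import mock

m = mock.Mock()
m(json={"method": "upsert", "items": {"b": 2}})
m(json={"method": "upsert", "items": {"a": 1}})

# Sort each call's kwargs on a stable key derived from the payload,
# then compare against the expected calls in that same sorted order:
observed = sorted(
    (c.kwargs for c in m.call_args_list),
    key=lambda d: sorted(d["json"]["items"].keys()),
)
assert observed == [
    {"json": {"method": "upsert", "items": {"a": 1}}},
    {"json": {"method": "upsert", "items": {"b": 2}}},
]
```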
31 changes: 31 additions & 0 deletions tests/rest/test_job.py
@@ -151,6 +151,37 @@ def test_execute_batch_with_error(con100, requests_mock, tmpdir):
     ]
 
 
+@pytest.mark.parametrize("show_error_logs", [True, False])
+def test_execute_batch_show_error_logs(con100, requests_mock, show_error_logs):
+    requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}})
+    requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
+    requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"})
+    requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202)
+    requests_mock.get(API_URL + "/jobs/f00ba5", json={"status": "error", "progress": 100})
+    requests_mock.get(
+        API_URL + "/jobs/f00ba5/logs",
+        json={"logs": [{"id": "34", "level": "error", "message": "nope"}]},
+    )
+
+    stdout = []
+    with fake_time(), pytest.raises(JobFailedException):
+        con100.load_collection("SENTINEL2").execute_batch(
+            max_poll_interval=0.1, print=stdout.append, show_error_logs=show_error_logs
+        )
+
+    expected = [
+        "0:00:01 Job 'f00ba5': send 'start'",
+        "0:00:02 Job 'f00ba5': error (progress 100%)",
+    ]
+    if show_error_logs:
+        expected += [
+            "Your batch job 'f00ba5' failed. Error logs:",
+            [{"id": "34", "level": "error", "message": "nope"}],
+            "Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.",
+        ]
+    assert stdout == expected
+
+
 @pytest.mark.parametrize(["error_response", "expected"], [
     (
         {"exc": requests.ConnectionError("time out")},
