From 17af3e5d2ebec5a82003baf99bd440c002e1ecb3 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 8 Nov 2022 10:08:18 +0100 Subject: [PATCH 01/31] Added callback test Added a test for functionality of a callback to be executed upon every completed delayed writing. --- trollflow2/plugins/__init__.py | 7 +++ trollflow2/tests/test_trollflow2.py | 74 +++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index f0037129..4273c2fa 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -286,6 +286,13 @@ def save_datasets(job): ``staging_zone`` directory, such that the filename written to the headers remains meaningful. + The product list may contain a ``call_on_done`` parameter. This parameter + has effect if and only if ``eager_writing`` is False (which is the + default). It should refer to a ``dask.Delayed`` object wrapping a + callable. Upon computation time, this callable will be called with the + result of ``save_dataset``. This could be used, for example, to ship + products as soon as they are successfully produced. + Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The arguments ``use_tmp_file``, ``staging_zone``, ``output_dir``, diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 9fd2a233..91be1f3c 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -31,7 +31,10 @@ from functools import partial import pytest -from pyresample.geometry import DynamicAreaDefinition +from pyresample.geometry import DynamicAreaDefinition, create_area_def + +import dask.array as da +import xarray as xr from trollflow2.tests.utils import TestCase from trollflow2.launcher import read_config @@ -576,6 +579,7 @@ def test_pop_unknown_args(self): "fname_pattern": "name.tif", "use_tmp_file": True, "staging_zone": "értékesítési szakember", + "call_on_done": None, "areas": { "euron1": { "products": { @@ -608,7 +612,7 @@ def test_pop_unknown_args(self): assert "PhysicUnit" in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() for absent in {"use_tmp_file", "staging_zone", "output_dir", - "fname_pattern", "dispatch"}: + "fname_pattern", "dispatch", "callback"}: assert absent not in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() @@ -616,6 +620,7 @@ def _create_job_for_save_datasets(): from yaml import UnsafeLoader job = {} job['input_mda'] = input_mda + job['product_list'] = { 'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'], } @@ -668,6 +673,67 @@ def test_use_staging_zone_tmpfile(tmp_path): assert next(iter(renames.keys())).startswith(os.fspath(tmp_path)) +def test_save_datasets_callback(tmp_path): + """Test callback functionality for save_datasets + + Test that the functionality for a callback after each produced file is + working correctly in the save_datasets plugin. + """ + from trollflow2.plugins import save_datasets + from satpy.tests.utils import make_fake_scene + + fake_area = create_area_def("sargasso", 4326, resolution=1, width=5, height=5, center=(0, 0)) + fake_scene = make_fake_scene( + {"dragon_top_height": (dat := xr.DataArray( + dims=("y", "x"), + data=da.arange(25).reshape((5, 5)))), + "penguin_bottom_height": dat, + "kraken_depth": dat}, + daskify=True, + area=fake_area) + job = {} + job['input_mda'] = input_mda + + counter = 0 + + def count(obj): + """Toy function adding to counter""" + nonlocal counter + counter += 1 + return obj + + form = [{"writer": "geotiff", "fname_pattern": "test.tif"}] + + product_list = { + "fname_pattern": "name.tif", + "output_dir": os.fspath(tmp_path / "test"), + "call_on_done": count, + "areas": { + "sargasso": { + "products": { + "dragon_top_height": { + "productname": "dragon_top_height", + "formats": copy.deepcopy(form)}, + "penguin_bottom_height": { + "productname": "penguin_bottom_height", + "formats": copy.deepcopy(form)}, + "kraken_depth": { + "productname": "kraken_depth", + "formats": copy.deepcopy(form)}, + } + } + } + } + job["product_list"] = {"product_list": product_list} + job['resampled_scenes'] = {"sargasso": fake_scene} + + job['produced_files'] = mock.MagicMock() + + job['product_list']['product_list']['call_on_done'] = count + save_datasets(job) + assert counter == 3 + + class TestCreateScene(TestCase): """Test case for creating a scene.""" @@ -2003,8 +2069,8 @@ def test_persisted(sc_3a_3b): prods[p] = {"min_valid_data_fraction": 40} def fake_persist(*args): - for da in args: - da.attrs["persisted"] = True + for daa in args: + daa.attrs["persisted"] = True return args with mock.patch("dask.persist", new=fake_persist): From 22587aa574eb59a26341a017fb631324da88a939 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 8 Nov 2022 14:55:46 +0100 Subject: [PATCH 02/31] Add functionality for callback when computation done Added functionality for a callback function that is called as soon as the computation is completed. --- trollflow2/plugins/__init__.py | 13 ++++++++++++- trollflow2/tests/test_trollflow2.py | 28 +++++++++++++++------------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 4273c2fa..40379809 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -303,9 +303,20 @@ def save_datasets(job): base_config = job['input_mda'].copy() base_config.pop('dataset', None) eager_writing = job['product_list']['product_list'].get("eager_writing", False) + sentinel = object() + call_on_done = job["product_list"]["product_list"].get("call_on_done", sentinel) + if call_on_done is not sentinel: + callback = dask.delayed(call_on_done) + else: + callback = None with renamed_files() as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): - obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) + if callback is not None: + obj = callback( + save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), + fmat_config["filename"]) + else: + obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) if obj is not None: objs.append(obj) job['produced_files'].put(fmat_config['filename']) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 91be1f3c..cb5b1c25 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -612,7 +612,7 @@ def test_pop_unknown_args(self): assert "PhysicUnit" in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() for absent in {"use_tmp_file", "staging_zone", "output_dir", - "fname_pattern", "dispatch", "callback"}: + "fname_pattern", "dispatch", "call_on_done"}: assert absent not in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() @@ -673,7 +673,7 @@ def test_use_staging_zone_tmpfile(tmp_path): assert next(iter(renames.keys())).startswith(os.fspath(tmp_path)) -def test_save_datasets_callback(tmp_path): +def test_save_datasets_callback(tmp_path, caplog): """Test callback functionality for save_datasets Test that the functionality for a callback after each produced file is @@ -694,20 +694,20 @@ def test_save_datasets_callback(tmp_path): job = {} job['input_mda'] = input_mda - counter = 0 + logger = logging.getLogger("testlogger") - def count(obj): - """Toy function adding to counter""" - nonlocal counter - counter += 1 + def testlog(obj, filename): + """Toy function doing some logging""" + logger.info(f"Wrote {filename} successfully") + assert os.path.exists(filename) # ensures we are compute-time return obj - form = [{"writer": "geotiff", "fname_pattern": "test.tif"}] + form = [{"writer": "geotiff"}] product_list = { - "fname_pattern": "name.tif", + "fname_pattern": "{productname}.tif", "output_dir": os.fspath(tmp_path / "test"), - "call_on_done": count, + "call_on_done": testlog, "areas": { "sargasso": { "products": { @@ -729,9 +729,11 @@ def count(obj): job['produced_files'] = mock.MagicMock() - job['product_list']['product_list']['call_on_done'] = count - save_datasets(job) - assert counter == 3 + with caplog.at_level(logging.INFO): + save_datasets(job) + for nm in {"dragon_top_height", "penguin_bottom_height", "kraken_depth"}: + exp = tmp_path / "test" / f"{nm:s}.tif" + assert f"Wrote {exp!s} successfully" in caplog.text class TestCreateScene(TestCase): From 07216d2f6c7b2c24c8df11a9e93f995ebb4339f3 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 8 Nov 2022 15:00:51 +0100 Subject: [PATCH 03/31] Fix call_on_done docu Fix documentation for call_on_done and add example to the example playlist. --- examples/pl.yaml | 2 ++ trollflow2/plugins/__init__.py | 13 +++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/pl.yaml b/examples/pl.yaml index ee840039..7fd338ed 100644 --- a/examples/pl.yaml +++ b/examples/pl.yaml @@ -66,6 +66,8 @@ product_list: &product_list # # pass extra keyword arguments to Scene.load # scene_load_kwargs: # upper_right_corner: "NE" + # call_on_done: + # !!python/name:mypkg.callable areas: omerc_bb: diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 40379809..704ca74d 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -286,12 +286,13 @@ def save_datasets(job): ``staging_zone`` directory, such that the filename written to the headers remains meaningful. - The product list may contain a ``call_on_done`` parameter. This parameter - has effect if and only if ``eager_writing`` is False (which is the - default). It should refer to a ``dask.Delayed`` object wrapping a - callable. Upon computation time, this callable will be called with the - result of ``save_dataset``. This could be used, for example, to ship - products as soon as they are successfully produced. + The product list may contain a ``call_on_done`` parameter. This + parameter has effect if and only if ``eager_writing`` is False (which + is the default). It should refer to a callable. Upon computation + time, this callable will be called with two arguments: the result of + ``save_dataset``, and the filename that was written. This could be + used, for example, to ship products as soon as they are successfully + produced. Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The From a2e92fe31bf29ab4587a26859e0ff00b621e60ea Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 8 Nov 2022 15:02:40 +0100 Subject: [PATCH 04/31] Documentation clarifiraciotn --- trollflow2/plugins/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 704ca74d..63da55ba 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -290,8 +290,9 @@ def save_datasets(job): parameter has effect if and only if ``eager_writing`` is False (which is the default). It should refer to a callable. Upon computation time, this callable will be called with two arguments: the result of - ``save_dataset``, and the filename that was written. This could be - used, for example, to ship products as soon as they are successfully + ``save_dataset``, and the filename that was written. The callable + must return again the ``save_dataset`` return value. This callback could + be used, for example, to ship products as soon as they are successfully produced. Other arguments defined in the job list (either directly under From fa30b700b5a0aa4130e4c9e4aeb22b8906695080 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 9 Nov 2022 08:52:50 +0100 Subject: [PATCH 05/31] Allow multiple call-on-done callbacks For the call-on-done functionality, allow multiple callbacks to be called each in turn. --- trollflow2/plugins/__init__.py | 23 +++++++++++++---------- trollflow2/tests/test_trollflow2.py | 5 +++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 63da55ba..12a54f75 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -288,10 +288,12 @@ def save_datasets(job): The product list may contain a ``call_on_done`` parameter. This parameter has effect if and only if ``eager_writing`` is False (which - is the default). It should refer to a callable. Upon computation - time, this callable will be called with two arguments: the result of - ``save_dataset``, and the filename that was written. The callable - must return again the ``save_dataset`` return value. This callback could + is the default). It should contain a list of references to callabales. + Upon computation + time, each callable will be called with three arguments: the result of + ``save_dataset``, the full job dictionary, and the dictionary describing + the format config and output filename that was written. The callables + must return again the ``save_dataset`` return value again. This callback could be used, for example, to ship products as soon as they are successfully produced. @@ -308,15 +310,16 @@ def save_datasets(job): sentinel = object() call_on_done = job["product_list"]["product_list"].get("call_on_done", sentinel) if call_on_done is not sentinel: - callback = dask.delayed(call_on_done) + callbacks = [dask.delayed(c) for c in call_on_done] else: - callback = None + callbacks = None with renamed_files() as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): - if callback is not None: - obj = callback( - save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), - fmat_config["filename"]) + if callbacks: + for callback in callbacks: + obj = callback( + save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), + job, fmat_config) else: obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) if obj is not None: diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index cb5b1c25..e8f3fbaa 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -696,8 +696,9 @@ def test_save_datasets_callback(tmp_path, caplog): logger = logging.getLogger("testlogger") - def testlog(obj, filename): + def testlog(obj, job, fmat_config): """Toy function doing some logging""" + filename = fmat_config["filename"] logger.info(f"Wrote {filename} successfully") assert os.path.exists(filename) # ensures we are compute-time return obj @@ -707,7 +708,7 @@ def testlog(obj, filename): product_list = { "fname_pattern": "{productname}.tif", "output_dir": os.fspath(tmp_path / "test"), - "call_on_done": testlog, + "call_on_done": [testlog], "areas": { "sargasso": { "products": { From 40d1c3064571cebad9667ca99bec9d5c6d9b154a Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 9 Nov 2022 09:24:46 +0100 Subject: [PATCH 06/31] Added logging and moving callbacks --- trollflow2/plugins/__init__.py | 54 +++++++++++++++++++++++------ trollflow2/tests/test_trollflow2.py | 29 ++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 12a54f75..a3060359 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -286,16 +286,17 @@ def save_datasets(job): ``staging_zone`` directory, such that the filename written to the headers remains meaningful. - The product list may contain a ``call_on_done`` parameter. This - parameter has effect if and only if ``eager_writing`` is False (which - is the default). It should contain a list of references to callabales. - Upon computation - time, each callable will be called with three arguments: the result of - ``save_dataset``, the full job dictionary, and the dictionary describing - the format config and output filename that was written. The callables - must return again the ``save_dataset`` return value again. This callback could - be used, for example, to ship products as soon as they are successfully - produced. + The product list may contain a ``call_on_done`` parameter. + This parameter has effect if and only if ``eager_writing`` is False + (which is the default). It should contain a list of references to + callabales. Upon computation time, each callable will be called + with three arguments: the result of ``save_dataset``, the full + job dictionary, and the dictionary describing the format config and + output filename that was written. The callables must return again the + ``save_dataset`` return value again. This callback could be used, for + example, to ship products as soon as they are successfully produced. + Two callback functions are provided with trollflow2: :func:`callback_log` + and :func:`callback_move`. Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -915,3 +916,36 @@ def _product_meets_min_valid_data_fraction( LOG.debug(f"Found {rel_valid:%}>{min_frac:%}, keeping " f"{prod_name:s} for area {area_name:s} in the worklist") return True + + +def callback_log(obj, job, fmat_config): + """Logging callback for save_datasets call_on_done. + + Callback function that can be used with the :func:`save_datasets` + ``call_on_done`` functionality. Will log a message with loglevel INFO to + report that the filename was written successfully. + """ + + filename = fmat_config["filename"] + LOG.info(f"Wrote {filename:s} successfully.") + return obj + + +def callback_move(obj, job, fmat_config): + """Mover callback for save_datasets call_on_done. + + Callback function that can be used with the :func:`save_datasets` + ``cal_on_done`` functionality. Moves the file to the directory indicated + with ``final_output_dir`` in the configuration. This directory will be + created if needed. + + Using this callback is incompatible with the options ``staging_zone`` or + ``use_tmp_file``. + """ + srcfile = pathlib.Path(fmat_config["filename"]) + destdir = pathlib.Path(job["product_list"]["product_list"]["final_output_dir"]) + destdir.mkdir(exist_ok=True, parents=True) + destfile = destdir / srcfile.name + LOG.debug(f"Moving {srcfile!s} to {destfile!s}") + srcfile.rename(destfile) + return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index e8f3fbaa..444294fb 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2084,5 +2084,34 @@ def fake_persist(*args): assert not sc_3a_3b["another"].attrs.get("persisted") +def test_callback_log(caplog): + """Test callback log functionality.""" + from trollflow2.plugins import callback_log + obj = object() + with caplog.at_level(logging.INFO): + res = callback_log(obj, {}, {"filename": "bouvetøya"}) + assert res is obj + assert "Wrote bouvetøya successfully." in caplog.text + + +def test_callback_move(caplog, tmp_path): + """Test callback move functionality.""" + from trollflow2.plugins import callback_move + obj = object() + srcdir = tmp_path / "src" + srcfile = srcdir / "orkney" + srcdir.mkdir(parents=True, exist_ok=True) + srcfile.touch() + destdir = tmp_path / "dest" + destfile = destdir / srcfile.name + job = {"product_list": {"product_list": {"final_output_dir": os.fspath(destdir)}}} + with caplog.at_level(logging.DEBUG): + res = callback_move(obj, job, {"filename": srcfile}) + assert res is obj + assert not srcfile.exists() + assert destfile.exists() + assert f"Moving {srcfile!s} to {destfile!s}" in caplog.text + + if __name__ == '__main__': unittest.main() From 21e1a3d14f577231936cf9aa2b1dcd4117958bc8 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 9 Nov 2022 09:41:54 +0100 Subject: [PATCH 07/31] Fix recursive calling When having mulitp.le call on dones, don't call save_dataset multiple times --- trollflow2/plugins/__init__.py | 9 +++++---- trollflow2/tests/test_trollflow2.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index a3060359..d932b689 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -317,10 +317,11 @@ def save_datasets(job): with renamed_files() as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): if callbacks: - for callback in callbacks: - obj = callback( - save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), - job, fmat_config) + obj = callbacks[0]( + save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), + job, fmat_config) + for callback in callbacks[1:]: + obj = callback(obj, job, fmat_config) else: obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) if obj is not None: diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 444294fb..64ee4a67 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -708,7 +708,7 @@ def testlog(obj, job, fmat_config): product_list = { "fname_pattern": "{productname}.tif", "output_dir": os.fspath(tmp_path / "test"), - "call_on_done": [testlog], + "call_on_done": [testlog, testlog, testlog], "areas": { "sargasso": { "products": { From b07b4ebe32bca8c446fba0461ceab48cb44a11c5 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 9 Nov 2022 09:48:12 +0100 Subject: [PATCH 08/31] Add stronger test for successful writing Add stronger test to confirm writing was successful. Turns out it wasn't. --- trollflow2/tests/test_trollflow2.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 64ee4a67..2983f259 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -27,6 +27,7 @@ import os import unittest import copy +import pathlib from unittest import mock from functools import partial @@ -700,7 +701,10 @@ def testlog(obj, job, fmat_config): """Toy function doing some logging""" filename = fmat_config["filename"] logger.info(f"Wrote {filename} successfully") - assert os.path.exists(filename) # ensures we are compute-time + # ensure computation has indeed completed + p = pathlib.Path(filename) + assert p.exists() + assert p.stat().st_size > 0 return obj form = [{"writer": "geotiff"}] From 8c828becb75d42b7d9f513f9386f426e1aa20e33 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 9 Nov 2022 12:14:37 +0100 Subject: [PATCH 09/31] When logging, report on the written size. --- trollflow2/plugins/__init__.py | 3 ++- trollflow2/tests/test_trollflow2.py | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index d932b689..c38a54e6 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -928,7 +928,8 @@ def callback_log(obj, job, fmat_config): """ filename = fmat_config["filename"] - LOG.info(f"Wrote {filename:s} successfully.") + size = os.path.getsize(filename) + LOG.info(f"Wrote {filename:s} successfully, total {size:d} bytes.") return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 2983f259..7129ffc9 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2088,14 +2088,17 @@ def fake_persist(*args): assert not sc_3a_3b["another"].attrs.get("persisted") -def test_callback_log(caplog): +def test_callback_log(caplog, tmp_path): """Test callback log functionality.""" from trollflow2.plugins import callback_log + srcfile = tmp_path / "bouvetøya" + with srcfile.open(mode="w") as fp: + fp.write("x" * 10) obj = object() with caplog.at_level(logging.INFO): - res = callback_log(obj, {}, {"filename": "bouvetøya"}) + res = callback_log(obj, {}, {"filename": os.fspath(srcfile)}) assert res is obj - assert "Wrote bouvetøya successfully." in caplog.text + assert f"Wrote {srcfile!s} successfully, total 10 bytes." in caplog.text def test_callback_move(caplog, tmp_path): From 07d2ac5cd38f23abedeca5f08341dcc4b3c35772 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 11 Nov 2022 12:45:59 +0100 Subject: [PATCH 10/31] Add closer-callback Add a callback that closes the file. This should be used before logging or moving. --- trollflow2/plugins/__init__.py | 25 +++++++++++++++++++++++-- trollflow2/tests/test_trollflow2.py | 19 +++++++++---------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index c38a54e6..78f4595c 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -34,6 +34,7 @@ import dpath.util import rasterio import dask +from dask.delayed import Delayed from posttroll.message import Message from posttroll.publisher import Publisher, NoisyPublisher from pyorbital.astronomy import sun_zenith_angle @@ -295,8 +296,10 @@ def save_datasets(job): output filename that was written. The callables must return again the ``save_dataset`` return value again. This callback could be used, for example, to ship products as soon as they are successfully produced. - Two callback functions are provided with trollflow2: :func:`callback_log` - and :func:`callback_move`. + Three callback functions are provided with trollflow2: + :func:`callback_log`, :func:`callback_move`, and :func:`callback_close`. + If using the geotiff writer, :func:`callback_close` should be used before + the others for correct results. Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -951,3 +954,21 @@ def callback_move(obj, job, fmat_config): LOG.debug(f"Moving {srcfile!s} to {destfile!s}") srcfile.rename(destfile) return obj + + +def callback_close(obj, job, fmat_config): + """Callback closing files where needed. + + When using callbacks with writers that return a ``(src, target)`` pair for + ``da.store``, satpy doesn't close the file until after computation is + completed. That means there may be data that have been computed, but not + yet written to disk. This is normally the case for the geotiff writer. + For callbacks that depend on the files to be complete, the file should be + closed first. This callback should be prepended in this case. + + If passed a ``dask.Delayed`` object, this callback does nothing. If passed + a ``(src, targ)`` pair, it closes the target. + """ + if isinstance(obj, Delayed): + return + obj[1].close() diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 7129ffc9..06bc9723 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -580,7 +580,6 @@ def test_pop_unknown_args(self): "fname_pattern": "name.tif", "use_tmp_file": True, "staging_zone": "értékesítési szakember", - "call_on_done": None, "areas": { "euron1": { "products": { @@ -680,14 +679,14 @@ def test_save_datasets_callback(tmp_path, caplog): Test that the functionality for a callback after each produced file is working correctly in the save_datasets plugin. """ - from trollflow2.plugins import save_datasets + from trollflow2.plugins import save_datasets, callback_close from satpy.tests.utils import make_fake_scene - fake_area = create_area_def("sargasso", 4326, resolution=1, width=5, height=5, center=(0, 0)) + fake_area = create_area_def("sargasso", 4087, resolution=1, width=256, height=256, center=(0, 0)) fake_scene = make_fake_scene( {"dragon_top_height": (dat := xr.DataArray( dims=("y", "x"), - data=da.arange(25).reshape((5, 5)))), + data=da.zeros(shape=(256, 256)))), "penguin_bottom_height": dat, "kraken_depth": dat}, daskify=True, @@ -700,19 +699,19 @@ def test_save_datasets_callback(tmp_path, caplog): def testlog(obj, job, fmat_config): """Toy function doing some logging""" filename = fmat_config["filename"] - logger.info(f"Wrote {filename} successfully") - # ensure computation has indeed completed + # ensure computation has indeed completed and file was flushed p = pathlib.Path(filename) + logger.info(f"Wrote {filename} successfully, {p.stat().st_size:d} bytes") assert p.exists() - assert p.stat().st_size > 0 + assert p.stat().st_size == 131473 return obj - form = [{"writer": "geotiff"}] + form = [{"writer": "geotiff", "compress": "NONE"}] product_list = { "fname_pattern": "{productname}.tif", "output_dir": os.fspath(tmp_path / "test"), - "call_on_done": [testlog, testlog, testlog], + "call_on_done": [callback_close, testlog], "areas": { "sargasso": { "products": { @@ -738,7 +737,7 @@ def testlog(obj, job, fmat_config): save_datasets(job) for nm in {"dragon_top_height", "penguin_bottom_height", "kraken_depth"}: exp = tmp_path / "test" / f"{nm:s}.tif" - assert f"Wrote {exp!s} successfully" in caplog.text + assert f"Wrote {exp!s} successfully, 131473 bytes" in caplog.text class TestCreateScene(TestCase): From f14d7f9b8de49eb48e94fdb420a661ab4f20b166 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 11 Nov 2022 17:14:05 +0100 Subject: [PATCH 11/31] Make callback_close more flexible Make callback_close more flexible, such that it can handle the case where there are dask-dependent attributes, that must also be closed. --- examples/pl.yaml | 4 +++- trollflow2/plugins/__init__.py | 6 +++++- trollflow2/tests/test_trollflow2.py | 19 +++++++++++++------ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/examples/pl.yaml b/examples/pl.yaml index 7fd338ed..55711234 100644 --- a/examples/pl.yaml +++ b/examples/pl.yaml @@ -67,7 +67,9 @@ product_list: &product_list # scene_load_kwargs: # upper_right_corner: "NE" # call_on_done: - # !!python/name:mypkg.callable + # - !!python/name:trollflow2.plugins.callback_close + # - !!python/name:trollflow2.plugins.callback_log + # - !!python/name:trollflow2.plugins.callback_move areas: omerc_bb: diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 78f4595c..9222fea6 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -971,4 +971,8 @@ def callback_close(obj, job, fmat_config): """ if isinstance(obj, Delayed): return - obj[1].close() + try: + obj[1].close() + except AttributeError: + for ob in obj[1]: + ob.close() diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 06bc9723..d3335201 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -690,7 +690,8 @@ def test_save_datasets_callback(tmp_path, caplog): "penguin_bottom_height": dat, "kraken_depth": dat}, daskify=True, - area=fake_area) + area=fake_area, + common_attrs={"start_time": dt.datetime(1985, 8, 13, 15)}) job = {} job['input_mda'] = input_mda @@ -703,13 +704,19 @@ def testlog(obj, job, fmat_config): p = pathlib.Path(filename) logger.info(f"Wrote {filename} successfully, {p.stat().st_size:d} bytes") assert p.exists() - assert p.stat().st_size == 131473 + assert p.stat().st_size > 65000 return obj - form = [{"writer": "geotiff", "compress": "NONE"}] + form = [ + {"writer": "geotiff", "compress": "NONE", "fill_value": 0}, + {"writer": "ninjogeotiff", "compress": "NONE", + "ChannelID": "IR -2+3i", "DataType": "ABCD", + "PhysicUnit": "K", "PhysicValue": "Temperature", + "SatelliteNameID": "PytrollSat", "fill_value": 0} + ] product_list = { - "fname_pattern": "{productname}.tif", + "fname_pattern": "{productname}-{writer}.tif", "output_dir": os.fspath(tmp_path / "test"), "call_on_done": [callback_close, testlog], "areas": { @@ -736,8 +743,8 @@ def testlog(obj, job, fmat_config): with caplog.at_level(logging.INFO): save_datasets(job) for nm in {"dragon_top_height", "penguin_bottom_height", "kraken_depth"}: - exp = tmp_path / "test" / f"{nm:s}.tif" - assert f"Wrote {exp!s} successfully, 131473 bytes" in caplog.text + exp = tmp_path / "test" / f"{nm:s}-geotiff.tif" + assert f"Wrote {exp!s} successfully" in caplog.text class TestCreateScene(TestCase): From 0a82a2792a262df410c75235a6b2f9f290f10f8b Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Fri, 11 Nov 2022 16:14:52 +0000 Subject: [PATCH 12/31] Fixing style errors. --- trollflow2/tests/test_trollflow2.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index d3335201..a48f315b 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -708,12 +708,12 @@ def testlog(obj, job, fmat_config): return obj form = [ - {"writer": "geotiff", "compress": "NONE", "fill_value": 0}, - {"writer": "ninjogeotiff", "compress": "NONE", - "ChannelID": "IR -2+3i", "DataType": "ABCD", - "PhysicUnit": "K", "PhysicValue": "Temperature", - "SatelliteNameID": "PytrollSat", "fill_value": 0} - ] + {"writer": "geotiff", "compress": "NONE", "fill_value": 0}, + {"writer": "ninjogeotiff", "compress": "NONE", + "ChannelID": "IR -2+3i", "DataType": "ABCD", + "PhysicUnit": "K", "PhysicValue": "Temperature", + "SatelliteNameID": "PytrollSat", "fill_value": 0} + ] product_list = { "fname_pattern": "{productname}-{writer}.tif", From 14fdce880a3fc487e55e238426f4e2cfaf276db3 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 11 Nov 2022 17:56:03 +0100 Subject: [PATCH 13/31] callback_move should update filename and it does now --- trollflow2/plugins/__init__.py | 1 + trollflow2/tests/test_trollflow2.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 9222fea6..eefdb17a 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -953,6 +953,7 @@ def callback_move(obj, job, fmat_config): destfile = destdir / srcfile.name LOG.debug(f"Moving {srcfile!s} to {destfile!s}") srcfile.rename(destfile) + fmat_config["filename"] = os.fspath(destfile) return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index a48f315b..4ed1129f 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2118,12 +2118,14 @@ def test_callback_move(caplog, tmp_path): destdir = tmp_path / "dest" destfile = destdir / srcfile.name job = {"product_list": {"product_list": {"final_output_dir": os.fspath(destdir)}}} + fname_config = {"filename": srcfile} with caplog.at_level(logging.DEBUG): - res = callback_move(obj, job, {"filename": srcfile}) + res = callback_move(obj, job, fname_config) assert res is obj assert not srcfile.exists() assert destfile.exists() assert f"Moving {srcfile!s} to {destfile!s}" in caplog.text + assert fname_config["filename"] == os.fspath(destfile) if __name__ == '__main__': From ff60ddd03d132ce06150ecfbb1086a0dab2226dc Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 11 Nov 2022 18:30:43 +0100 Subject: [PATCH 14/31] Add unit test for missing file problem Add a "unit" test (actually the intersection between two units) to reproduce the problem that the launcher complains about missing files if we have a mover callback. --- trollflow2/tests/test_trollflow2.py | 62 +++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 4ed1129f..a904e989 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -28,6 +28,7 @@ import unittest import copy import pathlib +import queue from unittest import mock from functools import partial @@ -673,13 +674,9 @@ def test_use_staging_zone_tmpfile(tmp_path): assert next(iter(renames.keys())).startswith(os.fspath(tmp_path)) -def test_save_datasets_callback(tmp_path, caplog): - """Test callback functionality for save_datasets - - Test that the functionality for a callback after each produced file is - working correctly in the save_datasets plugin. - """ - from trollflow2.plugins import save_datasets, callback_close +@pytest.fixture +def fake_scene(): + """Get a fake scene.""" from satpy.tests.utils import make_fake_scene fake_area = create_area_def("sargasso", 4087, resolution=1, width=256, height=256, center=(0, 0)) @@ -692,6 +689,17 @@ def test_save_datasets_callback(tmp_path, caplog): daskify=True, area=fake_area, common_attrs={"start_time": dt.datetime(1985, 8, 13, 15)}) + + return fake_scene + +def test_save_datasets_callback(tmp_path, caplog, fake_scene): + """Test callback functionality for save_datasets + + Test that the functionality for a callback after each produced file is + working correctly in the save_datasets plugin. + """ + from trollflow2.plugins import save_datasets, callback_close + job = {} job['input_mda'] = input_mda @@ -747,6 +755,46 @@ def testlog(obj, job, fmat_config): assert f"Wrote {exp!s} successfully" in caplog.text +def test_save_datasets_callback_move_check_products(tmp_path, caplog, fake_scene): + """Test that check_products and the callback move can cooperate. + """ + from trollflow2.plugins import save_datasets, callback_close, callback_move + from trollflow2.launcher import check_results + + job = {} + job['input_mda'] = input_mda + form = [{"writer": "geotiff", "fill_value": 0}] + + product_list = { + "fname_pattern": "{productname}.tif", + "output_dir": os.fspath(tmp_path / "test1"), + "final_output_dir": os.fspath(tmp_path / "test2"), + "call_on_done": [callback_close, callback_move], + "areas": { + "sargasso": { + "products": { + "dragon_top_height": { + "productname": "dragon_top_height", + "formats": copy.deepcopy(form)}, + "penguin_bottom_height": { + "productname": "penguin_bottom_height", + "formats": copy.deepcopy(form)}, + "kraken_depth": { + "productname": "kraken_depth", + "formats": copy.deepcopy(form)}, + } + } + } + } + job["product_list"] = {"product_list": product_list} + job['resampled_scenes'] = {"sargasso": fake_scene} + job["produced_files"] = queue.SimpleQueue() + save_datasets(job) + with caplog.at_level(logging.INFO): + check_results(job["produced_files"], dt.datetime(2001, 2, 3, 4, 5, 6), 0) + assert "All 3 files produced nominally" in caplog.text + + class TestCreateScene(TestCase): """Test case for creating a scene.""" From d30feb1f0841856ce44f83f1310666f6add0a86f Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Fri, 11 Nov 2022 17:31:37 +0000 Subject: [PATCH 15/31] Fixing style errors. --- trollflow2/tests/test_trollflow2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index a904e989..ba3d9055 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -692,6 +692,7 @@ def fake_scene(): return fake_scene + def test_save_datasets_callback(tmp_path, caplog, fake_scene): """Test callback functionality for save_datasets From 849df63a681e4060a9c39f08df4b524c663aeeb7 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 11 Nov 2022 19:29:49 +0100 Subject: [PATCH 16/31] revert the moving logic Revert the logic on moving with the call on done, using the staging zone again, and use the renames dictionary, but without actually renaming in the context manager. Pitfall: user must explicitly state that this is happening with an andditional setting early_moving: True. Open to alternatives. --- trollflow2/plugins/__init__.py | 31 ++++++++++++++++------------- trollflow2/tests/test_trollflow2.py | 14 ++++++++----- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index eefdb17a..39e796c3 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -257,14 +257,15 @@ def _create_data_query(product, res): @contextmanager -def renamed_files(): +def renamed_files(do_renames): """Context renaming files.""" renames = {} yield renames - for tmp_name, actual_name in renames.items(): - os.rename(tmp_name, actual_name) + if do_renames: + for tmp_name, actual_name in renames.items(): + os.rename(tmp_name, actual_name) def save_datasets(job): @@ -299,7 +300,8 @@ def save_datasets(job): Three callback functions are provided with trollflow2: :func:`callback_log`, :func:`callback_move`, and :func:`callback_close`. If using the geotiff writer, :func:`callback_close` should be used before - the others for correct results. + the others for correct results. When using :func:`callback_move`, the + user must also set ``early_moving`` to True and use a ``staging_zone``. Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -311,13 +313,14 @@ def save_datasets(job): base_config = job['input_mda'].copy() base_config.pop('dataset', None) eager_writing = job['product_list']['product_list'].get("eager_writing", False) + early_moving = job['product_list']['product_list'].get("early_moving", False) sentinel = object() call_on_done = job["product_list"]["product_list"].get("call_on_done", sentinel) if call_on_done is not sentinel: callbacks = [dask.delayed(c) for c in call_on_done] else: callbacks = None - with renamed_files() as renames: + with renamed_files(not early_moving) as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): if callbacks: obj = callbacks[0]( @@ -940,20 +943,20 @@ def callback_move(obj, job, fmat_config): """Mover callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` - ``cal_on_done`` functionality. Moves the file to the directory indicated - with ``final_output_dir`` in the configuration. This directory will be + ``call_on_done`` functionality. Moves the file to the directory indicated + with ``output_dir`` in the configuration. This directory will be created if needed. - Using this callback is incompatible with the options ``staging_zone`` or - ``use_tmp_file``. + This callback must be used with ``staging_zone``. """ - srcfile = pathlib.Path(fmat_config["filename"]) - destdir = pathlib.Path(job["product_list"]["product_list"]["final_output_dir"]) - destdir.mkdir(exist_ok=True, parents=True) - destfile = destdir / srcfile.name + + # due to early moving, I've already changed the filename to the destination + # reverse logic also + destfile = pathlib.Path(fmat_config["filename"]) + srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"]) + srcfile = srcdir / destfile.name LOG.debug(f"Moving {srcfile!s} to {destfile!s}") srcfile.rename(destfile) - fmat_config["filename"] = os.fspath(destfile) return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index a904e989..e6a0fd95 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -767,9 +767,10 @@ def test_save_datasets_callback_move_check_products(tmp_path, caplog, fake_scene product_list = { "fname_pattern": "{productname}.tif", - "output_dir": os.fspath(tmp_path / "test1"), - "final_output_dir": os.fspath(tmp_path / "test2"), + "staging_zone": os.fspath(tmp_path / "test1"), + "output_dir": os.fspath(tmp_path / "test2"), "call_on_done": [callback_close, callback_move], + "early_moving": True, "areas": { "sargasso": { "products": { @@ -2164,16 +2165,19 @@ def test_callback_move(caplog, tmp_path): srcdir.mkdir(parents=True, exist_ok=True) srcfile.touch() destdir = tmp_path / "dest" + destdir.mkdir(parents=True, exist_ok=True) destfile = destdir / srcfile.name - job = {"product_list": {"product_list": {"final_output_dir": os.fspath(destdir)}}} - fname_config = {"filename": srcfile} + job = {"product_list": + {"product_list": + {"staging_zone": os.fspath(srcdir), + "output_dir": os.fspath(destdir)}}} + fname_config = {"filename": os.fspath(destfile)} with caplog.at_level(logging.DEBUG): res = callback_move(obj, job, fname_config) assert res is obj assert not srcfile.exists() assert destfile.exists() assert f"Moving {srcfile!s} to {destfile!s}" in caplog.text - assert fname_config["filename"] == os.fspath(destfile) if __name__ == '__main__': From a462de6a16d2b99b7d494e78158f7656ec5a47a2 Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Fri, 11 Nov 2022 18:31:37 +0000 Subject: [PATCH 17/31] Fixing style errors. --- trollflow2/tests/test_trollflow2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index b058f092..317d6eca 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2169,9 +2169,9 @@ def test_callback_move(caplog, tmp_path): destdir.mkdir(parents=True, exist_ok=True) destfile = destdir / srcfile.name job = {"product_list": - {"product_list": - {"staging_zone": os.fspath(srcdir), - "output_dir": os.fspath(destdir)}}} + {"product_list": + {"staging_zone": os.fspath(srcdir), + "output_dir": os.fspath(destdir)}}} fname_config = {"filename": os.fspath(destfile)} with caplog.at_level(logging.DEBUG): res = callback_move(obj, job, fname_config) From ec02361cd132cd751f294d9566cffb8caa849609 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Mon, 14 Nov 2022 08:27:56 +0100 Subject: [PATCH 18/31] Clarify order of callbacks Clarify that callback_log must be called after callback_move. --- examples/pl.yaml | 2 +- trollflow2/plugins/__init__.py | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/examples/pl.yaml b/examples/pl.yaml index 55711234..aef48869 100644 --- a/examples/pl.yaml +++ b/examples/pl.yaml @@ -68,8 +68,8 @@ product_list: &product_list # upper_right_corner: "NE" # call_on_done: # - !!python/name:trollflow2.plugins.callback_close - # - !!python/name:trollflow2.plugins.callback_log # - !!python/name:trollflow2.plugins.callback_move + # - !!python/name:trollflow2.plugins.callback_log areas: omerc_bb: diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 39e796c3..c8423d63 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -302,6 +302,9 @@ def save_datasets(job): If using the geotiff writer, :func:`callback_close` should be used before the others for correct results. When using :func:`callback_move`, the user must also set ``early_moving`` to True and use a ``staging_zone``. + If both are used, :func:`callback_log` must be called AFTER + :func:`callback_move`, because :func:`callback_log` searches for the final + destination of the file and reports the size (so it accesses the metadata). Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -931,6 +934,11 @@ def callback_log(obj, job, fmat_config): Callback function that can be used with the :func:`save_datasets` ``call_on_done`` functionality. Will log a message with loglevel INFO to report that the filename was written successfully. + + If using :func:`callback_move` in combination with + :func:`callback_log`, you must call :func:`callback_log` AFTER + :func:`callback_move`, because the logger looks for the final + destination of the file, not the temporary one. """ filename = fmat_config["filename"] @@ -947,7 +955,11 @@ def callback_move(obj, job, fmat_config): with ``output_dir`` in the configuration. This directory will be created if needed. - This callback must be used with ``staging_zone``. + This callback must be used with ``staging_zone`` and ``early_moving`` MUST + be set in the configuration. If used in combination with + :func:`callback_log`, you must call :func:`callback_log` AFTER + :func:`callback_move`, because the logger looks for the final destination + of the file, not the temporary one. """ # due to early moving, I've already changed the filename to the destination From 6ea3d2c8a74cba1f1a16d39160f76ece97f86fb6 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Mon, 14 Nov 2022 17:54:15 +0100 Subject: [PATCH 19/31] more realistic test expectation Adapt test_save_datasets_callback to test a realistic size. Sadly this means the test now fails, but reality also fails. --- trollflow2/tests/test_trollflow2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 317d6eca..a2b90b16 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -711,9 +711,9 @@ def testlog(obj, job, fmat_config): filename = fmat_config["filename"] # ensure computation has indeed completed and file was flushed p = pathlib.Path(filename) - logger.info(f"Wrote {filename} successfully, {p.stat().st_size:d} bytes") + logger.info(f"Wrote {filename} successfully, 131505 bytes") assert p.exists() - assert p.stat().st_size > 65000 + assert p.stat().st_size == 131505 return obj form = [ From 1d0bbc78c9b281a3cb7c2e57b6b553483d859990 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 15 Nov 2022 09:15:50 +0100 Subject: [PATCH 20/31] Compare contents, not size When testing if the file has been written completely, compare the contents. Comparing the size is not enough. --- trollflow2/tests/test_trollflow2.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index a2b90b16..f06f7b9f 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -34,6 +34,7 @@ import pytest from pyresample.geometry import DynamicAreaDefinition, create_area_def +import rasterio import dask.array as da import xarray as xr @@ -679,11 +680,12 @@ def fake_scene(): """Get a fake scene.""" from satpy.tests.utils import make_fake_scene - fake_area = create_area_def("sargasso", 4087, resolution=1, width=256, height=256, center=(0, 0)) + x = 10 + fake_area = create_area_def("sargasso", 4087, resolution=1, width=x, height=x, center=(0, 0)) fake_scene = make_fake_scene( {"dragon_top_height": (dat := xr.DataArray( dims=("y", "x"), - data=da.zeros(shape=(256, 256)))), + data=da.arange(x*x).reshape(x, x))), "penguin_bottom_height": dat, "kraken_depth": dat}, daskify=True, @@ -711,9 +713,11 @@ def testlog(obj, job, fmat_config): filename = fmat_config["filename"] # ensure computation has indeed completed and file was flushed p = pathlib.Path(filename) - logger.info(f"Wrote {filename} successfully, 131505 bytes") + logger.info(f"Wrote {filename} successfully, {p.stat().st_size:d} bytes") assert p.exists() - assert p.stat().st_size == 131505 + with rasterio.open(filename) as src: + arr = src.read(1) + assert arr[5, 5] == 142 return obj form = [ From b1567d9712c554263d5f57b7760dc3196b2fe814 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 17 Nov 2022 10:31:55 +0100 Subject: [PATCH 21/31] Apply callback once per set of targets Apply the callbacks once per set of targets sharing a file. This requires https://github.com/pytroll/satpy/pull/2281 --- trollflow2/plugins/__init__.py | 153 +++++++++++++++++++++------- trollflow2/tests/test_trollflow2.py | 6 +- 2 files changed, 118 insertions(+), 41 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index c8423d63..459566f6 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -21,6 +21,7 @@ """Trollflow2 plugins.""" +import collections.abc import os import pathlib from contextlib import contextmanager, suppress @@ -34,6 +35,7 @@ import dpath.util import rasterio import dask +import dask.array as da from dask.delayed import Delayed from posttroll.message import Message from posttroll.publisher import Publisher, NoisyPublisher @@ -43,7 +45,7 @@ from rasterio.enums import Resampling from satpy import Scene from satpy.resample import get_area_def -from satpy.writers import compute_writer_results +from satpy.writers import compute_writer_results, group_results_by_output_file from satpy.version import version as satpy_version from pyresample.geometry import get_geostationary_bounding_box from trollflow2.dict_tools import get_config_value, plist_iter @@ -290,21 +292,25 @@ def save_datasets(job): The product list may contain a ``call_on_done`` parameter. This parameter has effect if and only if ``eager_writing`` is False - (which is the default). It should contain a list of references to - callabales. Upon computation time, each callable will be called - with three arguments: the result of ``save_dataset``, the full - job dictionary, and the dictionary describing the format config and - output filename that was written. The callables must return again the - ``save_dataset`` return value again. This callback could be used, for - example, to ship products as soon as they are successfully produced. - Three callback functions are provided with trollflow2: - :func:`callback_log`, :func:`callback_move`, and :func:`callback_close`. - If using the geotiff writer, :func:`callback_close` should be used before - the others for correct results. When using :func:`callback_move`, the - user must also set ``early_moving`` to True and use a ``staging_zone``. - If both are used, :func:`callback_log` must be called AFTER - :func:`callback_move`, because :func:`callback_log` searches for the final - destination of the file and reports the size (so it accesses the metadata). + (which is the default). It should contain a list of references + to callables. Upon computation time, each callable will be called + with five arguments: the result of ``save_dataset``, sources (if + applicable), targets (if applicable), the full job dictionary, and + the dictionary describing the format config and output filename + that was written. The parameters sources and targets are set to + None if using a writer where :meth:`~satpy.Scene.save_datasets` + does not return those. The callables must return again the + ``save_dataset`` return value (possibly altered). This callback + could be used, for example, to ship products as soon as they are + successfully produced. Three callback functions are provided + with trollflow2: :func:`callback_log`, :func:`callback_move`, and + :func:`callback_close`. If using the geotiff or ninjogeotiff writers, + :func:`callback_close` should be used before the others for correct + results. When using :func:`callback_move`, the user must also set + ``early_moving`` to True and use a ``staging_zone``. If both are + used, :func:`callback_log` must be called AFTER :func:`callback_move`, + because :func:`callback_log` searches for the final destination of + the file and reports the size (so it accesses the metadata). Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -325,21 +331,97 @@ def save_datasets(job): callbacks = None with renamed_files(not early_moving) as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): - if callbacks: - obj = callbacks[0]( - save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing), - job, fmat_config) - for callback in callbacks[1:]: - obj = callback(obj, job, fmat_config) - else: - obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) - if obj is not None: - objs.append(obj) + late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) + late_saver = _maybe_apply_callbacks(late_saver, callbacks, job, fmat_config) + if late_saver is not None: + objs.append(late_saver) job['produced_files'].put(fmat_config['filename']) if not eager_writing: compute_writer_results(objs) +def _maybe_apply_callbacks(late_saver, callbacks, *args): + """Maybe apply callbacks. + + If we are using callbacks via the ``call_on_done`` parameter, wrap + ``late_saver`` with those iteratively. If not, return ``late_saver`` as is. + Here, ``late_saver`` is whatever :meth:`satpy.Scene.save_datasets` + returns. + """ + if callbacks is None: + return late_saver + if isinstance(late_saver, Delayed): + return _apply_callbacks_to_delayed(late_saver, callbacks, None, None, *args) + if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2: + if isinstance(late_saver[0], collections.abc.Sequence): + return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args) + return _apply_callbacks_to_source_and_target(late_saver, callbacks, *args) + raise ValueError( + "Unrecognised return value type from ``save_datasets``, " + "don't know how to apply wrappers.") + + +def _apply_callbacks_to_delayed(delayed, callbacks, *args): + """Recursively apply the callbacks to the delayed object. + + Args: + delayed: dask Delayed object to which callbacks are applied + callbacks: list of dask Delayed objects to apply + *args: remaining arguments passed to callbacks + + Returns: + delayed type with callbacks applied + """ + delayed = callbacks[0](delayed, *args) + for callback in callbacks[1:]: + delayed = callback(delayed, *args) + return delayed + + +def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args): + """Apply callbacks to multiple sources/targets pairs. + + Taking source/target pairs such as returned by + :meth:`satpy.Scene.save_datasets`, split those by file and turn them all in + delayed types by calling :func:`dask.array.store`, then apply callbacks. + + Args: + late_saver: tuple of ``(sources, targets)`` such as may be returned + by :meth:`satpy.Scene.save_datasets`. + callbacks: list of dask Delayed objects to apply + *args: remaining arguments passed to callbacks + + Returns: + list of delayed types + """ + delayeds = [] + for (src, targ) in group_results_by_output_file(*late_saver): + delayed = da.store(src, targ, compute=False) + delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, src, targ, *args)) + return delayeds + + +def _apply_callbacks_to_source_and_target(late_saver, callbacks, *args): + """Apply callbacks to single source/target pairs. + + Taking a single source/target pair such as may be returned by + :meth:`satpy.Scene.save_datasets`, turn this into a delayed type + type by calling :func:`dask.array.store`, then apply callbacks. + + Args: + late_saver: tuple of ``(source, target)`` such as may be returned + by :meth:`satpy.Scene.save_datasets`. + callbacks: list of dask Delayed objects to apply + *args: remaining arguments passed to callbacks + + Returns: + delayed types + """ + (src, targ) = late_saver + delayed = da.store(src, targ, compute=False) + return _apply_callbacks_to_delayed(delayed, callbacks, [src], [targ], *args) + + def product_missing_from_scene(product, scene): """Check if product is missing from the scene.""" if not isinstance(product, (tuple, list)): @@ -928,7 +1010,7 @@ def _product_meets_min_valid_data_fraction( return True -def callback_log(obj, job, fmat_config): +def callback_log(obj, srcs, targs, job, fmat_config): """Logging callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` @@ -947,7 +1029,7 @@ def callback_log(obj, job, fmat_config): return obj -def callback_move(obj, job, fmat_config): +def callback_move(obj, srcs, targs, job, fmat_config): """Mover callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` @@ -962,8 +1044,6 @@ def callback_move(obj, job, fmat_config): of the file, not the temporary one. """ - # due to early moving, I've already changed the filename to the destination - # reverse logic also destfile = pathlib.Path(fmat_config["filename"]) srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"]) srcfile = srcdir / destfile.name @@ -972,7 +1052,7 @@ def callback_move(obj, job, fmat_config): return obj -def callback_close(obj, job, fmat_config): +def callback_close(obj, srcs, targs, job, fmat_config): """Callback closing files where needed. When using callbacks with writers that return a ``(src, target)`` pair for @@ -985,10 +1065,7 @@ def callback_close(obj, job, fmat_config): If passed a ``dask.Delayed`` object, this callback does nothing. If passed a ``(src, targ)`` pair, it closes the target. """ - if isinstance(obj, Delayed): - return - try: - obj[1].close() - except AttributeError: - for ob in obj[1]: - ob.close() + if targs: + for targ in targs: + targ.close() + return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index f06f7b9f..ce33b683 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -708,7 +708,7 @@ def test_save_datasets_callback(tmp_path, caplog, fake_scene): logger = logging.getLogger("testlogger") - def testlog(obj, job, fmat_config): + def testlog(obj, srcs, targs, job, fmat_config): """Toy function doing some logging""" filename = fmat_config["filename"] # ensure computation has indeed completed and file was flushed @@ -2156,7 +2156,7 @@ def test_callback_log(caplog, tmp_path): fp.write("x" * 10) obj = object() with caplog.at_level(logging.INFO): - res = callback_log(obj, {}, {"filename": os.fspath(srcfile)}) + res = callback_log(obj, None, None, {}, {"filename": os.fspath(srcfile)}) assert res is obj assert f"Wrote {srcfile!s} successfully, total 10 bytes." in caplog.text @@ -2178,7 +2178,7 @@ def test_callback_move(caplog, tmp_path): "output_dir": os.fspath(destdir)}}} fname_config = {"filename": os.fspath(destfile)} with caplog.at_level(logging.DEBUG): - res = callback_move(obj, job, fname_config) + res = callback_move(obj, None, None, job, fname_config) assert res is obj assert not srcfile.exists() assert destfile.exists() From ab88f83f7cf930770871e0524aa7e91967a22a99 Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Thu, 17 Nov 2022 09:52:29 +0000 Subject: [PATCH 22/31] Fixing style errors. --- trollflow2/plugins/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 459566f6..796b3afe 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -357,8 +357,8 @@ def _maybe_apply_callbacks(late_saver, callbacks, *args): return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args) return _apply_callbacks_to_source_and_target(late_saver, callbacks, *args) raise ValueError( - "Unrecognised return value type from ``save_datasets``, " - "don't know how to apply wrappers.") + "Unrecognised return value type from ``save_datasets``, " + "don't know how to apply wrappers.") def _apply_callbacks_to_delayed(delayed, callbacks, *args): From a9608ffbb13fdfc2970afb59b7810dd8b94d7507 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 18 Nov 2022 19:51:12 +0100 Subject: [PATCH 23/31] Do not pass src to callbacks Do not pass srcs to callbacks. This seems to seriously mess up the dask graph. Let's see if this improves the performance now... --- trollflow2/plugins/__init__.py | 10 +++++----- trollflow2/tests/test_trollflow2.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 796b3afe..08eb1955 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -397,7 +397,7 @@ def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args): delayeds = [] for (src, targ) in group_results_by_output_file(*late_saver): delayed = da.store(src, targ, compute=False) - delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, src, targ, *args)) + delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, targ, *args)) return delayeds @@ -419,7 +419,7 @@ def _apply_callbacks_to_source_and_target(late_saver, callbacks, *args): """ (src, targ) = late_saver delayed = da.store(src, targ, compute=False) - return _apply_callbacks_to_delayed(delayed, callbacks, [src], [targ], *args) + return _apply_callbacks_to_delayed(delayed, callbacks, [targ], *args) def product_missing_from_scene(product, scene): @@ -1010,7 +1010,7 @@ def _product_meets_min_valid_data_fraction( return True -def callback_log(obj, srcs, targs, job, fmat_config): +def callback_log(obj, targs, job, fmat_config): """Logging callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` @@ -1029,7 +1029,7 @@ def callback_log(obj, srcs, targs, job, fmat_config): return obj -def callback_move(obj, srcs, targs, job, fmat_config): +def callback_move(obj, targs, job, fmat_config): """Mover callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` @@ -1052,7 +1052,7 @@ def callback_move(obj, srcs, targs, job, fmat_config): return obj -def callback_close(obj, srcs, targs, job, fmat_config): +def callback_close(obj, targs, job, fmat_config): """Callback closing files where needed. When using callbacks with writers that return a ``(src, target)`` pair for diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index ce33b683..53a83658 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -708,7 +708,7 @@ def test_save_datasets_callback(tmp_path, caplog, fake_scene): logger = logging.getLogger("testlogger") - def testlog(obj, srcs, targs, job, fmat_config): + def testlog(obj, targs, job, fmat_config): """Toy function doing some logging""" filename = fmat_config["filename"] # ensure computation has indeed completed and file was flushed @@ -2156,7 +2156,7 @@ def test_callback_log(caplog, tmp_path): fp.write("x" * 10) obj = object() with caplog.at_level(logging.INFO): - res = callback_log(obj, None, None, {}, {"filename": os.fspath(srcfile)}) + res = callback_log(obj, None, {}, {"filename": os.fspath(srcfile)}) assert res is obj assert f"Wrote {srcfile!s} successfully, total 10 bytes." in caplog.text @@ -2178,7 +2178,7 @@ def test_callback_move(caplog, tmp_path): "output_dir": os.fspath(destdir)}}} fname_config = {"filename": os.fspath(destfile)} with caplog.at_level(logging.DEBUG): - res = callback_move(obj, None, None, job, fname_config) + res = callback_move(obj, None, job, fname_config) assert res is obj assert not srcfile.exists() assert destfile.exists() From a833153f36fee38778a3a861a1bc9e0af7e847b2 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Mon, 6 Feb 2023 18:46:58 +0100 Subject: [PATCH 24/31] Add missing import Add an import that somehow went missing, perhaps while resolving the merge conflict. --- trollflow2/plugins/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 88e106b4..3f1e4657 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -20,6 +20,7 @@ import collections.abc import os +import pathlib from contextlib import contextmanager, suppress from logging import getLogger from tempfile import NamedTemporaryFile From 4fdce8dc772eb7d8d9a5d0e53e98b6dbd2aa8d1c Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 17 Feb 2023 10:07:12 +0100 Subject: [PATCH 25/31] add early_moving to example playlist --- examples/pl.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/pl.yaml b/examples/pl.yaml index aef48869..b5689dae 100644 --- a/examples/pl.yaml +++ b/examples/pl.yaml @@ -70,6 +70,7 @@ product_list: &product_list # - !!python/name:trollflow2.plugins.callback_close # - !!python/name:trollflow2.plugins.callback_move # - !!python/name:trollflow2.plugins.callback_log + # early_moving: True # must be set with callback_move; see docs for details areas: omerc_bb: From 7a6ac4a99b8751d2e1807053f6b2242b5de32b5d Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Tue, 28 Feb 2023 13:11:08 +0100 Subject: [PATCH 26/31] Doc fix: four, not five arguments Fix documentation for save_datasets: the callbacks pass four, not five, parameters. --- trollflow2/plugins/__init__.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 3f1e4657..6253ee2d 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -294,13 +294,13 @@ def save_datasets(job): The product list may contain a ``call_on_done`` parameter. This parameter has effect if and only if ``eager_writing`` is False (which is the default). It should contain a list of references - to callables. Upon computation time, each callable will be called - with five arguments: the result of ``save_dataset``, sources (if - applicable), targets (if applicable), the full job dictionary, and - the dictionary describing the format config and output filename - that was written. The parameters sources and targets are set to - None if using a writer where :meth:`~satpy.Scene.save_datasets` - does not return those. The callables must return again the + to callables. Upon computation time, each callable will be + called with four arguments: the result of ``save_dataset``, + targets (if applicable), the full job dictionary, and the + dictionary describing the format config and output filename + that was written. The parameter ``targets`` is set to None + if using a writer where :meth:`~satpy.Scene.save_datasets` + does not return this. The callables must return again the ``save_dataset`` return value (possibly altered). This callback could be used, for example, to ship products as soon as they are successfully produced. Three callback functions are provided From 78b646f2a438422c5c4ac87f6f8d2b0314418545 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 9 Mar 2023 14:53:02 +0200 Subject: [PATCH 27/31] Fix logger names --- trollflow2/plugins/__init__.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 3de8d116..fa77c956 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -31,10 +31,9 @@ import hdf5plugin # noqa import dask +import dask.array as da import dpath.util import rasterio -import dask -import dask.array as da from dask.delayed import Delayed from posttroll.message import Message from posttroll.publisher import create_publisher_from_dict_config @@ -45,9 +44,8 @@ from rasterio.enums import Resampling from satpy import Scene from satpy.resample import get_area_def -from satpy.writers import compute_writer_results, group_results_by_output_file from satpy.version import version as satpy_version -from satpy.writers import compute_writer_results +from satpy.writers import compute_writer_results, group_results_by_output_file from trollsift import compose from trollflow2.dict_tools import get_config_value, plist_iter @@ -1042,7 +1040,7 @@ def _product_meets_min_valid_data_fraction( def callback_log(obj, targs, job, fmat_config): - """Logging callback for save_datasets call_on_done. + """Log written files as callback for save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` ``call_on_done`` functionality. Will log a message with loglevel INFO to @@ -1053,15 +1051,14 @@ def callback_log(obj, targs, job, fmat_config): :func:`callback_move`, because the logger looks for the final destination of the file, not the temporary one. """ - filename = fmat_config["filename"] size = os.path.getsize(filename) - LOG.info(f"Wrote {filename:s} successfully, total {size:d} bytes.") + logger.info(f"Wrote {filename:s} successfully, total {size:d} bytes.") return obj def callback_move(obj, targs, job, fmat_config): - """Mover callback for save_datasets call_on_done. + """Move files as a callback by save_datasets call_on_done. Callback function that can be used with the :func:`save_datasets` ``call_on_done`` functionality. Moves the file to the directory indicated @@ -1074,17 +1071,16 @@ def callback_move(obj, targs, job, fmat_config): :func:`callback_move`, because the logger looks for the final destination of the file, not the temporary one. """ - destfile = pathlib.Path(fmat_config["filename"]) srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"]) srcfile = srcdir / destfile.name - LOG.debug(f"Moving {srcfile!s} to {destfile!s}") + logger.debug(f"Moving {srcfile!s} to {destfile!s}") srcfile.rename(destfile) return obj def callback_close(obj, targs, job, fmat_config): - """Callback closing files where needed. + """Close files as a callback where needed. When using callbacks with writers that return a ``(src, target)`` pair for ``da.store``, satpy doesn't close the file until after computation is From 1f56dd1f6f279aec198118f7cb3d4f0a02aadebf Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 10 Mar 2023 11:44:35 +0100 Subject: [PATCH 28/31] Add fix and test for delayed-based collback Add a test for a callback when the writer returns a delayed-object, and fix a bug exposed by this test. Avoid duplicate documentation, just refer from one place to the other. --- trollflow2/plugins/__init__.py | 23 +++++++++-------------- trollflow2/tests/test_trollflow2.py | 3 ++- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index fa77c956..93c96594 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -309,15 +309,10 @@ def save_datasets(job): does not return this. The callables must return again the ``save_dataset`` return value (possibly altered). This callback could be used, for example, to ship products as soon as they are - successfully produced. Three callback functions are provided - with trollflow2: :func:`callback_log`, :func:`callback_move`, and - :func:`callback_close`. If using the geotiff or ninjogeotiff writers, - :func:`callback_close` should be used before the others for correct - results. When using :func:`callback_move`, the user must also set - ``early_moving`` to True and use a ``staging_zone``. If both are - used, :func:`callback_log` must be called AFTER :func:`callback_move`, - because :func:`callback_log` searches for the final destination of - the file and reports the size (so it accesses the metadata). + successfully produced. + + Three built-in are provided with Trollflow2: :func:`callback_close`, + :func:`callback_move` and :func:`callback_log`. Other arguments defined in the job list (either directly under ``product_list``, or under ``formats``) are passed on to the satpy writer. The @@ -339,7 +334,7 @@ def save_datasets(job): with renamed_files(not early_moving) as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) - late_saver = _maybe_apply_callbacks(late_saver, callbacks, job, fmat_config) + late_saver = _apply_callbacks(late_saver, callbacks, job, fmat_config) if late_saver is not None: objs.append(late_saver) job['produced_files'].put(fmat_config['filename']) @@ -347,8 +342,8 @@ def save_datasets(job): compute_writer_results(objs) -def _maybe_apply_callbacks(late_saver, callbacks, *args): - """Maybe apply callbacks. +def _apply_callbacks(late_saver, callbacks, *args): + """Apply callbacks if there are any. If we are using callbacks via the ``call_on_done`` parameter, wrap ``late_saver`` with those iteratively. If not, return ``late_saver`` as is. @@ -358,7 +353,7 @@ def _maybe_apply_callbacks(late_saver, callbacks, *args): if callbacks is None: return late_saver if isinstance(late_saver, Delayed): - return _apply_callbacks_to_delayed(late_saver, callbacks, None, None, *args) + return _apply_callbacks_to_delayed(late_saver, callbacks, None, *args) if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2: if isinstance(late_saver[0], collections.abc.Sequence): return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args) @@ -1044,7 +1039,7 @@ def callback_log(obj, targs, job, fmat_config): Callback function that can be used with the :func:`save_datasets` ``call_on_done`` functionality. Will log a message with loglevel INFO to - report that the filename was written successfully. + report that the filename was written successfully along with its size. If using :func:`callback_move` in combination with :func:`callback_log`, you must call :func:`callback_log` AFTER diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index e03bddcd..9515ae37 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -725,7 +725,8 @@ def testlog(obj, targs, job, fmat_config): {"writer": "ninjogeotiff", "compress": "NONE", "ChannelID": "IR -2+3i", "DataType": "ABCD", "PhysicUnit": "K", "PhysicValue": "Temperature", - "SatelliteNameID": "PytrollSat", "fill_value": 0} + "SatelliteNameID": "PytrollSat", "fill_value": 0}, + {"writer": "simple_image", "format": "png"}, ] product_list = { From e350c105f9bd57ff0a00c018b0f1a95e7c231238 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 10 Mar 2023 16:11:19 +0100 Subject: [PATCH 29/31] Rename private function source/s target/s Rename the private functions for applying to either single or multiple sources and targets. --- trollflow2/plugins/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 93c96594..23bf975b 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -356,8 +356,8 @@ def _apply_callbacks(late_saver, callbacks, *args): return _apply_callbacks_to_delayed(late_saver, callbacks, None, *args) if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2: if isinstance(late_saver[0], collections.abc.Sequence): - return _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args) - return _apply_callbacks_to_source_and_target(late_saver, callbacks, *args) + return _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args) + return _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args) raise ValueError( "Unrecognised return value type from ``save_datasets``, " "don't know how to apply wrappers.") @@ -380,7 +380,7 @@ def _apply_callbacks_to_delayed(delayed, callbacks, *args): return delayed -def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args): +def _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args): """Apply callbacks to multiple sources/targets pairs. Taking source/target pairs such as returned by @@ -403,7 +403,7 @@ def _apply_callbacks_to_sources_and_targets(late_saver, callbacks, *args): return delayeds -def _apply_callbacks_to_source_and_target(late_saver, callbacks, *args): +def _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args): """Apply callbacks to single source/target pairs. Taking a single source/target pair such as may be returned by From 8a37c434d5e2fb15dd110bd07d710a49d45586d8 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 10 Mar 2023 16:22:38 +0100 Subject: [PATCH 30/31] Change conditional context manager logic Change the logic for the conditional context maneger, by using a nullcontext if we are not renaming files. --- trollflow2/plugins/__init__.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 23bf975b..cd65928a 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -22,7 +22,7 @@ import datetime as dt import os import pathlib -from contextlib import contextmanager, suppress +from contextlib import contextmanager, suppress, nullcontext from logging import getLogger from tempfile import NamedTemporaryFile from urllib.parse import urlsplit, urlunsplit @@ -264,17 +264,16 @@ def _create_data_query(product, res): @contextmanager -def renamed_files(do_renames): +def renamed_files(): """Context renaming files.""" renames = {} yield renames - if do_renames: - for tmp_name, actual_name in renames.items(): - target_scheme = urlsplit(actual_name).scheme - if target_scheme in ('', 'file'): - os.rename(tmp_name, actual_name) + for tmp_name, actual_name in renames.items(): + target_scheme = urlsplit(actual_name).scheme + if target_scheme in ('', 'file'): + os.rename(tmp_name, actual_name) def save_datasets(job): @@ -331,7 +330,11 @@ def save_datasets(job): callbacks = [dask.delayed(c) for c in call_on_done] else: callbacks = None - with renamed_files(not early_moving) as renames: + if early_moving: + cm = nullcontext({}) + else: + cm = renamed_files() + with cm as renames: for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config): late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing) late_saver = _apply_callbacks(late_saver, callbacks, job, fmat_config) From 6bd02187b37fa9a738041ecefe9a12ff5e9cca0e Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 31 Aug 2023 08:25:06 +0200 Subject: [PATCH 31/31] Use None as a sentinel Use None rather than object() as a sentinel value. --- trollflow2/plugins/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 911952f9..65cc287b 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -345,9 +345,8 @@ def save_datasets(job): base_config.pop('dataset', None) eager_writing = job['product_list']['product_list'].get("eager_writing", False) early_moving = job['product_list']['product_list'].get("early_moving", False) - sentinel = object() - call_on_done = job["product_list"]["product_list"].get("call_on_done", sentinel) - if call_on_done is not sentinel: + call_on_done = job["product_list"]["product_list"].get("call_on_done", None) + if call_on_done is not None: callbacks = [dask.delayed(c) for c in call_on_done] else: callbacks = None