diff --git a/examples/pl.yaml b/examples/pl.yaml
index ee840039..b5689dae 100644
--- a/examples/pl.yaml
+++ b/examples/pl.yaml
@@ -66,6 +66,11 @@ product_list: &product_list
   # # pass extra keyword arguments to Scene.load
   # scene_load_kwargs:
   #   upper_right_corner: "NE"
+  # call_on_done:
+  #   - !!python/name:trollflow2.plugins.callback_close
+  #   - !!python/name:trollflow2.plugins.callback_move
+  #   - !!python/name:trollflow2.plugins.callback_log
+  # early_moving: True  # must be set when using callback_move; see the docs for details
 
   areas:
       omerc_bb:
diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py
index f9fa7193..65cc287b 100644
--- a/trollflow2/plugins/__init__.py
+++ b/trollflow2/plugins/__init__.py
@@ -18,10 +18,12 @@
 
 """Trollflow2 plugins."""
 
+import collections.abc
 import copy
 import datetime as dt
 import os
-from contextlib import contextmanager, suppress
+import pathlib
+from contextlib import contextmanager, nullcontext, suppress
 from logging import getLogger
 from tempfile import NamedTemporaryFile
 from urllib.parse import urlsplit, urlunsplit
@@ -30,8 +32,10 @@
 import hdf5plugin  # noqa
 
 import dask
+import dask.array as da
 import dpath.util
 import rasterio
+from dask.delayed import Delayed
 from posttroll.message import Message
 from posttroll.publisher import create_publisher_from_dict_config
 from pyorbital.astronomy import sun_zenith_angle
@@ -42,7 +46,7 @@
 from satpy import Scene
 from satpy.resample import get_area_def
 from satpy.version import version as satpy_version
-from satpy.writers import compute_writer_results
+from satpy.writers import compute_writer_results, group_results_by_output_file
 from trollsift import compose
 
 from trollflow2.dict_tools import get_config_value, plist_iter
@@ -313,6 +317,31 @@ def save_datasets(job):
     ``staging_zone`` directory, such that the filename written to the
     headers remains meaningful.
 
+    The product list may contain a ``call_on_done`` parameter.  This
+    parameter only has an effect if ``eager_writing`` is False (which
+    is the default).  It should contain a list of references to
+    callables.  At computation time, each callable will be called
+    with four arguments: the result of ``save_dataset``, targets (if
+    applicable), the full job dictionary, and the dictionary
+    describing the format config and output filename that was written.
+    The parameter ``targets`` is set to None when using a writer for
+    which :meth:`~satpy.Scene.save_datasets` does not return targets.
+    Each callable must return the ``save_dataset`` return value
+    (possibly altered).  Callbacks can be used, for example, to ship
+    products as soon as they have been successfully produced.
+
+    Three built-in callbacks are provided with Trollflow2:
+    :func:`callback_close`, :func:`callback_move`, and :func:`callback_log`.
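+
+    For illustration, a callback with the expected signature could be
+    defined like this (``print_filename`` is a hypothetical example,
+    not shipped with Trollflow2)::
+
+        def print_filename(obj, targs, job, fmat_config):
+            print("Finished writing", fmat_config["filename"])
+            return obj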
+
     Other arguments defined in the job list (either directly under
     ``product_list``, or under ``formats``) are passed on to the satpy writer.
     The arguments ``use_tmp_file``, ``staging_zone``, ``output_dir``,
@@ -323,16 +352,117 @@ def save_datasets(job):
     base_config = job['input_mda'].copy()
     base_config.pop('dataset', None)
     eager_writing = job['product_list']['product_list'].get("eager_writing", False)
-    with renamed_files() as renames:
+    early_moving = job['product_list']['product_list'].get("early_moving", False)
+    call_on_done = job["product_list"]["product_list"].get("call_on_done", None)
+    if call_on_done is not None:
+        callbacks = [dask.delayed(c) for c in call_on_done]
+    else:
+        callbacks = None
+    if early_moving:
+        cm = nullcontext({})
+    else:
+        cm = renamed_files()
+    with cm as renames:
         for fmat, fmat_config in plist_iter(job['product_list']['product_list'], base_config):
-            obj = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
-            if obj is not None:
-                objs.append(obj)
+            late_saver = save_dataset(scns, fmat, fmat_config, renames, compute=eager_writing)
+            late_saver = _apply_callbacks(late_saver, callbacks, job, fmat_config)
+            if late_saver is not None:
+                objs.append(late_saver)
             job['produced_files'].put(fmat_config['filename'])
     if not eager_writing:
         compute_writer_results(objs)
 
 
+def _apply_callbacks(late_saver, callbacks, *args):
+    """Apply callbacks if there are any.
+
+    If we are using callbacks via the ``call_on_done`` parameter, wrap
+    ``late_saver`` with those iteratively.  If not, return ``late_saver``
+    as is.  Here, ``late_saver`` is whatever
+    :meth:`satpy.Scene.save_datasets` returns.
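+
+    As a sketch: with ``call_on_done: [cb1, cb2]`` (hypothetical
+    callables) and a writer returning a single ``Delayed`` object,
+    the result is equivalent to::
+
+        dask.delayed(cb2)(
+            dask.delayed(cb1)(late_saver, None, job, fmat_config),
+            None, job, fmat_config)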
+    """
+    if callbacks is None:
+        return late_saver
+    if isinstance(late_saver, Delayed):
+        return _apply_callbacks_to_delayed(late_saver, callbacks, None, *args)
+    if isinstance(late_saver, collections.abc.Sequence) and len(late_saver) == 2:
+        if isinstance(late_saver[0], collections.abc.Sequence):
+            return _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args)
+        return _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args)
+    raise ValueError(
+        "Unrecognised return value type from ``save_datasets``; "
+        "don't know how to apply wrappers.")
+
+
+def _apply_callbacks_to_delayed(delayed, callbacks, *args):
+    """Iteratively apply the callbacks to the delayed object.
+
+    Args:
+        delayed: dask Delayed object to which callbacks are applied
+        callbacks: list of dask Delayed objects to apply
+        *args: remaining arguments passed to callbacks
+
+    Returns:
+        delayed object with callbacks applied
+    """
+    for callback in callbacks:
+        delayed = callback(delayed, *args)
+    return delayed
+
+
+def _apply_callbacks_to_multiple_sources_and_targets(late_saver, callbacks, *args):
+    """Apply callbacks to multiple source/target pairs.
+
+    Taking source/target pairs such as returned by
+    :meth:`satpy.Scene.save_datasets`, group those by output file,
+    turn each group into a delayed object by calling
+    :func:`dask.array.store`, then apply the callbacks.
+
+    Args:
+        late_saver: tuple of ``(sources, targets)`` such as may be
+            returned by :meth:`satpy.Scene.save_datasets`.
+        callbacks: list of dask Delayed objects to apply
+        *args: remaining arguments passed to callbacks
+
+    Returns:
+        list of delayed objects
+    """
+    delayeds = []
+    for (src, targ) in group_results_by_output_file(*late_saver):
+        delayed = da.store(src, targ, compute=False)
+        delayeds.append(_apply_callbacks_to_delayed(delayed, callbacks, targ, *args))
+    return delayeds
+
+
+def _apply_callbacks_to_single_source_and_target(late_saver, callbacks, *args):
+    """Apply callbacks to a single source/target pair.
+
+    Taking a single source/target pair such as may be returned by
+    :meth:`satpy.Scene.save_datasets`, turn this into a delayed
+    object by calling :func:`dask.array.store`, then apply the callbacks.
+
+    Args:
+        late_saver: tuple of ``(source, target)`` such as may be
+            returned by :meth:`satpy.Scene.save_datasets`.
+        callbacks: list of dask Delayed objects to apply
+        *args: remaining arguments passed to callbacks
+
+    Returns:
+        delayed object with callbacks applied
+    """
+    (src, targ) = late_saver
+    delayed = da.store(src, targ, compute=False)
+    return _apply_callbacks_to_delayed(delayed, callbacks, [targ], *args)
+
+
 def product_missing_from_scene(product, scene):
     """Check if product is missing from the scene."""
     if not isinstance(product, (tuple, list)):
@@ -941,3 +1071,72 @@ def _product_meets_min_valid_data_fraction(
         logger.debug(f"Found {rel_valid:%}>{min_frac:%}, keeping "
                      f"{prod_name:s} for area {area_name:s} in the worklist")
     return True
+
+
+def callback_log(obj, targs, job, fmat_config):
+    """Log written files as a callback for save_datasets call_on_done.
+
+    Callback function that can be used with the :func:`save_datasets`
+    ``call_on_done`` functionality.  Will log a message with loglevel INFO
+    to report that the filename was written successfully along with its
+    size.
+
+    If using :func:`callback_move` in combination with
+    :func:`callback_log`, you must call :func:`callback_log` AFTER
+    :func:`callback_move`, because the logger looks for the final
+    destination of the file, not the temporary one.
+    """
+    filename = fmat_config["filename"]
+    size = os.path.getsize(filename)
+    logger.info(f"Wrote {filename:s} successfully, total {size:d} bytes.")
+    return obj
+
+
+def callback_move(obj, targs, job, fmat_config):
+    """Move files as a callback for save_datasets call_on_done.
+
+    Callback function that can be used with the :func:`save_datasets`
+    ``call_on_done`` functionality.  Moves the file to the directory
+    indicated with ``output_dir`` in the configuration.  This directory
+    will be created if needed.
+
+    When using this callback, ``staging_zone`` and ``early_moving`` MUST
+    both be set in the configuration.  If used in combination with
+    :func:`callback_log`, you must call :func:`callback_log` AFTER
+    :func:`callback_move`, because the logger looks for the final
+    destination of the file, not the temporary one.
+    """
+    destfile = pathlib.Path(fmat_config["filename"])
+    srcdir = pathlib.Path(job["product_list"]["product_list"]["staging_zone"])
+    srcfile = srcdir / destfile.name
+    logger.debug(f"Moving {srcfile!s} to {destfile!s}")
+    srcfile.rename(destfile)
+    return obj
+
+
+def callback_close(obj, targs, job, fmat_config):
+    """Close files as a callback where needed.
+
+    When using callbacks with writers that return a ``(src, target)`` pair
+    for :func:`dask.array.store`, satpy doesn't close the file until after
+    the computation is completed.  That means there may be data that have
+    been computed, but not yet written to disk.  This is normally the case
+    for the geotiff writer.  Callbacks that depend on the file being
+    complete therefore need the file to be closed first; this callback
+    should be prepended to such callbacks in the callback list.
+
+    If passed a ``dask.Delayed`` object, this callback does nothing.  If
+    passed a ``(src, targ)`` pair, it closes the targets.
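+
+    For example, to close, move, and then log each file, the product
+    list can specify (as in ``examples/pl.yaml``)::
+
+        call_on_done:
+          - !!python/name:trollflow2.plugins.callback_close
+          - !!python/name:trollflow2.plugins.callback_move
+          - !!python/name:trollflow2.plugins.callback_log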
+ """ + if targs: + for targ in targs: + targ.close() + return obj diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 0f94df44..5036b4be 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -27,12 +27,18 @@ import logging import os import unittest +import pathlib +import queue from functools import partial from unittest import mock import numpy as np import pytest -from pyresample.geometry import DynamicAreaDefinition +from pyresample.geometry import DynamicAreaDefinition, create_area_def +import rasterio + +import dask.array as da +import xarray as xr from trollflow2.launcher import read_config from trollflow2.tests.utils import TestCase, create_filenames_and_topics @@ -608,7 +614,7 @@ def test_pop_unknown_args(self): assert "PhysicUnit" in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() for absent in {"use_tmp_file", "staging_zone", "output_dir", - "fname_pattern", "dispatch"}: + "fname_pattern", "dispatch", "call_on_done"}: assert absent not in job["resampled_scenes"]["euron1"].mock_calls[0].kwargs.keys() @@ -616,6 +622,7 @@ def _create_job_for_save_datasets(): from yaml import UnsafeLoader job = {} job['input_mda'] = input_mda + job['product_list'] = { 'product_list': read_config(raw_string=yaml_test_save, Loader=UnsafeLoader)['product_list'], } @@ -668,6 +675,133 @@ def test_use_staging_zone_tmpfile(tmp_path): assert next(iter(renames.keys())).startswith(os.fspath(tmp_path)) +@pytest.fixture +def fake_scene(): + """Get a fake scene.""" + from satpy.tests.utils import make_fake_scene + + x = 10 + fake_area = create_area_def("sargasso", 4087, resolution=1, width=x, height=x, center=(0, 0)) + fake_scene = make_fake_scene( + {"dragon_top_height": (dat := xr.DataArray( + dims=("y", "x"), + data=da.arange(x*x).reshape(x, x))), + "penguin_bottom_height": dat, + "kraken_depth": dat}, + daskify=True, + area=fake_area, + common_attrs={"start_time": dt.datetime(1985, 8, 13, 15)}) + + return fake_scene + + +def test_save_datasets_callback(tmp_path, caplog, fake_scene): + """Test callback functionality for save_datasets + + Test that the functionality for a callback after each produced file is + working correctly in the save_datasets plugin. 
+ """ + from trollflow2.plugins import save_datasets, callback_close + + job = {} + job['input_mda'] = input_mda + + logger = logging.getLogger("testlogger") + + def testlog(obj, targs, job, fmat_config): + """Toy function doing some logging""" + filename = fmat_config["filename"] + # ensure computation has indeed completed and file was flushed + p = pathlib.Path(filename) + logger.info(f"Wrote {filename} successfully, {p.stat().st_size:d} bytes") + assert p.exists() + with rasterio.open(filename) as src: + arr = src.read(1) + assert arr[5, 5] == 142 + return obj + + form = [ + {"writer": "geotiff", "compress": "NONE", "fill_value": 0}, + {"writer": "ninjogeotiff", "compress": "NONE", + "ChannelID": "IR -2+3i", "DataType": "ABCD", + "PhysicUnit": "K", "PhysicValue": "Temperature", + "SatelliteNameID": "PytrollSat", "fill_value": 0}, + {"writer": "simple_image", "format": "png"}, + ] + + product_list = { + "fname_pattern": "{productname}-{writer}.tif", + "output_dir": os.fspath(tmp_path / "test"), + "call_on_done": [callback_close, testlog], + "areas": { + "sargasso": { + "products": { + "dragon_top_height": { + "productname": "dragon_top_height", + "formats": copy.deepcopy(form)}, + "penguin_bottom_height": { + "productname": "penguin_bottom_height", + "formats": copy.deepcopy(form)}, + "kraken_depth": { + "productname": "kraken_depth", + "formats": copy.deepcopy(form)}, + } + } + } + } + job["product_list"] = {"product_list": product_list} + job['resampled_scenes'] = {"sargasso": fake_scene} + + job['produced_files'] = mock.MagicMock() + + with caplog.at_level(logging.INFO): + save_datasets(job) + for nm in {"dragon_top_height", "penguin_bottom_height", "kraken_depth"}: + exp = tmp_path / "test" / f"{nm:s}-geotiff.tif" + assert f"Wrote {exp!s} successfully" in caplog.text + + +def test_save_datasets_callback_move_check_products(tmp_path, caplog, fake_scene): + """Test that check_products and the callback move can cooperate. 
+ """ + from trollflow2.plugins import save_datasets, callback_close, callback_move + from trollflow2.launcher import check_results + + job = {} + job['input_mda'] = input_mda + form = [{"writer": "geotiff", "fill_value": 0}] + + product_list = { + "fname_pattern": "{productname}.tif", + "staging_zone": os.fspath(tmp_path / "test1"), + "output_dir": os.fspath(tmp_path / "test2"), + "call_on_done": [callback_close, callback_move], + "early_moving": True, + "areas": { + "sargasso": { + "products": { + "dragon_top_height": { + "productname": "dragon_top_height", + "formats": copy.deepcopy(form)}, + "penguin_bottom_height": { + "productname": "penguin_bottom_height", + "formats": copy.deepcopy(form)}, + "kraken_depth": { + "productname": "kraken_depth", + "formats": copy.deepcopy(form)}, + } + } + } + } + job["product_list"] = {"product_list": product_list} + job['resampled_scenes'] = {"sargasso": fake_scene} + job["produced_files"] = queue.SimpleQueue() + save_datasets(job) + with caplog.at_level(logging.INFO): + check_results(job["produced_files"], dt.datetime(2001, 2, 3, 4, 5, 6), 0) + assert "All 3 files produced nominally" in caplog.text + + class TestCreateScene(TestCase): """Test case for creating a scene.""" @@ -2071,8 +2205,8 @@ def test_persisted(sc_3a_3b): prods[p] = {"min_valid_data_fraction": 40} def fake_persist(*args): - for da in args: - da.attrs["persisted"] = True + for daa in args: + daa.attrs["persisted"] = True return args with mock.patch("dask.persist", new=fake_persist): @@ -2083,6 +2217,43 @@ def fake_persist(*args): assert not sc_3a_3b["another"].attrs.get("persisted") +def test_callback_log(caplog, tmp_path): + """Test callback log functionality.""" + from trollflow2.plugins import callback_log + srcfile = tmp_path / "bouvetøya" + with srcfile.open(mode="w") as fp: + fp.write("x" * 10) + obj = object() + with caplog.at_level(logging.INFO): + res = callback_log(obj, None, {}, {"filename": os.fspath(srcfile)}) + assert res is obj + assert f"Wrote {srcfile!s} successfully, total 10 bytes." in caplog.text + + +def test_callback_move(caplog, tmp_path): + """Test callback move functionality.""" + from trollflow2.plugins import callback_move + obj = object() + srcdir = tmp_path / "src" + srcfile = srcdir / "orkney" + srcdir.mkdir(parents=True, exist_ok=True) + srcfile.touch() + destdir = tmp_path / "dest" + destdir.mkdir(parents=True, exist_ok=True) + destfile = destdir / srcfile.name + job = {"product_list": + {"product_list": + {"staging_zone": os.fspath(srcdir), + "output_dir": os.fspath(destdir)}}} + fname_config = {"filename": os.fspath(destfile)} + with caplog.at_level(logging.DEBUG): + res = callback_move(obj, None, job, fname_config) + assert res is obj + assert not srcfile.exists() + assert destfile.exists() + assert f"Moving {srcfile!s} to {destfile!s}" in caplog.text + + def test_format_decoration(): """Test that decoration text in fmt_config is formated based on fmat.""" import datetime