diff --git a/trollmoves/move_it_base.py b/trollmoves/move_it_base.py index ef4a52f5..ff924838 100644 --- a/trollmoves/move_it_base.py +++ b/trollmoves/move_it_base.py @@ -23,8 +23,10 @@ """Base class for move_it_{client,server,mirror}.""" +import fnmatch import logging import logging.handlers +import os import signal import time from abc import ABC, abstractmethod @@ -45,7 +47,7 @@ def __init__(self, cmd_args, publisher=None): """Initialize the class.""" self.cmd_args = cmd_args self.running = False - self.notifier = None + self.new_config_notifier = None self.watchman = None self.publisher = publisher self.chains = {} @@ -61,7 +63,7 @@ def chains_stop(self, *args): self.running = False try: - self.notifier.stop() + self.new_config_notifier.stop() except RuntimeError as err: LOGGER.warning("Could not stop notifier: %s", err) with suppress(AttributeError): @@ -77,7 +79,7 @@ def setup_watchers(self): config_file = self.cmd_args.config_file reload_function = self.reload_cfg_file - self.notifier = create_notifier_for_file(config_file, reload_function) + self.new_config_notifier = create_notifier_for_file(config_file, reload_function) def run(self): """Start the transfer chains.""" @@ -86,10 +88,11 @@ def run(self): signal.signal(signal.SIGHUP, self.signal_reload_cfg_file) except ValueError: LOGGER.warning("Signals could not be set up.") - self.notifier.start() + self.new_config_notifier.start() self.running = True while self.running: time.sleep(1) + # FIXME: should we use timeout instead? shutting_down = not self.run_lock.acquire(blocking=False) if shutting_down: break @@ -122,16 +125,46 @@ def create_publisher(port, publisher_name): return publisher -class WatchdogChangeHandler(FileSystemEventHandler): - """Trigger processing on filesystem events.""" +class _WatchdogHandler(FileSystemEventHandler): + """Trigger processing on filesystem events, with filename matching.""" - def __init__(self, fun): + def __init__(self, fun, pattern=None): """Initialize the processor.""" super().__init__() self.fun = fun + self.pattern = pattern + + def dispatch(self, event): + """Dispatches events to the appropriate methods.""" + if self.pattern is None: + return super().dispatch(event) + if event.is_directory: + return + if getattr(event, 'dest_path', None): + pathname = os.fsdecode(event.dest_path) + elif event.src_path: + pathname = os.fsdecode(event.src_path) + if fnmatch.fnmatch(pathname, self.pattern): + super().dispatch(event) + + +class WatchdogChangeHandler(_WatchdogHandler): + """Trigger processing on filesystem events that change a file (moving, close (write)).""" def on_closed(self, event): - """Process file creation.""" + """Process file closed.""" + self.fun(event.src_path) + + def on_moved(self, event): + """Process a file being moved to the destination directory.""" + self.fun(event.dest_path) + + +class WatchdogCreationHandler(_WatchdogHandler): + """Trigger processing on filesystem events that create a file (moving, creation).""" + + def on_created(self, event): + """Process file closing.""" self.fun(event.src_path) def on_moved(self, event): diff --git a/trollmoves/server.py b/trollmoves/server.py index ca189402..fe39a298 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -46,14 +46,14 @@ from posttroll.publisher import get_own_ip from posttroll.subscriber import Subscribe from trollsift import globify, parse -from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver from zmq import NOBLOCK, POLLIN, PULL, PUSH, ROUTER, Poller, ZMQError from trollmoves.client import DEFAULT_REQ_TIMEOUT from trollmoves.logging import add_logging_options_to_parser -from trollmoves.move_it_base import MoveItBase, create_publisher +from trollmoves.move_it_base import (MoveItBase, WatchdogChangeHandler, + WatchdogCreationHandler, create_publisher) from trollmoves.movers import move_it from trollmoves.utils import (clean_url, gen_dict_contains, gen_dict_extract, is_file_local) @@ -748,21 +748,22 @@ def _get_notifier_builder(use_polling, chain_config): def create_watchdog_polling_notifier(pattern, function_to_run_on_matching_files, timeout=1.0): """Create a notifier from the specified configuration attributes *attrs*.""" observer_class = partial(PollingObserver, timeout=timeout) - return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class) + handler_class = WatchdogCreationHandler + return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class) def create_watchdog_os_notifier(pattern, function_to_run_on_matching_files, timeout=1.0): """Create a notifier from the specified configuration attributes *attrs*.""" - observer_class = partial(Observer, timeout=timeout) - return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class) + observer_class = partial(Observer, timeout=timeout, generate_full_events=True) + handler_class = WatchdogChangeHandler + return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class) -def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class): +def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class): """Create a watchdog notifier.""" opath = os.path.dirname(pattern) observer = observer_class() - use_polling = observer_class.func is PollingObserver - handler = WatchdogCreationHandler(function_to_run_on_matching_files, pattern, use_polling=use_polling) + handler = handler_class(function_to_run_on_matching_files, pattern) observer.schedule(handler, opath) @@ -791,44 +792,6 @@ def publish_file(orig_pathname, publisher, attrs, unpacked_pathname): LOGGER.debug("Message sent: %s", str(msg)) -class WatchdogCreationHandler(FileSystemEventHandler): - """Trigger processing on filesystem events.""" - - def __init__(self, fun, pattern, use_polling=False): - """Initialize the processor.""" - super().__init__() - self.pattern = pattern - self.fun = fun - self.use_polling = use_polling - - def dispatch(self, event): - """Dispatches events to the appropriate methods.""" - if event.is_directory: - return - if hasattr(event, 'dest_path'): - pathname = os.fsdecode(event.dest_path) - elif event.src_path: - pathname = os.fsdecode(event.src_path) - if fnmatch.fnmatch(pathname, self.pattern): - super().dispatch(event) - - def on_created(self, event): - """Process created files. - - The function is called only when using polling observer. - """ - if self.use_polling: - self.fun(event.src_path) - - def on_closed(self, event): - """Process file closing.""" - self.fun(event.src_path) - - def on_moved(self, event): - """Process a file being moved to the destination directory.""" - self.fun(event.dest_path) - - def create_message_with_request_info(pathname, orig_pathname, attrs): """Create a message containing request info.""" info = _get_notify_message_info(attrs, orig_pathname, pathname) diff --git a/trollmoves/tests/test_move_it.py b/trollmoves/tests/test_move_it.py index 06c123ce..dbcb2b96 100644 --- a/trollmoves/tests/test_move_it.py +++ b/trollmoves/tests/test_move_it.py @@ -17,16 +17,7 @@ def test_move_it_moves_files(tmp_path): """Test that move it moves a file.""" - input_dir = tmp_path / "in" - output_dir = tmp_path / "out" - os.mkdir(input_dir) - os.mkdir(output_dir) - origin = "origin=" + str(input_dir / "bla{number:1s}.txt") - destinations = "destinations=" + str(output_dir) - local_move_it_config = "\n".join([move_it_config_template, origin, destinations]) - config_file = tmp_path / "move_it.cfg" - with open(config_file, "w") as fd: - fd.write(local_move_it_config) + input_dir, output_dir, config_file = create_config_file(tmp_path) cmd_args = parse_args([str(config_file)], default_port=None) move_it_thread = MoveItSimple(cmd_args) @@ -47,10 +38,8 @@ def test_move_it_moves_files(tmp_path): move_it_thread.chains_stop() -def test_move_it_published_a_message(tmp_path): - """Test that move it is publishing messages when provided a port.""" - from posttroll.message import Message - +def create_config_file(tmp_path): + """Create a move_it config file.""" input_dir = tmp_path / "in" output_dir = tmp_path / "out" os.mkdir(input_dir) @@ -61,6 +50,13 @@ def test_move_it_published_a_message(tmp_path): config_file = tmp_path / "move_it.cfg" with open(config_file, "w") as fd: fd.write(local_move_it_config) + return input_dir, output_dir, config_file + + +def test_move_it_published_a_message(tmp_path): + """Test that move it is publishing messages when provided a port.""" + from posttroll.message import Message + input_dir, output_dir, config_file = create_config_file(tmp_path) with patched_publisher() as message_list: cmd_args = parse_args([str(config_file), "-p", "2022"]) diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index 885e2c59..134cbd1d 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -82,11 +82,13 @@ def test_create_watchdog_notifier(tmp_path): from trollmoves.server import create_watchdog_polling_notifier fname = "20200428_1000_foo.tif" - fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif" - pattern_path = os.path.join(tmp_path, fname_pattern) - file_path = os.path.join(tmp_path, fname) + file_path = tmp_path / fname + + fname_pattern = tmp_path / "{start_time:%Y%m%d_%H%M}_{product}.tif" + pattern_path = tmp_path / fname_pattern + function_to_run = MagicMock() - observer = create_watchdog_polling_notifier(globify(pattern_path), function_to_run, timeout=.1) + observer = create_watchdog_polling_notifier(globify(str(pattern_path)), function_to_run, timeout=.1) observer.start() with open(os.path.join(file_path), "w") as fid: @@ -98,7 +100,7 @@ def test_create_watchdog_notifier(tmp_path): observer.stop() observer.join() - function_to_run.assert_called_with(file_path) + function_to_run.assert_called_with(str(file_path)) @pytest.mark.parametrize("config,expected_timeout",