Skip to content

Commit

Permalink
Fix CI
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed Feb 27, 2024
1 parent c037aed commit 3e1b339
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 73 deletions.
49 changes: 41 additions & 8 deletions trollmoves/move_it_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

"""Base class for move_it_{client,server,mirror}."""

import fnmatch
import logging
import logging.handlers
import os
import signal
import time
from abc import ABC, abstractmethod
Expand All @@ -45,7 +47,7 @@ def __init__(self, cmd_args, publisher=None):
"""Initialize the class."""
self.cmd_args = cmd_args
self.running = False
self.notifier = None
self.new_config_notifier = None
self.watchman = None
self.publisher = publisher
self.chains = {}
Expand All @@ -61,7 +63,7 @@ def chains_stop(self, *args):

self.running = False
try:
self.notifier.stop()
self.new_config_notifier.stop()
except RuntimeError as err:
LOGGER.warning("Could not stop notifier: %s", err)
with suppress(AttributeError):
Expand All @@ -77,7 +79,7 @@ def setup_watchers(self):
config_file = self.cmd_args.config_file
reload_function = self.reload_cfg_file

self.notifier = create_notifier_for_file(config_file, reload_function)
self.new_config_notifier = create_notifier_for_file(config_file, reload_function)

def run(self):
"""Start the transfer chains."""
Expand All @@ -86,10 +88,11 @@ def run(self):
signal.signal(signal.SIGHUP, self.signal_reload_cfg_file)
except ValueError:
LOGGER.warning("Signals could not be set up.")
self.notifier.start()
self.new_config_notifier.start()
self.running = True
while self.running:
time.sleep(1)
# FIXME: should we use timeout instead?
shutting_down = not self.run_lock.acquire(blocking=False)
if shutting_down:
break
Expand Down Expand Up @@ -122,16 +125,46 @@ def create_publisher(port, publisher_name):
return publisher


class WatchdogChangeHandler(FileSystemEventHandler):
"""Trigger processing on filesystem events."""
class _WatchdogHandler(FileSystemEventHandler):
"""Trigger processing on filesystem events, with filename matching."""

def __init__(self, fun):
def __init__(self, fun, pattern=None):
"""Initialize the processor."""
super().__init__()
self.fun = fun
self.pattern = pattern

def dispatch(self, event):
"""Dispatches events to the appropriate methods."""
if self.pattern is None:
return super().dispatch(event)
if event.is_directory:
return
if getattr(event, 'dest_path', None):
pathname = os.fsdecode(event.dest_path)
elif event.src_path:
pathname = os.fsdecode(event.src_path)
if fnmatch.fnmatch(pathname, self.pattern):
super().dispatch(event)


class WatchdogChangeHandler(_WatchdogHandler):
"""Trigger processing on filesystem events that change a file (moving, close (write))."""

def on_closed(self, event):
"""Process file creation."""
"""Process file closed."""
self.fun(event.src_path)

def on_moved(self, event):
"""Process a file being moved to the destination directory."""
self.fun(event.dest_path)


class WatchdogCreationHandler(_WatchdogHandler):
"""Trigger processing on filesystem events that create a file (moving, creation)."""

def on_created(self, event):
"""Process file closing."""
self.fun(event.src_path)

def on_moved(self, event):
Expand Down
55 changes: 9 additions & 46 deletions trollmoves/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@
from posttroll.publisher import get_own_ip
from posttroll.subscriber import Subscribe
from trollsift import globify, parse
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from zmq import NOBLOCK, POLLIN, PULL, PUSH, ROUTER, Poller, ZMQError

from trollmoves.client import DEFAULT_REQ_TIMEOUT
from trollmoves.logging import add_logging_options_to_parser
from trollmoves.move_it_base import MoveItBase, create_publisher
from trollmoves.move_it_base import (MoveItBase, WatchdogChangeHandler,
WatchdogCreationHandler, create_publisher)
from trollmoves.movers import move_it
from trollmoves.utils import (clean_url, gen_dict_contains, gen_dict_extract,
is_file_local)
Expand Down Expand Up @@ -748,21 +748,22 @@ def _get_notifier_builder(use_polling, chain_config):
def create_watchdog_polling_notifier(pattern, function_to_run_on_matching_files, timeout=1.0):
"""Create a notifier from the specified configuration attributes *attrs*."""
observer_class = partial(PollingObserver, timeout=timeout)
return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class)
handler_class = WatchdogCreationHandler
return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class)


def create_watchdog_os_notifier(pattern, function_to_run_on_matching_files, timeout=1.0):
"""Create a notifier from the specified configuration attributes *attrs*."""
observer_class = partial(Observer, timeout=timeout)
return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class)
observer_class = partial(Observer, timeout=timeout, generate_full_events=True)
handler_class = WatchdogChangeHandler
return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class)


def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class):
def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class, handler_class):
"""Create a watchdog notifier."""
opath = os.path.dirname(pattern)
observer = observer_class()
use_polling = observer_class.func is PollingObserver
handler = WatchdogCreationHandler(function_to_run_on_matching_files, pattern, use_polling=use_polling)
handler = handler_class(function_to_run_on_matching_files, pattern)

observer.schedule(handler, opath)

Expand Down Expand Up @@ -791,44 +792,6 @@ def publish_file(orig_pathname, publisher, attrs, unpacked_pathname):
LOGGER.debug("Message sent: %s", str(msg))


class WatchdogCreationHandler(FileSystemEventHandler):
"""Trigger processing on filesystem events."""

def __init__(self, fun, pattern, use_polling=False):
"""Initialize the processor."""
super().__init__()
self.pattern = pattern
self.fun = fun
self.use_polling = use_polling

def dispatch(self, event):
"""Dispatches events to the appropriate methods."""
if event.is_directory:
return
if hasattr(event, 'dest_path'):
pathname = os.fsdecode(event.dest_path)
elif event.src_path:
pathname = os.fsdecode(event.src_path)
if fnmatch.fnmatch(pathname, self.pattern):
super().dispatch(event)

def on_created(self, event):
"""Process created files.
The function is called only when using polling observer.
"""
if self.use_polling:
self.fun(event.src_path)

def on_closed(self, event):
"""Process file closing."""
self.fun(event.src_path)

def on_moved(self, event):
"""Process a file being moved to the destination directory."""
self.fun(event.dest_path)


def create_message_with_request_info(pathname, orig_pathname, attrs):
"""Create a message containing request info."""
info = _get_notify_message_info(attrs, orig_pathname, pathname)
Expand Down
24 changes: 10 additions & 14 deletions trollmoves/tests/test_move_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,7 @@

def test_move_it_moves_files(tmp_path):
"""Test that move it moves a file."""
input_dir = tmp_path / "in"
output_dir = tmp_path / "out"
os.mkdir(input_dir)
os.mkdir(output_dir)
origin = "origin=" + str(input_dir / "bla{number:1s}.txt")
destinations = "destinations=" + str(output_dir)
local_move_it_config = "\n".join([move_it_config_template, origin, destinations])
config_file = tmp_path / "move_it.cfg"
with open(config_file, "w") as fd:
fd.write(local_move_it_config)
input_dir, output_dir, config_file = create_config_file(tmp_path)

cmd_args = parse_args([str(config_file)], default_port=None)
move_it_thread = MoveItSimple(cmd_args)
Expand All @@ -47,10 +38,8 @@ def test_move_it_moves_files(tmp_path):
move_it_thread.chains_stop()


def test_move_it_published_a_message(tmp_path):
"""Test that move it is publishing messages when provided a port."""
from posttroll.message import Message

def create_config_file(tmp_path):
"""Create a move_it config file."""
input_dir = tmp_path / "in"
output_dir = tmp_path / "out"
os.mkdir(input_dir)
Expand All @@ -61,6 +50,13 @@ def test_move_it_published_a_message(tmp_path):
config_file = tmp_path / "move_it.cfg"
with open(config_file, "w") as fd:
fd.write(local_move_it_config)
return input_dir, output_dir, config_file


def test_move_it_published_a_message(tmp_path):
"""Test that move it is publishing messages when provided a port."""
from posttroll.message import Message
input_dir, output_dir, config_file = create_config_file(tmp_path)

with patched_publisher() as message_list:
cmd_args = parse_args([str(config_file), "-p", "2022"])
Expand Down
12 changes: 7 additions & 5 deletions trollmoves/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ def test_create_watchdog_notifier(tmp_path):
from trollmoves.server import create_watchdog_polling_notifier

fname = "20200428_1000_foo.tif"
fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif"
pattern_path = os.path.join(tmp_path, fname_pattern)
file_path = os.path.join(tmp_path, fname)
file_path = tmp_path / fname

fname_pattern = tmp_path / "{start_time:%Y%m%d_%H%M}_{product}.tif"
pattern_path = tmp_path / fname_pattern

function_to_run = MagicMock()
observer = create_watchdog_polling_notifier(globify(pattern_path), function_to_run, timeout=.1)
observer = create_watchdog_polling_notifier(globify(str(pattern_path)), function_to_run, timeout=.1)
observer.start()

with open(os.path.join(file_path), "w") as fid:
Expand All @@ -98,7 +100,7 @@ def test_create_watchdog_notifier(tmp_path):
observer.stop()
observer.join()

function_to_run.assert_called_with(file_path)
function_to_run.assert_called_with(str(file_path))


@pytest.mark.parametrize("config,expected_timeout",
Expand Down

0 comments on commit 3e1b339

Please sign in to comment.