Skip to content

Commit

Permalink
Refactor fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed Jan 28, 2025
1 parent f6cb954 commit d27b081
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
],
"remote_fs": ["pytroll-collectors>=0.16.0", "fsspec"],
"docs": [],
"fetcher": ["pytroll-watchers>=0.5.0"],
"fetcher": ["pytroll-watchers>=0.6.0"],
}

all_extras = []
Expand Down
43 changes: 25 additions & 18 deletions trollmoves/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import logging
from contextlib import closing
from pathlib import Path
from urllib.parse import unquote

import yaml
from posttroll.publisher import create_publisher_from_dict_config
from posttroll.subscriber import create_subscriber_from_dict_config
from pytroll_watchers.fetch import fetch_file
from pytroll_watchers.publisher import file_publisher_from_generator

from trollmoves.logging import add_logging_options_to_parser, setup_logging

Expand Down Expand Up @@ -47,22 +46,30 @@ def fetch_from_subscriber(destination, subscriber_config, publisher_config):
""" # noqa
destination = Path(destination)

pub = create_publisher_from_dict_config(publisher_config)
pub.start()
with closing(pub):
sub = create_subscriber_from_dict_config(subscriber_config)
with closing(sub):
for message in sub.recv():
if message.type != "file":
continue
logger.info(f"Fetching from {str(message)}")
downloaded_file = fetch_from_message(message, destination)
message.data.pop("filesystem", None)
message.data.pop("path", None)
message.data["uri"] = unquote(downloaded_file.as_uri())
pub.send(str(message))
logger.info(f"Published {str(message)}")
generator = generate_file_items_from_subscriber(destination, subscriber_config)
config = dict(publisher_config=publisher_config, message_config=dict())
file_publisher_from_generator(generator, config)


def generate_file_items_from_subscriber(destination, subscriber_config):
    """Yield fetched file items together with the config for their messages.

    A subscriber is created from *subscriber_config* and closed again once the
    generator is finished. Each received message whose type is "file" is passed
    to ``fetch_from_message`` along with *destination*; messages of any other
    type are skipped.

    Args:
        destination: Passed unchanged to ``fetch_from_message``.
        subscriber_config: Passed unchanged to
            ``create_subscriber_from_dict_config``.

    Yields:
        A ``(file_item, message_config)`` tuple. ``file_item`` is the return
        value of ``fetch_from_message``. ``message_config`` is a dict with the
        incoming message's subject under "subject", its type under "atype",
        and, under "data", a copy of its data without the "filesystem",
        "path", "uid" and "uri" entries.
    """
    # Entries describing where the file used to be. NOTE(review): presumably
    # the consumer of this generator fills them in again from the fetched
    # item -- confirm against the publisher that receives these tuples.
    stale_keys = ("filesystem", "path", "uid", "uri")

    with closing(create_subscriber_from_dict_config(subscriber_config)) as subscription:
        for message in subscription.recv():
            if message.type != "file":
                continue

            logger.info(f"Fetching from {str(message)}")
            fetched = fetch_from_message(message, destination)

            # Work on a copy so the incoming message's own data stays intact.
            metadata = message.data.copy()
            for key in stale_keys:
                metadata.pop(key, None)

            yield fetched, {
                "subject": message.subject,
                "atype": message.type,
                "data": metadata,
            }


def cli(args=None):
Expand Down
8 changes: 4 additions & 4 deletions trollmoves/tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_fetch_message_logs(tmp_path, caplog):
create_data_file(sdr_file)
msg = ('pytroll://segment/viirs/l1b/ file [email protected] 2024-04-19T11:35:00.487388 v1.01 '
'application/json {"sensor": "viirs", '
f'"uid": "{uid}", "uri": "file://{str(sdr_file)}"' '}')
f'"uid": "{uid}", "uri": "{str(sdr_file)}"' '}')

dest_path2 = tmp_path / "dest2"
dest_path2.mkdir()
Expand All @@ -70,7 +70,7 @@ def test_fetch_message_logs(tmp_path, caplog):

assert str(msg) in caplog.text
assert str(dest_path2 / uid) in caplog.text
assert f"Published {messages[0]}" in caplog.text
assert f"Sending {messages[0]}" in caplog.text


def test_subscribe_and_fetch(tmp_path):
Expand All @@ -97,7 +97,7 @@ def test_subscribe_and_fetch(tmp_path):
assert (dest_path2 / uid).exists()
assert len(messages) == 1
message = Message(rawstr=messages[0])
expected_uri = f"file://{str(dest_path2)}/{uid}"
expected_uri = f"{str(dest_path2)}/{uid}"
assert "path" not in message.data
assert "filesystem" not in message.data
assert message.data["uri"] == expected_uri
Expand Down Expand Up @@ -126,7 +126,7 @@ def test_fetcher_cli(tmp_path):
assert (destination / uid).exists()
assert len(messages) == 1
message = Message(rawstr=messages[0])
expected_uri = f"file://{str(destination)}/{uid}"
expected_uri = f"{str(destination)}/{uid}"
assert "path" not in message.data
assert "filesystem" not in message.data
assert message.data["uri"] == expected_uri
Expand Down

0 comments on commit d27b081

Please sign in to comment.