diff --git a/.gitignore b/.gitignore index 5be64fd..309862d 100644 --- a/.gitignore +++ b/.gitignore @@ -142,3 +142,4 @@ dmypy.json *.bk cnt/* tar/ +cer-feeds.json diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index 2f2c1e5..cf91e76 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -14,6 +14,7 @@ """ +import argparse import asyncio import json import logging @@ -37,14 +38,15 @@ # Import config. try: import config + import feed_helper import flock from version import get_version except ModuleNotFoundError: try: - from collector_node import config, flock + from collector_node import config, feed_helper, flock from collector_node.version import get_version except ModuleNotFoundError: - from src.collector_node import config, flock + from src.collector_node import config, feed_helper, flock from src.collector_node.version import get_version try: @@ -159,11 +161,11 @@ async def fetch_cex_data(feed: str) -> dict: stdout = json.loads(ps_out.stdout.decode()) stderr = ps_out.stderr.decode() except subprocess.CalledProcessError as err: - logger.error("call failed with: %s stderr: %s", err, err.stderr) - feed = {} + logger.error("call failed with: %s", err) + return {} except json.decoder.JSONDecodeError as err: logger.error("json decode failed: %s", err) - feed = {} + return {} logger.info("stderr: %s", stderr) return stdout.get(feed) @@ -216,35 +218,18 @@ async def send_to_ws(validator_websocket, data_to_send: dict): return -async def fetch_and_send(identity: dict) -> None: +async def fetch_and_send(feeds: list, identity: dict) -> None: """Fetch feed data and send it to a validator websocket.""" - # CEX feeds. Others can be added to the array as required, - # e,g, [ "ADA/USD", "ADA/EUR", "USDT/USD" ] - cex_feeds = [ - "ADA/USD", - "ADA/EUR", - ] - - # DEX feeds. Others can be added as per cex_feeds as long as they - # are in DEX_PAIRS. - dex_feeds = [ - "FACT-ADA", - "NEWM-ADA", - "WMT-ADA", - # Sponsored. - "ADA-DJED", - "ADA-iUSD", - "ADA-USDM", - "HUNT-ADA", - "iBTC-ADA", - "iETH-ADA", - "LENFI-ADA", - "LQ-ADA", - "MIN-ADA", - "SHEN-ADA", - "SNEK-ADA", - ] + cex_feeds = [] + dex_feeds = [] + + for feed in feeds: + if feed.source == "cex": + cex_feeds.append(feed.label.replace("-", "/", 1)) + continue + if feed.source == "dex": + dex_feeds.append(feed.label) data_cex = fetch_cex_feeds(cex_feeds) data_dex = [] @@ -287,7 +272,7 @@ async def fetch_and_send(identity: dict) -> None: ) -async def collector_main(): +async def collector_main(feeds_file: str): """Collector node main. The script is designed so that it is staggered between 1 and 20 seconds @@ -303,18 +288,30 @@ async def collector_main(): logging.info("collector-node version: '%s'", get_version()) await asyncio.sleep(random.randint(1, 15)) identity = await read_identity() - await fetch_and_send(identity) + feeds = await feed_helper.read_feeds_file(feeds_file=feeds_file) + await fetch_and_send(feeds=feeds, identity=identity) def main(): """Primary entry point of this script.""" + parser = argparse.ArgumentParser( + prog="collector-node", + description="Orcfax collector routines retrieves CER from CEX and DEX sources and forwards them to a validator-node", + epilog="for more information visit https://orcfax.io", + ) + parser.add_argument( + "--feeds", + help="feed data describing feeds being monitored (CER-feeds (JSON))", + required=True, + ) + args = parser.parse_args() pid = os.getpid() start_time = time.time() logger.info("----- node runner (%s) -----", pid) try: with flock.FlockContext(flock_name_base="cnode_runner"): try: - asyncio.run(collector_main()) + asyncio.run(collector_main(feeds_file=args.feeds)) # pylint: disable=W0718 # global catch, if this doesn't run, nothing does. except Exception as err: logger.error("collector node runner not running: %s", err) diff --git a/src/collector_node/feed_helper.py b/src/collector_node/feed_helper.py new file mode 100644 index 0000000..aa3ac7f --- /dev/null +++ b/src/collector_node/feed_helper.py @@ -0,0 +1,37 @@ +"""Helpers for processing feed specification data.""" + +# pylint: disable=E0611,R0902 + +import json +import logging + +from pydantic.dataclasses import dataclass +from pydantic.tools import parse_obj_as + +logger = logging.getLogger(__name__) + + +@dataclass +class FeedSpec: + pair: str + label: str + interval: int + deviation: int + source: str + calculation: str + status: str + type: str = "CER" + + +async def read_feeds_file(feeds_file: str) -> list[FeedSpec]: + """ "Read feed data into memory for use in the script.""" + feed_dict = None + with open(feeds_file, "r", encoding="utf-8") as json_feeds: + feed_dict = json.loads(json_feeds.read()) + logger.info("cer-feeds version: %s", feed_dict["meta"]["version"]) + logger.info("number of feeds: %s", len(feed_dict["feeds"])) + feeds = [] + for item in feed_dict["feeds"]: + feed = parse_obj_as(FeedSpec, item) + feeds.append(feed) + return feeds