
Commit

Use dynamic feeds list
ross-spencer committed Aug 12, 2024
1 parent b0ee06d commit 7ef9aa0
Showing 3 changed files with 70 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -142,3 +142,4 @@ dmypy.json
*.bk
cnt/*
tar/
cer-feeds.json
67 changes: 32 additions & 35 deletions src/collector_node/collector_node.py
@@ -14,6 +14,7 @@
"""

import argparse
import asyncio
import json
import logging
@@ -37,14 +38,15 @@
# Import config.
try:
import config
import feed_helper
import flock
from version import get_version
except ModuleNotFoundError:
try:
from collector_node import config, flock
from collector_node import config, feed_helper, flock
from collector_node.version import get_version
except ModuleNotFoundError:
from src.collector_node import config, flock
from src.collector_node import config, feed_helper, flock
from src.collector_node.version import get_version

try:
@@ -159,11 +161,11 @@ async def fetch_cex_data(feed: str) -> dict:
stdout = json.loads(ps_out.stdout.decode())
stderr = ps_out.stderr.decode()
except subprocess.CalledProcessError as err:
logger.error("call failed with: %s stderr: %s", err, err.stderr)
feed = {}
logger.error("call failed with: %s", err)
return {}
except json.decoder.JSONDecodeError as err:
logger.error("json decode failed: %s", err)
feed = {}
return {}
logger.info("stderr: %s", stderr)
return stdout.get(feed)

@@ -216,35 +218,18 @@ async def send_to_ws(validator_websocket, data_to_send: dict):
return


async def fetch_and_send(identity: dict) -> None:
async def fetch_and_send(feeds: list, identity: dict) -> None:
"""Fetch feed data and send it to a validator websocket."""

# CEX feeds. Others can be added to the array as required,
# e.g. [ "ADA/USD", "ADA/EUR", "USDT/USD" ]
cex_feeds = [
"ADA/USD",
"ADA/EUR",
]

# DEX feeds. Others can be added as per cex_feeds as long as they
# are in DEX_PAIRS.
dex_feeds = [
"FACT-ADA",
"NEWM-ADA",
"WMT-ADA",
# Sponsored.
"ADA-DJED",
"ADA-iUSD",
"ADA-USDM",
"HUNT-ADA",
"iBTC-ADA",
"iETH-ADA",
"LENFI-ADA",
"LQ-ADA",
"MIN-ADA",
"SHEN-ADA",
"SNEK-ADA",
]
cex_feeds = []
dex_feeds = []

for feed in feeds:
if feed.source == "cex":
cex_feeds.append(feed.label.replace("-", "/", 1))
continue
if feed.source == "dex":
dex_feeds.append(feed.label)

data_cex = fetch_cex_feeds(cex_feeds)
data_dex = []
@@ -287,7 +272,7 @@ async def fetch_and_send(identity: dict) -> None:
)


async def collector_main():
async def collector_main(feeds_file: str):
"""Collector node main.
The script is designed so that it is staggered between 1 and 20 seconds
@@ -303,18 +288,30 @@ async def collector_main():
logging.info("collector-node version: '%s'", get_version())
await asyncio.sleep(random.randint(1, 15))
identity = await read_identity()
await fetch_and_send(identity)
feeds = await feed_helper.read_feeds_file(feeds_file=feeds_file)
await fetch_and_send(feeds=feeds, identity=identity)


def main():
"""Primary entry point of this script."""
parser = argparse.ArgumentParser(
prog="collector-node",
description="Orcfax collector routines retrieves CER from CEX and DEX sources and forwards them to a validator-node",
epilog="for more information visit https://orcfax.io",
)
parser.add_argument(
"--feeds",
help="feed data describing feeds being monitored (CER-feeds (JSON))",
required=True,
)
args = parser.parse_args()
pid = os.getpid()
start_time = time.time()
logger.info("----- node runner (%s) -----", pid)
try:
with flock.FlockContext(flock_name_base="cnode_runner"):
try:
asyncio.run(collector_main())
asyncio.run(collector_main(feeds_file=args.feeds))
# pylint: disable=W0718 # global catch, if this doesn't run, nothing does.
except Exception as err:
logger.error("collector node runner not running: %s", err)
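For context, here is a minimal, self-contained sketch of the feed-partitioning logic this commit introduces in fetch_and_send. The FeedSpec stand-in below mirrors only the two fields the loop uses (the full pydantic model lives in feed_helper.py, shown next), and the sample labels are illustrative:

```python
from dataclasses import dataclass


@dataclass
class FeedSpec:
    """Stand-in for the pydantic FeedSpec defined in feed_helper.py."""

    label: str
    source: str


def partition_feeds(feeds: list[FeedSpec]) -> tuple[list[str], list[str]]:
    """Split a feeds list into CEX and DEX label lists.

    CEX labels have their first "-" swapped for "/" (e.g. "ADA-USD"
    becomes "ADA/USD"); DEX labels pass through unchanged.
    """
    cex_feeds = []
    dex_feeds = []
    for feed in feeds:
        if feed.source == "cex":
            cex_feeds.append(feed.label.replace("-", "/", 1))
            continue
        if feed.source == "dex":
            dex_feeds.append(feed.label)
    return cex_feeds, dex_feeds


print(
    partition_feeds(
        [
            FeedSpec(label="ADA-USD", source="cex"),
            FeedSpec(label="FACT-ADA", source="dex"),
        ]
    )
)
# (['ADA/USD'], ['FACT-ADA'])
```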
37 changes: 37 additions & 0 deletions src/collector_node/feed_helper.py
@@ -0,0 +1,37 @@
"""Helpers for processing feed specification data."""

# pylint: disable=E0611,R0902

import json
import logging

from pydantic.dataclasses import dataclass
from pydantic.tools import parse_obj_as

logger = logging.getLogger(__name__)


@dataclass
class FeedSpec:
pair: str
label: str
interval: int
deviation: int
source: str
calculation: str
status: str
type: str = "CER"


async def read_feeds_file(feeds_file: str) -> list[FeedSpec]:
""" "Read feed data into memory for use in the script."""
feed_dict = None
with open(feeds_file, "r", encoding="utf-8") as json_feeds:
feed_dict = json.loads(json_feeds.read())
logger.info("cer-feeds version: %s", feed_dict["meta"]["version"])
logger.info("number of feeds: %s", len(feed_dict["feeds"]))
feeds = []
for item in feed_dict["feeds"]:
feed = parse_obj_as(FeedSpec, item)
feeds.append(feed)
return feeds
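To close, a hedged sketch of the cer-feeds.json input that read_feeds_file expects. The top-level shape (a meta object carrying a version, plus a feeds array) and the field names are taken from the code above; every value, including the version string, is invented for the example, and the import assumes the collector_node package is on the path:

```python
import asyncio
import json

from collector_node import feed_helper

# Illustrative feed specification: field names match FeedSpec above,
# all values are invented for the example.
SAMPLE = {
    "meta": {"version": "0.0.1"},
    "feeds": [
        {
            "pair": "ADA-USD",
            "label": "ADA-USD",
            "interval": 3600,
            "deviation": 2,
            "source": "cex",
            "calculation": "median",
            "status": "active",
            "type": "CER",
        },
    ],
}

# Write the sample file, then read it back through the new helper.
with open("cer-feeds.json", "w", encoding="utf-8") as json_feeds:
    json.dump(SAMPLE, json_feeds)

feeds = asyncio.run(feed_helper.read_feeds_file(feeds_file="cer-feeds.json"))
print(feeds[0].label, feeds[0].source)  # ADA-USD cex
```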
