diff --git a/src/price_monitor/feeds_to_monitor.py b/src/price_monitor/feeds_to_monitor.py new file mode 100644 index 0000000..23e421e --- /dev/null +++ b/src/price_monitor/feeds_to_monitor.py @@ -0,0 +1,59 @@ +"""Collection of feeds and price deviations to monitor for.""" + +from dataclasses import dataclass + + +@dataclass +class Feed: + name: str + deviation: float = 2.0 # 2% default. + + +feeds_to_monitor = [ + # Feed names are always upper-case. + Feed("ADA-USD", 1.0), + Feed( + "ADA-IUSD", + ), + Feed( + "ADA-USDM", + ), + Feed( + "ADA-DJED", + ), + Feed( + "SHEN-ADA", + ), + Feed( + "MIN-ADA", + ), + Feed( + "FACT-ADA", + ), + Feed( + "LQ-ADA", + ), + Feed( + "SNEK-ADA", + ), + Feed( + "LENFI-ADA", + ), + Feed( + "HUNT-ADA", + ), + Feed( + "IBTC-ADA", + ), + Feed( + "IETH-ADA", + ), +] + + +def get_deviation(feed_id: str): + """Retrieve deviation for a given price pair.""" + for feed in feeds_to_monitor: + if feed.name != feed_id: + continue + return feed.deviation diff --git a/src/price_monitor/price_monitor.py b/src/price_monitor/price_monitor.py index a345ca1..3eff466 100644 --- a/src/price_monitor/price_monitor.py +++ b/src/price_monitor/price_monitor.py @@ -30,6 +30,15 @@ import websockets from tenacity import retry, wait_exponential +try: + import feeds_to_monitor +except ModuleNotFoundError: + try: + from src.price_monitor import feeds_to_monitor + except ModuleNotFoundError: + from price_monitor import feeds_to_monitor + + logging.basicConfig( format="%(asctime)-15s %(levelname)s :: %(filename)s:%(lineno)s:%(funcName)s() :: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", @@ -48,8 +57,7 @@ ADA_USD_VALIDATION = "ADAUSD-ee4eed14-ffc2-11ed-9f67-67fb68ae3988" VALIDATOR_URI: Final[str] = os.environ.get("ORCFAX_VALIDATOR") MONITOR_URI: Final[str] = f"{VALIDATOR_URI}price_monitor/" -VALIDATION_REQUEST_URI: Final[str] = f"{VALIDATOR_URI}validate/{ADA_USD_VALIDATION}/" -FEED_ID: Final[str] = "ADA-USD" +VALIDATION_REQUEST_URI: Final[str] = f"{VALIDATOR_URI}validate_on_demand/" # Seconds after which to request current price off-chain. POLLING_TIME: Final[str] = 60 @@ -57,7 +65,8 @@ def price_request_msg() -> str: """Return a price request message to send to the websocket.""" - return json.dumps({"feed_ids": [FEED_ID]}) + feeds = [feed.name for feed in feeds_to_monitor.feeds_to_monitor] + return json.dumps({"feed_ids": feeds}) def get_user_agent() -> str: @@ -103,13 +112,19 @@ async def connect_to_websocket(ws_uri: str, msg_to_send: str, local: bool): await websocket.send(msg_to_send) logger.info(msg_to_send) msg = await websocket.recv() - return json.loads(msg) + try: + return json.loads(msg) + except json.JSONDecodeError: + pass + return msg except websockets.exceptions.InvalidURI as err: logger.error( "ensure 'ORCFAX_VALIDATOR' environment variable is set: %s (`export ORCFAX_VALIDATOR=wss://`)", err, ) sys.exit(1) + except TypeError as err: + logger.error("ensure data is sent as JSON: %s", err) except ( websockets.exceptions.ConnectionClosedError, websockets.exceptions.InvalidStatusCode, @@ -130,12 +145,12 @@ async def connect_to_websocket(ws_uri: str, msg_to_send: str, local: bool): return {} -async def request_new_price(local: bool): +async def request_new_prices(pairs_to_request: dict, local: bool): """Send a validation request to the server to ask for a new price to be placed on-chain. """ - validate_uri = VALIDATION_REQUEST_URI - await connect_to_websocket(validate_uri, "", local) + validate_uri = f"{VALIDATION_REQUEST_URI}" + await connect_to_websocket(validate_uri, pairs_to_request, local) return @@ -155,30 +170,47 @@ async def price_monitor(local: bool = False): ``` """ monitor_uri = MONITOR_URI - msg_to_send = price_request_msg() + feeds = price_request_msg() try: while True: - logger.info("request for prices: %s", msg_to_send) - data = await connect_to_websocket(monitor_uri, msg_to_send, local) + data = await connect_to_websocket(monitor_uri, feeds, local) values = [] if data.get("error"): logger.error("error in websocket response: %s", data.get("error")) time.sleep(POLLING_TIME) continue - data = data.get("data", []) - for item in data: - values = item.get(FEED_ID, []) - logger.info("received: %s", values) - deviation = determine_deviation(values) - logger.info("deviation (%%) calculated as: %s", deviation) - if deviation >= 1.0: + price_pairs = data.get("data", []) + pairs_to_request = [] + for price_pair in price_pairs: + pair = list(price_pair.keys())[0] + values = list(price_pair.values())[0] + deviation = determine_deviation(values) + if not deviation: + continue logger.info( - "deviation: %s '%s' greater than 1%% requesting new price on-chain", - values, + "'%s' deviation calculated as: '%s' from %s", + pair, deviation, + values, ) - await request_new_price(local) - logger.info("polling: %ss", POLLING_TIME) + feed_deviation = feeds_to_monitor.get_deviation(pair) + if deviation >= feed_deviation: + pairs_to_request.append(pair) + logger.info( + "deviation: %s '%s' greater than %s%% requesting new price on-chain", + values, + deviation, + feed_deviation, + ) + if not pairs_to_request: + logger.info( + "not requesting any updated pairs... polling in '%s' seconds", + POLLING_TIME, + ) + time.sleep(POLLING_TIME) + continue + req = json.dumps({"feeds": pairs_to_request}) + await request_new_prices(req, local) time.sleep(POLLING_TIME) except KeyboardInterrupt: print("", file=sys.stderr)