Skip to content

Commit

Permalink
Enable multi-feed requests
Browse files Browse the repository at this point in the history
Using the new validator on-demand endpoint we collect requests and
send them to the validator in a single call.
  • Loading branch information
ross-spencer committed Jul 29, 2024
1 parent 4c57584 commit 3826d82
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 21 deletions.
59 changes: 59 additions & 0 deletions src/price_monitor/feeds_to_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Collection of feeds and price deviations to monitor for."""

from dataclasses import dataclass


@dataclass
class Feed:
name: str
deviation: float = 2.0 # 2% default.


feeds_to_monitor = [
# Feed names are always upper-case.
Feed("ADA-USD", 1.0),
Feed(
"ADA-IUSD",
),
Feed(
"ADA-USDM",
),
Feed(
"ADA-DJED",
),
Feed(
"SHEN-ADA",
),
Feed(
"MIN-ADA",
),
Feed(
"FACT-ADA",
),
Feed(
"LQ-ADA",
),
Feed(
"SNEK-ADA",
),
Feed(
"LENFI-ADA",
),
Feed(
"HUNT-ADA",
),
Feed(
"IBTC-ADA",
),
Feed(
"IETH-ADA",
),
]


def get_deviation(feed_id: str):
"""Retrieve deviation for a given price pair."""
for feed in feeds_to_monitor:
if feed.name != feed_id:
continue
return feed.deviation
74 changes: 53 additions & 21 deletions src/price_monitor/price_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
import websockets
from tenacity import retry, wait_exponential

try:
import feeds_to_monitor
except ModuleNotFoundError:
try:
from src.price_monitor import feeds_to_monitor
except ModuleNotFoundError:
from price_monitor import feeds_to_monitor


logging.basicConfig(
format="%(asctime)-15s %(levelname)s :: %(filename)s:%(lineno)s:%(funcName)s() :: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
Expand All @@ -48,16 +57,16 @@
ADA_USD_VALIDATION = "ADAUSD-ee4eed14-ffc2-11ed-9f67-67fb68ae3988"
VALIDATOR_URI: Final[str] = os.environ.get("ORCFAX_VALIDATOR")
MONITOR_URI: Final[str] = f"{VALIDATOR_URI}price_monitor/"
VALIDATION_REQUEST_URI: Final[str] = f"{VALIDATOR_URI}validate/{ADA_USD_VALIDATION}/"
FEED_ID: Final[str] = "ADA-USD"
VALIDATION_REQUEST_URI: Final[str] = f"{VALIDATOR_URI}validate_on_demand/"

# Seconds after which to request current price off-chain.
POLLING_TIME: Final[str] = 60


def price_request_msg() -> str:
"""Return a price request message to send to the websocket."""
return json.dumps({"feed_ids": [FEED_ID]})
feeds = [feed.name for feed in feeds_to_monitor.feeds_to_monitor]
return json.dumps({"feed_ids": feeds})


def get_user_agent() -> str:
Expand Down Expand Up @@ -103,13 +112,19 @@ async def connect_to_websocket(ws_uri: str, msg_to_send: str, local: bool):
await websocket.send(msg_to_send)
logger.info(msg_to_send)
msg = await websocket.recv()
return json.loads(msg)
try:
return json.loads(msg)
except json.JSONDecodeError:
pass
return msg
except websockets.exceptions.InvalidURI as err:
logger.error(
"ensure 'ORCFAX_VALIDATOR' environment variable is set: %s (`export ORCFAX_VALIDATOR=wss://`)",
err,
)
sys.exit(1)
except TypeError as err:
logger.error("ensure data is sent as JSON: %s", err)
except (
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.InvalidStatusCode,
Expand All @@ -130,12 +145,12 @@ async def connect_to_websocket(ws_uri: str, msg_to_send: str, local: bool):
return {}


async def request_new_price(local: bool):
async def request_new_prices(pairs_to_request: dict, local: bool):
"""Send a validation request to the server to ask for a new price
to be placed on-chain.
"""
validate_uri = VALIDATION_REQUEST_URI
await connect_to_websocket(validate_uri, "", local)
validate_uri = f"{VALIDATION_REQUEST_URI}"
await connect_to_websocket(validate_uri, pairs_to_request, local)
return


Expand All @@ -155,30 +170,47 @@ async def price_monitor(local: bool = False):
```
"""
monitor_uri = MONITOR_URI
msg_to_send = price_request_msg()
feeds = price_request_msg()
try:
while True:
logger.info("request for prices: %s", msg_to_send)
data = await connect_to_websocket(monitor_uri, msg_to_send, local)
data = await connect_to_websocket(monitor_uri, feeds, local)
values = []
if data.get("error"):
logger.error("error in websocket response: %s", data.get("error"))
time.sleep(POLLING_TIME)
continue
data = data.get("data", [])
for item in data:
values = item.get(FEED_ID, [])
logger.info("received: %s", values)
deviation = determine_deviation(values)
logger.info("deviation (%%) calculated as: %s", deviation)
if deviation >= 1.0:
price_pairs = data.get("data", [])
pairs_to_request = []
for price_pair in price_pairs:
pair = list(price_pair.keys())[0]
values = list(price_pair.values())[0]
deviation = determine_deviation(values)
if not deviation:
continue
logger.info(
"deviation: %s '%s' greater than 1%% requesting new price on-chain",
values,
"'%s' deviation calculated as: '%s' from %s",
pair,
deviation,
values,
)
await request_new_price(local)
logger.info("polling: %ss", POLLING_TIME)
feed_deviation = feeds_to_monitor.get_deviation(pair)
if deviation >= feed_deviation:
pairs_to_request.append(pair)
logger.info(
"deviation: %s '%s' greater than %s%% requesting new price on-chain",
values,
deviation,
feed_deviation,
)
if not pairs_to_request:
logger.info(
"not requesting any updated pairs... polling in '%s' seconds",
POLLING_TIME,
)
time.sleep(POLLING_TIME)
continue
req = json.dumps({"feeds": pairs_to_request})
await request_new_prices(req, local)
time.sleep(POLLING_TIME)
except KeyboardInterrupt:
print("", file=sys.stderr)
Expand Down

0 comments on commit 3826d82

Please sign in to comment.