Skip to content

Commit

Permalink
feat: add additional price sources (#677)
Browse files Browse the repository at this point in the history
* feat: add kucoin source

* fix: websocket dependencies

* fix: source defaults to binance

* feat: implement pyth feeds

* feat: allow to pass pyth api url

* feat: import logging for whole price_process.py file

* chore: run black linter

---------

Co-authored-by: Daniel <[email protected]>
  • Loading branch information
cdummett and daniel1302 authored Jun 10, 2024
1 parent 54d13df commit 45b8129
Show file tree
Hide file tree
Showing 9 changed files with 35,551 additions and 352 deletions.
319 changes: 13 additions & 306 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ python-dotenv = "*"
deprecated = "*"
psutil = "*"
docker = "*"
websocket = "*"
websockets = "*"
websocket-client = "*"
gymnasium = {version = "*", optional = true}
sb3-contrib = {version = "^2.0.0a1", optional = true}
stable-baselines3 = {version = "*", optional = true }
Expand Down
35,426 changes: 35,426 additions & 0 deletions replay

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ executing==2.0.1 ; python_version >= "3.10" and python_version < "3.12"
fastjsonschema==2.19.1 ; python_version >= "3.10" and python_version < "3.12"
flake8==7.0.0 ; python_version >= "3.10" and python_version < "3.12"
fonttools==4.52.4 ; python_version >= "3.10" and python_version < "3.12"
gevent==24.2.1 ; python_version >= "3.10" and python_version < "3.12"
googleapis-common-protos==1.63.0 ; python_version >= "3.10" and python_version < "3.12"
greenlet==3.0.3 ; python_version >= "3.10" and python_version < "3.12"
grpcio-tools==1.62.2 ; python_version >= "3.10" and python_version < "3.12"
grpcio==1.64.0 ; python_version >= "3.10" and python_version < "3.12"
idna==3.7 ; python_version >= "3.10" and python_version < "3.12"
Expand Down Expand Up @@ -98,8 +96,5 @@ tzdata==2024.1 ; python_version >= "3.10" and python_version < "3.12"
urllib3==2.2.1 ; python_version >= "3.10" and python_version < "3.12"
vegapy @ git+https://github.com/cdummett/vegapy/@c5636a17f288b5fe9a5dbe8b16ebd2542ab3af15 ; python_version >= "3.10" and python_version < "3.12"
wcwidth==0.2.13 ; python_version >= "3.10" and python_version < "3.12"
websocket==0.2.1 ; python_version >= "3.10" and python_version < "3.12"
websockets==12.0 ; python_version >= "3.10" and python_version < "3.12"
websocket-client==1.8.0 ; python_version >= "3.10" and python_version < "3.12"
wrapt==1.16.0 ; python_version >= "3.10" and python_version < "3.12"
zope-event==5.0 ; python_version >= "3.10" and python_version < "3.12"
zope-interface==6.4.post2 ; python_version >= "3.10" and python_version < "3.12"
7 changes: 1 addition & 6 deletions requirements-learning.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ filelock==3.14.0 ; python_version >= "3.10" and python_version < "3.12"
flake8==7.0.0 ; python_version >= "3.10" and python_version < "3.12"
fonttools==4.52.4 ; python_version >= "3.10" and python_version < "3.12"
fsspec==2024.5.0 ; python_version >= "3.10" and python_version < "3.12"
gevent==24.2.1 ; python_version >= "3.10" and python_version < "3.12"
googleapis-common-protos==1.63.0 ; python_version >= "3.10" and python_version < "3.12"
greenlet==3.0.3 ; python_version >= "3.10" and python_version < "3.12"
grpcio-tools==1.62.2 ; python_version >= "3.10" and python_version < "3.12"
grpcio==1.64.0 ; python_version >= "3.10" and python_version < "3.12"
gymnasium==0.29.1 ; python_version >= "3.10" and python_version < "3.12"
Expand Down Expand Up @@ -134,9 +132,6 @@ tzdata==2024.1 ; python_version >= "3.10" and python_version < "3.12"
urllib3==2.2.1 ; python_version >= "3.10" and python_version < "3.12"
vegapy @ git+https://github.com/cdummett/vegapy/@c5636a17f288b5fe9a5dbe8b16ebd2542ab3af15 ; python_version >= "3.10" and python_version < "3.12"
wcwidth==0.2.13 ; python_version >= "3.10" and python_version < "3.12"
websocket==0.2.1 ; python_version >= "3.10" and python_version < "3.12"
websockets==12.0 ; python_version >= "3.10" and python_version < "3.12"
websocket-client==1.8.0 ; python_version >= "3.10" and python_version < "3.12"
werkzeug==3.0.3 ; python_version >= "3.10" and python_version < "3.12"
wrapt==1.16.0 ; python_version >= "3.10" and python_version < "3.12"
zope-event==5.0 ; python_version >= "3.10" and python_version < "3.12"
zope-interface==6.4.post2 ; python_version >= "3.10" and python_version < "3.12"
7 changes: 1 addition & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ cycler==0.12.1 ; python_version >= "3.10" and python_version < "3.12"
deprecated==1.2.14 ; python_version >= "3.10" and python_version < "3.12"
docker==7.1.0 ; python_version >= "3.10" and python_version < "3.12"
fonttools==4.52.4 ; python_version >= "3.10" and python_version < "3.12"
gevent==24.2.1 ; python_version >= "3.10" and python_version < "3.12"
googleapis-common-protos==1.63.0 ; python_version >= "3.10" and python_version < "3.12"
greenlet==3.0.3 ; python_version >= "3.10" and python_version < "3.12"
grpcio-tools==1.62.2 ; python_version >= "3.10" and python_version < "3.12"
grpcio==1.64.0 ; python_version >= "3.10" and python_version < "3.12"
idna==3.7 ; python_version >= "3.10" and python_version < "3.12"
Expand Down Expand Up @@ -39,8 +37,5 @@ tenacity==8.3.0 ; python_version >= "3.10" and python_version < "3.12"
toml==0.10.2 ; python_version >= "3.10" and python_version < "3.12"
tzdata==2024.1 ; python_version >= "3.10" and python_version < "3.12"
urllib3==2.2.1 ; python_version >= "3.10" and python_version < "3.12"
websocket==0.2.1 ; python_version >= "3.10" and python_version < "3.12"
websockets==12.0 ; python_version >= "3.10" and python_version < "3.12"
websocket-client==1.8.0 ; python_version >= "3.10" and python_version < "3.12"
wrapt==1.16.0 ; python_version >= "3.10" and python_version < "3.12"
zope-event==5.0 ; python_version >= "3.10" and python_version < "3.12"
zope-interface==6.4.post2 ; python_version >= "3.10" and python_version < "3.12"
6 changes: 3 additions & 3 deletions vega_sim/devops/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

SCENARIOS = {
"ETHUSD": lambda: DevOpsScenario(
binance_code="ETHDAI",
price_symbol="ETHDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.mainnet.ETHUSDT.CONFIG,
),
Expand Down Expand Up @@ -68,7 +68,7 @@
),
),
"SPOT": lambda: DevOpsScenario(
binance_code="ETHDAI",
price_symbol="ETHDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.research.SPOT.CONFIG,
),
Expand Down Expand Up @@ -110,7 +110,7 @@
),
),
"BTCUSD": lambda: DevOpsScenario(
binance_code="BTCDAI",
price_symbol="BTCDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.mainnet.BTCUSDT.CONFIG,
),
Expand Down
10 changes: 7 additions & 3 deletions vega_sim/devops/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
class DevOpsScenario(Scenario):
def __init__(
self,
binance_code: str,
price_symbol: str,
market_manager_args: MarketManagerArgs,
market_maker_args: MarketMakerArgs,
auction_trader_args: AuctionTraderArgs,
Expand All @@ -67,10 +67,12 @@ def __init__(
scenario_wallet: Optional[ScenarioWallet] = None,
step_length_seconds: float = 10,
market_name: Optional[str] = None,
price_source: str = "binance",
):
super().__init__(state_extraction_fn=state_extraction_fn)

self.binance_code = binance_code
self.price_symbol = price_symbol
self.price_source = price_source
self.feed_price_multiplier = feed_price_multiplier

self.market_manager_args = market_manager_args
Expand Down Expand Up @@ -134,7 +136,9 @@ def configure_agents(
)
else:
self.price_process = get_live_price(
product=self.binance_code, multiplier=self.feed_price_multiplier
product=self.price_symbol,
multiplier=self.feed_price_multiplier,
price_source=self.price_source,
)

if self.scenario_wallet.market_creator_agent is None:
Expand Down
118 changes: 98 additions & 20 deletions vega_sim/scenario/common/utils/price_process.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from vega_sim.api.helpers import num_from_padded_int
import time
import os
import logging

import numpy as np
import pandas as pd
import requests
import json
from websockets.sync.client import connect
import threading
import websocket

Expand Down Expand Up @@ -188,17 +190,45 @@ def _on_message(iter_obj, message, symbol):
iter_obj.latest_price = float(json.loads(message)["k"]["c"])


def _price_listener(iter_obj, symbol):
def _kc_price_listener(iter_obj, symbol):
token = requests.post("https://api.kucoin.com/api/v1/bullet-public").json()["data"][
"token"
]
ws = websocket.WebSocketApp(
f"wss://stream.binance.com:9443/ws/{symbol}@kline_1s",
# on_open=lambda ws: print("ok"),
on_message=lambda _, msg: _on_message(iter_obj, msg),
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId=]",
on_open=lambda ws: _kc_on_open(ws, symbol),
on_message=lambda _, msg: _kc_on_message(iter_obj, msg, symbol),
)
ws.run_forever(reconnect=5, ping_interval=10, ping_timeout=5)


def _py_price_listener(iter_obj, symbol, update_freq=5):
api_url = os.environ.get("PYTH_PRICE_PULL_API_URL", "http://localhost:8080")
while True:
try:

res = requests.get(f"{api_url}/avgPrice", params={"symbol": symbol})
iter_obj.latest_price = num_from_padded_int(res.json()["price"], 18)
except requests.RequestException as e:
logging.warning(e)
time.sleep(update_freq)


def _kc_on_open(ws: websocket.WebSocketApp, symbol):
ws.send(
json.dumps(
{
"id": 1545910660739,
"type": "subscribe",
"topic": f"/market/ticker:{symbol}",
"response": True,
}
),
)
ws.run_forever(reconnect=5)


def _on_message(iter_obj, message):
iter_obj.latest_price = float(json.loads(message)["k"]["c"])
def _kc_on_message(iter_obj, message, symbol):
iter_obj.latest_price = float(json.loads(message)["data"]["price"])


class LivePrice:
Expand All @@ -210,14 +240,36 @@ class LivePrice:
"""

def __init__(self, product: str = "BTCBUSD", multiplier: int = 1):
def __init__(
self,
product: str = "BTCBUSD",
multiplier: int = 1,
price_source: Optional[str] = "binance",
update_frequency: Optional[int] = 5,
):
self.product = product
self.latest_price = None
self.multiplier = multiplier

match price_source:
case "binance":
target = _price_listener
product = self.product.lower()

case "kucoin":
target = _kc_price_listener
product = self.product

case "pyth":
target = _py_price_listener
product = self.product

case _:
raise ValueError("Unimplemented price source")

self._forwarding_thread = threading.Thread(
target=_price_listener,
args=(self, self.product.lower()),
target=target,
args=(self, product),
daemon=True,
)
self._forwarding_thread.start()
Expand All @@ -241,24 +293,50 @@ def _get_price(self):
_live_prices_lock = threading.Lock()


def get_live_price(product: str, multiplier: int) -> LivePrice:
def get_live_price(
product: str,
multiplier: int,
price_source: Optional[str] = None,
update_frequency: int = 5,
) -> LivePrice:
global _live_prices
global _live_prices_lock

feed_key = f"{product}_{multiplier}"

with _live_prices_lock:
if not feed_key in _live_prices:
_live_prices[feed_key] = LivePrice(product=product, multiplier=multiplier)
_live_prices[feed_key] = LivePrice(
product=product,
multiplier=multiplier,
price_source=price_source,
update_frequency=update_frequency,
)
return _live_prices[feed_key]


if __name__ == "__main__":
print(
get_historic_price_series(
"ETH-USD",
granularity=Granularity.HOUR,
start="2022-08-02 01:01:50",
end="2022-09-05 09:05:20",
)

logging.basicConfig(level=logging.DEBUG)

bn_stream = get_live_price(
"btcusdt",
1,
price_source="binance",
)
kc_stream = get_live_price(
"BTC-USDT",
1,
price_source="kucoin",
)
py_stream = get_live_price(
"BTC/USD",
1,
price_source="pyth",
)

for _ in range(300):
print(
f"Prices: bn={next(bn_stream):.2f} kc={next(kc_stream):.2f}, py={next(py_stream):.2f}"
)
time.sleep(1)

0 comments on commit 45b8129

Please sign in to comment.