Skip to content

Commit

Permalink
feat: add kucoin source
Browse files Browse the repository at this point in the history
  • Loading branch information
cdummett committed Jun 6, 2024
1 parent 15de2f9 commit 9f5e755
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
6 changes: 3 additions & 3 deletions vega_sim/devops/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

SCENARIOS = {
"ETHUSD": lambda: DevOpsScenario(
binance_code="ETHDAI",
price_symbol="ETHDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.mainnet.ETHUSDT.CONFIG,
),
Expand Down Expand Up @@ -68,7 +68,7 @@
),
),
"SPOT": lambda: DevOpsScenario(
binance_code="ETHDAI",
price_symbol="ETHDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.research.SPOT.CONFIG,
),
Expand Down Expand Up @@ -110,7 +110,7 @@
),
),
"BTCUSD": lambda: DevOpsScenario(
binance_code="BTCDAI",
price_symbol="BTCDAI",
market_manager_args=MarketManagerArgs(
market_config=configs.mainnet.BTCUSDT.CONFIG,
),
Expand Down
10 changes: 7 additions & 3 deletions vega_sim/devops/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
class DevOpsScenario(Scenario):
def __init__(
self,
binance_code: str,
price_symbol: str,
price_source: Optional[str],
market_manager_args: MarketManagerArgs,
market_maker_args: MarketMakerArgs,
auction_trader_args: AuctionTraderArgs,
Expand All @@ -70,7 +71,8 @@ def __init__(
):
super().__init__(state_extraction_fn=state_extraction_fn)

self.binance_code = binance_code
self.price_symbol = price_symbol
self.price_source = price_source
self.feed_price_multiplier = feed_price_multiplier

self.market_manager_args = market_manager_args
Expand Down Expand Up @@ -134,7 +136,9 @@ def configure_agents(
)
else:
self.price_process = get_live_price(
product=self.binance_code, multiplier=self.feed_price_multiplier
product=self.price_symbol,
multiplier=self.feed_price_multiplier,
price_source=self.price_source,
)

if self.scenario_wallet.market_creator_agent is None:
Expand Down
72 changes: 49 additions & 23 deletions vega_sim/scenario/common/utils/price_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,33 @@ def _on_message(iter_obj, message, symbol):
iter_obj.latest_price = float(json.loads(message)["k"]["c"])


def _price_listener(iter_obj, symbol):
def _kc_price_listener(iter_obj, symbol):
token = requests.post("https://api.kucoin.com/api/v1/bullet-public").json()["data"][
"token"
]
ws = websocket.WebSocketApp(
f"wss://stream.binance.com:9443/ws/{symbol}@kline_1s",
# on_open=lambda ws: print("ok"),
on_message=lambda _, msg: _on_message(iter_obj, msg),
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId=]",
on_open=lambda ws: _kc_on_open(ws, symbol),
on_message=lambda _, msg: _kc_on_message(iter_obj, msg, symbol),
)
ws.run_forever(reconnect=5, ping_interval=10, ping_timeout=5)


def _kc_on_open(ws: websocket.WebSocketApp, symbol):
ws.send(
json.dumps(
{
"id": 1545910660739,
"type": "subscribe",
"topic": f"/market/ticker:{symbol}",
"response": True,
}
),
)
ws.run_forever(reconnect=5)


def _on_message(iter_obj, message):
iter_obj.latest_price = float(json.loads(message)["k"]["c"])
def _kc_on_message(iter_obj, message, symbol):
iter_obj.latest_price = float(json.loads(message)["data"]["price"])


class LivePrice:
Expand All @@ -210,14 +226,31 @@ class LivePrice:
"""

def __init__(self, product: str = "BTCBUSD", multiplier: int = 1):
def __init__(
self,
product: str = "BTCBUSD",
multiplier: int = 1,
price_source: Optional[str] = "binance",
):
self.product = product
self.latest_price = None
self.multiplier = multiplier

match price_source:
case "binance":
target = _price_listener
product = self.product.lower()

case "kucoin":
target = _kc_price_listener
product = self.product

case _:
raise ValueError("Unimplemented price source")

self._forwarding_thread = threading.Thread(
target=_price_listener,
args=(self, self.product.lower()),
target=target,
args=(self, product),
daemon=True,
)
self._forwarding_thread.start()
Expand All @@ -241,24 +274,17 @@ def _get_price(self):
_live_prices_lock = threading.Lock()


def get_live_price(product: str, multiplier: int) -> LivePrice:
def get_live_price(product: str, multiplier: int, price_source: str) -> LivePrice:
global _live_prices
global _live_prices_lock

feed_key = f"{product}_{multiplier}"

with _live_prices_lock:
if not feed_key in _live_prices:
_live_prices[feed_key] = LivePrice(product=product, multiplier=multiplier)
_live_prices[feed_key] = LivePrice(
product=product,
multiplier=multiplier,
price_source=price_source,
)
return _live_prices[feed_key]


if __name__ == "__main__":
print(
get_historic_price_series(
"ETH-USD",
granularity=Granularity.HOUR,
start="2022-08-02 01:01:50",
end="2022-09-05 09:05:20",
)
)

0 comments on commit 9f5e755

Please sign in to comment.