From a8d6941aa2f99bd746a350cf135fc9426d3796b8 Mon Sep 17 00:00:00 2001 From: Charlie <99198652+cdummett@users.noreply.github.com> Date: Fri, 17 Mar 2023 15:54:05 +0000 Subject: [PATCH] feat: converted market data * fix: correct tags in make file * feat: add market data handler * feat: rename market data methods * feat: migrate methods from data to service * feat: update state extraction * fix: make run black * fix: correct decimal mapping * feat: update market_data uses * fix: run make black * feat: add mark_price too snitch * fix: liquidity fee shares * feat: Fix up when loading new market data too fast * fix: run make black * feat: Fix up handling when market_id is None --------- Co-authored-by: Tom McLean --- Makefile | 4 +- .../feature_tests/0002-STTL-008.ipynb | 4 +- .../feature_tests/0004-AMND-010.ipynb | 4 +- examples/visualisations/utils.py | 2 +- tests/integration/test_trading.py | 4 +- tests/integration/test_transaction_store.py | 8 +- tests/vega_sim/api/test_data.py | 87 ----- tests/vega_sim/api/test_data_raw.py | 6 +- vega_sim/api/data.py | 358 ++++++++++++------ vega_sim/api/data_raw.py | 22 +- vega_sim/environment/environment.py | 13 +- vega_sim/local_data_cache.py | 57 ++- vega_sim/network_service.py | 4 +- vega_sim/parameter_test/parameter/loggers.py | 12 +- vega_sim/scenario/common/agents.py | 58 +-- vega_sim/service.py | 118 ++++-- vega_sim/tools/scenario_output.py | 1 + 17 files changed, 444 insertions(+), 318 deletions(-) diff --git a/Makefile b/Makefile index c46dd15df..d6357ae91 100644 --- a/Makefile +++ b/Makefile @@ -52,8 +52,8 @@ pull_deps_networks: @echo "Downloading Git dependencies into " ${EXTERN_DIR} @echo "Downloading Vega networks-internal" @if [ ! -d ./extern/networks-internal ]; then mkdir ./extern/networks-internal; git clone https://github.com/vegaprotocol/networks-internal ${EXTERN_DIR}/networks-internal; fi -ifneq (${VEGA_SIM_NETWORKS_TAG},develop) - @git -C ${EXTERN_DIR}/networks-internal pull; git -C ${EXTERN_DIR}/networks-internal checkout ${VEGA_SIM_NETWORKS_TAG} +ifneq (${VEGA_SIM_NETWORKS_INTERNAL_TAG},develop) + @git -C ${EXTERN_DIR}/networks-internal pull; git -C ${EXTERN_DIR}/networks-internal checkout ${VEGA_SIM_NETWORKS_INTERNAL_TAG} else @git -C ${EXTERN_DIR}/networks-internal checkout develop; git -C ${EXTERN_DIR}/networks-internal pull endif diff --git a/examples/notebooks/feature_tests/0002-STTL-008.ipynb b/examples/notebooks/feature_tests/0002-STTL-008.ipynb index 7902b9940..6e1746de9 100644 --- a/examples/notebooks/feature_tests/0002-STTL-008.ipynb +++ b/examples/notebooks/feature_tests/0002-STTL-008.ipynb @@ -243,7 +243,7 @@ " print(general+margin+bond - mint_amount[wallet.name])\n", " print(vega.order_status(order_id=buy_order_id))\n", "\n", - "print(vega.market_data(market_id=market_id))" + "print(vega.get_latest_market_data(market_id=market_id))" ] }, { @@ -306,7 +306,7 @@ "# Check order status/ market state after amend\n", "print(vega.order_status(order_id=buy_order_id))\n", "\n", - "print(vega.market_data(market_id=market_id))" + "print(vega.get_latest_market_data(market_id=market_id))" ] } ], diff --git a/examples/notebooks/feature_tests/0004-AMND-010.ipynb b/examples/notebooks/feature_tests/0004-AMND-010.ipynb index 3eb9e90b1..c00977ec1 100644 --- a/examples/notebooks/feature_tests/0004-AMND-010.ipynb +++ b/examples/notebooks/feature_tests/0004-AMND-010.ipynb @@ -151,7 +151,7 @@ "# Check order status/ market state\n", "print(vega.order_status(order_id=buy_order_id))\n", "\n", - "print(vega.market_data(market_id=market_id))" + "print(vega.get_latest_market_data(market_id=market_id))" ] }, { @@ -180,7 +180,7 @@ "# Check order status/ market state after amend\n", "print(vega.order_status(order_id=buy_order_id))\n", "\n", - "print(vega.market_data(market_id=market_id))" + "print(vega.get_latest_market_data(market_id=market_id))" ] } ], diff --git a/examples/visualisations/utils.py b/examples/visualisations/utils.py index a61e863c9..b7784a196 100644 --- a/examples/visualisations/utils.py +++ b/examples/visualisations/utils.py @@ -377,7 +377,7 @@ def move_market( Volume to be traded at new price. """ - market_data = vega.market_data(market_id=market_id) + market_data = vega.get_latest_market_data(market_id=market_id) market_info = vega.market_info(market_id=market_id) curr_price = int(market_data.mid_price) * 10**-market_info.decimal_places diff --git a/tests/integration/test_trading.py b/tests/integration/test_trading.py index c719c7d54..af7124a47 100644 --- a/tests/integration/test_trading.py +++ b/tests/integration/test_trading.py @@ -229,7 +229,7 @@ def test_one_off_transfer(vega_service_with_high_volume_with_market: VegaService to_account_type=vega_protos.vega.ACCOUNT_TYPE_GENERAL, asset=asset_id, amount=500, - delay=15, + delay=50, ) vega.wait_fn(10) @@ -256,7 +256,7 @@ def test_one_off_transfer(vega_service_with_high_volume_with_market: VegaService assert party_a_accounts_t2.general == 500 assert party_b_accounts_t2.general == 1000 - vega.wait_fn(10) + vega.wait_fn(100) vega.wait_for_total_catchup() party_a_accounts_t3 = vega.party_account( diff --git a/tests/integration/test_transaction_store.py b/tests/integration/test_transaction_store.py index 689a81415..8bbe0279b 100644 --- a/tests/integration/test_transaction_store.py +++ b/tests/integration/test_transaction_store.py @@ -21,18 +21,20 @@ def _test_transaction_store(): log_dir = vega.log_dir market_id = vega.all_markets()[0].id - final_oi = vega.market_data(market_id=market_id).open_interest + final_oi = vega.get_latest_market_data(market_id=market_id).open_interest time.sleep(1) with replay.replay_run_context(replay_path=log_dir) as vega: time.sleep(1) vega.wait_for_core_catchup() - final_oi_replay = vega.market_data(market_id=market_id).open_interest + final_oi_replay = vega.get_latest_market_data(market_id=market_id).open_interest with replay.replay_run_context(replay_path=log_dir) as vega: time.sleep(1) vega.wait_for_core_catchup() - final_oi_replay_2 = vega.market_data(market_id=market_id).open_interest + final_oi_replay_2 = vega.get_latest_market_data( + market_id=market_id + ).open_interest assert final_oi == final_oi_replay assert final_oi_replay_2 == final_oi_replay diff --git a/tests/vega_sim/api/test_data.py b/tests/vega_sim/api/test_data.py index 2f173eda1..4007c3cfe 100644 --- a/tests/vega_sim/api/test_data.py +++ b/tests/vega_sim/api/test_data.py @@ -20,8 +20,6 @@ Trade, AggregatedLedgerEntry, get_asset_decimals, - best_prices, - price_bounds, find_asset_id, get_trades, margin_levels, @@ -30,7 +28,6 @@ open_orders_by_market, party_account, list_transfers, - get_liquidity_fee_shares, list_ledger_entries, ) from vega_sim.grpc.client import ( @@ -198,43 +195,6 @@ def test_asset_decimals(mkt_info_mock): assert get_asset_decimals("ASSET", None) == 3 -@patch("vega_sim.api.data_raw.market_data") -def test_best_prices(mkt_data_mock): - mkt_data = MagicMock() - mkt_data_mock.return_value = mkt_data - - mkt_data.best_static_bid_price = "202" - mkt_data.best_static_offer_price = "212" - - bid_res, ask_res = best_prices("mkt", None, 2) - assert bid_res == pytest.approx(2.02) - assert ask_res == pytest.approx(2.12) - - -@patch("vega_sim.api.data_raw.market_data") -def test_price_bounds(mkt_data_mock): - mkt_data = MagicMock() - mkt_data_mock.return_value = mkt_data - mkt_data.price_monitoring_bounds = [ - vega_protos.vega.PriceMonitoringBounds( - min_valid_price="10000", - max_valid_price="20000", - ), - vega_protos.vega.PriceMonitoringBounds( - min_valid_price="11000", - max_valid_price="19000", - ), - vega_protos.vega.PriceMonitoringBounds( - min_valid_price="12000", - max_valid_price="18000", - ), - ] - - min_valid_price, max_valid_price = price_bounds("mkt", None, 2) - assert min_valid_price == pytest.approx(120) - assert max_valid_price == pytest.approx(180) - - def test_open_orders_by_market(trading_data_v2_servicer_and_port): def ListOrders(self, request, context): return data_node_protos_v2.trading_data.ListOrdersResponse( @@ -723,53 +683,6 @@ def ListTransfers(self, request, context): assert res == [expected] -def test_get_liquidity_fee_shares(trading_data_v2_servicer_and_port): - expected = {"party1": 0.75, "party2": 0.25} - - def GetLatestMarketData(self, request, context): - return data_node_protos_v2.trading_data.GetLatestMarketDataResponse( - market_data=vega_protos.vega.MarketData( - liquidity_provider_fee_share=[ - vega_protos.vega.LiquidityProviderFeeShare( - party="party1", - equity_like_share="0.75", - average_entry_valuation="75", - average_score="0.5", - ), - vega_protos.vega.LiquidityProviderFeeShare( - party="party2", - equity_like_share="0.25", - average_entry_valuation="100", - average_score="0.5", - ), - ] - ) - ) - - server, port, mock_servicer = trading_data_v2_servicer_and_port - mock_servicer.GetLatestMarketData = GetLatestMarketData - - add_TradingDataServiceServicer_v2_to_server(mock_servicer(), server) - - data_client = VegaTradingDataClientV2(f"localhost:{port}") - - res1 = get_liquidity_fee_shares( - data_client=data_client, market_id="na", party_id="party1" - ) - assert res1 == expected["party1"] - - res2 = get_liquidity_fee_shares( - data_client=data_client, market_id="na", party_id="party2" - ) - assert res2 == expected["party2"] - - res3 = get_liquidity_fee_shares( - data_client=data_client, - market_id="na", - ) - assert res3 == expected - - def test_list_ledger_entries(trading_data_v2_servicer_and_port): expected = data_node_protos_v2.trading_data.AggregatedLedgerEntry( timestamp=10000000, quantity="540", asset_id="asset1" diff --git a/tests/vega_sim/api/test_data_raw.py b/tests/vega_sim/api/test_data_raw.py index 4fbb8905d..0deadbeee 100644 --- a/tests/vega_sim/api/test_data_raw.py +++ b/tests/vega_sim/api/test_data_raw.py @@ -19,7 +19,7 @@ infrastructure_fee_accounts, liquidity_provisions, market_accounts, - market_data, + get_latest_market_data, market_data_history, market_info, order_status, @@ -364,7 +364,7 @@ def ListAccounts(self, request, context): assert res == expected -def test_market_data(trading_data_v2_servicer_and_port): +def test_get_latest_market_data(trading_data_v2_servicer_and_port): expected = vega_protos.vega.MarketData(mid_price="100", market="foobar") def GetLatestMarketData(self, request, context): @@ -380,7 +380,7 @@ def GetLatestMarketData(self, request, context): add_TradingDataServiceServicer_v2_to_server(mock_servicer(), server) data_client = VegaTradingDataClientV2(f"localhost:{port}") - res = market_data(market_id="foobar", data_client=data_client) + res = get_latest_market_data(market_id="foobar", data_client=data_client) assert res == expected diff --git a/vega_sim/api/data.py b/vega_sim/api/data.py index 27a5f56c2..5f833c4a9 100644 --- a/vega_sim/api/data.py +++ b/vega_sim/api/data.py @@ -176,6 +176,55 @@ class Trade: seller_auction_batch: int +@dataclass(frozen=True) +class MarketData: + mark_price: float + best_bid_price: float + best_bid_volume: float + best_offer_price: float + best_offer_volume: float + best_static_bid_price: float + best_static_bid_volume: float + best_static_offer_price: float + best_static_offer_volume: float + mid_price: float + static_mid_price: float + market_id: str + timestamp: str + open_interest: float + auction_end: str + auction_start: str + indicative_price: float + indicative_volume: float + market_trading_mode: str + trigger: str + extension_trigger: str + target_stake: float + supplied_stake: float + market_value_proxy: float + price_monitoring_bounds: list + liquidity_provider_fee_share: list + market_state: str + next_mark_to_market: float + last_traded_price: float + + +@dataclass(frozen=True) +class PriceMonitoringBounds: + min_valid_price: float + max_valid_price: float + trigger: str + reference_price: float + + +@dataclass(frozen=True) +class LiquidityProviderFeeShare: + party: str + equity_like_share: float + average_entry_valuation: float + average_score: float + + def _ledger_entry_from_proto( ledger_entry, asset_decimals: int, @@ -417,6 +466,128 @@ def positions_by_market( return positions[market_id] +def _market_data_from_proto( + market_data: vega_protos.vega.MarketData, + decimal_spec: DecimalSpec, +): + return MarketData( + mark_price=num_from_padded_int( + market_data.mark_price, decimal_spec.price_decimals + ), + best_bid_price=num_from_padded_int( + market_data.best_bid_price, decimal_spec.price_decimals + ), + best_bid_volume=num_from_padded_int( + market_data.best_bid_volume, decimal_spec.position_decimals + ), + best_offer_price=num_from_padded_int( + market_data.best_offer_price, decimal_spec.price_decimals + ), + best_offer_volume=num_from_padded_int( + market_data.best_offer_volume, decimal_spec.position_decimals + ), + best_static_bid_price=num_from_padded_int( + market_data.best_static_bid_price, decimal_spec.price_decimals + ), + best_static_bid_volume=num_from_padded_int( + market_data.best_static_bid_price, decimal_spec.position_decimals + ), + best_static_offer_price=num_from_padded_int( + market_data.best_static_offer_price, decimal_spec.price_decimals + ), + best_static_offer_volume=num_from_padded_int( + market_data.best_static_offer_volume, decimal_spec.position_decimals + ), + mid_price=num_from_padded_int( + market_data.mid_price, decimal_spec.price_decimals + ), + static_mid_price=num_from_padded_int( + market_data.static_mid_price, decimal_spec.price_decimals + ), + market_id=market_data.market, + timestamp=market_data.timestamp, + open_interest=market_data.open_interest, + auction_end=market_data.auction_end, + auction_start=market_data.auction_start, + indicative_price=num_from_padded_int( + market_data.static_mid_price, decimal_spec.price_decimals + ), + indicative_volume=num_from_padded_int( + market_data.static_mid_price, decimal_spec.price_decimals + ), + market_trading_mode=market_data.market_trading_mode, + trigger=market_data.trigger, + extension_trigger=market_data.extension_trigger, + target_stake=num_from_padded_int( + market_data.target_stake, decimal_spec.asset_decimals + ), + supplied_stake=num_from_padded_int( + market_data.supplied_stake, decimal_spec.asset_decimals + ), + market_value_proxy=num_from_padded_int( + market_data.market_value_proxy, decimal_spec.asset_decimals + ), + price_monitoring_bounds=_price_monitoring_bounds_from_proto( + market_data.price_monitoring_bounds, decimal_spec.price_decimals + ), + liquidity_provider_fee_share=_liquidity_provider_fee_share_from_proto( + market_data.liquidity_provider_fee_share, + decimal_spec.asset_decimals, + ), + market_state=market_data.market_state, + next_mark_to_market=market_data.next_mark_to_market, + last_traded_price=num_from_padded_int( + market_data.last_traded_price, decimal_spec.price_decimals + ), + ) + + +def _price_monitoring_bounds_from_proto( + price_monitoring_bounds, + price_decimals: int, +) -> List[PriceMonitoringBounds]: + return [ + PriceMonitoringBounds( + min_valid_price=num_from_padded_int( + individual_bound.min_valid_price, + price_decimals, + ), + max_valid_price=num_from_padded_int( + individual_bound.max_valid_price, + price_decimals, + ), + trigger=individual_bound.trigger, + reference_price=num_from_padded_int( + individual_bound.reference_price, + price_decimals, + ), + ) + for individual_bound in price_monitoring_bounds + ] + + +def _liquidity_provider_fee_share_from_proto( + liquidity_provider_fee_share, + asset_decimals, +) -> List[PriceMonitoringBounds]: + return [ + LiquidityProviderFeeShare( + party=individual_liquidity_provider_fee_share.party, + equity_like_share=float( + individual_liquidity_provider_fee_share.equity_like_share + ), + average_entry_valuation=num_from_padded_int( + float(individual_liquidity_provider_fee_share.average_entry_valuation), + asset_decimals, + ), + average_score=float( + individual_liquidity_provider_fee_share.equity_like_share + ), + ) + for individual_liquidity_provider_fee_share in liquidity_provider_fee_share + ] + + def list_accounts( data_client: vac.VegaTradingDataClientV2, pub_key: Optional[str] = None, @@ -628,67 +799,6 @@ def get_asset_decimals( ).details.decimals -def best_prices( - market_id: str, - data_client: vac.VegaTradingDataClientV2, - price_decimals: Optional[int] = None, - market_data: Optional[vega_protos.vega.MarketData] = None, -) -> Tuple[float, float]: - """ - Output the best static bid price and best static ask price in current market. - """ - mkt_data = ( - market_data - if market_data is not None - else data_raw.market_data(market_id=market_id, data_client=data_client) - ) - mkt_price_dp = ( - price_decimals - if price_decimals is not None - else market_price_decimals(market_id=market_id, data_client=data_client) - ) - - return ( - num_from_padded_int(mkt_data.best_static_bid_price, mkt_price_dp), - num_from_padded_int(mkt_data.best_static_offer_price, mkt_price_dp), - ) - - -def price_bounds( - market_id: str, - data_client: vac.VegaTradingDataClientV2, - price_decimals: Optional[int] = None, - market_data: Optional[vega_protos.vega.MarketData] = None, -) -> Tuple[Optional[float], Optional[float]]: - """ - Output the tightest price bounds in current market. - """ - mkt_data = ( - market_data - if market_data is not None - else data_raw.market_data(market_id=market_id, data_client=data_client) - ) - mkt_price_dp = ( - price_decimals - if price_decimals is not None - else market_price_decimals(market_id=market_id, data_client=data_client) - ) - - lower_bounds = [ - price_monitoring_bound.min_valid_price - for price_monitoring_bound in mkt_data.price_monitoring_bounds - ] - upper_bounds = [ - price_monitoring_bound.max_valid_price - for price_monitoring_bound in mkt_data.price_monitoring_bounds - ] - - return ( - num_from_padded_int(max(lower_bounds), mkt_price_dp) if lower_bounds else None, - num_from_padded_int(min(upper_bounds), mkt_price_dp) if upper_bounds else None, - ) - - def open_orders_by_market( market_id: str, data_client: vac.VegaTradingDataClientV2, @@ -717,8 +827,10 @@ def open_orders_by_market( open_only=True, ) for order in orders: - bids.append(order) if order.side == vega_protos.vega.SIDE_BUY else asks.append( - order + ( + bids.append(order) + if order.side == vega_protos.vega.SIDE_BUY + else asks.append(order) ) return OrdersBySide(bids, asks) @@ -1114,8 +1226,12 @@ def _stream_handler( event = extraction_fn(stream_item) - market_id = getattr(event, "market_id", None) - asset_decimals = asset_dp.get(getattr(event, "asset", mkt_to_asset.get(market_id))) + market_id = getattr(event, "market_id", getattr(event, "market", None)) + asset_decimals = asset_dp.get( + getattr( + event, "asset", mkt_to_asset[market_id] if market_id is not None else None + ) + ) return conversion_fn( event, @@ -1127,50 +1243,6 @@ def _stream_handler( ) -def get_liquidity_fee_shares( - data_client: vac.VegaTradingDataClientV2, - market_id: str, - party_id: Optional[str] = None, - market_data: Optional[vega_protos.vega.MarketData] = None, -) -> Union[Dict, float]: - """Gets the current liquidity fee share for each party or a specified party. - - Args: - data_client (vac.VegaTradingDataClientV2): - An instantiated gRPC data client - market_id (str): - Id of market to get liquidity fee shares from. - party_id (Optional[str], optional): - Id of party to get liquidity fee shares for. Defaults to None. - market_data (Optional[vega_protos.markets.MarketData]): - Market data to use. If not passed, loads from data node - """ - - market_data = ( - market_data - if market_data is not None - else data_raw.market_data(data_client=data_client, market_id=market_id) - ) - - # Calculate share of fees for each LP - shares = { - lp.party: float(lp.equity_like_share) * float(lp.average_score) - for lp in market_data.liquidity_provider_fee_share - } - total_shares = sum(shares.values()) - - # Scale share of fees for each LP pro rata - if total_shares != 0: - pro_rata_shares = {key: val / total_shares for key, val in shares.items()} - else: - pro_rata_shares = {key: 1 / len(shares) for key, val in shares.items()} - - if party_id is None: - return pro_rata_shares - else: - return pro_rata_shares[party_id] - - def list_ledger_entries( data_client: vac.VegaTradingDataClientV2, close_on_account_filters: bool = False, @@ -1350,6 +1422,74 @@ def ledger_entries_subscription_handler( return ledger_entries +def market_data_subscription_handler( + stream_item: vega_protos.api.v1.core.ObserveEventBusResponse, + mkt_pos_dp: Optional[Dict[str, int]] = None, + mkt_price_dp: Optional[Dict[str, int]] = None, + mkt_to_asset: Optional[Dict[str, str]] = None, + asset_dp: Optional[Dict[str, int]] = None, +): + return _stream_handler( + stream_item=stream_item, + extraction_fn=lambda evt: evt.market_data, + conversion_fn=_market_data_from_proto, + mkt_pos_dp=mkt_pos_dp, + mkt_price_dp=mkt_price_dp, + mkt_to_asset=mkt_to_asset, + asset_dp=asset_dp, + ) + + +def get_latest_market_data( + market_id: str, + data_client: vac.VegaTradingDataClientV2, + market_price_decimals_map: Optional[Dict[str, int]] = None, + market_position_decimals_map: Optional[Dict[str, int]] = None, + market_to_asset_map: Optional[Dict[str, str]] = None, + asset_decimals_map: Optional[Dict[str, int]] = None, +) -> MarketData: + # Get latest market data + market_data = data_raw.get_latest_market_data( + market_id=market_id, data_client=data_client + ) + + market_price_decimals_map = ( + market_price_decimals_map if market_price_decimals_map is not None else {} + ) + market_position_decimals_map = ( + market_position_decimals_map if market_position_decimals_map is not None else {} + ) + market_to_asset_map = market_to_asset_map if market_to_asset_map is not None else {} + asset_decimals_map = asset_decimals_map if asset_decimals_map is not None else {} + + if market_id not in market_price_decimals_map: + market_price_decimals_map[market_id] = market_price_decimals( + market_id=market_id, data_client=data_client + ) + if market_id not in market_position_decimals_map: + market_position_decimals_map[market_id] = market_position_decimals( + market_id=market_id, data_client=data_client + ) + if market_id not in market_to_asset_map: + market_to_asset_map[market_id] = data_raw.market_info( + market_id=market_id, data_client=data_client + ).tradable_instrument.instrument.future.settlement_asset + if market_to_asset_map[market_id] not in asset_decimals_map: + asset_decimals_map[market_to_asset_map[market_id]] = get_asset_decimals( + asset_id=market_to_asset_map[market_id], + data_client=data_client, + ) + # Convert from proto + return _market_data_from_proto( + market_data=market_data, + decimal_spec=DecimalSpec( + price_decimals=market_price_decimals_map[market_data.market], + position_decimals=market_position_decimals_map[market_data.market], + asset_decimals=asset_decimals_map[market_to_asset_map[market_data.market]], + ), + ) + + def get_risk_factors( data_client: vac.VegaTradingDataClientV2, market_id: str, diff --git a/vega_sim/api/data_raw.py b/vega_sim/api/data_raw.py index 0c93f023a..c81a3e3e1 100644 --- a/vega_sim/api/data_raw.py +++ b/vega_sim/api/data_raw.py @@ -155,7 +155,7 @@ def market_accounts( ) -def market_data( +def get_latest_market_data( market_id: str, data_client: vac.VegaTradingDataClientV2, ) -> vega_protos.vega.MarketData: @@ -522,9 +522,9 @@ def list_ledger_entries( asset_id=asset_id, party_ids=from_party_ids if from_party_ids is not None else [], market_ids=from_market_ids if from_market_ids is not None else [], - account_types=from_account_types - if from_account_types is not None - else [], + account_types=( + from_account_types if from_account_types is not None else [] + ), ), to_account_filter=data_node_protos_v2.trading_data.AccountFilter( asset_id=asset_id, @@ -538,12 +538,14 @@ def list_ledger_entries( if from_datetime is not None or to_datetime is not None: base_request.date_range.CopyFrom( data_node_protos_v2.trading_data.DateRange( - start_timestamp=from_datetime.timestamp() * 1e9 - if from_datetime is not None - else None, - end_timestamp=to_datetime.timestamp() * 1e9 - if to_datetime is not None - else None, + start_timestamp=( + from_datetime.timestamp() * 1e9 + if from_datetime is not None + else None + ), + end_timestamp=( + to_datetime.timestamp() * 1e9 if to_datetime is not None else None + ), ) ) diff --git a/vega_sim/environment/environment.py b/vega_sim/environment/environment.py index 9223c7ded..74671f911 100644 --- a/vega_sim/environment/environment.py +++ b/vega_sim/environment/environment.py @@ -346,18 +346,15 @@ def _default_state_extraction(self, vega: VegaService) -> VegaState: self.market_decimals_cache[market_id] = vega.market_info( market_id=market_id ).decimal_places - market_state[market_data.market] = MarketState( + market_state[market_id] = MarketState( state=market_data.market_state, trading_mode=market_data.market_trading_mode, - midprice=float(market_data.mid_price) - / 10 ** int(self.market_decimals_cache[market_id]), - best_bid_price=float(market_data.best_bid_price) - / 10 ** self.market_decimals_cache[market_id], - best_ask_price=float(market_data.best_offer_price) - / 10 ** self.market_decimals_cache[market_id], + midprice=market_data.mid_price, + best_bid_price=market_data.best_bid_price, + best_ask_price=market_data.best_offer_price, min_valid_price=vega.price_bounds(market_id=market_id)[0], max_valid_price=vega.price_bounds(market_id=market_id)[1], - orders=order_status.get(market_data.market, {}), + orders=order_status.get(market_id, {}), ) return VegaState(network_state=(), market_state=market_state) diff --git a/vega_sim/local_data_cache.py b/vega_sim/local_data_cache.py index 2b51a9904..635eb8d08 100644 --- a/vega_sim/local_data_cache.py +++ b/vega_sim/local_data_cache.py @@ -69,7 +69,7 @@ def _queue_forwarder( else: sink.put(output) except Exception: - logger.debug("Data cache event bus closed") + logger.info("Data cache event bus closed") class DecimalsCache(defaultdict): @@ -139,7 +139,13 @@ def __init__( ), ( (events_protos.BUS_EVENT_TYPE_MARKET_DATA,), - lambda evt: evt.market_data, + lambda evt: data.market_data_subscription_handler( + evt, + self._market_pos_decimals, + self._market_price_decimals, + self._market_to_asset, + self._asset_decimals, + ), ), ] self._high_load_stream_registry = [ @@ -256,12 +262,16 @@ def start_live_feeds( self.stream_registry + (self._high_load_stream_registry if start_high_load_feeds else []), self._aggregated_observation_feed, - (market_ids[0] if len(market_ids) == 1 else None) - if market_ids is not None - else None, - (party_ids[0] if len(party_ids) == 1 else None) - if party_ids is not None - else None, + ( + (market_ids[0] if len(market_ids) == 1 else None) + if market_ids is not None + else None + ), + ( + (party_ids[0] if len(party_ids) == 1 else None) + if party_ids is not None + else None + ), ), daemon=True, ) @@ -285,12 +295,16 @@ def initialise_order_monitoring( market_id=market_party_tuple[0], party_id=market_party_tuple[1], live_only=True, - price_decimals=self._market_price_decimals[market_party_tuple[0]] - if market_party_tuple[0] is not None - else None, - position_decimals=self._market_pos_decimals[market_party_tuple[0]] - if market_party_tuple[0] is not None - else None, + price_decimals=( + self._market_price_decimals[market_party_tuple[0]] + if market_party_tuple[0] is not None + else None + ), + position_decimals=( + self._market_pos_decimals[market_party_tuple[0]] + if market_party_tuple[0] is not None + else None + ), ) ) @@ -310,8 +324,15 @@ def initialise_market_data( ] with self.market_data_lock: for market_id in market_ids: - self.market_data_from_feed_store[market_id] = data_raw.market_data( - market_id, data_client=self._trading_data_client + self.market_data_from_feed_store[ + market_id + ] = data.get_latest_market_data( + market_id, + data_client=self._trading_data_client, + market_price_decimals_map=self._market_price_decimals, + market_position_decimals_map=self._market_pos_decimals, + asset_decimals_map=self._asset_decimals, + market_to_asset_map=self._market_to_asset, ) def initialise_transfer_monitoring( @@ -374,9 +395,9 @@ def _monitor_stream(self) -> None: with self.trades_lock: self._trades_from_feed.append(update) - elif isinstance(update, vega_protos.vega.MarketData): + elif isinstance(update, data.MarketData): with self.market_data_lock: - self.market_data_from_feed_store[update.market] = update + self.market_data_from_feed_store[update.market_id] = update elif isinstance(update, data.LedgerEntry): with self.ledger_entries_lock: diff --git a/vega_sim/network_service.py b/vega_sim/network_service.py index fb6c28692..fb81a57f6 100644 --- a/vega_sim/network_service.py +++ b/vega_sim/network_service.py @@ -537,7 +537,7 @@ def switch_datanode(self, max_attempts: Optional[int] = -1): logging.info(markets) # Show data for a specific market - market = vega.market_data(market_id=markets[0].id) + market = vega.get_latest_market_data(market_id=markets[0].id) # Create a service connected to the stagnet3 network. with VegaServiceNetwork( @@ -550,4 +550,4 @@ def switch_datanode(self, max_attempts: Optional[int] = -1): logging.info(markets) # Show data for a specific market - market = vega.market_data(market_id=markets[0].id) + market = vega.get_latest_market_data(market_id=markets[0].id) diff --git a/vega_sim/parameter_test/parameter/loggers.py b/vega_sim/parameter_test/parameter/loggers.py index 8997fc8ac..b85778d08 100644 --- a/vega_sim/parameter_test/parameter/loggers.py +++ b/vega_sim/parameter_test/parameter/loggers.py @@ -133,7 +133,7 @@ def _ideal_market_maker_single_data_extraction( entry_price = float(position.average_entry_price) / 10**mm_agent.mdp market_state = vega.market_info(market_id=mm_agent.market_id).state - market_data = vega.market_data(market_id=mm_agent.market_id) + market_data = vega.get_latest_market_data(market_id=mm_agent.market_id) markprice = float(market_data.mark_price) / 10**mm_agent.mdp mid_price = float(market_data.mid_price) / 10**mm_agent.mdp trading_mode = market_data.market_trading_mode @@ -259,7 +259,7 @@ def target_stake_additional_data( (OptimalMarketMakerV2, OptimalMarketMaker, ExponentialShapedMarketMaker), ) ][0] - market_data = vega.market_data(market_id=mm_agent.market_id) + market_data = vega.get_latest_market_data(market_id=mm_agent.market_id) scaling = 1 / 10 ** mm_agent.adp if hasattr(mm_agent, "adp") else 1 return { @@ -279,7 +279,7 @@ def tau_scaling_additional_data( (OptimalMarketMakerV2, OptimalMarketMaker, ExponentialShapedMarketMaker), ) ][0] - market_data = vega.market_data(market_id=mm_agent.market_id) + market_data = vega.get_latest_market_data(market_id=mm_agent.market_id) market_info = vega.market_info(market_id=mm_agent.market_id) return { @@ -361,9 +361,9 @@ def momentum_trader_data_extraction( inventory = float(position.open_volume) entry_price = float(position.average_entry_price) / 10**trader.mdp - market_data = vega.market_data(market_id=trader.market_id) - markprice = float(market_data.mark_price) / 10**trader.mdp - mid_price = float(market_data.mid_price) / 10**trader.mdp + market_data = vega.get_latest_market_data(market_id=trader.market_id) + markprice = market_data.mark_price + mid_price = market_data.mid_price trading_mode = market_data.market_trading_mode market_state = vega.market_info(market_id=trader.market_id).state diff --git a/vega_sim/scenario/common/agents.py b/vega_sim/scenario/common/agents.py index 66cce2eb4..5e79edd19 100644 --- a/vega_sim/scenario/common/agents.py +++ b/vega_sim/scenario/common/agents.py @@ -278,24 +278,30 @@ def step(self, vega_state: VegaState): ) def place_order(self, vega_state: VegaState, volume: float, side: vega_protos.Side): - if ( - ( - vega_state.market_state[self.market_id].trading_mode - == markets_protos.Market.TradingMode.TRADING_MODE_CONTINUOUS - ) - and vega_state.market_state[self.market_id].state - == markets_protos.Market.State.STATE_ACTIVE - and volume != 0 - ): - self.vega.submit_market_order( - trading_key=self.key_name, - market_id=self.market_id, - side=side, - volume=volume, - wait=False, - fill_or_kill=False, - trading_wallet=self.wallet_name, - ) + try: + if ( + ( + vega_state.market_state[self.market_id].trading_mode + == markets_protos.Market.TradingMode.TRADING_MODE_CONTINUOUS + ) + and vega_state.market_state[self.market_id].state + == markets_protos.Market.State.STATE_ACTIVE + and volume != 0 + ): + self.vega.submit_market_order( + trading_key=self.key_name, + market_id=self.market_id, + side=side, + volume=volume, + wait=False, + fill_or_kill=False, + trading_wallet=self.wallet_name, + ) + except: + import pdb + + pdb.set_trace() + a = 4 class PriceSensitiveLimitOrderTrader(StateAgentWithWallet): @@ -1317,9 +1323,11 @@ def _move_side( market_id=self.market_id, order_id=order_to_amend.id, price=order.price, - time_in_force="TIME_IN_FORCE_GTT" - if self.order_validity_length is not None - else "TIME_IN_FORCE_GTC", + time_in_force=( + "TIME_IN_FORCE_GTT" + if self.order_validity_length is not None + else "TIME_IN_FORCE_GTC" + ), size_delta=order.size - order_to_amend.remaining, expires_at=expires_at, ) @@ -1332,9 +1340,11 @@ def _move_side( price=order.price, size=order.size, order_type="TYPE_LIMIT", - time_in_force="TIME_IN_FORCE_GTT" - if self.order_validity_length is not None - else "TIME_IN_FORCE_GTC", + time_in_force=( + "TIME_IN_FORCE_GTT" + if self.order_validity_length is not None + else "TIME_IN_FORCE_GTC" + ), side=side, expires_at=expires_at, ) diff --git a/vega_sim/service.py b/vega_sim/service.py index 7064698ce..1b1340e2a 100644 --- a/vega_sim/service.py +++ b/vega_sim/service.py @@ -10,7 +10,7 @@ from functools import wraps from queue import Queue, Empty from itertools import product -from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union +from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union, Callable import grpc @@ -147,12 +147,18 @@ def __init__( ) self.seconds_per_block = seconds_per_block + def _after_total_catchup(self, fn: Callable, *args, **kwargs): + self.wait_for_total_catchup() + return fn(*args, **kwargs) + @property def market_price_decimals(self) -> int: if self._market_price_decimals is None: self._market_price_decimals = DecimalsCache( - lambda market_id: data.market_price_decimals( - market_id=market_id, data_client=self.trading_data_client_v2 + lambda market_id: self._after_total_catchup( + data.market_price_decimals, + market_id=market_id, + data_client=self.trading_data_client_v2, ) ) return self._market_price_decimals @@ -161,8 +167,10 @@ def market_price_decimals(self) -> int: def market_pos_decimals(self) -> int: if self._market_pos_decimals is None: self._market_pos_decimals = DecimalsCache( - lambda market_id: data.market_position_decimals( - market_id=market_id, data_client=self.trading_data_client_v2 + lambda market_id: self._after_total_catchup( + data.market_position_decimals, + market_id=market_id, + data_client=self.trading_data_client_v2, ) ) return self._market_pos_decimals @@ -171,8 +179,10 @@ def market_pos_decimals(self) -> int: def asset_decimals(self) -> int: if self._asset_decimals is None: self._asset_decimals = DecimalsCache( - lambda asset_id: data.get_asset_decimals( - asset_id=asset_id, data_client=self.trading_data_client_v2 + lambda asset_id: self._after_total_catchup( + data.get_asset_decimals, + asset_id=asset_id, + data_client=self.trading_data_client_v2, ) ) return self._asset_decimals @@ -181,8 +191,10 @@ def asset_decimals(self) -> int: def market_to_asset(self) -> str: if self._market_to_asset is None: self._market_to_asset = DecimalsCache( - lambda market_id: data_raw.market_info( - market_id=market_id, data_client=self.trading_data_client_v2 + lambda market_id: self._after_total_catchup( + data_raw.market_info, + market_id=market_id, + data_client=self.trading_data_client_v2, ).tradable_instrument.instrument.future.settlement_asset ) return self._market_to_asset @@ -1097,18 +1109,6 @@ def market_data_from_feed( """ return self.data_cache.market_data_from_feed(market_id) - @raw_data - def market_data( - self, - market_id: str, - ) -> vega_protos.vega.MarketData: - """ - Output market info. - """ - return data_raw.market_data( - market_id=market_id, data_client=self.trading_data_client_v2 - ) - @raw_data def infrastructure_fee_accounts( self, @@ -1136,6 +1136,22 @@ def market_accounts( data_client=self.trading_data_client_v2, ) + def get_latest_market_data( + self, + market_id: str, + ) -> vega_protos.vega.MarketData: + """ + Output market info. + """ + return data.get_latest_market_data( + market_id=market_id, + data_client=self.trading_data_client_v2, + market_price_decimals_map=self.market_price_decimals, + market_position_decimals_map=self.market_pos_decimals, + market_to_asset_map=self.market_to_asset, + asset_decimals_map=self.asset_decimals, + ) + def market_account( self, market_id: str, @@ -1166,11 +1182,13 @@ def best_prices( """ Output the best static bid price and best static ask price in current market. """ - return data.best_prices( + market_data = self.get_latest_market_data( market_id=market_id, - data_client=self.trading_data_client_v2, - market_data=self.data_cache.market_data_from_feed(market_id=market_id), - price_decimals=self.market_price_decimals[market_id], + ) + + return ( + market_data.best_static_bid_price, + market_data.best_static_offer_price, ) def price_bounds( @@ -1180,11 +1198,22 @@ def price_bounds( """ Output the tightest price bounds in the current market. """ - return data.price_bounds( + market_data = self.get_latest_market_data( market_id=market_id, - data_client=self.trading_data_client_v2, - market_data=self.data_cache.market_data_from_feed(market_id=market_id), - price_decimals=self.market_price_decimals[market_id], + ) + + lower_bounds = [ + price_monitoring_bound.min_valid_price + for price_monitoring_bound in market_data.price_monitoring_bounds + ] + upper_bounds = [ + price_monitoring_bound.max_valid_price + for price_monitoring_bound in market_data.price_monitoring_bounds + ] + + return ( + max(lower_bounds) if lower_bounds else None, + min(upper_bounds) if upper_bounds else None, ) def order_book_by_market( @@ -2109,16 +2138,27 @@ def get_liquidity_fee_shares( Name of specific key in wallet to get public key for. Defaults to None. """ - return data.get_liquidity_fee_shares( - data_client=self.trading_data_client_v2, - market_id=market_id, - party_id=( - self.wallet.public_key(wallet_name=wallet_name, name=key_name) - if wallet_name is not None - else None - ), - market_data=self.market_data_from_feed(market_id=market_id), - ) + market_data = self.get_latest_market_data(market_id=market_id) + + # Calculate share of fees for each LP + shares = { + lp.party: float(lp.equity_like_share) * float(lp.average_score) + for lp in market_data.liquidity_provider_fee_share + } + total_shares = sum(shares.values()) + + # Scale share of fees for each LP pro rata + if total_shares != 0: + pro_rata_shares = {key: val / total_shares for key, val in shares.items()} + else: + pro_rata_shares = {key: 1 / len(shares) for key, val in shares.items()} + + if key_name is None: + return pro_rata_shares + else: + return pro_rata_shares[ + self.wallet.public_key(name=key_name, wallet_name=wallet_name) + ] def list_ledger_entries( self, diff --git a/vega_sim/tools/scenario_output.py b/vega_sim/tools/scenario_output.py index e9d50d47b..25f10d066 100644 --- a/vega_sim/tools/scenario_output.py +++ b/vega_sim/tools/scenario_output.py @@ -22,6 +22,7 @@ def history_data_to_row(data: MarketHistoryData) -> List[pd.Series]: results.append( { "time": data.at_time, + "mark_price": market_data.mark_price, "market_id": market_id, "open_interest": market_data.open_interest, "best_bid": market_data.best_bid_price,