From e7fe9b173f0e3d6c7bd28107684a2390b60c8b56 Mon Sep 17 00:00:00 2001 From: Steven Ewald Date: Mon, 14 Oct 2024 15:26:34 -0500 Subject: [PATCH] Separate match and account update messages --- .../messages/messages_exchange_to_wrapper.hpp | 42 ++++------- .../src/exchange/algos/dev_mode/dev_mode.cpp | 3 +- .../algos/normal_mode/normal_mode.cpp | 3 +- exchange/src/exchange/matching/engine.cpp | 17 +++-- exchange/src/exchange/matching/engine.hpp | 3 +- exchange/src/exchange/matching/order_pair.hpp | 29 +++----- .../matching_cycle/base/base_cycle.cpp | 64 +++++++++++++++-- .../matching_cycle/base/base_cycle.hpp | 13 +++- .../matching_cycle/cycle_interface.hpp | 4 +- .../exchange/matching_cycle/dev/dev_cycle.hpp | 4 +- .../src/exchange/metrics/on_tick_metrics.cpp | 14 ++-- .../src/exchange/metrics/on_tick_metrics.hpp | 2 +- .../exchange/orders/storage/order_storage.hpp | 16 +++++ exchange/src/exchange/sandbox_server/crow.cpp | 6 +- .../traders/trader_types/bot_trader.cpp | 2 - .../wrappers/creation/rmq_wrapper_init.cpp | 13 ++-- .../wrappers/creation/rmq_wrapper_init.hpp | 9 ++- exchange/src/wrapper/main.cpp | 3 +- .../messaging/exchange_communicator.cpp | 69 ++++++++++++------- .../messaging/exchange_communicator.hpp | 45 ++++++------ exchange/src/wrapper/runtime/runtime.cpp | 49 +++++++------ exchange/src/wrapper/runtime/runtime.hpp | 6 +- exchange/test/src/integration/tests/basic.cpp | 3 +- .../test/src/unit/matching/basic_matching.cpp | 4 +- .../test/src/unit/matching/invalid_orders.cpp | 2 +- .../test/src/unit/matching/many_orders.cpp | 2 +- exchange/test/src/unit/matching/match_ioc.cpp | 6 +- .../test/src/unit/matching/match_market.cpp | 2 +- .../src/unit/matching/order_fee_matching.cpp | 4 +- exchange/test/src/util/helpers/test_cycle.cpp | 2 +- exchange/test/src/util/helpers/test_cycle.hpp | 4 +- exchange/test/src/util/macros.cpp | 8 +-- exchange/test/src/util/macros.hpp | 16 ++--- 33 files changed, 278 insertions(+), 191 deletions(-) diff --git a/exchange/src/common/types/messages/messages_exchange_to_wrapper.hpp b/exchange/src/common/types/messages/messages_exchange_to_wrapper.hpp index d7e23c13..c67e4265 100644 --- a/exchange/src/common/types/messages/messages_exchange_to_wrapper.hpp +++ b/exchange/src/common/types/messages/messages_exchange_to_wrapper.hpp @@ -7,43 +7,33 @@ #include #include +#include + namespace nutc::common { struct start_time { - int64_t start_time_ns; + std::int64_t start_time_ns; start_time() = default; - explicit start_time(int64_t stns) : start_time_ns(stns) {} + explicit start_time(std::chrono::high_resolution_clock::time_point stns) : + start_time_ns(stns.time_since_epoch().count()) + {} }; -struct match { - common::position position; - std::string buyer_id; - std::string seller_id; - common::decimal_price buyer_capital; - common::decimal_price seller_capital; - std::string match_type{}; - - match() = default; - - match( - const common::position& position, std::string bid, std::string sid, - common::decimal_price bcap, common::decimal_price scap - ) : - position(position), buyer_id(std::move(bid)), seller_id(std::move(sid)), - buyer_capital(bcap), seller_capital(scap) - {} +struct account_update { + common::position trade; + common::decimal_price available_capital; }; struct tick_update { std::vector ob_updates; - std::vector matches; + std::vector matches; tick_update() = default; explicit tick_update( - std::vector ob_updates, std::vector matches + std::vector ob_updates, std::vector matches ) : ob_updates(std::move(ob_updates)), matches(std::move(matches)) {} }; @@ -67,12 +57,10 @@ struct glz::meta { }; template <> -struct glz::meta { - using t = nutc::common::match; - static constexpr auto value = object( - "match", &t::position, &t::buyer_id, &t::seller_id, &t::buyer_capital, - &t::seller_capital - ); +struct glz::meta { + using t = nutc::common::account_update; + static constexpr auto value = + object("account_update", &t::trade, &t::available_capital); }; template <> diff --git a/exchange/src/exchange/algos/dev_mode/dev_mode.cpp b/exchange/src/exchange/algos/dev_mode/dev_mode.cpp index cd038513..f87632e6 100644 --- a/exchange/src/exchange/algos/dev_mode/dev_mode.cpp +++ b/exchange/src/exchange/algos/dev_mode/dev_mode.cpp @@ -18,10 +18,11 @@ DevModeAlgoInitializer::initialize_trader_container( traders.add_trader(algo, start_capital); } - int64_t start_time = get_start_time(WAIT_SECS); + auto start_time = get_start_time(WAIT_SECS); std::for_each(traders.begin(), traders.end(), [start_time](auto& trader) { send_start_time(trader, start_time); }); + std::this_thread::sleep_until(start_time); } void diff --git a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp index 84700d2c..9fc90023 100644 --- a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp +++ b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp @@ -48,10 +48,11 @@ NormalModeAlgoInitializer::initialize_trader_container( } } - int64_t start_time = get_start_time(WAIT_SECS); + auto start_time = get_start_time(WAIT_SECS); std::for_each(traders.begin(), traders.end(), [start_time](auto& trader) { send_start_time(trader, start_time); }); + std::this_thread::sleep_until(start_time); } glz::json_t::object_t diff --git a/exchange/src/exchange/matching/engine.cpp b/exchange/src/exchange/matching/engine.cpp index cfa679e4..def98ad8 100644 --- a/exchange/src/exchange/matching/engine.cpp +++ b/exchange/src/exchange/matching/engine.cpp @@ -9,10 +9,9 @@ namespace nutc::exchange { enum class MatchFailure { buyer_failure, seller_failure, done_matching }; -using match = nutc::common::match; template -glz::expected +glz::expected match_orders_( OrderPairT& orders, CompositeOrderBook& orderbook, common::decimal_price order_fee ) @@ -38,7 +37,7 @@ match_orders_( } template -glz::expected +glz::expected match_incoming_order_( OrderT& aggressive_order, LimitOrderBook::stored_limit_order passive_order, CompositeOrderBook& orderbook, common::decimal_price order_fee @@ -73,7 +72,7 @@ total_order_cost_( } template -glz::expected +glz::expected attempt_match_(OrderPairT& orders, common::decimal_price order_fee) { auto price_opt = orders.potential_match_price(); @@ -106,7 +105,7 @@ attempt_match_(OrderPairT& orders, common::decimal_price order_fee) } template -glz::expected +glz::expected match_incoming_order_( OrderT& aggressive_order, CompositeOrderBook& orderbook, common::decimal_price order_fee @@ -130,12 +129,12 @@ match_incoming_order_( } template -std::vector +std::vector match_order( OrderT order, CompositeOrderBook& orderbook, common::decimal_price order_fee ) { - std::vector matches; + std::vector matches; while (order.quantity != 0.0) { auto match_opt = match_incoming_order_(order, orderbook, order_fee); @@ -154,9 +153,9 @@ match_order( return matches; } -template std::vector +template std::vector match_order<>(tagged_limit_order, CompositeOrderBook&, common::decimal_price); -template std::vector +template std::vector match_order<>(tagged_market_order, CompositeOrderBook&, common::decimal_price); } // namespace nutc::exchange diff --git a/exchange/src/exchange/matching/engine.hpp b/exchange/src/exchange/matching/engine.hpp index b0cacc64..bc7bb632 100644 --- a/exchange/src/exchange/matching/engine.hpp +++ b/exchange/src/exchange/matching/engine.hpp @@ -1,6 +1,5 @@ #pragma once -#include "common/types/messages/messages_exchange_to_wrapper.hpp" #include "exchange/orders/orderbook/composite_orderbook.hpp" #include @@ -10,7 +9,7 @@ namespace nutc::exchange { template -std::vector match_order( +std::vector match_order( OrderT order, CompositeOrderBook& orderbook, common::decimal_price order_fee = 0.0 ); diff --git a/exchange/src/exchange/matching/order_pair.hpp b/exchange/src/exchange/matching/order_pair.hpp index a76d5538..c40b913b 100644 --- a/exchange/src/exchange/matching/order_pair.hpp +++ b/exchange/src/exchange/matching/order_pair.hpp @@ -101,24 +101,13 @@ class OrderPair { } template - common::match + tagged_match create_match(common::decimal_quantity quantity, common::decimal_price price) const { auto& buyer = get_underlying_order(); auto& seller = get_underlying_order(); common::position position{buyer.ticker, AggressiveSide, quantity, price}; - // TODO: match_type is pretty bad, we should have a better way of tracking this. - // It's only used for metrics - std::string match_type = - fmt::format("{}->{}", seller.trader->get_type(), buyer.trader->get_type()); - // TODO: can just use TraderPortfolio instead of entire trader - common::match match{ - position, buyer.trader->get_id(), seller.trader->get_id(), - buyer.trader->get_portfolio().get_capital(), - seller.trader->get_portfolio().get_capital() - }; - match.match_type = match_type; - return match; + return {buyer.trader, seller.trader, position}; } common::decimal_quantity @@ -131,21 +120,21 @@ class OrderPair { void handle_match( - const common::match& match, common::decimal_price order_fee, + const tagged_match& match, common::decimal_price order_fee, CompositeOrderBook& orderbook ) { get_underlying_order().trader->get_portfolio().notify_match( - {match.position.ticker, common::Side::buy, match.position.quantity, - match.position.price * (common::decimal_price{1.0} + order_fee)} + {match.ticker, common::Side::buy, match.quantity, + match.price * (common::decimal_price{1.0} + order_fee)} ); get_underlying_order().trader->get_portfolio().notify_match( - {match.position.ticker, common::Side::sell, match.position.quantity, - match.position.price * (common::decimal_price{1.0} - order_fee)} + {match.ticker, common::Side::sell, match.quantity, + match.price * (common::decimal_price{1.0} - order_fee)} ); - change_order_quantity(seller, -match.position.quantity, orderbook); - change_order_quantity(buyer, -match.position.quantity, orderbook); + change_order_quantity(seller, -match.quantity, orderbook); + change_order_quantity(buyer, -match.quantity, orderbook); } private: diff --git a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp index a001c428..41da2477 100644 --- a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp +++ b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp @@ -52,10 +52,10 @@ BaseMatchingCycle::collect_orders(uint64_t) -> std::vector return orders; } -std::vector +std::vector BaseMatchingCycle::match_orders_(std::vector orders) { - std::vector matches; + std::vector matches; auto match_incoming_order = [&](OrderT& order) { auto& ticker_data = tickers_[order.ticker]; @@ -83,20 +83,63 @@ BaseMatchingCycle::match_orders_(std::vector orders) return matches; } -void -BaseMatchingCycle::handle_matches_(std::vector matches) +std::vector +BaseMatchingCycle::get_orderbook_updates_() { - std::vector ob_updates{}; + std::vector ob_updates; for (auto [symbol, info] : tickers_) { auto tmp = info.get_orderbook().get_and_reset_updates(); std::ranges::copy(tmp, std::back_inserter(ob_updates)); } + return ob_updates; +} + +std::vector +BaseMatchingCycle::tagged_matches_to_positions(const std::vector& matches) +{ + std::vector untagged_matches(matches.size()); + std::transform( + matches.begin(), matches.end(), untagged_matches.begin(), + [](const tagged_match& match) { return match; } + ); + return untagged_matches; +} - if (ob_updates.empty() && matches.empty()) +void +BaseMatchingCycle::send_account_updates(const std::vector& matches) +{ + std::for_each(matches.begin(), matches.end(), [](const tagged_match& match) { + common::position trade{match.ticker, match.side, match.quantity, match.price}; + common::account_update buyer_update{ + trade, match.buyer->get_portfolio().get_capital() + }; + common::account_update seller_update{ + trade, match.seller->get_portfolio().get_capital() + }; + auto buyer_update_str = glz::write_json(buyer_update); + if (!buyer_update_str.has_value()) [[unlikely]] { + throw std::runtime_error(glz::format_error(buyer_update_str.error())); + } + auto seller_update_str = glz::write_json(seller_update); + if (!seller_update_str.has_value()) [[unlikely]] { + throw std::runtime_error(glz::format_error(seller_update_str.error())); + } + match.buyer->send_message(buyer_update_str.value()); + match.seller->send_message(seller_update_str.value()); + }); +} + +void +BaseMatchingCycle::send_market_updates_(const std::vector& matches) +{ + std::vector orderbook_updates = get_orderbook_updates_(); + std::vector trade_updates = tagged_matches_to_positions(matches); + + if (orderbook_updates.empty() && trade_updates.empty()) [[unlikely]] return; - common::tick_update updates{ob_updates, matches}; + common::tick_update updates{orderbook_updates, trade_updates}; auto update = glz::write_json(updates); if (!update.has_value()) [[unlikely]] { throw std::runtime_error(glz::format_error(update.error())); @@ -108,4 +151,11 @@ BaseMatchingCycle::handle_matches_(std::vector matches) ); } +void +BaseMatchingCycle::handle_matches_(std::vector matches) +{ + send_market_updates_(matches); + send_account_updates(matches); +} + } // namespace nutc::exchange diff --git a/exchange/src/exchange/matching_cycle/base/base_cycle.hpp b/exchange/src/exchange/matching_cycle/base/base_cycle.hpp index 28573211..0097f9b5 100644 --- a/exchange/src/exchange/matching_cycle/base/base_cycle.hpp +++ b/exchange/src/exchange/matching_cycle/base/base_cycle.hpp @@ -41,13 +41,22 @@ class BaseMatchingCycle : public MatchingCycleInterface { std::vector collect_orders(uint64_t) override; - std::vector match_orders_(std::vector orders) override; + std::vector match_orders_(std::vector orders) override; - void handle_matches_(std::vector matches) override; + void handle_matches_(std::vector matches) override; void post_cycle_(uint64_t) override {} + +private: + std::vector get_orderbook_updates_(); + + static std::vector + tagged_matches_to_positions(const std::vector& matches); + + static void send_account_updates(const std::vector& matches); + void send_market_updates_(const std::vector& matches); }; } // namespace nutc::exchange diff --git a/exchange/src/exchange/matching_cycle/cycle_interface.hpp b/exchange/src/exchange/matching_cycle/cycle_interface.hpp index 25a0a120..25aac2c2 100644 --- a/exchange/src/exchange/matching_cycle/cycle_interface.hpp +++ b/exchange/src/exchange/matching_cycle/cycle_interface.hpp @@ -30,10 +30,10 @@ class MatchingCycleInterface { virtual std::vector collect_orders(uint64_t new_tick) = 0; - virtual std::vector match_orders_(std::vector orders + virtual std::vector match_orders_(std::vector orders ) = 0; - virtual void handle_matches_(std::vector matches) = 0; + virtual void handle_matches_(std::vector matches) = 0; virtual void post_cycle_(uint64_t new_tick) = 0; diff --git a/exchange/src/exchange/matching_cycle/dev/dev_cycle.hpp b/exchange/src/exchange/matching_cycle/dev/dev_cycle.hpp index 9652866d..2d8a9044 100644 --- a/exchange/src/exchange/matching_cycle/dev/dev_cycle.hpp +++ b/exchange/src/exchange/matching_cycle/dev/dev_cycle.hpp @@ -22,7 +22,7 @@ class DevMatchingCycle : public BaseMatchingCycle { {} protected: - std::vector + std::vector match_orders_(std::vector orders) override { // TODO: add back @@ -31,7 +31,7 @@ class DevMatchingCycle : public BaseMatchingCycle { } void - handle_matches_(std::vector matches) override + handle_matches_(std::vector matches) override { pusher.report_matches(matches); BaseMatchingCycle::handle_matches_(std::move(matches)); diff --git a/exchange/src/exchange/metrics/on_tick_metrics.cpp b/exchange/src/exchange/metrics/on_tick_metrics.cpp index 83b373e3..fa0a64e2 100644 --- a/exchange/src/exchange/metrics/on_tick_metrics.cpp +++ b/exchange/src/exchange/metrics/on_tick_metrics.cpp @@ -7,6 +7,8 @@ #include "exchange/traders/trader_container.hpp" #include "prometheus.hpp" +#include + #include namespace nutc::exchange { @@ -107,15 +109,17 @@ TickerMetricsPusher::report_ticker_stats(TickerContainer& tickers) } void -TickerMetricsPusher::report_matches(const std::vector& orders) +TickerMetricsPusher::report_matches(const std::vector& orders) { - auto log_match = [this](const common::match& match) { + auto log_match = [this](const tagged_match& match) { + std::string match_type = + fmt::format("{}->{}", match.seller->get_type(), match.buyer->get_type()); matches_quantity_counter .Add({ - {"ticker", common::to_string(match.position.ticker)}, - {"match_type", match.match_type } + {"ticker", common::to_string(match.ticker)}, + {"match_type", match_type } }) - .Increment(double{match.position.quantity}); + .Increment(double{match.quantity}); }; std::for_each(orders.begin(), orders.end(), log_match); diff --git a/exchange/src/exchange/metrics/on_tick_metrics.hpp b/exchange/src/exchange/metrics/on_tick_metrics.hpp index 378ec431..68fbbd70 100644 --- a/exchange/src/exchange/metrics/on_tick_metrics.hpp +++ b/exchange/src/exchange/metrics/on_tick_metrics.hpp @@ -39,7 +39,7 @@ class TickerMetricsPusher { void report_trader_stats(const TickerContainer& tickers); void report_ticker_stats(TickerContainer& tickers); void report_orders(const std::vector& orders); - void report_matches(const std::vector& orders); + void report_matches(const std::vector& orders); private: static Gauge create_gauge_(const std::string& gauge_name); diff --git a/exchange/src/exchange/orders/storage/order_storage.hpp b/exchange/src/exchange/orders/storage/order_storage.hpp index 074a40ea..5c97cf3e 100644 --- a/exchange/src/exchange/orders/storage/order_storage.hpp +++ b/exchange/src/exchange/orders/storage/order_storage.hpp @@ -27,6 +27,22 @@ class tagged_order : public BaseOrderT { using tagged_limit_order = tagged_order; using tagged_market_order = tagged_order; +class tagged_match : public common::position { +public: + GenericTrader* buyer; + GenericTrader* seller; + + tagged_match( + GenericTrader& buyer, GenericTrader& seller, const common::position& match + ) : common::position(match), buyer(&buyer), seller(&seller) + {} + + tagged_match( + GenericTrader* buyer, GenericTrader* seller, const common::position& match + ) : common::position(match), buyer(buyer), seller(seller) + {} +}; + using OrderVariant = std::variant; diff --git a/exchange/src/exchange/sandbox_server/crow.cpp b/exchange/src/exchange/sandbox_server/crow.cpp index 5105c699..e3cac051 100644 --- a/exchange/src/exchange/sandbox_server/crow.cpp +++ b/exchange/src/exchange/sandbox_server/crow.cpp @@ -12,6 +12,8 @@ #include #include +#include + namespace nutc::exchange { CrowServer::CrowServer(std::uint16_t port) : @@ -100,7 +102,9 @@ CrowServer::add_pending_trader_( start_remove_timer_(trial_secs, trader, algo_id, logfile_url); auto get_start_message = []() { - static auto start_message = glz::write_json(common::start_time{0}); + static auto start_message = + glz::write_json(common::start_time{std::chrono::high_resolution_clock::now() + }); if (!start_message.has_value()) [[unlikely]] throw std::runtime_error(glz::format_error(start_message.error())); return start_message.value(); diff --git a/exchange/src/exchange/traders/trader_types/bot_trader.cpp b/exchange/src/exchange/traders/trader_types/bot_trader.cpp index f9a50551..6fd265e4 100644 --- a/exchange/src/exchange/traders/trader_types/bot_trader.cpp +++ b/exchange/src/exchange/traders/trader_types/bot_trader.cpp @@ -1,7 +1,5 @@ #include "bot_trader.hpp" -#include "common/types/decimal.hpp" - #include #include diff --git a/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.cpp b/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.cpp index a4bc6119..f53c7997 100644 --- a/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.cpp +++ b/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.cpp @@ -4,20 +4,21 @@ #include +#include + namespace nutc::exchange { -int64_t +std::chrono::high_resolution_clock::time_point get_start_time(size_t wait_seconds) { using hrq = std::chrono::high_resolution_clock; - hrq::time_point time = hrq::now() + std::chrono::seconds(wait_seconds); - return std::chrono::time_point_cast(time) - .time_since_epoch() - .count(); + return hrq::now() + std::chrono::seconds(wait_seconds); } void -send_start_time(GenericTrader& trader, int64_t start_time) +send_start_time( + GenericTrader& trader, std::chrono::high_resolution_clock::time_point start_time +) { static auto mess = glz::write_json(common::start_time{start_time}); if (!mess.has_value()) [[unlikely]] diff --git a/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.hpp b/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.hpp index cdfb7213..de2f5405 100644 --- a/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.hpp +++ b/exchange/src/exchange/wrappers/creation/rmq_wrapper_init.hpp @@ -2,9 +2,12 @@ #include "exchange/traders/trader_types/generic_trader.hpp" -namespace nutc::exchange { +#include -int64_t get_start_time(size_t wait_seconds); +namespace nutc::exchange { +std::chrono::high_resolution_clock::time_point get_start_time(size_t wait_seconds); -void send_start_time(GenericTrader& trader, int64_t start_time); +void send_start_time( + GenericTrader& trader, std::chrono::high_resolution_clock::time_point start_time +); } // namespace nutc::exchange diff --git a/exchange/src/wrapper/main.cpp b/exchange/src/wrapper/main.cpp index 4867754b..a98092d0 100644 --- a/exchange/src/wrapper/main.cpp +++ b/exchange/src/wrapper/main.cpp @@ -32,9 +32,10 @@ main(int argc, const char** argv) { using namespace nutc::wrapper; + auto [verbosity, trader_id, algo_type] = process_arguments(argc, argv); + std::signal(SIGINT, catch_sigint); std::signal(SIGTERM, catch_sigterm); - auto [verbosity, trader_id, algo_type] = process_arguments(argc, argv); static constexpr std::uint32_t MAX_LOG_SIZE = 50'000; nutc::logging::init_file_only( diff --git a/exchange/src/wrapper/messaging/exchange_communicator.cpp b/exchange/src/wrapper/messaging/exchange_communicator.cpp index 0608e4bd..e5641e70 100644 --- a/exchange/src/wrapper/messaging/exchange_communicator.cpp +++ b/exchange/src/wrapper/messaging/exchange_communicator.cpp @@ -15,8 +15,6 @@ namespace nutc::wrapper { -using start_tick_variant_t = std::variant; - void ExchangeCommunicator::publish_message(const std::string& message) { @@ -29,16 +27,28 @@ ExchangeCommunicator::publish_message(const std::string& message) algorithm_content ExchangeCommunicator::consume_algorithm() { - auto algorithm = consume_message(); + auto algorithm = wait_and_consume_message(); algorithm.algorithm_content_str = base64_decode(algorithm.algorithm_content_str); return algorithm; } -common::tick_update -ExchangeCommunicator::consume_tick_update() +std::variant +ExchangeCommunicator::consume_market_update() +{ + return consume_message>(); +} + +template +T +ExchangeCommunicator::wait_and_consume_message() { - return consume_message(); + using message_variant = std::variant; + message_variant data; + while (!std::holds_alternative(data)) { + data = consume_message(); + } + return std::get(data); } template @@ -51,6 +61,7 @@ ExchangeCommunicator::consume_message() throw std::runtime_error("Wrapper received empty buffer from stdin"); T data{}; + // TODO(anyone): this disables NRVO. Change up? auto err = glz::read_json(data, buf); if (err) { std::string error = glz::format_error(err, buf); @@ -61,6 +72,20 @@ ExchangeCommunicator::consume_message() return data; } +template +requires std::is_constructible_v +T +ExchangeCommunicator::publish_message(Args... args) +{ + T message{args...}; + auto message_opt = glz::write_json(message); + if (!message_opt.has_value()) [[unlikely]] + throw std::runtime_error(glz::format_error(message_opt.error())); + + publish_message(message_opt.value()); + return message; +} + LimitOrderFunction ExchangeCommunicator::place_limit_order() { @@ -68,10 +93,11 @@ ExchangeCommunicator::place_limit_order() common::Side side, common::Ticker ticker, double quantity, double price, bool ioc ) -> order_id_t { - limit_order order{ticker, side, quantity, price, ioc}; - if (!publish_message(order)) + if (limiter_.should_rate_limit()) { return -1; - return order.order_id; + } + return publish_message(ticker, side, quantity, price, ioc) + .order_id; }; } @@ -79,23 +105,27 @@ MarketOrderFunction ExchangeCommunicator::place_market_order() { return [this](common::Side side, common::Ticker ticker, double quantity) { - market_order order{ticker, side, quantity}; - return publish_message(order); + if (limiter_.should_rate_limit()) { + return false; + } + publish_message(ticker, side, quantity); + return true; }; } CancelOrderFunction ExchangeCommunicator::cancel_order() { - return [this](common::Ticker ticker, order_id_t order_id) -> bool { - return publish_message(common::cancel_order{ticker, order_id}); + return [](common::Ticker ticker, order_id_t order_id) { + publish_message(ticker, order_id); + return true; }; } -bool +void ExchangeCommunicator::report_startup_complete() { - return publish_message(common::init_message{}); + publish_message(); } // If wait_blocking is disabled, we block until we *receive* the message, but not @@ -105,14 +135,7 @@ ExchangeCommunicator::wait_for_start_time() { using nanoseconds = std::chrono::nanoseconds; using time_point = std::chrono::high_resolution_clock::time_point; - auto message = consume_message(); - - // Sandbox may get ob updates before it's initialized - while (!std::holds_alternative(message)) { - message = consume_message(); - } - - start_time start = std::get(message); + auto start = wait_and_consume_message(); time_point wait_until{nanoseconds{start.start_time_ns}}; std::this_thread::sleep_until(wait_until); diff --git a/exchange/src/wrapper/messaging/exchange_communicator.hpp b/exchange/src/wrapper/messaging/exchange_communicator.hpp index ee1f393e..8601b403 100644 --- a/exchange/src/wrapper/messaging/exchange_communicator.hpp +++ b/exchange/src/wrapper/messaging/exchange_communicator.hpp @@ -20,44 +20,41 @@ using MarketOrderFunction = using CancelOrderFunction = std::function; +// TODO: this class handles low-level communication (encoding, decoding, etc) and higher +// level abstractions like the messages themselves, waiting for algo, etc +// We should split this into two classes class ExchangeCommunicator { - RateLimiter limiter_{}; + RateLimiter limiter_; std::string trader_id_; public: ExchangeCommunicator(std::string trader_id) : trader_id_(std::move(trader_id)) {} - bool report_startup_complete(); + static void report_startup_complete(); - void wait_for_start_time(); + static void wait_for_start_time(); - algorithm_content consume_algorithm(); - - common::tick_update consume_tick_update(); - - static void publish_message(const std::string& message); - - [[nodiscard]] bool - publish_message(const auto& message) - { - if (limiter_.should_rate_limit()) { - return false; - } - auto message_opt = glz::write_json(message); - if (!message_opt.has_value()) [[unlikely]] - throw std::runtime_error(glz::format_error(message_opt.error())); - - publish_message(message_opt.value()); - return true; - } + static algorithm_content consume_algorithm(); + static std::variant + consume_market_update(); LimitOrderFunction place_limit_order(); MarketOrderFunction place_market_order(); - CancelOrderFunction cancel_order(); + static CancelOrderFunction cancel_order(); private: template - T consume_message(); + static T consume_message(); + + template + static T wait_and_consume_message(); + + // TODO: glaze concepts maybe? + template + requires std::is_constructible_v + static T publish_message(Args... args); + + static void publish_message(const std::string& message); }; } // namespace nutc::wrapper diff --git a/exchange/src/wrapper/runtime/runtime.cpp b/exchange/src/wrapper/runtime/runtime.cpp index cd36b083..097d8451 100644 --- a/exchange/src/wrapper/runtime/runtime.cpp +++ b/exchange/src/wrapper/runtime/runtime.cpp @@ -2,44 +2,49 @@ #include "wrapper/messaging/exchange_communicator.hpp" +#include + namespace nutc::wrapper { -template <> void -Runtime::process_message(start_time&) -{} +Runtime::process_account_update(const account_update& update) const +{ + fire_on_account_update( + update.trade.ticker, update.trade.side, update.trade.price, + update.trade.quantity, update.available_capital + ); +} -template <> void -Runtime::process_message(tick_update&& tick_update) +Runtime::process_tick_update(const tick_update& update) const { - std::ranges::for_each(tick_update.ob_updates, [&](const position& u) { + std::ranges::for_each(update.ob_updates, [&](const position& u) { fire_on_orderbook_update(u.ticker, u.side, u.quantity, u.price); }); - std::ranges::for_each(tick_update.matches, [&](const match& m) { - const auto& p = m.position; - fire_on_trade_update(p.ticker, p.side, p.quantity, p.price); - - if (m.buyer_id == trader_id_) [[unlikely]] { - fire_on_account_update( - p.ticker, p.side, p.price, p.quantity, m.buyer_capital - ); - } - if (m.seller_id == trader_id_) [[unlikely]] { - fire_on_account_update( - p.ticker, p.side, p.price, p.quantity, m.seller_capital - ); - } + std::ranges::for_each(update.matches, [&](const common::position& position) { + fire_on_trade_update( + position.ticker, position.side, position.quantity, position.price + ); }); } +// Should not receive account update before we place an order using start_tick_variant_t = std::variant; void -Runtime::main_event_loop() +Runtime::main_event_loop() const { while (true) { - process_message(communicator_.consume_tick_update()); + auto update = ExchangeCommunicator::consume_market_update(); + if (std::holds_alternative(update)) [[likely]] { + process_tick_update(std::get(update)); + } + else if (std::holds_alternative(update)) { + process_account_update(std::get(update)); + } + else { + throw std::runtime_error("Unknown update type received"); + } } } } // namespace nutc::wrapper diff --git a/exchange/src/wrapper/runtime/runtime.hpp b/exchange/src/wrapper/runtime/runtime.hpp index cf1258b3..1c437e96 100644 --- a/exchange/src/wrapper/runtime/runtime.hpp +++ b/exchange/src/wrapper/runtime/runtime.hpp @@ -38,10 +38,10 @@ class Runtime { decimal_price buyer_capital ) const = 0; - void main_event_loop(); + void main_event_loop() const; - template - void process_message(T&& message); + void process_tick_update(const tick_update& update) const; + void process_account_update(const account_update& update) const; const std::string& get_trader_id() const diff --git a/exchange/test/src/integration/tests/basic.cpp b/exchange/test/src/integration/tests/basic.cpp index a2e7fea5..657bc836 100644 --- a/exchange/test/src/integration/tests/basic.cpp +++ b/exchange/test/src/integration/tests/basic.cpp @@ -209,13 +209,12 @@ TEST_P(IntegrationBasicAlgo, OnAccountUpdateBuy) TEST_P(IntegrationBasicAlgo, AlgoStartDelay) { + auto start = std::chrono::high_resolution_clock::now(); start_wrappers( traders_, GetParam(), "buy_tsla_at_100", TEST_STARTING_CAPITAL, TEST_CLIENT_WAIT_SECS ); - auto start = std::chrono::high_resolution_clock::now(); - auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); diff --git a/exchange/test/src/unit/matching/basic_matching.cpp b/exchange/test/src/unit/matching/basic_matching.cpp index 5c8d8615..34fdd110 100644 --- a/exchange/test/src/unit/matching/basic_matching.cpp +++ b/exchange/test/src/unit/matching/basic_matching.cpp @@ -37,7 +37,7 @@ class UnitBasicMatching : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const tagged_limit_order& order) { return nutc::exchange::match_order(order, orderbook_); @@ -197,7 +197,7 @@ TEST_F(UnitBasicMatching, PassivePriceMatchReversed) matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 1); - ASSERT_EQ(matches.at(0).position.price, 1.0); + ASSERT_EQ(matches.at(0).price, 1.0); ASSERT_EQ_MATCH(matches.at(0), Ticker::ETH, "DEF", "ABC", buy, 1, 1); } diff --git a/exchange/test/src/unit/matching/invalid_orders.cpp b/exchange/test/src/unit/matching/invalid_orders.cpp index fdc64736..41862f47 100644 --- a/exchange/test/src/unit/matching/invalid_orders.cpp +++ b/exchange/test/src/unit/matching/invalid_orders.cpp @@ -28,7 +28,7 @@ class UnitInvalidOrders : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const tagged_limit_order& order) { return nutc::exchange::match_order(order, orderbook_); diff --git a/exchange/test/src/unit/matching/many_orders.cpp b/exchange/test/src/unit/matching/many_orders.cpp index 3e297368..04d0bffd 100644 --- a/exchange/test/src/unit/matching/many_orders.cpp +++ b/exchange/test/src/unit/matching/many_orders.cpp @@ -32,7 +32,7 @@ class UnitManyOrders : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const tagged_limit_order& order) { return nutc::exchange::match_order(order, orderbook_); diff --git a/exchange/test/src/unit/matching/match_ioc.cpp b/exchange/test/src/unit/matching/match_ioc.cpp index 9e80f620..35b93992 100644 --- a/exchange/test/src/unit/matching/match_ioc.cpp +++ b/exchange/test/src/unit/matching/match_ioc.cpp @@ -28,7 +28,7 @@ class UnitMatchIOC : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const tagged_limit_order& order) { return nutc::exchange::match_order(order, orderbook_); @@ -145,7 +145,7 @@ TEST_F(UnitMatchIOC, IOCBuyOrderPriceImprovement) matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 1); - ASSERT_EQ(matches[0].position.price, 1.0); + ASSERT_EQ(matches[0].price, 1.0); } TEST_F(UnitMatchIOC, IOCSellOrderPriceImprovement) @@ -158,7 +158,7 @@ TEST_F(UnitMatchIOC, IOCSellOrderPriceImprovement) matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 1); - ASSERT_EQ(matches[0].position.price, 1.5); + ASSERT_EQ(matches[0].price, 1.5); } TEST_F(UnitMatchIOC, MultipleIOCOrdersMatchingAtDifferentPriceLevels) diff --git a/exchange/test/src/unit/matching/match_market.cpp b/exchange/test/src/unit/matching/match_market.cpp index c0ac1c7a..ce1a01ff 100644 --- a/exchange/test/src/unit/matching/match_market.cpp +++ b/exchange/test/src/unit/matching/match_market.cpp @@ -32,7 +32,7 @@ class UnitMatchMarket : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const auto& order) { return nutc::exchange::match_order(order, orderbook_); diff --git a/exchange/test/src/unit/matching/order_fee_matching.cpp b/exchange/test/src/unit/matching/order_fee_matching.cpp index c7edf96a..21fb957d 100644 --- a/exchange/test/src/unit/matching/order_fee_matching.cpp +++ b/exchange/test/src/unit/matching/order_fee_matching.cpp @@ -35,7 +35,7 @@ class UnitOrderFeeMatching : public ::testing::Test { nutc::exchange::CompositeOrderBook orderbook_{Ticker::ETH}; - std::vector + std::vector add_to_engine_(const tagged_limit_order& order) { return nutc::exchange::match_order(order, orderbook_, .5); @@ -222,7 +222,7 @@ TEST_F(UnitOrderFeeMatching, PassivePriceMatchReversed) matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 1); - ASSERT_EQ(matches.at(0).position.price, 1.0); + ASSERT_EQ(matches.at(0).price, 1.0); ASSERT_EQ_MATCH(matches.at(0), Ticker::ETH, "DEF", "ABC", buy, 1, 1); ASSERT_EQ(trader2.get_portfolio().get_capital_delta(), -1 * 1 * 1.5); ASSERT_EQ(trader1.get_portfolio().get_capital_delta(), 1 * .5); diff --git a/exchange/test/src/util/helpers/test_cycle.cpp b/exchange/test/src/util/helpers/test_cycle.cpp index c45332de..515b6ce6 100644 --- a/exchange/test/src/util/helpers/test_cycle.cpp +++ b/exchange/test/src/util/helpers/test_cycle.cpp @@ -32,7 +32,7 @@ get_base_order(const auto& order) } // namespace -std::vector +std::vector TestMatchingCycle::match_orders_(std::vector orders) { for (auto& order : orders) { diff --git a/exchange/test/src/util/helpers/test_cycle.hpp b/exchange/test/src/util/helpers/test_cycle.hpp index fd612807..62493e03 100644 --- a/exchange/test/src/util/helpers/test_cycle.hpp +++ b/exchange/test/src/util/helpers/test_cycle.hpp @@ -27,8 +27,8 @@ class TestMatchingCycle : public exchange::BaseMatchingCycle { wait_for_order(const OrderT& order, std::function equality_function = soft_equality); private: - std::vector match_orders_(std::vector orders - ) override; + std::vector + match_orders_(std::vector orders) override; }; } // namespace nutc::test diff --git a/exchange/test/src/util/macros.cpp b/exchange/test/src/util/macros.cpp index d202a7f2..847d7791 100644 --- a/exchange/test/src/util/macros.cpp +++ b/exchange/test/src/util/macros.cpp @@ -18,14 +18,14 @@ order_equality(const limit_order& order1, const limit_order& order2) bool validate_match( - const nutc::common::match& match, common::Ticker ticker, + const nutc::exchange::tagged_match& match, common::Ticker ticker, const std::string& buyer_id, const std::string& seller_id, common::Side side, double quantity, double price ) { - return match.position.ticker == ticker && match.buyer_id == buyer_id - && match.seller_id == seller_id && match.position.side == side - && match.position.price == price && match.position.quantity == quantity; + return match.ticker == ticker && match.buyer->get_id() == buyer_id + && match.seller->get_id() == seller_id && match.side == side + && match.price == price && match.quantity == quantity; } bool diff --git a/exchange/test/src/util/macros.hpp b/exchange/test/src/util/macros.hpp index 300a77a1..58981056 100644 --- a/exchange/test/src/util/macros.hpp +++ b/exchange/test/src/util/macros.hpp @@ -31,7 +31,7 @@ PrintTo(const AlgoLanguage& op, std::ostream* os) namespace nutc::test { bool validate_match( - const nutc::common::match& match, common::Ticker ticker, + const nutc::exchange::tagged_match& match, common::Ticker ticker, const std::string& buyer_id, const std::string& seller_id, common::Side side, double quantity, double price ); @@ -66,13 +66,13 @@ order_equality(const common::market_order& order1, const common::market_order& o << "Expected match with ticker = " << (nutc::common::to_string(ticker_)) \ << ", buyer_id = " << (buyer_id_) << ", seller_id = " << (seller_id_) \ << ", side = " << static_cast(side_) << ", price = " << (price_) \ - << ", quantity = " << (quantity_) << ". Actual match: ticker = " \ - << nutc::common::to_string((match).position.ticker) \ - << ", buyer_id = " << (match).buyer_id \ - << ", seller_id = " << (match).seller_id \ - << ", side = " << static_cast((match).position.side) \ - << ", price = " << double{(match).position.price} \ - << ", quantity = " << double{(match).position.quantity}; \ + << ", quantity = " << (quantity_) \ + << ". Actual match: ticker = " << nutc::common::to_string((match).ticker) \ + << ", buyer_id = " << (match).buyer->get_id() \ + << ", seller_id = " << (match).seller->get_id() \ + << ", side = " << static_cast((match).side) \ + << ", price = " << double{(match).price} \ + << ", quantity = " << double{(match).quantity}; \ } while (0) #define ASSERT_EQ_OB_UPDATE(/* NOLINT(cppcoreguidelines-macro-usage) */ \