From 1ed02ba9c0e47ca15e2b5eb746648e67bdbd4c6a Mon Sep 17 00:00:00 2001 From: Steven Ewald Date: Tue, 15 Oct 2024 14:43:52 -0500 Subject: [PATCH 1/6] Fixed order ids - checkpoint --- exchange/benchmark/src/orderbook.cpp | 1 + .../common/messages_wrapper_to_exchange.hpp | 36 ++++++------ exchange/src/common/util.cpp | 10 ++-- .../matching_cycle/base/base_cycle.cpp | 7 ++- .../orders/orderbook/order_id_tracker.cpp | 8 ++- .../exchange/orders/storage/order_storage.hpp | 47 +++++++++++---- .../traders/trader_types/bot_trader.hpp | 15 ++++- .../messaging/exchange_communicator.cpp | 6 +- exchange/test/src/integration/tests/basic.cpp | 57 ++++++++++--------- .../src/integration/tests/cancellation.cpp | 19 ++++--- .../test/src/unit/matching/basic_matching.cpp | 4 +- .../test/src/unit/matching/invalid_orders.cpp | 8 ++- .../test/src/unit/matching/many_orders.cpp | 3 +- .../src/unit/matching/order_fee_matching.cpp | 11 ++-- exchange/test/src/util/macros.cpp | 32 +++++++++++ exchange/test/src/util/macros.hpp | 13 +++++ 16 files changed, 191 insertions(+), 86 deletions(-) diff --git a/exchange/benchmark/src/orderbook.cpp b/exchange/benchmark/src/orderbook.cpp index aa9d6235..59bb47db 100644 --- a/exchange/benchmark/src/orderbook.cpp +++ b/exchange/benchmark/src/orderbook.cpp @@ -1,3 +1,4 @@ +#include "common/util.hpp" #include "exchange/orders/orderbook/limit_orderbook.hpp" #include "helpers/benchmark_trader.hpp" diff --git a/exchange/src/common/messages_wrapper_to_exchange.hpp b/exchange/src/common/messages_wrapper_to_exchange.hpp index 5e074d58..fc674ebc 100644 --- a/exchange/src/common/messages_wrapper_to_exchange.hpp +++ b/exchange/src/common/messages_wrapper_to_exchange.hpp @@ -18,7 +18,13 @@ struct init_message { struct cancel_order { Ticker ticker; order_id_t order_id; - std::uint64_t timestamp = get_time(); + std::uint64_t timestamp; + + cancel_order(Ticker ticker, order_id_t order_id, std::uint64_t timestamp) : + ticker(ticker), order_id(order_id), timestamp(timestamp) + {} + + cancel_order() = default; bool operator==(const cancel_order& other) const @@ -31,12 +37,13 @@ struct market_order { Ticker ticker; Side side; decimal_quantity quantity; - std::uint64_t timestamp = get_time(); + std::uint64_t timestamp; constexpr market_order() = default; - market_order(Ticker ticker, Side side, decimal_quantity quantity) : - ticker(ticker), side(side), quantity(quantity) + market_order( + Ticker ticker, Side side, decimal_quantity quantity, std::uint64_t timestamp + ) : ticker(ticker), side(side), quantity(quantity), timestamp(timestamp) {} bool @@ -50,7 +57,7 @@ struct market_order { struct limit_order : market_order { decimal_price price; bool ioc{false}; - order_id_t order_id = generate_order_id(); + order_id_t order_id; // TODO: fix tests and remove bool @@ -61,16 +68,12 @@ struct limit_order : market_order { && ioc == other.ioc; } - limit_order( - std::string_view ticker, Side side, decimal_quantity quantity, - decimal_price price, bool ioc = false - ) : market_order{force_to_ticker(ticker), side, quantity}, price{price}, ioc{ioc} - {} - limit_order( Ticker ticker, Side side, decimal_quantity quantity, decimal_price price, - bool ioc = false - ) : market_order{ticker, side, quantity}, price{price}, ioc{ioc} + bool ioc, std::uint64_t timestamp, order_id_t order_id + ) : + market_order{ticker, side, quantity, timestamp}, price{price}, ioc{ioc}, + order_id{order_id} {} limit_order() = default; @@ -85,7 +88,8 @@ using IncomingMessageVariant = template <> struct glz::meta { using t = nutc::common::cancel_order; - static constexpr auto value = object("cancel", &t::ticker, &t::order_id); + static constexpr auto value = + object("cancel", &t::ticker, &t::order_id, &t::timestamp); }; /// \cond @@ -93,7 +97,7 @@ template <> struct glz::meta { using t = nutc::common::limit_order; static constexpr auto value = object( - "limit", &t::timestamp, &t::ticker, &t::side, &t::quantity, &t::price, &t::ioc, + "limit", &t::ticker, &t::side, &t::quantity, &t::timestamp, &t::price, &t::ioc, &t::order_id ); }; @@ -103,7 +107,7 @@ template <> struct glz::meta { using t = nutc::common::market_order; static constexpr auto value = - object("market", &t::timestamp, &t::ticker, &t::side, &t::quantity); + object("market", &t::ticker, &t::side, &t::quantity, &t::timestamp); }; /// \cond diff --git a/exchange/src/common/util.cpp b/exchange/src/common/util.cpp index 3b8b93d7..fa580f85 100644 --- a/exchange/src/common/util.cpp +++ b/exchange/src/common/util.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include namespace nutc::common { namespace bi = boost::archive::iterators; @@ -52,9 +52,11 @@ find_project_file(const std::string& file_name) order_id_t generate_order_id() { - static std::mt19937_64 gen{std::random_device{}()}; - static std::uniform_int_distribution dis; - return dis(gen); + // static constexpr auto MAX_PID_BITS = 22; + static auto pid = static_cast(getpid()); + // static std::atomic start_order_id{(pid << 46)}; + static std::int64_t start_order_id{(pid<<46)}; + return ++start_order_id; } uint64_t diff --git a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp index eae0a061..d6990c53 100644 --- a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp +++ b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp @@ -34,8 +34,11 @@ BaseMatchingCycle::collect_orders(uint64_t) -> std::vector else if constexpr (std::is_same_v) { return order; } - else { - return tagged_order{trader, order}; + else if constexpr (std::is_same_v) { + return tagged_limit_order{trader, order}; + } + else if constexpr (std::is_same_v) { + return tagged_market_order{trader, order}; } }; diff --git a/exchange/src/exchange/orders/orderbook/order_id_tracker.cpp b/exchange/src/exchange/orders/orderbook/order_id_tracker.cpp index dd2d969a..ea128f45 100644 --- a/exchange/src/exchange/orders/orderbook/order_id_tracker.cpp +++ b/exchange/src/exchange/orders/orderbook/order_id_tracker.cpp @@ -16,8 +16,10 @@ OrderIdTracker::remove_order(common::order_id_t order_id) void OrderIdTracker::add_order(LimitOrderBook::stored_limit_order order) { - if (!order->ioc) { - order_map_.emplace(order->order_id, order); - } + if (order->ioc) + return; + if (order_map_.contains(order->order_id)) [[unlikely]] + throw std::runtime_error("Uhhhh"); + order_map_.emplace(order->order_id, order); } } // namespace nutc::exchange diff --git a/exchange/src/exchange/orders/storage/order_storage.hpp b/exchange/src/exchange/orders/storage/order_storage.hpp index 93659411..18a9145b 100644 --- a/exchange/src/exchange/orders/storage/order_storage.hpp +++ b/exchange/src/exchange/orders/storage/order_storage.hpp @@ -1,31 +1,56 @@ #pragma once #include "common/messages_wrapper_to_exchange.hpp" +#include "common/types/decimal.hpp" +#include "common/util.hpp" #include "exchange/traders/trader_types/generic_trader.hpp" #include namespace nutc::exchange { -template -class tagged_order : public BaseOrderT { +// TODO: make generic again +class tagged_limit_order : public common::limit_order { public: GenericTrader* trader; - tagged_order(GenericTrader& order_creator, const auto& order) : - BaseOrderT(order), trader(&order_creator) + tagged_limit_order(GenericTrader& order_creator, const limit_order& order) : + limit_order(order), trader(&order_creator) {} - template - tagged_order(GenericTrader& order_creator, Args&&... args) - requires std::is_constructible_v - : BaseOrderT(args...), trader(&order_creator) + tagged_limit_order( + GenericTrader& order_creator, common::Ticker ticker, common::Side side, + common::decimal_quantity decimal_quantity, common::decimal_price decimal_price, + bool ioc = false + ) : + limit_order( + ticker, side, decimal_quantity, decimal_price, ioc, common::get_time(), + common::generate_order_id() + ), + trader(&order_creator) {} - bool operator==(const tagged_order& other) const = default; + bool operator==(const tagged_limit_order& other) const = default; }; -using tagged_limit_order = tagged_order; -using tagged_market_order = tagged_order; +// TODO: make generic again +class tagged_market_order : public common::market_order { +public: + GenericTrader* trader; + + tagged_market_order(GenericTrader& order_creator, const market_order& order) : + market_order(order), trader(&order_creator) + {} + + tagged_market_order( + GenericTrader& order_creator, common::Ticker ticker, common::Side side, + common::decimal_quantity decimal_quantity + ) : + market_order(ticker, side, decimal_quantity, common::get_time()), + trader(&order_creator) + {} + + bool operator==(const tagged_market_order& other) const = default; +}; using OrderVariant = std::variant; diff --git a/exchange/src/exchange/traders/trader_types/bot_trader.hpp b/exchange/src/exchange/traders/trader_types/bot_trader.hpp index 9a4bb264..26093356 100644 --- a/exchange/src/exchange/traders/trader_types/bot_trader.hpp +++ b/exchange/src/exchange/traders/trader_types/bot_trader.hpp @@ -73,7 +73,13 @@ class BotTrader : public GenericTrader { common::decimal_price price, bool ioc ) { - common::limit_order order{TICKER, side, quantity, price, ioc}; + common::limit_order order{TICKER, + side, + quantity, + price, + ioc, + common::get_time(), + common::generate_order_id()}; orders_.emplace_back(order); return order.order_id; } @@ -81,13 +87,16 @@ class BotTrader : public GenericTrader { void cancel_order(common::order_id_t order_id) { - orders_.emplace_back(common::cancel_order{TICKER, order_id}); + orders_.emplace_back(common::cancel_order{TICKER, order_id, common::get_time()} + ); } void add_market_order(common::Side side, common::decimal_quantity quantity) { - orders_.emplace_back(common::market_order{TICKER, side, quantity}); + orders_.emplace_back( + common::market_order{TICKER, side, quantity, common::get_time()} + ); } void diff --git a/exchange/src/wrapper/messaging/exchange_communicator.cpp b/exchange/src/wrapper/messaging/exchange_communicator.cpp index f6adc888..781f20d1 100644 --- a/exchange/src/wrapper/messaging/exchange_communicator.cpp +++ b/exchange/src/wrapper/messaging/exchange_communicator.cpp @@ -68,7 +68,7 @@ ExchangeCommunicator::place_limit_order() common::Side side, common::Ticker ticker, double quantity, double price, bool ioc ) -> order_id_t { - limit_order order{ticker, side, quantity, price, ioc}; + limit_order order{ticker, side, quantity, price, ioc, get_time(), generate_order_id()}; if (!publish_message(order)) return -1; return order.order_id; @@ -79,7 +79,7 @@ MarketOrderFunction ExchangeCommunicator::place_market_order() { return [this](common::Side side, common::Ticker ticker, double quantity) { - market_order order{ticker, side, quantity}; + market_order order{ticker, side, quantity, common::get_time()}; return publish_message(order); }; } @@ -88,7 +88,7 @@ CancelOrderFunction ExchangeCommunicator::cancel_order() { return [this](common::Ticker ticker, order_id_t order_id) -> bool { - return publish_message(common::cancel_order{ticker, order_id}); + return publish_message(common::cancel_order{ticker, order_id, common::get_time()}); }; } diff --git a/exchange/test/src/integration/tests/basic.cpp b/exchange/test/src/integration/tests/basic.cpp index a2e7fea5..ed8b9289 100644 --- a/exchange/test/src/integration/tests/basic.cpp +++ b/exchange/test/src/integration/tests/basic.cpp @@ -25,11 +25,11 @@ TEST_P(IntegrationBasicAlgo, ConfirmOrderReceived) start_wrappers(traders_, GetParam(), "buy_tsla_at_100"); auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order(limit_order{Ticker::ETH, sell, 100.0, 10.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 10.0)); TestMatchingCycle cycle{traders_}; - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); ASSERT_EQ( double{ trader2->get_portfolio().get_capital() @@ -44,11 +44,11 @@ TEST_P(IntegrationBasicAlgo, ConfirmOrderFeeApplied) start_wrappers(traders_, GetParam(), "buy_tsla_at_100"); auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order(limit_order{Ticker::ETH, sell, 100.0, 10.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 10.0)); TestMatchingCycle cycle{traders_, .5}; - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); ASSERT_EQ( double{ trader2->get_portfolio().get_capital() @@ -63,7 +63,7 @@ TEST_P(IntegrationBasicAlgo, RemoveIOCOrder) auto& trader1 = start_wrappers(traders_, GetParam(), "buy_tsla_at_100"); auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); TestMatchingCycle cycle{traders_}; @@ -82,11 +82,11 @@ TEST_P(IntegrationBasicAlgo, MarketOrderBuy) start_wrappers(traders_, GetParam(), "buy_market_order_1000"); auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); TestMatchingCycle cycle{traders_}; - cycle.wait_for_order(limit_order{Ticker::BTC, buy, 1.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::BTC, buy, 1.0, 100.0)); } TEST_P(IntegrationBasicAlgo, MarketOrderSell) @@ -94,12 +94,12 @@ TEST_P(IntegrationBasicAlgo, MarketOrderSell) auto& trader1 = start_wrappers(traders_, GetParam(), "sell_market_order_1000"); auto trader2 = traders_.add_trader(0); trader1.get_portfolio().modify_holdings(Ticker::ETH, 1000.0); - trader2->add_order({Ticker::ETH, buy, 1.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, buy, 1.0, 100.0)); trader2->get_portfolio().modify_capital(1000.0); TestMatchingCycle cycle{traders_}; - cycle.wait_for_order(limit_order{Ticker::BTC, buy, 1.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::BTC, buy, 1.0, 100.0)); } TEST_P(IntegrationBasicAlgo, ManyUpdates) @@ -112,12 +112,14 @@ TEST_P(IntegrationBasicAlgo, ManyUpdates) TestMatchingCycle cycle{traders_}; for (int i = 0; i < 10000; i++) { - trader2->add_order({Ticker::ETH, sell, 1.0, static_cast(i)}); + trader2->add_order( + make_limit_order(Ticker::ETH, sell, 1.0, static_cast(i)) + ); } cycle.on_tick(0); - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 10.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 10.0, 100.0)); } TEST_P(IntegrationBasicAlgo, OrderVolumeLimitsPreventGoingAboveLimit) @@ -127,7 +129,8 @@ TEST_P(IntegrationBasicAlgo, OrderVolumeLimitsPreventGoingAboveLimit) TestMatchingCycle cycle{traders_, 0.0, 10.0}; for (int i = 1; i < 21; i++) { - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 1.0, static_cast(i)} + cycle.wait_for_order( + make_limit_order(Ticker::ETH, buy, 1.0, static_cast(i)) ); cycle.on_tick(0); } @@ -144,11 +147,11 @@ TEST_P(IntegrationBasicAlgo, OnTradeUpdate) TestMatchingCycle cycle{traders_}; - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 10.0, 102.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 10.0, 102.0)); - cycle.wait_for_order(limit_order{Ticker::BTC, buy, 1.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::BTC, buy, 1.0, 100.0)); } // Sanity check that it goes through the orderbook @@ -161,10 +164,10 @@ TEST_P(IntegrationBasicAlgo, MultipleLevelOrder) TestMatchingCycle cycle{traders_}; - trader2->add_order({Ticker::ETH, sell, 55.0, 1.0}); - trader2->add_order({Ticker::ETH, sell, 45.0, 1.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 55.0, 1.0)); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 45.0, 1.0)); - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); ASSERT_EQ( trader1.get_portfolio().get_capital() - trader1.get_portfolio().get_initial_capital(), @@ -178,16 +181,16 @@ TEST_P(IntegrationBasicAlgo, OnAccountUpdateSell) trader1.get_portfolio().modify_holdings(Ticker::ETH, 1000.0); auto trader2 = traders_.add_trader(100000); - trader2->add_order({Ticker::ETH, buy, 102.0, 102.0}); + trader2->add_order(make_limit_order(Ticker::ETH, buy, 102.0, 102.0)); TestMatchingCycle cycle{traders_}; // obupdate triggers one user to place acommon::Side::buy order of 10 ABC at 102 - cycle.wait_for_order(limit_order{Ticker::ETH, sell, 10.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, sell, 10.0, 100.0)); // on_trade_match triggers one user to place acommon::Side::buy order of 1 ABC // at 100 - cycle.wait_for_order(limit_order{Ticker::BTC, buy, 1.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::BTC, buy, 1.0, 100.0)); } TEST_P(IntegrationBasicAlgo, OnAccountUpdateBuy) @@ -196,15 +199,15 @@ TEST_P(IntegrationBasicAlgo, OnAccountUpdateBuy) auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); TestMatchingCycle cycle{traders_}; // obupdate triggers one user to place acommon::Side::buy order of 10 ABC at 102 - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 10.0, 102.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 10.0, 102.0)); // on_trade_match triggers one user to place acommon::Side::buy order of 1 ABC // at 100 - cycle.wait_for_order(limit_order{Ticker::BTC, buy, 1.0, 100.0}); + cycle.wait_for_order(make_limit_order(Ticker::BTC, buy, 1.0, 100.0)); } TEST_P(IntegrationBasicAlgo, AlgoStartDelay) @@ -218,11 +221,11 @@ TEST_P(IntegrationBasicAlgo, AlgoStartDelay) auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); TestMatchingCycle cycle{traders_}; - cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); auto end = std::chrono::high_resolution_clock::now(); const int64_t observed_duration_ms = @@ -239,7 +242,7 @@ TEST_P(IntegrationBasicAlgo, DisableTrader) auto& trader1 = start_wrappers(traders_, GetParam(), "buy_tsla_at_100"); auto trader2 = traders_.add_trader(0); trader2->get_portfolio().modify_holdings(Ticker::ETH, 1000.0); // NOLINT - trader2->add_order({Ticker::ETH, sell, 100.0, 100.0}); + trader2->add_order(make_limit_order(Ticker::ETH, sell, 100.0, 100.0)); trader1.disable(); diff --git a/exchange/test/src/integration/tests/cancellation.cpp b/exchange/test/src/integration/tests/cancellation.cpp index cacb5f05..f63b6a34 100644 --- a/exchange/test/src/integration/tests/cancellation.cpp +++ b/exchange/test/src/integration/tests/cancellation.cpp @@ -23,9 +23,10 @@ TEST_P(IntegrationBasicCancellation, CancelMessageHasSameIdAsOrder) start_wrappers(traders_, GetParam(), "cancel_limit_order"); TestMatchingCycle cycle{traders_}; - auto order_id = cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + auto order_id = + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); EXPECT_TRUE(order_id.has_value()); - cycle.wait_for_order(common::cancel_order{common::Ticker::ETH, *order_id}); + cycle.wait_for_order(make_cancel_order(common::Ticker::ETH, *order_id)); } TEST_P(IntegrationBasicCancellation, CancelMessagePreventsOrderFromExecuting) @@ -35,10 +36,11 @@ TEST_P(IntegrationBasicCancellation, CancelMessagePreventsOrderFromExecuting) trader2->get_portfolio().modify_holdings(Ticker::ETH, 100.0); TestMatchingCycle cycle{traders_}; - auto order_id = cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + auto order_id = + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); EXPECT_TRUE(order_id.has_value()); - cycle.wait_for_order(common::cancel_order{common::Ticker::ETH, *order_id}); - trader2->add_order(common::market_order{Ticker::ETH, sell, 10.0}); + cycle.wait_for_order(make_cancel_order(common::Ticker::ETH, *order_id)); + trader2->add_order(make_market_order(Ticker::ETH, sell, 10.0)); cycle.on_tick(0); @@ -54,11 +56,12 @@ TEST_P(IntegrationBasicCancellation, OneOfTwoOrdersCancelledResultsInMatch) trader2->get_portfolio().modify_holdings(Ticker::ETH, 100.0); TestMatchingCycle cycle{traders_}; - auto order_id = cycle.wait_for_order(limit_order{Ticker::ETH, buy, 100.0, 10.0}); + auto order_id = + cycle.wait_for_order(make_limit_order(Ticker::ETH, buy, 100.0, 10.0)); // Assume non-cancelled order got through EXPECT_TRUE(order_id.has_value()); - cycle.wait_for_order(common::cancel_order{common::Ticker::ETH, *order_id}); - trader2->add_order(common::market_order{Ticker::ETH, sell, 10.0}); + cycle.wait_for_order(make_cancel_order(common::Ticker::ETH, *order_id)); + trader2->add_order(make_market_order(Ticker::ETH, sell, 10.0)); cycle.on_tick(0); diff --git a/exchange/test/src/unit/matching/basic_matching.cpp b/exchange/test/src/unit/matching/basic_matching.cpp index 92c82634..5c8d8615 100644 --- a/exchange/test/src/unit/matching/basic_matching.cpp +++ b/exchange/test/src/unit/matching/basic_matching.cpp @@ -10,6 +10,7 @@ using nutc::common::Ticker; using nutc::common::Side::buy; using nutc::common::Side::sell; +using namespace nutc::test; class UnitBasicMatching : public ::testing::Test { protected: @@ -99,12 +100,13 @@ TEST_F(UnitBasicMatching, NoMatchThenMatchSell) tagged_limit_order order1{trader1, Ticker::ETH, buy, 1.0, 1.0}; tagged_limit_order order2{trader2, Ticker::ETH, buy, 1.0, 1.0}; tagged_limit_order order3{trader3, Ticker::ETH, sell, 2.0, 0.0}; + tagged_limit_order order4{trader1, Ticker::ETH, buy, 1.0, 1.0}; auto matches = add_to_engine_(order1); ASSERT_EQ(matches.size(), 0); matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 0); - matches = add_to_engine_(order1); + matches = add_to_engine_(order4); ASSERT_EQ(matches.size(), 0); matches = add_to_engine_(order3); ASSERT_EQ(matches.size(), 2); diff --git a/exchange/test/src/unit/matching/invalid_orders.cpp b/exchange/test/src/unit/matching/invalid_orders.cpp index 2a97dd91..fdc64736 100644 --- a/exchange/test/src/unit/matching/invalid_orders.cpp +++ b/exchange/test/src/unit/matching/invalid_orders.cpp @@ -39,8 +39,10 @@ TEST_F(UnitInvalidOrders, RemoveThenAddFunds) { trader1.get_portfolio().modify_capital(-TEST_STARTING_CAPITAL); - tagged_limit_order order2{trader2, Ticker::ETH, sell, 1.0, 1.0}; tagged_limit_order order1{trader1, Ticker::ETH, buy, 1.0, 1.0}; + tagged_limit_order order11{trader1, Ticker::ETH, buy, 1.0, 1.0}; + tagged_limit_order order2{trader2, Ticker::ETH, sell, 1.0, 1.0}; + tagged_limit_order order22{trader2, Ticker::ETH, sell, 1.0, 1.0}; // Thrown out auto matches = add_to_engine_(order1); @@ -53,11 +55,11 @@ TEST_F(UnitInvalidOrders, RemoveThenAddFunds) trader1.get_portfolio().modify_capital(TEST_STARTING_CAPITAL); // Kept, but not matched - matches = add_to_engine_(order2); + matches = add_to_engine_(order22); ASSERT_EQ(matches.size(), 0); // Kept and matched - matches = add_to_engine_(order1); + matches = add_to_engine_(order11); ASSERT_EQ(matches.size(), 1); ASSERT_EQ_MATCH(matches[0], Ticker::ETH, "ABC", "DEF", buy, 1, 1); } diff --git a/exchange/test/src/unit/matching/many_orders.cpp b/exchange/test/src/unit/matching/many_orders.cpp index 39049f30..3e297368 100644 --- a/exchange/test/src/unit/matching/many_orders.cpp +++ b/exchange/test/src/unit/matching/many_orders.cpp @@ -65,11 +65,12 @@ TEST_F(UnitManyOrders, CorrectTimePriority) TEST_F(UnitManyOrders, OnlyMatchesOne) { tagged_limit_order order1{trader1, Ticker::ETH, buy, 1.0, 1.0}; + tagged_limit_order order11{trader1, Ticker::ETH, buy, 1.0, 1.0}; tagged_limit_order order2{trader2, Ticker::ETH, sell, 1.0, 1.0}; auto matches = add_to_engine_(order1); ASSERT_EQ(matches.size(), 0); - matches = add_to_engine_(order1); + matches = add_to_engine_(order11); ASSERT_EQ(matches.size(), 0); matches = add_to_engine_(order2); diff --git a/exchange/test/src/unit/matching/order_fee_matching.cpp b/exchange/test/src/unit/matching/order_fee_matching.cpp index bc0b1beb..c7edf96a 100644 --- a/exchange/test/src/unit/matching/order_fee_matching.cpp +++ b/exchange/test/src/unit/matching/order_fee_matching.cpp @@ -109,6 +109,7 @@ TEST_F(UnitOrderFeeMatching, NoMatchThenMatchBuy) TEST_F(UnitOrderFeeMatching, NoMatchThenMatchSell) { tagged_limit_order order1{trader1, Ticker::ETH, buy, 1.0, 1.0, 0}; + tagged_limit_order order11{trader1, Ticker::ETH, buy, 1.0, 1.0, 0}; tagged_limit_order order2{trader2, Ticker::ETH, buy, 1.0, 1.0, 0}; tagged_limit_order order3{trader3, Ticker::ETH, sell, 2.0, 0.0, 0}; @@ -116,7 +117,7 @@ TEST_F(UnitOrderFeeMatching, NoMatchThenMatchSell) ASSERT_EQ(matches.size(), 0); matches = add_to_engine_(order2); ASSERT_EQ(matches.size(), 0); - matches = add_to_engine_(order1); + matches = add_to_engine_(order11); ASSERT_EQ(matches.size(), 0); matches = add_to_engine_(order3); ASSERT_EQ(matches.size(), 2); @@ -284,8 +285,10 @@ TEST_F(UnitOrderFeeMatching, NotEnoughToEnough) { trader1.get_portfolio().modify_capital(-TEST_STARTING_CAPITAL + 1); - tagged_limit_order order2{trader2, Ticker::ETH, sell, 1.0, 1.0, 0}; tagged_limit_order order1{trader1, Ticker::ETH, buy, 1.0, 1.0, 0}; + tagged_limit_order order11{trader1, Ticker::ETH, buy, 1.0, 1.0, 0}; + tagged_limit_order order2{trader2, Ticker::ETH, sell, 1.0, 1.0, 0}; + tagged_limit_order order22{trader2, Ticker::ETH, sell, 1.0, 1.0, 0}; // Thrown out auto matches = add_to_engine_(order1); @@ -301,11 +304,11 @@ TEST_F(UnitOrderFeeMatching, NotEnoughToEnough) trader1.get_portfolio().modify_capital(0.5); // Kept, but not matched - matches = add_to_engine_(order2); + matches = add_to_engine_(order22); ASSERT_EQ(matches.size(), 0); // Kept and matched - matches = add_to_engine_(order1); + matches = add_to_engine_(order11); ASSERT_EQ(matches.size(), 1); ASSERT_EQ_MATCH(matches[0], Ticker::ETH, "ABC", "DEF", buy, 1, 1); diff --git a/exchange/test/src/util/macros.cpp b/exchange/test/src/util/macros.cpp index d202a7f2..3c31a7b5 100644 --- a/exchange/test/src/util/macros.cpp +++ b/exchange/test/src/util/macros.cpp @@ -1,6 +1,38 @@ #include "macros.hpp" +#include "common/util.hpp" + namespace nutc::test { + +limit_order +make_limit_order( + common::Ticker ticker, common::Side side, common::decimal_quantity quantity, + common::decimal_price price, bool ioc +) +{ + return {ticker, + side, + quantity, + price, + ioc, + common::get_time(), + common::generate_order_id()}; +} + +common::market_order +make_market_order( + common::Ticker ticker, common::Side side, common::decimal_quantity quantity +) +{ + return {ticker, side, quantity, common::get_time()}; +} + +common::cancel_order +make_cancel_order(common::Ticker ticker, common::order_id_t order_id) +{ + return {ticker, order_id, common::get_time()}; +} + bool order_equality(const common::market_order& order1, const common::market_order& order2) { diff --git a/exchange/test/src/util/macros.hpp b/exchange/test/src/util/macros.hpp index ec17e393..896ae378 100644 --- a/exchange/test/src/util/macros.hpp +++ b/exchange/test/src/util/macros.hpp @@ -2,6 +2,7 @@ #include "common/messages_wrapper_to_exchange.hpp" #include "common/types/algorithm/base_algorithm.hpp" #include "common/types/ticker.hpp" +#include "common/util.hpp" #include "exchange/orders/storage/order_storage.hpp" #include "exchange/traders/trader_container.hpp" @@ -30,6 +31,18 @@ PrintTo(const AlgoLanguage& op, std::ostream* os) namespace nutc::test { +limit_order make_limit_order( + common::Ticker ticker, common::Side side, common::decimal_quantity quantity, + common::decimal_price price, bool ioc = false +); + +common::market_order make_market_order( + common::Ticker ticker, common::Side side, common::decimal_quantity quantity +); + +common::cancel_order +make_cancel_order(common::Ticker ticker, common::order_id_t order_id); + bool validate_match( const nutc::common::match& match, common::Ticker ticker, const std::string& buyer_id, const std::string& seller_id, common::Side side, From 9fc14c83ed21f2ad5d0f2408f4650e4fddff8d29 Mon Sep 17 00:00:00 2001 From: stevenewald Date: Tue, 15 Oct 2024 17:32:43 -0500 Subject: [PATCH 2/6] Rate limiter and outgoing message limit increases --- exchange/src/exchange/config/static/config.hpp | 2 +- exchange/src/exchange/matching_cycle/base/base_cycle.cpp | 1 + exchange/src/exchange/wrappers/messaging/pipe_writer.cpp | 2 ++ exchange/src/wrapper/messaging/rate_limiter.hpp | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/exchange/src/exchange/config/static/config.hpp b/exchange/src/exchange/config/static/config.hpp index 6b0c9d75..ecbdf5b9 100644 --- a/exchange/src/exchange/config/static/config.hpp +++ b/exchange/src/exchange/config/static/config.hpp @@ -4,7 +4,7 @@ #define DEBUG_NUM_USERS 2 // How many outgoing messages for one wrapper before we start dropping -#define MAX_OUTGOING_MQ_SIZE 1000 +#define MAX_OUTGOING_MQ_SIZE 10000 // Limit to 16kb #define MAX_PIPE_MSG_SIZE 16000 diff --git a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp index d6990c53..8502724b 100644 --- a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp +++ b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp @@ -107,6 +107,7 @@ BaseMatchingCycle::handle_matches_(std::vector matches) traders_.begin(), traders_.end(), [&message = *update](GenericTrader& trader) { trader.send_message(message); } ); + usleep(3000); } } // namespace nutc::exchange diff --git a/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp b/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp index a0f2fb56..791cf86b 100644 --- a/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp +++ b/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp @@ -2,6 +2,7 @@ #include "async_pipe_runner.hpp" #include "exchange/config/static/config.hpp" +#include namespace nutc::exchange { @@ -22,6 +23,7 @@ PipeWriter::send_message(const std::string& message) queued_shared_.push_back(message); if (queued_shared_.size() > MAX_OUTGOING_MQ_SIZE) [[unlikely]] { queued_shared_.pop_front(); + std::cerr << "DROPPED MESSAGE\n"; } // It will enqueue these shared anyway diff --git a/exchange/src/wrapper/messaging/rate_limiter.hpp b/exchange/src/wrapper/messaging/rate_limiter.hpp index cfb77559..f6645ef7 100644 --- a/exchange/src/wrapper/messaging/rate_limiter.hpp +++ b/exchange/src/wrapper/messaging/rate_limiter.hpp @@ -8,7 +8,7 @@ class RateLimiter { std::queue timestamps_; // TODO(stevenewald): make configurable - static constexpr size_t MAX_CALLS = 3'000; + static constexpr size_t MAX_CALLS = 50'000; static constexpr std::chrono::seconds TIME_WINDOW = std::chrono::seconds(1); public: From 6d0f4c2202ec5dfc466d159ff4d0aacd191578a8 Mon Sep 17 00:00:00 2001 From: Max Glass Date: Thu, 17 Oct 2024 21:20:18 -0500 Subject: [PATCH 3/6] make webserver return algo code and language --- webserver/src/main.rs | 162 ++++++++++++++++++++++++++++++------------ 1 file changed, 118 insertions(+), 44 deletions(-) diff --git a/webserver/src/main.rs b/webserver/src/main.rs index 01c1c0cc..f6883e01 100644 --- a/webserver/src/main.rs +++ b/webserver/src/main.rs @@ -5,7 +5,7 @@ use aws_sdk_s3::presigning::PresigningConfig; use base64::encode; use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod}; use reqwest::Client; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::json; use tokio_postgres::NoTls; @@ -60,6 +60,7 @@ async fn set_linter_status( async fn handle_algo_submission( data: web::Path<(String, String)>, db_pool: web::Data, + s3_client: web::Data, ) -> impl Responder { dotenv::dotenv().ok(); @@ -115,7 +116,7 @@ async fn handle_algo_submission( } // todo: this function shouldnt return a http request - return request_sandbox(algo_id, algo_url, language, &db_pool).await; + return request_sandbox(algo_id, algo_url, language, &db_pool, &s3_client).await; } async fn request_sandbox( @@ -123,6 +124,7 @@ async fn request_sandbox( algo_url: String, language: String, pool: &Pool, + s3_client: &web::Data, ) -> HttpResponse { let mut postgres_client = if let Ok(client) = pool.get().await { client @@ -144,23 +146,6 @@ async fn request_sandbox( .await .unwrap(); - let s3_region = std::env::var("AWS_REGION").expect("env variable `AWS_REGION` should be set"); - let s3_endpoint = - std::env::var("S3_ENDPOINT").expect("env variable `S3_ENDPOINT` should be set"); - - let region_provider = RegionProviderChain::default_provider().or_else(Region::new(s3_region)); - let mut sdk_config = aws_config::from_env() - .region(region_provider) - .endpoint_url(s3_endpoint) - .load() - .await; - - let config = aws_sdk_s3::config::Builder::from(&sdk_config) - .force_path_style(true) - .build(); - - let s3_client = aws_sdk_s3::Client::from_conf(config); - let expires_in = match PresigningConfig::expires_in(std::time::Duration::from_secs(60 * 10)) { Ok(expires_in) => expires_in, Err(e) => { @@ -247,7 +232,12 @@ async fn request_sandbox( } } -async fn get_algorithms(case: String, uid: Option, pool: &Pool) -> impl Responder { +async fn get_algorithms( + case: String, + uid: Option, + pool: &Pool, + s3_client: &web::Data, +) -> impl Responder { let postgres_client = match pool.get().await { Ok(client) => client, Err(e) => { @@ -257,27 +247,28 @@ async fn get_algorithms(case: String, uid: Option, pool: &Pool) -> impl }; let query = r#" - WITH "MostRecentAlgos" AS ( - WITH "FilteredAlgosByCase" AS ( - SELECT a."uid", f."s3Key", f."createdAt" AS "timestamp" - FROM "algos" AS a - INNER JOIN "algo_file" AS f ON a."algoFileS3Key" = f."s3Key" - WHERE a."case" = $1 - AND ($2::text IS NULL OR a."uid" = $2) - ) - SELECT "uid", MAX("timestamp") AS "timestamp" - FROM "FilteredAlgosByCase" - GROUP BY "uid" - ) - SELECT - p."firstName" || ' ' || p."lastName" AS "name", - a."s3Key" - FROM "profiles" AS p - INNER JOIN ( - SELECT u."uid", f."s3Key" - FROM "MostRecentAlgos" AS u - INNER JOIN "algo_file" AS f ON u."timestamp" = f."createdAt" - ) AS a ON p."uid" = a."uid"; + WITH "MostRecentAlgos" AS ( + WITH "FilteredAlgosByCase" AS ( + SELECT a."uid", a."language", f."s3Key", f."createdAt" AS "timestamp" + FROM "algos" AS a + INNER JOIN "algo_file" AS f ON a."algoFileS3Key" = f."s3Key" + WHERE a."case" = $1 + AND ($2::text IS NULL OR a."uid" = $2) + ) + SELECT "uid", "language", MAX("timestamp") AS "timestamp" + FROM "FilteredAlgosByCase" + GROUP BY "uid", "language" + ) + SELECT + p."firstName" || ' ' || p."lastName" AS "name", + a."s3Key", + a."language" + FROM "profiles" AS p + INNER JOIN ( + SELECT u."uid", f."s3Key", u."language" + FROM "MostRecentAlgos" AS u + INNER JOIN "algo_file" AS f ON u."timestamp" = f."createdAt" + ) AS a ON p."uid" = a."uid"; "#; let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&case, &uid]; @@ -303,7 +294,70 @@ async fn get_algorithms(case: String, uid: Option, pool: &Pool) -> impl }) .collect(); - HttpResponse::Ok().json(response_data) + #[derive(Serialize)] + struct Response { + name: String, + language: String, + code: String, + } + + let mut out = Vec::new(); + + for algo_response in &response_data { + let algo_response = match algo_response.as_array() { + Some(algo_response) => algo_response, + None => continue, + }; + + let name = match algo_response.get(0) { + Some(name) => name, + None => continue, + }; + let s3_key = match algo_response.get(1).map(serde_json::Value::as_str) { + Some(Some(s3_key)) => s3_key, + _ => continue, + }; + let algo_language = match algo_response.get(2).map(serde_json::Value::as_str) { + Some(Some(lang)) => lang, + _ => continue, + }; + + let res = if let Ok(res) = s3_client + .get_object() + .bucket("nutc") + .key(s3_key) + .send() + .await + { + res + } else { + eprintln!("failed to fetch algo file. key: {s3_key}"); + continue; + }; + + let file_content_bytes = res.body.collect().await.map(|x| x.to_vec()); + let file_content_bytes = if let Ok(b) = file_content_bytes { + b + } else { + continue; + }; + + match String::from_utf8(file_content_bytes) { + Ok(file_content) => { + out.push(Response { + name: name.to_string(), + language: algo_language.to_string(), + code: file_content, + }); + } + Err(err) => { + eprintln!("fetched file was not utf8: {err}"); + continue; + } + } + } + + HttpResponse::Ok().json(json!(out)) } #[tracing::instrument] @@ -311,9 +365,10 @@ async fn get_algorithms(case: String, uid: Option, pool: &Pool) -> impl async fn get_single_user_algorithm( data: web::Path<(String, String)>, db_pool: web::Data, + s3_client: web::Data, ) -> impl Responder { let (case, uid) = data.into_inner(); - get_algorithms(case, Some(uid), &db_pool).await + get_algorithms(case, Some(uid), &db_pool, &s3_client).await } #[tracing::instrument] @@ -321,9 +376,10 @@ async fn get_single_user_algorithm( async fn get_all_user_algorithms( data: web::Path, db_pool: web::Data, + s3_client: web::Data, ) -> impl Responder { let case = data.into_inner(); - get_algorithms(case, None, &db_pool).await + get_algorithms(case, None, &db_pool, &s3_client).await } #[actix_web::main] @@ -358,6 +414,23 @@ async fn main() -> std::io::Result<()> { ) .init(); + let s3_region = std::env::var("AWS_REGION").expect("env variable `AWS_REGION` should be set"); + let s3_endpoint = + std::env::var("S3_ENDPOINT").expect("env variable `S3_ENDPOINT` should be set"); + + let region_provider = RegionProviderChain::default_provider().or_else(Region::new(s3_region)); + let sdk_config = aws_config::from_env() + .region(region_provider) + .endpoint_url(s3_endpoint) + .load() + .await; + + let config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(config); + tracing::info!("Starting server."); HttpServer::new(move || { @@ -378,6 +451,7 @@ async fn main() -> std::io::Result<()> { .supports_credentials(); App::new() .app_data(web::Data::new(pool.clone())) + .app_data(web::Data::new(s3_client.clone())) .service(handle_algo_submission) .service(get_single_user_algorithm) .service(get_all_user_algorithms) From 1a4e25124085f7134b9ec11cbcb9b7c681856111 Mon Sep 17 00:00:00 2001 From: Steven Ewald Date: Fri, 18 Oct 2024 10:06:26 -0500 Subject: [PATCH 4/6] Pull algos in normal mode --- docker/dev/docker-compose.yml | 11 ++-- .../types/algorithm/remote_algorithm.hpp | 8 ++- .../algos/normal_mode/normal_mode.cpp | 57 +++++++++---------- .../algos/normal_mode/normal_mode.hpp | 2 +- exchange/src/exchange/main.cpp | 2 +- exchange/src/exchange/sandbox_server/crow.cpp | 3 +- exchange/src/wrapper/runtime/runtime.hpp | 4 +- webserver/src/main.rs | 41 +++++++------ 8 files changed, 65 insertions(+), 63 deletions(-) diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index a35d127c..cf1773a1 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -8,12 +8,15 @@ services: build: context: ../.. dockerfile: exchange/docker/sandbox/Dockerfile - args: - firebase_emulator: "false" restart: unless-stopped environment: - NUTC_EXPOSE_METRICS=1 - command: ["--sandbox"] + - NUTC_ALGO_ENDPOINT=http://webserver:16124/algorithms/HFT + depends_on: + - webserver + - localstack + - postgres + # command: ["--sandbox"] #port: 16124 webserver: @@ -29,8 +32,6 @@ services: build: context: ../.. dockerfile: exchange/docker/linter/Dockerfile - args: - firebase_emulator: "false" # Exposed on port 9000 prometheus: diff --git a/exchange/src/common/types/algorithm/remote_algorithm.hpp b/exchange/src/common/types/algorithm/remote_algorithm.hpp index bc37d0bf..3ec15527 100644 --- a/exchange/src/common/types/algorithm/remote_algorithm.hpp +++ b/exchange/src/common/types/algorithm/remote_algorithm.hpp @@ -13,11 +13,15 @@ namespace nutc::common { class RemoteAlgorithm : public BaseAlgorithm { std::string id_; std::string algo_data_; + std::string display_name_; public: - RemoteAlgorithm(AlgoLanguage language, std::string algo_id, std::string algo_data) : + RemoteAlgorithm( + AlgoLanguage language, std::string algo_id, std::string algo_data, + std::string display_name + ) : BaseAlgorithm{language}, id_{std::move(algo_id)}, - algo_data_{std::move(algo_data)} + algo_data_{std::move(algo_data)}, display_name_{std::move(display_name)} {} std::string diff --git a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp index 84700d2c..0d2bd105 100644 --- a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp +++ b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp @@ -17,34 +17,31 @@ NormalModeAlgoInitializer::initialize_trader_container( TraderContainer& traders, common::decimal_price start_capital ) const { - constexpr const std::array REQUIRED_DB_FIELDS = { - "latestAlgoId", "firstName", "lastName", "algos" - }; - glz::json_t::object_t firebase_users = get_remote_traders(); - for (const auto& user_it : firebase_users) { - const auto& user_id = user_it.first; - const auto& user = user_it.second; - - bool contains_all_fields = - std::ranges::all_of(REQUIRED_DB_FIELDS, [&user](const char* field) { - return user.contains(field); - }); - if (!contains_all_fields) { - continue; - } - - std::string full_name = fmt::format( - "{} {}", user["firstName"].get(), - user["lastName"].get() - ); - std::string algo_id = user["latestAlgoId"].get(); - + static int id = 0; + auto firebase_users = get_remote_traders(); + for (const auto& user : firebase_users) { try { - // TODO: add back - // traders.add_trader(common::RemoteAlgorithm{}, start_capital); - log_i(main, "Created user"); + if (!user.contains("language") || !user.contains("code") + || !user.contains("name")) { + throw std::runtime_error("Not contain field"); + } + if (!user["language"].is_string() || !user["code"].is_string() + || !user["name"].is_string()) { + throw std::runtime_error("not string"); + } + common::AlgoLanguage language = + user["language"].get() == "Python" + ? common::AlgoLanguage::python + : common::AlgoLanguage::cpp; + std::string code = user["code"].get(); + std::string name = user["name"].get(); + traders.add_trader( + common::RemoteAlgorithm(language, std::to_string(id++), code, name), + start_capital + ); + log_i(main, "Created user {}", name); } catch (const std::runtime_error& err) { - log_w(main, "Failed to create user {}", user_id); + log_w(main, "Failed to create user: {}", err.what()); } } @@ -52,14 +49,16 @@ NormalModeAlgoInitializer::initialize_trader_container( std::for_each(traders.begin(), traders.end(), [start_time](auto& trader) { send_start_time(trader, start_time); }); + std::this_thread::sleep_for(std::chrono::seconds(WAIT_SECS)); } -glz::json_t::object_t +glz::json_t::array_t NormalModeAlgoInitializer::get_remote_traders() { - const std::string endpoint = common::get_firebase_endpoint("users.json"); + const auto* endpoint_c = std::getenv("NUTC_ALGO_ENDPOINT"); + const std::string endpoint{endpoint_c}; glz::json_t res = request_to_json("GET", endpoint); - return res.get(); + return res.get(); } } // namespace nutc::exchange diff --git a/exchange/src/exchange/algos/normal_mode/normal_mode.hpp b/exchange/src/exchange/algos/normal_mode/normal_mode.hpp index 6b43162f..14ed5012 100644 --- a/exchange/src/exchange/algos/normal_mode/normal_mode.hpp +++ b/exchange/src/exchange/algos/normal_mode/normal_mode.hpp @@ -21,6 +21,6 @@ class NormalModeAlgoInitializer : public AlgoInitializer { {} private: - static glz::json_t::object_t get_remote_traders(); + static glz::json_t::array_t get_remote_traders(); }; } // namespace nutc::exchange diff --git a/exchange/src/exchange/main.cpp b/exchange/src/exchange/main.cpp index 815126cf..4dc6d2ad 100644 --- a/exchange/src/exchange/main.cpp +++ b/exchange/src/exchange/main.cpp @@ -28,7 +28,7 @@ create_cycle(TraderContainer& traders, const auto& mode) switch (mode) { case Mode::normal: - return std::make_unique( + return std::make_unique( tickers, traders, order_fee, max_order_volume ); case Mode::sandbox: diff --git a/exchange/src/exchange/sandbox_server/crow.cpp b/exchange/src/exchange/sandbox_server/crow.cpp index 9dd1f59f..ddeab1db 100644 --- a/exchange/src/exchange/sandbox_server/crow.cpp +++ b/exchange/src/exchange/sandbox_server/crow.cpp @@ -88,7 +88,8 @@ CrowServer::add_pending_trader_( static const auto STARTING_CAPITAL = Config::get().constants().STARTING_CAPITAL; auto trader = std::make_shared( - common::RemoteAlgorithm{language, algo_id, algorithm_data}, STARTING_CAPITAL + common::RemoteAlgorithm{language, algo_id, algorithm_data, "SANDBOX"}, + STARTING_CAPITAL ); trader_lock.lock(); diff --git a/exchange/src/wrapper/runtime/runtime.hpp b/exchange/src/wrapper/runtime/runtime.hpp index cf1258b3..627df61c 100644 --- a/exchange/src/wrapper/runtime/runtime.hpp +++ b/exchange/src/wrapper/runtime/runtime.hpp @@ -57,13 +57,13 @@ class Runtime { static void log_text(const std::string& text) { - log_i(algo_print, "{}", text); + // log_i(algo_print, "{}", text); } static void log_error(const std::string& text) { - log_e(ALGO_ERROR, "{}", text); + // log_e(ALGO_ERROR, "{}", text); } }; diff --git a/webserver/src/main.rs b/webserver/src/main.rs index f6883e01..70127745 100644 --- a/webserver/src/main.rs +++ b/webserver/src/main.rs @@ -247,28 +247,25 @@ async fn get_algorithms( }; let query = r#" - WITH "MostRecentAlgos" AS ( - WITH "FilteredAlgosByCase" AS ( - SELECT a."uid", a."language", f."s3Key", f."createdAt" AS "timestamp" - FROM "algos" AS a - INNER JOIN "algo_file" AS f ON a."algoFileS3Key" = f."s3Key" - WHERE a."case" = $1 - AND ($2::text IS NULL OR a."uid" = $2) - ) - SELECT "uid", "language", MAX("timestamp") AS "timestamp" - FROM "FilteredAlgosByCase" - GROUP BY "uid", "language" - ) - SELECT - p."firstName" || ' ' || p."lastName" AS "name", - a."s3Key", - a."language" - FROM "profiles" AS p - INNER JOIN ( - SELECT u."uid", f."s3Key", u."language" - FROM "MostRecentAlgos" AS u - INNER JOIN "algo_file" AS f ON u."timestamp" = f."createdAt" - ) AS a ON p."uid" = a."uid"; + WITH "FilteredAlgosByCase" AS ( + SELECT + a."uid", + a."language", + f."s3Key", + f."createdAt" AS "timestamp", + ROW_NUMBER() OVER (PARTITION BY a."uid" ORDER BY f."createdAt" DESC) AS rn + FROM "algos" AS a + INNER JOIN "algo_file" AS f ON a."algoFileS3Key" = f."s3Key" + WHERE a."case" = $1 + AND ($2::text IS NULL OR a."uid" = $2) + ) + SELECT + p."firstName" || ' ' || p."lastName" AS "name", + a."s3Key", + a."language" + FROM "profiles" AS p + INNER JOIN "FilteredAlgosByCase" AS a ON p."uid" = a."uid" + WHERE a.rn = 1; "#; let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&case, &uid]; From 69752c1cfbeffd2889d53d7a2bb3dde09fcde066 Mon Sep 17 00:00:00 2001 From: Steven Ewald Date: Fri, 18 Oct 2024 11:04:49 -0500 Subject: [PATCH 5/6] checkpoint --- docker/dev/docker-compose.yml | 118 ++++++------------ exchange/docker/dev/grafana_data/grafana.db | Bin 1589248 -> 1589248 bytes exchange/docker/sandbox/prometheus.yml | 2 +- .../src/common/compilation/compile_cpp.cpp | 5 +- .../util => common}/resource_limits.hpp | 14 +++ .../algos/normal_mode/normal_mode.cpp | 29 ++--- exchange/src/exchange/main.cpp | 4 + .../matching_cycle/base/base_cycle.cpp | 13 +- .../src/exchange/metrics/on_tick_metrics.cpp | 10 +- .../traders/trader_types/algo_trader.hpp | 11 +- .../traders/trader_types/generic_trader.hpp | 11 +- .../wrappers/handle/wrapper_handle.cpp | 12 ++ .../wrappers/messaging/async_pipe_runner.cpp | 7 +- .../wrappers/messaging/pipe_writer.cpp | 3 +- exchange/src/wrapper/config/argparse.cpp | 9 +- exchange/src/wrapper/config/argparse.hpp | 1 + exchange/src/wrapper/main.cpp | 7 +- .../wrapper/runtime/python/python_runtime.cpp | 12 +- 18 files changed, 142 insertions(+), 126 deletions(-) rename exchange/src/{wrapper/util => common}/resource_limits.hpp (78%) diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index cf1773a1..ca03e19f 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: - sandbox: + sandbox1: image: nutc-exchange build: context: ../.. @@ -12,10 +12,45 @@ services: environment: - NUTC_EXPOSE_METRICS=1 - NUTC_ALGO_ENDPOINT=http://webserver:16124/algorithms/HFT + - NUTC_ID=0 + depends_on: + - webserver + # sandbox2: + # image: nutc-exchange + # build: + # context: ../.. + # dockerfile: exchange/docker/sandbox/Dockerfile + # restart: unless-stopped + # environment: + # - NUTC_EXPOSE_METRICS=1 + # - NUTC_ALGO_ENDPOINT=http://webserver:16124/algorithms/HFT + # - NUTC_ID=1 + # depends_on: + # - webserver + # sandbox3: + # image: nutc-exchange + # build: + # context: ../.. + # dockerfile: exchange/docker/sandbox/Dockerfile + # restart: unless-stopped + # environment: + # - NUTC_EXPOSE_METRICS=1 + # - NUTC_ALGO_ENDPOINT=http://webserver:16124/algorithms/HFT + # - NUTC_ID=2 + # depends_on: + # - webserver + sandbox4: + image: nutc-exchange + build: + context: ../.. + dockerfile: exchange/docker/sandbox/Dockerfile + restart: unless-stopped + environment: + - NUTC_EXPOSE_METRICS=1 + - NUTC_ALGO_ENDPOINT=http://webserver:16124/algorithms/HFT + - NUTC_ID=3 depends_on: - webserver - - localstack - - postgres # command: ["--sandbox"] #port: 16124 @@ -26,13 +61,6 @@ services: dockerfile: webserver/Dockerfile restart: unless-stopped - linter: - image: nutc-linter - restart: unless-stopped - build: - context: ../.. - dockerfile: exchange/docker/linter/Dockerfile - # Exposed on port 9000 prometheus: image: prom/prometheus @@ -43,16 +71,6 @@ services: - '--storage.tsdb.retention.time=12h' restart: unless-stopped - reverse-proxy: - image: nginx:latest - ports: - - "26389:80" - volumes: - - ./nginx.conf:/etc/nginx/nginx.conf - restart: unless-stopped - depends_on: - - webserver - - grafana grafana: image: grafana/grafana @@ -69,63 +87,3 @@ services: - ../../exchange/docker/dev/grafana_data:/var/lib/grafana # - /var/lib/grafana/grafana.db # - /var/lib/grafana/alerting - - web: - image: node:latest - restart: unless-stopped - working_dir: /app - environment: - - PORT=3001 - volumes: - - ../../web:/app - command: ["npm", "run", "dev",] - ports: - - "3001:3001" - depends_on: - - postgres - - localstack: - image: localstack/localstack - restart: unless-stopped - ports: - - "4566:4566" - - "4571:4571" - - "8080:8080" - environment: - - SERVICES=s3 - - DEFAULT_REGION=us-east-1 - - DATA_DIR=/var/lib/localstack/data - - AWS_ACCESS_KEY_ID=test - - AWS_SECRET_ACCESS_KEY=test - - S3_FORCE_PATH_STYLE=true - volumes: - - localstack:/var/lib/localstack - - awscli: - image: amazon/aws-cli - environment: - AWS_ACCESS_KEY_ID: test - AWS_SECRET_ACCESS_KEY: test - AWS_DEFAULT_REGION: us-east-1 - depends_on: - - localstack - volumes: - - ../../web/dev/local-dev-setup.sh:/home/init.sh - - ../../web/dev/cors.json:/home/cors.json - entrypoint: /home/init.sh - - postgres: - image: postgres:13 - restart: always - environment: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: nutc - ports: - - "5432:5432" - volumes: - - postgres_data:/var/lib/postgresql/data - -volumes: - postgres_data: - localstack: diff --git a/exchange/docker/dev/grafana_data/grafana.db b/exchange/docker/dev/grafana_data/grafana.db index 8ae6c2208e6089013edc8feee276d029806a0abb..c4ee535241b88ce062d9be4ce09c862e10477bcf 100644 GIT binary patch delta 8630 zcmcgx4Rlo1oqu<}GBZi$zWK-p%Eu%jB?K7WoA)vA%}N$Zt!P1#21Fo~yqWi2zLP0L z0i&A?+k>Um8j9uBJzESFC}=HMx^sJ6+a^|b+rwFlryh0J?NO}fbm`hPRohy3mHppI zNG2hHpwYbZ|KFK6_rCZ0|L_0vrms)y>(lzL^LeWo27LvHfxm?Zs$`qD`iaI<%h17t zY@`aM4yuvc5PPsc@;X{{X6N#~Tt2Uac_kj@;*z+xG1G{geNJZ$pGOZ^0|D1>u1|lfdUV#w>WQ;b*;xc; zt+12t#Y&Ut&}P;b+_1j(3sut6uy$utII$;O)v0w@?1#Y++fcr0?M>?&%I|5ogX08) zv(VkaqonrH;;%a#Alu*U8%FzvZTp7p`-UBvVW%yuyvYRd{U9cfO*@2?Js$_`y!*`c#DJ9KnphxYdD(AJh6T3fS2OG|cW zZq5!(P1(UTvx8w|hiz0O1vOBic(j#@RgJc6VJbjIw3*5_7u`;!D?*#7d_Hs=l`s!& z+*o)9owKie#Bpxry^cW_aSZ0&yWIKtrof3et^T+2R@(zdZ?Il`E3eX7+!WaIY-Tv` zC`JdZV;A!@?**ptg7205hw>i+wA2B79VisA@!wHU%E}Ezsjlf%H&%CuY-KITE-VyX$*R8%bn(qB!26Eze)rM>0sCH?Oj)qjFBHIX+|jgOfc zY(YX(^@OPEvYJ#3K}v{%A!-T_?-zxHXvjRMO%-`fNyr+t$GoaxdWD;HzH7n0bMDMJ&j-CUIAm(Qc>~Bs+{50O6LRbZJ{pú?d{_LGo z%5%$T-W|Oc5n{X&i^G_lQC7ZX?U^{p%1i1R_VngVibgYQG6rqk_ZZf4QC zb#>`o)b~{}5)&g3mZDr#Ts8_(jB`qyi$=f^cyZ+Eob~F%zHoW&EGf=|a8g77YZo~& znnjVd@SBx%$H-giKgwGwa^Dh*i)uV7L{tUhoGhz3uel@B=<{%S8T<++n1GZDw$?oC z>$F~Z*tbwFgYBwJmuqenDY&{c6UsTJVqp4hI~SS6i93s<#!8)fWDP7YX(atkfkGQ$1Es&pr=Ia4L5~Fd4sy7>*nuqeuh?ye3hmI+ zP^tS;foAdckm2muK0-$+73XlMF9Z)e4q<;jJ0IG*I@52}b z3IO?mAU+^k3PKri9rD2ks{j9mkyd*Y&~BhTr0mhacTi?UX%LaxckmAj;PqtH+(03+ z=0E25SnRLZVKR6Yrx02BMA3iSNFyI`*huPM*(Yq|;z=AqR`@JiQhZ5SBTCxd(W0kz zv?ML`Q*2nLAH%Dx!61tdHM8Yp-Q)O(t%T=dOUri|Mq7Qiwxhc|US56=z;RjrBZ2do z16Z(=)2H#{$QrHWJv#|!!15o2c=2WuUv@-HJR8$xbS zAz$>oZDHEF-5m3my+7zN%v@KM)!E+Dy+hND9oxJ2q>L5Q znl`i_-*+CW!d1}rpqqd^gs&+*i^$-Uc%f|yF9}?F)IE#KRzQG-KkNLDu|d6P%v(6> zEyzXE{~qjhk(7l!<9qahbJiT^%VaUg zx{Sr6d^}3dD>zq|(>8JiV;U%1G2X;&a!?J@x|<*ME`oUBx$Cj6LrE+ZVIJ>waQej z?Oo%toUG{V%9RDG#aLY6XROvM#RwWF7R_Fit@`AnEVcM0SIl9~M;Yg%6Aa{18PJD7 zwKc8dsy%U2OKYxbW7J}WnVZSfs$HB7zjw~a(?n~yiw%7$RY3{QN2dt()z=U{ke(R~ z*u!Y)jG`=0xC+)YRe3w3_?j(qk?;wXxzvu9f7C=o7mMV;BmOfJ3Mua2@Hv|`@FwVI zELS=lqfY+@<9x%K&gl$mE@%(aZlEr;iMDYFneXgich?3Sb2rAVZ31^^|Q=2i&ryeL-l09x3Et-;go7inMi+8n4a144h6h73?x*UIG z?0=lJVLzlBwyR}aDW}xtmPty9!iLpV<6lC-dkDXaX7CT9sWp@B&SV-wO0J&VptqPR z-F)sE!UnBY(Ty{TMxVbzJX=lbUJA~!k*UISZYyKKsR8ZM(VCf8bTIu>Yw9_9a{A5; zglsH5|L{K(^r6eD$=N;s;JL}K2UV>lWfua&Q#B`-b&cojRJZF$`p1FZJnOZieto1% zWM5>5XRi;hpgq&4q7lef>Ze(~amp0>Uw^IouOsLW_|LYfPON6JbFDzf!$mCJ@7if< zb^6dw3L8makh~-RbTmeXG-*mkM+ONqD7G&#tZ(G^hv!0trb72|?0N6c~vbb-*2y zMn4!GM}s=vMV+1C10OSv`06`4&L_C;=zKFGtn;a>vJ1@Wx;}>ZV$em|bF1^_Av5#O zbbsemRoAU^&OMLcJ-52IH`?19?K{ik`U%IO8-Y0ZRBfFa;$1&^qVw@;bn_iza4JgN zVFs5W&mDciLukTNSJiBiawR>W$PvYiXws(6OeeDU+HJRRwp*U^);1x!@*ryG$#aL$ zD^$|)7d#!uj*jQ=YarR~y@_7?nHZKR((u!Z_>~(O~u#7$1g7F#3{TAe}zB8AL0K7gWtff z;aBi(yc{>;I(#9Xg>@Vp`O=8wsK7lA!G%zBvvp`{vJOj^T8Bnv{Kcq&nSVZ7!X^+# zi`f)pRL>?c9W7$hn2Z*(iJXJ#*i;J90yY^Bn$M<_gXYb%gsog@9XdL!!-^Hwp}pNP znW=AUvo4n}w+^kX)?wK)>(J7Y{}kG5tG!>?Tf0Tr<)Ff@oGmr>8LQDRpC)~Wb1t-@ z`;L(}4(C+Z3s9$;F2kOZeIafL# zzNJ+z0X|MCs%uHp2#1waIAtVbhGxX#iIA!oayY6*!|{|MrBqW6>AI9sbTgVTOfw!e zO;a`#VOi33J)y@9MOP#}p4397B$**CoPtLsGZxZ9AytaSl4e*}LUO7g9#hP4*w7`o zlL{;GkWBWh^i1JZiTonpIiW`iO0pycWhE3e&7i5Ij~2h<8i~#)L1B%ip9gK98*$iJeAZElCHp zlap#(GE-_uN=P9!tSPBvLW(ApL_8KsCO}VI38kb|DjrWH^pIgD6M8Iqdh1uC$6tbA zMN?osqfOJ4KWF&MQY&F8sXet&pLrJ)dM-?-s%Q0O^gv}@n2`O_2n zZY&Xx8d}VVOLEwZ#Y0k3NlGa#rN!iA2tsLu3|Ujdx*peb89bg!B~qFckISl_GL&c> z;t|(i76~<^Y2V}Md%R@d0I=A+Wp8wx6{j7=O^Wcorz&V%)`5DRiHFkIyGUaD#;XDVmSZ*#5@-8TG34*wDV z34aD7e~YU$IH4<;jX1`(&oL+ z%wE?wJq#c+4ep$Ngs5}CZP;@qt(h%+p@=V_(=azx3g4I0z*Fsby*APVKt=B zkfg|#-ACO(xj=bz_tE_MtvMdg{QMT23v)X!k1p=;e2aRxvZnf`bg678K?NWH>R(c{ zYRRZp!j@b)qG`df4$vT(VS~w)BuL(rkxGUEXk=FAVFiEy6vv{`XgnD=Pbor{o;DuZe6s7ympDlPTpDRDM4X|L+6|ao&ab|Uztv?_dB_&ruxS8wG5a{Eof*#S=B7* z%y())4uLfhT@Fei#nd%fl}XPfo}i!IG9#)HQRQG*lN4Px!({g*o+X50=gdr}$0g;w zgXd$4Dfl9c$^4&rF0aU&Vyj0{Q;Q~(Aw^Awm8cX8NlG}C(h>} zF*|N>GUF}II@_Pw=|Nq&?rP4)&ElN191Ce)0B=LIug$xkC%qFKF0!rMdxX>$JG_o~ zwCl3&DA_vEaWUC>uI~~xdU;}vKnULP5szHUjwsPU4* zvR@q$AK*CKp^@R6HY{Fp!f-j16%Xcc$3VrWqsGhii^q+ZBI@9Hhz%;vhlhCdAm{uI zdXVNxxIm!&?cP_RpFFkKIuGHMWbf5tJL!7c_iN?_Tl&@+%_C~>Eo*AF)DDM@<)F^Y zoGzRu6t3Y=kb^75W+wg+Yh`C-YO$>g&854a#ckPu(#zI)O+>a8J8H;*?|HAGF!oP2 z+VDAkUZBqV{1a^K!D&RU^nF13-^7>}P4Iun)6ReMe}Tx}9-l;{`}_y#YiEn^+vxj! zVkb|+80qa5n} zy4c0j_a)y85a&L8hmG!P_dbTmEn7r4omwe=Xt~X#`#Ot1bY#bot{z|Cu(J;F%s0nr z^yfwQ2z21fqOCmXEBCrsJcy0bN6OD?6(NAZ)0?8y;pN$@c|i5|GZ|2|K5EfeSRwEi)?o#zdi z3>+5yf0vLVLz!8AZ8EvGsXKaQcX_0|{0GcK^x!TAS0~9gI)MBIc9y{dyznY#dzEf& z^S*02-kBXAI9uFOb~LbXlzVdP7LIj~YSM;_imL_Yo88FWFt*sn`-i%h!qSg zq_cojpemp_H2q@XE<{$X_I}BBz=daZUl~m#uUyr=p({DNqkPJ!OVW40H$@t570)LP zj~10p`OB)h5pNxKEvOsiEw(i35zPY1>6XI;M8vPKK)m<+pFOqf{splalxG^eHW8gS z4z2=-nVm`Y*i+s9G+@E<@ylt>-ATeiJ@cCH|J+h}b5AmpiZtwQx zIjH!C__+&xdr})lO86uSGj_yXR7bJ`FS`z;`JG2HuBpndL=1A|uWo9-(gZ&h}wXrh5C z-@CBn3?s*of!aw7EFqs>@Bf4j>=ZF(o@^ZQQjr(DR72;qc=eo3jiS!5*s| z8xU|?*)yVZC|7!~_pcaZkCh_b5!CcpLv2WoDNg-QyP93TA)2HzQ{?w)& z6PRf;Pnv+=7wGyg@K0^5VfbfjpwV_pd7;bLPVbN(K zuYLse4n5oe3-z3Fq0U2O;7J@h4eYY;3s(3qqTAoV9`x7riX!ObI3oYMcitcrWD)y> zDrB%#V1&odksi3;Uul)$g*08^+RoIaMTRi#HBt#3>c3Ha}~39I*fN;Jk_MYPJjIW|2929nRD1C_MzP5hs<;}!n~+g&m5s=GP> zcp-LsgwwV^^7sIf#)t9knSMVn@~EC}pH}R2uo!0Blvi@BR3%Sb?=QE&BfHaK`mz}k zywS^=u09;0hkxk(J#TFZvYl4p$x2G22kX4B01s;z(;N&#Q*hMse`k^1?#ky}`S4!E z=Bh~lc*U|2SKWhJmRarS;0rK#qoqW2WpHl;?=nhx0!`c&<~12;=_tX_Kf>A+rm#&p zj>+MgLNDya>~^7&6Do=KW9*|1{h~{#NRtbek7)emlGf#8HL6-fwq9Jomc8~Z-x2E{ zCaZs@wRhv^?WA`%&i8=wso6$f=L8+*GJ{E0TSrNjm$iwp4&D`cFXd%v9iHx{Y0x#UGdy;vM225EtK3@ z(`RtPjA7S!gz_|*y<(J-x$P^)Dbd+Z>WH0*eBPex%SxHZ2@}CcKQpqfebn=-+S @@ -43,8 +44,8 @@ compile_cpp(const std::filesystem::path& filepath) if (result) { throw std::runtime_error(fmt::format( - "Compilation of {} failed. Compiler output below:\n {}", filepath.string(), - result.value() + "Compilation of {} failed. Compiler output below:\n {}\n Code: {}", + filepath.string(), result.value(), read_file_content(filepath.string()) )); } return binary_output; diff --git a/exchange/src/wrapper/util/resource_limits.hpp b/exchange/src/common/resource_limits.hpp similarity index 78% rename from exchange/src/wrapper/util/resource_limits.hpp rename to exchange/src/common/resource_limits.hpp index da2c2078..7f64a5d4 100644 --- a/exchange/src/wrapper/util/resource_limits.hpp +++ b/exchange/src/common/resource_limits.hpp @@ -1,4 +1,5 @@ #pragma once +#include #include #ifdef __linux__ @@ -26,6 +27,19 @@ set_memory_limit(std::size_t limit_in_mb) } return true; } + +inline bool +set_cpu_affinity(int cpu) +{ + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(cpu, &mask); + + if (sched_setaffinity(0, sizeof(mask), &mask) == -1) { + return false; + } + return true; +} } // namespace nutc::wrapper #else namespace nutc::wrapper { diff --git a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp index 0d2bd105..4adaddcf 100644 --- a/exchange/src/exchange/algos/normal_mode/normal_mode.cpp +++ b/exchange/src/exchange/algos/normal_mode/normal_mode.cpp @@ -20,35 +20,32 @@ NormalModeAlgoInitializer::initialize_trader_container( static int id = 0; auto firebase_users = get_remote_traders(); for (const auto& user : firebase_users) { + if (!user.contains("language") || !user.contains("code") + || !user.contains("name")) { + throw std::runtime_error("Not contain field"); + } + common::AlgoLanguage language = user["language"].get() == "Python" + ? common::AlgoLanguage::python + : common::AlgoLanguage::cpp; + std::string code = user["code"].get(); + std::string name = user["name"].get(); try { - if (!user.contains("language") || !user.contains("code") - || !user.contains("name")) { - throw std::runtime_error("Not contain field"); - } - if (!user["language"].is_string() || !user["code"].is_string() - || !user["name"].is_string()) { - throw std::runtime_error("not string"); - } - common::AlgoLanguage language = - user["language"].get() == "Python" - ? common::AlgoLanguage::python - : common::AlgoLanguage::cpp; - std::string code = user["code"].get(); - std::string name = user["name"].get(); traders.add_trader( common::RemoteAlgorithm(language, std::to_string(id++), code, name), - start_capital + start_capital, name ); log_i(main, "Created user {}", name); } catch (const std::runtime_error& err) { - log_w(main, "Failed to create user: {}", err.what()); + log_w(main, "Failed to create user {}: {}", name, err.what()); } } + log_i(main, "Done creating users, sending start time"); int64_t start_time = get_start_time(WAIT_SECS); std::for_each(traders.begin(), traders.end(), [start_time](auto& trader) { send_start_time(trader, start_time); }); + log_i(main, "Starting exchange"); std::this_thread::sleep_for(std::chrono::seconds(WAIT_SECS)); } diff --git a/exchange/src/exchange/main.cpp b/exchange/src/exchange/main.cpp index 4dc6d2ad..307e6391 100644 --- a/exchange/src/exchange/main.cpp +++ b/exchange/src/exchange/main.cpp @@ -1,5 +1,6 @@ #include "algos/algo_manager.hpp" #include "common/logging/logging.hpp" +#include "common/resource_limits.hpp" #include "common/util.hpp" #include "exchange/algos/algo_manager.hpp" #include "exchange/config/dynamic/argparse.hpp" @@ -11,6 +12,7 @@ #include "exchange/traders/trader_container.hpp" #include +#include #include @@ -59,6 +61,8 @@ main_event_loop(std::unique_ptr cycle) int main(int argc, const char** argv) { + int env = std::stoi(std::getenv("NUTC_ID")); + nutc::wrapper::set_cpu_affinity(env / 2); nutc::logging::init("exchange.log", quill::LogLevel::Info); std::signal(SIGINT, [](auto) { std::exit(0); }); std::signal(SIGPIPE, SIG_IGN); diff --git a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp index 8502724b..af703b7c 100644 --- a/exchange/src/exchange/matching_cycle/base/base_cycle.cpp +++ b/exchange/src/exchange/matching_cycle/base/base_cycle.cpp @@ -84,6 +84,17 @@ BaseMatchingCycle::match_orders_(std::vector orders) return matches; } +void +yield_for_3_ms() +{ + auto start = std::chrono::steady_clock::now(); + auto end = start + std::chrono::milliseconds(3); + + while (std::chrono::steady_clock::now() < end) { + sched_yield(); + } +} + void BaseMatchingCycle::handle_matches_(std::vector matches) { @@ -107,7 +118,7 @@ BaseMatchingCycle::handle_matches_(std::vector matches) traders_.begin(), traders_.end(), [&message = *update](GenericTrader& trader) { trader.send_message(message); } ); - usleep(3000); + yield_for_3_ms(); } } // namespace nutc::exchange diff --git a/exchange/src/exchange/metrics/on_tick_metrics.cpp b/exchange/src/exchange/metrics/on_tick_metrics.cpp index b80d43f8..a80d8dac 100644 --- a/exchange/src/exchange/metrics/on_tick_metrics.cpp +++ b/exchange/src/exchange/metrics/on_tick_metrics.cpp @@ -137,7 +137,7 @@ TickerMetricsPusher::report_trader_stats(const TickerContainer& tickers) .Add({ {"ticker", common::to_string(ticker)}, {"trader_type", trader.get_type() }, - {"id", trader.get_id() }, + {"name", trader.get_display_name()}, }) .Set(amount_held); } @@ -165,14 +165,14 @@ TickerMetricsPusher::report_trader_stats(const TickerContainer& tickers) per_trader_pnl_gauge .Add({ - {"id", trader.get_id() }, - {"trader_type", trader.get_type()}, + {"trader_type", trader.get_type() }, + {"name", trader.get_display_name()}, }) .Set(pnl); per_trader_capital_gauge .Add({ - {"id", trader.get_id() }, - {"trader_type", trader.get_type()}, + {"trader_type", trader.get_type() }, + {"name", trader.get_display_name()}, }) .Set(capital); }; diff --git a/exchange/src/exchange/traders/trader_types/algo_trader.hpp b/exchange/src/exchange/traders/trader_types/algo_trader.hpp index 30da57b4..150ae3c8 100644 --- a/exchange/src/exchange/traders/trader_types/algo_trader.hpp +++ b/exchange/src/exchange/traders/trader_types/algo_trader.hpp @@ -15,9 +15,10 @@ class AlgoTrader : public GenericTrader { public: explicit AlgoTrader( - const common::algorithm_variant& algo_variant, common::decimal_price capital + const common::algorithm_variant& algo_variant, common::decimal_price capital, + std::string name = "" ) : - GenericTrader(common::get_id(algo_variant), capital), + GenericTrader(common::get_id(algo_variant), capital, std::move(name)), DISPLAY_NAME(common::get_id(algo_variant)), wrapper_handle_(std::make_optional(algo_variant)) {} @@ -29,12 +30,6 @@ class AlgoTrader : public GenericTrader { return TYPE; } - const std::string& - get_display_name() const override - { - return DISPLAY_NAME; - } - bool can_leverage() const override { diff --git a/exchange/src/exchange/traders/trader_types/generic_trader.hpp b/exchange/src/exchange/traders/trader_types/generic_trader.hpp index d7d8fe5c..494a4731 100644 --- a/exchange/src/exchange/traders/trader_types/generic_trader.hpp +++ b/exchange/src/exchange/traders/trader_types/generic_trader.hpp @@ -14,11 +14,16 @@ namespace nutc::exchange { class GenericTrader { std::string user_id_; + std::string display_name_; TraderPortfolio state_; public: - explicit GenericTrader(std::string user_id, common::decimal_price initial_capital) : - user_id_(std::move(user_id)), state_{initial_capital} + explicit GenericTrader( + std::string user_id, common::decimal_price initial_capital, + std::string display_name = "" + ) : + user_id_(std::move(user_id)), display_name_{std::move(display_name)}, + state_{initial_capital} {} GenericTrader(GenericTrader&&) = default; @@ -39,7 +44,7 @@ class GenericTrader { virtual const std::string& get_display_name() const { - return user_id_; + return display_name_; } const std::string& diff --git a/exchange/src/exchange/wrappers/handle/wrapper_handle.cpp b/exchange/src/exchange/wrappers/handle/wrapper_handle.cpp index 18ef1262..e58ff715 100644 --- a/exchange/src/exchange/wrappers/handle/wrapper_handle.cpp +++ b/exchange/src/exchange/wrappers/handle/wrapper_handle.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace { std::string quote_id(std::string user_id) @@ -78,6 +80,16 @@ WrapperHandle::create_arguments(const common::algorithm_variant& algo_variant) else if (language == common::AlgoLanguage::python) { args.emplace_back("--python_algo"); } + args.emplace_back("--core_num"); + static int id = std::stoi(std::getenv("NUTC_ID")); + if (id <= 1) { + static int num = 32; + args.emplace_back(std::to_string(--num)); + } + else { + static int num = 2; + args.emplace_back(std::to_string(++num)); + } return args; } diff --git a/exchange/src/exchange/wrappers/messaging/async_pipe_runner.cpp b/exchange/src/exchange/wrappers/messaging/async_pipe_runner.cpp index 80fa8e6f..0a3240a1 100644 --- a/exchange/src/exchange/wrappers/messaging/async_pipe_runner.cpp +++ b/exchange/src/exchange/wrappers/messaging/async_pipe_runner.cpp @@ -1,5 +1,7 @@ #include "async_pipe_runner.hpp" +#include "common/resource_limits.hpp" + #include #include #include @@ -18,7 +20,10 @@ AsyncPipeRunner::~AsyncPipeRunner() AsyncPipeRunner::AsyncPipeRunner() : ios(std::make_shared()), work_guard(ios->get_executor()), - ios_thread([this]() { ios->run(); }) + ios_thread([this]() { + wrapper::set_cpu_affinity(2); + ios->run(); + }) {} std::shared_ptr diff --git a/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp b/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp index 791cf86b..c49322cf 100644 --- a/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp +++ b/exchange/src/exchange/wrappers/messaging/pipe_writer.cpp @@ -2,6 +2,7 @@ #include "async_pipe_runner.hpp" #include "exchange/config/static/config.hpp" + #include namespace nutc::exchange { @@ -23,7 +24,7 @@ PipeWriter::send_message(const std::string& message) queued_shared_.push_back(message); if (queued_shared_.size() > MAX_OUTGOING_MQ_SIZE) [[unlikely]] { queued_shared_.pop_front(); - std::cerr << "DROPPED MESSAGE\n"; + // std::cerr << "DROPPED MESSAGE\n"; } // It will enqueue these shared anyway diff --git a/exchange/src/wrapper/config/argparse.cpp b/exchange/src/wrapper/config/argparse.cpp index 8bd49eaa..9bcd9498 100644 --- a/exchange/src/wrapper/config/argparse.cpp +++ b/exchange/src/wrapper/config/argparse.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace nutc::wrapper { wrapper_args process_arguments(int argc, const char** argv) @@ -41,6 +43,8 @@ process_arguments(int argc, const char** argv) .implicit_value(true) .nargs(0); + program.add_argument("--core_num").nargs(1).required(); + try { program.parse_args(argc, argv); } catch (const std::runtime_error& err) { @@ -50,12 +54,13 @@ process_arguments(int argc, const char** argv) } auto trader_id = program.get("--uid"); + int core_num = std::stoi(program.get("--core_num")); if (program.get("--cpp_algo")) { - return {verbosity, trader_id, common::AlgoLanguage::cpp}; + return {verbosity, trader_id, common::AlgoLanguage::cpp, core_num}; } if (program.get("--python_algo")) { - return {verbosity, trader_id, common::AlgoLanguage::python}; + return {verbosity, trader_id, common::AlgoLanguage::python, core_num}; } throw std::runtime_error("No language provided"); diff --git a/exchange/src/wrapper/config/argparse.hpp b/exchange/src/wrapper/config/argparse.hpp index e46c7e8b..61607798 100644 --- a/exchange/src/wrapper/config/argparse.hpp +++ b/exchange/src/wrapper/config/argparse.hpp @@ -11,6 +11,7 @@ struct wrapper_args { const uint8_t VERBOSITY; const std::string TRADER_ID; const common::AlgoLanguage ALGO_TYPE; + const int CORE_NUM; }; wrapper_args process_arguments(int argc, const char** argv); diff --git a/exchange/src/wrapper/main.cpp b/exchange/src/wrapper/main.cpp index 4867754b..e0057079 100644 --- a/exchange/src/wrapper/main.cpp +++ b/exchange/src/wrapper/main.cpp @@ -3,7 +3,7 @@ #include "wrapper/messaging/exchange_communicator.hpp" #include "wrapper/runtime/cpp/cpp_runtime.hpp" #include "wrapper/runtime/python/python_runtime.hpp" -#include "wrapper/util/resource_limits.hpp" +#include "common/resource_limits.hpp" #include #include @@ -34,7 +34,8 @@ main(int argc, const char** argv) std::signal(SIGINT, catch_sigint); std::signal(SIGTERM, catch_sigterm); - auto [verbosity, trader_id, algo_type] = process_arguments(argc, argv); + auto [verbosity, trader_id, algo_type, core_num] = process_arguments(argc, argv); + nutc::wrapper::set_cpu_affinity(core_num); static constexpr std::uint32_t MAX_LOG_SIZE = 50'000; nutc::logging::init_file_only( @@ -43,7 +44,7 @@ main(int argc, const char** argv) ExchangeCommunicator communicator{trader_id}; - if (!set_memory_limit(1024) || !kill_on_exchange_death()) { + if (!set_memory_limit(2048) || !kill_on_exchange_death()) { log_e(main, "Failed to set memory limit"); communicator.report_startup_complete(); return 1; diff --git a/exchange/src/wrapper/runtime/python/python_runtime.cpp b/exchange/src/wrapper/runtime/python/python_runtime.cpp index 24698172..ba900834 100644 --- a/exchange/src/wrapper/runtime/python/python_runtime.cpp +++ b/exchange/src/wrapper/runtime/python/python_runtime.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace nutc::wrapper { namespace py = pybind11; @@ -20,7 +22,7 @@ PyRuntime::fire_on_trade_update( ); } catch (const py::error_already_set& err) { log_error(err.what()); - // std::cerr << err.what() << "\n"; + std::cerr << err.what() << "\n"; } } @@ -29,13 +31,17 @@ PyRuntime::fire_on_orderbook_update( Ticker ticker, Side side, decimal_quantity quantity, decimal_price price ) const { + static bool reported = false; try { py::globals()["strategy"].attr("on_orderbook_update")( ticker, side, static_cast(quantity), static_cast(price) ); } catch (const py::error_already_set& err) { log_error(err.what()); - // std::cerr << err.what() << "\n"; + if (reported) + return; + std::cerr << err.what() << "\n"; + reported = true; } } @@ -52,7 +58,7 @@ PyRuntime::fire_on_account_update( ); } catch (const py::error_already_set& err) { log_error(err.what()); - // std::cerr << err.what() << "\n"; + std::cerr << err.what() << "\n"; } } From a001cb567fc597b52bf488919bf75773515037ed Mon Sep 17 00:00:00 2001 From: Steven Ewald Date: Fri, 18 Oct 2024 18:42:22 -0500 Subject: [PATCH 6/6] check4 --- exchange/src/wrapper/runtime/runtime.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exchange/src/wrapper/runtime/runtime.cpp b/exchange/src/wrapper/runtime/runtime.cpp index cd36b083..a338dae3 100644 --- a/exchange/src/wrapper/runtime/runtime.cpp +++ b/exchange/src/wrapper/runtime/runtime.cpp @@ -22,12 +22,12 @@ Runtime::process_message(tick_update&& tick_update) if (m.buyer_id == trader_id_) [[unlikely]] { fire_on_account_update( - p.ticker, p.side, p.price, p.quantity, m.buyer_capital + p.ticker, Side::buy, p.price, p.quantity, m.buyer_capital ); } if (m.seller_id == trader_id_) [[unlikely]] { fire_on_account_update( - p.ticker, p.side, p.price, p.quantity, m.seller_capital + p.ticker, Side::sell, p.price, p.quantity, m.seller_capital ); } });