Skip to content

Commit

Permalink
Merge pull request #54 from northwesternfintech/cm-speed-fix
Browse files Browse the repository at this point in the history
Removed unnecessary vector construction
  • Loading branch information
stevenewald authored Nov 21, 2023
2 parents 43c52bb + e566a10 commit 728f258
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 67 deletions.
16 changes: 0 additions & 16 deletions exchange/src/client_manager/client_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,5 @@ ClientManager::set_active(const std::string& uid)
clients[uid].active = true;
}

// inefficient but who cares
std::vector<Client>
ClientManager::get_clients(bool active_status) const
{
std::vector<Client> client_vec;

auto add_client_to_vec = [&client_vec, &active_status](const auto& client) {
if (client.active == active_status)
client_vec.push_back(client);
};

for (auto& [_, client] : clients)
add_client_to_vec(client);

return client_vec;
}
} // namespace manager
} // namespace nutc
7 changes: 6 additions & 1 deletion exchange/src/client_manager/client_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ class ClientManager {

float get_capital(const std::string& uid) const;
float get_holdings(const std::string& uid, const std::string& ticker) const;
std::vector<Client> get_clients(bool active) const;

inline const std::unordered_map<std::string, Client>&
get_clients() const
{
return clients;
}

void modify_capital(const std::string& uid, float change_in_capital);
void modify_holdings(
Expand Down
4 changes: 2 additions & 2 deletions exchange/src/local_algos/sandbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

namespace nutc {
namespace sandbox {
int
size_t
initialize_client_manager(manager::ClientManager& users)
{
// check number of algos in algos directory
int num_users = 0;
size_t num_users = 0;
for (const auto& entry : std::filesystem::directory_iterator(ALGO_DIR)) {
std::string uid = entry.path().filename().string();
uid = uid.substr(0, uid.find(".py"));
Expand Down
2 changes: 1 addition & 1 deletion exchange/src/local_algos/sandbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace nutc {
namespace sandbox {

int initialize_client_manager(manager::ClientManager& users);
size_t initialize_client_manager(manager::ClientManager& users);

void create_sandbox_algo_files();

Expand Down
2 changes: 1 addition & 1 deletion exchange/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ main(int argc, const char** argv)
users.add_client(uid, algo_id, false);
}

int num_clients = nutc::client::initialize(users, mode);
size_t num_clients = nutc::client::initialize(users, mode);

engine_manager.add_engine("A");
engine_manager.add_engine("B");
Expand Down
38 changes: 24 additions & 14 deletions exchange/src/process_spawning/spawning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
namespace nutc {
namespace client {

int
size_t
initialize(manager::ClientManager& users, Mode mode)
{
int num_users;
size_t num_users;
switch (mode) {
case Mode::DEV:
dev_mode::initialize_client_manager(users, DEBUG_NUM_USERS);
Expand All @@ -28,7 +28,7 @@ initialize(manager::ClientManager& users, Mode mode)
users.initialize_from_firebase(firebase_users);

// Spawn clients
const int num_clients = nutc::client::spawn_all_clients(users);
const size_t num_clients = nutc::client::spawn_all_clients(users);

if (num_clients == 0) {
log_c(client_spawning, "Spawned 0 clients");
Expand All @@ -53,19 +53,29 @@ quote_id(std::string id)
return id;
}

int
size_t
spawn_all_clients(const nutc::manager::ClientManager& users)
{
int clients = 0;
for (const auto& client : users.get_clients(false)) {
log_i(client_spawning, "Spawning client: {}", client.uid);
std::string quoted_user_id = quote_id(client.uid);
std::string quoted_algo_id = quote_id(client.algo_id);

spawn_client(quoted_user_id, quoted_algo_id, client.is_local_algo);
clients++;
};
return clients;
size_t num_clients = 0;
auto spawn_one_client =
[&num_clients](const std::pair<std::string, manager::Client>& pair) {
const auto& [uid, client] = pair;
const std::string& algo_id = client.algo_id;

if (client.active)
return;

log_i(client_spawning, "Spawning client: {}", uid);

spawn_client(quote_id(uid), quote_id(algo_id), client.is_local_algo);
num_clients++;
};

const auto& clients = users.get_clients();

std::for_each(clients.begin(), clients.end(), spawn_one_client);

return num_clients;
}

void
Expand Down
4 changes: 2 additions & 2 deletions exchange/src/process_spawning/spawning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ glz::json_t::object_t get_all_users();
* @param users The ClientManager to spawn clients for
* @returns the number of clients spawned
*/
int spawn_all_clients(const nutc::manager::ClientManager& users);
size_t spawn_all_clients(const nutc::manager::ClientManager& users);

int initialize(manager::ClientManager& users, Mode mode);
size_t initialize(manager::ClientManager& users, Mode mode);

} // namespace client
} // namespace nutc
20 changes: 12 additions & 8 deletions exchange/src/rabbitmq/client_manager/RabbitMQClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace rabbitmq {

void
RabbitMQClientManager::waitForClients(
manager::ClientManager& clients, const int num_clients
manager::ClientManager& clients, size_t num_clients
)
{
int num_running = 0;
Expand Down Expand Up @@ -42,7 +42,7 @@ RabbitMQClientManager::waitForClients(
return true; // indicate that function should continue
};

for (int i = 0; i < num_clients; i++) {
for (size_t i = 0; i < num_clients; i++) {
auto data = RabbitMQConsumer::consumeMessage();
if (!std::visit(processMessage, data)) {
return;
Expand All @@ -60,7 +60,6 @@ RabbitMQClientManager::sendStartTime(
const manager::ClientManager& manager, int wait_seconds
)
{
std::vector<manager::Client> active_clients = manager.get_clients(true);
using time_point = std::chrono::high_resolution_clock::time_point;
time_point time =
std::chrono::high_resolution_clock::now() + std::chrono::seconds(wait_seconds);
Expand All @@ -70,13 +69,18 @@ RabbitMQClientManager::sendStartTime(

messages::StartTime message{time_ns};
std::string buf = glz::write_json(message);
auto send_to_client = [buf](const manager::Client& client) {
RabbitMQPublisher::publishMessage(client.uid, buf);

auto send_to_client = [buf](const std::pair<std::string, manager::Client>& pair) {
const auto& [uid, client] = pair;

if (!client.active)
return;

RabbitMQPublisher::publishMessage(uid, buf);
};

for (const auto& client : active_clients) {
send_to_client(client);
}
const auto& clients = manager.get_clients();
std::for_each(clients.begin(), clients.end(), send_to_client);
}

} // namespace rabbitmq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class RabbitMQClientManager {
* This ensures that all clients are connected to the exchange and have successfully
* started (vs a RMQ or firebase error)
*/
static void waitForClients(manager::ClientManager& manager, int num_clients);
static void waitForClients(manager::ClientManager& manager, size_t num_clients);

static void sendStartTime(const manager::ClientManager& manager, int wait_seconds);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,21 @@ void
RabbitMQConnectionManager::closeConnection(const manager::ClientManager& client_manager)
{
// Handle client shutdown
auto shutdownClient = [&](const auto& client) {
auto shutdownClient = [&](const std::pair<std::string, manager::Client>& pair) {
const auto& [uid, client] = pair;

if (!client.active)
return;

log_i(rabbitmq, "Shutting down client {}", client.uid);
messages::ShutdownMessage shutdown{client.uid};
messages::ShutdownMessage shutdown{uid};
auto messageStr = glz::write_json(shutdown);
RabbitMQPublisher::publishMessage(client.uid, messageStr);
RabbitMQPublisher::publishMessage(uid, messageStr);
};

// Iterate over clients and shut them down
for (const auto& client : client_manager.get_clients(true)) {
shutdownClient(client);
}
const auto& clients = client_manager.get_clients();
std::for_each(clients.begin(), clients.end(), shutdownClient);

// Close channel and connection, then destroy connection
amqp_channel_close(connection_state, 1, AMQP_REPLY_SUCCESS);
Expand Down
36 changes: 21 additions & 15 deletions exchange/src/rabbitmq/publisher/RabbitMQPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,20 @@ RabbitMQPublisher::broadcastMatches(
const manager::ClientManager& clients, const std::vector<messages::Match>& matches
)
{
auto broadcastToClient = [&](const auto& client) {
auto broadcastToClient = [&](const std::pair<std::string, manager::Client>& pair) {
for (const auto& match : matches) {
const auto& [uid, client] = pair;

if (!client.active)
continue;

std::string buffer;
glz::write<glz::opts{}>(match, buffer);
publishMessage(client.uid, buffer);
publishMessage(uid, buffer);
}
};

const auto activeClients = clients.get_clients(true);
const auto& activeClients = clients.get_clients();
std::for_each(activeClients.begin(), activeClients.end(), broadcastToClient);
}

Expand All @@ -56,21 +61,21 @@ RabbitMQPublisher::broadcastObUpdates(
const std::vector<messages::ObUpdate>& updates, const std::string& ignore_uid
)
{
auto broadcastToClient = [&](const auto& client) {
if (client.uid == ignore_uid) {
auto broadcastToClient = [&](const std::pair<std::string, manager::Client>& pair) {
const auto& [uid, client] = pair;

if (!client.active || uid == ignore_uid) {
return;
}

for (const auto& update : updates) {
// if (update.quantity <= 1e-6f) {
// continue;
// }
std::string buffer;
glz::write<glz::opts{}>(update, buffer);
publishMessage(client.uid, buffer);
publishMessage(uid, buffer);
}
};

const auto activeClients = clients.get_clients(true);
const auto& activeClients = clients.get_clients();
std::for_each(activeClients.begin(), activeClients.end(), broadcastToClient);
}

Expand All @@ -79,14 +84,15 @@ RabbitMQPublisher::broadcastAccountUpdate(
const manager::ClientManager& clients, const messages::Match& match
)
{
std::string buyer_uid = match.buyer_uid;
std::string seller_uid = match.seller_uid;
const std::string& buyer_uid = match.buyer_uid;
const std::string& seller_uid = match.seller_uid;

messages::AccountUpdate buyer_update = {
clients.get_capital(match.buyer_uid), match.ticker, messages::SIDE::BUY,
match.price, match.quantity
clients.get_capital(buyer_uid), match.ticker, messages::SIDE::BUY, match.price,
match.quantity
};
messages::AccountUpdate seller_update = {
clients.get_capital(match.seller_uid), match.ticker, messages::SIDE::SELL,
clients.get_capital(seller_uid), match.ticker, messages::SIDE::SELL,
match.price, match.quantity
};

Expand Down

0 comments on commit 728f258

Please sign in to comment.