Skip to content

Commit

Permalink
fix: remove adapter <> api deadlock (#108)
Browse files Browse the repository at this point in the history
Adapter sends price update notifications to the subscriber
API connection actor but at the same time the subscriber
actor can get blcoked on receiving messages (e.g. product
info) from Adapter. If the channel from Adapter to the
subscriber gets full the Adapter can get blocked and
this results in a deadlock. This situation propagates
and queues messages in other actors and WS messages
from the clients will grow in an unbounded queue that
results in a constant memory growth (leak).

This change simply makes the Adaptor to subscriber API
connection nonblocking by dropping messages if the channels
are full.
  • Loading branch information
ali-bahjati authored Feb 25, 2024
1 parent d19023b commit 670c080
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
38 changes: 20 additions & 18 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Adapter {
Some(message) = self.message_rx.recv() => {
if let Err(err) = self.handle_message(message).await {
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
_ = self.shutdown_rx.recv() => {
Expand All @@ -224,7 +224,7 @@ impl Adapter {
_ = self.notify_price_sched_interval.tick() => {
if let Err(err) = self.send_notify_price_sched().await {
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}
}
Expand Down Expand Up @@ -508,12 +508,14 @@ impl Adapter {

async fn send_notify_price_sched(&self) -> Result<()> {
for subscription in self.notify_price_sched_subscriptions.values().flatten() {
// Send the notify price sched update without awaiting. This results in raising errors
// if the channel is full which normally should not happen. This is because we do not
// want to block the adapter if the channel is full.
subscription
.notify_price_sched_tx
.send(NotifyPriceSched {
.try_send(NotifyPriceSched {
subscription: subscription.subscription_id,
})
.await?;
})?;
}

Ok(())
Expand Down Expand Up @@ -580,19 +582,19 @@ impl Adapter {

// Send the Notify Price update to each subscription
for subscription in subscriptions {
subscription
.notify_price_tx
.send(NotifyPrice {
subscription: subscription.subscription_id,
result: PriceUpdate {
price,
conf,
status: Self::price_status_to_str(status),
valid_slot,
pub_slot,
},
})
.await?;
// Send the notify price update without awaiting. This results in raising errors if the
// channel is full which normally should not happen. This is because we do not want to
// block the adapter if the channel is full.
subscription.notify_price_tx.try_send(NotifyPrice {
subscription: subscription.subscription_id,
result: PriceUpdate {
price,
conf,
status: Self::price_status_to_str(status),
valid_slot,
pub_slot,
},
})?;
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions src/agent/remote_keypair_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ async fn handle_key_requests(
match response_tx.send(copied_keypair) {
Ok(()) => {}
Err(_e) => {
warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel";
);
warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel");
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl Exporter {
self.update_our_prices(&publish_keypair.pubkey());

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices),
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

Expand Down Expand Up @@ -594,7 +594,6 @@ impl Exporter {
trace!(
self.logger,
"Exporter: No more permissioned price accounts in channel, using cached value";
"cached_value" => format!("{:?}", self.our_prices),
);
break;
}
Expand Down

0 comments on commit 670c080

Please sign in to comment.