Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: synchronize properly with tokio::Notify instead of sleeping #2178

Merged
merged 18 commits into from
Oct 25, 2024
62 changes: 50 additions & 12 deletions rs/bitcoin/adapter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! component to provide blocks and collect outgoing transactions.

use bitcoin::{network::message::NetworkMessage, BlockHash, BlockHeader};
use std::time::Duration;
use ic_logger::ReplicaLogger;
use ic_metrics::MetricsRegistry;
use parking_lot::RwLock;
Expand All @@ -13,7 +14,10 @@ use std::{
sync::{Arc, Mutex},
time::Instant,
};
use tokio::sync::mpsc::channel;
use tokio::{
sync::{mpsc::channel, watch},
time::interval,
};
/// This module contains the AddressManager struct. The struct stores addresses
/// that will be used to create new connections. It also tracks addresses that
/// are in current use to encourage use from non-utilized addresses.
Expand Down Expand Up @@ -154,30 +158,64 @@ pub struct AdapterState {
last_received_at: Arc<RwLock<Option<Instant>>>,
/// The field contains how long the adapter should wait to before becoming idle.
idle_seconds: u64,

awake_tx: watch::Sender<()>,
}

impl AdapterState {
/// Crates new instance of the AdapterState.
pub fn new(idle_seconds: u64) -> Self {
Self {
let (awake_tx, _) = watch::channel(());
let state = Self {
last_received_at: Arc::new(RwLock::new(None)),
idle_seconds,
}
}

/// Returns if the adapter is idle.
pub fn is_idle(&self) -> bool {
match *self.last_received_at.read() {
Some(last) => last.elapsed().as_secs() > self.idle_seconds,
// Nothing received yet still in idle from startup.
None => true,
}
awake_tx,
};
state
}

/// Updates the current state of the adapter given a request was received.
pub fn received_now(&self) {
// Instant::now() is monotonically nondecreasing clock.
*self.last_received_at.write() = Some(Instant::now());
// TODO: perhaps log something if this fails
let _ = self.awake_tx.send(());
}

/// A future that returns when/if the adapter becomes/is awake.
mihailjianu1 marked this conversation as resolved.
Show resolved Hide resolved
pub async fn become_awake(&self) {
let mut awake_rx = self.awake_tx.clone().subscribe();
loop {
match *self.last_received_at.read() {
Some(last) => {
if last.elapsed().as_secs() < self.idle_seconds {
return ();
}
}
// Nothing received yet still in idle from startup.
None => {},
};
let _ = awake_rx.changed().await;
}
}

/// A future that returns when/if the adapter becomes/is idle.
pub async fn become_idle(&self) {
loop {
let mut tick_interval: tokio::time::Interval;
match *self.last_received_at.read() {
Some(last) => {
if last.elapsed().as_secs() > self.idle_seconds {
return ();
}
// tick again for the remaining seconds
tick_interval = interval(Duration::from_secs(self.idle_seconds - last.elapsed().as_secs()));
}
// Nothing received yet still in idle from startup.
None => return (),
};
tick_interval.tick().await;
}
}
}

Expand Down
112 changes: 57 additions & 55 deletions rs/bitcoin/adapter/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
sync::mpsc::{channel, Receiver},
time::{interval, sleep},
time::interval,
};

/// The function starts a Tokio task that awaits messages from the ConnectionManager.
Expand Down Expand Up @@ -50,67 +50,69 @@ pub fn start_main_event_loop(
tokio::task::spawn(async move {
let mut tick_interval = interval(Duration::from_millis(100));
loop {
let sleep_idle_interval = Duration::from_millis(100);
if adapter_state.is_idle() {
connection_manager.make_idle();
blockchain_manager.make_idle();
// TODO: instead of sleeping here add some async synchronization.
sleep(sleep_idle_interval).await;
continue;
}
connection_manager.make_idle();
blockchain_manager.make_idle();
mihailjianu1 marked this conversation as resolved.
Show resolved Hide resolved
adapter_state.become_awake().await;

// We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv,
// tokio::time::Interval::tick which are all cancellation safe.
rumenov marked this conversation as resolved.
Show resolved Hide resolved
tokio::select! {
event = connection_manager.receive_stream_event() => {
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
connection_manager.process_event(&event)
{
connection_manager.discard(&event.address);
}
},
network_message = network_message_receiver.recv() => {
let (address, message) = network_message.unwrap();
router_metrics
.bitcoin_messages_received
.with_label_values(&[message.cmd()])
.inc();
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
connection_manager.process_bitcoin_network_message(address, &message) {
connection_manager.discard(&address);
}
loop {
tokio::select! {
_ = adapter_state.become_idle() => {
break;
},
event = connection_manager.receive_stream_event() => {
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
connection_manager.process_event(&event)
{
connection_manager.discard(&event.address);
}
},
network_message = network_message_receiver.recv() => {
let (address, message) = network_message.unwrap();
router_metrics
.bitcoin_messages_received
.with_label_values(&[message.cmd()])
.inc();
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
connection_manager.process_bitcoin_network_message(address, &message) {
connection_manager.discard(&address);
}

if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
connection_manager.discard(&address);
}
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
connection_manager.discard(&address);
}
},
result = blockchain_manager_rx.recv() => {
let command = result.expect("Receiving should not fail because the sender part of the channel is never closed.");
match command {
BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => {
blockchain_manager.enqueue_new_blocks_to_download(next_headers);
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
connection_manager.discard(&address);
}
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
connection_manager.discard(&address);
}
BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => {
blockchain_manager.prune_blocks(anchor, processed_block_hashes);
},
result = blockchain_manager_rx.recv() => {
let command = result.expect("Receiving should not fail because the sender part of the channel is never closed.");
match command {
BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => {
blockchain_manager.enqueue_new_blocks_to_download(next_headers);
}
BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => {
blockchain_manager.prune_blocks(anchor, processed_block_hashes);
}
};
}
transaction_manager_request = transaction_manager_rx.recv() => {
match transaction_manager_request.unwrap() {
TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction),
}
};
}
transaction_manager_request = transaction_manager_rx.recv() => {
match transaction_manager_request.unwrap() {
TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction),
},
_ = tick_interval.tick() => {
// After an event is dispatched, the managers `tick` method is called to process possible
// outgoing messages.
connection_manager.tick(blockchain_manager.get_height(), handle_stream);
blockchain_manager.tick(&mut connection_manager);
transaction_manager.advertise_txids(&mut connection_manager);
}
},
_ = tick_interval.tick() => {
// After an event is dispatched, the managers `tick` method is called to process possible
// outgoing messages.
connection_manager.tick(blockchain_manager.get_height(), handle_stream);
blockchain_manager.tick(&mut connection_manager);
transaction_manager.advertise_txids(&mut connection_manager);
}
};
};
}
}
});
}


Loading