From 58ceb73ed45e4ee0940a9a4dbe76ab1cf8561926 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Tue, 22 Oct 2024 08:53:04 +0000 Subject: [PATCH 01/18] add Tokio::Notify --- rs/bitcoin/adapter/src/lib.rs | 11 ++++++++++- rs/bitcoin/adapter/src/router.rs | 7 +++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 94637d1e303..540faf67333 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -13,7 +13,7 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::mpsc::channel; +use tokio::sync::{Notify, mpsc::channel}; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -154,6 +154,8 @@ pub struct AdapterState { last_received_at: Arc>>, /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, + /// + notify: Arc, } impl AdapterState { @@ -162,6 +164,7 @@ impl AdapterState { Self { last_received_at: Arc::new(RwLock::new(None)), idle_seconds, + notify: Arc::new(Notify::new()), } } @@ -178,6 +181,12 @@ impl AdapterState { pub fn received_now(&self) { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); + self.notify.notify_waiters(); + } + + /// Returns a clone of the notify handle. + pub fn notifier(&self) -> Arc { + self.notify.clone() } } diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 05e54fab985..b8e10bbca3b 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::{ sync::mpsc::{channel, Receiver}, - time::{interval, sleep}, + time::interval, }; /// The function starts a Tokio task that awaits messages from the ConnectionManager. @@ -49,13 +49,12 @@ pub fn start_main_event_loop( tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); + let adapter_notifier = adapter_state.notifier(); loop { - let sleep_idle_interval = Duration::from_millis(100); if adapter_state.is_idle() { connection_manager.make_idle(); blockchain_manager.make_idle(); - // TODO: instead of sleeping here add some async synchronization. - sleep(sleep_idle_interval).await; + adapter_notifier.notified().await; continue; } From b8839600b87ba43df217f3db185dbbf8a651f749 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Tue, 22 Oct 2024 09:01:11 +0000 Subject: [PATCH 02/18] . --- rs/bitcoin/adapter/src/router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index b8e10bbca3b..83f6b7b2e61 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -46,10 +46,10 @@ pub fn start_main_event_loop( network_message_sender, router_metrics.clone(), ); + let adapter_notifier = adapter_state.notifier(); tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); - let adapter_notifier = adapter_state.notifier(); loop { if adapter_state.is_idle() { connection_manager.make_idle(); From 8c96949b4f4e698cc2e5030228a727c8f756d6b6 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Tue, 22 Oct 2024 09:11:59 +0000 Subject: [PATCH 03/18] clippy --- rs/bitcoin/adapter/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 540faf67333..8f7d088c70c 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -13,7 +13,7 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::{Notify, mpsc::channel}; +use tokio::sync::{mpsc::channel, Notify}; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -154,7 +154,7 @@ pub struct AdapterState { last_received_at: Arc>>, /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, - /// + /// The field contains a notify handle to notify waiters when the adapter is no longer idle. notify: Arc, } From 1914c4bb4de041a6f8365df2b448b295fb99e211 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Tue, 22 Oct 2024 09:18:43 +0000 Subject: [PATCH 04/18] rename --- rs/bitcoin/adapter/src/lib.rs | 10 +++++----- rs/bitcoin/adapter/src/router.rs | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 8f7d088c70c..7f4cd7b368f 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -155,7 +155,7 @@ pub struct AdapterState { /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, /// The field contains a notify handle to notify waiters when the adapter is no longer idle. - notify: Arc, + awake_notify: Arc, } impl AdapterState { @@ -164,7 +164,7 @@ impl AdapterState { Self { last_received_at: Arc::new(RwLock::new(None)), idle_seconds, - notify: Arc::new(Notify::new()), + awake_notify: Arc::new(Notify::new()), } } @@ -181,12 +181,12 @@ impl AdapterState { pub fn received_now(&self) { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); - self.notify.notify_waiters(); + self.awake_notify.notify_waiters(); } /// Returns a clone of the notify handle. - pub fn notifier(&self) -> Arc { - self.notify.clone() + pub fn awake_notifier(&self) -> Arc { + self.awake_notify.clone() } } diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 83f6b7b2e61..4765535317b 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -46,7 +46,7 @@ pub fn start_main_event_loop( network_message_sender, router_metrics.clone(), ); - let adapter_notifier = adapter_state.notifier(); + let adapter_awake_notifier = adapter_state.awake_notifier(); tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); @@ -54,7 +54,7 @@ pub fn start_main_event_loop( if adapter_state.is_idle() { connection_manager.make_idle(); blockchain_manager.make_idle(); - adapter_notifier.notified().await; + adapter_awake_notifier.notified().await; continue; } From 62b94f7a8d2fc9ff00b809fbfe651927a714facf Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 11:58:11 +0000 Subject: [PATCH 05/18] experiment --- rs/bitcoin/adapter/src/lib.rs | 20 ++++++++++++-------- rs/bitcoin/adapter/src/router.rs | 5 ++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 7f4cd7b368f..4af6f770d91 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -13,7 +13,7 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::{mpsc::channel, Notify}; +use tokio::sync::{mpsc::channel, watch, Notify}; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -154,17 +154,18 @@ pub struct AdapterState { last_received_at: Arc>>, /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, - /// The field contains a notify handle to notify waiters when the adapter is no longer idle. - awake_notify: Arc, + /// The field contains a notify handle that is used to wake up the adapter when it should become idle. + sender: watch::Sender<()>, } impl AdapterState { /// Crates new instance of the AdapterState. pub fn new(idle_seconds: u64) -> Self { + let (sender, _) = watch::channel(()); Self { last_received_at: Arc::new(RwLock::new(None)), idle_seconds, - awake_notify: Arc::new(Notify::new()), + sender, } } @@ -177,16 +178,19 @@ impl AdapterState { } } + pub fn subscribe(&self) -> watch::Receiver<()> { + self.sender.subscribe() + } + /// Updates the current state of the adapter given a request was received. pub fn received_now(&self) { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); - self.awake_notify.notify_waiters(); + let _ = self.sender.send(()); } - /// Returns a clone of the notify handle. - pub fn awake_notifier(&self) -> Arc { - self.awake_notify.clone() + async fn wait_until_active(&self, receive: &mut watch::Receiver<()>) { + receive.changed().await.unwrap(); } } diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 4765535317b..3b52cce1dc0 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -60,7 +60,10 @@ pub fn start_main_event_loop( // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. - tokio::select! { + tokio::join! { + is_idle = adapter_state.make_idle() => { + // + }, event = connection_manager.receive_stream_event() => { if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = connection_manager.process_event(&event) From 3b709d287da6fa331cc4dd0556a2c963c42235bd Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 12:32:57 +0000 Subject: [PATCH 06/18] exp --- rs/bitcoin/adapter/src/lib.rs | 33 +++++---- rs/bitcoin/adapter/src/router.rs | 111 +++++++++++++++---------------- 2 files changed, 70 insertions(+), 74 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 4af6f770d91..b99474db248 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -14,6 +14,7 @@ use std::{ time::Instant, }; use tokio::sync::{mpsc::channel, watch, Notify}; +use tokio::sync::mpsc; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -154,39 +155,35 @@ pub struct AdapterState { last_received_at: Arc>>, /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, - /// The field contains a notify handle that is used to wake up the adapter when it should become idle. - sender: watch::Sender<()>, + + idle_tx: mpsc::Sender<()>, + idle_rx: mpsc::Receiver<()>, + + awake_tx: mpsc::Sender<()>, + awake_rx: mpsc::Receiver<()>, } impl AdapterState { /// Crates new instance of the AdapterState. pub fn new(idle_seconds: u64) -> Self { - let (sender, _) = watch::channel(()); + let (idle_tx, idle_rx) = mpsc::channel(()); + let (awake_tx, awake_rx) = mpsc::channel(()); Self { last_received_at: Arc::new(RwLock::new(None)), idle_seconds, - sender, - } - } - - /// Returns if the adapter is idle. - pub fn is_idle(&self) -> bool { - match *self.last_received_at.read() { - Some(last) => last.elapsed().as_secs() > self.idle_seconds, - // Nothing received yet still in idle from startup. - None => true, + idle_tx, + idle_rx, + awake_tx, + awake_rx, } } - pub fn subscribe(&self) -> watch::Receiver<()> { - self.sender.subscribe() - } - /// Updates the current state of the adapter given a request was received. pub fn received_now(&self) { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); - let _ = self.sender.send(()); + self.awake_tx.send(()).unwrap(); + self.awake_rx.recv(); } async fn wait_until_active(&self, receive: &mut watch::Receiver<()>) { diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 3b52cce1dc0..a9ed816f4ba 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -46,73 +46,72 @@ pub fn start_main_event_loop( network_message_sender, router_metrics.clone(), ); - let adapter_awake_notifier = adapter_state.awake_notifier(); + tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); loop { - if adapter_state.is_idle() { - connection_manager.make_idle(); - blockchain_manager.make_idle(); - adapter_awake_notifier.notified().await; - continue; - } + connection_manager.make_idle(); + blockchain_manager.make_idle(); + adapter_state.is_awake_rx.recv().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. - tokio::join! { - is_idle = adapter_state.make_idle() => { - // - }, - event = connection_manager.receive_stream_event() => { - if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = - connection_manager.process_event(&event) - { - connection_manager.discard(&event.address); - } - }, - network_message = network_message_receiver.recv() => { - let (address, message) = network_message.unwrap(); - router_metrics - .bitcoin_messages_received - .with_label_values(&[message.cmd()]) - .inc(); - if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = - connection_manager.process_bitcoin_network_message(address, &message) { - connection_manager.discard(&address); - } + loop { + tokio::select! { + is_idle = adapter_state.is_idle_rx.recv() => { + break; + }, + event = connection_manager.receive_stream_event() => { + if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = + connection_manager.process_event(&event) + { + connection_manager.discard(&event.address); + } + }, + network_message = network_message_receiver.recv() => { + let (address, message) = network_message.unwrap(); + router_metrics + .bitcoin_messages_received + .with_label_values(&[message.cmd()]) + .inc(); + if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = + connection_manager.process_bitcoin_network_message(address, &message) { + connection_manager.discard(&address); + } - if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) { - connection_manager.discard(&address); - } - if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) { - connection_manager.discard(&address); - } - }, - result = blockchain_manager_rx.recv() => { - let command = result.expect("Receiving should not fail because the sender part of the channel is never closed."); - match command { - BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => { - blockchain_manager.enqueue_new_blocks_to_download(next_headers); + if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) { + connection_manager.discard(&address); } - BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => { - blockchain_manager.prune_blocks(anchor, processed_block_hashes); + if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) { + connection_manager.discard(&address); + } + }, + result = blockchain_manager_rx.recv() => { + let command = result.expect("Receiving should not fail because the sender part of the channel is never closed."); + match command { + BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => { + blockchain_manager.enqueue_new_blocks_to_download(next_headers); + } + BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => { + blockchain_manager.prune_blocks(anchor, processed_block_hashes); + } + }; + } + transaction_manager_request = transaction_manager_rx.recv() => { + match transaction_manager_request.unwrap() { + TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction), } - }; - } - transaction_manager_request = transaction_manager_rx.recv() => { - match transaction_manager_request.unwrap() { - TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction), + }, + _ = tick_interval.tick() => { + // After an event is dispatched, the managers `tick` method is called to process possible + // outgoing messages. + connection_manager.tick(blockchain_manager.get_height(), handle_stream); + blockchain_manager.tick(&mut connection_manager); + transaction_manager.advertise_txids(&mut connection_manager); } - }, - _ = tick_interval.tick() => { - // After an event is dispatched, the managers `tick` method is called to process possible - // outgoing messages. - connection_manager.tick(blockchain_manager.get_height(), handle_stream); - blockchain_manager.tick(&mut connection_manager); - transaction_manager.advertise_txids(&mut connection_manager); - } - }; + }; + } } }); } From 03c4125c994e71b6a46f916312b9c18baef88911 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 13:59:14 +0000 Subject: [PATCH 07/18] exp --- rs/bitcoin/adapter/src/lib.rs | 62 +++++++++++++++++++++++--------- rs/bitcoin/adapter/src/router.rs | 7 ++-- 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index b99474db248..f916a794f00 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -5,6 +5,7 @@ //! component to provide blocks and collect outgoing transactions. use bitcoin::{network::message::NetworkMessage, BlockHash, BlockHeader}; +use std::time::Duration; use ic_logger::ReplicaLogger; use ic_metrics::MetricsRegistry; use parking_lot::RwLock; @@ -13,7 +14,11 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::{mpsc::channel, watch, Notify}; +use tokio::{ + sync::mpsc::{channel, Receiver}, + time::interval, +}; +use tokio::sync::{watch, Notify}; use tokio::sync::mpsc; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that @@ -155,27 +160,20 @@ pub struct AdapterState { last_received_at: Arc>>, /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, - - idle_tx: mpsc::Sender<()>, - idle_rx: mpsc::Receiver<()>, - awake_tx: mpsc::Sender<()>, - awake_rx: mpsc::Receiver<()>, + awake_tx: watch::Sender<()>, } impl AdapterState { /// Crates new instance of the AdapterState. pub fn new(idle_seconds: u64) -> Self { - let (idle_tx, idle_rx) = mpsc::channel(()); - let (awake_tx, awake_rx) = mpsc::channel(()); - Self { + let (awake_tx, _) = watch::channel(()); + let state = Self { last_received_at: Arc::new(RwLock::new(None)), idle_seconds, - idle_tx, - idle_rx, awake_tx, - awake_rx, - } + }; + state } /// Updates the current state of the adapter given a request was received. @@ -183,11 +181,43 @@ impl AdapterState { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); self.awake_tx.send(()).unwrap(); - self.awake_rx.recv(); } - async fn wait_until_active(&self, receive: &mut watch::Receiver<()>) { - receive.changed().await.unwrap(); + /// A future that returns when the adapter becomes awake. + pub async fn is_awake(&self) -> () { + let mut awake_rx = self.awake_tx.clone().subscribe(); + + loop { + match *self.last_received_at.read() { + Some(last) => { + if last.elapsed().as_secs() < self.idle_seconds { + return (); + } + } + // Nothing received yet still in idle from startup. + None => {}, + }; + let _ = awake_rx.changed().await; + } + } + + /// A future that returns when the adapter becomes idle. + pub async fn is_idle(&self) -> () { + let mut tick_interval = interval(Duration::from_secs(self.idle_seconds)); + + loop { + tick_interval.tick().await; + match *self.last_received_at.read() { + Some(last) => { + if last.elapsed().as_secs() > self.idle_seconds { + return (); + } + tick_interval = interval(Duration::from_secs(self.idle_seconds - last.elapsed().as_secs())); + } + // Nothing received yet still in idle from startup. + None => return (), + }; + } } } diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index a9ed816f4ba..0f502ac09aa 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -47,19 +47,20 @@ pub fn start_main_event_loop( router_metrics.clone(), ); + let mut awake_rx = adapter_state.awake_tx.clone().subscribe(); tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); loop { connection_manager.make_idle(); blockchain_manager.make_idle(); - adapter_state.is_awake_rx.recv().await; + let _ = awake_rx.changed().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. loop { tokio::select! { - is_idle = adapter_state.is_idle_rx.recv() => { + _ = adapter_state.is_idle() => { break; }, event = connection_manager.receive_stream_event() => { @@ -115,3 +116,5 @@ pub fn start_main_event_loop( } }); } + + From d626ea874e5b790e0d5182a5b76a8191c6132937 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 14:04:20 +0000 Subject: [PATCH 08/18] fix --- rs/bitcoin/adapter/src/router.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 0f502ac09aa..6b4afcf1941 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -47,14 +47,12 @@ pub fn start_main_event_loop( router_metrics.clone(), ); - let mut awake_rx = adapter_state.awake_tx.clone().subscribe(); - tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); loop { connection_manager.make_idle(); blockchain_manager.make_idle(); - let _ = awake_rx.changed().await; + let _ = adapter_state.is_awake().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. From 7af678fc3f61f30d5eda1086a32f3c7690466079 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 14:25:20 +0000 Subject: [PATCH 09/18] exp --- rs/bitcoin/adapter/src/lib.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index f916a794f00..1c841c9a528 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -15,11 +15,9 @@ use std::{ time::Instant, }; use tokio::{ - sync::mpsc::{channel, Receiver}, + sync::{mpsc::channel, watch}, time::interval, }; -use tokio::sync::{watch, Notify}; -use tokio::sync::mpsc; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -183,10 +181,9 @@ impl AdapterState { self.awake_tx.send(()).unwrap(); } - /// A future that returns when the adapter becomes awake. + /// A future that returns when/if the adapter becomes/is awake. pub async fn is_awake(&self) -> () { let mut awake_rx = self.awake_tx.clone().subscribe(); - loop { match *self.last_received_at.read() { Some(last) => { @@ -201,12 +198,10 @@ impl AdapterState { } } - /// A future that returns when the adapter becomes idle. + /// A future that returns when/if the adapter becomes/is idle. pub async fn is_idle(&self) -> () { - let mut tick_interval = interval(Duration::from_secs(self.idle_seconds)); - loop { - tick_interval.tick().await; + let mut tick_interval: tokio::time::Interval; match *self.last_received_at.read() { Some(last) => { if last.elapsed().as_secs() > self.idle_seconds { @@ -217,6 +212,7 @@ impl AdapterState { // Nothing received yet still in idle from startup. None => return (), }; + tick_interval.tick().await; } } } From 9482c9443f3f5c77befb81f1fa1d3638cc3a957e Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 14:34:32 +0000 Subject: [PATCH 10/18] exp --- rs/bitcoin/adapter/src/lib.rs | 8 +++++--- rs/bitcoin/adapter/src/router.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 1c841c9a528..d8cb6b4ef9e 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -178,11 +178,12 @@ impl AdapterState { pub fn received_now(&self) { // Instant::now() is monotonically nondecreasing clock. *self.last_received_at.write() = Some(Instant::now()); - self.awake_tx.send(()).unwrap(); + // TODO: perhaps log something if this fails + let _ = self.awake_tx.send(()); } /// A future that returns when/if the adapter becomes/is awake. - pub async fn is_awake(&self) -> () { + pub async fn become_awake(&self) { let mut awake_rx = self.awake_tx.clone().subscribe(); loop { match *self.last_received_at.read() { @@ -199,7 +200,7 @@ impl AdapterState { } /// A future that returns when/if the adapter becomes/is idle. - pub async fn is_idle(&self) -> () { + pub async fn become_idle(&self) { loop { let mut tick_interval: tokio::time::Interval; match *self.last_received_at.read() { @@ -207,6 +208,7 @@ impl AdapterState { if last.elapsed().as_secs() > self.idle_seconds { return (); } + // tick again for the remaining seconds tick_interval = interval(Duration::from_secs(self.idle_seconds - last.elapsed().as_secs())); } // Nothing received yet still in idle from startup. diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 6b4afcf1941..b3a5038f1fa 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -52,13 +52,13 @@ pub fn start_main_event_loop( loop { connection_manager.make_idle(); blockchain_manager.make_idle(); - let _ = adapter_state.is_awake().await; + adapter_state.become_awake().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. loop { tokio::select! { - _ = adapter_state.is_idle() => { + _ = adapter_state.become_idle() => { break; }, event = connection_manager.receive_stream_event() => { From 416ea36dbb721283c5f9737a3582406bbeb42e1a Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 14:44:32 +0000 Subject: [PATCH 11/18] add docs --- rs/bitcoin/adapter/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index d8cb6b4ef9e..4e99f3046cd 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -159,6 +159,7 @@ pub struct AdapterState { /// The field contains how long the adapter should wait to before becoming idle. idle_seconds: u64, + /// The field contains a watch channel that is used to wake up the adapter when it is idle. awake_tx: watch::Sender<()>, } From 098a1aba993447984ab840731b419a3de654b078 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Wed, 23 Oct 2024 16:54:24 +0000 Subject: [PATCH 12/18] remodel --- rs/bitcoin/adapter/src/lib.rs | 79 +++++++++++++++----------------- rs/bitcoin/adapter/src/router.rs | 4 +- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 4e99f3046cd..6c2de69d1ee 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -8,16 +8,12 @@ use bitcoin::{network::message::NetworkMessage, BlockHash, BlockHeader}; use std::time::Duration; use ic_logger::ReplicaLogger; use ic_metrics::MetricsRegistry; -use parking_lot::RwLock; use std::{ net::SocketAddr, sync::{Arc, Mutex}, time::Instant, }; -use tokio::{ - sync::{mpsc::channel, watch}, - time::interval, -}; +use tokio::{select, sync::{mpsc::channel, watch}, time::sleep}; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -145,7 +141,11 @@ pub enum TransactionManagerRequest { /// thread-safe. #[derive(Clone)] pub struct AdapterState { - /// The field contains instant of the latest received request. + /// The field contains how long the adapter should wait before becoming idle. + idle_seconds: u64, + + /// The watch channel that holds the last received time. + /// The field contains the instant of the latest received request. /// None means that we haven't reveived a request yet and the adapter should be in idle mode! /// /// !!! BE CAREFUL HERE !!! since the adapter should ALWAYS be idle when starting up. @@ -155,67 +155,64 @@ pub struct AdapterState { /// This way the adapter would always be in idle when starting since 'elapsed()' is greater than 'idle_seconds'. /// On MacOS this approach caused issues since on MacOS Instant::now() is time since boot and when subtracting /// 'idle_seconds' we encountered an underflow and panicked. - last_received_at: Arc>>, - /// The field contains how long the adapter should wait to before becoming idle. - idle_seconds: u64, - - /// The field contains a watch channel that is used to wake up the adapter when it is idle. - awake_tx: watch::Sender<()>, + /// + /// I't simportant that this value is set to None on startup. + last_received_tx: watch::Sender>, } impl AdapterState { - /// Crates new instance of the AdapterState. + /// Creates a new instance of the AdapterState. pub fn new(idle_seconds: u64) -> Self { - let (awake_tx, _) = watch::channel(()); - let state = Self { - last_received_at: Arc::new(RwLock::new(None)), + // Initialize the watch channel with `None`, indicating no requests have been received yet. + let (last_received_tx, _) = watch::channel(None); + Self { idle_seconds, - awake_tx, - }; - state + last_received_tx, + } } - /// Updates the current state of the adapter given a request was received. + /// Updates the state to indicate a request was received now. pub fn received_now(&self) { - // Instant::now() is monotonically nondecreasing clock. - *self.last_received_at.write() = Some(Instant::now()); - // TODO: perhaps log something if this fails - let _ = self.awake_tx.send(()); + let _ = self.last_received_tx.send(Some(Instant::now())); } /// A future that returns when/if the adapter becomes/is awake. pub async fn become_awake(&self) { - let mut awake_rx = self.awake_tx.clone().subscribe(); + let mut last_received_rx = self.last_received_tx.subscribe(); loop { - match *self.last_received_at.read() { + match *last_received_rx.borrow() { Some(last) => { if last.elapsed().as_secs() < self.idle_seconds { - return (); + return; } } - // Nothing received yet still in idle from startup. - None => {}, - }; - let _ = awake_rx.changed().await; + // No requests received yet; the adapter is idle. + None => {} + } + // Wait for a change in the last received time. + let _ = last_received_rx.changed().await; } } /// A future that returns when/if the adapter becomes/is idle. pub async fn become_idle(&self) { + let mut last_received_rx = self.last_received_tx.subscribe(); loop { - let mut tick_interval: tokio::time::Interval; - match *self.last_received_at.read() { + let last_received = *last_received_rx.borrow(); + match last_received { Some(last) => { - if last.elapsed().as_secs() > self.idle_seconds { - return (); + let elapsed = last.elapsed().as_secs(); + if elapsed >= self.idle_seconds { + return; + } + select! { + _ = sleep(Duration::from_secs(self.idle_seconds - elapsed)) => {} + _ = last_received_rx.changed() => {} } - // tick again for the remaining seconds - tick_interval = interval(Duration::from_secs(self.idle_seconds - last.elapsed().as_secs())); } - // Nothing received yet still in idle from startup. - None => return (), - }; - tick_interval.tick().await; + // No requests received yet; the adapter is idle. + None => return, + } } } } diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index b3a5038f1fa..824e88e4054 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -50,8 +50,6 @@ pub fn start_main_event_loop( tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); loop { - connection_manager.make_idle(); - blockchain_manager.make_idle(); adapter_state.become_awake().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, @@ -111,6 +109,8 @@ pub fn start_main_event_loop( } }; } + connection_manager.make_idle(); + blockchain_manager.make_idle(); } }); } From 75cf884af8e73b964b8ff6bd9b9f287ee25d253a Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Thu, 24 Oct 2024 07:04:40 +0000 Subject: [PATCH 13/18] clippy --- rs/bitcoin/adapter/src/lib.rs | 20 ++++++++++---------- rs/bitcoin/adapter/src/router.rs | 4 +--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index 6c2de69d1ee..fb40d00a47a 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -5,15 +5,19 @@ //! component to provide blocks and collect outgoing transactions. use bitcoin::{network::message::NetworkMessage, BlockHash, BlockHeader}; -use std::time::Duration; use ic_logger::ReplicaLogger; use ic_metrics::MetricsRegistry; +use std::time::Duration; use std::{ net::SocketAddr, sync::{Arc, Mutex}, time::Instant, }; -use tokio::{select, sync::{mpsc::channel, watch}, time::sleep}; +use tokio::{ + select, + sync::{mpsc::channel, watch}, + time::sleep, +}; /// This module contains the AddressManager struct. The struct stores addresses /// that will be used to create new connections. It also tracks addresses that /// are in current use to encourage use from non-utilized addresses. @@ -155,7 +159,7 @@ pub struct AdapterState { /// This way the adapter would always be in idle when starting since 'elapsed()' is greater than 'idle_seconds'. /// On MacOS this approach caused issues since on MacOS Instant::now() is time since boot and when subtracting /// 'idle_seconds' we encountered an underflow and panicked. - /// + /// /// I't simportant that this value is set to None on startup. last_received_tx: watch::Sender>, } @@ -180,14 +184,10 @@ impl AdapterState { pub async fn become_awake(&self) { let mut last_received_rx = self.last_received_tx.subscribe(); loop { - match *last_received_rx.borrow() { - Some(last) => { - if last.elapsed().as_secs() < self.idle_seconds { - return; - } + if let Some(last) = *last_received_rx.borrow() { + if last.elapsed().as_secs() < self.idle_seconds { + return; } - // No requests received yet; the adapter is idle. - None => {} } // Wait for a change in the last received time. let _ = last_received_rx.changed().await; diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 824e88e4054..236d8856278 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -108,11 +108,9 @@ pub fn start_main_event_loop( transaction_manager.advertise_txids(&mut connection_manager); } }; - } + } connection_manager.make_idle(); blockchain_manager.make_idle(); } }); } - - From 31db66e646f02b1dc313bf37a5038ad3c64360b6 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Thu, 24 Oct 2024 09:29:59 +0000 Subject: [PATCH 14/18] comments --- rs/bitcoin/adapter/src/lib.rs | 4 ++-- rs/bitcoin/adapter/src/router.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index fb40d00a47a..b9014fcded5 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -181,7 +181,7 @@ impl AdapterState { } /// A future that returns when/if the adapter becomes/is awake. - pub async fn become_awake(&self) { + pub async fn active(&self) { let mut last_received_rx = self.last_received_tx.subscribe(); loop { if let Some(last) = *last_received_rx.borrow() { @@ -195,7 +195,7 @@ impl AdapterState { } /// A future that returns when/if the adapter becomes/is idle. - pub async fn become_idle(&self) { + pub async fn idle(&self) { let mut last_received_rx = self.last_received_tx.subscribe(); loop { let last_received = *last_received_rx.borrow(); diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 236d8856278..087f47ccdf2 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -50,13 +50,13 @@ pub fn start_main_event_loop( tokio::task::spawn(async move { let mut tick_interval = interval(Duration::from_millis(100)); loop { - adapter_state.become_awake().await; + adapter_state.active().await; // We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv, // tokio::time::Interval::tick which are all cancellation safe. loop { tokio::select! { - _ = adapter_state.become_idle() => { + _ = adapter_state.idle() => { break; }, event = connection_manager.receive_stream_event() => { From 23fddd5fbd34e343f411d0eec90bb035ed8cf9b8 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Fri, 25 Oct 2024 08:52:18 +0000 Subject: [PATCH 15/18] add changes --- rs/bitcoin/adapter/benches/e2e.rs | 4 +- rs/bitcoin/adapter/src/lib.rs | 77 +++++++++++++--------------- rs/bitcoin/adapter/src/router.rs | 2 +- rs/bitcoin/adapter/src/rpc_server.rs | 20 ++++---- 4 files changed, 49 insertions(+), 54 deletions(-) diff --git a/rs/bitcoin/adapter/benches/e2e.rs b/rs/bitcoin/adapter/benches/e2e.rs index f4700968ca0..04f11a3ae18 100644 --- a/rs/bitcoin/adapter/benches/e2e.rs +++ b/rs/bitcoin/adapter/benches/e2e.rs @@ -113,13 +113,13 @@ fn e2e(criterion: &mut Criterion) { &MetricsRegistry::default(), ); - let adapter_state = AdapterState::new(config.idle_seconds); + let (_, tx) = AdapterState::new(config.idle_seconds); let (transaction_manager_tx, _) = channel(100); start_grpc_server( config.clone(), no_op_logger(), - adapter_state.clone(), + tx, handler, transaction_manager_tx, &MetricsRegistry::default(), diff --git a/rs/bitcoin/adapter/src/lib.rs b/rs/bitcoin/adapter/src/lib.rs index b9014fcded5..a5178d4fec4 100644 --- a/rs/bitcoin/adapter/src/lib.rs +++ b/rs/bitcoin/adapter/src/lib.rs @@ -160,60 +160,53 @@ pub struct AdapterState { /// On MacOS this approach caused issues since on MacOS Instant::now() is time since boot and when subtracting /// 'idle_seconds' we encountered an underflow and panicked. /// - /// I't simportant that this value is set to None on startup. - last_received_tx: watch::Sender>, + /// It's simportant that this value is set to [`None`] on startup. + last_received_rx: watch::Receiver>, } impl AdapterState { - /// Creates a new instance of the AdapterState. - pub fn new(idle_seconds: u64) -> Self { + /// Creates a new instance of the [`AdapterState`]. + pub fn new(idle_seconds: u64) -> (Self, watch::Sender>) { // Initialize the watch channel with `None`, indicating no requests have been received yet. - let (last_received_tx, _) = watch::channel(None); - Self { - idle_seconds, - last_received_tx, - } + let (tx, last_received_rx) = watch::channel(None); + ( + Self { + idle_seconds, + last_received_rx, + }, + tx, + ) } - /// Updates the state to indicate a request was received now. - pub fn received_now(&self) { - let _ = self.last_received_tx.send(Some(Instant::now())); - } + /// A future that returns when/if the adapter becomes/is idle. + pub async fn idle(&mut self) { + let mut last_time = self + .last_received_rx + .borrow_and_update() + .unwrap_or_else(Instant::now); - /// A future that returns when/if the adapter becomes/is awake. - pub async fn active(&self) { - let mut last_received_rx = self.last_received_tx.subscribe(); loop { - if let Some(last) = *last_received_rx.borrow() { - if last.elapsed().as_secs() < self.idle_seconds { - return; + let seconds_left_until_idle = self.idle_seconds - last_time.elapsed().as_secs(); + select! { + _ = sleep(Duration::from_secs(seconds_left_until_idle)) => {return}, + Ok(_) = self.last_received_rx.changed() => { + last_time = self.last_received_rx.borrow_and_update().unwrap_or_else(Instant::now); } } - // Wait for a change in the last received time. - let _ = last_received_rx.changed().await; } } - /// A future that returns when/if the adapter becomes/is idle. - pub async fn idle(&self) { - let mut last_received_rx = self.last_received_tx.subscribe(); - loop { - let last_received = *last_received_rx.borrow(); - match last_received { - Some(last) => { - let elapsed = last.elapsed().as_secs(); - if elapsed >= self.idle_seconds { - return; - } - select! { - _ = sleep(Duration::from_secs(self.idle_seconds - elapsed)) => {} - _ = last_received_rx.changed() => {} - } + /// A future that returns when/if the adapter becomes/is awake. + pub async fn active(&mut self) { + let _ = self + .last_received_rx + .wait_for(|v| { + if let Some(last) = v { + return last.elapsed().as_secs() < self.idle_seconds; } - // No requests received yet; the adapter is idle. - None => return, - } - } + false + }) + .await; } } @@ -226,7 +219,7 @@ pub fn start_server( ) { let _enter = rt_handle.enter(); - let adapter_state = AdapterState::new(config.idle_seconds); + let (adapter_state, tx) = AdapterState::new(config.idle_seconds); let (blockchain_manager_tx, blockchain_manager_rx) = channel(100); let blockchain_state = Arc::new(Mutex::new(BlockchainState::new(&config, metrics_registry))); @@ -244,7 +237,7 @@ pub fn start_server( start_grpc_server( config.clone(), log.clone(), - adapter_state.clone(), + tx, get_successors_handler, transaction_manager_tx, metrics_registry, diff --git a/rs/bitcoin/adapter/src/router.rs b/rs/bitcoin/adapter/src/router.rs index 087f47ccdf2..34d7d02737c 100644 --- a/rs/bitcoin/adapter/src/router.rs +++ b/rs/bitcoin/adapter/src/router.rs @@ -28,7 +28,7 @@ pub fn start_main_event_loop( logger: ReplicaLogger, blockchain_state: Arc>, mut transaction_manager_rx: Receiver, - adapter_state: AdapterState, + mut adapter_state: AdapterState, mut blockchain_manager_rx: Receiver, metrics_registry: &MetricsRegistry, ) { diff --git a/rs/bitcoin/adapter/src/rpc_server.rs b/rs/bitcoin/adapter/src/rpc_server.rs index fe86e727075..fd9239e4197 100644 --- a/rs/bitcoin/adapter/src/rpc_server.rs +++ b/rs/bitcoin/adapter/src/rpc_server.rs @@ -2,7 +2,7 @@ use crate::{ config::{Config, IncomingSource}, get_successors_handler::{GetSuccessorsRequest, GetSuccessorsResponse}, metrics::{ServiceMetrics, LABEL_GET_SUCCESSOR, LABEL_SEND_TRANSACTION}, - AdapterState, GetSuccessorsHandler, TransactionManagerRequest, + GetSuccessorsHandler, TransactionManagerRequest, }; use bitcoin::{consensus::Encodable, hashes::Hash, BlockHash}; use ic_async_utils::{incoming_from_first_systemd_socket, incoming_from_path}; @@ -14,13 +14,15 @@ use ic_btc_service::{ use ic_logger::{debug, ReplicaLogger}; use ic_metrics::MetricsRegistry; use std::convert::{TryFrom, TryInto}; -use tokio::sync::mpsc::Sender; +use std::time::Instant; +use tokio::sync::mpsc; +use tokio::sync::watch; use tonic::{transport::Server, Request, Response, Status}; struct BtcServiceImpl { - adapter_state: AdapterState, + last_received_tx: watch::Sender>, get_successors_handler: GetSuccessorsHandler, - transaction_manager_tx: Sender, + transaction_manager_tx: mpsc::Sender, logger: ReplicaLogger, metrics: ServiceMetrics, } @@ -83,7 +85,7 @@ impl BtcService for BtcServiceImpl { .request_duration .with_label_values(&[LABEL_GET_SUCCESSOR]) .start_timer(); - self.adapter_state.received_now(); + let _ = self.last_received_tx.send(Some(Instant::now())); let inner = request.into_inner(); debug!(self.logger, "Received GetSuccessorsRequest: {:?}", inner); let request = inner.try_into()?; @@ -108,7 +110,7 @@ impl BtcService for BtcServiceImpl { .request_duration .with_label_values(&[LABEL_SEND_TRANSACTION]) .start_timer(); - self.adapter_state.received_now(); + let _ = self.last_received_tx.send(Some(Instant::now())); let transaction = request.into_inner().transaction; self.transaction_manager_tx .send(TransactionManagerRequest::SendTransaction(transaction)) @@ -124,13 +126,13 @@ impl BtcService for BtcServiceImpl { pub fn start_grpc_server( config: Config, logger: ReplicaLogger, - adapter_state: AdapterState, + last_received_tx: watch::Sender>, get_successors_handler: GetSuccessorsHandler, - transaction_manager_tx: Sender, + transaction_manager_tx: mpsc::Sender, metrics_registry: &MetricsRegistry, ) { let btc_adapter_impl = BtcServiceImpl { - adapter_state, + last_received_tx, get_successors_handler, transaction_manager_tx, logger, From 5e20a1d59c85b136e4765a15b05c3ed877c039c8 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Fri, 25 Oct 2024 08:56:20 +0000 Subject: [PATCH 16/18] fix benchmark test --- rs/bitcoin/adapter/benches/e2e.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/rs/bitcoin/adapter/benches/e2e.rs b/rs/bitcoin/adapter/benches/e2e.rs index 04f11a3ae18..80d67b1d353 100644 --- a/rs/bitcoin/adapter/benches/e2e.rs +++ b/rs/bitcoin/adapter/benches/e2e.rs @@ -2,6 +2,7 @@ use bitcoin::{Block, BlockHash, BlockHeader, Network}; use criterion::{criterion_group, criterion_main, Criterion}; use ic_btc_adapter::config::IncomingSource; use ic_btc_adapter::start_grpc_server; +use ic_btc_adapter::start_server; use ic_btc_adapter::AdapterState; use ic_btc_adapter::{ config::Config, BlockchainManagerRequest, BlockchainState, GetSuccessorsHandler, @@ -113,17 +114,7 @@ fn e2e(criterion: &mut Criterion) { &MetricsRegistry::default(), ); - let (_, tx) = AdapterState::new(config.idle_seconds); - - let (transaction_manager_tx, _) = channel(100); - start_grpc_server( - config.clone(), - no_op_logger(), - tx, - handler, - transaction_manager_tx, - &MetricsRegistry::default(), - ); + start_server(&no_op_logger(), &MetricsRegistry::default(), &rt.handle(), config.clone()); start_client(uds_path).await })) From 58137e1ff3da4f62fe358e2e370e98f97e2e8b46 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Fri, 25 Oct 2024 09:03:09 +0000 Subject: [PATCH 17/18] fix e2e --- rs/bitcoin/adapter/benches/e2e.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/rs/bitcoin/adapter/benches/e2e.rs b/rs/bitcoin/adapter/benches/e2e.rs index 80d67b1d353..8c6a8a55fe9 100644 --- a/rs/bitcoin/adapter/benches/e2e.rs +++ b/rs/bitcoin/adapter/benches/e2e.rs @@ -1,11 +1,9 @@ use bitcoin::{Block, BlockHash, BlockHeader, Network}; use criterion::{criterion_group, criterion_main, Criterion}; use ic_btc_adapter::config::IncomingSource; -use ic_btc_adapter::start_grpc_server; use ic_btc_adapter::start_server; -use ic_btc_adapter::AdapterState; use ic_btc_adapter::{ - config::Config, BlockchainManagerRequest, BlockchainState, GetSuccessorsHandler, + config::Config, BlockchainManagerRequest, BlockchainState, }; use ic_btc_adapter_client::setup_bitcoin_adapter_clients; use ic_btc_adapter_test_utils::generate_headers; @@ -97,7 +95,6 @@ fn e2e(criterion: &mut Criterion) { 1975, ); - let blockchain_state = Arc::new(Mutex::new(blockchain_state)); let rt = tokio::runtime::Runtime::new().unwrap(); @@ -106,16 +103,7 @@ fn e2e(criterion: &mut Criterion) { Ok(rt.block_on(async { config.incoming_source = IncomingSource::Path(uds_path.to_path_buf()); - let (blockchain_manager_tx, _) = channel::(10); - let handler = GetSuccessorsHandler::new( - &config, - blockchain_state.clone(), - blockchain_manager_tx, - &MetricsRegistry::default(), - ); - start_server(&no_op_logger(), &MetricsRegistry::default(), &rt.handle(), config.clone()); - start_client(uds_path).await })) }) From 2fc6544c02029adfd5d85664554b570285793f75 Mon Sep 17 00:00:00 2001 From: Mihail Jianu Date: Fri, 25 Oct 2024 09:09:15 +0000 Subject: [PATCH 18/18] clippy --- rs/bitcoin/adapter/benches/e2e.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rs/bitcoin/adapter/benches/e2e.rs b/rs/bitcoin/adapter/benches/e2e.rs index 8c6a8a55fe9..cb4070648b6 100644 --- a/rs/bitcoin/adapter/benches/e2e.rs +++ b/rs/bitcoin/adapter/benches/e2e.rs @@ -2,9 +2,7 @@ use bitcoin::{Block, BlockHash, BlockHeader, Network}; use criterion::{criterion_group, criterion_main, Criterion}; use ic_btc_adapter::config::IncomingSource; use ic_btc_adapter::start_server; -use ic_btc_adapter::{ - config::Config, BlockchainManagerRequest, BlockchainState, -}; +use ic_btc_adapter::{config::Config, BlockchainState}; use ic_btc_adapter_client::setup_bitcoin_adapter_clients; use ic_btc_adapter_test_utils::generate_headers; use ic_btc_replica_types::BitcoinAdapterRequestWrapper; @@ -17,9 +15,7 @@ use ic_interfaces_adapter_client::RpcAdapterClient; use ic_logger::replica_logger::no_op_logger; use ic_metrics::MetricsRegistry; use std::path::Path; -use std::sync::{Arc, Mutex}; use tempfile::Builder; -use tokio::sync::mpsc::channel; type BitcoinAdapterClient = Box< dyn RpcAdapterClient, @@ -95,7 +91,6 @@ fn e2e(criterion: &mut Criterion) { 1975, ); - let rt = tokio::runtime::Runtime::new().unwrap(); let (client, _temp) = Builder::new() @@ -103,7 +98,12 @@ fn e2e(criterion: &mut Criterion) { Ok(rt.block_on(async { config.incoming_source = IncomingSource::Path(uds_path.to_path_buf()); - start_server(&no_op_logger(), &MetricsRegistry::default(), &rt.handle(), config.clone()); + start_server( + &no_op_logger(), + &MetricsRegistry::default(), + rt.handle(), + config.clone(), + ); start_client(uds_path).await })) })