Skip to content

Commit

Permalink
fix(notify_cancel): removed usage of async trait and made the trait d…
Browse files Browse the repository at this point in the history
…yn incompatible (#2665)

## Linked Issues/PRs
<!-- List of related issues/PRs -->
- none

## Description
<!-- List of detailed changes -->

The code will bloat a little more, but that's fine :_) we don't have dyn
dispatch for `NotifyCancel` anymore.



This pull request introduces changes to the `fuel-core` crate, primarily
focusing on enhancing the flexibility and functionality of the
`TaskManager` and `CancellationToken` structures. The changes include
adding a generic parameter to these structures and updating related
implementations to accommodate this new parameter.

Key changes include:

### Enhancements to Task Management:

*
[`crates/fuel-core/src/service/genesis/task_manager.rs`](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L11-L22):
Introduced a generic parameter `N` to the `TaskManager` and
`CancellationToken` structures, allowing for more flexible cancellation
handling. Updated the `NotifyCancel` trait to return a future instead of
using `async_trait`.
[[1]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L11-L22)
[[2]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L33)
[[3]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L53-R61)
[[4]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L69-R81)
[[5]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L87-R104)
[[6]](diffhunk://#diff-c22d48a385f976cd771e8cb7c28d17e3cf2096cfa06752611bf6c7ee1a77ae83L109-R113)

### Updates to Genesis Exporter and Importer:

*
[`crates/fuel-core/src/service/genesis/exporter.rs`](diffhunk://#diff-e7916e60a2d3ed838c5e0e668a697360da9c84bf37db278938a88bd829daf34dL68-R87):
Modified the `Exporter` struct to include the new generic parameter `N`
and updated its implementation accordingly.
*
[`crates/fuel-core/src/service/genesis/importer.rs`](diffhunk://#diff-24642391bda7217130a665037de61e936e0ae88613e6af1b583759698512b1e8L76-R78):
Updated the `SnapshotImporter` struct to include the new generic
parameter `N` and adjusted its implementation.

### Modifications to Import Task:

*
[`crates/fuel-core/src/service/genesis/importer/import_task.rs`](diffhunk://#diff-954b9280f787dd90eda70a58c77f53b3543e8286462d498dcda45536ca7a175fR27):
Updated the `run` method to accept a `CancellationToken` with the new
generic parameter `N` and modified the `never_cancel` test function to
reflect this change.
[[1]](diffhunk://#diff-954b9280f787dd90eda70a58c77f53b3543e8286462d498dcda45536ca7a175fR27)
[[2]](diffhunk://#diff-954b9280f787dd90eda70a58c77f53b3543e8286462d498dcda45536ca7a175fL103-R107)
[[3]](diffhunk://#diff-954b9280f787dd90eda70a58c77f53b3543e8286462d498dcda45536ca7a175fL612-R616)

### Removal of Async Trait:

*
[`crates/fuel-core/src/lib.rs`](diffhunk://#diff-56f4bee7f587862b3c7b4c2dd26310911042ff83f449e4421c1704471e6fa98cL96):
Removed the `async_trait` annotation from the `NotifyCancel`
implementation for `ShutdownListener`.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?
  • Loading branch information
rymnc authored Feb 7, 2025
1 parent 66d4ece commit edd77ac
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 27 deletions.
1 change: 0 additions & 1 deletion crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ impl ShutdownListener {
}
}

#[async_trait::async_trait]
impl NotifyCancel for ShutdownListener {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.token.cancelled().await;
Expand Down
9 changes: 5 additions & 4 deletions crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,26 @@ use super::{
NotifyCancel,
};

pub struct Exporter<Fun> {
pub struct Exporter<Fun, N> {
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
task_manager: TaskManager<SnapshotFragment>,
task_manager: TaskManager<SnapshotFragment, N>,
multi_progress: MultipleProgressReporter,
}

impl<Fun> Exporter<Fun>
impl<Fun, N> Exporter<Fun, N>
where
Fun: Fn() -> anyhow::Result<SnapshotWriter>,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
pub fn new(
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
cancel_token: impl NotifyCancel + Send + Sync + 'static,
cancel_token: N,
) -> Self {
Self {
db,
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-core/src/service/genesis/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ mod on_chain;

const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;

pub struct SnapshotImporter {
pub struct SnapshotImporter<N = StateWatcher> {
db: CombinedGenesisDatabase,
task_manager: TaskManager<()>,
task_manager: TaskManager<(), N>,
genesis_block: Block,
snapshot_reader: SnapshotReader,
multi_progress_reporter: MultipleProgressReporter,
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/service/genesis/importer/import_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
service::genesis::{
progress::ProgressReporter,
task_manager::CancellationToken,
NotifyCancel,
},
};

Expand Down Expand Up @@ -100,7 +101,10 @@ where
for<'a> StorageTransaction<&'a mut GenesisDatabase<DbDesc>>:
StorageMutate<GenesisMetadata<DbDesc>, Error = fuel_core_storage::Error>,
{
pub fn run(mut self, cancel_token: CancellationToken) -> anyhow::Result<()> {
pub fn run<N>(mut self, cancel_token: CancellationToken<N>) -> anyhow::Result<()>
where
N: NotifyCancel + Send + Sync + 'static,
{
let mut db = self.db;
let mut is_cancelled = cancel_token.is_cancelled();
self.groups
Expand Down Expand Up @@ -609,7 +613,7 @@ mod tests {
assert!(result.is_err());
}

fn never_cancel() -> CancellationToken {
fn never_cancel() -> CancellationToken<tokio_util::sync::CancellationToken> {
CancellationToken::new(tokio_util::sync::CancellationToken::new())
}
}
40 changes: 22 additions & 18 deletions crates/fuel-core/src/service/genesis/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use futures::{
use itertools::Itertools;
use tokio::task::JoinSet;

pub struct TaskManager<T> {
pub struct TaskManager<T, N> {
set: JoinSet<anyhow::Result<T>>,
cancel_token: CancellationToken,
cancel_token: CancellationToken<N>,
}

#[async_trait::async_trait]
pub trait NotifyCancel {
async fn wait_until_cancelled(&self) -> anyhow::Result<()>;
fn wait_until_cancelled(
&self,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
fn is_cancelled(&self) -> bool;
}

#[async_trait::async_trait]
impl NotifyCancel for tokio_util::sync::CancellationToken {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.cancelled().await;
Expand All @@ -30,7 +30,6 @@ impl NotifyCancel for tokio_util::sync::CancellationToken {
}
}

#[async_trait::async_trait]
impl NotifyCancel for StateWatcher {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
let mut state = self.clone();
Expand All @@ -50,13 +49,16 @@ impl NotifyCancel for StateWatcher {
/// A token that implements [`NotifyCancel`]. Given to jobs inside of [`TaskManager`] so they can
/// stop either when commanded by the [`TaskManager`] or by an outside source.
#[derive(Clone)]
pub struct CancellationToken {
outside_signal: Arc<dyn NotifyCancel + Send + Sync>,
pub struct CancellationToken<N> {
outside_signal: Arc<N>,
inner_signal: tokio_util::sync::CancellationToken,
}

impl CancellationToken {
pub fn new(outside_signal: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<N> CancellationToken<N>
where
N: NotifyCancel,
{
pub fn new(outside_signal: N) -> Self {
Self {
outside_signal: Arc::new(outside_signal),
inner_signal: tokio_util::sync::CancellationToken::new(),
Expand All @@ -66,16 +68,17 @@ impl CancellationToken {
pub fn cancel(&self) {
self.inner_signal.cancel()
}
}

impl CancellationToken {
pub fn is_cancelled(&self) -> bool {
self.inner_signal.is_cancelled() || self.outside_signal.is_cancelled()
}
}

impl<T> TaskManager<T> {
pub fn new(outside_cancel: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<T, N> TaskManager<T, N>
where
N: NotifyCancel + Clone,
{
pub fn new(outside_cancel: N) -> Self {
Self {
set: JoinSet::new(),
cancel_token: CancellationToken::new(outside_cancel),
Expand All @@ -84,20 +87,21 @@ impl<T> TaskManager<T> {

pub fn run<F>(&mut self, arg: F) -> anyhow::Result<T>
where
F: FnOnce(CancellationToken) -> anyhow::Result<T>,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T>,
{
arg(self.cancel_token.clone())
}
}

impl<T> TaskManager<T>
impl<T, N> TaskManager<T, N>
where
T: Send + 'static,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
#[cfg(test)]
pub fn spawn<F, Fut>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> Fut,
F: FnOnce(CancellationToken<N>) -> Fut,
Fut: futures::Future<Output = anyhow::Result<T>> + Send + 'static,
{
let token = self.cancel_token.clone();
Expand All @@ -106,7 +110,7 @@ where

pub fn spawn_blocking<F>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> anyhow::Result<T> + Send + 'static,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T> + Send + 'static,
{
let token = self.cancel_token.clone();
self.set.spawn_blocking(move || arg(token));
Expand Down

0 comments on commit edd77ac

Please sign in to comment.