Skip to content

Commit

Permalink
feat: Add Executor trait with async executor impl (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Nov 28, 2024
1 parent 528577c commit 0c94d74
Show file tree
Hide file tree
Showing 4 changed files with 390 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 0.13.1
- feat: Add Executor trait and impl for tokio and wasm-bindgen-futures.
- refactor: return `ConnectionId` when establishing connection.

# 0.13.0
Expand Down
46 changes: 25 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use repo::{
BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
};

use tokio_util::sync::{CancellationToken, DropGuard};
use rt::{AbortableJoinHandle, Executor, ExecutorSwitch};
use tracing::Span;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -340,7 +340,9 @@ pub struct Ipfs {
identify_conf: IdentifyConfiguration,
to_task: Sender<IpfsEvent>,
record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
_guard: Arc<DropGuard>,
_guard: AbortableJoinHandle<()>,
_gc_guard: AbortableJoinHandle<()>,
executor: ExecutorSwitch,
}

impl std::fmt::Debug for Ipfs {
Expand Down Expand Up @@ -909,6 +911,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
..
} = self;

let executor = ExecutorSwitch;

let keys = keys.unwrap_or(Keypair::generate_ed25519());

let root_span = Option::take(&mut options.span)
Expand Down Expand Up @@ -972,15 +976,15 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}
}

let token = CancellationToken::new();
let _guard = Arc::new(token.clone().drop_guard());
let mut _guard = AbortableJoinHandle::empty();
let mut _gc_guard = AbortableJoinHandle::empty();

let (to_task, receiver) = channel::<IpfsEvent>(1);
let id_conf = options.identify_configuration.clone();

let keystore = options.keystore.clone();

let ipfs = Ipfs {
let mut ipfs = Ipfs {
span: facade_span,
repo,
identify_conf: id_conf,
Expand All @@ -989,6 +993,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
to_task,
record_key_validator,
_guard,
_gc_guard,
executor,
};

//Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of
Expand Down Expand Up @@ -1032,6 +1038,7 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
let swarm = create_swarm(
&keys,
&options,
executor,
&ipfs.repo,
exec_span,
(custom_behaviour, custom_transport),
Expand All @@ -1041,10 +1048,9 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
listening_addrs, ..
} = options;

if let Some(config) = gc_config {
rt::spawn({
let gc_handle = gc_config.map(|config| {
executor.spawn_abortable({
let repo = ipfs.repo.clone();
let token = token.clone();
async move {
let GCConfig { duration, trigger } = config;
let use_config_timer = duration != Duration::ZERO;
Expand All @@ -1062,10 +1068,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {

loop {
tokio::select! {
_ = token.cancelled() => {
tracing::debug!("gc task cancelled");
break
},
_ = &mut interval => {
let _g = repo.inner.gclock.write().await;
tracing::debug!("preparing gc operation");
Expand Down Expand Up @@ -1114,8 +1116,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}
}
}
});
}
})
}).unwrap_or(AbortableJoinHandle::empty());

let mut fut = task::IpfsTask::new(
swarm,
Expand Down Expand Up @@ -1153,7 +1155,7 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}
}

rt::spawn({
ipfs._guard.replace(executor.spawn_abortable({
async move {
//Note: For now this is not configurable as its meant for internal testing purposes but may change in the future
let as_fut = false;
Expand All @@ -1164,13 +1166,11 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
fut.run().boxed()
};

tokio::select! {
_ = fut => {}
_ = token.cancelled() => {},
};
fut.await
}
.instrument(swarm_span)
});
}));
ipfs._gc_guard.replace(gc_handle);
Ok(ipfs)
}
}
Expand Down Expand Up @@ -2512,7 +2512,7 @@ impl Ipfs {
self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
let fut = rx.await??;

rt::spawn(async move {
self.executor.dispatch(async move {
if let Err(e) = fut.await.map_err(|e| anyhow!(e)) {
tracing::error!(error = %e, "failed to bootstrap");
}
Expand Down Expand Up @@ -2598,6 +2598,10 @@ impl Ipfs {

// ignoring the error because it'd mean that the background task had already been dropped
let _ = self.to_task.try_send(IpfsEvent::Exit);

// terminte task that handles GC and spawn task
self._gc_guard.abort();
self._guard.abort();
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! P2P handling for IPFS nodes.
use crate::error::Error;
use crate::repo::Repo;
use crate::rt::{Executor, ExecutorSwitch};
use crate::{IpfsOptions, TTransportFn};
use std::convert::TryInto;
use std::num::{NonZeroU8, NonZeroUsize};
Expand Down Expand Up @@ -205,6 +206,7 @@ impl Default for SwarmConfig {
pub(crate) fn create_swarm<C>(
keypair: &Keypair,
options: &IpfsOptions,
executor: ExecutorSwitch,
repo: &Repo,
span: Span,
(custom, custom_transport): (Option<C>, Option<TTransportFn>),
Expand Down Expand Up @@ -233,7 +235,7 @@ where
transport,
behaviour,
peer_id,
libp2p::swarm::Config::with_executor(SpannedExecutor(span))
libp2p::swarm::Config::with_executor(SpannedExecutor { executor, span })
.with_notify_handler_buffer_size(swarm_config.notify_handler_buffer_size)
.with_per_connection_event_buffer_size(swarm_config.connection_event_buffer_size)
.with_dial_concurrency_factor(swarm_config.dial_concurrency_factor)
Expand All @@ -244,14 +246,17 @@ where
Ok(swarm)
}

struct SpannedExecutor(Span);
struct SpannedExecutor {
executor: ExecutorSwitch,
span: Span,
}

impl libp2p::swarm::Executor for SpannedExecutor {
fn exec(
&self,
future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
) {
use tracing_futures::Instrument;
crate::rt::spawn(future.instrument(self.0.clone()));
self.executor.dispatch(future.instrument(self.span.clone()));
}
}
Loading

0 comments on commit 0c94d74

Please sign in to comment.