Skip to content

Commit

Permalink
refactor: Remove async-broadcast and stream clones (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Apr 13, 2024
1 parent 61766e7 commit 10eb995
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 137 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- chore: Update Cargo.toml to gate non-wasm dependencies. [PR 176](https://github.com/dariusc93/rust-ipfs/pull/176)
- chore: Reenable websocket support and add support for secured websockets.
- feat: Implmenets webrtc transport and add a new feature. [PR 177](https://github.com/dariusc93/rust-ipfs/pull/177)
- refactor: Remove async-broadcast and stream clones. [PR 174](httops://github.com/dariusc93/rust-ipfs/pull/174)

# 0.11.4
- fix: Send a wantlist of missing blocks.
Expand Down
24 changes: 1 addition & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ libp2p-webrtc = { version = "=0.7.1-alpha", features = [
"tokio",
], optional = true }

async-broadcast = "0.6"
fs2 = "0.4"
sled = { version = "0.34", optional = true }

Expand Down
134 changes: 38 additions & 96 deletions src/p2p/gossipsub.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use async_broadcast::TrySendError;
use futures::channel::mpsc::{self as channel};
use futures::stream::{FusedStream, Stream};
use libp2p::gossipsub::PublishError;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::debug;

Expand All @@ -26,9 +23,7 @@ use libp2p::swarm::{
/// to different topics.
pub struct GossipsubStream {
// Tracks the topic subscriptions.
streams: HashMap<TopicHash, async_broadcast::Sender<GossipsubMessage>>,

active_streams: HashMap<TopicHash, Arc<AtomicUsize>>,
streams: HashMap<TopicHash, futures::channel::mpsc::Sender<GossipsubMessage>>,

// Gossipsub protocol
gossipsub: Gossipsub,
Expand Down Expand Up @@ -58,34 +53,15 @@ impl core::ops::DerefMut for GossipsubStream {
pub struct SubscriptionStream {
on_drop: Option<channel::UnboundedSender<TopicHash>>,
topic: Option<TopicHash>,
inner: async_broadcast::Receiver<GossipsubMessage>,
counter: Arc<AtomicUsize>,
}

impl Clone for SubscriptionStream {
fn clone(&self) -> Self {
self.counter.fetch_add(1, Ordering::SeqCst);
Self {
on_drop: self.on_drop.clone(),
topic: self.topic.clone(),
inner: self.inner.clone(),
counter: self.counter.clone(),
}
}
inner: futures::channel::mpsc::Receiver<GossipsubMessage>,
}

impl Drop for SubscriptionStream {
fn drop(&mut self) {
if self.counter.load(Ordering::SeqCst) == 1 {
// the on_drop option allows us to disable this unsubscribe on drop once the stream has
// ended.
if let Some(sender) = self.on_drop.take() {
if let Some(topic) = self.topic.take() {
let _ = sender.unbounded_send(topic);
}
if let Some(sender) = self.on_drop.take() {
if let Some(topic) = self.topic.take() {
let _ = sender.unbounded_send(topic);
}
} else {
self.counter.fetch_sub(1, Ordering::SeqCst);
}
}
}
Expand Down Expand Up @@ -140,7 +116,6 @@ impl From<Gossipsub> for GossipsubStream {
streams: HashMap::new(),
gossipsub,
unsubscriptions: (tx, rx),
active_streams: Default::default(),
}
}
}
Expand All @@ -150,65 +125,43 @@ impl GossipsubStream {
/// Returns a receiver for messages sent to the topic or `None` if subscription existed
/// already.
pub fn subscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<SubscriptionStream> {
use std::collections::hash_map::Entry;
let topic = Topic::new(topic);

match self.streams.entry(topic.hash()) {
Entry::Vacant(ve) => {
match self.gossipsub.subscribe(&topic) {
Ok(true) => {
let counter = Arc::new(AtomicUsize::new(1));
self.active_streams
.insert(topic.hash(), Arc::clone(&counter));
let (tx, rx) = async_broadcast::broadcast(15000);
let key = ve.key().clone();
ve.insert(tx);
Ok(SubscriptionStream {
on_drop: Some(self.unsubscriptions.0.clone()),
topic: Some(key),
inner: rx,
counter,
})
}
Ok(false) => anyhow::bail!("Already subscribed to topic; shouldnt reach this"),
Err(e) => {
debug!("{}", e); //"subscribing to a unsubscribed topic should have succeeded"
Err(anyhow::Error::from(e))
}
}
}
Entry::Occupied(entry) => {
let rx = entry.get().clone().new_receiver();
let key = entry.key().clone();
let counter = self
.active_streams
.get(&key)
.cloned()
.ok_or(anyhow::anyhow!("No active stream"))?;
counter.fetch_add(1, Ordering::SeqCst);
Ok(SubscriptionStream {
on_drop: Some(self.unsubscriptions.0.clone()),
topic: Some(key),
inner: rx,
counter,
})
}
if self.streams.contains_key(&topic.hash()) {
anyhow::bail!("Already subscribed to topic")
}

if !self.gossipsub.subscribe(&topic)? {
anyhow::bail!("Already subscribed to topic")
}

let (tx, rx) = futures::channel::mpsc::channel(15000);
self.streams.insert(topic.hash(), tx);
Ok(SubscriptionStream {
on_drop: Some(self.unsubscriptions.0.clone()),
topic: Some(topic.hash()),
inner: rx,
})
}

/// Unsubscribes from a topic. Unsubscription is usually done through dropping the
/// SubscriptionStream.
///
/// Returns true if an existing subscription was dropped, false otherwise
pub fn unsubscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<bool> {
let topic = Topic::new(topic.into());
if let Some(sender) = self.streams.remove(&topic.hash()) {
sender.close();
self.active_streams.remove(&topic.hash());
Ok(self.gossipsub.unsubscribe(&topic)?)
} else {
let topic = Topic::new(topic);

if !self.streams.contains_key(&topic.hash()) {
anyhow::bail!("Unable to unsubscribe from topic.")
}

self.streams
.remove(&topic.hash())
.expect("subscribed to topic");

self.gossipsub
.unsubscribe(&topic)
.map_err(anyhow::Error::from)
}

/// Publish to subscribed topic
Expand Down Expand Up @@ -314,16 +267,15 @@ impl NetworkBehaviour for GossipsubStream {
loop {
match self.unsubscriptions.1.poll_next_unpin(ctx) {
Poll::Ready(Some(dropped)) => {
if let Some(sender) = self.streams.remove(&dropped) {
sender.close();
if let Some(mut sender) = self.streams.remove(&dropped) {
sender.close_channel();
debug!("unsubscribing via drop from {:?}", dropped);
assert!(
self.gossipsub
.unsubscribe(&Topic::new(dropped.to_string()))
.unwrap_or_default(),
"Failed to unsubscribe a dropped subscription"
);
self.active_streams.remove(&dropped);
}
}
Poll::Ready(None) => unreachable!("we own the sender"),
Expand All @@ -335,8 +287,11 @@ impl NetworkBehaviour for GossipsubStream {
match futures::ready!(self.gossipsub.poll(ctx)) {
ToSwarm::GenerateEvent(GossipsubEvent::Message { message, .. }) => {
let topic = message.topic.clone();
if let Entry::Occupied(oe) = self.streams.entry(topic) {
if let Err(TrySendError::Closed(_)) = oe.get().try_broadcast(message) {
if let Entry::Occupied(mut oe) = self.streams.entry(topic) {
if let Err(e) = oe.get_mut().try_send(message) {
if e.is_full() {
continue;
}
// receiver has dropped
let (topic, _) = oe.remove_entry();
debug!("unsubscribing via SendError from {:?}", &topic);
Expand All @@ -346,27 +301,14 @@ impl NetworkBehaviour for GossipsubStream {
.unwrap_or_default(),
"Failed to unsubscribe following SendError"
);
self.active_streams.remove(&topic);
}
}
continue;
}
ToSwarm::GenerateEvent(GossipsubEvent::Subscribed { peer_id, topic }) => {
return Poll::Ready(ToSwarm::GenerateEvent(GossipsubEvent::Subscribed {
peer_id,
topic,
}));
}
ToSwarm::GenerateEvent(GossipsubEvent::Unsubscribed { peer_id, topic }) => {
return Poll::Ready(ToSwarm::GenerateEvent(GossipsubEvent::Unsubscribed {
peer_id,
topic,
}));
}
action => {
return Poll::Ready(action);
}
}
}
}
}
}
34 changes: 17 additions & 17 deletions tests/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ async fn subscribe_only_once() {
let _stream = a.pubsub_subscribe("some_topic").await.unwrap();
}

#[tokio::test]
async fn subscribe_multiple_times() {
let a = Node::new("test_node").await;
let _stream = a.pubsub_subscribe("some_topic").await.unwrap();
a.pubsub_subscribe("some_topic").await.unwrap();
}
// #[tokio::test]
// async fn subscribe_multiple_times() {
// let a = Node::new("test_node").await;
// let _stream = a.pubsub_subscribe("some_topic").await.unwrap();
// a.pubsub_subscribe("some_topic").await.unwrap();
// }

#[tokio::test]
async fn resubscribe_after_unsubscribe() {
Expand All @@ -32,22 +32,22 @@ async fn resubscribe_after_unsubscribe() {
drop(a.pubsub_subscribe("topic").await.unwrap());
}

#[tokio::test]
async fn unsubscribe_cloned_via_drop() {
let empty: &[&str] = &[];
let a = Node::new("test_node").await;
// #[tokio::test]
// async fn unsubscribe_cloned_via_drop() {
// let empty: &[&str] = &[];
// let a = Node::new("test_node").await;

let msgs_1 = a.pubsub_subscribe("topic").await.unwrap();
let msgs_2 = a.pubsub_subscribe("topic").await.unwrap();
// let msgs_1 = a.pubsub_subscribe("topic").await.unwrap();
// let msgs_2 = a.pubsub_subscribe("topic").await.unwrap();

drop(msgs_1);
// drop(msgs_1);

assert_ne!(a.pubsub_subscribed().await.unwrap(), empty);
// assert_ne!(a.pubsub_subscribed().await.unwrap(), empty);

drop(msgs_2);
// drop(msgs_2);

assert_eq!(a.pubsub_subscribed().await.unwrap(), empty);
}
// assert_eq!(a.pubsub_subscribed().await.unwrap(), empty);
// }

#[tokio::test]
async fn unsubscribe_via_drop() {
Expand Down

0 comments on commit 10eb995

Please sign in to comment.