Skip to content

Commit

Permalink
Keep track of publications
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Nov 16, 2024
1 parent ca25049 commit 468806e
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum ToNodeActor {
pub struct NodeActor {
inbox: mpsc::Receiver<ToNodeActor>,
subscriptions: Vec<Subscription>,
publications: Vec<Publication>,
nats_consumer_rx: SelectAll<BroadcastStream<JetStreamEvent>>,
p2panda_topic_rx: SelectAll<BroadcastStream<FromNetwork>>,
s3_watcher_rx: mpsc::Receiver<Result<S3Event, S3Error>>,
Expand All @@ -58,6 +59,7 @@ impl NodeActor {
Self {
nats,
subscriptions: Vec::new(),
publications: Vec::new(),
nats_consumer_rx: SelectAll::new(),
p2panda_topic_rx: SelectAll::new(),
s3_watcher_rx,
Expand Down Expand Up @@ -155,6 +157,10 @@ impl NodeActor {
}

async fn on_publish(&mut self, publication: Publication) -> Result<()> {
self.publications.push(publication.clone());

// 1. Subscribe to p2panda gossip overlay for "live-mode".
//
// When publishing we don't want to sync but only gossip. Only subscribing peers will want
// to initiate sync sessions with us.
//
Expand All @@ -172,14 +178,22 @@ impl NodeActor {
let topic_id = topic_query.id();
let network_rx = self.panda.subscribe(topic_query).await?;

// Wrap broadcast receiver stream into tokio helper, to make it implement the `Stream`
// trait which is required by `SelectAll`.
self.p2panda_topic_rx.push(BroadcastStream::new(network_rx));

// 2. Subscribe to an external data source for newly incoming data, so we can forward it to
// the gossip overlay later.
match publication {
Publication::Bucket { bucket: _ } => {
// @TODO
// Do nothing here. We handle incoming new blob events via the "on_watcher_event"
// method.
}
Publication::Subject {
stream_name,
subject,
} => {
// Subscribe to the NATS stream to receive new NATS messages from here.
let (_, nats_rx) = self
.nats
.subscribe(stream_name, subject, DeliverPolicy::New, topic_id)
Expand All @@ -188,10 +202,6 @@ impl NodeActor {
}
};

// Wrap broadcast receiver stream into tokio helper, to make it implement the `Stream`
// trait which is required by `SelectAll`.
self.p2panda_topic_rx.push(BroadcastStream::new(network_rx));

Ok(())
}

Expand Down

0 comments on commit 468806e

Please sign in to comment.