Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retroactively remove users based on the whitelist #64

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ impl Connections {
self.brokers.keys().cloned().collect()
}

/// Get all of the users that are connected to us. We use this when we need
/// to check if they are still whitelisted.
pub fn all_users(&self) -> Vec<UserPublicKey> {
self.users.keys().cloned().collect()
}

/// Insert a broker with its connection into our map.
pub fn add_broker(
&mut self,
Expand Down
8 changes: 8 additions & 0 deletions cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ impl<R: RunDef> Broker<R> {
let inner_ = self.inner.clone();
let sync_task = AbortOnDropHandle(spawn(inner_.run_sync_task()));

// Spawn the whitelist task, which retroactively checks if existing users are still
// whitelisted
let inner_ = self.inner.clone();
let whitelist_task = AbortOnDropHandle(spawn(inner_.run_whitelist_task()));

// Spawn the public (user) listener task
// TODO: maybe macro this, since it's repeat code with the private listener task
let inner_ = self.inner.clone();
Expand Down Expand Up @@ -307,6 +312,9 @@ impl<R: RunDef> Broker<R> {
_ = broker_listener_task => {
Err(Error::Exited("broker listener task exited!".to_string()))
}
_ = whitelist_task => {
Err(Error::Exited("whitelist task exited!".to_string()))
}
}
}
}
1 change: 1 addition & 0 deletions cdn-broker/src/tasks/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pub mod heartbeat;
pub mod listener;
pub mod sender;
pub mod sync;
pub mod whitelist;
45 changes: 45 additions & 0 deletions cdn-broker/src/tasks/broker/whitelist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2024 Espresso Systems (espressosys.com)
// This file is part of the Push-CDN repository.

// You should have received a copy of the MIT License
// along with the Push-CDN repository. If not, see <https://mit-license.org/>.

//! The sync task syncs both users and topics to other brokers.

use std::{sync::Arc, time::Duration};

use cdn_proto::{def::RunDef, discovery::DiscoveryClient};
use tokio::time::sleep;

use crate::Inner;

impl<Def: RunDef> Inner<Def> {
/// Run the whitelist task. This is responsible for checking if users are still whitelisted
/// and kicking them off the network if they are not.
pub async fn run_whitelist_task(self: Arc<Self>) {
// Clone the discovery client because it's behind an `Arc`
let mut discovery_client = self.discovery_client.clone();

loop {
// Run every minute
sleep(Duration::from_secs(60)).await;

// Get a list of all users
let users = self.connections.read().all_users();

// Make sure each user is still whitelisted
for user in users {
if !discovery_client
.check_whitelist(&user)
.await
.unwrap_or(true)
{
// Kick the user off the network if they are not
self.connections
.write()
.remove_user(user, "not in whitelist");
}
}
}
}
}