Skip to content

Commit

Permalink
create a fake persitenceUpstream example
Browse files Browse the repository at this point in the history
- expose the struct Wrappers for as they will be required to build custom peristence upstreams
  • Loading branch information
lytefast committed Jul 20, 2022
1 parent 6b09dd1 commit 90b0987
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "A basic implementation of GCRA algorithm for rate limiting"
keywords = ["rate-limit", "rate", "limit", "gcra"]

[features]
default = []
default = ["rate-limiter"]
rate-limiter = ["thingvellir", "tokio"]

[dependencies]
Expand Down
5 changes: 4 additions & 1 deletion examples/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use gcra::{GcraError, RateLimit, RateLimiter, RateLimiterError};

const CACHE_CAPACITY: usize = 4;
const WORKER_SHARD_COUNT: u8 = 4;

#[tokio::main]
async fn main() -> Result<(), RateLimiterError> {
let rate_limit = RateLimit::per_sec(2);
let mut rl = RateLimiter::new(4, 4);
let mut rl = RateLimiter::new(CACHE_CAPACITY, WORKER_SHARD_COUNT);

rl.check("key", rate_limit.clone(), 1).await?;
rl.check("key", rate_limit.clone(), 1).await?;
Expand Down
86 changes: 86 additions & 0 deletions examples/rate_limiter_persisted.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::{collections::HashMap, time::Duration, hash::Hash, thread, sync::{RwLock, Arc}};

use gcra::{GcraError, RateLimit, RateLimiter, RateLimiterError, RateLimitEntry};
use thingvellir::{LoadFromUpstream, CommitToUpstream};


const CACHE_CAPACITY: usize = 4;
const WORKER_SHARD_COUNT: u8 = 4;

const IO_DURATION: Duration = Duration::from_millis(50);

#[tokio::main]
async fn main() -> Result<(), RateLimiterError> {
let rate_limit = RateLimit::per_sec(2);

// Use a persistence backed rate limiter
let upstream_factory = FakePersistenceUpstream {
data: Arc::new(RwLock::new(HashMap::new()))
};
let mut rl = RateLimiter::with_handle(
thingvellir::service_builder(CACHE_CAPACITY)
.num_shards(WORKER_SHARD_COUNT)
.build_mutable(upstream_factory, thingvellir::DefaultCommitPolicy::Immediate)
);

rl.check("key", rate_limit.clone(), 1).await?;
rl.check("key", rate_limit.clone(), 1).await?;

match rl.check("key", rate_limit.clone(), 1).await {
Err(RateLimiterError::GcraError(GcraError::DeniedUntil { next_allowed_at })) => {
println!("Denied: Request next at {:?}", next_allowed_at);

let duration = Duration::from_millis(10);
println!("Sleep for {:?} to allow commits", duration);
tokio::time::sleep(duration).await;
}
unexpected => panic!("Opps something went wrong! {:?}", unexpected),
};

Ok(())
}

#[derive(Clone)]
struct FakePersistenceUpstream<K, D>{data: Arc<RwLock<HashMap<K, D>>>}


impl<Key> LoadFromUpstream<Key, RateLimitEntry> for FakePersistenceUpstream<Key, RateLimitEntry>
where
Key: 'static + Send + Sync + Hash + Eq + Clone,
{
fn load(&mut self, request: thingvellir::DataLoadRequest<Key, RateLimitEntry>) {
let key = request.key().clone();
let data = self.data.clone();

request.spawn_default(async move {
println!("LOAD. Sleeping for {:?}", IO_DURATION);
tokio::time::sleep(IO_DURATION).await;

match data.read().expect("RwLock poisoned").get(&key) {
Some(value) => {
println!("LOADED {:?}", value);
Ok(value.clone())},
None => {
println!("LOADED NOT_FOUND");
Err(thingvellir::UpstreamError::KeyNotFound)
},
}
});
}
}

impl<Key> CommitToUpstream<Key, RateLimitEntry> for FakePersistenceUpstream<Key, RateLimitEntry>
where
Key: 'static + Send + Sync + Hash + Eq + Clone,
{
fn commit<'a>(&mut self, request: thingvellir::DataCommitRequest<'a, Key, RateLimitEntry>) {
let key = request.key().clone();
let data = request.data().clone();

println!("COMMIT. Sleeping for {:?}", IO_DURATION);
thread::sleep(IO_DURATION);

self.data.write().expect("RwLock poisoned").insert(key, data);
request.resolve()
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ mod rate_limiter;
pub use crate::gcra::{GcraError, GcraState};
pub use crate::rate_limit::RateLimit;
#[cfg(feature = "rate-limiter")]
pub use crate::rate_limiter::{RateLimiter, RateLimiterError};
pub use crate::rate_limiter::{RateLimiter, RateLimiterError, RateLimitEntry, RateLimitRequest};
5 changes: 2 additions & 3 deletions src/rate_limiter/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use thingvellir::{

use crate::{GcraState, RateLimit};

#[derive(Default, Clone)]
pub(crate) struct RateLimitEntry {
#[derive(Default, Debug, Clone)]
pub struct RateLimitEntry {
pub gcra_state: GcraState,
expires_at: Option<tokio::time::Instant>,
}
Expand All @@ -31,7 +31,6 @@ impl DerefMut for RateLimitEntry {

impl ServiceData for RateLimitEntry {
fn should_persist(&self) -> bool {
// we are in memory, always save to
true
}

Expand Down
1 change: 1 addition & 0 deletions src/rate_limiter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ mod entry;
mod rate_limiter;

pub use rate_limiter::*;
pub use entry::*;
13 changes: 11 additions & 2 deletions src/rate_limiter/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum RateLimiterError {
}

#[derive(Clone, Hash, PartialEq, Eq, Debug)]
struct RateLimitRequest<T> {
pub struct RateLimitRequest<T> {
key: T,
}

Expand All @@ -39,6 +39,7 @@ impl<Key> RateLimiter<Key, InstantClock>
where
Key: Send + Clone + Hash + Eq + Display + 'static,
{
/// Constructs an [InMemoryUpstream] sharded instance of a rate limiter.
pub fn new(max_data_capacity: usize, num_shards: u8) -> Self {
Self {
clock: InstantClock,
Expand All @@ -50,6 +51,14 @@ where
),
}
}

/// Constructs a instance using the [MutableServiceHandle] provided.
pub fn with_handle(shard_handle: MutableServiceHandle<RateLimitRequest<Key>, RateLimitEntry>) -> Self {
Self {
clock: InstantClock,
shard_handle
}
}
}

impl<Key, C> RateLimiter<Key, C>
Expand Down Expand Up @@ -92,7 +101,7 @@ where
match check_result {
Ok(_) => {
entry.update_expiration(&rate_limit);
Commit::immediately(check_result)
Commit::default(check_result)
}
Err(GcraError::DeniedUntil { .. })
| Err(GcraError::DeniedIndefinitely { .. }) => unsafe {
Expand Down

0 comments on commit 90b0987

Please sign in to comment.