diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 521da66..bb3006b 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -462,6 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey true => (None, true), false => (Some(configs::FIXED_SEED), false), }; + let mut wanted_capacity = -1; let mut nocreate = false; let mut items_provided = false; while idx < argc { @@ -553,6 +554,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } + "ATLEASTCAPACITY" => { + if idx >= (argc - 1) { + return Err(ValkeyError::WrongArity); + } + idx += 1; + wanted_capacity = match input_args[idx].to_string_lossy().parse::() { + Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num, + Ok(0) => { + return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0)); + } + _ => { + return Err(ValkeyError::Str(utils::BAD_CAPACITY)); + } + }; + } "ITEMS" => { idx += 1; items_provided = true; @@ -568,6 +584,26 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey // When the `ITEMS` argument is provided, we expect additional item arg/s to be provided. return Err(ValkeyError::WrongArity); } + // Check if we have a wanted capacity and calculate if we can reach that capacity + if wanted_capacity > 0 { + if expansion == 0 { + return Err(ValkeyError::Str( + utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID, + )); + } + match utils::BloomObject::calculate_if_wanted_capacity_is_valid( + capacity, + fp_rate, + wanted_capacity, + tightening_ratio, + expansion, + ) { + Ok(result) => result, + Err(e) => { + return Err(e); + } + } + } // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); let value = match filter_key.get_value::(&BLOOM_TYPE) { diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index e619482..67ef1d1 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -10,6 +10,7 @@ use bloomfilter::Bloom; use bloomfilter::{deserialize, serialize}; use serde::{Deserialize, Deserializer, Serialize}; use std::sync::atomic::Ordering; +use valkey_module::ValkeyError; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -32,10 +33,16 @@ pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0) pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit"; +pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str = + "ERR Wanted capacity would go beyond bloom object memory limit"; +pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str = + "ERR False positive degrades too much to reach wanted capacity"; pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists."; pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed"; pub const DECODE_UNSUPPORTED_VERSION: &str = "ERR bloom object decoding failed. Unsupported version"; +pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str = + "ERR Specifying NONSCALING and ATLEASTCAPCITY is not allowed"; /// Logging Error messages pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object."; @@ -455,6 +462,44 @@ impl BloomObject { _ => Err(BloomError::DecodeUnsupportedVersion), } } + + pub fn calculate_if_wanted_capacity_is_valid( + capacity: i64, + fp_rate: f64, + wanted_capacity: i64, + tightening_ratio: f64, + expansion: u32, + ) -> Result<(), ValkeyError> { + let mut curr_capacity = capacity; + let mut curr_num_filters: u64 = 1; + let mut curr_fp_rate = fp_rate; + let mut filters_memory_usage = 0; + while curr_capacity < wanted_capacity { + curr_fp_rate = match BloomObject::calculate_fp_rate( + curr_fp_rate, + curr_num_filters as i32, + tightening_ratio, + ) { + Ok(rate) => rate, + Err(_) => { + return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID)); + } + }; + let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate); + // For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a power of two above or equal to the size + let curr_object_size = BloomObject::compute_size( + std::cmp::max(4, curr_num_filters).next_power_of_two() as usize, + ) + filters_memory_usage + + curr_filter_size; + if !BloomObject::validate_size(curr_object_size) { + return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE)); + } + filters_memory_usage += curr_filter_size; + curr_capacity *= expansion as i64; + curr_num_filters += 1; + } + Ok(()) + } } /// Structure representing a single bloom filter. 200 Bytes. diff --git a/tests/test_bloom_command.py b/tests/test_bloom_command.py index 02afe3f..890f407 100644 --- a/tests/test_bloom_command.py +++ b/tests/test_bloom_command.py @@ -44,6 +44,9 @@ def test_bloom_command_error(self): ('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'), ('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'), ('BF.INSERT KEY HELLO', 'unknown argument received'), + ('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 ATLEASTCAPACITY 10000000 EXPANSION 1', 'False positive degrades too much to reach wanted capacity'), + ('BF.INSERT KEY ATLEASTCAPACITY 1000000000000', 'Wanted capacity would go beyond bloom object memory limit'), + ('BF.INSERT KEY ATLEASTCAPACITY 1000000000000 NONSCALING', 'Specifying NONSCALING and ATLEASTCAPCITY is not allowed'), ('BF.RESERVE KEY String 100', 'bad error rate'), ('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'), ('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'),