Skip to content

Commit

Permalink
Adding optional arg to BF.INSERT to allow users to check if their blo…
Browse files Browse the repository at this point in the history
…om filter can reach the desired size

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Jan 21, 2025
1 parent 591ab10 commit 2be839e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut wanted_capacity = -1;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
Expand Down Expand Up @@ -553,6 +554,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ATLEASTCAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_CAPACITY));
}
};
}
"ITEMS" => {
idx += 1;
items_provided = true;
Expand All @@ -568,6 +584,26 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
// When the `ITEMS` argument is provided, we expect additional item arg/s to be provided.
return Err(ValkeyError::WrongArity);
}
// Check if we have a wanted capacity and calculate if we can reach that capacity
if wanted_capacity > 0 {
if expansion == 0 {
return Err(ValkeyError::Str(
utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID,
));
}
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
capacity,
fp_rate,
wanted_capacity,
tightening_ratio,
expansion,
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
}
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Expand Down
45 changes: 45 additions & 0 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bloomfilter::Bloom;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Deserializer, Serialize};
use std::sync::atomic::Ordering;
use valkey_module::ValkeyError;

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand All @@ -32,10 +33,16 @@ pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str =
"ERR Wanted capacity would go beyond bloom object memory limit";
pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str =
"ERR False positive degrades too much to reach wanted capacity";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
pub const DECODE_UNSUPPORTED_VERSION: &str =
"ERR bloom object decoding failed. Unsupported version";
pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str =
"ERR Specifying NONSCALING and ATLEASTCAPCITY is not allowed";
/// Logging Error messages
pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object.";

Expand Down Expand Up @@ -455,6 +462,44 @@ impl BloomObject {
_ => Err(BloomError::DecodeUnsupportedVersion),
}
}

pub fn calculate_if_wanted_capacity_is_valid(
capacity: i64,
fp_rate: f64,
wanted_capacity: i64,
tightening_ratio: f64,
expansion: u32,
) -> Result<(), ValkeyError> {
let mut curr_capacity = capacity;
let mut curr_num_filters: u64 = 1;
let mut curr_fp_rate = fp_rate;
let mut filters_memory_usage = 0;
while curr_capacity < wanted_capacity {
curr_fp_rate = match BloomObject::calculate_fp_rate(
curr_fp_rate,
curr_num_filters as i32,
tightening_ratio,
) {
Ok(rate) => rate,
Err(_) => {
return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID));
}
};
let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate);
// For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a power of two above or equal to the size
let curr_object_size = BloomObject::compute_size(
std::cmp::max(4, curr_num_filters).next_power_of_two() as usize,
) + filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE));
}
filters_memory_usage += curr_filter_size;
curr_capacity *= expansion as i64;
curr_num_filters += 1;
}
Ok(())
}
}

/// Structure representing a single bloom filter. 200 Bytes.
Expand Down
3 changes: 3 additions & 0 deletions tests/test_bloom_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def test_bloom_command_error(self):
('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'),
('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'),
('BF.INSERT KEY HELLO', 'unknown argument received'),
('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 ATLEASTCAPACITY 10000000 EXPANSION 1', 'False positive degrades too much to reach wanted capacity'),
('BF.INSERT KEY ATLEASTCAPACITY 1000000000000', 'Wanted capacity would go beyond bloom object memory limit'),
('BF.INSERT KEY ATLEASTCAPACITY 1000000000000 NONSCALING', 'Specifying NONSCALING and ATLEASTCAPCITY is not allowed'),
('BF.RESERVE KEY String 100', 'bad error rate'),
('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'),
('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'),
Expand Down

0 comments on commit 2be839e

Please sign in to comment.