Skip to content

Commit

Permalink
Implemented hincrby & hincrbyfloat
Browse files Browse the repository at this point in the history
Fixed bug in `hset`: an already existing field was not updated
Fixed test flakiness when output is large (over 4K)
Added `HashDb::get_multi` and `HashDb::get`
  • Loading branch information
sabledb-io committed Apr 13, 2024
1 parent 4043f84 commit 27aa436
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 48 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ Below is a simple ( `ping` ) test conducted locally using WSL2 on Windows 10 (sa
| hlen ||| |
| hexists ||| |
| hgetall ||| |
| hincrby ||| |
| hincrbyfloat ||| |

### Generic commands

Expand Down
4 changes: 3 additions & 1 deletion libsabledb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ impl Client {
| RedisCommandName::Hdel
| RedisCommandName::Hlen
| RedisCommandName::Hexists
| RedisCommandName::Hgetall => {
| RedisCommandName::Hgetall
| RedisCommandName::Hincrbyfloat
| RedisCommandName::Hincrby => {
match HashCommands::handle_command(client_state.clone(), command, tx).await? {
HandleCommandResult::Blocked(_) => {
return Err(SableError::OtherError(
Expand Down
2 changes: 2 additions & 0 deletions libsabledb/src/commands/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum RedisCommandName {
Hlen,
Hexists,
Hgetall,
Hincrby,
Hincrbyfloat,
NotSupported(String),
}

Expand Down
13 changes: 12 additions & 1 deletion libsabledb/src/commands/commander.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ impl CommandsManager {
}

#[derive(Default, Debug, Clone)]
#[allow(dead_code)]
pub struct CommandMetadata {
cmd_name: RedisCommandName,
cmd_flags: u64,
Expand Down Expand Up @@ -641,6 +640,18 @@ impl Default for CommandsManager {
.read_only()
.with_arity(2),
),
(
"hincrbyfloat",
CommandMetadata::new(RedisCommandName::Hincrbyfloat)
.write()
.with_arity(4),
),
(
"hincrby",
CommandMetadata::new(RedisCommandName::Hincrby)
.write()
.with_arity(4),
),
]),
}
}
Expand Down
146 changes: 139 additions & 7 deletions libsabledb/src/commands/hash_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
parse_string_to_number,
storage::{
GenericDb, GetHashMetadataResult, HashDb, HashDeleteResult, HashExistsResult,
HashGetResult, HashLenResult, HashPutResult,
HashGetMultiResult, HashGetResult, HashLenResult, HashPutResult,
},
types::List,
BytesMutUtils, Expiration, LockManager, PrimaryKeyMetadata, RedisCommand, RedisCommandName,
Expand Down Expand Up @@ -46,6 +46,12 @@ impl HashCommands {
RedisCommandName::Hexists => {
Self::hexists(client_state, command, &mut response_buffer).await?;
}
RedisCommandName::Hincrbyfloat => {
Self::hincrbyfloat(client_state, command, &mut response_buffer).await?;
}
RedisCommandName::Hincrby => {
Self::hincrby(client_state, command, &mut response_buffer).await?;
}
RedisCommandName::Hgetall => {
// HGETALL writes the response directly to the client
Self::hgetall(client_state, command, tx).await?;
Expand Down Expand Up @@ -109,7 +115,7 @@ impl HashCommands {
let _unused = LockManager::lock_user_key_exclusive(key, client_state.database_id());
let hash_db = HashDb::with_storage(client_state.database(), client_state.database_id());

let items_put = match hash_db.put(key, &field_values)? {
let items_put = match hash_db.put_multi(key, &field_values)? {
HashPutResult::Some(count) => count,
HashPutResult::WrongType => {
builder.error_string(response_buffer, ErrorStrings::WRONGTYPE);
Expand Down Expand Up @@ -137,17 +143,17 @@ impl HashCommands {
let _unused = LockManager::lock_user_key_shared(key, client_state.database_id());
let hash_db = HashDb::with_storage(client_state.database(), client_state.database_id());

let items = match hash_db.get(key, &[&field])? {
HashGetResult::WrongType => {
let items = match hash_db.get_multi(key, &[&field])? {
HashGetMultiResult::WrongType => {
builder.error_string(response_buffer, ErrorStrings::WRONGTYPE);
return Ok(());
}
HashGetResult::Some(items) => {
HashGetMultiResult::Some(items) => {
// update telemetries
Telemetry::inc_db_hit();
items
}
HashGetResult::None => {
HashGetMultiResult::None => {
// update telemetries
Telemetry::inc_db_miss();
builder.null_string(response_buffer);
Expand Down Expand Up @@ -285,7 +291,7 @@ impl HashCommands {
tx.write_all(&response_buffer).await?;
return Ok(());
}
GetHashMetadataResult::None => {
GetHashMetadataResult::NotFound => {
builder.empty_array(&mut response_buffer);
tx.write_all(&response_buffer).await?;
return Ok(());
Expand Down Expand Up @@ -359,6 +365,108 @@ impl HashCommands {

Ok(())
}

/// Increments the number stored at field in the hash stored at key by increment.
/// If key does not exist, a new key holding a hash is created. If field does not exist the value is set to 0
/// before the operation is performed.
/// The range of values supported by HINCRBY is limited to 64 bit signed integers.
async fn hincrby(
client_state: Rc<ClientState>,
command: Rc<RedisCommand>,
response_buffer: &mut BytesMut,
) -> Result<(), SableError> {
check_args_count!(command, 4, response_buffer);
let builder = RespBuilderV2::default();
let key = command_arg_at!(command, 1);
let field = command_arg_at!(command, 2);
let increment = command_arg_at!(command, 3);

let Some(increment) = BytesMutUtils::parse::<i64>(increment) else {
builder.error_string(
response_buffer,
ErrorStrings::VALUE_NOT_AN_INT_OR_OUT_OF_RANGE,
);
return Ok(());
};

// Lock and delete
let _unused = LockManager::lock_user_key_exclusive(key, client_state.database_id());
let hash_db = HashDb::with_storage(client_state.database(), client_state.database_id());

let prev_value = match hash_db.get(key, field)? {
HashGetResult::WrongType => {
builder.error_string(response_buffer, ErrorStrings::WRONGTYPE);
return Ok(());
}
HashGetResult::NotFound | HashGetResult::FieldNotFound => 0i64,
HashGetResult::Some(value) => {
let Some(value) = BytesMutUtils::parse::<i64>(&value) else {
builder.error_string(response_buffer, "ERR hash value is not an integer");
return Ok(());
};
value
}
};

let new_value = prev_value + increment;
builder.number::<i64>(response_buffer, new_value, false);

// store the new value
let new_value = BytesMutUtils::from::<i64>(&new_value);
let _ = hash_db.put_multi(key, &[(field, &new_value)])?;
Ok(())
}

/// Increments the number stored at field in the hash stored at key by increment.
/// If key does not exist, a new key holding a hash is created. If field does not exist the value is set to 0
/// before the operation is performed.
/// The range of values supported by HINCRBY is limited to 64 bit signed integers.
async fn hincrbyfloat(
client_state: Rc<ClientState>,
command: Rc<RedisCommand>,
response_buffer: &mut BytesMut,
) -> Result<(), SableError> {
check_args_count!(command, 4, response_buffer);
let builder = RespBuilderV2::default();
let key = command_arg_at!(command, 1);
let field = command_arg_at!(command, 2);
let increment = command_arg_at!(command, 3);

let Some(increment) = BytesMutUtils::parse::<f64>(increment) else {
builder.error_string(
response_buffer,
ErrorStrings::VALUE_NOT_AN_INT_OR_OUT_OF_RANGE,
);
return Ok(());
};

// Lock and delete
let _unused = LockManager::lock_user_key_exclusive(key, client_state.database_id());
let hash_db = HashDb::with_storage(client_state.database(), client_state.database_id());

let prev_value = match hash_db.get(key, field)? {
HashGetResult::WrongType => {
builder.error_string(response_buffer, ErrorStrings::WRONGTYPE);
return Ok(());
}
HashGetResult::NotFound | HashGetResult::FieldNotFound => 0f64,
HashGetResult::Some(value) => {
let Some(value) = BytesMutUtils::parse::<f64>(&value) else {
builder.error_string(response_buffer, "ERR hash value is not an integer");
return Ok(());
};
value
}
};

let new_value = prev_value + increment;
builder.number::<f64>(response_buffer, new_value, true);

// store the new value
let new_value = BytesMutUtils::from::<f64>(&new_value);
let _ = hash_db.put_multi(key, &[(field, &new_value)])?;
Ok(())
}
}

// _ _ _ _ _____ _______ _______ ______ _____ _______ _____ _ _ _____
Expand All @@ -383,6 +491,10 @@ mod test {
(vec!["hset", "myhash", "field1", "value1", "field1", "value1"], ":0\r\n"),
(vec!["hset", "myhash", "field1", "value1", "field2"], "-ERR wrong number of arguments for 'hset' command\r\n"),
(vec!["hset", "myhash", "field2", "value2"], ":1\r\n"),
(vec!["hset", "myhash", "f3", "v1"], ":1\r\n"),
(vec!["hset", "myhash", "f3", "v2"], ":0\r\n"),
(vec!["hset", "myhash", "f3", "v3"], ":0\r\n"),
(vec!["hget", "myhash", "f3"], "$2\r\nv3\r\n"), // expect the last update
(vec!["hset", "myhash"], "-ERR wrong number of arguments for 'hset' command\r\n"),
], "test_hset"; "test_hset")]
#[test_case(vec![
Expand Down Expand Up @@ -423,6 +535,26 @@ mod test {
(vec!["hgetall", "not_a_hash"], "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"),
(vec!["hgetall"], "-ERR wrong number of arguments for 'hgetall' command\r\n"),
], "test_hgetall"; "test_hgetall")]
#[test_case(vec![
(vec!["hincrby"], "-ERR wrong number of arguments for 'hincrby' command\r\n"),
(vec!["hincrby", "myhash"], "-ERR wrong number of arguments for 'hincrby' command\r\n"),
(vec!["hincrby", "myhash", "field"], "-ERR wrong number of arguments for 'hincrby' command\r\n"),
(vec!["hincrby", "myhash", "field", "1"], ":1\r\n"),
(vec!["hincrby", "myhash", "field", "1"], ":2\r\n"),
(vec!["hincrby", "myhash", "field", "1.0"], "-ERR value is not an integer or out of range\r\n"),
(vec!["set", "string", "field"], "+OK\r\n"),
(vec!["hincrby", "string", "field", "1"], "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"),
], "test_hincrby"; "test_hincrby")]
#[test_case(vec![
(vec!["hincrbyfloat"], "-ERR wrong number of arguments for 'hincrbyfloat' command\r\n"),
(vec!["hincrbyfloat", "myhash"], "-ERR wrong number of arguments for 'hincrbyfloat' command\r\n"),
(vec!["hincrbyfloat", "myhash", "field"], "-ERR wrong number of arguments for 'hincrbyfloat' command\r\n"),
(vec!["hincrbyfloat", "myhash", "field", "1"], ",1\r\n"),
(vec!["hincrbyfloat", "myhash", "field", "1"], ",2\r\n"),
(vec!["hincrbyfloat", "myhash", "field", "1.0"], ",3\r\n"),
(vec!["set", "string", "field"], "+OK\r\n"),
(vec!["hincrbyfloat", "string", "field", "1"], "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"),
], "test_hincrbyfloat"; "test_hincrbyfloat")]
fn test_hash_commands(
args_vec: Vec<(Vec<&'static str>, &'static str)>,
test_name: &str,
Expand Down
6 changes: 4 additions & 2 deletions libsabledb/src/commands/server_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ mod test {
.await
.unwrap();

let raw_response = sink.read_all().await;
// the output might be large, use a large buffer
let raw_response = sink.read_all_with_size(128 << 10).await;
let manager = crate::commands::commands_manager();

// the response should starts with "*<commands-count>\r\n"
Expand Down Expand Up @@ -186,7 +187,8 @@ mod test {
.await
.unwrap();

let raw_response = sink.read_all().await;
// the output might be large, use a large buffer
let raw_response = sink.read_all_with_size(128 << 10).await;
let manager = crate::commands::commands_manager();

// the response should starts with "*<2 x commands-count>\r\n"
Expand Down
7 changes: 5 additions & 2 deletions libsabledb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,16 @@ mod tests {
}

pub async fn read_all(&mut self) -> String {
self.read_all_with_size(4096).await
}

pub async fn read_all_with_size(&mut self, size: usize) -> String {
self.fp.sync_all().await.unwrap();
let mut fp = tokio::fs::File::open(&self.temp_file.fullpath())
.await
.unwrap();

let mut buffer = bytes::BytesMut::with_capacity(4096);
let mut buffer = bytes::BytesMut::with_capacity(size);
fp.read_buf(&mut buffer).await.unwrap();
crate::BytesMutUtils::to_string(&buffer)
}
Expand All @@ -187,7 +191,6 @@ mod tests {
fn drop(&mut self) {
if let Ok(md) = std::fs::metadata(self.dirpath.clone()) {
if md.is_dir() {
println!("deleting {}", self.dirpath.as_str());
std::fs::remove_dir_all(self.dirpath.clone()).unwrap();
}
}
Expand Down
Loading

0 comments on commit 27aa436

Please sign in to comment.