chunk cache change hash to checksum (#156)
fix XET-281

Calculating the hash of the cache file takes too long relative to the
target cache-put performance on CPU-constrained systems. We've already
dropped verification on gets; now we drop it on writes as well,
replacing the hash with a CRC.

PS: this change also includes some style updates.
assafvayner authored Jan 29, 2025
1 parent 0bc4927 commit 0f61841
Showing 6 changed files with 106 additions and 130 deletions.
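At its core, the change swaps the write-path integrity tag from a BLAKE3 hash to a CRC32 checksum over the serialized header plus the chunk payload; `crc32fast` uses hardware-accelerated CRC where available, so each put is far cheaper than a cryptographic hash on CPU-bound machines. A minimal sketch of the new checksum computation as the diff below performs it (the function wrapper is illustrative; only `crc32fast` and the header-then-data order come from the commit):

```rust
// Sketch: compute the checksum stored on a CacheItem, assuming
// `header_buf` holds the serialized cache file header and `data`
// holds the chunk bytes, written to disk in that same order.
fn item_checksum(header_buf: &[u8], data: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(header_buf); // header bytes first
    hasher.update(data);       // then the payload
    hasher.finalize()          // 32-bit checksum, cheap to compute and store
}
```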
Cargo.lock: 12 changes (11 additions, 1 deletion)

(Generated file; diff not rendered.)

chunk_cache/Cargo.toml: 3 changes (2 additions, 1 deletion)
@@ -10,13 +10,14 @@ thiserror = "2.0"
 error_printer = { path = "../error_printer" }
 file_utils = { path = "../file_utils" }
 utils = { path = "../utils" }
-blake3 = "1.5.4"
 base64 = "0.22.1"
 tracing = "0.1.40"
 rand = "0.8.5"
 mockall = "0.13.0"
 clap = { version = "4.5.20", optional = true, features = ["derive"] }
 once_cell = "1.20.2"
+crc32fast = "1.4.2"
+log = "0.4.22"
 
 [dev-dependencies]
 tokio = { version = "1.36", features = ["full"] }
chunk_cache/src/disk.rs: 162 changes (57 additions, 105 deletions)
@@ -80,10 +80,11 @@ impl DiskCache {
             println!();
             for item in items.iter() {
                 println!(
-                    "\titem: chunk range [{}-{}) ; len({})",
+                    "\titem: chunk range [{}-{}) ; len({}); checksum({})",
                     item.range.start,
                     item.range.end,
-                    output_bytes(item.len as usize)
+                    output_bytes(item.len as usize),
+                    item.checksum,
                 );
             }
         }
@@ -146,22 +147,15 @@ impl DiskCache {
         let mut num_items = 0;
         let max_num_bytes = 2 * capacity;
 
-        let readdir = match read_dir(cache_root) {
-            Ok(Some(rd)) => rd,
-            Ok(None) => return Ok(CacheState::new(state, 0, 0)),
-            Err(e) => return Err(e),
+        let Some(cache_root_readdir) = read_dir(cache_root)? else {
+            return Ok(CacheState::new(state, 0, 0));
         };
 
         // loop through cache root directory, first level containing "prefix" directories
         // each of which may contain key directories with cache items
-        for key_prefix_dir in readdir {
-            // this match pattern appears often in this function, and we could write a macro to replace it
-            // however this puts an implicit change of control flow with continue/return cases that is
-            // hard to decipher from a macro, so avoid replace it for readability
-            let key_prefix_dir = match is_ok_dir(key_prefix_dir) {
-                Ok(Some(dirent)) => dirent,
-                Ok(None) => continue,
-                Err(e) => return Err(e),
+        for key_prefix_dir in cache_root_readdir {
+            let Some(key_prefix_dir) = is_ok_dir(key_prefix_dir)? else {
+                continue;
             };
 
             let key_prefix_dir_name = key_prefix_dir.file_name();
@@ -170,14 +164,12 @@ impl DiskCache {
                 continue;
             }
 
-            let key_prefix_readir = match read_dir(key_prefix_dir.path()) {
-                Ok(Some(rd)) => rd,
-                Ok(None) => continue,
-                Err(e) => return Err(e),
+            let Some(key_prefix_readdir) = read_dir(key_prefix_dir.path())? else {
+                continue;
             };
 
-            // loop throught key directories inside prefix directory
-            for key_dir in key_prefix_readir {
+            for key_dir in key_prefix_readdir {
                 let key_dir = match is_ok_dir(key_dir) {
                     Ok(Some(dirent)) => dirent,
                     Ok(None) => continue,
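Most of the style updates in this file replace `match`/`if let`-with-else ladders by Rust's `let ... else` (stable since Rust 1.65), combined with `?` so only the `Option` is left to destructure. A self-contained sketch of the pattern, with a simplified stand-in for the crate's `is_ok_dir` helper:

```rust
use std::fs::{self, DirEntry};
use std::io;
use std::path::Path;

// Simplified stand-in for the crate's is_ok_dir: Ok(None) filters
// out non-directories, Err propagates real I/O failures.
fn is_ok_dir(entry: io::Result<DirEntry>) -> io::Result<Option<DirEntry>> {
    let entry = entry?;
    Ok(entry.file_type()?.is_dir().then_some(entry))
}

fn visit_dirs(root: &Path) -> io::Result<()> {
    for entry in fs::read_dir(root)? {
        // `?` propagates Err; let-else peels the Option and must
        // diverge (continue/return/break) when it does not match.
        let Some(dir) = is_ok_dir(entry)? else {
            continue;
        };
        println!("dir: {}", dir.path().display());
    }
    Ok(())
}
```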
@@ -243,31 +235,12 @@ impl DiskCache {
         }
 
         loop {
-            let cache_item = if let Some(item) = self.find_match(key, range)? {
-                item
-            } else {
+            let Some(cache_item) = self.find_match(key, range)? else {
                 return Ok(None);
            };
 
             let path = self.item_path(key, &cache_item)?;
 
-            // OLD, needed for hash validation, read file to buffer to do validation
-            // let mut file_buf = {
-            //     let file = match File::open(&path) {
-            //         Ok(file) => file,
-            //         Err(e) => match e.kind() {
-            //             ErrorKind::NotFound => {
-            //                 self.remove_item(key, &cache_item)?;
-            //                 continue;
-            //             },
-            //             _ => return Err(e.into()),
-            //         },
-            //     };
-            //     // let mut buf = Vec::with_capacity(file.metadata()?.len() as usize);
-            //     // file.read_to_end(&mut buf)?;
-            //     // Cursor::new(buf)
-            // };
-
             let mut file_buf = match File::open(&path) {
                 Ok(file) => file,
                 Err(e) => match e.kind() {
@@ -279,23 +252,13 @@ impl DiskCache {
                 },
             };
 
-            // TODO: reintroduce hash validation of cache file, but not for every get, memoize success status per cache
-            // file let hash = compute_hash_from_reader(&mut file_buf)?;
-            // if hash != cache_item.hash {
-            //     debug!("file hash mismatch on path: {path:?}, key: {key}, item: {cache_item}");
-            //     if to_print {
-            //         info!("removed cache item 0");
-            //     }
-            //     self.remove_item(key, &cache_item)?;
-            //     continue;
-            // }
+            // TODO: reintroduce checksum validation of cache file, but not for every get, memoize success status per
+            // cache item
 
             file_buf.seek(SeekFrom::Start(0))?;
-            let header_result = CacheFileHeader::deserialize(&mut file_buf)
-                .debug_error(format!("failed to deserialize cache file header on path: {path:?}"));
-            let header = if let Ok(header) = header_result {
-                header
-            } else {
+            let Ok(header) = CacheFileHeader::deserialize(&mut file_buf)
+                .debug_error(format!("failed to deserialize cache file header on path: {path:?}"))
+            else {
                 self.remove_item(key, &cache_item)?;
                 continue;
             };
@@ -308,9 +271,7 @@ impl DiskCache {
 
     fn find_match(&self, key: &Key, range: &ChunkRange) -> OptionResult<CacheItem, ChunkCacheError> {
         let state = self.state.lock()?;
-        let items = if let Some(items) = state.inner.get(key) {
-            items
-        } else {
+        let Some(items) = state.inner.get(key) else {
            return Ok(None);
        };

@@ -331,13 +292,13 @@
         data: &[u8],
     ) -> Result<(), ChunkCacheError> {
         if range.start >= range.end
             || chunk_byte_indices.len() != (range.end - range.start + 1) as usize
             // chunk_byte_indices is guarenteed to be more than 1 element at this point
             || chunk_byte_indices[0] != 0
             || *chunk_byte_indices.last().unwrap() as usize != data.len()
             || !strictly_increasing(chunk_byte_indices)
             // assert 1 new range doesn't take up more than 10% of capacity
             || data.len() > (self.capacity as usize / 10)
         {
             return Err(ChunkCacheError::InvalidArguments);
         }
@@ -352,21 +313,27 @@
         let header = CacheFileHeader::new(chunk_byte_indices);
         let mut header_buf = Vec::with_capacity(header.header_len());
         header.serialize(&mut header_buf)?;
-        let hash = compute_hash(&header_buf, data);
+        let checksum = {
+            let mut hasher = crc32fast::Hasher::new();
+            hasher.update(&header_buf);
+            hasher.update(data);
+            hasher.finalize()
+        };
 
         let cache_item = CacheItem {
             range: range.clone(),
             len: (header_buf.len() + data.len()) as u64,
-            hash,
+            checksum,
         };
 
-        let path = self.item_path(key, &cache_item)?;
-
-        let mut fw = SafeFileCreator::new(path)?;
-
-        fw.write_all(&header_buf)?;
-        fw.write_all(data)?;
-        fw.close()?;
+        {
+            // write cache item file
+            let path = self.item_path(key, &cache_item)?;
+            let mut fw = SafeFileCreator::new(path)?;
+            fw.write_all(&header_buf)?;
+            fw.write_all(data)?;
+            fw.close()?;
+        }
 
         // evict items after ensuring the file write but before committing to cache state
         // to avoid removing new item.
@@ -387,7 +354,7 @@ impl DiskCache {
         let mut overlapping_item_paths = HashSet::new();
         let mut total_bytes_rm = 0;
         let num_items_rm = to_remove.len();
-        // removing by index in reverse to guarentee lower-index items aren't shifted/moved
+        // removing by index in reverse to guarantee lower-index items aren't shifted/moved
         for item_idx in to_remove.into_iter().rev() {
             let item = items.swap_remove(item_idx);
             overlapping_item_paths.insert(self.item_path(key, &item)?);
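The put path feeds the header and data to a streaming `crc32fast::Hasher` because the bytes are produced in two pieces, while the validation path below already has the whole file in one buffer and can use the one-shot `crc32fast::hash`. Both produce the same value; a quick sketch of that equivalence (illustrative names):

```rust
// Streaming updates over header-then-data equal a one-shot CRC32
// over their concatenation, which is why put() and validation agree.
fn checksums_agree(header_buf: &[u8], data: &[u8]) -> bool {
    let mut streaming = crc32fast::Hasher::new();
    streaming.update(header_buf);
    streaming.update(data);

    let mut whole = Vec::with_capacity(header_buf.len() + data.len());
    whole.extend_from_slice(header_buf);
    whole.extend_from_slice(data);

    streaming.finalize() == crc32fast::hash(&whole)
}
```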
@@ -439,31 +406,25 @@ impl DiskCache {
 
         // validate stored data
         let path = self.item_path(key, cache_item)?;
-        let mut r = {
-            let mut file = if let Ok(file) = File::open(path) {
-                file
-            } else {
-                self.remove_item(key, cache_item)?;
-                return Ok(false);
-            };
-            let md = file.metadata()?;
-            if md.len() != cache_item.len {
-                self.remove_item(key, cache_item)?;
-                return Ok(false);
-            }
-            let mut buf = Vec::with_capacity(md.len() as usize);
-            file.read_to_end(&mut buf)?;
-            Cursor::new(buf)
-        };
-        let hash = blake3::Hasher::new().update_reader(&mut r)?.finalize();
-        if hash != cache_item.hash {
-            self.remove_item(key, cache_item)?;
-            return Ok(false);
-        }
-        r.seek(SeekFrom::Start(0))?;
-        let header = if let Ok(header) = CacheFileHeader::deserialize(&mut r) {
-            header
-        } else {
-            self.remove_item(key, cache_item)?;
-            return Ok(false);
-        };
+        let Ok(mut file) = File::open(path) else {
+            self.remove_item(key, cache_item)?;
+            return Ok(false);
+        };
+        let md = file.metadata()?;
+        if md.len() != cache_item.len {
+            self.remove_item(key, cache_item)?;
+            return Ok(false);
+        }
+        let mut buf = Vec::with_capacity(md.len() as usize);
+        file.read_to_end(&mut buf)?;
+        let checksum = crc32fast::hash(&buf);
+        if checksum != cache_item.checksum {
+            self.remove_item(key, cache_item)?;
+            return Ok(false);
+        }
+        let mut reader = Cursor::new(buf);
+        let Ok(header) = CacheFileHeader::deserialize(&mut reader) else {
+            self.remove_item(key, cache_item)?;
+            return Ok(false);
+        };
@@ -489,7 +450,7 @@ impl DiskCache {
             }
         }
 
-        let stored_data = get_range_from_cache_file(&header, &mut r, range, cache_item.range.start)?;
+        let stored_data = get_range_from_cache_file(&header, &mut reader, range, cache_item.range.start)?;
         if data != stored_data {
             return Err(ChunkCacheError::InvalidArguments);
         }
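Taken together, the read-side validation now rejects a cache item on any of three checks, each cheaper than the old full-file BLAKE3 hash: a length mismatch against file metadata, a CRC32 mismatch over the buffered bytes, or a header that no longer deserializes. A condensed sketch of that flow (the fields and boolean contract mirror the diff; the types are stand-ins):

```rust
use std::fs::File;
use std::io::{self, Cursor, Read};

// Hypothetical stand-in for the fields of CacheItem used here.
struct ItemMeta {
    len: u64,
    checksum: u32,
}

fn validate(mut file: File, item: &ItemMeta) -> io::Result<bool> {
    // 1. cheap length check straight from file metadata
    if file.metadata()?.len() != item.len {
        return Ok(false);
    }
    // 2. CRC32 over the entire file (header + data)
    let mut buf = Vec::with_capacity(item.len as usize);
    file.read_to_end(&mut buf)?;
    if crc32fast::hash(&buf) != item.checksum {
        return Ok(false);
    }
    // 3. the header must still parse; the real code calls
    //    CacheFileHeader::deserialize on this cursor.
    let _reader = Cursor::new(buf);
    Ok(true)
}
```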
@@ -617,15 +578,6 @@ fn get_range_from_cache_file<R: Read + Seek>(
     Ok(buf)
 }
 
-fn compute_hash(header: &[u8], data: &[u8]) -> blake3::Hash {
-    blake3::Hasher::new().update(header).update(data).finalize()
-}
-
-// OLD
-// fn compute_hash_from_reader(r: &mut impl Read) -> Result<blake3::Hash, ChunkCacheError> {
-//     Ok(blake3::Hasher::new().update_reader(r)?.finalize())
-// }
-
 // wrapper over std::fs::read_dir
 // returns Ok(None) on a not found error
 fn read_dir(path: impl AsRef<Path>) -> OptionResult<std::fs::ReadDir, ChunkCacheError> {
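The body of the `read_dir` wrapper is truncated in this view; a plausible implementation matching its doc comment, assuming `OptionResult<T, E>` is an alias for `Result<Option<T>, E>` and with `io::Error` standing in for `ChunkCacheError`:

```rust
use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;

// Wrapper over std::fs::read_dir that maps NotFound to Ok(None),
// so a missing cache directory reads as "empty" rather than an error.
fn read_dir(path: impl AsRef<Path>) -> io::Result<Option<fs::ReadDir>> {
    match fs::read_dir(path) {
        Ok(rd) => Ok(Some(rd)),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```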