Skip to content

Commit

Permalink
Fix failure case where a cache item is evicted by another process
Browse files Browse the repository at this point in the history
  • Loading branch information
assafvayner committed Oct 1, 2024
1 parent 5219929 commit 3be9a27
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 63 deletions.
89 changes: 68 additions & 21 deletions chunk_cache/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
io::{Read, Seek, Write},
mem::size_of,
path::{Path, PathBuf},
time::SystemTime,
};

use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
Expand Down Expand Up @@ -108,20 +107,32 @@ impl DiskCache {
.filter(|((k, start, end), _)| *start <= range.start && *end >= range.end && k == key)
.peekable();
let hit_option = hits.peek();
let ((key, start, end), (file_name, _)) = match hit_option {
let ((key, start, end), (file_name, len)) = match hit_option {
Some(v) => v,
None => return Ok(None),
};
let file_path = self.cache_file_path(key, file_name);
let mut file = File::open(&file_path)?;
let file_result = File::open(&file_path);

let mut file = match file_result {
Ok(file) => file,
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
// cache item got evicted by another process
self.total_bytes -= len;
self.items.remove(&(key.clone(), *start, *end));
return Ok(None);
}
_ => return Err(e.into()),
},
};
let hash = blake3::Hasher::new().update_reader(&mut file)?.finalize();
if hash != file_name.hash {
std::fs::remove_file(&file_path)?;
return Err(ChunkCacheError::parse("file checksum mismatch"));
}

let result = self.hit(&mut file, *start, range)?;
self.update_time_stamp(&(key.clone(), *start, *end))?;

Ok(Some(result))
}
Expand All @@ -133,6 +144,7 @@ impl DiskCache {
range: &Range,
) -> Result<Vec<u8>, ChunkCacheError> {
let header = CacheFileHeader::deserialize(file)?;

let start_byte_index = header
.chunk_byte_indicies
.get((range.start - start) as usize)
Expand All @@ -141,8 +153,10 @@ impl DiskCache {
.chunk_byte_indicies
.get((range.end - start) as usize)
.ok_or(ChunkCacheError::BadRange)?;

let start_position = (header.header_len + *start_byte_index as usize) as u64;
file.seek(std::io::SeekFrom::Start(start_position))?;

let len = (end_byte_index - start_byte_index) as usize;
let mut result = vec![0; len];
file.read_exact(&mut result)?;
Expand Down Expand Up @@ -175,7 +189,6 @@ impl DiskCache {
.filter(|(k, start, end)| k == key && *start <= range.start && *end >= range.end)
.collect::<Vec<_>>();
if !matches.is_empty() {
self.update_time_stamp(&matches[0].clone())?;
return Ok(());
}

Expand All @@ -190,7 +203,7 @@ impl DiskCache {
.update(&header_buf)
.update(data)
.finalize();
let file_name = FileName::new(range.start, range.end, SystemTime::now(), hash);
let file_name = FileName::new(range.start, range.end, hash);
let file_path = Path::join(dir, Into::<PathBuf>::into(&file_name));

if !dir.exists() {
Expand Down Expand Up @@ -255,20 +268,6 @@ impl DiskCache {
/// Full on-disk path for a cache entry: delegates to the free function
/// `cache_file_path`, rooted at this cache's `cache_root`.
fn cache_file_path(&self, key: &Key, file_name: &FileName) -> PathBuf {
cache_file_path(self.cache_root.clone(), key, file_name)
}

/// Refreshes the access timestamp of a cache item identified by
/// `(key, start, end)`.
///
/// The timestamp is embedded in the on-disk file name (see `FileName`), so
/// updating it requires renaming the file: compute the old path, bump the
/// in-memory `timestamp` to now, compute the new path, then `rename`.
///
/// Errors with `ChunkCacheError::Infallible` if the item is not present in
/// the in-memory index, or propagates the I/O error from the rename.
/// NOTE(review): if another process already evicted the file, the rename
/// fails with `NotFound` and this returns an error — confirm callers expect
/// that rather than a silent miss.
fn update_time_stamp(&mut self, item_key: &(Key, u32, u32)) -> Result<(), ChunkCacheError> {
let cache_root = self.cache_root.clone();
let (file_name, _) = self
.items
.get_mut(item_key)
.ok_or(ChunkCacheError::Infallible)?;
// Path under the OLD timestamp must be computed before mutating `file_name`.
let old_file_path = cache_file_path(cache_root.clone(), &item_key.0, file_name);
file_name.timestamp = SystemTime::now();
let new_file_path = cache_file_path(cache_root, &item_key.0, file_name);
std::fs::rename(old_file_path, new_file_path)?;

Ok(())
}
}

fn cache_file_path(cache_root: PathBuf, key: &Key, file_name: &FileName) -> PathBuf {
Expand Down Expand Up @@ -401,6 +400,8 @@ mod test_utils {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use crate::disk_cache::test_utils::*;

use cas_types::Range;
Expand Down Expand Up @@ -493,7 +494,7 @@ mod tests {

#[test]
fn test_initialize_non_empty() {
let cache_root = TempDir::new("puts_eviction").unwrap();
let cache_root = TempDir::new("initialize_non_empty").unwrap();
let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();

let mut keys_and_ranges = Vec::new();
Expand All @@ -509,6 +510,10 @@ mod tests {
assert!(get_result.is_ok(), "{i} {get_result:?}");
assert!(get_result.unwrap().is_some(), "{i}");
}

let cache_keys = cache.items.keys().collect::<BTreeSet<_>>();
let cache2_keys = cache2.items.keys().collect::<BTreeSet<_>>();
assert_eq!(cache_keys, cache2_keys);
}

#[test]
Expand All @@ -517,4 +522,46 @@ mod tests {
let key = subdir_to_key(s);
assert!(key.is_ok(), "{key:?}")
}

#[test]
fn test_unknown_eviction() {
    // Regression test: an entry evicted from disk by one cache handle (`cache2`)
    // must be reported as a clean miss (`Ok(None)`) by another handle (`cache`)
    // that still holds stale in-memory metadata for it.
    // NOTE(review): the temp-dir prefix was "initialize_non_empty", copy-pasted
    // from another test; renamed to match this test.
    let cache_root = TempDir::new("unknown_eviction").unwrap();
    // Small capacity so additional puts force evictions.
    let capacity = Some(2 * RANGE_LEN as u64);
    let mut cache = DiskCache::initialize(cache_root.path(), capacity).unwrap();
    let (key, range, chunk_byte_indicies, data) = RandomEntryIterator.next().unwrap();
    cache
        .put(&key, &range, &chunk_byte_indicies, &data)
        .unwrap();

    // A second handle over the same directory sees the entry on disk.
    let mut cache2 = DiskCache::initialize(cache_root.path(), capacity).unwrap();
    let get_result = cache2.get(&key, &range);
    assert!(get_result.is_ok());
    assert!(get_result.unwrap().is_some());

    let (key2, range2, chunk_byte_indicies2, data2) = RandomEntryIterator.next().unwrap();
    assert!(cache2
        .put(&key2, &range2, &chunk_byte_indicies2, &data2)
        .is_ok());

    // Exactly one of the two entries survived the eviction triggered above.
    let mut get_result_1 = cache2.get(&key, &range).unwrap();
    let get_result_2 = cache2.get(&key2, &range2).unwrap();
    assert!(get_result_1.is_some() != get_result_2.is_some());

    // The eviction victim is chosen randomly; keep inserting fresh entries
    // until the original record is evicted, giving up after 10 attempts.
    let mut i = 0;
    while get_result_1.is_some() && i < 10 {
        i += 1;
        let (key2, range2, chunk_byte_indicies2, data2) = RandomEntryIterator.next().unwrap();
        assert!(cache2
            .put(&key2, &range2, &chunk_byte_indicies2, &data2)
            .is_ok());
        get_result_1 = cache2.get(&key, &range).unwrap();
    }
    if get_result_1.is_some() {
        // Randomness didn't evict the record after 10 tries; the assertions
        // below would spuriously fail, so skip the rest of the test.
        // BUG FIX: the original fell through here and still asserted eviction.
        return;
    }

    // We've evicted the original record from the cache.
    // Note: using the original cache handle, whose in-memory index was never
    // updated — `get` must detect the missing file and return Ok(None).
    let get_result_post_eviction = cache.get(&key, &range);
    assert!(get_result_post_eviction.is_ok());
    assert!(get_result_post_eviction.unwrap().is_none());
}
}
50 changes: 8 additions & 42 deletions chunk_cache/src/disk_cache/file_name.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use std::{
mem::size_of,
path::PathBuf,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::{mem::size_of, path::PathBuf};

use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
use blake3::Hash;
use error_printer::ErrorPrinter;

use crate::error::ChunkCacheError;

Expand All @@ -17,21 +12,19 @@ const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE;
pub struct FileName {
pub start_idx: u32,
pub end_idx: u32,
pub timestamp: SystemTime,
pub hash: blake3::Hash,
}

/// length of the total data making up the file name
/// start_index, end_index, timestamp (unix u64), hash of the content
/// start_index, end_index, hash of the content
const FILE_NAME_LEN_PRE_BAS64: usize =
size_of::<u32>() + size_of::<u32>() + size_of::<u64>() + size_of::<blake3::Hash>();
size_of::<u32>() + size_of::<u32>() + size_of::<blake3::Hash>();

impl FileName {
pub fn new(start_idx: u32, end_idx: u32, timestamp: SystemTime, hash: Hash) -> Self {
pub fn new(start_idx: u32, end_idx: u32, hash: Hash) -> Self {
Self {
start_idx,
end_idx,
timestamp,
hash,
}
}
Expand All @@ -45,32 +38,22 @@ impl FileName {
}
let start_idx = u32::from_le_bytes(buf[0..4].try_into()?);
let end_idx = u32::from_le_bytes(buf[4..8].try_into()?);
let timestamp =
UNIX_EPOCH + Duration::from_millis(u64::from_le_bytes(buf[8..16].try_into()?));
let hash = blake3::Hash::from_bytes(buf[16..].try_into()?);
let hash = blake3::Hash::from_bytes(buf[8..].try_into()?);

Ok(FileName {
start_idx,
end_idx,
timestamp,
hash,
})
}
}

impl From<&FileName> for String {
fn from(value: &FileName) -> Self {
let timestamp = value
.timestamp
.duration_since(UNIX_EPOCH)
.log_error("filename has an invalid timestamp")
.unwrap_or_default()
.as_millis() as u64;
let mut buf = [0u8; FILE_NAME_LEN_PRE_BAS64];
buf[0..4].copy_from_slice(&value.start_idx.to_le_bytes());
buf[4..8].copy_from_slice(&value.end_idx.to_le_bytes());
buf[8..16].copy_from_slice(&timestamp.to_le_bytes());
buf[16..].copy_from_slice(value.hash.as_bytes());
buf[8..].copy_from_slice(value.hash.as_bytes());
BASE64_ENGINE.encode(buf)
}
}
Expand All @@ -83,10 +66,7 @@ impl From<&FileName> for PathBuf {

#[cfg(test)]
mod tests {
use std::{
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use std::path::PathBuf;

use blake3::{Hash, OUT_LEN};

Expand All @@ -97,25 +77,11 @@ mod tests {
let f = FileName {
start_idx: 100,
end_idx: 1000,
timestamp: SystemTime::now(),
hash: Hash::from_bytes([6u8; OUT_LEN]),
};
let encoded: PathBuf = (&f).into();
let decoded_result = FileName::try_parse(encoded.file_name().unwrap().as_encoded_bytes());
assert!(decoded_result.is_ok());
let FileName {
start_idx,
end_idx,
timestamp,
hash,
} = decoded_result.unwrap();
assert_eq!(start_idx, f.start_idx);
assert_eq!(end_idx, f.end_idx);
assert_eq!(hash, f.hash);

// checking 1 by 1 because the timestamp nanos may shift a little bit by conversion to unix timestamp millis
let millis =
|t: SystemTime| -> u64 { t.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 };
assert_eq!(millis(timestamp), millis(f.timestamp));
assert_eq!(decoded_result.unwrap(), f);
}
}

0 comments on commit 3be9a27

Please sign in to comment.