From 235cea4b89d47e729e34aa6c74da24e8d2c5e22d Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Fri, 27 Sep 2024 17:30:01 -0700
Subject: [PATCH 1/9] disappointing save

---
 Cargo.lock                                  |  14 +
 Cargo.toml                                  |   5 +-
 chunk_cache/Cargo.toml                      |  14 +
 chunk_cache/src/disk_cache.rs               | 247 ++++++++++++++++++
 .../src/disk_cache/cache_file_header.rs     |  52 ++++
 chunk_cache/src/disk_cache/file_name.rs     |  84 ++++++
 chunk_cache/src/error.rs                    |  27 ++
 chunk_cache/src/lib.rs                      |  16 ++
 8 files changed, 456 insertions(+), 3 deletions(-)
 create mode 100644 chunk_cache/Cargo.toml
 create mode 100644 chunk_cache/src/disk_cache.rs
 create mode 100644 chunk_cache/src/disk_cache/cache_file_header.rs
 create mode 100644 chunk_cache/src/disk_cache/file_name.rs
 create mode 100644 chunk_cache/src/error.rs
 create mode 100644 chunk_cache/src/lib.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9b06f5a2..9fad6382 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -554,6 +554,20 @@ dependencies = [
  "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "chunk_cache"
+version = "0.1.0"
+dependencies = [
+ "base64 0.22.1",
+ "blake3",
+ "cas_types",
+ "error_printer",
+ "merklehash",
+ "rand 0.8.5",
+ "tracing",
+ "xet_error",
+]
+
 [[package]]
 name = "clap"
 version = "2.34.0"
diff --git a/Cargo.toml b/Cargo.toml
index 60031d16..a39c7578 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,11 +21,10 @@ members = [
     "xet_error",
     "cas_object",
     "cas_types",
+    "chunk_cache",
 ]
 
-exclude = [
-    "hf_xet",
-]
+exclude = ["hf_xet"]
 
 [profile.release]
 opt-level = 3
diff --git a/chunk_cache/Cargo.toml b/chunk_cache/Cargo.toml
new file mode 100644
index 00000000..2ee3db65
--- /dev/null
+++ b/chunk_cache/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "chunk_cache"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+cas_types = { path = "../cas_types" }
+merklehash = { path = "../merklehash" }
+error_printer = { path = "../error_printer" }
+xet_error = { path = "../xet_error" }
+base64 = "0.22.1"
+blake3 = "1.5.4"
+tracing = "0.1.40"
+rand = "0.8.5"
diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
new file mode 100644
index 00000000..3565b027
--- /dev/null
+++ b/chunk_cache/src/disk_cache.rs
@@ -0,0 +1,247 @@
+use std::{
+    collections::HashMap,
+    fs::File,
+    io::{Read, Seek, Write},
+    os::unix::ffi::OsStrExt,
+    path::{Path, PathBuf},
+    time::SystemTime,
+};
+
+use base64::prelude::*;
+use cache_file_header::CacheFileHeader;
+use cas_types::{Key, Range};
+use error_printer::ErrorPrinter;
+use file_name::FileName;
+use rand::{rngs::ThreadRng, seq::IteratorRandom};
+use tracing::warn;
+
+use crate::{error::ChunkCacheError, ChunkCache};
+
+mod cache_file_header;
+mod file_name;
+
+pub struct DiskCache {
+    cache_root: PathBuf,
+    total_bytes: u64,
+    items: HashMap<PathBuf, u64>,
+    capacity: Option<u64>,
+    rng: ThreadRng,
+}
+
+impl DiskCache {
+    pub fn initialize<T: Into<PathBuf>>(
+        cache_root: T,
+        capacity: Option<u64>,
+    ) -> Result<Self, ChunkCacheError> {
+        let cache_root = cache_root.into();
+        let mut total_bytes = 0;
+        let mut items = HashMap::new();
+        for subdir in std::fs::read_dir(&cache_root)? {
+            let subdir = subdir?;
+            let md = subdir.metadata()?;
+            if !md.is_dir() {
+                warn!("found a non directory in cache_root directory");
+                continue;
+            }
+            for key_file in std::fs::read_dir(subdir.path())? {
+                let key_file = key_file?;
+                let md = key_file.metadata()?;
+                if !md.is_file() {
+                    warn!("found non file under key dir: {:?}", key_file.path());
+                    continue;
+                }
+                let item: PathBuf = key_file
+                    .path()
+                    .strip_prefix(&cache_root)
+                    .map(Path::to_path_buf)
+                    .log_error("path under directory didn't strip prefix")
+                    .unwrap_or_default();
+                items.insert(item, md.len());
+                total_bytes += md.len();
+            }
+        }
+
+        // ensures we only handle a real capacity
+        let capacity = capacity.and_then(|cap| if cap == 0 { None } else { Some(cap) });
+
+        Ok(Self {
+            cache_root,
+            total_bytes,
+            capacity,
+            items,
+            rng: rand::thread_rng(),
+        })
+    }
+
+    fn get_impl(
+        &mut self,
+        key: &Key,
+        range: &Range,
+    ) -> Result<Option<Vec<u8>>, crate::error::ChunkCacheError> {
+        let dir = self.cache_root.join(key_to_subdir(key));
+
+        for entry in std::fs::read_dir(dir)? {
+            let entry = entry?;
+            let md = entry.metadata()?;
+            if !md.is_file() {
+                continue;
+            }
+            let file_name = FileName::try_parse(entry.file_name().as_bytes())?;
+            if range.start >= file_name.start_idx && range.end <= file_name.end_idx {
+                let result = self.hit(&entry.path(), file_name, range)?;
+                return Ok(Some(result));
+            }
+        }
+        Ok(None)
+    }
+
+    fn hit(
+        &mut self,
+        path: &PathBuf,
+        mut file_name: FileName,
+        range: &Range,
+    ) -> Result<Vec<u8>, ChunkCacheError> {
+        let mut file = File::open(path)?;
+        let hash = blake3::Hasher::new().update_reader(&mut file)?.finalize();
+        if hash != file_name.hash {
+            std::fs::remove_file(path)?;
+            return Err(ChunkCacheError::parse("file checksum mismatch"));
+        }
+
+        let header = CacheFileHeader::deserialize(&mut file)?;
+        let start_byte_index = header
+            .chunk_byte_indicies
+            .get(range.start as usize)
+            .ok_or(ChunkCacheError::BadRange)?;
+        let end_byte_index = header
+            .chunk_byte_indicies
+            .get(range.end as usize)
+            .ok_or(ChunkCacheError::BadRange)?;
+        let start_position = (header.header_len + *start_byte_index as usize) as u64;
+        file.seek(std::io::SeekFrom::Start(start_position))?;
+        let len = (end_byte_index - start_byte_index) as usize;
+        let mut result = vec![0; len];
+        file.read_exact(&mut result)?;
+
+        // update last used timestamp in file name
+        file_name.timestamp = SystemTime::now();
+        let key_dir = path
+            .parent()
+            .ok_or_else(|| ChunkCacheError::parse("failed to get key dir"))?;
+        let new_path = key_dir.join(Into::<PathBuf>::into(&file_name));
+        std::fs::rename(path, &new_path)?;
+
+        if let Some(size) = self.items.remove(&self.items_key(path)) {
+            self.items.insert(self.items_key(new_path), size);
+        }
+
+        Ok(result)
+    }
+
+    fn put_impl(
+        &mut self,
+        key: &Key,
+        range: &Range,
+        chunk_byte_indicies: &[u32],
+        data: &[u8],
+    ) -> Result<(), ChunkCacheError> {
+        let dir = &self.cache_root.join(key_to_subdir(key));
+
+        let header = CacheFileHeader::new(chunk_byte_indicies);
+        let mut header_buf = Vec::with_capacity(header.header_len);
+        header.serialize(&mut header_buf)?;
+
+        let hash = blake3::Hasher::new()
+            .update(&header_buf)
+            .update(data)
+            .finalize();
+        let file_name = FileName::new(range.start, range.end, SystemTime::now(), hash);
+        let file_path = Path::join(dir, Into::<PathBuf>::into(&file_name));
+
+        if !std::fs::exists(dir)? {
+            std::fs::create_dir_all(dir)?;
+        }
+        let mut file = std::fs::File::create(&file_path)?;
+        file.write_all(&header_buf)?;
+        file.write_all(data)?;
+        let len = (header_buf.len() + data.len()) as u64;
+        self.total_bytes += len;
+        self.items.insert(self.items_key(&file_path), len);
+
+        self.maybe_evict()?;
+
+        Ok(())
+    }
+
+    fn maybe_evict(&mut self) -> Result<(), ChunkCacheError> {
+        let capacity = match self.capacity {
+            Some(cap) => cap,
+            None => return Ok(()),
+        };
+        if capacity <= self.total_bytes {
+            return Ok(());
+        }
+
+        while capacity > self.total_bytes {
+            self.maybe_evict_one()?
+        }
+
+        Ok(())
+    }
+
+    // assumes the cache is in the right state for eviction
+    fn maybe_evict_one(&mut self) -> Result<(), ChunkCacheError> {
+        let key = if let Some(key) = self.items.keys().choose(&mut self.rng) {
+            key.clone()
+        } else {
+            return Err(ChunkCacheError::CacheEmpty);
+        };
+        if let Some(len) = self.items.remove(&key) {
+            self.total_bytes -= len;
+        }
+        let path = self.cache_root.join(&key);
+        std::fs::remove_file(path)?;
+
+        Ok(())
+    }
+
+    fn items_key<P: AsRef<Path>>(&self, path: P) -> PathBuf {
+        items_key(&self.cache_root, &path)
+    }
+}
+
+fn items_key<P: AsRef<Path>>(cache_root: &PathBuf, path: P) -> PathBuf {
+    path.as_ref()
+        .strip_prefix(cache_root)
+        .log_error(format!(
+            "path should be under prefix, but is not, returning empty, path: {:?}, prefix: {:?}",
+            path.as_ref(),
+            cache_root,
+        ))
+        .map(Path::to_path_buf)
+        .unwrap_or_default()
+}
+
+impl ChunkCache for DiskCache {
+    fn get(
+        &mut self,
+        key: &Key,
+        range: &Range,
+    ) -> Result<Option<Vec<u8>>, crate::error::ChunkCacheError> {
+        self.get_impl(key, range)
+    }
+
+    fn put(
+        &mut self,
+        key: &Key,
+        range: &Range,
+        chunk_byte_indicies: &Vec<u32>,
+        data: &[u8],
+    ) -> Result<(), crate::error::ChunkCacheError> {
+        self.put_impl(key, range, chunk_byte_indicies, data)
+    }
+}
+
+fn key_to_subdir(key: &Key) -> String {
+    BASE64_STANDARD_NO_PAD.encode(key.to_string().as_bytes())
+}
diff --git a/chunk_cache/src/disk_cache/cache_file_header.rs b/chunk_cache/src/disk_cache/cache_file_header.rs
new file mode 100644
index 00000000..72f0e727
--- /dev/null
+++ b/chunk_cache/src/disk_cache/cache_file_header.rs
@@ -0,0 +1,52 @@
+use std::io::{Read, Seek, Write};
+
+use crate::error::ChunkCacheError;
+
+pub struct CacheFileHeader {
+    pub chunk_byte_indicies: Vec<u32>,
+    pub header_len: usize,
+}
+
+impl CacheFileHeader {
+    pub fn new<T: Into<Vec<u32>>>(chunk_byte_indicies: T) -> Self {
+        let chunk_byte_indicies = chunk_byte_indicies.into();
+        let header_len = (chunk_byte_indicies.len() + 1) * size_of::<u32>();
+        Self {
+            chunk_byte_indicies,
+            header_len,
+        }
+    }
+
+    pub fn deserialize<R: Read + Seek>(reader: &mut R) -> Result<Self, ChunkCacheError> {
+        reader.seek(std::io::SeekFrom::Start(0))?;
+        let chunk_byte_indicies_len = read_u32(reader)?;
+        let mut chunk_byte_indicies = Vec::with_capacity(chunk_byte_indicies_len as usize);
+        for _ in 0..chunk_byte_indicies_len {
+            let idx = read_u32(reader)?;
+            chunk_byte_indicies.push(idx);
+        }
+        Ok(Self {
+            chunk_byte_indicies,
+            header_len: (chunk_byte_indicies_len as usize + 1) * size_of::<u32>(),
+        })
+    }
+
+    pub fn serialize<W: Write>(&self, writer: &mut W) -> Result<usize, ChunkCacheError> {
+        let mut num_written = write_u32(writer, self.chunk_byte_indicies.len() as u32)?;
+        for idx in &self.chunk_byte_indicies {
+            num_written += write_u32(writer, *idx)?;
+        }
+        Ok(num_written)
+    }
+}
+
+pub fn read_u32<R: Read>(reader: &mut R) -> Result<u32, std::io::Error> {
+    let mut buf = [0u8; size_of::<u32>()];
+    reader.read_exact(&mut buf[..])?;
+    Ok(u32::from_le_bytes(buf))
+}
+
+pub fn write_u32<W: Write>(writer: &mut W, v: u32) -> Result<usize, std::io::Error> {
+    writer.write_all(&v.to_le_bytes())?;
+    Ok(size_of::<u32>())
+}
diff --git a/chunk_cache/src/disk_cache/file_name.rs b/chunk_cache/src/disk_cache/file_name.rs
new file mode 100644
index 00000000..85e16828
--- /dev/null
+++ b/chunk_cache/src/disk_cache/file_name.rs
@@ -0,0 +1,84 @@
+use std::{
+    path::PathBuf,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+use base64::{prelude::BASE64_STANDARD_NO_PAD, Engine};
+use blake3::Hash;
+use error_printer::ErrorPrinter;
+
+use crate::error::ChunkCacheError;
+
+/// A file name is represented as the start index and end index of chunks for the given xorb
+/// and a timestamp of last successful access or put
+#[derive(Debug)]
+pub struct FileName {
+    pub start_idx: u32,
+    pub end_idx: u32,
+    pub timestamp: SystemTime,
+    pub hash: blake3::Hash,
+}
+
+const FILE_NAME_LEN: usize = size_of::<u32>() * 2 + size_of::<u64>() + size_of::<Hash>();
+
+impl FileName {
+    pub fn new(start_idx: u32, end_idx: u32, timestamp: SystemTime, hash: Hash) -> Self {
+        Self {
+            start_idx,
+            end_idx,
+            timestamp,
+            hash,
+        }
+    }
+
+    pub fn try_parse<T: AsRef<[u8]>>(file_name: T) -> Result<FileName, ChunkCacheError> {
+        let mut buf = Vec::with_capacity(FILE_NAME_LEN);
+        BASE64_STANDARD_NO_PAD
+            .decode_slice(file_name, &mut buf)
+            .map_err(ChunkCacheError::parse)?;
+        if buf.len() != FILE_NAME_LEN {
+            return Err(ChunkCacheError::parse("invalid size of decoded buffer"));
+        }
+        let start_idx = u32::from_le_bytes(buf[0..4].try_into()?);
+        let end_idx = u32::from_le_bytes(buf[4..8].try_into()?);
+        let timestamp =
+            UNIX_EPOCH + Duration::from_millis(u64::from_le_bytes(buf[8..16].try_into()?));
+        let hash = blake3::Hash::from_bytes(buf[16..].try_into()?);
+
+        Ok(FileName {
+            start_idx,
+            end_idx,
+            timestamp,
+            hash,
+        })
+    }
+}
+
+impl Into<String> for &FileName {
+    fn into(self) -> String {
+        let timestamp = self
+            .timestamp
+            .duration_since(UNIX_EPOCH)
+            .log_error("filename has an invalid timestamp")
+            .unwrap_or_default()
+            .as_millis() as u64;
+        let mut buf = [0u8; FILE_NAME_LEN];
+        buf[0..4].copy_from_slice(&self.start_idx.to_le_bytes());
+        buf[4..8].copy_from_slice(&self.end_idx.to_le_bytes());
+        buf[8..16].copy_from_slice(&timestamp.to_le_bytes());
+        buf[16..].copy_from_slice(self.hash.as_bytes());
+        BASE64_STANDARD_NO_PAD.encode(buf)
+    }
+}
+
+impl ToString for FileName {
+    fn to_string(&self) -> String {
+        self.into()
+    }
+}
+
+impl Into<PathBuf> for &FileName {
+    fn into(self) -> PathBuf {
+        PathBuf::from(self.to_string())
+    }
+}
diff --git a/chunk_cache/src/error.rs b/chunk_cache/src/error.rs
new file mode 100644
index 00000000..92e21028
--- /dev/null
+++ b/chunk_cache/src/error.rs
@@ -0,0 +1,27 @@
+use std::array::TryFromSliceError;
+
+use xet_error::Error;
+
+#[derive(Debug, Error)]
+pub enum ChunkCacheError {
+    #[error("IO: {0}")]
+    IO(#[from] std::io::Error),
+    #[error("ParseError: {0}")]
+    Parse(String),
+    #[error("bad range")]
+    BadRange,
+    #[error("cache is empty when it is presumed not empty")]
+    CacheEmpty,
+}
+
+impl ChunkCacheError {
+    pub fn parse<T: ToString>(value: T) -> ChunkCacheError {
+        ChunkCacheError::Parse(value.to_string())
+    }
+}
+
+impl From<TryFromSliceError> for ChunkCacheError {
+    fn from(value: TryFromSliceError) -> Self {
+        ChunkCacheError::parse(value)
+    }
+}
diff --git a/chunk_cache/src/lib.rs b/chunk_cache/src/lib.rs
new file mode 100644
index 00000000..e65b8ad5
--- /dev/null
+++ b/chunk_cache/src/lib.rs
@@ -0,0 +1,16 @@
+pub mod error;
+mod disk_cache;
+
+use cas_types::{Key, Range};
+use error::ChunkCacheError;
+
+pub trait ChunkCache {
+    fn get(&mut self, key: &Key, range: &Range) -> Result<Option<Vec<u8>>, ChunkCacheError>;
+    fn put(
+        &mut self,
+        key: &Key,
+        range: &Range,
+        chunk_byte_indicies: &Vec<u32>,
+        data: &[u8],
+    ) -> Result<(), ChunkCacheError>;
+}

From b26f89f03a328c4501c715dc4dad27abd4773f21 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Mon, 30 Sep 2024 17:23:45 -0700
Subject: [PATCH 2/9] updates

---
 Cargo.lock                              |   1 +
 chunk_cache/Cargo.toml                  |   3 +
 chunk_cache/src/disk_cache.rs           | 390 +++++++++++++++++------
 chunk_cache/src/disk_cache/file_name.rs |  24 +-
 chunk_cache/src/error.rs                |   2 +
 chunk_cache/src/lib.rs                  |   2 +-
 6 files changed, 339 insertions(+), 83 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9fad6382..d48f5ea7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -564,6 +564,7 @@ dependencies = [
  "error_printer",
  "merklehash",
  "rand 0.8.5",
+ "tempdir",
  "tracing",
  "xet_error",
 ]
diff --git a/chunk_cache/Cargo.toml b/chunk_cache/Cargo.toml
index 2ee3db65..758e761a 100644
--- a/chunk_cache/Cargo.toml
+++ b/chunk_cache/Cargo.toml
@@ -12,3 +12,6 @@ base64 = "0.22.1"
 blake3 = "1.5.4"
 tracing = "0.1.40"
 rand = "0.8.5"
+
+[dev-dependencies]
+tempdir = "0.3.7"
diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index 3565b027..89998ecf 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -2,17 +2,17 @@ use std::{
     collections::HashMap,
     fs::File,
     io::{Read, Seek, Write},
-    os::unix::ffi::OsStrExt,
     path::{Path, PathBuf},
     time::SystemTime,
 };
 
-use base64::prelude::*;
+use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE_NO_PAD, Engine};
 use cache_file_header::CacheFileHeader;
 use cas_types::{Key, Range};
 use error_printer::ErrorPrinter;
 use file_name::FileName;
-use rand::{rngs::ThreadRng, seq::IteratorRandom};
+use merklehash::MerkleHash;
+use rand::{seq::IteratorRandom, thread_rng};
 use tracing::warn;
 
 use crate::{error::ChunkCacheError, ChunkCache};
@@ -20,12 +20,14 @@ use crate::{error::ChunkCacheError, ChunkCache};
 mod cache_file_header;
 mod file_name;
 
+const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE_NO_PAD;
+
+#[derive(Debug, Clone)]
 pub struct DiskCache {
     cache_root: PathBuf,
     total_bytes: u64,
-    items: HashMap<PathBuf, u64>,
+    items: HashMap<(Key, u32, u32), (FileName, u64)>,
     capacity: Option<u64>,
-    rng: ThreadRng,
 }
 
 impl DiskCache {
@@ -38,6 +40,16 @@ impl DiskCache {
         let mut items = HashMap::new();
         for subdir in std::fs::read_dir(&cache_root)? {
             let subdir = subdir?;
+
+            let key_result =
+                subdir_to_key(subdir.file_name().as_encoded_bytes()).warn_error(format!(
+                    "expected subdir: {:?} to be parsable as a key",
+                    subdir.file_name()
+                ));
+            let key = match key_result {
+                Ok(k) => k,
+                Err(_) => continue,
+            };
             let md = subdir.metadata()?;
             if !md.is_dir() {
                 warn!("found a non directory in cache_root directory");
@@ -45,18 +57,30 @@ impl DiskCache {
             }
             for key_file in std::fs::read_dir(subdir.path())? {
                 let key_file = key_file?;
+                let file_name_result = FileName::try_parse(key_file.file_name().as_encoded_bytes())
+                    .warn_error(format!(
+                        "expected file name: {:?} to be parsed as a cache FileName",
+                        key_file.file_name()
+                    ));
+                let item_file_name = match file_name_result {
+                    Ok(f) => f,
+                    Err(_) => continue,
+                };
+
                 let md = key_file.metadata()?;
                 if !md.is_file() {
                     warn!("found non file under key dir: {:?}", key_file.path());
                     continue;
                 }
-                let item: PathBuf = key_file
-                    .path()
-                    .strip_prefix(&cache_root)
-                    .map(Path::to_path_buf)
-                    .log_error("path under directory didn't strip prefix")
-                    .unwrap_or_default();
-                items.insert(item, md.len());
+
+                items.insert(
+                    (
+                        key.clone(),
+                        item_file_name.start_idx,
+                        item_file_name.end_idx,
+                    ),
+                    (item_file_name, md.len()),
+                );
                 total_bytes += md.len();
             }
         }
@@ -69,7 +93,6 @@ impl DiskCache {
             total_bytes,
             capacity,
             items,
-            rng: rand::thread_rng(),
         })
     }
 
@@ -78,37 +101,32 @@ impl DiskCache {
         key: &Key,
         range: &Range,
     ) -> Result<Option<Vec<u8>>, crate::error::ChunkCacheError> {
-        let dir = self.cache_root.join(key_to_subdir(key));
-
-        for entry in std::fs::read_dir(dir)? {
-            let entry = entry?;
-            let md = entry.metadata()?;
-            if !md.is_file() {
-                continue;
-            }
-            let file_name = FileName::try_parse(entry.file_name().as_bytes())?;
-            if range.start >= file_name.start_idx && range.end <= file_name.end_idx {
-                let result = self.hit(&entry.path(), file_name, range)?;
-                return Ok(Some(result));
-            }
-        }
-        Ok(None)
-    }
-
-    fn hit(
-        &mut self,
-        path: &PathBuf,
-        mut file_name: FileName,
-        range: &Range,
-    ) -> Result<Vec<u8>, ChunkCacheError> {
-        let mut file = File::open(path)?;
+        let mut hits = self
+            .items
+            .iter()
+            .filter(|((k, start, end), _)| *start <= range.start && *end >= range.end && k == key)
+            .peekable();
+        let hit_option = hits.peek();
+        let ((key, start, end), (file_name, _)) = match hit_option {
+            Some(v) => v,
+            None => return Ok(None),
+        };
+        let file_path = self.cache_file_path(key, file_name);
+        let mut file = File::open(&file_path)?;
         let hash = blake3::Hasher::new().update_reader(&mut file)?.finalize();
         if hash != file_name.hash {
-            std::fs::remove_file(path)?;
+            std::fs::remove_file(&file_path)?;
             return Err(ChunkCacheError::parse("file checksum mismatch"));
         }
 
-        let header = CacheFileHeader::deserialize(&mut file)?;
+        let result = self.hit(&mut file, range)?;
+        self.update_time_stamp(&(key.clone(), *start, *end))?;
+
+        Ok(Some(result))
+    }
+
+    fn hit<R: Read + Seek>(&self, file: &mut R, range: &Range) -> Result<Vec<u8>, ChunkCacheError> {
+        let header = CacheFileHeader::deserialize(file)?;
         let start_byte_index = header
             .chunk_byte_indicies
             .get(range.start as usize)
             .ok_or(ChunkCacheError::BadRange)?;
@@ -123,18 +141,6 @@ impl DiskCache {
         let mut result = vec![0; len];
         file.read_exact(&mut result)?;
 
-        // update last used timestamp in file name
-        file_name.timestamp = SystemTime::now();
-        let key_dir = path
-            .parent()
-            .ok_or_else(|| ChunkCacheError::parse("failed to get key dir"))?;
-        let new_path = key_dir.join(Into::<PathBuf>::into(&file_name));
-        std::fs::rename(path, &new_path)?;
-
-        if let Some(size) = self.items.remove(&self.items_key(path)) {
-            self.items.insert(self.items_key(new_path), size);
-        }
-
         Ok(result)
     }
 
@@ -145,6 +151,29 @@ impl DiskCache {
         chunk_byte_indicies: &[u32],
         data: &[u8],
     ) -> Result<(), ChunkCacheError> {
+        /* // match for coalescing opportunities
+        let matches = self
+            .items
+            .keys()
+            .filter(|(k, start, end)| {
+                k == key
+                    && ((*start..=*end).contains(&range.start)
+                        || (*start..=*end).contains(&range.end))
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+        */
+        let matches = self
+            .items
+            .keys()
+            .filter(|(k, start, end)| k == key && *start <= range.start && *end >= range.end)
+            .collect::<Vec<_>>();
+        if matches.len() > 0 {
+            self.update_time_stamp(&matches[0].clone())?;
+            return Ok(());
+        }
+
+        // create new cache file
         let dir = &self.cache_root.join(key_to_subdir(key));
 
         let header = CacheFileHeader::new(chunk_byte_indicies);
@@ -161,13 +190,17 @@ impl DiskCache {
         if !std::fs::exists(dir)? {
             std::fs::create_dir_all(dir)?;
         }
-        let mut file = std::fs::File::create(&file_path)?;
+        let mut file = std::fs::OpenOptions::new()
+            .create(true)
+            .write(true)
+            .truncate(true)
+            .open(&file_path)?;
         file.write_all(&header_buf)?;
         file.write_all(data)?;
         let len = (header_buf.len() + data.len()) as u64;
         self.total_bytes += len;
-        self.items.insert(self.items_key(&file_path), len);
-
+        self.items
+            .insert((key.clone(), range.start, range.end), (file_name, len));
         self.maybe_evict()?;
 
         Ok(())
@@ -178,12 +211,16 @@ impl DiskCache {
             Some(cap) => cap,
             None => return Ok(()),
         };
-        if capacity <= self.total_bytes {
+
+        if self.total_bytes <= capacity {
             return Ok(());
         }
 
-        while capacity > self.total_bytes {
-            self.maybe_evict_one()?
+        // evict max 10 items
+        let mut i = 0;
+        while capacity <= self.total_bytes && i < 10 {
+            i += 1;
+            self.maybe_evict_one()?;
         }
 
         Ok(())
@@ -191,35 +228,47 @@ impl DiskCache {
 
     // assumes the cache is in the right state for eviction
     fn maybe_evict_one(&mut self) -> Result<(), ChunkCacheError> {
-        let key = if let Some(key) = self.items.keys().choose(&mut self.rng) {
+        let items_key = if let Some(key) = self.items.keys().choose(&mut thread_rng()) {
             key.clone()
         } else {
             return Err(ChunkCacheError::CacheEmpty);
         };
-        if let Some(len) = self.items.remove(&key) {
+        if let Some((file_name, len)) = self.items.remove(&items_key) {
             self.total_bytes -= len;
+            let mut path = self.cache_file_path(&items_key.0, &file_name);
+            std::fs::remove_file(&path)?;
+            path.pop();
+            if std::fs::read_dir(&path)?.next().is_none() {
+                std::fs::remove_dir(&path)?;
+            }
         }
-        let path = self.cache_root.join(&key);
-        std::fs::remove_file(path)?;
 
         Ok(())
     }
 
-    fn items_key<P: AsRef<Path>>(&self, path: P) -> PathBuf {
-        items_key(&self.cache_root, &path)
+    fn cache_file_path(&self, key: &Key, file_name: &FileName) -> PathBuf {
+        cache_file_path(self.cache_root.clone(), key, file_name)
+    }
+
+    fn update_time_stamp(&mut self, item_key: &(Key, u32, u32)) -> Result<(), ChunkCacheError> {
+        let cache_root = self.cache_root.clone();
+        let (file_name, _) = self
+            .items
+            .get_mut(&item_key)
+            .ok_or(ChunkCacheError::Infallible)?;
+        let old_file_path = cache_file_path(cache_root.clone(), &item_key.0, &file_name);
+        file_name.timestamp = SystemTime::now();
+        let new_file_path = cache_file_path(cache_root, &item_key.0, &file_name);
+        std::fs::rename(old_file_path, new_file_path)?;
+
+        Ok(())
     }
 }
 
-fn items_key<P: AsRef<Path>>(cache_root: &PathBuf, path: P) -> PathBuf {
-    path.as_ref()
-        .strip_prefix(cache_root)
-        .log_error(format!(
-            "path should be under prefix, but is not, returning empty, path: {:?}, prefix: {:?}",
-            path.as_ref(),
-            cache_root,
-        ))
-        .map(Path::to_path_buf)
-        .unwrap_or_default()
+fn cache_file_path(cache_root: PathBuf, key: &Key, file_name: &FileName) -> PathBuf {
+    cache_root
+        .join(key_to_subdir(key))
+        .join(file_name.as_path_buf())
 }
 
 impl ChunkCache for DiskCache {
     fn get(
         &mut self,
         key: &Key,
         range: &Range,
     ) -> Result<Option<Vec<u8>>, crate::error::ChunkCacheError> {
         self.get_impl(key, range)
     }
 
     fn put(
         &mut self,
         key: &Key,
         range: &Range,
         chunk_byte_indicies: &Vec<u32>,
         data: &[u8],
     ) -> Result<(), crate::error::ChunkCacheError> {
         self.put_impl(key, range, chunk_byte_indicies, data)
     }
@@ -243,5 +292,198 @@ impl ChunkCache for DiskCache {
 }
 
 fn key_to_subdir(key: &Key) -> String {
-    BASE64_STANDARD_NO_PAD.encode(key.to_string().as_bytes())
+    let prefix_bytes = key.prefix.as_bytes();
+    let mut buf = vec![0u8; size_of::<MerkleHash>() + prefix_bytes.len()];
+    buf[..size_of::<MerkleHash>()].copy_from_slice(key.hash.as_bytes());
+    buf[size_of::<MerkleHash>()..].copy_from_slice(prefix_bytes);
+    BASE64_ENGINE.encode(buf)
+}
+
+fn subdir_to_key<T: AsRef<[u8]>>(subdir: T) -> Result<Key, ChunkCacheError> {
+    let mut buf = Vec::new();
+    BASE64_ENGINE
+        .decode_slice(subdir, &mut buf)
+        .map_err(ChunkCacheError::parse)?;
+    if buf.len() < 1 + size_of::<MerkleHash>() {
+        return Err(ChunkCacheError::parse("decoded too few bytes"));
+    }
+    let prefix_buf = buf.split_off(size_of::<MerkleHash>());
+    let hash =
+        MerkleHash::from_slice(&buf[..size_of::<MerkleHash>()]).map_err(ChunkCacheError::parse)?;
+    let prefix = String::from_utf8(prefix_buf)
+        .map_err(|e| ChunkCacheError::parse(format!("prefix string parse error: {e}")))?;
+    Ok(Key { prefix, hash })
+}
+
+#[cfg(test)]
+mod test_utils {
+    use std::path::Path;
+
+    use cas_types::{Key, Range};
+    use merklehash::MerkleHash;
+    use rand::Rng;
+
+    pub const DEFAULT_CAPACITY: Option<u64> = Some(16 << 20);
+    pub const RANGE_LEN: u32 = 4000;
+
+    pub fn print_directory_contents(path: &Path) {
+        // Read the contents of the directory
+        match std::fs::read_dir(path) {
+            Ok(entries) => {
+                for entry in entries {
+                    match entry {
+                        Ok(entry) => {
+                            let path = entry.path();
+                            // Print the path
+                            println!("{}", path.display());
+
+                            // If it's a directory, call this function recursively
+                            if path.is_dir() {
+                                print_directory_contents(&path);
+                                println!("");
+                            }
+                        }
+                        Err(e) => eprintln!("Error reading entry: {}", e),
+                    }
+                }
+            }
+            Err(e) => eprintln!("Error reading directory: {}", e),
+        }
+    }
+
+    pub fn random_key() -> Key {
+        Key {
+            prefix: "default".to_string(),
+            hash: MerkleHash::from_slice(&rand::random::<[u8; 32]>()).unwrap(),
+        }
+    }
+
+    pub fn random_range() -> Range {
+        let start = rand::random::<u32>() % 1024;
+        let end = 1024.min(start + rand::random::<u32>() % 256);
+        Range { start, end }
+    }
+
+    pub fn random_bytes(range: &Range) -> (Vec<u32>, Vec<u8>) {
+        let mut rng = rand::thread_rng();
+        let random_vec: Vec<u8> = (0..RANGE_LEN).map(|_| rng.gen()).collect();
+        let mut offsets: Vec<u32> = Vec::with_capacity((range.end - range.start + 1) as usize);
+        offsets.push(0);
+        for _ in range.start..range.end - 1 {
+            let mut num = rng.gen::<u32>() % RANGE_LEN;
+            while offsets.contains(&num) {
+                num = rng.gen::<u32>() % RANGE_LEN;
+            }
+            offsets.push(num);
+        }
+        offsets.push(4000);
+        offsets.sort();
+        (offsets, random_vec)
+    }
+
+    pub struct RandomEntryIterator;
+
+    impl Iterator for RandomEntryIterator {
+        type Item = (Key, Range, Vec<u32>, Vec<u8>);
+
+        fn next(&mut self) -> Option<Self::Item> {
+            let key = random_key();
+            let range = random_range();
+            let (offsets, data) = random_bytes(&range);
+            Some((key, range, offsets, data))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::disk_cache::test_utils::*;
+
+    use cas_types::Range;
+    use tempdir::TempDir;
+
+    use crate::ChunkCache;
+
+    use super::DiskCache;
+
+    #[test]
+    fn test_get_cache_empty() {
+        let cache_root = TempDir::new("empty").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+        assert!(cache.get(&random_key(), &random_range()).unwrap().is_none());
+    }
+
+    #[test]
+    fn test_put_get_simple() {
+        let cache_root = TempDir::new("put_get_simple").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+
+        let key = random_key();
+        let range = Range { start: 0, end: 4 };
+        let (chunk_byte_indicies, data) = random_bytes(&range);
+        let put_result = cache.put(&key, &range, &chunk_byte_indicies, data.as_slice());
+        assert!(put_result.is_ok(), "{put_result:?}");
+
+        print_directory_contents(cache_root.as_ref());
+
+        // hit
+        assert!(cache.get(&key, &range).unwrap().is_some());
+        let miss_range = Range {
+            start: 100,
+            end: 101,
+        };
+        // miss
+        assert!(cache.get(&key, &miss_range).unwrap().is_none());
+    }
+
+    #[test]
+    fn test_put_get_subrange() {
+        let cache_root = TempDir::new("put_get_subrange").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+
+        let key = random_key();
+        let range = Range { start: 0, end: 4 };
+        let (chunk_byte_indicies, data) = random_bytes(&range);
+        let put_result = cache.put(&key, &range, &chunk_byte_indicies, data.as_slice());
+        assert!(put_result.is_ok(), "{put_result:?}");
+
+        print_directory_contents(cache_root.as_ref());
+
+        for start in range.start..range.end {
+            for end in (start + 1)..=range.end {
+                assert!(
+                    cache.get(&key, &Range { start, end }).unwrap().is_some(),
+                    "range: [{start} {end})"
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_puts_eviction() {
+        const CAP: u64 = (RANGE_LEN * 4) as u64;
+        let cache_root = TempDir::new("puts_eviction").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), Some(CAP)).unwrap();
+
+        // fill the cache to almost capacity
+        for _ in 0..3 {
+            let (key, range, offsets, data) = RandomEntryIterator.next().unwrap();
+            assert!(cache.put(&key, &range, &offsets, &data).is_ok());
+        }
+        assert!(cache.total_bytes <= CAP);
+
+        let (key, range, offsets, data) = RandomEntryIterator.next().unwrap();
+        assert!(cache.put(&key, &range, &offsets, &data).is_ok());
+        assert!(cache.total_bytes <= CAP);
+    }
+
+    #[test]
+    fn test_same_puts_noop() {
+        let cache_root = TempDir::new("puts_eviction").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+        let (key, range, offsets, data) = RandomEntryIterator.next().unwrap();
+        assert!(cache.put(&key, &range, &offsets, &data).is_ok());
+
+        assert!(cache.put(&key, &range, &offsets, &data).is_ok());
+    }
 }
diff --git a/chunk_cache/src/disk_cache/file_name.rs b/chunk_cache/src/disk_cache/file_name.rs
index 85e16828..a06d9569 100644
--- a/chunk_cache/src/disk_cache/file_name.rs
+++ b/chunk_cache/src/disk_cache/file_name.rs
@@ -3,15 +3,16 @@ use std::{
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
-use base64::{prelude::BASE64_STANDARD_NO_PAD, Engine};
+use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE_NO_PAD, Engine};
 use blake3::Hash;
 use error_printer::ErrorPrinter;
 
 use crate::error::ChunkCacheError;
 
+const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE_NO_PAD;
 /// A file name is represented as the start index and end index of chunks for the given xorb
 /// and a timestamp of last successful access or put
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
 pub struct FileName {
     pub start_idx: u32,
     pub end_idx: u32,
@@ -19,7 +20,10 @@ pub struct FileName {
     pub hash: blake3::Hash,
 }
 
-const FILE_NAME_LEN: usize = size_of::<u32>() * 2 + size_of::<u64>() + size_of::<Hash>();
+/// length of the total data making up the file name
+/// start_index, end_index, timestamp (unix u64), hash of the content
+const FILE_NAME_LEN_PRE_BAS64: usize =
+    size_of::<u32>() + size_of::<u32>() + size_of::<u64>() + size_of::<Hash>();
 
 impl FileName {
     pub fn new(start_idx: u32, end_idx: u32, timestamp: SystemTime, hash: Hash) -> Self {
@@ -32,11 +36,11 @@ impl FileName {
     }
 
     pub fn try_parse<T: AsRef<[u8]>>(file_name: T) -> Result<FileName, ChunkCacheError> {
-        let mut buf = Vec::with_capacity(FILE_NAME_LEN);
-        BASE64_STANDARD_NO_PAD
+        let mut buf = Vec::with_capacity(FILE_NAME_LEN_PRE_BAS64);
+        BASE64_ENGINE
             .decode_slice(file_name, &mut buf)
             .map_err(ChunkCacheError::parse)?;
-        if buf.len() != FILE_NAME_LEN {
+        if buf.len() != FILE_NAME_LEN_PRE_BAS64 {
             return Err(ChunkCacheError::parse("invalid size of decoded buffer"));
         }
         let start_idx = u32::from_le_bytes(buf[0..4].try_into()?);
@@ -52,6 +56,10 @@ impl FileName {
             hash,
         })
     }
+
+    pub fn as_path_buf(&self) -> PathBuf {
+        PathBuf::from(self.to_string())
+    }
 }
 
 impl Into<String> for &FileName {
@@ -62,12 +70,12 @@ impl Into<String> for &FileName {
             .log_error("filename has an invalid timestamp")
             .unwrap_or_default()
             .as_millis() as u64;
-        let mut buf = [0u8; FILE_NAME_LEN];
+        let mut buf = [0u8; FILE_NAME_LEN_PRE_BAS64];
         buf[0..4].copy_from_slice(&self.start_idx.to_le_bytes());
         buf[4..8].copy_from_slice(&self.end_idx.to_le_bytes());
         buf[8..16].copy_from_slice(&timestamp.to_le_bytes());
         buf[16..].copy_from_slice(self.hash.as_bytes());
-        BASE64_STANDARD_NO_PAD.encode(buf)
+        BASE64_ENGINE.encode(buf)
     }
 }
 
diff --git a/chunk_cache/src/error.rs b/chunk_cache/src/error.rs
index 92e21028..9c0ca115 100644
--- a/chunk_cache/src/error.rs
+++ b/chunk_cache/src/error.rs
@@ -12,6 +12,8 @@ pub enum ChunkCacheError {
     BadRange,
     #[error("cache is empty when it is presumed not empty")]
     CacheEmpty,
+    #[error("Infallible")]
+    Infallible,
 }
 
 impl ChunkCacheError {
diff --git a/chunk_cache/src/lib.rs b/chunk_cache/src/lib.rs
index e65b8ad5..34c34e37 100644
--- a/chunk_cache/src/lib.rs
+++ b/chunk_cache/src/lib.rs
@@ -1,5 +1,5 @@
-pub mod error;
 mod disk_cache;
+pub mod error;
 
 use cas_types::{Key, Range};
 use error::ChunkCacheError;

From 8ffb3baeba112221dfdea728148d6e7a940286d0 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 10:30:32 -0700
Subject: [PATCH 3/9] tests and lint

---
 chunk_cache/src/disk_cache.rs           | 76 +++++++++++++++++--------
 chunk_cache/src/disk_cache/file_name.rs | 71 ++++++++++++++++-------
 chunk_cache/src/lib.rs                  |  4 +-
 3 files changed, 106 insertions(+), 45 deletions(-)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index 89998ecf..02f33145 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -2,11 +2,12 @@ use std::{
     collections::HashMap,
     fs::File,
     io::{Read, Seek, Write},
+    os::unix::ffi::OsStringExt,
     path::{Path, PathBuf},
     time::SystemTime,
 };
 
-use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE_NO_PAD, Engine};
+use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
 use cache_file_header::CacheFileHeader;
 use cas_types::{Key, Range};
 use error_printer::ErrorPrinter;
@@ -20,7 +21,7 @@ use crate::{error::ChunkCacheError, ChunkCache};
 mod cache_file_header;
 mod file_name;
 
-const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE_NO_PAD;
+const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE;
 
 #[derive(Debug, Clone)]
 pub struct DiskCache {
@@ -41,11 +42,10 @@ impl DiskCache {
         for subdir in std::fs::read_dir(&cache_root)? {
             let subdir = subdir?;
 
-            let key_result =
-                subdir_to_key(subdir.file_name().as_encoded_bytes()).warn_error(format!(
-                    "expected subdir: {:?} to be parsable as a key",
-                    subdir.file_name()
-                ));
+            let key_result = subdir_to_key(subdir.file_name().into_vec()).warn_error(format!(
+                "expected subdir: {:?} to be parsable as a key",
+                subdir.file_name()
+            ));
             let key = match key_result {
                 Ok(k) => k,
                 Err(_) => continue,
@@ -57,7 +57,7 @@ impl DiskCache {
             }
             for key_file in std::fs::read_dir(subdir.path())? {
                 let key_file = key_file?;
-                let file_name_result = FileName::try_parse(key_file.file_name().as_encoded_bytes())
+                let file_name_result = FileName::try_parse(key_file.file_name().into_vec())
                     .warn_error(format!(
                         "expected file name: {:?} to be parsed as a cache FileName",
                         key_file.file_name()
@@ -119,21 +119,26 @@ impl DiskCache {
             return Err(ChunkCacheError::parse("file checksum mismatch"));
         }
 
-        let result = self.hit(&mut file, range)?;
+        let result = self.hit(&mut file, *start, range)?;
         self.update_time_stamp(&(key.clone(), *start, *end))?;
 
         Ok(Some(result))
     }
 
-    fn hit<R: Read + Seek>(&self, file: &mut R, range: &Range) -> Result<Vec<u8>, ChunkCacheError> {
+    fn hit<R: Read + Seek>(
+        &self,
+        file: &mut R,
+        start: u32,
+        range: &Range,
+    ) -> Result<Vec<u8>, ChunkCacheError> {
         let header = CacheFileHeader::deserialize(file)?;
         let start_byte_index = header
             .chunk_byte_indicies
-            .get(range.start as usize)
+            .get((range.start - start) as usize)
             .ok_or(ChunkCacheError::BadRange)?;
         let end_byte_index = header
             .chunk_byte_indicies
-            .get(range.end as usize)
+            .get((range.end - start) as usize)
             .ok_or(ChunkCacheError::BadRange)?;
         let start_position = (header.header_len + *start_byte_index as usize) as u64;
         file.seek(std::io::SeekFrom::Start(start_position))?;
@@ -168,7 +173,7 @@ impl DiskCache {
             .keys()
             .filter(|(k, start, end)| k == key && *start <= range.start && *end >= range.end)
             .collect::<Vec<_>>();
-        if matches.len() > 0 {
+        if !matches.is_empty() {
             self.update_time_stamp(&matches[0].clone())?;
             return Ok(());
         }
@@ -254,11 +259,11 @@ impl DiskCache {
         let cache_root = self.cache_root.clone();
         let (file_name, _) = self
             .items
-            .get_mut(&item_key)
+            .get_mut(item_key)
             .ok_or(ChunkCacheError::Infallible)?;
-        let old_file_path = cache_file_path(cache_root.clone(), &item_key.0, &file_name);
+        let old_file_path = cache_file_path(cache_root.clone(), &item_key.0, file_name);
         file_name.timestamp = SystemTime::now();
-        let new_file_path = cache_file_path(cache_root, &item_key.0, &file_name);
+        let new_file_path = cache_file_path(cache_root, &item_key.0, file_name);
         std::fs::rename(old_file_path, new_file_path)?;
 
         Ok(())
     }
 }
@@ -268,7 +273,7 @@ impl DiskCache {
 fn cache_file_path(cache_root: PathBuf, key: &Key, file_name: &FileName) -> PathBuf {
     cache_root
         .join(key_to_subdir(key))
-        .join(file_name.as_path_buf())
+        .join(Into::<PathBuf>::into(file_name))
 }
 
 impl ChunkCache for DiskCache {
@@ -284,7 +289,7 @@ impl ChunkCache for DiskCache {
         &mut self,
         key: &Key,
         range: &Range,
-        chunk_byte_indicies: &Vec<u32>,
+        chunk_byte_indicies: &[u32],
         data: &[u8],
     ) -> Result<(), crate::error::ChunkCacheError> {
         self.put_impl(key, range, chunk_byte_indicies, data)
     }
@@ -300,9 +305,8 @@ fn key_to_subdir(key: &Key) -> String {
 }
 
 fn subdir_to_key<T: AsRef<[u8]>>(subdir: T) -> Result<Key, ChunkCacheError> {
-    let mut buf = Vec::new();
-    BASE64_ENGINE
-        .decode_slice(subdir, &mut buf)
+    let mut buf = BASE64_ENGINE
+        .decode(subdir)
         .map_err(ChunkCacheError::parse)?;
     if buf.len() < 1 + size_of::<MerkleHash>() {
         return Err(ChunkCacheError::parse("decoded too few bytes"));
     }
@@ -340,7 +344,6 @@ mod test_utils {
                             // If it's a directory, call this function recursively
                             if path.is_dir() {
                                 print_directory_contents(&path);
-                                println!("");
                             }
                         }
                         Err(e) => eprintln!("Error reading entry: {}", e),
@@ -404,7 +407,7 @@ mod tests {
 
     use crate::ChunkCache;
 
-    use super::DiskCache;
+    use super::{subdir_to_key, DiskCache};
 
     #[test]
     fn test_get_cache_empty() {
@@ -486,4 +489,31 @@ mod tests {
 
         assert!(cache.put(&key, &range, &offsets, &data).is_ok());
     }
+
+    #[test]
+    fn test_initialize_non_empty() {
+        let cache_root = TempDir::new("puts_eviction").unwrap();
+        let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+
+        let mut keys_and_ranges = Vec::new();
+
+        for _ in 0..20 {
+            let (key, range, offsets, data) = RandomEntryIterator.next().unwrap();
+            assert!(cache.put(&key, &range, &offsets, &data).is_ok());
+            keys_and_ranges.push((key, range));
+        }
+        let mut cache2 = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
+        for (i, (key, range)) in keys_and_ranges.iter().enumerate() {
+            let get_result = cache2.get(&key, &range);
+            assert!(get_result.is_ok(), "{i} {get_result:?}");
+            assert!(get_result.unwrap().is_some(), "{i}");
+        }
+    }
+
+    #[test]
+    fn test_subdir_to_key() {
+        let s = "oL-Xqk1J00kVe1U4kCko-Kw4zaVv3-4U73i27w5DViBkZWZhdWx0";
+        let key = subdir_to_key(s);
+        assert!(key.is_ok(), "{key:?}")
+    }
 }
diff --git a/chunk_cache/src/disk_cache/file_name.rs b/chunk_cache/src/disk_cache/file_name.rs
index a06d9569..930bae1a 100644
--- a/chunk_cache/src/disk_cache/file_name.rs
+++ b/chunk_cache/src/disk_cache/file_name.rs
@@ -3,13 +3,14 @@ use std::{
     path::PathBuf,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
-use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE_NO_PAD, Engine};
+use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
 use blake3::Hash;
 use error_printer::ErrorPrinter;
+use std::mem::size_of;
 
 use crate::error::ChunkCacheError;
 
-const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE_NO_PAD;
+const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE;
 /// A file name is represented as the start index and end index of chunks for the given xorb
 /// and a timestamp of last successful access or put
 #[derive(Debug, PartialEq, Eq, Clone, Hash)]
@@ -36,9 +37,8 @@ impl FileName {
     }
 
     pub fn try_parse<T: AsRef<[u8]>>(file_name: T) -> Result<FileName, ChunkCacheError> {
-        let mut buf = Vec::with_capacity(FILE_NAME_LEN_PRE_BAS64);
-        BASE64_ENGINE
-            .decode_slice(file_name, &mut buf)
+        let buf = BASE64_ENGINE
+            .decode(file_name)
             .map_err(ChunkCacheError::parse)?;
         if buf.len() != FILE_NAME_LEN_PRE_BAS64 {
             return Err(ChunkCacheError::parse("invalid size of decoded buffer"));
         }
@@ -52,37 +56,66 @@ impl FileName {
             hash,
         })
     }
-
-    pub fn as_path_buf(&self) -> PathBuf {
-        PathBuf::from(self.to_string())
-    }
 }
 
-impl Into<String> for &FileName {
-    fn into(self) -> String {
-        let timestamp = self
+impl From<&FileName> for String {
+    fn from(value: &FileName) -> Self {
+        let timestamp = value
             .timestamp
             .duration_since(UNIX_EPOCH)
             .log_error("filename has an invalid timestamp")
             .unwrap_or_default()
             .as_millis() as u64;
         let mut buf = [0u8; FILE_NAME_LEN_PRE_BAS64];
-        buf[0..4].copy_from_slice(&self.start_idx.to_le_bytes());
-        buf[4..8].copy_from_slice(&self.end_idx.to_le_bytes());
+        buf[0..4].copy_from_slice(&value.start_idx.to_le_bytes());
+        buf[4..8].copy_from_slice(&value.end_idx.to_le_bytes());
         buf[8..16].copy_from_slice(&timestamp.to_le_bytes());
-        buf[16..].copy_from_slice(self.hash.as_bytes());
+        buf[16..].copy_from_slice(value.hash.as_bytes());
         BASE64_ENGINE.encode(buf)
     }
 }
 
-impl ToString for FileName {
-    fn to_string(&self) -> String {
-        self.into()
+impl From<&FileName> for PathBuf {
+    fn from(value: &FileName) -> Self {
+        PathBuf::from(String::from(value))
     }
 }
 
-impl Into<PathBuf> for &FileName {
-    fn into(self) -> PathBuf {
-        PathBuf::from(self.to_string())
-    }
-}
+#[cfg(test)]
+mod tests {
+    use std::{
+        path::PathBuf,
+        time::{SystemTime, UNIX_EPOCH},
+    };
+
+    use blake3::{Hash, OUT_LEN};
+
+    use super::FileName;
+
+    #[test]
+    fn test_encode_decode() {
+        let f = FileName {
+            start_idx: 100,
+            end_idx: 1000,
+            timestamp: SystemTime::now(),
+            hash: Hash::from_bytes([6u8; OUT_LEN]),
+        };
+        let encoded: PathBuf = (&f).into();
+        let decoded_result = FileName::try_parse(encoded.file_name().unwrap().as_encoded_bytes());
+        assert!(decoded_result.is_ok());
+        let FileName {
+            start_idx,
+            end_idx,
+            timestamp,
+            hash,
+        } = decoded_result.unwrap();
+        assert_eq!(start_idx, f.start_idx);
+        assert_eq!(end_idx, f.end_idx);
+        assert_eq!(hash, f.hash);
+
+        // checking 1 by 1 because the timestamp nanos may shift a little bit by conversion to unix timestamp millis
+        let millis =
+            |t: SystemTime| -> u64 { t.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 };
+        assert_eq!(millis(timestamp), millis(f.timestamp));
+    }
+}
diff --git a/chunk_cache/src/lib.rs b/chunk_cache/src/lib.rs
index 34c34e37..592e2f21 100644
--- a/chunk_cache/src/lib.rs
+++ b/chunk_cache/src/lib.rs
@@ -4,13 +4,15 @@ pub mod error;
 use cas_types::{Key, Range};
 use error::ChunkCacheError;
 
+pub use disk_cache::DiskCache;
+
 pub trait ChunkCache {
     fn get(&mut self, key: &Key, range: &Range) -> Result<Option<Vec<u8>>, ChunkCacheError>;
     fn put(
         &mut self,
         key: &Key,
         range: &Range,
-        chunk_byte_indicies: &Vec<u32>,
+        chunk_byte_indicies: &[u32],
         data: &[u8],
     ) -> Result<(), ChunkCacheError>;
 }

From c14e28fd861e5f16cabeee1b41fe3a28a6e17c71 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 10:55:18 -0700
Subject: [PATCH 4/9] size_of

---
 chunk_cache/src/disk_cache.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index 02f33145..13810333 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -2,6 +2,7 @@ use std::{
     collections::HashMap,
     fs::File,
     io::{Read, Seek, Write},
+    mem::size_of,
     os::unix::ffi::OsStringExt,
     path::{Path, PathBuf},
     time::SystemTime,

From ff0df1a13f68a923c5b69569febd2a1e18eb3471 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 11:33:33 -0700
Subject: [PATCH 5/9] modify for build

---
 chunk_cache/src/disk_cache.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index 13810333..f74da1d7 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -3,7 +3,6 @@ use std::{
     fs::File,
     io::{Read, Seek, Write},
     mem::size_of,
-    os::unix::ffi::OsStringExt,
     path::{Path, PathBuf},
     time::SystemTime,
 };
@@ -43,10 +42,11 @@ impl DiskCache {
         for subdir in std::fs::read_dir(&cache_root)? {
             let subdir = subdir?;
 
-            let key_result = subdir_to_key(subdir.file_name().into_vec()).warn_error(format!(
-                "expected subdir: {:?} to be parsable as a key",
-                subdir.file_name()
-            ));
+            let key_result =
+                subdir_to_key(subdir.file_name().as_encoded_bytes()).warn_error(format!(
+                    "expected subdir: {:?} to be parsable as a key",
+                    subdir.file_name()
+                ));
             let key = match key_result {
                 Ok(k) => k,
                 Err(_) => continue,
@@ -58,7 +58,7 @@ impl DiskCache {
             }
             for key_file in std::fs::read_dir(subdir.path())? {
                 let key_file = key_file?;
-                let file_name_result = FileName::try_parse(key_file.file_name().into_vec())
+                let file_name_result = FileName::try_parse(key_file.file_name().as_encoded_bytes())
                     .warn_error(format!(
                         "expected file name: {:?} to be parsed as a cache FileName",
                         key_file.file_name()
@@ -193,7 +193,7 @@ impl DiskCache {
         let file_name = FileName::new(range.start, range.end, SystemTime::now(), hash);
         let file_path = Path::join(dir, Into::<PathBuf>::into(&file_name));
 
-        if !std::fs::exists(dir)? {
+        if !dir.exists() {
             std::fs::create_dir_all(dir)?;
         }
         let mut file = std::fs::OpenOptions::new()

From f156c5a682ec5db2a5ee09177bfe26bb6638d81b Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 11:46:04 -0700
Subject: [PATCH 6/9] more size_of imports

---
 chunk_cache/src/disk_cache/cache_file_header.rs | 5 ++++-
 chunk_cache/src/disk_cache/file_name.rs         | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/chunk_cache/src/disk_cache/cache_file_header.rs b/chunk_cache/src/disk_cache/cache_file_header.rs
index 72f0e727..00985c78 100644
--- a/chunk_cache/src/disk_cache/cache_file_header.rs
+++ b/chunk_cache/src/disk_cache/cache_file_header.rs
@@ -1,4 +1,7 @@
-use std::io::{Read, Seek, Write};
+use std::{
+    io::{Read, Seek, Write},
+    mem::size_of,
+};
 
 use crate::error::ChunkCacheError;
 
diff --git a/chunk_cache/src/disk_cache/file_name.rs b/chunk_cache/src/disk_cache/file_name.rs
index 930bae1a..e155a8b5 100644
--- a/chunk_cache/src/disk_cache/file_name.rs
+++ b/chunk_cache/src/disk_cache/file_name.rs
@@ -1,4 +1,5 @@
 use std::{
+    mem::size_of,
     path::PathBuf,
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
@@ -6,7 +7,6 @@ use std::{
 use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
 use blake3::Hash;
 use error_printer::ErrorPrinter;
-use std::mem::size_of;
 
 use crate::error::ChunkCacheError;
 

From 521992972fcf49e23109592bff337e68bc8445a1 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 12:22:16 -0700
Subject: [PATCH 7/9] lint

---
 chunk_cache/src/disk_cache.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index f74da1d7..fa706dd5 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -200,7 +200,7 @@ impl DiskCache {
             .create(true)
             .write(true)
             .truncate(true)
-            .open(&file_path)?;
+            .open(file_path)?;
         file.write_all(&header_buf)?;
         file.write_all(data)?;
         let len = (header_buf.len() + data.len()) as u64;

From 3be9a270300f79083d1bf0044c7eb0d2247b4c25 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 16:42:44 -0700
Subject: [PATCH 8/9] fix fail case

---
 chunk_cache/src/disk_cache.rs           | 89 +++++++++++++++++++------
 chunk_cache/src/disk_cache/file_name.rs | 50 +++----------
 2 files changed, 76 insertions(+), 63 deletions(-)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index fa706dd5..cdea4fa4 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -4,7 +4,6 @@ use std::{
     io::{Read, Seek, Write},
     mem::size_of,
     path::{Path, PathBuf},
-    time::SystemTime,
 };
 
 use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
@@ -108,12 +107,25 @@ impl DiskCache {
             .filter(|((k, start, end), _)| *start <= range.start && *end >= range.end && k == key)
             .peekable();
         let hit_option = hits.peek();
-        let ((key, start, end), (file_name, _)) = match hit_option {
+        let ((key, start, end), (file_name, len)) = match hit_option {
             Some(v) => v,
             None => return Ok(None),
         };
         let file_path = self.cache_file_path(key, file_name);
-        let mut file = File::open(&file_path)?;
+        let file_result = File::open(&file_path);
+
+        let mut file = match file_result {
+            Ok(file) => file,
+            Err(e) => match e.kind() {
+                std::io::ErrorKind::NotFound => {
+                    // cache item got evicted by another process
+                    self.total_bytes -= len;
+                    self.items.remove(&(key.clone(), *start, *end));
+                    return Ok(None);
+                }
+                _ => return Err(e.into()),
+            },
+        };
         let hash = blake3::Hasher::new().update_reader(&mut file)?.finalize();
         if hash != file_name.hash {
             std::fs::remove_file(&file_path)?;
@@ -121,7 +133,6 @@ impl DiskCache {
             return Err(ChunkCacheError::parse("file checksum mismatch"));
         }
 
         let result = self.hit(&mut file, *start, range)?;
-        self.update_time_stamp(&(key.clone(), *start, *end))?;
 
         Ok(Some(result))
     }
@@ -133,6 +144,7 @@ impl DiskCache {
         range: &Range,
     ) -> Result<Vec<u8>, ChunkCacheError> {
         let header = CacheFileHeader::deserialize(file)?;
+
         let start_byte_index = header
             .chunk_byte_indicies
             .get((range.start - start) as usize)
@@ -141,8 +153,10 @@ impl DiskCache {
         let end_byte_index = header
             .chunk_byte_indicies
             .get((range.end - start) as usize)
             .ok_or(ChunkCacheError::BadRange)?;
+
         let start_position = (header.header_len + *start_byte_index as usize) as u64;
         file.seek(std::io::SeekFrom::Start(start_position))?;
+
         let len = (end_byte_index - start_byte_index) as usize;
         let mut result = vec![0; len];
         file.read_exact(&mut result)?;
@@ -175,7 +189,6 @@ impl DiskCache {
             .filter(|(k, start, end)| k == key && *start <= range.start && *end >= range.end)
             .collect::<Vec<_>>();
         if !matches.is_empty() {
-            self.update_time_stamp(&matches[0].clone())?;
             return Ok(());
         }
 
@@ -190,7 +203,7 @@ impl DiskCache {
             .update(&header_buf)
             .update(data)
             .finalize();
-        let file_name = FileName::new(range.start, range.end, SystemTime::now(), hash);
+        let file_name = FileName::new(range.start, range.end, hash);
         let file_path = Path::join(dir, Into::<PathBuf>::into(&file_name));
 
         if !dir.exists() {
@@ -255,20 +268,6 @@ impl DiskCache {
     fn cache_file_path(&self, key: &Key, file_name: &FileName) -> PathBuf {
         cache_file_path(self.cache_root.clone(), key, file_name)
     }
-
-    fn update_time_stamp(&mut self, item_key: &(Key, u32, u32)) -> Result<(), ChunkCacheError> {
-        let cache_root = self.cache_root.clone();
-        let (file_name, _) = self
-            .items
-            .get_mut(item_key)
-            .ok_or(ChunkCacheError::Infallible)?;
-        let old_file_path = cache_file_path(cache_root.clone(), &item_key.0, file_name);
-        file_name.timestamp = SystemTime::now();
-        let new_file_path = cache_file_path(cache_root, &item_key.0, file_name);
-        std::fs::rename(old_file_path, new_file_path)?;
-
-        Ok(())
-    }
 }
 
 fn cache_file_path(cache_root: PathBuf, key: &Key, file_name: &FileName) -> PathBuf {
@@ -401,6 +400,8 @@ fn subdir_to_key<T: AsRef<[u8]>>(subdir: T) -> Result<Key, ChunkCacheError> {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::BTreeSet;
+
     use crate::disk_cache::test_utils::*;
 
     use cas_types::Range;
@@ -494,7 +495,7 @@ mod tests {
 
     #[test]
     fn test_initialize_non_empty() {
-        let cache_root = TempDir::new("puts_eviction").unwrap();
+        let cache_root = TempDir::new("initialize_non_empty").unwrap();
         let mut cache = DiskCache::initialize(cache_root.path(), DEFAULT_CAPACITY).unwrap();
 
         let mut keys_and_ranges = Vec::new();
@@ -509,6 +510,10 @@ mod tests {
             assert!(get_result.is_ok(), "{i} {get_result:?}");
             assert!(get_result.unwrap().is_some(), "{i}");
         }
+
+        let cache_keys = cache.items.keys().collect::<BTreeSet<_>>();
+        let cache2_keys = cache2.items.keys().collect::<BTreeSet<_>>();
+        assert_eq!(cache_keys, cache2_keys);
     }
 
     #[test]
@@ -517,4 +522,46 @@ mod tests {
         let key = subdir_to_key(s);
         assert!(key.is_ok(), "{key:?}")
     }
+
+    #[test]
+    fn test_unknown_eviction() {
+        let cache_root = TempDir::new("initialize_non_empty").unwrap();
+        let capacity = Some(2 * RANGE_LEN as u64);
+        let mut cache = DiskCache::initialize(cache_root.path(), capacity).unwrap();
+        let (key, range, chunk_byte_indicies, data) = RandomEntryIterator.next().unwrap();
+        cache
+            .put(&key, &range, &chunk_byte_indicies, &data)
+            .unwrap();
+
+        let mut cache2 = DiskCache::initialize(cache_root.path(), capacity).unwrap();
+        let get_result = cache2.get(&key, &range);
+        assert!(get_result.is_ok());
+        assert!(get_result.unwrap().is_some());
+
+        let (key2, range2, chunk_byte_indicies2, data2) = RandomEntryIterator.next().unwrap();
+        assert!(cache2
+            .put(&key2, &range2, &chunk_byte_indicies2, &data2)
+            .is_ok());
+
+        let mut get_result_1 = cache2.get(&key, &range).unwrap();
+        let get_result_2 = cache2.get(&key2, &range2).unwrap();
+        assert!(get_result_1.is_some() != get_result_2.is_some());
+        let mut i = 0;
+        while get_result_1.is_some() && i < 10 {
+            i += 1;
+            let (key2, range2, chunk_byte_indicies2, data2) = RandomEntryIterator.next().unwrap();
+            assert!(cache2
+                .put(&key2, &range2, &chunk_byte_indicies2, &data2)
+                .is_ok());
+            get_result_1 = cache2.get(&key, &range).unwrap();
+        }
+        if get_result_1.is_some() {
+            // randomness didn't evict the record after 10 tries, don't test this case now
+        }
+        // we've evicted the original record from the cache
+        // note using the original cache handle without updates!
+        let get_result_post_eviction = cache.get(&key, &range);
+        assert!(get_result_post_eviction.is_ok());
+        assert!(get_result_post_eviction.unwrap().is_none());
+    }
 }
diff --git a/chunk_cache/src/disk_cache/file_name.rs b/chunk_cache/src/disk_cache/file_name.rs
index e155a8b5..7134cbc1 100644
--- a/chunk_cache/src/disk_cache/file_name.rs
+++ b/chunk_cache/src/disk_cache/file_name.rs
@@ -1,12 +1,7 @@
-use std::{
-    mem::size_of,
-    path::PathBuf,
-    time::{Duration, SystemTime, UNIX_EPOCH},
-};
+use std::{mem::size_of, path::PathBuf};
 
 use base64::{engine::GeneralPurpose, prelude::BASE64_URL_SAFE, Engine};
 use blake3::Hash;
-use error_printer::ErrorPrinter;
 
 use crate::error::ChunkCacheError;
 
@@ -17,21 +12,19 @@ const BASE64_ENGINE: GeneralPurpose = BASE64_URL_SAFE;
 pub struct FileName {
     pub start_idx: u32,
     pub end_idx: u32,
-    pub timestamp: SystemTime,
     pub hash: blake3::Hash,
 }
 
 /// length of the total data making up the file name
-/// start_index, end_index, timestamp (unix u64), hash of the content
+/// start_index, end_index, hash of the content
 const FILE_NAME_LEN_PRE_BAS64: usize =
-    size_of::<u32>() + size_of::<u32>() + size_of::<u64>() + size_of::<Hash>();
+    size_of::<u32>() + size_of::<u32>() + size_of::<Hash>();
 
 impl FileName {
-    pub fn new(start_idx: u32, end_idx: u32, timestamp: SystemTime, hash: Hash) -> Self {
+    pub fn new(start_idx: u32, end_idx: u32, hash: Hash) -> Self {
         Self {
             start_idx,
             end_idx,
-            timestamp,
             hash,
         }
     }
@@ -45,14 +38,11 @@ impl FileName {
         }
         let start_idx = u32::from_le_bytes(buf[0..4].try_into()?);
         let end_idx = u32::from_le_bytes(buf[4..8].try_into()?);
-        let timestamp =
-            UNIX_EPOCH + Duration::from_millis(u64::from_le_bytes(buf[8..16].try_into()?));
-        let hash = blake3::Hash::from_bytes(buf[16..].try_into()?);
+        let hash = blake3::Hash::from_bytes(buf[8..].try_into()?);
 
         Ok(FileName {
             start_idx,
             end_idx,
-            timestamp,
             hash,
         })
     }
@@ -62,17 +52,10 @@ impl FileName {
 
 impl From<&FileName> for String {
     fn from(value: &FileName) -> Self {
-        let timestamp = value
-            .timestamp
-            .duration_since(UNIX_EPOCH)
-            .log_error("filename has an invalid timestamp")
-            .unwrap_or_default()
-            .as_millis() as u64;
         let mut buf = [0u8; FILE_NAME_LEN_PRE_BAS64];
         buf[0..4].copy_from_slice(&value.start_idx.to_le_bytes());
         buf[4..8].copy_from_slice(&value.end_idx.to_le_bytes());
-        buf[8..16].copy_from_slice(&timestamp.to_le_bytes());
-        buf[16..].copy_from_slice(value.hash.as_bytes());
+        buf[8..].copy_from_slice(value.hash.as_bytes());
         BASE64_ENGINE.encode(buf)
     }
 }
@@ -85,10 +68,7 @@ impl From<&FileName> for PathBuf {
 
 #[cfg(test)]
 mod tests {
-    use std::{
-        path::PathBuf,
-        time::{SystemTime, UNIX_EPOCH},
-    };
+    use std::path::PathBuf;
 
     use blake3::{Hash, OUT_LEN};
 
     use super::FileName;
@@ -97,25 +77,11 @@ mod tests {
     #[test]
     fn test_encode_decode() {
         let f = FileName {
             start_idx: 100,
             end_idx: 1000,
-            timestamp: SystemTime::now(),
             hash: Hash::from_bytes([6u8; OUT_LEN]),
         };
         let encoded: PathBuf = (&f).into();
         let decoded_result = FileName::try_parse(encoded.file_name().unwrap().as_encoded_bytes());
         assert!(decoded_result.is_ok());
-        let FileName {
-            start_idx,
-            end_idx,
-            timestamp,
-            hash,
-        } = decoded_result.unwrap();
-        assert_eq!(start_idx, f.start_idx);
-        assert_eq!(end_idx, f.end_idx);
-        assert_eq!(hash, f.hash);
-
-        // checking 1 by 1 because the timestamp nanos may shift a little bit by conversion to unix timestamp millis
-        let millis =
-            |t: SystemTime| -> u64 { t.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 };
-        assert_eq!(millis(timestamp), millis(f.timestamp));
+        assert_eq!(decoded_result.unwrap(), f);
     }
 }

From 5ebb2ad2fba63c2564833d8dc2263ddb87834148 Mon Sep 17 00:00:00 2001
From: Assaf Vayner
Date: Tue, 1 Oct 2024 19:28:06 -0700
Subject: [PATCH 9/9] test updates

---
 chunk_cache/src/disk_cache.rs | 19 +++++++++++-------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/chunk_cache/src/disk_cache.rs b/chunk_cache/src/disk_cache.rs
index cdea4fa4..223efd04 100644
--- a/chunk_cache/src/disk_cache.rs
+++ b/chunk_cache/src/disk_cache.rs
@@ -456,14 +456,20 @@ mod tests {
 
         for start in range.start..range.end {
             for end in (start + 1)..=range.end {
-                assert!(
-                    cache.get(&key, &Range { start, end }).unwrap().is_some(),
-                    "range: [{start} {end})"
-                );
+                let get_result = cache.get(&key, &Range { start, end }).unwrap();
+                assert!(get_result.is_some(), "range: [{start} {end})");
+                let data_portion = get_data(&Range { start, end }, &chunk_byte_indicies, &data);
+                assert_eq!(data_portion, get_result.unwrap())
             }
         }
     }
 
+    fn get_data<'a>(range: &Range, chunk_byte_indicies: &[u32], data: &'a [u8]) -> &'a [u8] {
+        let start = chunk_byte_indicies[range.start as usize] as usize;
+        let end = chunk_byte_indicies[range.end as usize] as usize;
+        &data[start..end]
+    }
+
     #[test]
     fn test_puts_eviction() {
         const CAP: u64 = (RANGE_LEN * 4) as u64;
@@ -550,13 +556,14 @@ mod tests {
         while get_result_1.is_some() && i < 10 {
             i += 1;
             let (key2, range2, chunk_byte_indicies2, data2) = RandomEntryIterator.next().unwrap();
-            assert!(cache2
+            cache2
                 .put(&key2, &range2, &chunk_byte_indicies2, &data2)
-                .is_ok());
+                .unwrap();
             get_result_1 = cache2.get(&key, &range).unwrap();
         }
         if get_result_1.is_some() {
             // randomness didn't evict the record after 10 tries, don't test this case now
+            return;
         }
         // we've evicted the original record from the cache
         // note using the original cache handle without updates!
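
A minimal usage sketch of the DiskCache/ChunkCache API as it stands at the end of this series. The cache root path, capacity, chunk byte offsets, and data below are illustrative assumptions, not values taken from the patches:

use cas_types::{Key, Range};
use chunk_cache::{ChunkCache, DiskCache};

fn demo(key: Key) -> Result<(), chunk_cache::error::ChunkCacheError> {
    // assumed cache directory and a 1 GiB capacity
    let mut cache = DiskCache::initialize("/tmp/chunk_cache", Some(1 << 30))?;

    // four chunks covering bytes [0, 40); chunk_byte_indicies holds one
    // byte offset per chunk boundary, starting at 0
    let range = Range { start: 0, end: 4 };
    let chunk_byte_indicies = [0u32, 10, 20, 30, 40];
    let data = vec![0u8; 40];
    cache.put(&key, &range, &chunk_byte_indicies, &data)?;

    // any sub-range of a cached range is served from the same cache file
    if let Some(bytes) = cache.get(&key, &Range { start: 1, end: 3 })? {
        assert_eq!(bytes.len(), 20); // bytes of chunks 1 and 2
    }
    Ok(())
}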