diff --git a/src/constants.rs b/src/constants.rs index edd04ee..5b8d83c 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -2,6 +2,8 @@ mod nullable; pub use nullable::Nullable; mod page_settings; +pub use page_settings::MAX_FILE_HANDLE_COUNT; +pub use page_settings::MAX_PAGE_CACHE; pub use page_settings::PAGES_PER_FILE; pub use page_settings::PAGE_SIZE; diff --git a/src/constants/page_settings.rs b/src/constants/page_settings.rs index 18fe10e..e7fc80f 100644 --- a/src/constants/page_settings.rs +++ b/src/constants/page_settings.rs @@ -5,3 +5,10 @@ pub const PAGE_SIZE: u16 = 4096; /// I have been careful to use usize in most places, as a result a variety of limits /// will be lower on a 32bit platform. pub const PAGES_PER_FILE: usize = 256; + +/// Number of pages to hold in cache, each will consume PAGE_SIZE of memory +pub const MAX_PAGE_CACHE: usize = 128; + +/// Linux seems to limit to 1024, macos 256, windows 512 but I'm staying low until +/// a benchmark proves I need to change it. +pub const MAX_FILE_HANDLE_COUNT: usize = 128; diff --git a/src/engine/analyzer/definition_lookup.rs b/src/engine/analyzer/definition_lookup.rs index ae94c5e..234527d 100644 --- a/src/engine/analyzer/definition_lookup.rs +++ b/src/engine/analyzer/definition_lookup.rs @@ -322,9 +322,6 @@ pub enum DefinitionLookupError { mod tests { use tempfile::TempDir; - use crate::engine::io::block_layer::file_manager::FileManager; - use crate::engine::io::block_layer::lock_cache_manager::LockCacheManager; - // Note this useful idiom: importing names from outer (for mod tests) scope. use super::super::super::io::RowManager; use super::super::super::transactions::TransactionManager; diff --git a/src/engine/io/block_layer.rs b/src/engine/io/block_layer.rs index e4778de..c32a242 100644 --- a/src/engine/io/block_layer.rs +++ b/src/engine/io/block_layer.rs @@ -27,10 +27,14 @@ let page = get_page_for_read() */ -pub mod file_manager; +//pub mod file_manager; +pub mod file_manager2; -pub mod free_space_manager; +pub mod file_operations; -pub mod lock_cache_manager; +pub mod free_space_manager; pub mod lock_manager; + +mod resource_formatter; +pub use resource_formatter::ResourceFormatter; diff --git a/src/engine/io/block_layer/file_manager.rs b/src/engine/io/block_layer/file_manager.rs index 07a1b28..03bf2d8 100644 --- a/src/engine/io/block_layer/file_manager.rs +++ b/src/engine/io/block_layer/file_manager.rs @@ -11,15 +11,7 @@ use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::oneshot::error::RecvError; use tokio::sync::oneshot::{self, Sender}; -//Inner Types -mod file_executor; -use file_executor::FileExecutor; -use file_executor::FileExecutorError; -mod file_operations; -mod request_type; -use request_type::RequestType; -mod resource_formatter; -pub use resource_formatter::ResourceFormatter; + #[derive(Clone, Debug)] pub struct FileManager { diff --git a/src/engine/io/block_layer/file_manager2.rs b/src/engine/io/block_layer/file_manager2.rs new file mode 100644 index 0000000..6393634 --- /dev/null +++ b/src/engine/io/block_layer/file_manager2.rs @@ -0,0 +1,388 @@ +use bytes::{Bytes, BytesMut}; +use moka::future::Cache; +use std::convert::TryFrom; +use std::io::SeekFrom; +use std::num::TryFromIntError; +use std::ops::DerefMut; +use std::{ + ffi::{OsStr, OsString}, + path::{Path, PathBuf}, + sync::{atomic::AtomicUsize, Arc}, +}; +use thiserror::Error; +use tokio::fs::{read_dir, File}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard}; + +use 
crate::constants::{MAX_FILE_HANDLE_COUNT, MAX_PAGE_CACHE};
+use crate::engine::io::block_layer::ResourceFormatter;
+use crate::{
+    constants::PAGE_SIZE,
+    engine::io::page_formats::{PageId, PageOffset},
+};
+
+use super::file_operations::{FileOperations, FileOperationsError};
+use super::lock_manager::LockManager;
+
+/// Empty page buffer
+const EMPTY_BUFFER: [u8; 16] = [0u8; 16];
+
+/// Attempt to move away from channels for the FileManager Service.
+///
+/// This code has ended up tremendously simpler than the prior version!
+pub struct FileManager2 {
+    data_dir: PathBuf,
+    file_handles: Cache<(PageId, usize), Arc<Mutex<File>>>,
+    file_offsets: Cache<PageId, Arc<AtomicUsize>>,
+    lock_manager: LockManager,
+    page_cache: Cache<(PageId, PageOffset), Bytes>,
+}
+
+impl FileManager2 {
+    pub fn new(raw_path: OsString) -> Result<FileManager2, FileManager2Error> {
+        let data_dir = Path::new(&raw_path).to_path_buf();
+
+        if !data_dir.is_dir() {
+            return Err(FileManager2Error::NeedDirectory(
+                data_dir.to_string_lossy().to_string(),
+            ));
+        }
+
+        Ok(FileManager2 {
+            data_dir,
+            file_handles: Cache::new(MAX_FILE_HANDLE_COUNT),
+            file_offsets: Cache::new(10000),
+            lock_manager: LockManager::new(),
+            page_cache: Cache::new(MAX_PAGE_CACHE),
+        })
+    }
+
+    pub async fn get_next_offset(
+        &self,
+        page_id: &PageId,
+    ) -> Result<(PageOffset, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
+        let data_dir = self.data_dir.clone();
+        let page_id = *page_id;
+        let current_offset = self
+            .file_offsets
+            .get_or_try_insert_with(page_id, async move {
+                let po = Self::find_next_offset(&data_dir, &page_id).await?;
+                let start_atomic: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(po.0));
+                Ok::<Arc<AtomicUsize>, FileManager2Error>(start_atomic)
+            })
+            .await?;
+        let new_offset = current_offset.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+        let new_po = PageOffset(new_offset);
+
+        let write_lock = self.lock_manager.write(page_id, new_po).await;
+        Ok((new_po, write_lock))
+    }
+
+    pub async fn get_next_offset_non_zero(
+        &self,
+        page_id: &PageId,
+    ) -> Result<(PageOffset, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
+        loop {
+            let (offset, write_lock) = self.get_next_offset(&page_id).await?;
+            if offset != PageOffset(0) {
+                return Ok((offset, write_lock));
+            }
+        }
+    }
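+
+    // Note on the cache keys used below (illustrative, assuming PageOffset::get_file_number()
+    // groups PAGES_PER_FILE pages per file): `file_handles` is keyed by (PageId, file number),
+    // so with PAGES_PER_FILE = 256 offsets 0..=255 share file 0, offsets 256..=511 file 1, etc.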
+
+    pub async fn add_page(
+        &self,
+        guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
+        page: Bytes,
+    ) -> Result<(), FileManager2Error> {
+        let data_dir = self.data_dir.clone();
+        let page_id = guard.0;
+        let file_number = guard.1.get_file_number();
+        let file_handle = self
+            .file_handles
+            .get_or_try_insert_with((page_id, file_number), async move {
+                let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
+                Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
+            })
+            .await?;
+        let mut file = file_handle.lock().await;
+
+        self.page_cache.insert((page_id, guard.1), page.clone());
+        let _ = FileOperations::add_chunk(file.deref_mut(), &guard.1, page).await?;
+        Ok(())
+    }
+
+    pub async fn get_page(
+        &self,
+        page_id: &PageId,
+        offset: &PageOffset,
+    ) -> Result<(Bytes, OwnedRwLockReadGuard<(PageId, PageOffset)>), FileManager2Error> {
+        let data_dir = self.data_dir.clone();
+        let page_id = *page_id;
+        let offset = *offset;
+        let file_number = offset.get_file_number();
+        let file_handles = self.file_handles.clone();
+
+        let chunk = self
+            .page_cache
+            .get_or_try_insert_with((page_id, offset), async move {
+                let file_handle = file_handles
+                    .get_or_try_insert_with((page_id, file_number), async move {
+                        let handle =
+                            FileOperations::open_path(&data_dir, &page_id, file_number).await?;
+                        Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(
+                            handle,
+                        )))
+                    })
+                    .await?;
+                let mut file = file_handle.lock().await;
+
+                let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
+                Ok::<Bytes, FileManager2Error>(chunk)
+            })
+            .await?;
+
+        let read_lock = self.lock_manager.read(page_id, offset).await;
+        Ok((chunk, read_lock))
+    }
+
+    pub async fn get_page_for_update(
+        &self,
+        page_id: &PageId,
+        offset: &PageOffset,
+    ) -> Result<(Bytes, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
+        let data_dir = self.data_dir.clone();
+        let page_id = *page_id;
+        let offset = *offset;
+        let file_number = offset.get_file_number();
+        let file_handles = self.file_handles.clone();
+
+        let chunk = self
+            .page_cache
+            .get_or_try_insert_with((page_id, offset), async move {
+                let file_handle = file_handles
+                    .get_or_try_insert_with((page_id, file_number), async move {
+                        let handle =
+                            FileOperations::open_path(&data_dir, &page_id, file_number).await?;
+                        Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(
+                            handle,
+                        )))
+                    })
+                    .await?;
+                let mut file = file_handle.lock().await;
+
+                let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
+                Ok::<Bytes, FileManager2Error>(chunk)
+            })
+            .await?;
+
+        let write_lock = self.lock_manager.write(page_id, offset).await;
+        Ok((chunk, write_lock))
+    }
+
+    pub async fn update_page(
+        &self,
+        guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
+        page: Bytes,
+    ) -> Result<(), FileManager2Error> {
+        let data_dir = self.data_dir.clone();
+        let page_id = guard.0;
+        let file_number = guard.1.get_file_number();
+        let file_handle = self
+            .file_handles
+            .get_or_try_insert_with((page_id, file_number), async move {
+                let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
+                Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
+            })
+            .await?;
+        let mut file = file_handle.lock().await;
+
+        self.page_cache.insert((page_id, guard.1), page.clone());
+        let _ = FileOperations::update_chunk(file.deref_mut(), &guard.1, page).await?;
+        Ok(())
+    }
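+
+    // Rough usage sketch of the methods above (illustrative names only, assumes a tokio runtime):
+    //     let (po, guard) = fm.get_next_offset(&page_id).await?;
+    //     fm.add_page(guard, page_bytes).await?;
+    //     let (bytes, _read_guard) = fm.get_page(&page_id, &po).await?;
+    //     let (_old, write_guard) = fm.get_page_for_update(&page_id, &po).await?;
+    //     fm.update_page(write_guard, new_bytes).await?;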
+
+    async fn find_next_offset(
+        data_dir: &Path,
+        page_id: &PageId,
+    ) -> Result<PageOffset, FileManager2Error> {
+        let (path, count) = match Self::search_for_max_file(data_dir, page_id).await? {
+            Some((p, c)) => (p, c),
+            None => {
+                return Ok(PageOffset(0));
+            }
+        };
+
+        let mut file = File::open(path.clone()).await?;
+        let file_meta = file.metadata().await?;
+        let file_len = file_meta.len();
+
+        if file_len % PAGE_SIZE as u64 != 0 {
+            return Err(FileManager2Error::IncorrectPageSize(file_len, path));
+        }
+
+        // If this fails you are probably on a 32bit platform and
+        // have changed the PAGE_SIZE constant. I would reduce PAGE_SIZE.
+        let file_len = usize::try_from(file_len)?;
+
+        //Now we need to scan backwards in the file to make sure we find the last non-zero page.
+        let mut in_file_len = file_len;
+        while in_file_len != 0 {
+            //Move back to test a block
+            in_file_len = in_file_len.saturating_sub(PAGE_SIZE as usize);
+
+            let in_file_len_u64 = u64::try_from(in_file_len)?;
+            file.seek(SeekFrom::Start(in_file_len_u64)).await?;
+
+            //Each page should start with a non-zero number within the first 16 bytes, if it has data
+            let mut buffer = BytesMut::with_capacity(EMPTY_BUFFER.len());
+            file.read_buf(&mut buffer).await?;
+            let buffer = buffer.freeze();
+            if buffer == Bytes::from_static(&EMPTY_BUFFER) {
+                //Okay we keep going
+                continue;
+            } else {
+                //We can calculate our page offset now
+                in_file_len = in_file_len.saturating_add(PAGE_SIZE as usize);
+                let po = PageOffset::calculate_page_offset(count, in_file_len);
+                return Ok(po);
+            }
+        }
+
+        //Okay so the file is empty
+        let po = PageOffset::calculate_page_offset(count, in_file_len);
+        Ok(po)
+    }
+
+    /// This will search for the highest numbered file for the Uuid
+    async fn search_for_max_file(
+        data_dir: &Path,
+        page_id: &PageId,
+    ) -> Result<Option<(PathBuf, usize)>, FileManager2Error> {
+        let sub_path = FileOperations::make_sub_path(data_dir, page_id).await?;
+        let target_uuid = ResourceFormatter::format_uuid(&page_id.resource_key);
+        let target_type = page_id.page_type.to_string();
+        let target_filename = format!("{0}.{1}", target_uuid, target_type);
+
+        let mut max_file_count = 0;
+        let mut max_file_path = None;
+
+        let mut files = read_dir(sub_path).await?;
+        while let Some(entry) = files.next_entry().await? {
+            let path = entry.path();
+            let file_stem = match path.file_stem() {
+                Some(s) => Self::format_os_string(s),
+                None => {
+                    continue;
+                }
+            };
+            let file_ext = match path.extension() {
+                Some(s) => Self::format_os_string(s),
+                None => {
+                    continue;
+                }
+            };
+            if !file_stem.eq(&target_filename) {
+                continue;
+            }
+            let file_count = match file_ext.parse::<usize>() {
+                Ok(s) => s,
+                Err(_) => {
+                    continue;
+                }
+            };
+
+            if file_count >= max_file_count {
+                max_file_count = file_count;
+                max_file_path = Some(path);
+            }
+        }
+
+        match max_file_path {
+            Some(s) => Ok(Some((s, max_file_count))),
+            None => Ok(None),
+        }
+    }
+
+    fn format_os_string(input: &OsStr) -> String {
+        input.to_ascii_lowercase().to_string_lossy().into_owned()
+    }
+}
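+
+// Illustrative note on the layout `search_for_max_file` expects: page files are named
+// `<formatted uuid>.<page type>.<file count>` inside the folder built by
+// FileOperations::make_sub_path, so a second data file for a resource simply ends in `.1`;
+// the highest numbered file is the one scanned above for the next free offset.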
+
+#[derive(Debug, Error)]
+pub enum FileManager2Error {
+    #[error(transparent)]
+    FileManager2Error(#[from] Arc<FileManager2Error>),
+    #[error(transparent)]
+    FileOperationsError(#[from] FileOperationsError),
+    #[error("Incorrect page size of {0} on file {1} found. System cannot function")]
+    IncorrectPageSize(u64, PathBuf),
+    #[error(transparent)]
+    IOError(#[from] std::io::Error),
+    #[error("Need a directory to store the data. Got ({0}) may be stripped of non Unicode chars.")]
+    NeedDirectory(String),
+    #[error(transparent)]
+    TryFromIntError(#[from] TryFromIntError),
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::{Bytes, BytesMut};
+    use tempfile::TempDir;
+    use uuid::Uuid;
+
+    use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};
+
+    use super::*;
+
+    fn get_test_page(fill: u8) -> Bytes {
+        let mut test_page = BytesMut::with_capacity(PAGE_SIZE as usize);
+        let free_space = vec![fill; PAGE_SIZE as usize];
+        test_page.extend_from_slice(&free_space);
+        test_page.freeze()
+    }
+
+    #[tokio::test]
+    async fn test_roundtrips() -> Result<(), Box<dyn std::error::Error>> {
+        let tmp = TempDir::new()?;
+        let tmp_dir = tmp.path();
+
+        let fm = FileManager2::new(tmp_dir.as_os_str().to_os_string())?;
+
+        let page_id = PageId {
+            resource_key: Uuid::new_v4(),
+            page_type: PageType::Data,
+        };
+
+        let test_page = get_test_page(1);
+        let (test_po, write_guard) = fm.get_next_offset(&page_id).await?;
+        fm.add_page(write_guard, test_page.clone()).await?;
+
+        assert_eq!(test_po, PageOffset(0));
+
+        let (test_page_get, read_guard) = fm.get_page(&page_id, &test_po).await?;
+
+        assert_eq!(test_page, test_page_get);
+        drop(read_guard);
+
+        let test_page2 = get_test_page(2);
+        let (_, write_guard) = fm.get_page_for_update(&page_id, &test_po).await?;
+        fm.update_page(write_guard, test_page2.clone()).await?;
+
+        let (test_page_get2, _) = fm.get_page(&page_id, &test_po).await?;
+
+        assert_eq!(test_page2, test_page_get2);
+
+        let fm2 = FileManager2::new(tmp_dir.as_os_str().to_os_string())?;
+        let test_page3 = get_test_page(3);
+        let (test_po3, write_guard) = fm2.get_next_offset(&page_id).await?;
+        fm2.add_page(write_guard, test_page3.clone()).await?;
+        assert!(test_po3 > test_po);
+
+        let (test_page_get2, _) = fm2.get_page(&page_id, &test_po).await?;
+
+        assert_eq!(test_page2, test_page_get2);
+
+        Ok(())
+    }
+}
diff --git a/src/engine/io/block_layer/file_manager/file_operations.rs b/src/engine/io/block_layer/file_operations.rs
similarity index 86%
rename from src/engine/io/block_layer/file_manager/file_operations.rs
rename to src/engine/io/block_layer/file_operations.rs
index 290a92f..615259e 100644
--- a/src/engine/io/block_layer/file_manager/file_operations.rs
+++ b/src/engine/io/block_layer/file_operations.rs
@@ -14,7 +14,7 @@ use tokio::{
 };
 
 use crate::constants::PAGE_SIZE;
-use crate::engine::io::block_layer::file_manager::ResourceFormatter;
+use crate::engine::io::block_layer::ResourceFormatter;
 use crate::engine::io::page_formats::{PageId, PageOffset};
 
 pub struct FileOperations {}
@@ -42,10 +42,10 @@
     /// Note the File Handle AND PageOffset should point to where the add should occur
     /// If the file is larger than requested nothing is done.
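+    /// (For illustration: `get_file_chunk_size()` is used below as the minimum file length
+    /// needed to hold this page, so the file is grown with `set_len` before the actual write
+    /// is delegated to `update_chunk`.)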
     pub async fn add_chunk(
-        file: File,
+        file: &mut File,
         page_offset: &PageOffset,
         buffer: Bytes,
-    ) -> Result<File, FileOperationsError> {
+    ) -> Result<(), FileOperationsError> {
         let metadata = file.metadata().await?;
         let chunk_size_u64 = u64::try_from(page_offset.get_file_chunk_size())?;
 
@@ -53,7 +53,8 @@
             file.set_len(chunk_size_u64).await?;
         }
 
-        Self::update_chunk(file, page_offset, buffer).await
+        Self::update_chunk(file, page_offset, buffer).await?;
+        Ok(())
     }
 
     //Makes the prefix folder so we don't fill up folders. Will consider more nesting eventually
@@ -72,16 +73,19 @@
     }
 
     pub async fn read_chunk(
-        mut file: File,
+        file: &mut File,
         page_offset: &PageOffset,
-    ) -> Result<(File, Option<Bytes>), FileOperationsError> {
+    ) -> Result<Bytes, FileOperationsError> {
         let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
 
         let file_meta = file.metadata().await?;
         let file_len = file_meta.len();
 
         if u64::try_from(page_offset.get_file_chunk_size())? > file_len {
-            return Ok((file, None));
+            return Err(FileOperationsError::FileTooSmall(
+                page_offset.get_file_chunk_size(),
+                file_len,
+            ));
         }
 
         file.seek(SeekFrom::Start(u64::try_from(page_offset.get_file_seek())?))
             .await?;
@@ -94,14 +98,14 @@
             }
         }
 
-        Ok((file, Some(buffer.freeze())))
+        Ok(buffer.freeze())
     }
 
     pub async fn update_chunk(
-        mut file: File,
+        file: &mut File,
         page_offset: &PageOffset,
         mut buffer: Bytes,
-    ) -> Result<File, FileOperationsError> {
+    ) -> Result<(), FileOperationsError> {
         file.seek(SeekFrom::Start(u64::try_from(page_offset.get_file_seek())?))
             .await?;
 
@@ -109,7 +113,7 @@
 
         //file.sync_all().await?;
 
-        Ok(file)
+        Ok(())
     }
 }
 
@@ -121,6 +125,8 @@ pub enum FileOperationsError {
     IOError(#[from] std::io::Error),
     #[error(transparent)]
     TryFromIntError(#[from] TryFromIntError),
+    #[error("File too small for requested read {0}, size is {1}")]
+    FileTooSmall(usize, u64),
 }
 
 #[cfg(test)]
diff --git a/src/engine/io/block_layer/free_space_manager.rs b/src/engine/io/block_layer/free_space_manager.rs
index 1431ac3..4b75883 100644
--- a/src/engine/io/block_layer/free_space_manager.rs
+++ b/src/engine/io/block_layer/free_space_manager.rs
@@ -4,20 +4,26 @@
 use super::{
     super::page_formats::{PageId, PageOffset, PageType},
-    lock_cache_manager::{LockCacheManager, LockCacheManagerError},
+    file_manager2::{FileManager2, FileManager2Error},
+    lock_manager::{LockManager, LockManagerError},
 };
 use crate::constants::PAGE_SIZE;
 use bytes::{Buf, Bytes, BytesMut};
+use std::sync::Arc;
 use thiserror::Error;
 
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct FreeSpaceManager {
-    lock_cache_manager: LockCacheManager,
+    file_manager: Arc<FileManager2>,
+    lock_manager: LockManager,
 }
 
 impl FreeSpaceManager {
-    pub fn new(lock_cache_manager: LockCacheManager) -> FreeSpaceManager {
-        FreeSpaceManager { lock_cache_manager }
+    pub fn new(file_manager: Arc<FileManager2>, lock_manager: LockManager) -> FreeSpaceManager {
+        FreeSpaceManager {
+            file_manager,
+            lock_manager,
+        }
     }
 
     pub async fn get_next_free_page(
@@ -30,9 +36,10 @@
             page_type: PageType::FreeSpaceMap,
         };
         loop {
-            let page_handle = self.lock_cache_manager.get_page(free_id, &offset).await?;
-            match page_handle.as_ref() {
-                Some(s) => {
+            let page_lock = self.lock_manager.read(page_id, offset).await;
+
+            match self.file_manager.get_page(&page_id, &offset).await {
+                Ok(s) => {
                     let mut page_frozen = s.clone();
                     match Self::find_first_free_page_in_page(&mut page_frozen) {
                         Some(s) => {
@@ -46,26 +53,24 @@
                         }
                     }
                 }
-                None => {
-                    drop(page_handle);
-                    //Get the next offset, BUT since there could be a gap, we're going to blindly write all free
-                    //and loop to get it again.
-                    let next_po = self.lock_cache_manager.get_offset(free_id).await?;
-                    let mut new_page_handle = self
-                        .lock_cache_manager
-                        .get_page_for_update(free_id, &next_po)
-                        .await?;
+                Err(e) => {
+                    // Create the next offset page and loop again as a test.
+ // Note: due to possible timing issues the next page might not be sequentially + // next so we will check again on the next loop + + drop(page_lock); + let next_offset = self.file_manager.get_next_offset(&page_id).await?; + let next_page_lock_cache = self.lock_manager.write(page_id, next_offset).await; let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize]; buffer.extend_from_slice(&new_page); - new_page_handle.replace(buffer.freeze()); - self.lock_cache_manager - .add_page(free_id, next_po, new_page_handle) + self.file_manager + .add_page(&page_id, &next_offset, buffer.freeze()) .await?; - continue; //No increment since we could be at a gap + offset += PageOffset(1); } } } @@ -81,21 +86,30 @@ impl FreeSpaceManager { resource_key: page_id.resource_key, page_type: PageType::FreeSpaceMap, }; - let (po, offset) = po.get_bitmask_offset(); - let mut page_handle = self - .lock_cache_manager - .get_page_for_update(free_id, &po) + let (fs_po, inner_offset) = po.get_bitmask_offset(); + let page_lock = self.lock_manager.write(page_id, fs_po).await; + + let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + buffer.extend_from_slice( + &self + .file_manager + .get_page(&page_id, &fs_po) + .await + .unwrap_or_else({ + |_| { + let free = vec![FreeStat::Free as u8; PAGE_SIZE as usize]; + Bytes::copy_from_slice(&free) + } + }), + ); + + Self::set_status_inside_page(&mut buffer, inner_offset, status); + + self.file_manager + .add_page(&page_id, &fs_po, buffer.freeze()) .await?; - let page = page_handle - .as_mut() - .ok_or(FreeSpaceManagerError::PageDoesNotExist(page_id))?; - let new_page = Self::set_status_inside_page(page, offset, status); - page_handle.replace(new_page); - - Ok(self - .lock_cache_manager - .update_page(free_id, po, page_handle) - .await?) + + Ok(()) } fn find_first_free_page_in_page(buffer: &mut impl Buf) -> Option { @@ -119,9 +133,7 @@ impl FreeSpaceManager { /// Sets the status of a field inside a page, you MUST pass an offset /// that fits in the buffer. - fn set_status_inside_page(src: &Bytes, offset: usize, status: FreeStat) -> Bytes { - let mut buffer = BytesMut::with_capacity(src.len()); - buffer.extend_from_slice(&src[..]); + fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) { let offset_index = offset / 8; let offset_subindex = offset % 8; @@ -139,8 +151,6 @@ impl FreeSpaceManager { } buffer[offset_index] = new_value; - - buffer.freeze() } } @@ -153,27 +163,26 @@ pub enum FreeStat { #[derive(Debug, Error)] pub enum FreeSpaceManagerError { #[error(transparent)] - LockCacheManagerError(#[from] LockCacheManagerError), + FileManager2Error(#[from] FileManager2Error), + #[error(transparent)] + LockManagerError(#[from] LockManagerError), #[error("Page Offset {0} doesn't exist")] PageDoesNotExist(PageId), } #[cfg(test)] mod tests { - use std::sync::Arc; - use bytes::BufMut; + use std::sync::Arc; use tempfile::TempDir; use uuid::Uuid; - use crate::engine::io::block_layer::file_manager::FileManager; - use super::*; /// Gets the status of a field inside a page, you MUST pass an offset /// that fits in the buffer. 
//This was in the implementation, I just only needed it for unit tests
-    fn get_status_inside_page(buffer: &Bytes, offset: usize) -> FreeStat {
+    fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat {
         let offset_index = offset / 8;
         let offset_subindex = offset % 8;
 
@@ -192,13 +201,11 @@
         let mut test = BytesMut::with_capacity(2);
         test.put_u16(0x0);
 
-        let mut test = test.freeze();
-
         for i in 0..test.len() * 8 {
             assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
-            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
+            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
             assert_eq!(get_status_inside_page(&test, i), FreeStat::InUse);
-            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::Free);
+            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free);
             assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
         }
 
@@ -211,13 +218,11 @@
         test.put_u8(0x0);
         test.put_u8(0x0);
 
-        let mut test = test.freeze();
-
         for i in 0..test.len() * 8 {
             let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone());
             assert_eq!(free_page, Some(i));
 
-            test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
+            FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
         }
         assert_eq!(
            FreeSpaceManager::find_first_free_page_in_page(&mut test),
@@ -232,9 +237,9 @@
         let tmp = TempDir::new()?;
         let tmp_dir = tmp.path().as_os_str().to_os_string();
 
-        let fm = Arc::new(FileManager::new(tmp_dir)?);
-        let lm = LockCacheManager::new(fm);
-        let fsm = FreeSpaceManager::new(lm);
+        let fm = Arc::new(FileManager2::new(tmp_dir)?);
+        let lm = LockManager::new();
+        let fsm = FreeSpaceManager::new(fm, lm);
 
         let page_id = PageId {
             resource_key: Uuid::new_v4(),
diff --git a/src/engine/io/block_layer/lock_manager.rs b/src/engine/io/block_layer/lock_manager.rs
index 46d4301..e547de0 100644
--- a/src/engine/io/block_layer/lock_manager.rs
+++ b/src/engine/io/block_layer/lock_manager.rs
@@ -1,10 +1,9 @@
 use std::sync::Arc;
 
-use super::file_manager::{FileManager, FileManagerError};
 use crate::engine::io::page_formats::{PageId, PageOffset};
 use moka::future::Cache;
 use thiserror::Error;
-use tokio::sync::RwLock;
+use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
 
 /// The LockManager is used for cooperative access to pages in the system.
 ///
@@ -16,45 +15,60 @@ use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
 
 #[derive(Clone)]
 pub struct LockManager {
-    file_manager: Arc<FileManager>,
-    locks: Cache<(PageId, PageOffset), Arc<RwLock<()>>>,
+    locks: Cache<(PageId, PageOffset), Arc<RwLock<(PageId, PageOffset)>>>,
 }
 
 impl LockManager {
-    pub fn new(file_manager: Arc<FileManager>) -> LockManager {
+    pub fn new() -> LockManager {
         LockManager {
-            file_manager,
             locks: Cache::new(1000),
         }
     }
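+
+    // Rough usage sketch (illustrative): a caller takes a guard for the page it is about
+    // to touch and holds it for the duration of the I/O, e.g.
+    //     let _guard = lock_manager.read(page_id, offset).await;  // shared access
+    //     let _guard = lock_manager.write(page_id, offset).await; // exclusive access
+    // Dropping the guard releases the page for other tasks.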
 
-    pub async fn get_offset(&self, page_id: PageId) -> Result<PageOffset, LockManagerError> {
-        Ok(self.file_manager.get_offset(&page_id).await?)
+    async fn get_lock(
+        &self,
+        page_id: PageId,
+        offset: PageOffset,
+    ) -> Arc<RwLock<(PageId, PageOffset)>> {
+        self.locks
+            .get_or_insert_with((page_id, offset), async move {
+                Arc::new(RwLock::const_new((page_id, offset)))
+            })
+            .await
     }
 
-    pub async fn get_offset_non_zero(
+    pub async fn read(
         &self,
         page_id: PageId,
-    ) -> Result<PageOffset, LockManagerError> {
-        let mut offset = PageOffset(0);
-        while offset == PageOffset(0) {
-            offset = self.file_manager.get_offset(&page_id).await?;
-        }
-        Ok(offset)
+        offset: PageOffset,
+    ) -> OwnedRwLockReadGuard<(PageId, PageOffset)> {
+        self.get_lock(page_id, offset).await.read_owned().await
     }
 
-    pub async fn get_lock(&self, page_id: PageId, offset: PageOffset) -> Arc<RwLock<()>> {
-        self.locks
-            .get_or_insert_with(
-                (page_id, offset),
-                async move { Arc::new(RwLock::const_new(())) },
-            )
-            .await
+    pub async fn write(
+        &self,
+        page_id: PageId,
+        offset: PageOffset,
+    ) -> OwnedRwLockWriteGuard<(PageId, PageOffset)> {
+        self.get_lock(page_id, offset).await.write_owned().await
     }
 }
 
 #[derive(Debug, Error)]
-pub enum LockManagerError {
-    #[error(transparent)]
-    FileManagerError(#[from] FileManagerError),
+pub enum LockManagerError {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_locking() -> Result<(), Box<dyn std::error::Error>> {
+        let tmp = TempDir::new()?;
+        let tmp_dir = tmp.path().as_os_str().to_os_string();
+
+        //todo!("Figure out the new model");
+
+        Ok(())
+    }
 }
diff --git a/src/engine/io/block_layer/file_manager/resource_formatter.rs b/src/engine/io/block_layer/resource_formatter.rs
similarity index 100%
rename from src/engine/io/block_layer/file_manager/resource_formatter.rs
rename to src/engine/io/block_layer/resource_formatter.rs
diff --git a/src/engine/io/format_traits/serializable.rs b/src/engine/io/format_traits/serializable.rs
index 8f83380..4741644 100644
--- a/src/engine/io/format_traits/serializable.rs
+++ b/src/engine/io/format_traits/serializable.rs
@@ -1,7 +1,6 @@
 //! Serializes a given struct to a given ByteMut
 use bytes::{BufMut, Bytes, BytesMut};
-use tokio::sync::OwnedRwLockWriteGuard;
 
 use crate::constants::PAGE_SIZE;
 
@@ -10,7 +9,7 @@
     fn serialize(&self, buffer: &mut impl BufMut);
 
     /// Produces a new page to support the change to how the I/O subsystem works
-    fn serialize_and_pad(&self, buffer: &mut OwnedRwLockWriteGuard<Option<Bytes>>) {
+    fn serialize_and_pad(&self) -> Bytes {
         let mut page = BytesMut::with_capacity(PAGE_SIZE as usize);
         self.serialize(&mut page);
 
@@ -19,16 +18,13 @@
             page.extend_from_slice(&padding);
         }
 
-        buffer.replace(page.freeze());
+        page.freeze()
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use bytes::Buf;
-    use tokio::sync::RwLock;
 
     use super::*;
 
@@ -45,20 +41,10 @@
     async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
         let test = Test { inner: 2000 };
 
-        let page_lock = Arc::new(RwLock::new(None));
-        let mut guard = page_lock.clone().write_owned().await;
-
-        test.serialize_and_pad(&mut guard);
-        drop(guard);
+        let mut buffer = test.serialize_and_pad();
 
-        let page = page_lock.read_owned().await;
-        if let Some(s) = page.as_ref() {
-            let mut s = s.clone();
-            assert_eq!(s.len(), PAGE_SIZE as usize);
-            assert_eq!(test.inner, s.get_u32_le());
-        } else {
-            panic!("None found!");
-        }
+        assert_eq!(buffer.len(), PAGE_SIZE as usize);
+        assert_eq!(test.inner, buffer.get_u32_le());
 
         Ok(())
     }
diff --git a/src/engine/io/page_formats/page_id.rs b/src/engine/io/page_formats/page_id.rs
index 187da1c..ab7cb99 100644
--- a/src/engine/io/page_formats/page_id.rs
+++ b/src/engine/io/page_formats/page_id.rs
@@ -1,6 +1,5 @@
 //! A struct to uniquely identify a page in all operations. This replaces adding additional arguments everywhere.
 
-use crate::engine::io::block_layer::file_manager::ResourceFormatter;
 use nom::{
     bytes::complete::tag_no_case,
     error::{convert_error, make_error, ContextError, ErrorKind, ParseError, VerboseError},
@@ -13,6 +12,8 @@
 use thiserror::Error;
 use uuid::Uuid;
 
+use crate::engine::io::block_layer::ResourceFormatter;
+
 #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
 pub struct PageId {
     pub resource_key: Uuid,
diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs
index f3c8540..fe5fb80 100644
--- a/src/engine/io/row_manager.rs
+++ b/src/engine/io/row_manager.rs
@@ -1,8 +1,8 @@
 use super::super::objects::Table;
 use super::super::transactions::TransactionId;
-use super::block_layer::file_manager::FileManagerError;
+use super::block_layer::file_manager2::FileManager2;
 use super::block_layer::free_space_manager::{FreeSpaceManager, FreeSpaceManagerError, FreeStat};
-use super::block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError};
+use super::block_layer::lock_manager::LockManager;
 use super::format_traits::Serializable;
 use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
 use super::row_formats::{ItemPointer, RowData, RowDataError};
@@ -16,17 +16,23 @@ use thiserror::Error;
 
 /// The row manager is a mapper between rows and pages on disk.
 ///
 /// It operates at the lowest level, no visibility checks are done.
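+///
+/// Rough construction sketch (illustrative, assumes a `data_dir: OsString` and the types above in scope):
+///   let file_manager = Arc::new(FileManager2::new(data_dir)?);
+///   let free_space_manager = FreeSpaceManager::new(file_manager.clone(), LockManager::new());
+///   let row_manager = RowManager::new(file_manager, free_space_manager, LockManager::new());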
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct RowManager {
+    file_manager: Arc<FileManager2>,
     free_space_manager: FreeSpaceManager,
-    lock_cache_manager: LockCacheManager,
+    lock_manager: LockManager,
 }
 
 impl RowManager {
-    pub fn new(lock_cache_manager: LockCacheManager) -> RowManager {
+    pub fn new(
+        file_manager: Arc<FileManager2>,
+        free_space_manager: FreeSpaceManager,
+        lock_manager: LockManager,
+    ) -> RowManager {
         RowManager {
-            free_space_manager: FreeSpaceManager::new(lock_cache_manager.clone()),
-            lock_cache_manager,
+            file_manager,
+            free_space_manager,
+            lock_manager,
         }
     }
 
@@ -52,14 +58,13 @@
             resource_key: table.id,
             page_type: PageType::Data,
         };
-        let mut page_handle = self
-            .lock_cache_manager
-            .get_page_for_update(page_id, &row_pointer.page)
+
+        let page_lock = self.lock_manager.write(page_id, row_pointer.page).await;
+
+        let page_buffer = self
+            .file_manager
+            .get_page(&page_id, &row_pointer.page)
             .await?;
-        let page_buffer = page_handle
-            .as_mut()
-            .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?
-            .clone();
 
         let mut page = PageData::parse(table, row_pointer.page, &page_buffer)?;
         let mut row = page
@@ -80,10 +85,10 @@
         row.max = Some(current_tran_id);
         page.update(row, row_pointer.count)?;
 
-        page.serialize_and_pad(&mut page_handle);
+        let new_page = page.serialize_and_pad();
 
-        self.lock_cache_manager
-            .update_page(page_id, row_pointer.page, page_handle)
+        self.file_manager
+            .update_page(&page_id, &row_pointer.page, new_page)
             .await?;
 
         Ok(())
@@ -102,14 +107,13 @@
             resource_key: table.id,
             page_type: PageType::Data,
         };
-        let mut old_page_handle = self
-            .lock_cache_manager
-            .get_page_for_update(page_id, &row_pointer.page)
+
+        let page_lock = self.lock_manager.write(page_id, row_pointer.page).await;
+
+        let mut old_page_buffer = self
+            .file_manager
+            .get_page(&page_id, &row_pointer.page)
             .await?;
-        let old_page_buffer = old_page_handle
-            .as_mut()
-            .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?
-            .clone();
 
         let mut old_page = PageData::parse(table.clone(), row_pointer.page, &old_page_buffer)?;
 
@@ -163,6 +167,9 @@
             page_type: PageType::Data,
         };
 
+        let page_lock_cache = self.lock_manager.get_lock(page_id, row_pointer.page).await;
+        let page_lock = page_lock_cache.read().await;
+
         let page_handle = self
             .lock_cache_manager
             .get_page(page_id, &row_pointer.page)