From 12f569038b2d04e8d27727b8472b7dea6a22d17f Mon Sep 17 00:00:00 2001 From: Christopher Hotchkiss Date: Sat, 21 Aug 2021 18:37:59 -0400 Subject: [PATCH] New lock and cache layer has been implemented for #17, need to hook it up now. --- src/engine/io.rs | 4 + src/engine/io/file_manager.rs | 5 +- src/engine/io/file_manager/file_executor.rs | 1 - src/engine/io/free_space_manager.rs | 2 +- src/engine/io/index_manager.rs | 2 +- src/engine/io/lock_cache_manager.rs | 130 ++++++++++++++++++++ src/engine/io/lock_manager.rs | 6 +- src/engine/io/page_cache_manager.rs | 1 - src/engine/io/row_manager.rs | 6 +- 9 files changed, 145 insertions(+), 12 deletions(-) create mode 100644 src/engine/io/lock_cache_manager.rs delete mode 100644 src/engine/io/page_cache_manager.rs diff --git a/src/engine/io.rs b/src/engine/io.rs index e8001ad..b1265a4 100644 --- a/src/engine/io.rs +++ b/src/engine/io.rs @@ -19,6 +19,10 @@ pub use file_manager::FileManagerError; mod free_space_manager; pub use free_space_manager::FreeSpaceManager; +mod lock_cache_manager; +pub use lock_cache_manager::LockCacheManager; +pub use lock_cache_manager::LockCacheManagerError; + mod lock_manager; pub use lock_manager::LockManager; diff --git a/src/engine/io/file_manager.rs b/src/engine/io/file_manager.rs index 03f0700..a3d4844 100644 --- a/src/engine/io/file_manager.rs +++ b/src/engine/io/file_manager.rs @@ -84,6 +84,7 @@ impl FileManager { Ok(res_receiver.await??) 
} + //TODO I'm chewing on if this stream implementation is way too low level especially considering locking pub fn get_stream( &self, page_id: &PageId, @@ -111,8 +112,8 @@ impl FileManager { pub async fn update_page( &self, page_id: &PageId, - page: Bytes, offset: &PageOffset, + page: Bytes, ) -> Result<(), FileManagerError> { let size = UInt12::try_from(page.len() - 1)?; if size != UInt12::max() { @@ -213,7 +214,7 @@ mod tests { assert_eq!(test_page, test_page_get); let test_page2 = get_test_page(2); - fm.update_page(&page_id, test_page2.clone(), &test_page_num) + fm.update_page(&page_id, &test_page_num, test_page2.clone()) .await?; let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num)) diff --git a/src/engine/io/file_manager/file_executor.rs b/src/engine/io/file_manager/file_executor.rs index cb6be71..628f188 100644 --- a/src/engine/io/file_manager/file_executor.rs +++ b/src/engine/io/file_manager/file_executor.rs @@ -138,7 +138,6 @@ impl FileExecutor { } } - //Still haven't figured out incrementing file_handles if file_handles_open < MAX_FILE_HANDLE_COUNT && !request_queue.is_empty() { let mut new_request_queue = VecDeque::with_capacity(request_queue.len()); for (page_id, req_type) in request_queue.into_iter() { diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs index f07a641..92fbb87 100644 --- a/src/engine/io/free_space_manager.rs +++ b/src/engine/io/free_space_manager.rs @@ -82,7 +82,7 @@ impl FreeSpaceManager { Some(mut page) => { Self::set_status_inside_page(&mut page, offset, status); self.file_manager - .update_page(&page_id, page.freeze(), &po) + .update_page(&page_id, &po, page.freeze()) .await?; Ok(()) } diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs index 318b221..8a5d9d7 100644 --- a/src/engine/io/index_manager.rs +++ b/src/engine/io/index_manager.rs @@ -96,7 +96,7 @@ impl IndexManager { if l.can_fit(&new_key) { return Ok(self .file_manager - 
.update_page(&page_id, l.serialize()?, ¤t_node.1) + .update_page(&page_id, ¤t_node.1, l.serialize()?) .await?); } diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/lock_cache_manager.rs new file mode 100644 index 0000000..e60c424 --- /dev/null +++ b/src/engine/io/lock_cache_manager.rs @@ -0,0 +1,130 @@ +use std::{ + collections::{HashMap, VecDeque}, + ptr::read, + sync::Arc, +}; + +use super::{ + page_formats::{PageId, PageOffset}, + FileManager, FileManagerError, +}; +use bytes::{Bytes, BytesMut}; +use lru::LruCache; +use thiserror::Error; +use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; + +//How do I support readers and writers? I want to use RwLocks but I need a way to stop unbounded +//Lrucache growth +pub struct LockCacheManager { + //TODO I don't like these massive single hashes protected with a single lock + // Long term I need to make a fixed hashmap and evict them myself. + // Holding on this since I might be able to work around it + cache: Arc>>>>>, + file_manager: FileManager, +} + +impl LockCacheManager { + pub fn new(file_manager: FileManager) -> LockCacheManager { + LockCacheManager { + //The unbounded nature of the cache worries me. I think I'll have to manage its eviction carefully + cache: Arc::new(Mutex::new(LruCache::unbounded())), + file_manager, + } + } + + pub async fn get_page( + &self, + page_id: PageId, + offset: PageOffset, + ) -> Result>, LockCacheManagerError> { + Ok(self + .get_page_internal(page_id, offset) + .await? + .read_owned() + .await) + } + + pub async fn get_page_for_update( + &self, + page_id: PageId, + offset: PageOffset, + ) -> Result>, LockCacheManagerError> { + Ok(self + .get_page_internal(page_id, offset) + .await? 
+            .write_owned() +            .await) +    } + +    async fn get_page_internal( +        &self, +        page_id: PageId, +        offset: PageOffset, +    ) -> Result<Arc<RwLock<Option<BytesMut>>>, LockCacheManagerError> { +        let mut cache = self.cache.lock().await; +        match cache.get(&(page_id, offset)) { +            Some(s) => return Ok(s.clone()), +            None => { +                //Cache miss, let's make the RwLock and drop the mutex +                let page_lock = Arc::new(RwLock::new(None)); +                let mut page_lock_write = page_lock.write().await; +                cache.put((page_id, offset), page_lock.clone()); +                drop(cache); + +                //Now we can load the underlying page without blocking everyone +                match self.file_manager.get_page(&page_id, &offset).await? { +                    Some(s) => { +                        page_lock_write.replace(s); +                    } +                    None => {} +                }; +                drop(page_lock_write); + +                Ok(page_lock) +            } +        } +    } + +    pub async fn update_page( +        &self, +        page_id: PageId, +        offset: PageOffset, +        guard: OwnedRwLockWriteGuard<Option<BytesMut>>, +    ) -> Result<(), LockCacheManagerError> { +        let page = match guard.as_ref() { +            Some(s) => s.clone(), +            None => { +                return Err(LockCacheManagerError::PageMissing()); +            } +        }; +        Ok(self +            .file_manager +            .update_page(&page_id, &offset, page.freeze()) +            .await?)
+    } + +    //TODO Need to figure how to lock for add, without blocking everyone +    //for now doing the naive implementation since I can just hold the mutex during the add +    //would be easily solved if I disconnect offset generation from the I/O +    pub async fn add_page( +        &self, +        page_id: PageId, +        page: Bytes, +    ) -> Result<PageOffset, LockCacheManagerError> { +        let mut cache = self.cache.lock().await; +        let po = self.file_manager.add_page(&page_id, page.clone()).await?; + +        let mut new_page = BytesMut::with_capacity(page.len()); +        new_page.extend_from_slice(&page.slice(0..page.len())); +        cache.put((page_id, po), Arc::new(RwLock::new(Some(new_page)))); +        Ok(po) +    } +} + +#[derive(Debug, Error)] +pub enum LockCacheManagerError { +    #[error(transparent)] +    FileManagerError(#[from] FileManagerError), +    #[error("Cannot update a page without contents")] +    PageMissing(), +} diff --git a/src/engine/io/lock_manager.rs b/src/engine/io/lock_manager.rs index 9d0b568..0dd856e 100644 --- a/src/engine/io/lock_manager.rs +++ b/src/engine/io/lock_manager.rs @@ -64,7 +64,7 @@ impl LockManager { self.cleanup(&mut lm).await; - lock + lock.clone() } async fn cleanup( @@ -111,7 +111,7 @@ impl LockManagerEntry { match le.get(offset) { Some(s) => match s.upgrade() { - Some(s1) => s1, + Some(s1) => s1.clone(), None => { //Weak failed, need to recreate drop(le); @@ -155,7 +155,7 @@ impl LockManagerEntry { }; self.cleanup(&mut le); - lock + lock.clone() } fn cleanup(&self, le: &mut RwLockWriteGuard>>>) { diff --git a/src/engine/io/page_cache_manager.rs b/src/engine/io/page_cache_manager.rs deleted file mode 100644 index 725d0e6..0000000 --- a/src/engine/io/page_cache_manager.rs +++ /dev/null @@ -1 +0,0 @@ -//!
This is a simple cache implementation to avoid having to re-parse pages non-stop diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs index 818d550..2829b8d 100644 --- a/src/engine/io/row_manager.rs +++ b/src/engine/io/row_manager.rs @@ -63,7 +63,7 @@ impl RowManager { page_type: PageType::Data, }; self.file_manager - .update_page(&page_id, page.serialize(), &row_pointer.page) + .update_page(&page_id, &row_pointer.page, page.serialize()) .await?; Ok(()) } @@ -107,7 +107,7 @@ impl RowManager { page_type: PageType::Data, }; self.file_manager - .update_page(&page_id, old_page.serialize(), &row_pointer.page) + .update_page(&page_id, &row_pointer.page, old_page.serialize()) .await?; Ok(new_row_pointer) @@ -192,7 +192,7 @@ impl RowManager { let new_row_pointer = page.insert(current_tran_id, &table, user_data)?; let new_page_bytes = page.serialize(); self.file_manager - .update_page(&page_id, new_page_bytes, &page_num) + .update_page(&page_id, &page_num, new_page_bytes) .await?; return Ok(new_row_pointer); } else {