From a53cf6966328550550fd4d577b3ba011596668ec Mon Sep 17 00:00:00 2001
From: Christopher Hotchkiss
Date: Sun, 22 Aug 2021 16:55:12 -0400
Subject: [PATCH] Fixed #17 and #23 by implementing a caching/locking layer.

---
 benches/feophant_benchmark.rs                 |   5 +-
 src/engine.rs                                 |   9 +-
 src/engine/analyzer/definition_lookup.rs      |   8 +-
 src/engine/io/constraint_manager.rs           |   2 +-
 src/engine/io/file_manager.rs                 |  70 +++-----
 src/engine/io/file_manager/file_executor.rs   |  81 +++++----
 src/engine/io/file_manager/file_operations.rs |   9 +-
 src/engine/io/file_manager/request_type.rs    |   3 +-
 src/engine/io/free_space_manager.rs           |  89 ++++++----
 src/engine/io/index_manager.rs                |  27 +--
 src/engine/io/lock_cache_manager.rs           |  47 +++---
 src/engine/io/page_formats/page_data.rs       |  41 ++---
 src/engine/io/row_manager.rs                  | 159 ++++++++++++------
 src/engine/io/visible_row_manager.rs          |   6 +-
 tests/visibility_tests.rs                     |   9 +-
 15 files changed, 306 insertions(+), 259 deletions(-)

diff --git a/benches/feophant_benchmark.rs b/benches/feophant_benchmark.rs
index d88cc30..9225796 100644
--- a/benches/feophant_benchmark.rs
+++ b/benches/feophant_benchmark.rs
@@ -8,6 +8,7 @@ use criterion::{criterion_group, criterion_main};
 use feophantlib::constants::Nullable;
 use feophantlib::engine::io::row_formats::RowData;
 use feophantlib::engine::io::FileManager;
+use feophantlib::engine::io::LockCacheManager;
 use feophantlib::engine::io::LockManager;
 use feophantlib::engine::io::RowManager;
 use feophantlib::engine::objects::types::BaseSqlTypes;
@@ -63,7 +64,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box
-    let rm = RowManager::new(fm, LockManager::new());
+    let rm = RowManager::new(LockCacheManager::new(fm));
@@ ... @@ ... -> Result<(), Box
     let result_rows: Vec<RowData> = rm

diff --git a/src/engine.rs b/src/engine.rs
index e76e80b..b886c2b 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -29,7 +29,7 @@ use transactions::{TransactionId, TransactionManager};
 use self::io::ConstraintManager;
 use self::io::FileManager;
-use self::io::LockManager;
+use self::io::LockCacheManager;
 use self::objects::QueryResult;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -44,9 +44,10 @@ pub struct Engine {
 impl Engine {
     pub fn new(file_manager: Arc<FileManager>, tran_manager: TransactionManager) -> Engine {
-        let lock_man = LockManager::new();
-        let vis_row_man =
-            VisibleRowManager::new(RowManager::new(file_manager, lock_man), tran_manager);
+        let vis_row_man = VisibleRowManager::new(
+            RowManager::new(LockCacheManager::new(file_manager)),
+            tran_manager,
+        );
         let con_man = ConstraintManager::new(vis_row_man.clone());
         Engine {
             analyzer: Analyzer::new(vis_row_man),
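Reviewer note: the wiring change above repeats at every call site in this patch. RowManager no longer takes a FileManager plus a LockManager; it takes the new LockCacheManager, which wraps an Arc'd FileManager. A minimal sketch of the new construction chain, mirroring the updated tests and benchmark (data_dir is a placeholder OsString; error handling elided):

    use std::sync::Arc;
    use feophantlib::engine::io::{FileManager, LockCacheManager, RowManager};

    let fm = Arc::new(FileManager::new(data_dir)?);      // data_dir: OsString, hypothetical
    let rm = RowManager::new(LockCacheManager::new(fm)); // cache/lock layer sits in between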
diff --git a/src/engine/analyzer/definition_lookup.rs b/src/engine/analyzer/definition_lookup.rs
index 40f3b22..50dc9ad 100644
--- a/src/engine/analyzer/definition_lookup.rs
+++ b/src/engine/analyzer/definition_lookup.rs
@@ -190,7 +190,7 @@ pub enum DefinitionLookupError {
 mod tests {
     use tempfile::TempDir;
 
-    use crate::engine::io::{FileManager, LockManager};
+    use crate::engine::io::{FileManager, LockCacheManager};
     // Note this useful idiom: importing names from outer (for mod tests) scope.
     use super::super::super::io::RowManager;
@@ -205,7 +205,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let tm = TransactionManager::new();
-        let rm = RowManager::new(fm, LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm));
         let vm = VisibleRowManager::new(rm, tm);
         let dl = DefinitionLookup::new(vm);
@@ -224,7 +224,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let tm = TransactionManager::new();
-        let rm = RowManager::new(fm, LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm));
         let vm = VisibleRowManager::new(rm, tm);
         let dl = DefinitionLookup::new(vm);
@@ -248,7 +248,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let mut tm = TransactionManager::new();
-        let rm = RowManager::new(fm.clone(), LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm.clone()));
         let vm = VisibleRowManager::new(rm.clone(), tm.clone());
         let dl = DefinitionLookup::new(vm);
         let mut engine = Engine::new(fm, tm.clone());

diff --git a/src/engine/io/constraint_manager.rs b/src/engine/io/constraint_manager.rs
index a77ad08..45483f2 100644
--- a/src/engine/io/constraint_manager.rs
+++ b/src/engine/io/constraint_manager.rs
@@ -75,7 +75,7 @@ impl ConstraintManager {
         tran_id: TransactionId,
         table: Arc<Table>,
         row_pointer: ItemPointer,
-    ) -> Result<(PageData, RowData), ConstraintManagerError> {
+    ) -> Result<RowData, ConstraintManagerError> {
         Ok(self.vis_row_man.get(tran_id, table, row_pointer).await?)
     }

diff --git a/src/engine/io/file_manager.rs b/src/engine/io/file_manager.rs
index a3d4844..02909f2 100644
--- a/src/engine/io/file_manager.rs
+++ b/src/engine/io/file_manager.rs
@@ -53,11 +53,21 @@ impl FileManager {
         Ok(rev_shutdown.await?)
     }
 
+    pub async fn get_offset(&self, page_id: &PageId) -> Result<PageOffset, FileManagerError> {
+        let (res_request, res_receiver) = oneshot::channel();
+
+        self.request_queue
+            .send((*page_id, RequestType::GetOffset(res_request)))?;
+
+        Ok(res_receiver.await??)
+    }
+
     pub async fn add_page(
         &self,
         page_id: &PageId,
+        offset: &PageOffset,
         page: Bytes,
-    ) -> Result<PageOffset, FileManagerError> {
+    ) -> Result<(), FileManagerError> {
         let size = UInt12::try_from(page.len() - 1)?;
         if size != UInt12::max() {
             return Err(FileManagerError::InvalidPageSize(page.len()));
@@ -66,7 +76,7 @@ impl FileManager {
         let (res_request, res_receiver) = oneshot::channel();
 
         self.request_queue
-            .send((*page_id, RequestType::Add((page, res_request))))?;
+            .send((*page_id, RequestType::Add((*offset, page, res_request))))?;
 
         Ok(res_receiver.await??)
     }
@@ -84,31 +94,6 @@ impl FileManager {
         Ok(res_receiver.await??)
     }
 
-    //TODO I'm chewing on if this stream implementation is way too low level especially considering locking
-    pub fn get_stream(
-        &self,
-        page_id: &PageId,
-    ) -> impl Stream<Item = Result<Option<BytesMut>, FileManagerError>> {
-        let request_queue = self.request_queue.clone();
-        let page_id = *page_id;
-
-        try_stream! {
-            let mut page_num = PageOffset(0);
-            loop {
-                let (res_request, res_receiver) = oneshot::channel();
-
-                request_queue
-                    .send((page_id, RequestType::Read((page_num, res_request))))?;
-
-                let page = res_receiver.await??;
-
-                yield page;
-
-                page_num += PageOffset(1);
-            }
-        }
-    }
-
     pub async fn update_page(
         &self,
         page_id: &PageId,
@@ -134,7 +119,7 @@ impl Drop for FileManager {
         if !self.request_queue.is_closed() {
             return;
         }
-        error!("File Manager wasn't shutdown cleanly!");
+        error!("File Manager wasn't shutdown cleanly! This is a bug, please report!");
     }
 }
 
@@ -172,11 +157,8 @@ pub enum FileManagerError {
 
 #[cfg(test)]
 mod tests {
-    use std::time::Duration;
-
     use bytes::BytesMut;
     use tempfile::TempDir;
-    use tokio::time::timeout;
     use uuid::Uuid;
 
     use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};
@@ -203,23 +185,20 @@ mod tests {
         };
 
         let test_page = get_test_page(1);
-        let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;
+        let test_po = fm.get_offset(&page_id).await?;
+        fm.add_page(&page_id, &test_po, test_page.clone()).await?;
 
-        assert_eq!(test_page_num, PageOffset(0));
+        assert_eq!(test_po, PageOffset(0));
 
-        let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
-            .await??
-            .unwrap();
+        let test_page_get = fm.get_page(&page_id, &test_po).await?.unwrap();
 
         assert_eq!(test_page, test_page_get);
 
         let test_page2 = get_test_page(2);
-        fm.update_page(&page_id, &test_page_num, test_page2.clone())
+        fm.update_page(&page_id, &test_po, test_page2.clone())
             .await?;
 
-        let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
-            .await??
-            .unwrap();
+        let test_page_get2 = fm.get_page(&page_id, &test_po).await?.unwrap();
 
         assert_eq!(test_page2, test_page_get2);
 
@@ -227,13 +206,12 @@ mod tests {
         let fm2 = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
         let test_page3 = get_test_page(3);
-        let test_page_num3 = fm2.add_page(&page_id, test_page3.clone()).await?;
-        println!("{0}", test_page_num3);
-        assert!(test_page_num3 > test_page_num);
+        let test_po3 = fm2.get_offset(&page_id).await?;
+        fm2.add_page(&page_id, &test_po3, test_page3.clone())
+            .await?;
+        assert!(test_po3 > test_po);
 
-        let test_page_get2 = timeout(Duration::new(10, 0), fm2.get_page(&page_id, &test_page_num))
-            .await??
-            .unwrap();
+        let test_page_get2 = fm2.get_page(&page_id, &test_po).await?.unwrap();
 
         assert_eq!(test_page2, test_page_get2);
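Reviewer note: adding a page is now two explicit steps, so offset generation is decoupled from the I/O. A sketch of the new call pattern, lifted from the updated test above (page_id and test_page assumed in scope):

    let po = fm.get_offset(&page_id).await?;               // reserve the next free PageOffset
    fm.add_page(&page_id, &po, test_page.clone()).await?;  // the write must target exactly that offset
    let read_back = fm.get_page(&page_id, &po).await?;     // Option<BytesMut>: None if never written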
diff --git a/src/engine/io/file_manager/file_executor.rs b/src/engine/io/file_manager/file_executor.rs
index 628f188..14d4d83 100644
--- a/src/engine/io/file_manager/file_executor.rs
+++ b/src/engine/io/file_manager/file_executor.rs
@@ -142,38 +142,29 @@ impl FileExecutor {
             let mut new_request_queue = VecDeque::with_capacity(request_queue.len());
             for (page_id, req_type) in request_queue.into_iter() {
                 match req_type {
-                    RequestType::Add((a, response)) => {
-                        let next_po = match self.get_next_po(&page_id).await {
-                            Ok(po) => po,
-                            Err(e) => {
-                                let _ = response.send(Err(e));
-                                continue;
-                            }
-                        };
-
-                        match file_handle_cache.pop(&(page_id, next_po.get_file_number())) {
+                    RequestType::Add((po, a, response)) => {
+                        match file_handle_cache.pop(&(page_id, po.get_file_number())) {
                             Some(maybe_file) => match maybe_file {
                                 Some(file) => {
                                     file_handle_cache
-                                        .put((page_id, next_po.get_file_number()), None);
+                                        .put((page_id, po.get_file_number()), None);
                                     let file_handle_ret = send_completed.clone();
                                     tokio::spawn(async move {
                                         let response_f = response;
-                                        match FileOperations::add_chunk(file, &next_po, a).await
-                                        {
+                                        match FileOperations::add_chunk(file, &po, a).await {
                                             Ok(o) => {
                                                 let _ = file_handle_ret.send((
                                                     page_id,
-                                                    next_po.get_file_number(),
+                                                    po.get_file_number(),
                                                     Some(o),
                                                 ));
-                                                let _ = response_f.send(Ok(next_po));
+                                                let _ = response_f.send(Ok(()));
                                             }
                                             Err(e) => {
                                                 let _ = file_handle_ret.send((
                                                     page_id,
-                                                    next_po.get_file_number(),
+                                                    po.get_file_number(),
                                                     None,
                                                 ));
                                                 let _ = response_f.send(Err(
@@ -186,16 +177,17 @@ impl FileExecutor {
                                 None => {
                                     //Request in flight, skip for now, but have to reinsert into cache
                                     file_handle_cache
-                                        .put((page_id, next_po.get_file_number()), None);
+                                        .put((page_id, po.get_file_number()), None);
 
-                                    new_request_queue
-                                        .push_back((page_id, RequestType::Add((a, response))));
+                                    new_request_queue.push_back((
+                                        page_id,
+                                        RequestType::Add((po, a, response)),
+                                    ));
                                     continue;
                                 }
                             },
                             None => {
-                                file_handle_cache
-                                    .put((page_id, next_po.get_file_number()), None);
+                                file_handle_cache.put((page_id, po.get_file_number()), None);
                                 file_handles_open = file_handles_open.saturating_add(1);
                                 let data_dir = self.data_dir.clone();
                                 let file_handle_ret = send_completed.clone();
@@ -205,7 +197,7 @@ impl FileExecutor {
                                     let file = match FileOperations::open_path(
                                         &data_dir,
                                         &page_id,
-                                        next_po.get_file_number(),
+                                        po.get_file_number(),
                                     )
                                     .await
                                     {
@@ -213,7 +205,7 @@ impl FileExecutor {
                                         Err(e) => {
                                             let _ = file_handle_ret.send((
                                                 page_id,
-                                                next_po.get_file_number(),
+                                                po.get_file_number(),
                                                 None,
                                             ));
                                             let _ = response_f.send(Err(
@@ -223,19 +215,19 @@ impl FileExecutor {
                                         }
                                     };
 
-                                    match FileOperations::add_chunk(file, &next_po, a).await {
+                                    match FileOperations::add_chunk(file, &po, a).await {
                                         Ok(o) => {
                                             let _ = file_handle_ret.send((
                                                 page_id,
-                                                next_po.get_file_number(),
+                                                po.get_file_number(),
                                                 Some(o),
                                             ));
-                                            let _ = response_f.send(Ok(next_po));
+                                            let _ = response_f.send(Ok(()));
                                         }
                                         Err(e) => {
                                             let _ = file_handle_ret.send((
                                                 page_id,
-                                                next_po.get_file_number(),
+                                                po.get_file_number(),
                                                 None,
                                             ));
                                             let _ = response_f.send(Err(
@@ -435,6 +427,17 @@ impl FileExecutor {
                         }
                     }
                 }
+                RequestType::GetOffset(response) => {
+                    match self.get_next_po(&page_id).await {
+                        Ok(po) => {
+                            let _ = response.send(Ok(po));
+                        }
+                        Err(e) => {
+                            let _ = response.send(Err(e));
+                            continue;
+                        }
+                    };
+                }
             }
         }
         request_queue = new_request_queue;
@@ -592,10 +595,8 @@ pub enum FileExecutorError {
 
 #[cfg(test)]
 mod tests {
-    use std::time::Duration;
-
     use tempfile::TempDir;
-    use tokio::{io::AsyncWriteExt, time::timeout};
+    use tokio::io::AsyncWriteExt;
     use uuid::Uuid;
 
     use crate::{
@@ -637,11 +638,12 @@ mod tests {
         let fm = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
         let test_page = get_test_page(2);
-        let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;
+        let test_po = fm.get_offset(&page_id).await?;
+        fm.add_page(&page_id, &test_po, test_page.clone()).await?;
 
-        assert_eq!(test_page_num, PageOffset(2));
+        assert_eq!(test_po, PageOffset(2));
 
-        let test_page_get = fm.get_page(&page_id, &test_page_num).await?.unwrap();
+        let test_page_get = fm.get_page(&page_id, &test_po).await?.unwrap();
 
         assert_eq!(test_page, test_page_get);
 
@@ -680,17 +682,12 @@ mod tests {
         let fm = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
         let test_page = get_test_page(2);
-        let test_page_num = timeout(
-            Duration::new(10, 0),
-            fm.add_page(&page_id, test_page.clone()),
-        )
-        .await??;
+        let test_po = fm.get_offset(&page_id).await?;
+        fm.add_page(&page_id, &test_po, test_page.clone()).await?;
 
-        assert_eq!(test_page_num, PageOffset(PAGES_PER_FILE * test_count + 2));
+        assert_eq!(test_po, PageOffset(PAGES_PER_FILE * test_count + 2));
 
-        let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
-            .await??
-            .unwrap();
+        let test_page_get = fm.get_page(&page_id, &test_po).await?.unwrap();
 
         assert_eq!(test_page, test_page_get);
 
diff --git a/src/engine/io/file_manager/file_operations.rs b/src/engine/io/file_manager/file_operations.rs
index 73af02e..fcdd991 100644
--- a/src/engine/io/file_manager/file_operations.rs
+++ b/src/engine/io/file_manager/file_operations.rs
@@ -40,13 +40,18 @@ impl FileOperations {
     }
 
     /// Note the File Handle AND PageOffset should point to where the add should occur
+    /// If the file is larger than requested nothing is done.
     pub async fn add_chunk(
         file: File,
         page_offset: &PageOffset,
         buffer: Bytes,
     ) -> Result<File, FileExecutorError> {
-        file.set_len(u64::try_from(page_offset.get_file_chunk_size())?)
-            .await?;
+        let metadata = file.metadata().await?;
+        let chunk_size_u64 = u64::try_from(page_offset.get_file_chunk_size())?;
+
+        if metadata.len() < chunk_size_u64 {
+            file.set_len(chunk_size_u64).await?;
+        }
 
         Self::update_chunk(file, page_offset, buffer).await
     }
 
diff --git a/src/engine/io/file_manager/request_type.rs b/src/engine/io/file_manager/request_type.rs
index 510628f..b1a0f10 100644
--- a/src/engine/io/file_manager/request_type.rs
+++ b/src/engine/io/file_manager/request_type.rs
@@ -6,7 +6,8 @@ use super::file_executor::FileExecutorError;
 
 #[derive(Debug)]
 pub enum RequestType {
-    Add((Bytes, Sender<Result<PageOffset, FileExecutorError>>)),
+    GetOffset(Sender<Result<PageOffset, FileExecutorError>>),
+    Add((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
     Read(
         (
             PageOffset,
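Reviewer note on how these request variants are consumed: each carries a oneshot Sender, and FileManager awaits the matching Receiver. Roughly (names from this patch; the doubled ? first unwraps the channel result, then the executor result):

    let (tx, rx) = tokio::sync::oneshot::channel();
    request_queue.send((page_id, RequestType::GetOffset(tx)))?;  // picked up by the FileExecutor loop
    let po: PageOffset = rx.await??;                             // outer ?: channel closed; inner ?: FileExecutorError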
diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs
index 92fbb87..e9e2e2b 100644
--- a/src/engine/io/free_space_manager.rs
+++ b/src/engine/io/free_space_manager.rs
@@ -7,7 +7,7 @@ use crate::constants::PAGE_SIZE;
 use super::{
     file_manager,
     page_formats::{PageId, PageOffset, PageType},
-    FileManager, FileManagerError, LockManager,
+    FileManager, FileManagerError, LockCacheManager, LockCacheManagerError, LockManager,
 };
 use bytes::{Buf, Bytes, BytesMut};
 use lru::LruCache;
@@ -16,17 +16,15 @@ use thiserror::Error;
 const MAX_FREESPACE_COUNT: usize = 32;
 
 pub struct FreeSpaceManager {
-    file_manager: FileManager,
     freespace_cache: LruCache<(PageId, PageOffset), Bytes>,
-    lock_manager: LockManager,
+    lock_cache_manager: LockCacheManager,
 }
 
 impl FreeSpaceManager {
-    pub fn new(file_manager: FileManager, lock_manager: LockManager) -> FreeSpaceManager {
+    pub fn new(lock_cache_manager: LockCacheManager) -> FreeSpaceManager {
         FreeSpaceManager {
-            file_manager,
             freespace_cache: LruCache::new(MAX_FREESPACE_COUNT),
-            lock_manager,
+            lock_cache_manager,
         }
     }
 
@@ -40,32 +38,42 @@ impl FreeSpaceManager {
             page_type: PageType::FreeSpaceMap,
         };
         loop {
-            let lock = self.lock_manager.get_lock(&free_id, &offset).await;
-            let lock_read = lock.read();
-            match self.file_manager.get_page(&free_id, &offset).await? {
-                Some(mut s) => match Self::find_first_free_page_in_page(&mut s) {
-                    Some(s) => {
-                        let full_offset =
-                            PageOffset(s) + offset * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
-                        return Ok(full_offset);
+            let mut page_handle = self.lock_cache_manager.get_page(free_id, offset).await?;
+            match page_handle.as_ref() {
+                Some(s) => {
+                    let mut page_frozen = s.clone().freeze();
+                    match Self::find_first_free_page_in_page(&mut page_frozen) {
+                        Some(s) => {
+                            let full_offset = PageOffset(s)
+                                + offset * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
+                            return Ok(full_offset);
+                        }
+                        None => {
+                            offset += PageOffset(1);
+                            continue;
+                        }
                     }
-                    None => {
-                        offset += PageOffset(1);
-                        continue;
-                    }
-                },
+                }
                 None => {
+                    //Get the next offset, BUT since there could be a gap, we're going to blindly write all free
+                    //and loop to get it again.
+                    let next_po = self.lock_cache_manager.get_offset(free_id).await?;
+
+                    let mut new_page_handle = self
+                        .lock_cache_manager
+                        .get_page_for_update(free_id, next_po)
+                        .await?;
+
                     let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
-                    let new_page = vec![0; PAGE_SIZE as usize];
+                    let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize];
                     buffer.extend_from_slice(&new_page);
+                    new_page_handle.replace(buffer);
 
-                    //TODO: So I have the read lock now for the add, need figure out how do this sanely in code
-                    let new_page = self
-                        .file_manager
-                        .add_page(&page_id, buffer.freeze())
+                    self.lock_cache_manager
+                        .add_page(free_id, next_po, new_page_handle)
                         .await?;
-                    let new_offset = new_page * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
-                    return Ok(new_offset);
+
+                    continue; //No increment since we could be at a gap
                 }
             }
         }
@@ -77,17 +85,24 @@ impl FreeSpaceManager {
         po: PageOffset,
         status: FreeStat,
     ) -> Result<(), FreeSpaceManagerError> {
+        let free_id = PageId {
+            resource_key: page_id.resource_key,
+            page_type: PageType::FreeSpaceMap,
+        };
         let (po, offset) = po.get_bitmask_offset();
-        match self.file_manager.get_page(&page_id, &po).await? {
-            Some(mut page) => {
-                Self::set_status_inside_page(&mut page, offset, status);
-                self.file_manager
-                    .update_page(&page_id, &po, page.freeze())
-                    .await?;
-                Ok(())
-            }
-            None => Err(FreeSpaceManagerError::PageDoesNotExist(page_id)),
-        }
+        let mut page_handle = self
+            .lock_cache_manager
+            .get_page_for_update(free_id, po)
+            .await?;
+        let mut page = page_handle
+            .as_mut()
+            .ok_or(FreeSpaceManagerError::PageDoesNotExist(page_id))?;
+        Self::set_status_inside_page(&mut page, offset, status);
+
+        Ok(self
+            .lock_cache_manager
+            .update_page(free_id, po, page_handle)
+            .await?)
     }
 
     fn find_first_free_page_in_page(buffer: &mut impl Buf) -> Option<usize> {
@@ -156,7 +171,7 @@ pub enum FreeStat {
 #[derive(Debug, Error)]
 pub enum FreeSpaceManagerError {
     #[error(transparent)]
-    FileManagerError(#[from] FileManagerError),
+    LockCacheManagerError(#[from] LockCacheManagerError),
     #[error("Page Offset {0} doesn't exist")]
     PageDoesNotExist(PageId),
 }
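Reviewer aside on the offset math above: each free-space page is a plain bitmap, so one page covers PAGE_SIZE * 8 data pages, and the full offset is bit_index + fsm_page_offset * PAGE_SIZE * 8. A worked example, assuming PAGE_SIZE is 4096 (illustrative numbers only):

    // bit 5 set free on free-space page 1 maps to data page:
    // PageOffset(5) + PageOffset(1) * PageOffset(4096) * PageOffset(8)
    //   = PageOffset(5 + 32768)
    //   = PageOffset(32773)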
diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs
index 8a5d9d7..304ebe0 100644
--- a/src/engine/io/index_manager.rs
+++ b/src/engine/io/index_manager.rs
@@ -1,8 +1,4 @@
-//! Start with a single bucket
-
-//! Insert key and pointer to record
-//! read root
-//! search through buckets,
+//! TODO #24 Fix the index implementation to use the locking layer
 use std::collections::BTreeMap;
 use std::ops::Range;
@@ -189,15 +185,11 @@ impl IndexManager {
                     nodes: BTreeMap::new(),
                 };
 
-                let page_num = self
-                    .file_manager
-                    .add_page(&page_id, root_node.serialize()?)
+                self.file_manager
+                    .add_page(&page_id, &PageOffset(1), root_node.serialize()?)
                     .await?;
-                if page_num != PageOffset(1) {
-                    return Err(IndexManagerError::ConcurrentCreationError());
-                }
 
-                Ok((BTreeNode::Leaf(root_node), page_num))
+                Ok((BTreeNode::Leaf(root_node), PageOffset(1)))
             }
             Err(e) => Err(e),
         }
@@ -207,14 +199,13 @@ impl IndexManager {
         let mut root_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
         let root_page = vec![0; PAGE_SIZE as usize];
         root_page_buffer.extend_from_slice(&root_page);
-        let page_num = self
-            .file_manager
-            .add_page(index, root_page_buffer.freeze())
+        self.file_manager
+            .add_page(index, &PageOffset(0), root_page_buffer.freeze())
             .await?;
 
-        if page_num != PageOffset(0) {
-            return Err(IndexManagerError::ConcurrentCreationError());
-        }
+        //if page_num != PageOffset(0) {
+        //    return Err(IndexManagerError::ConcurrentCreationError());
+        //}
 
         Ok(())
     }

diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/lock_cache_manager.rs
index e60c424..73a7488 100644
--- a/src/engine/io/lock_cache_manager.rs
+++ b/src/engine/io/lock_cache_manager.rs
@@ -1,37 +1,35 @@
-use std::{
-    collections::{HashMap, VecDeque},
-    ptr::read,
-    sync::Arc,
-};
-
 use super::{
     page_formats::{PageId, PageOffset},
     FileManager, FileManagerError,
 };
-use bytes::{Bytes, BytesMut};
+use bytes::BytesMut;
 use lru::LruCache;
+use std::sync::Arc;
 use thiserror::Error;
 use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
 
-//How do I support readers and writers? I want to use RwLocks but I need a way to stop unbounded
-//Lrucache growth
+#[derive(Clone, Debug)]
 pub struct LockCacheManager {
     //TODO I don't like these massive single hashes protected with a single lock
     //     Long term I need to make a fixed hashmap and evict them myself.
     //     Holding on this since I might be able to work around it
     cache: Arc<Mutex<LruCache<(PageId, PageOffset), Arc<RwLock<Option<BytesMut>>>>>>,
-    file_manager: FileManager,
+    file_manager: Arc<FileManager>,
 }
 
 impl LockCacheManager {
-    pub fn new(file_manager: FileManager) -> LockCacheManager {
+    pub fn new(file_manager: Arc<FileManager>) -> LockCacheManager {
         LockCacheManager {
-            //The unbounded nature of the cache worries me. I think I'll have to manage its eviction carefully
+            //TODO The unbounded nature of the cache worries me. I think I'll have to manage its eviction carefully
            cache: Arc::new(Mutex::new(LruCache::unbounded())),
             file_manager,
         }
     }
 
+    pub async fn get_offset(&self, page_id: PageId) -> Result<PageOffset, LockCacheManagerError> {
+        Ok(self.file_manager.get_offset(&page_id).await?)
+    }
+
     pub async fn get_page(
         &self,
         page_id: PageId,
@@ -103,21 +101,22 @@ impl LockCacheManager {
             .await?)
     }
 
-    //TODO Need to figure how to lock for add, without blocking everyone
-    //for now doing the naive implementation since I can just hold the mutex during the add
-    //would be easily solved if I disconnect offset generation from the I/O
     pub async fn add_page(
         &self,
         page_id: PageId,
-        page: Bytes,
-    ) -> Result<PageOffset, LockCacheManagerError> {
-        let mut cache = self.cache.lock().await;
-        let po = self.file_manager.add_page(&page_id, page.clone()).await?;
-
-        let mut new_page = BytesMut::with_capacity(page.len());
-        new_page.copy_from_slice(&page.slice(0..page.len()));
-        cache.put((page_id, po), Arc::new(RwLock::new(Some(new_page))));
-        Ok(po)
+        offset: PageOffset,
+        guard: OwnedRwLockWriteGuard<Option<BytesMut>>,
+    ) -> Result<(), LockCacheManagerError> {
+        let page = match guard.as_ref() {
+            Some(s) => s.clone(),
+            None => {
+                return Err(LockCacheManagerError::PageMissing());
+            }
+        };
+        Ok(self
+            .file_manager
+            .add_page(&page_id, &offset, page.freeze())
+            .await?)
    }
 }
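Reviewer note: the protocol this new type imposes, sketched from the call sites later in this patch (lcm is a placeholder binding, not additional API). Every mutation happens under an owned write guard, and the same guard is handed back so the write happens while the lock is still held:

    let mut guard = lcm.get_page_for_update(page_id, po).await?; // OwnedRwLockWriteGuard<Option<BytesMut>>
    let buf = guard.as_mut().expect("page already on disk");     // None means the page does not exist yet
    buf.clear();                                                 // mutate the cached buffer in place...
    page.serialize(buf);
    lcm.update_page(page_id, po, guard).await?;                  // guard travels with the write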
diff --git a/src/engine/io/page_formats/page_data.rs b/src/engine/io/page_formats/page_data.rs
index 7e19006..c30c0c7 100644
--- a/src/engine/io/page_formats/page_data.rs
+++ b/src/engine/io/page_formats/page_data.rs
@@ -1,4 +1,3 @@
-use crate::constants::PAGE_SIZE;
 use crate::engine::io::{ConstEncodedSize, EncodedSize};
 use crate::engine::objects::SqlTuple;
 use crate::engine::transactions::TransactionId;
@@ -9,7 +8,7 @@ use super::{
     ItemIdData, ItemIdDataError, PageHeader, PageHeaderError, PageOffset, UInt12, UInt12Error,
 };
 use async_stream::stream;
-use bytes::{Bytes, BytesMut};
+use bytes::{BufMut, BytesMut};
 use futures::stream::Stream;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -96,35 +95,28 @@ impl PageData {
         }
     }
 
-    pub fn serialize(&self) -> Bytes {
-        let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
-
-        self.page_header.serialize(&mut buffer);
+    pub fn serialize(&self, buffer: &mut impl BufMut) {
+        self.page_header.serialize(buffer);
 
         //Now write items data in order
-        self.item_ids.iter().for_each(|f| f.serialize(&mut buffer));
+        self.item_ids.iter().for_each(|f| f.serialize(buffer));
 
         //Fill the free space
         let free_space = vec![0; self.page_header.get_free_space()];
-        buffer.extend_from_slice(&free_space);
+        buffer.put_slice(&free_space);
 
         //Write items in reverse order
-        self.rows
-            .iter()
-            .rev()
-            .for_each(|r| r.serialize(&mut buffer));
-
-        buffer.freeze()
+        self.rows.iter().rev().for_each(|r| r.serialize(buffer));
     }
 
     pub fn parse(
         table: Arc<Table>,
         page: PageOffset,
-        buffer: Bytes,
+        buffer: &BytesMut,
     ) -> Result<PageData, PageDataError> {
         //Note since we need random access, everything MUST work off slices otherwise counts will be off
-        let mut page_header_slice = buffer.slice(0..PageHeader::encoded_size());
+        let mut page_header_slice = &buffer[0..PageHeader::encoded_size()];
         let page_header = PageHeader::parse(&mut page_header_slice)?;
 
         let mut item_ids: Vec<ItemIdData> = Vec::with_capacity(page_header.get_item_count());
@@ -133,10 +125,10 @@ impl PageData {
             let iid_lower_offset = PageHeader::encoded_size() + (ItemIdData::encoded_size() * i);
             let iid_upper_offset =
                 PageHeader::encoded_size() + (ItemIdData::encoded_size() * (i + 1));
-            let mut iid_slice = buffer.slice(iid_lower_offset..iid_upper_offset);
+            let mut iid_slice = &buffer[iid_lower_offset..iid_upper_offset];
             let iid = ItemIdData::parse(&mut iid_slice)?;
 
-            let mut row_slice = buffer.slice(iid.get_range());
+            let mut row_slice = &buffer[iid.get_range()];
             let row = RowData::parse(table.clone(), &mut row_slice)?;
             item_ids.push(iid);
             rows.push(row);
@@ -171,7 +163,7 @@ pub enum PageDataError {
 
 #[cfg(test)]
 mod tests {
-    use crate::constants::Nullable;
+    use crate::constants::{Nullable, PAGE_SIZE};
     use crate::engine::objects::SqlTuple;
 
     use super::super::super::super::objects::{
@@ -180,6 +172,7 @@ mod tests {
     };
     use super::super::super::super::transactions::TransactionId;
     use super::*;
+    use bytes::BytesMut;
     use futures::pin_mut;
     use tokio_stream::StreamExt;
@@ -233,10 +226,11 @@ mod tests {
         for r in rows.clone() {
             assert!(pd.insert(r.min, &table, r.user_data).is_ok());
         }
-        let serial = pd.serialize();
+        let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
+        pd.serialize(&mut serial);
         assert_eq!(PAGE_SIZE as usize, serial.len());
 
-        let pg_parsed = PageData::parse(table.clone(), PageOffset(0), serial).unwrap();
+        let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap();
 
         pin_mut!(pg_parsed);
         let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
@@ -273,8 +267,9 @@ mod tests {
         for r in rows.clone() {
             assert!(pd.insert(r.min, &table, r.user_data).is_ok());
         }
-        let serial = pd.serialize();
-        let pg_parsed = PageData::parse(table.clone(), PageOffset(0), serial).unwrap();
+        let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
+        pd.serialize(&mut serial);
+        let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap();
 
         pin_mut!(pg_parsed);
         let result_rows: Vec<RowData> = pg_parsed.get_stream().collect().await;
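Reviewer note: the serialize change inverts buffer ownership. The caller now brings the buffer, which is what lets RowManager reuse a page's cached BytesMut in place instead of allocating a fresh Bytes per write. The round-trip as exercised by the updated tests:

    let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
    pd.serialize(&mut serial);                  // fills exactly PAGE_SIZE bytes, per the assert above
    let parsed = PageData::parse(table.clone(), PageOffset(0), &serial)?;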
diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs
index 2829b8d..7cf6997 100644
--- a/src/engine/io/row_manager.rs
+++ b/src/engine/io/row_manager.rs
@@ -1,11 +1,12 @@
 use super::super::objects::Table;
 use super::super::transactions::TransactionId;
-use super::lock_manager::LockManager;
 use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
 use super::row_formats::{ItemPointer, RowData, RowDataError};
-use super::{EncodedSize, FileManager, FileManagerError};
+use super::{EncodedSize, FileManagerError, LockCacheManager, LockCacheManagerError};
+use crate::constants::PAGE_SIZE;
 use crate::engine::objects::SqlTuple;
 use async_stream::try_stream;
+use bytes::BytesMut;
 use futures::stream::Stream;
 use std::sync::Arc;
 use thiserror::Error;
@@ -15,16 +16,12 @@
 /// It operates at the lowest level, no visibility checks are done.
 #[derive(Clone, Debug)]
 pub struct RowManager {
-    file_manager: Arc<FileManager>,
-    lock_manager: LockManager,
+    lock_cache_manager: LockCacheManager,
 }
 
 impl RowManager {
-    pub fn new(file_manager: Arc<FileManager>, lock_manager: LockManager) -> RowManager {
-        RowManager {
-            file_manager,
-            lock_manager,
-        }
+    pub fn new(lock_cache_manager: LockCacheManager) -> RowManager {
+        RowManager { lock_cache_manager }
     }
 
     pub async fn insert_row(
@@ -45,7 +42,26 @@ impl RowManager {
         table: Arc<Table>,
         row_pointer: ItemPointer,
     ) -> Result<(), RowManagerError> {
-        let (mut page, mut row) = self.get(table.clone(), row_pointer).await?;
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
+        let mut page_handle = self
+            .lock_cache_manager
+            .get_page_for_update(page_id, row_pointer.page)
+            .await?;
+        let page_buffer = page_handle
+            .as_mut()
+            .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
+
+        let mut page = PageData::parse(table, row_pointer.page, &page_buffer)?;
+        let mut row = page
+            .get_row(row_pointer.count)
+            .ok_or(RowManagerError::NonExistentRow(
+                row_pointer.count,
+                row_pointer.page,
+            ))?
+            .clone();
 
         if row.max.is_some() {
             return Err(RowManagerError::AlreadyDeleted(
@@ -57,14 +73,13 @@ impl RowManager {
         row.max = Some(current_tran_id);
         page.update(row, row_pointer.count)?;
 
+        page_buffer.clear();
+        page.serialize(page_buffer);
 
-        let page_id = PageId {
-            resource_key: table.id,
-            page_type: PageType::Data,
-        };
-        self.file_manager
-            .update_page(&page_id, &row_pointer.page, page.serialize())
+        self.lock_cache_manager
+            .update_page(page_id, row_pointer.page, page_handle)
             .await?;
+
         Ok(())
     }
 
@@ -76,8 +91,28 @@ impl RowManager {
         row_pointer: ItemPointer,
         new_user_data: SqlTuple,
     ) -> Result<ItemPointer, RowManagerError> {
-        //First get the current row so we have it for the update/delete
-        let (mut old_page, mut old_row) = self.get(table.clone(), row_pointer).await?;
+        //First get the current row so we have it for the update
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
+        let mut old_page_handle = self
+            .lock_cache_manager
+            .get_page_for_update(page_id, row_pointer.page)
+            .await?;
+        let old_page_buffer = old_page_handle
+            .as_mut()
+            .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
+
+        let mut old_page = PageData::parse(table.clone(), row_pointer.page, &old_page_buffer)?;
+
+        let mut old_row = old_page
+            .get_row(row_pointer.count)
+            .ok_or(RowManagerError::NonExistentRow(
+                row_pointer.count,
+                row_pointer.page,
+            ))?
+            .clone();
 
         if old_row.max.is_some() {
             return Err(RowManagerError::AlreadyDeleted(
@@ -93,6 +128,7 @@ impl RowManager {
         if old_page.can_fit(new_row_len) {
             new_row_pointer = old_page.insert(current_tran_id, &table, new_user_data)?;
         } else {
+            //TODO Possible Deadlock, when I do the freespace map should mark this page not free before doing this
             new_row_pointer = self
                 .insert_row_internal(current_tran_id, table.clone(), new_user_data)
                 .await?;
@@ -102,12 +138,11 @@ impl RowManager {
         old_row.item_pointer = new_row_pointer;
         old_page.update(old_row, row_pointer.count)?;
 
-        let page_id = PageId {
-            resource_key: table.id,
-            page_type: PageType::Data,
-        };
-        self.file_manager
-            .update_page(&page_id, &row_pointer.page, old_page.serialize())
+        old_page_buffer.clear();
+        old_page.serialize(old_page_buffer);
+
+        self.lock_cache_manager
+            .update_page(page_id, row_pointer.page, old_page_handle)
             .await?;
 
         Ok(new_row_pointer)
@@ -117,18 +152,20 @@ impl RowManager {
         &self,
         table: Arc<Table>,
         row_pointer: ItemPointer,
-    ) -> Result<(PageData, RowData), RowManagerError> {
+    ) -> Result<RowData, RowManagerError> {
         let page_id = PageId {
             resource_key: table.id,
             page_type: PageType::Data,
         };
 
-        let page_bytes = self
-            .file_manager
-            .get_page(&page_id, &row_pointer.page)
-            .await?
+        let page_handle = self
+            .lock_cache_manager
+            .get_page(page_id, row_pointer.page)
+            .await?;
+        let page_bytes = page_handle
+            .as_ref()
            .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
 
-        let page = PageData::parse(table, row_pointer.page, page_bytes.freeze())?;
+        let page = PageData::parse(table, row_pointer.page, &page_bytes)?;
 
         let row = page
             .get_row(row_pointer.count)
@@ -138,7 +175,7 @@ impl RowManager {
             ))?
             .clone();
 
-        Ok((page, row))
+        Ok(row)
     }
 
     // Provides an unfiltered view of the underlying table
@@ -151,26 +188,28 @@ impl RowManager {
             page_type: PageType::Data,
         };
 
+        let lock_cache_manager = self.lock_cache_manager;
+
         try_stream! {
             let mut page_num = PageOffset(0);
-            for await page_bytes in self.file_manager.get_stream(&page_id) {
-                let page_bytes = page_bytes?;
-                match page_bytes {
+
+            loop {
+                let page_handle = lock_cache_manager.get_page(page_id, page_num).await?;
+                match page_handle.as_ref() {
                     Some(s) => {
-                        let page = PageData::parse(table.clone(), page_num, s.freeze())?;
+                        let page = PageData::parse(table.clone(), page_num, s)?;
                         for await row in page.get_stream() {
                             yield row;
                         }
                     },
                     None => {return ();}
                 }
-
                 page_num += PageOffset(1);
             }
         }
     }
 
-    // TODO implement visibility maps so I don't have to scan probable
+    // TODO implement free space maps so I don't have to scan every page
     async fn insert_row_internal(
         &self,
         current_tran_id: TransactionId,
@@ -184,15 +223,19 @@ impl RowManager {
 
         let mut page_num = PageOffset(0);
         loop {
-            let page_bytes = self.file_manager.get_page(&page_id, &page_num).await?;
-            match page_bytes {
+            let mut page_bytes = self
+                .lock_cache_manager
+                .get_page_for_update(page_id, page_num)
+                .await?;
+            match page_bytes.as_mut() {
                 Some(p) => {
-                    let mut page = PageData::parse(table.clone(), page_num, p.freeze())?;
+                    let mut page = PageData::parse(table.clone(), page_num, p)?;
                     if page.can_fit(RowData::encoded_size(&user_data)) {
+                        p.clear(); //We're going to reuse the buffer
                         let new_row_pointer = page.insert(current_tran_id, &table, user_data)?;
-                        let new_page_bytes = page.serialize();
-                        self.file_manager
-                            .update_page(&page_id, &page_num, new_page_bytes)
+                        page.serialize(p);
+                        self.lock_cache_manager
+                            .update_page(page_id, page_num, page_bytes)
                             .await?;
                         return Ok(new_row_pointer);
                     } else {
@@ -201,10 +244,25 @@ impl RowManager {
                     }
                 }
                 None => {
-                    let mut new_page = PageData::new(page_num);
+                    //We got here because we asked for an offset that didn't exist yet.
+                    drop(page_bytes);
+
+                    let new_page_offset = self.lock_cache_manager.get_offset(page_id).await?;
+                    let mut new_page_handle = self
+                        .lock_cache_manager
+                        .get_page_for_update(page_id, new_page_offset)
+                        .await?;
+
+                    let mut new_page = PageData::new(new_page_offset);
                     let new_row_pointer = new_page.insert(current_tran_id, &table, user_data)?; //TODO Will NOT handle overly large rows
-                    self.file_manager
-                        .add_page(&page_id, new_page.serialize())
+
+                    let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
+                    new_page.serialize(&mut buffer);
+
+                    new_page_handle.replace(buffer);
+
+                    self.lock_cache_manager
+                        .add_page(page_id, new_page_offset, new_page_handle)
                         .await?;
                     return Ok(new_row_pointer);
                 }
@@ -220,6 +278,8 @@ pub enum RowManagerError {
     #[error(transparent)]
     FileManagerError(#[from] FileManagerError),
     #[error(transparent)]
+    LockCacheManagerError(#[from] LockCacheManagerError),
+    #[error(transparent)]
     RowDataError(#[from] RowDataError),
     #[error("Page {0} does not exist")]
     NonExistentPage(PageOffset),
@@ -237,6 +297,7 @@ mod tests {
     use super::super::super::objects::Table;
     use super::*;
     use crate::constants::Nullable;
+    use crate::engine::io::FileManager;
     use crate::engine::objects::types::BaseSqlTypes;
     use crate::engine::objects::types::BaseSqlTypesMapper;
     use futures::pin_mut;
@@ -285,7 +346,7 @@ mod tests {
         let table = get_table();
         let fm = Arc::new(FileManager::new(tmp_dir.clone())?);
-        let rm = RowManager::new(fm, LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm));
 
         let tran_id = TransactionId::new(1);
 
@@ -299,7 +360,7 @@ mod tests {
 
         //Now let's make sure they're really in the table, persisting across restarts
         let fm = Arc::new(FileManager::new(tmp_dir)?);
-        let rm = RowManager::new(fm, LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm));
 
         pin_mut!(rm);
         let result_rows: Vec<RowData> = rm
@@ -325,7 +386,7 @@ mod tests {
         let table = get_table();
         let fm = Arc::new(FileManager::new(tmp_dir)?);
-        let rm = RowManager::new(fm, LockManager::new());
+        let rm = RowManager::new(LockCacheManager::new(fm));
 
         let tran_id = TransactionId::new(1);
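Reviewer note on the None arm above: creating a page now follows a reserve-then-publish sequence. A sketch of the calls used in this patch, with the surrounding loop elided:

    drop(page_bytes);                                              // release the probe's write guard first
    let po = self.lock_cache_manager.get_offset(page_id).await?;   // reserve a fresh offset
    let mut guard = self.lock_cache_manager.get_page_for_update(page_id, po).await?;
    let mut buf = BytesMut::with_capacity(PAGE_SIZE as usize);
    new_page.serialize(&mut buf);
    guard.replace(buf);                                            // guard now owns the new page image
    self.lock_cache_manager.add_page(page_id, po, guard).await?;   // write-through to disk under the lock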
diff --git a/src/engine/io/visible_row_manager.rs b/src/engine/io/visible_row_manager.rs
index 7fcb3cb..516332c 100644
--- a/src/engine/io/visible_row_manager.rs
+++ b/src/engine/io/visible_row_manager.rs
@@ -50,11 +50,11 @@ impl VisibleRowManager {
         tran_id: TransactionId,
         table: Arc<Table>,
         row_pointer: ItemPointer,
-    ) -> Result<(PageData, RowData), VisibleRowManagerError> {
-        let (page, row) = self.row_manager.get(table, row_pointer).await?;
+    ) -> Result<RowData, VisibleRowManagerError> {
+        let row = self.row_manager.get(table, row_pointer).await?;
 
         if VisibleRowManager::is_visible(self.tran_manager.clone(), tran_id, &row).await? {
-            Ok((page, row))
+            Ok(row)
         } else {
             Err(VisibleRowManagerError::NotVisibleRow(row))
         }

diff --git a/tests/visibility_tests.rs b/tests/visibility_tests.rs
index c5b8e35..1a4f113 100644
--- a/tests/visibility_tests.rs
+++ b/tests/visibility_tests.rs
@@ -1,7 +1,10 @@
 use feophantlib::{
     constants::Nullable,
     engine::{
-        io::{row_formats::RowData, FileManager, LockManager, RowManager, VisibleRowManager},
+        io::{
+            row_formats::RowData, FileManager, LockCacheManager, LockManager, RowManager,
+            VisibleRowManager,
+        },
         objects::{
             types::{BaseSqlTypes, BaseSqlTypesMapper},
             Attribute, SqlTuple, Table,
@@ -64,8 +67,8 @@ async fn test_row_manager_visibility() -> Result<(), Box<dyn Error>>
     let table = get_table();
     let mut tm = TransactionManager::new();
-    let pm = Arc::new(FileManager::new(tmp_dir)?);
-    let rm = RowManager::new(pm, LockManager::new());
+    let fm = Arc::new(FileManager::new(tmp_dir)?);
+    let rm = RowManager::new(LockCacheManager::new(fm));
     let vm = VisibleRowManager::new(rm.clone(), tm.clone());
     let row = get_row("test".to_string());