From c700af4bbd7cf0acd369e30795fd6fcf3f914111 Mon Sep 17 00:00:00 2001
From: Christopher Hotchkiss
Date: Sun, 15 Aug 2021 23:21:47 -0400
Subject: [PATCH] Implemented type options per resource so I can implement free
 space maps

---
 README.md                                     |   8 +
 benches/feophant_benchmark.rs                 |   5 +-
 src/engine.rs                                 |   5 +-
 src/engine/analyzer/definition_lookup.rs      |   8 +-
 src/engine/io.rs                              |   2 +-
 src/engine/io/file_manager.rs                 |  82 ++++-----
 src/engine/io/file_manager/file_executor.rs   | 167 +++++++++---------
 src/engine/io/file_manager/file_operations.rs |  28 +--
 src/engine/io/free_space_manager.rs           |   2 +
 src/engine/io/index_manager.rs                |  27 ++-
 src/engine/io/page_formats.rs                 |   4 +
 src/engine/io/page_formats/page_id.rs         |  68 +++++++
 src/engine/io/row_manager.rs                  |  55 ++++--
 tests/visibility_tests.rs                     |   4 +-
 14 files changed, 296 insertions(+), 169 deletions(-)
 create mode 100644 src/engine/io/free_space_manager.rs
 create mode 100644 src/engine/io/page_formats/page_id.rs

diff --git a/README.md b/README.md
index d705020..0e03cff 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,14 @@ Benchmark to aid in profiling
 
 ## Current TODO List - Subject to constant change!
 
+**TODO**
+Implement free space maps so that mutating data doesn't need a nonstop linear scan/parse.
+
+Did more thinking: I should implement Postgres's streams concept so that I don't need to do lookups to find the metadata associated with an object.
+I thought I was going to get to use Uuid + page offset; it's now going to be uuid + page offset + type.
+
+struct PageId + enum PageType should do it.
+
 **TODO**
 Implement page level locks that are ordered to avoid deadlocking.
 
diff --git a/benches/feophant_benchmark.rs b/benches/feophant_benchmark.rs
index 2cbfde0..d88cc30 100644
--- a/benches/feophant_benchmark.rs
+++ b/benches/feophant_benchmark.rs
@@ -8,6 +8,7 @@ use criterion::{criterion_group, criterion_main};
 use feophantlib::constants::Nullable;
 use feophantlib::engine::io::row_formats::RowData;
 use feophantlib::engine::io::FileManager;
+use feophantlib::engine::io::LockManager;
 use feophantlib::engine::io::RowManager;
 use feophantlib::engine::objects::types::BaseSqlTypes;
 use feophantlib::engine::objects::types::BaseSqlTypesMapper;
@@ -62,7 +63,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box<dyn Error>
-    let rm = RowManager::new(fm);
+    let rm = RowManager::new(fm, LockManager::new());
@@ ... @@ ... -> Result<(), Box<dyn Error>
-    let rm = RowManager::new(fm);
+    let rm = RowManager::new(fm, LockManager::new());
     let result_rows: Vec<RowData> = rm

diff --git a/src/engine.rs b/src/engine.rs
index e704dca..e76e80b 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -29,6 +29,7 @@ use transactions::{TransactionId, TransactionManager};
 
 use self::io::ConstraintManager;
 use self::io::FileManager;
+use self::io::LockManager;
 use self::objects::QueryResult;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -43,7 +44,9 @@ pub struct Engine {
 
 impl Engine {
     pub fn new(file_manager: Arc<FileManager>, tran_manager: TransactionManager) -> Engine {
-        let vis_row_man = VisibleRowManager::new(RowManager::new(file_manager), tran_manager);
+        let lock_man = LockManager::new();
+        let vis_row_man =
+            VisibleRowManager::new(RowManager::new(file_manager, lock_man), tran_manager);
         let con_man = ConstraintManager::new(vis_row_man.clone());
         Engine {
             analyzer: Analyzer::new(vis_row_man),

diff --git a/src/engine/analyzer/definition_lookup.rs b/src/engine/analyzer/definition_lookup.rs
index 372162b..40f3b22 100644
--- a/src/engine/analyzer/definition_lookup.rs
+++ b/src/engine/analyzer/definition_lookup.rs
@@ -190,7 +190,7 @@ pub enum DefinitionLookupError {
 mod tests {
     use tempfile::TempDir;
 
-    use crate::engine::io::FileManager;
+    use crate::engine::io::{FileManager, LockManager};
     // Note this useful idiom: importing names from outer (for mod tests) scope.
     use super::super::super::io::RowManager;
@@ -205,7 +205,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let tm = TransactionManager::new();
 
-        let rm = RowManager::new(fm);
+        let rm = RowManager::new(fm, LockManager::new());
         let vm = VisibleRowManager::new(rm, tm);
         let dl = DefinitionLookup::new(vm);
 
@@ -224,7 +224,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let tm = TransactionManager::new();
 
-        let rm = RowManager::new(fm);
+        let rm = RowManager::new(fm, LockManager::new());
         let vm = VisibleRowManager::new(rm, tm);
         let dl = DefinitionLookup::new(vm);
 
@@ -248,7 +248,7 @@ mod tests {
         let fm = Arc::new(FileManager::new(tmp_dir)?);
         let mut tm = TransactionManager::new();
 
-        let rm = RowManager::new(fm.clone());
+        let rm = RowManager::new(fm.clone(), LockManager::new());
         let vm = VisibleRowManager::new(rm.clone(), tm.clone());
         let dl = DefinitionLookup::new(vm);
         let mut engine = Engine::new(fm, tm.clone());

diff --git a/src/engine/io.rs b/src/engine/io.rs
index a4bc289..b40f5c2 100644
--- a/src/engine/io.rs
+++ b/src/engine/io.rs
@@ -17,7 +17,7 @@ pub use file_manager::FileManager;
 pub use file_manager::FileManagerError;
 
 mod lock_manager;
-//pub use lock_manager::LockManager;
+pub use lock_manager::LockManager;
 
 mod page_formats;

diff --git a/src/engine/io/file_manager.rs b/src/engine/io/file_manager.rs
index 68ea8ee..bc5e035 100644
--- a/src/engine/io/file_manager.rs
+++ b/src/engine/io/file_manager.rs
@@ -1,6 +1,6 @@
 //! This is a different approach than I had done before. This file manager runs its own loop based on a spawned task
 //! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources.
-use super::page_formats::{PageOffset, UInt12, UInt12Error};
+use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
 use async_stream::try_stream;
 use bytes::Bytes;
 use futures::Stream;
@@ -12,7 +12,6 @@ use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::{self, UnboundedSender};
 use tokio::sync::oneshot::error::RecvError;
 use tokio::sync::oneshot::{self, Sender};
-use uuid::Uuid;
 
 //Inner Types
 mod file_executor;
@@ -26,7 +25,7 @@ pub use resource_formatter::ResourceFormatter;
 
 #[derive(Clone, Debug)]
 pub struct FileManager {
-    request_queue: UnboundedSender<(Uuid, RequestType)>,
+    request_queue: UnboundedSender<(PageId, RequestType)>,
     request_shutdown: UnboundedSender<Sender<()>>,
 }
 
@@ -56,7 +55,7 @@ impl FileManager {
 
     pub async fn add_page(
         &self,
-        resource_key: &Uuid,
+        page_id: &PageId,
         page: Bytes,
     ) -> Result<PageOffset, FileManagerError> {
         let size = UInt12::try_from(page.len() - 1)?;
@@ -67,30 +66,30 @@ impl FileManager {
         let (res_request, res_receiver) = oneshot::channel();
 
         self.request_queue
-            .send((*resource_key, RequestType::Add((page, res_request))))?;
+            .send((*page_id, RequestType::Add((page, res_request))))?;
 
         Ok(res_receiver.await??)
     }
 
     pub async fn get_page(
         &self,
-        resource_key: &Uuid,
+        page_id: &PageId,
         offset: &PageOffset,
     ) -> Result<Option<Bytes>, FileManagerError> {
         let (res_request, res_receiver) = oneshot::channel();
 
         self.request_queue
-            .send((*resource_key, RequestType::Read((*offset, res_request))))?;
+            .send((*page_id, RequestType::Read((*offset, res_request))))?;
 
         Ok(res_receiver.await??)
     }
 
     pub fn get_stream(
         &self,
-        resource_key: &Uuid,
+        page_id: &PageId,
     ) -> impl Stream<Item = Result<Option<Bytes>, FileManagerError>> {
         let request_queue = self.request_queue.clone();
-        let resource_key = *resource_key;
+        let page_id = *page_id;
         try_stream! {
             let mut page_num = PageOffset(0);
@@ -98,7 +97,7 @@ impl FileManager {
                 let (res_request, res_receiver) = oneshot::channel();
 
                 request_queue
-                    .send((resource_key, RequestType::Read((page_num, res_request))))?;
+                    .send((page_id, RequestType::Read((page_num, res_request))))?;
 
                 let page = res_receiver.await??;
 
@@ -111,7 +110,7 @@ impl FileManager {
 
     pub async fn update_page(
         &self,
-        resource_key: &Uuid,
+        page_id: &PageId,
         page: Bytes,
         offset: &PageOffset,
     ) -> Result<(), FileManagerError> {
@@ -122,10 +121,8 @@ impl FileManager {
 
         let (res_request, res_receiver) = oneshot::channel();
 
-        self.request_queue.send((
-            *resource_key,
-            RequestType::Update((*offset, page, res_request)),
-        ))?;
+        self.request_queue
+            .send((*page_id, RequestType::Update((*offset, page, res_request))))?;
 
         Ok(res_receiver.await??)
     }
@@ -159,7 +156,7 @@ pub enum FileManagerError {
     #[error(transparent)]
     RecvError(#[from] RecvError),
     #[error(transparent)]
-    SendError(#[from] SendError<(Uuid, RequestType)>),
+    SendError(#[from] SendError<(PageId, RequestType)>),
     #[error(transparent)]
     ShutdownSendError(#[from] SendError<Sender<()>>),
     #[error(transparent)]
@@ -179,8 +176,9 @@ mod tests {
     use bytes::BytesMut;
     use tempfile::TempDir;
     use tokio::time::timeout;
+    use uuid::Uuid;
 
-    use crate::constants::PAGE_SIZE;
+    use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};
 
     use super::*;
 
@@ -198,38 +196,29 @@ mod tests {
         let fm = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
-        let test_uuid = Uuid::new_v4();
+        let page_id = PageId {
+            resource_key: Uuid::new_v4(),
+            page_type: PageType::Data,
+        };
+
         let test_page = get_test_page(1);
-        let test_page_num = timeout(
-            Duration::new(10, 0),
-            fm.add_page(&test_uuid, test_page.clone()),
-        )
-        .await??;
+        let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;
 
         assert_eq!(test_page_num, PageOffset(0));
 
-        let test_page_get = timeout(
-            Duration::new(10, 0),
-            fm.get_page(&test_uuid, &test_page_num),
-        )
-        .await??
-        .unwrap();
+        let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
+            .await??
+            .unwrap();
 
         assert_eq!(test_page, test_page_get);
 
         let test_page2 = get_test_page(2);
-        timeout(
-            Duration::new(10, 0),
-            fm.update_page(&test_uuid, test_page2.clone(), &test_page_num),
-        )
-        .await??;
-
-        let test_page_get2 = timeout(
-            Duration::new(10, 0),
-            fm.get_page(&test_uuid, &test_page_num),
-        )
-        .await??
-        .unwrap();
+        fm.update_page(&page_id, test_page2.clone(), &test_page_num)
+            .await?;
+
+        let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
+            .await??
+            .unwrap();
 
         assert_eq!(test_page2, test_page_get2);
 
@@ -237,16 +226,13 @@ mod tests {
         let fm2 = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
         let test_page3 = get_test_page(3);
-        let test_page_num3 = fm2.add_page(&test_uuid, test_page3.clone()).await?;
+        let test_page_num3 = fm2.add_page(&page_id, test_page3.clone()).await?;
         println!("{0}", test_page_num3);
         assert!(test_page_num3 > test_page_num);
 
-        let test_page_get2 = timeout(
-            Duration::new(10, 0),
-            fm2.get_page(&test_uuid, &test_page_num),
-        )
-        .await??
-        .unwrap();
+        let test_page_get2 = timeout(Duration::new(10, 0), fm2.get_page(&page_id, &test_page_num))
+            .await??
+            .unwrap();
 
         assert_eq!(test_page2, test_page_get2);
 
         Ok(())
     }
 }

diff --git a/src/engine/io/file_manager/file_executor.rs b/src/engine/io/file_manager/file_executor.rs
index 096b490..065be3f 100644
--- a/src/engine/io/file_manager/file_executor.rs
+++ b/src/engine/io/file_manager/file_executor.rs
@@ -4,13 +4,19 @@ use super::file_operations::{FileOperations, FileOperationsError};
 use super::request_type::RequestType;
 use crate::constants::PAGE_SIZE;
 use crate::engine::io::file_manager::ResourceFormatter;
-use crate::engine::io::page_formats::PageOffset;
+use crate::engine::io::page_formats::{PageId, PageOffset, PageType};
 use bytes::{Bytes, BytesMut};
 use lru::LruCache;
+use nom::bytes::complete::tag_no_case;
+use nom::character::complete::alphanumeric1;
+use nom::error::{ContextError, ParseError};
+use nom::sequence::tuple;
+use nom::IResult;
 use std::collections::{HashMap, VecDeque};
 use std::convert::TryFrom;
 use std::ffi::OsStr;
 use std::num::TryFromIntError;
+use std::str::FromStr;
 use std::string::FromUtf8Error;
 use std::{
     ffi::OsString,
@@ -35,15 +41,15 @@ const EMPTY_BUFFER: [u8; 16] = [0u8; 16];
 #[derive(Debug)]
 pub struct FileExecutor {
     data_dir: PathBuf,
-    receive_queue: UnboundedReceiver<(Uuid, RequestType)>,
+    receive_queue: UnboundedReceiver<(PageId, RequestType)>,
     receive_shutdown: UnboundedReceiver<Sender<()>>,
-    resource_lookup: HashMap<Uuid, PageOffset>,
+    resource_lookup: HashMap<PageId, PageOffset>,
 }
 
 impl FileExecutor {
     pub fn new(
         raw_path: OsString,
-        receive_queue: UnboundedReceiver<(Uuid, RequestType)>,
+        receive_queue: UnboundedReceiver<(PageId, RequestType)>,
         receive_shutdown: UnboundedReceiver<Sender<()>>,
     ) -> Result<FileExecutor, FileExecutorError> {
         let data_dir = Path::new(&raw_path).to_path_buf();
@@ -54,7 +60,7 @@ impl FileExecutor {
             ));
         }
 
-        let resource_lookup: HashMap<Uuid, PageOffset> = HashMap::new();
+        let resource_lookup = HashMap::new();
 
         Ok(FileExecutor {
             data_dir,
@@ -91,7 +97,7 @@ impl FileExecutor {
         // This cache is used to indicate when a file operation is in flight on a handle, there are two options:
         // * entry: some(file) -> Idle File Handle that can be used
        // * entry: None -> File handle in use but not returned
-        let mut file_handle_cache: LruCache<(Uuid, usize), Option<File>> =
+        let mut file_handle_cache: LruCache<(PageId, usize), Option<File>> =
             LruCache::new(MAX_FILE_HANDLE_COUNT);
 
        // This channel is used to restore or drop entries on the file handle cache, there are two options:
        let (send_completed, mut receive_completed) = mpsc::unbounded_channel();
 
        // Queue used as a holding ground until a handle is available for it to execute. Used in a FIFO fashion
-        let mut request_queue: VecDeque<(Uuid, RequestType)> = VecDeque::new();
+        let mut request_queue: VecDeque<(PageId, RequestType)> = VecDeque::new();
 
         let mut shutdown_sender = None;
 
@@ -131,8 +137,8 @@ impl FileExecutor {
                     }
                 }
                 maybe_recv = self.receive_queue.recv(), if request_queue.len() < MAX_FILE_HANDLE_COUNT => {
-                    if let Some((resource_key, req_type)) = maybe_recv {
-                        request_queue.push_back((resource_key, req_type));
+                    if let Some((page_id, req_type)) = maybe_recv {
+                        request_queue.push_back((page_id, req_type));
                     } else {
                         break;
                    }
@@ -142,10 +148,10 @@ impl FileExecutor {
             //Still haven't figured out incrementing file_handles
             if file_handles_open < MAX_FILE_HANDLE_COUNT && !request_queue.is_empty() {
                 let mut new_request_queue = VecDeque::with_capacity(request_queue.len());
-                for (resource_key, req_type) in request_queue.into_iter() {
+                for (page_id, req_type) in request_queue.into_iter() {
                     match req_type {
                         RequestType::Add((a, response)) => {
-                            let next_po = match self.get_next_po(&resource_key).await {
+                            let next_po = match self.get_next_po(&page_id).await {
                                 Ok(po) => po,
                                 Err(e) => {
                                     let _ = response.send(Err(e));
                                     continue;
                                 }
                             };
 
-                            match file_handle_cache.pop(&(resource_key, next_po.get_file_number()))
-                            {
+                            match file_handle_cache.pop(&(page_id, next_po.get_file_number())) {
                                 Some(maybe_file) => match maybe_file {
                                     Some(file) => {
                                         file_handle_cache
-                                            .put((resource_key, next_po.get_file_number()), None);
+                                            .put((page_id, next_po.get_file_number()), None);
                                         let file_handle_ret = send_completed.clone();
                                         tokio::spawn(async move {
                                             let response_f = response;
                                             match FileOperations::add_chunk(file, &next_po, a).await
                                             {
                                                 Ok(o) => {
                                                     let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                         next_po.get_file_number(),
                                                         Some(o),
                                                     ));
                                                 }
                                                 Err(e) => {
                                                     let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                         next_po.get_file_number(),
                                                         None,
                                                     ));
                                                 }
                                             }
                                         });
                                     }
                                     None => {
                                         //Request in flight, skip for now, but have to reinsert into cache
                                         file_handle_cache
-                                            .put((resource_key, next_po.get_file_number()), None);
+                                            .put((page_id, next_po.get_file_number()), None);
 
-                                        new_request_queue.push_back((
-                                            resource_key,
-                                            RequestType::Add((a, response)),
-                                        ));
+                                        new_request_queue
+                                            .push_back((page_id, RequestType::Add((a, response))));
                                         continue;
                                     }
                                 },
                                 None => {
                                     file_handle_cache
-                                        .put((resource_key, next_po.get_file_number()), None);
+                                        .put((page_id, next_po.get_file_number()), None);
                                     file_handles_open = file_handles_open.saturating_add(1);
                                     let data_dir = self.data_dir.clone();
                                     let file_handle_ret = send_completed.clone();
                                     tokio::spawn(async move {
                                         let response_f = response;
 
                                         let file = match FileOperations::open_path(
                                             &data_dir,
-                                            &resource_key,
+                                            &page_id,
                                             next_po.get_file_number(),
                                         )
                                         .await
                                         {
                                             Ok(o) => o,
                                             Err(e) => {
                                                 let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                     next_po.get_file_number(),
                                                     None,
                                                 ));
                                             }
                                         };
 
                                         match FileOperations::add_chunk(file, &next_po, a).await {
                                             Ok(o) => {
                                                 let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                     next_po.get_file_number(),
                                                     Some(o),
                                                 ));
                                             }
                                             Err(e) => {
                                                 let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                     next_po.get_file_number(),
                                                     None,
                                                 ));
                                             }
                                         }
                                     });
                                 }
                             }
                         }
                         RequestType::Read((r, response)) => {
-                            match file_handle_cache.pop(&(resource_key, r.get_file_number())) {
+                            match file_handle_cache.pop(&(page_id, r.get_file_number())) {
                                Some(maybe_file) => match maybe_file {
                                    Some(file) => {
-                                        file_handle_cache
-                                            .put((resource_key, r.get_file_number()), None);
+                                        file_handle_cache.put((page_id, r.get_file_number()), None);
                                        let file_handle_ret = send_completed.clone();
                                        tokio::spawn(async move {
                                            let response_f = response;
 
                                            match FileOperations::read_chunk(file, &r).await {
                                                Ok((o, buffer)) => {
                                                    let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                        r.get_file_number(),
                                                        Some(o),
                                                    ));
                                                }
                                                Err(e) => {
                                                    let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                        r.get_file_number(),
                                                        None,
                                                    ));
                                                }
                                            }
                                        });
                                    }
                                    None => {
                                        //Request in flight, skip for now, but have to reinsert into cache
-                                        file_handle_cache
-                                            .put((resource_key, r.get_file_number()), None);
-                                        new_request_queue.push_back((
-                                            resource_key,
-                                            RequestType::Read((r, response)),
-                                        ));
+                                        file_handle_cache.put((page_id, r.get_file_number()), None);
+                                        new_request_queue
+                                            .push_back((page_id, RequestType::Read((r, response))));
                                        continue;
                                    }
                                },
                                None => {
-                                    file_handle_cache
-                                        .put((resource_key, r.get_file_number()), None);
+                                    file_handle_cache.put((page_id, r.get_file_number()), None);
                                    file_handles_open = file_handles_open.saturating_add(1);
                                    let data_dir = self.data_dir.clone();
                                    let file_handle_ret = send_completed.clone();
                                    tokio::spawn(async move {
                                        let response_f = response;
 
                                        let file = match FileOperations::open_path(
                                            &data_dir,
-                                            &resource_key,
+                                            &page_id,
                                            r.get_file_number(),
                                        )
                                        .await
                                        {
                                            Ok(o) => o,
                                            Err(e) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    r.get_file_number(),
                                                    None,
                                                ));
                                            }
                                        };
 
                                        match FileOperations::read_chunk(file, &r).await {
                                            Ok((o, maybe_buffer)) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    r.get_file_number(),
                                                    Some(o),
                                                ));
                                            }
                                            Err(e) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    r.get_file_number(),
                                                    None,
                                                ));
                                            }
                                        }
                                    });
                                }
                            }
                        }
                        RequestType::Update((u, buffer, response)) => {
-                            match file_handle_cache.pop(&(resource_key, u.get_file_number())) {
+                            match file_handle_cache.pop(&(page_id, u.get_file_number())) {
                                Some(maybe_file) => match maybe_file {
                                    Some(file) => {
-                                        file_handle_cache
-                                            .put((resource_key, u.get_file_number()), None);
+                                        file_handle_cache.put((page_id, u.get_file_number()), None);
                                        let file_handle_ret = send_completed.clone();
                                        tokio::spawn(async move {
                                            let response_f = response;
 
                                            match FileOperations::update_chunk(file, &u, buffer).await
                                            {
                                                Ok(o) => {
                                                    let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                        u.get_file_number(),
                                                        Some(o),
                                                    ));
                                                }
                                                Err(e) => {
                                                    let _ = file_handle_ret.send((
-                                                        resource_key,
+                                                        page_id,
                                                        u.get_file_number(),
                                                        None,
                                                    ));
                                                }
                                            }
                                        });
                                    }
                                    None => {
                                        //Request in flight, skip for now, but have to reinsert into cache
-                                        file_handle_cache
-                                            .put((resource_key, u.get_file_number()), None);
+                                        file_handle_cache.put((page_id, u.get_file_number()), None);
                                        new_request_queue.push_back((
-                                            resource_key,
+                                            page_id,
                                            RequestType::Update((u, buffer, response)),
                                        ));
                                        continue;
                                    }
                                },
                                None => {
-                                    file_handle_cache
-                                        .put((resource_key, u.get_file_number()), None);
+                                    file_handle_cache.put((page_id, u.get_file_number()), None);
                                    file_handles_open = file_handles_open.saturating_add(1);
                                    let data_dir = self.data_dir.clone();
                                    let file_handle_ret = send_completed.clone();
                                    tokio::spawn(async move {
                                        let response_f = response;
 
                                        let file = match FileOperations::open_path(
                                            &data_dir,
-                                            &resource_key,
+                                            &page_id,
                                            u.get_file_number(),
                                        )
                                        .await
                                        {
                                            Ok(o) => o,
                                            Err(e) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    u.get_file_number(),
                                                    None,
                                                ));
                                            }
                                        };
 
                                        match FileOperations::update_chunk(file, &u, buffer).await {
                                            Ok(o) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    u.get_file_number(),
                                                    Some(o),
                                                ));
                                            }
                                            Err(e) => {
                                                let _ = file_handle_ret.send((
-                                                    resource_key,
+                                                    page_id,
                                                    u.get_file_number(),
                                                    None,
                                                ));
                                            }
                                        }
                                    });
                                }
                            }
                        }
                    }
                }
@@ -465,24 +460,24 @@ impl FileExecutor {
         }
     }
 
-    async fn get_next_po(&mut self, resource_key: &Uuid) -> Result<PageOffset, FileExecutorError> {
+    async fn get_next_po(&mut self, page_id: &PageId) -> Result<PageOffset, FileExecutorError> {
         //Find the resource key's latest offset so we can iterate on it for adds
-        match self.resource_lookup.remove(resource_key) {
+        match self.resource_lookup.remove(page_id) {
             Some(po) => {
-                self.resource_lookup.insert(*resource_key, po.next());
+                self.resource_lookup.insert(*page_id, po.next());
                 Ok(po)
             }
             None => {
-                let po = self.find_next_offset(resource_key).await?;
-                self.resource_lookup.insert(*resource_key, po.next());
+                let po = self.find_next_offset(page_id).await?;
+                self.resource_lookup.insert(*page_id, po.next());
                 Ok(po)
             }
         }
     }
 
-    async fn find_next_offset(&self, resource_key: &Uuid) -> Result<PageOffset, FileExecutorError> {
+    async fn find_next_offset(&self, page_id: &PageId) -> Result<PageOffset, FileExecutorError> {
         let (path, count) =
-            match Self::search_for_max_file(self.data_dir.as_path(), resource_key).await? {
+            match Self::search_for_max_file(self.data_dir.as_path(), page_id).await? {
                 Some((p, c)) => (p, c),
                 None => {
                     return Ok(PageOffset(0));
                 }
            };
@@ -533,10 +528,12 @@ impl FileExecutor {
     /// This will search for the highest numbered file for the Uuid
     async fn search_for_max_file(
         data_dir: &Path,
-        resource_key: &Uuid,
+        page_id: &PageId,
     ) -> Result<Option<(PathBuf, usize)>, FileExecutorError> {
-        let sub_path = FileOperations::make_sub_path(data_dir, resource_key).await?;
-        let target_filename = ResourceFormatter::format_uuid(resource_key);
+        let sub_path = FileOperations::make_sub_path(data_dir, page_id).await?;
+        let target_uuid = ResourceFormatter::format_uuid(&page_id.resource_key);
+        let target_type = page_id.page_type.to_string();
+        let target_filename = format!("{0}.{1}", target_uuid, target_type);
 
         let mut max_file_count = 0;
         let mut max_file_path = None;
@@ -607,8 +604,12 @@ mod tests {
     use tempfile::TempDir;
     use tokio::{io::AsyncWriteExt, time::timeout};
+    use uuid::Uuid;
 
-    use crate::{constants::PAGES_PER_FILE, engine::io::FileManager};
+    use crate::{
+        constants::PAGES_PER_FILE,
+        engine::io::{page_formats::PageType, FileManager},
+    };
 
     use super::*;
 
@@ -627,10 +628,13 @@ mod tests {
         //We're going to touch a single file to force it to think it has far more data than it does.
         //I don't normally write tests this way but I don't want to write GBs unnecessarily.
-        let test_uuid = Uuid::new_v4();
+        let page_id = PageId {
+            resource_key: Uuid::new_v4(),
+            page_type: PageType::Data,
+        };
 
         let mut test_file =
-            FileOperations::open_path(tmp_dir, &test_uuid, PageOffset(0).get_file_number()).await?;
+            FileOperations::open_path(tmp_dir, &page_id, PageOffset(0).get_file_number()).await?;
 
         let mut test_page = get_test_page(1);
 
@@ -641,11 +645,11 @@ mod tests {
         let fm = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
 
         let test_page = get_test_page(2);
-        let test_page_num = fm.add_page(&test_uuid, test_page.clone()).await?;
+        let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;
 
         assert_eq!(test_page_num, PageOffset(2));
 
-        let test_page_get = fm.get_page(&test_uuid, &test_page_num).await?.unwrap();
+        let test_page_get = fm.get_page(&page_id, &test_page_num).await?.unwrap();
 
         assert_eq!(test_page, test_page_get);
 
@@ -661,12 +665,16 @@ mod tests {
         //We're going to touch a single file to force it to think it has far more data than it does.
         //I don't normally write tests this way but I don't want to write GBs unnecessarily.
-        let test_uuid = Uuid::new_v4();
+        let page_id = PageId {
+            resource_key: Uuid::new_v4(),
+            page_type: PageType::Data,
+        };
+
         let test_count: usize = 10;
 
         let mut test_file = FileOperations::open_path(
             tmp_dir,
-            &test_uuid,
+            &page_id,
             PageOffset(PAGES_PER_FILE * test_count).get_file_number(),
         )
         .await?;
@@ -682,18 +690,15 @@ mod tests {
         let test_page = get_test_page(2);
         let test_page_num = timeout(
             Duration::new(10, 0),
-            fm.add_page(&test_uuid, test_page.clone()),
+            fm.add_page(&page_id, test_page.clone()),
         )
         .await??;
 
         assert_eq!(test_page_num, PageOffset(PAGES_PER_FILE * test_count + 2));
 
-        let test_page_get = timeout(
-            Duration::new(10, 0),
-            fm.get_page(&test_uuid, &test_page_num),
-        )
-        .await??
-        .unwrap();
+        let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
+            .await??
+            .unwrap();
 
         assert_eq!(test_page, test_page_get);

diff --git a/src/engine/io/file_manager/file_operations.rs b/src/engine/io/file_manager/file_operations.rs
index 011238e..633b8b2 100644
--- a/src/engine/io/file_manager/file_operations.rs
+++ b/src/engine/io/file_manager/file_operations.rs
@@ -12,22 +12,22 @@ use tokio::{
     fs::{File, OpenOptions},
     io::AsyncSeekExt,
 };
-use uuid::Uuid;
 
 use crate::constants::PAGE_SIZE;
 use crate::engine::io::file_manager::ResourceFormatter;
-use crate::engine::io::page_formats::PageOffset;
+use crate::engine::io::page_formats::{PageId, PageOffset};
 
 pub struct FileOperations {}
 
 impl FileOperations {
     pub async fn open_path(
         data_dir: &Path,
-        resource_key: &Uuid,
+        page_id: &PageId,
         file_number: usize,
     ) -> Result<File, FileOperationsError> {
-        let mut path = Self::make_sub_path(data_dir, resource_key).await?;
-        let file_stem = ResourceFormatter::format_uuid(resource_key);
-        let filename = format!("{0}.{1}", file_stem, file_number);
+        let mut path = Self::make_sub_path(data_dir, page_id).await?;
+        let file_stem = ResourceFormatter::format_uuid(&page_id.resource_key);
+        let file_type = page_id.page_type.to_string();
+        let filename = format!("{0}.{1}.{2}", file_stem, file_type, file_number);
 
         path.push(filename);
 
@@ -54,9 +54,9 @@ impl FileOperations {
     //Makes the prefix folder so we don't fill up folders. Will consider more nesting eventually
     pub async fn make_sub_path(
         data_dir: &Path,
-        resource_key: &Uuid,
+        page_id: &PageId,
     ) -> Result<PathBuf, FileOperationsError> {
-        let subfolder = ResourceFormatter::get_uuid_prefix(resource_key);
+        let subfolder = ResourceFormatter::get_uuid_prefix(&page_id.resource_key);
 
         let mut path = PathBuf::new();
         path.push(data_dir);
@@ -121,6 +121,9 @@ pub enum FileOperationsError {
 #[cfg(test)]
 mod tests {
     use tempfile::TempDir;
+    use uuid::Uuid;
+
+    use crate::engine::io::page_formats::PageType;
 
     use super::*;
 
@@ -128,11 +131,14 @@ mod tests {
     async fn test_make_sub_path() -> Result<(), Box<dyn Error>> {
         let tmp = TempDir::new()?;
 
-        let test_uuid = Uuid::new_v4();
+        let page_id = PageId {
+            resource_key: Uuid::new_v4(),
+            page_type: PageType::Data,
+        };
 
         //Must be able to repeatedly make the sub_path
-        FileOperations::make_sub_path(tmp.path(), &test_uuid).await?;
-        FileOperations::make_sub_path(tmp.path(), &test_uuid).await?;
+        FileOperations::make_sub_path(tmp.path(), &page_id).await?;
+        FileOperations::make_sub_path(tmp.path(), &page_id).await?;
 
         Ok(())
     }
 }
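Note on the naming scheme above: a page file is now "{uuid}.{page type}.{file number}" instead of "{uuid}.{file number}", which is also what lets search_for_max_file in file_executor.rs use "{uuid}.{type}" as its match prefix. A minimal standalone sketch of the construction, assuming ResourceFormatter::format_uuid is roughly uuid's dash-free "simple" formatting (the real formatter may differ):

    use uuid::Uuid;

    // Hypothetical mirror of open_path's filename construction; not part of this patch.
    fn page_file_name(resource_key: &Uuid, page_type: &str, file_number: usize) -> String {
        // Stand-in for ResourceFormatter::format_uuid
        let file_stem = resource_key.to_simple().to_string();
        format!("{0}.{1}.{2}", file_stem, page_type, file_number)
    }

So a data resource would land on disk as something like "2b0c...e1.data.0", then "2b0c...e1.data.1" once it grows past PAGES_PER_FILE.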
diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs
new file mode 100644
index 0000000..a82e6c0
--- /dev/null
+++ b/src/engine/io/free_space_manager.rs
@@ -0,0 +1,2 @@
+//! This struct provides a lookup service to tell row_manager whether and where there is free space to
+//! add new tuples.
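The new module is only a doc comment so far. For the direction described in the README TODO, a sketch of the minimal interface it could grow into — every name here is hypothetical, and the real version would presumably be page-backed behind a PageType::FreeSpaceMap variant rather than an in-memory Vec:

    // Hypothetical in-memory stand-in for the eventual free space map.
    pub struct FreeSpaceSketch {
        // One hint per heap page: true = the page probably has room for another tuple.
        free_hint: Vec<bool>,
    }

    impl FreeSpaceSketch {
        // Where an insert could try first, replacing row_manager's linear page probe.
        pub fn first_free(&self) -> Option<usize> {
            self.free_hint.iter().position(|free| *free)
        }

        // Called when an insert fails to fit, so the page is skipped next time.
        pub fn mark_full(&mut self, page: usize) {
            if let Some(slot) = self.free_hint.get_mut(page) {
                *slot = false;
            }
        }
    }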
diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs
index 21e691e..318b221 100644
--- a/src/engine/io/index_manager.rs
+++ b/src/engine/io/index_manager.rs
@@ -9,8 +9,8 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use super::index_formats::{BTreeLeafError, BTreeNode, BTreeNodeError};
-use super::page_formats::ItemIdData;
 use super::page_formats::PageOffset;
+use super::page_formats::{ItemIdData, PageId, PageType};
 use super::{FileManager, FileManagerError};
 use crate::engine::io::SelfEncodedSize;
 use crate::{
@@ -88,10 +88,15 @@ impl IndexManager {
         match current_node.0 {
             BTreeNode::Branch(b) => {}
             BTreeNode::Leaf(mut l) => {
+                let page_id = PageId {
+                    resource_key: index_def.id,
+                    page_type: PageType::Data,
+                };
+
                 if l.can_fit(&new_key) {
                     return Ok(self
                         .file_manager
-                        .update_page(&index_def.id, l.serialize()?, &current_node.1)
+                        .update_page(&page_id, l.serialize()?, &current_node.1)
                         .await?);
                 }
 
@@ -148,7 +153,12 @@ impl IndexManager {
         index_def: &Index,
         offset: &PageOffset,
     ) -> Result<BTreeNode, IndexManagerError> {
-        match self.file_manager.get_page(&index_def.id, offset).await? {
+        let page_id = PageId {
+            resource_key: index_def.id,
+            page_type: PageType::Data,
+        };
+
+        match self.file_manager.get_page(&page_id, offset).await? {
             Some(mut page) => Ok(BTreeNode::parse(&mut page, index_def)?),
             None => Err(IndexManagerError::NoSuchNode(*offset)),
         }
@@ -164,8 +174,13 @@ impl IndexManager {
         match self.get_node(index_def, &PageOffset(1)).await {
             Ok(o) => Ok((o, PageOffset(1))),
             Err(IndexManagerError::NoSuchNode(_)) => {
+                let page_id = PageId {
+                    resource_key: index_def.id,
+                    page_type: PageType::Data,
+                };
+
                 //Page zero with no data in it
-                self.make_root_page(&index_def.id).await?;
+                self.make_root_page(&page_id).await?;
 
                 let root_node = BTreeLeaf {
                     parent_node: None,
@@ -176,7 +191,7 @@ impl IndexManager {
 
                 let page_num = self
                     .file_manager
-                    .add_page(&index_def.id, root_node.serialize()?)
+                    .add_page(&page_id, root_node.serialize()?)
                     .await?;
 
                 if page_num != PageOffset(1) {
                     return Err(IndexManagerError::ConcurrentCreationError());
                 }
@@ -188,7 +203,7 @@ impl IndexManager {
         }
     }
 
-    async fn make_root_page(&self, index: &Uuid) -> Result<(), IndexManagerError> {
+    async fn make_root_page(&self, index: &PageId) -> Result<(), IndexManagerError> {
         let mut root_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
         let root_page = vec![0; PAGE_SIZE as usize];
         root_page_buffer.extend_from_slice(&root_page);

diff --git a/src/engine/io/page_formats.rs b/src/engine/io/page_formats.rs
index fa4f201..9bd8f31 100644
--- a/src/engine/io/page_formats.rs
+++ b/src/engine/io/page_formats.rs
@@ -6,6 +6,10 @@ mod page_data;
 pub use page_data::PageData;
 pub use page_data::PageDataError;
 
+mod page_id;
+pub use page_id::PageId;
+pub use page_id::PageType;
+
 mod page_offset;
 pub use page_offset::PageOffset;
 pub use page_offset::PageOffsetError;

diff --git a/src/engine/io/page_formats/page_id.rs b/src/engine/io/page_formats/page_id.rs
new file mode 100644
index 0000000..b1e416c
--- /dev/null
+++ b/src/engine/io/page_formats/page_id.rs
@@ -0,0 +1,68 @@
+//! A struct to uniquely identify a page in all operations. This replaces adding additional arguments everywhere.
+
+use nom::{
+    bytes::complete::tag_no_case,
+    error::{convert_error, make_error, ContextError, ErrorKind, ParseError, VerboseError},
+    Finish, IResult,
+};
+use std::{
+    fmt::{self, Display, Formatter},
+    str::FromStr,
+};
+use thiserror::Error;
+use uuid::Uuid;
+
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub struct PageId {
+    pub resource_key: Uuid,
+    pub page_type: PageType,
+}
+
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub enum PageType {
+    Data,
+    //FreeSpaceMap,
+    //VisibilityMap
+}
+
+impl PageType {
+    pub fn parse_type<'a, E: ParseError<&'a str> + ContextError<&'a str>>(
+        input: &'a str,
+    ) -> IResult<&'a str, PageType, E> {
+        let (input, matched) = tag_no_case("data")(input)?;
+
+        let page_type = match matched {
+            "data" => PageType::Data,
+            _ => {
+                return Err(nom::Err::Failure(make_error(input, ErrorKind::Fix)));
+            }
+        };
+        Ok((input, page_type))
+    }
+}
+
+impl Display for PageType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            PageType::Data => {
+                write!(f, "data")
+            }
+        }
+    }
+}
+
+impl FromStr for PageType {
+    type Err = PageTypeError;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match Self::parse_type::<VerboseError<&str>>(s).finish() {
+            Ok((_, page_type)) => Ok(page_type),
+            Err(e) => Err(PageTypeError::ParseError(convert_error(s, e))),
+        }
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum PageTypeError {
+    #[error("Page Type Parse Error {0}")]
+    ParseError(String),
+}
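Quick check of the Display/FromStr pair defined in page_id.rs above — this round trip is what the on-disk filenames rely on. Something like this could sit in that module's tests:

    #[cfg(test)]
    mod page_type_round_trip {
        use super::*;
        use std::str::FromStr;

        #[test]
        fn data_round_trips() {
            let s = PageType::Data.to_string(); // "data"
            assert_eq!(PageType::from_str(&s).unwrap(), PageType::Data);
            // tag_no_case consumes "DATA" too, but the inner match only maps the
            // lowercase spelling, so non-lowercase input falls through to the
            // ErrorKind::Fix failure and from_str returns Err.
            assert!(PageType::from_str("DATA").is_err());
        }
    }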
diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs
index 55855e2..eaf8044 100644
--- a/src/engine/io/row_manager.rs
+++ b/src/engine/io/row_manager.rs
@@ -1,6 +1,7 @@
 use super::super::objects::Table;
 use super::super::transactions::TransactionId;
-use super::page_formats::{PageData, PageDataError, PageOffset, UInt12};
+use super::lock_manager::LockManager;
+use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
 use super::row_formats::{ItemPointer, RowData, RowDataError};
 use super::{EncodedSize, FileManager, FileManagerError};
 use crate::engine::objects::SqlTuple;
@@ -15,11 +16,15 @@ use thiserror::Error;
 #[derive(Clone, Debug)]
 pub struct RowManager {
     file_manager: Arc<FileManager>,
+    lock_manager: LockManager,
 }
 
 impl RowManager {
-    pub fn new(file_manager: Arc<FileManager>) -> RowManager {
-        RowManager { file_manager }
+    pub fn new(file_manager: Arc<FileManager>, lock_manager: LockManager) -> RowManager {
+        RowManager {
+            file_manager,
+            lock_manager,
+        }
     }
 
     pub async fn insert_row(
@@ -53,8 +58,12 @@ impl RowManager {
 
         page.update(row, row_pointer.count)?;
 
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
         self.file_manager
-            .update_page(&table.id, page.serialize(), &row_pointer.page)
+            .update_page(&page_id, page.serialize(), &row_pointer.page)
             .await?;
         Ok(())
     }
@@ -93,8 +102,12 @@ impl RowManager {
         old_row.item_pointer = new_row_pointer;
         old_page.update(old_row, row_pointer.count)?;
 
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
         self.file_manager
-            .update_page(&table.id, old_page.serialize(), &row_pointer.page)
+            .update_page(&page_id, old_page.serialize(), &row_pointer.page)
             .await?;
 
         Ok(new_row_pointer)
@@ -105,9 +118,14 @@ impl RowManager {
         table: Arc<Table>,
         row_pointer: ItemPointer,
     ) -> Result<(PageData, RowData), RowManagerError> {
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
+
         let page_bytes = self
             .file_manager
-            .get_page(&table.id, &row_pointer.page)
+            .get_page(&page_id, &row_pointer.page)
             .await?
             .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
         let page = PageData::parse(table, row_pointer.page, page_bytes)?;
@@ -128,9 +146,14 @@ impl RowManager {
         self,
         table: Arc<Table>,
     ) -> impl Stream<Item = Result<RowData, RowManagerError>> {
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
+
         try_stream! {
             let mut page_num = PageOffset(0);
-            for await page_bytes in self.file_manager.get_stream(&table.id) {
+            for await page_bytes in self.file_manager.get_stream(&page_id) {
                 let page_bytes = page_bytes?;
                 match page_bytes {
                     Some(s) => {
@@ -147,15 +170,21 @@ impl RowManager {
         }
     }
 
+    // TODO implement visibility maps so I don't have to scan probable pages
     async fn insert_row_internal(
         &self,
         current_tran_id: TransactionId,
         table: Arc<Table>,
         user_data: SqlTuple,
     ) -> Result<ItemPointer, RowManagerError> {
+        let page_id = PageId {
+            resource_key: table.id,
+            page_type: PageType::Data,
+        };
+
         let mut page_num = PageOffset(0);
         loop {
-            let page_bytes = self.file_manager.get_page(&table.id, &page_num).await?;
+            let page_bytes = self.file_manager.get_page(&page_id, &page_num).await?;
             match page_bytes {
                 Some(p) => {
                     let mut page = PageData::parse(table.clone(), page_num, p)?;
                     if page.can_fit(&user_data) {
                         let new_row_pointer = page.insert(current_tran_id, &table, user_data)?;
                         let new_page_bytes = page.serialize();
                         self.file_manager
-                            .update_page(&table.id, new_page_bytes, &page_num)
+                            .update_page(&page_id, new_page_bytes, &page_num)
                             .await?;
                         return Ok(new_row_pointer);
                     } else {
@@ -175,7 +204,7 @@ impl RowManager {
                     let mut new_page = PageData::new(page_num);
                     let new_row_pointer = new_page.insert(current_tran_id, &table, user_data)?; //TODO Will NOT handle overly large rows
                     self.file_manager
-                        .add_page(&table.id, new_page.serialize())
+                        .add_page(&page_id, new_page.serialize())
                         .await?;
                     return Ok(new_row_pointer);
                 }
@@ -256,7 +285,7 @@ mod tests {
         let table = get_table();
         let fm = Arc::new(FileManager::new(tmp_dir.clone())?);
-        let rm = RowManager::new(fm);
+        let rm = RowManager::new(fm, LockManager::new());
 
         let tran_id = TransactionId::new(1);
 
@@ -270,7 +299,7 @@ mod tests {
 
         //Now let's make sure they're really in the table, persisting across restarts
         let fm = Arc::new(FileManager::new(tmp_dir)?);
-        let rm = RowManager::new(fm);
+        let rm = RowManager::new(fm, LockManager::new());
 
         pin_mut!(rm);
         let result_rows: Vec<RowData> = rm
@@ -296,7 +325,7 @@ mod tests {
         let table = get_table();
         let fm = Arc::new(FileManager::new(tmp_dir)?);
-        let rm = RowManager::new(fm);
+        let rm = RowManager::new(fm, LockManager::new());
 
         let tran_id = TransactionId::new(1);
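Every call site in row_manager.rs above now assembles the same PageId literal by hand. A hypothetical convenience constructor (not in this patch) that could live next to the struct and cut that repetition down to PageId::for_data(table.id):

    impl PageId {
        // Hypothetical helper; every heap access in row_manager uses PageType::Data.
        pub fn for_data(resource_key: Uuid) -> PageId {
            PageId {
                resource_key,
                page_type: PageType::Data,
            }
        }
    }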
diff --git a/tests/visibility_tests.rs b/tests/visibility_tests.rs
index 07715ce..c5b8e35 100644
--- a/tests/visibility_tests.rs
+++ b/tests/visibility_tests.rs
@@ -1,7 +1,7 @@
 use feophantlib::{
     constants::Nullable,
     engine::{
-        io::{row_formats::RowData, FileManager, RowManager, VisibleRowManager},
+        io::{row_formats::RowData, FileManager, LockManager, RowManager, VisibleRowManager},
         objects::{
             types::{BaseSqlTypes, BaseSqlTypesMapper},
             Attribute, SqlTuple, Table,
@@ -65,7 +65,7 @@ async fn test_row_manager_visibility() -> Result<(), Box<dyn Error>> {
     let table = get_table();
     let mut tm = TransactionManager::new();
     let pm = Arc::new(FileManager::new(tmp_dir)?);
-    let rm = RowManager::new(pm);
+    let rm = RowManager::new(pm, LockManager::new());
     let vm = VisibleRowManager::new(rm.clone(), tm.clone());
     let row = get_row("test".to_string());
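One assumption worth recording about the file_executor tests above: they treat PageOffset::get_file_number as mapping a page offset to the numbered file that holds it, i.e. roughly the following (inferred from how the tests use it; PageOffset itself is untouched by this patch, so the real implementation may differ):

    // Inferred relationship, not taken from the source:
    fn file_number(page_offset: usize, pages_per_file: usize) -> usize {
        page_offset / pages_per_file
    }

That is why touching the file at PageOffset(PAGES_PER_FILE * test_count).get_file_number() convinces the executor that allocation should resume in that file's range.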