diff --git a/src/engine/io/index_formats/btree_leaf.rs b/src/engine/io/index_formats/btree_leaf.rs index 5722256..1fde6fa 100644 --- a/src/engine/io/index_formats/btree_leaf.rs +++ b/src/engine/io/index_formats/btree_leaf.rs @@ -14,7 +14,7 @@ use crate::{ objects::{types::BaseSqlTypesError, SqlTuple}, }, }; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use std::{collections::BTreeMap, num::TryFromIntError}; use thiserror::Error; @@ -27,6 +27,15 @@ pub struct BTreeLeaf { } impl BTreeLeaf { + pub fn new() -> BTreeLeaf { + BTreeLeaf { + parent_node: None, + left_node: None, + right_node: None, + nodes: BTreeMap::new(), + } + } + pub fn add(&mut self, key: SqlTuple, item_ptr: ItemIdData) -> Result<(), BTreeLeafError> { if !self.can_fit(&key) { return Err(BTreeLeafError::KeyTooLarge(key.encoded_size())); @@ -75,22 +84,21 @@ impl BTreeLeaf { new_size <= PAGE_SIZE as usize } - pub fn serialize(&self) -> Result<Bytes, BTreeLeafError> { - let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + pub fn serialize(&self, buffer: &mut BytesMut) -> Result<(), BTreeLeafError> { buffer.put_u8(NodeType::Leaf as u8); - BTreeNode::write_node(&mut buffer, self.parent_node)?; - BTreeNode::write_node(&mut buffer, self.left_node)?; - BTreeNode::write_node(&mut buffer, self.right_node)?; + BTreeNode::write_node(buffer, self.parent_node)?; + BTreeNode::write_node(buffer, self.left_node)?; + BTreeNode::write_node(buffer, self.right_node)?; - encode_size(&mut buffer, self.nodes.len()); + encode_size(buffer, self.nodes.len()); for (key, iids) in self.nodes.iter() { - BTreeNode::write_sql_tuple(&mut buffer, key); + BTreeNode::write_sql_tuple(buffer, key); - encode_size(&mut buffer, iids.len()); + encode_size(buffer, iids.len()); for iid in iids { - iid.serialize(&mut buffer); + iid.serialize(buffer); } } @@ -100,7 +108,7 @@ impl BTreeLeaf { buffer.extend_from_slice(&free_space); } - Ok(buffer.freeze()) + Ok(()) } } @@ -191,7 +199,8 @@ mod tests { nodes, }; - let mut test_serial = 
test.clone().serialize()?; + let mut test_serial = BytesMut::with_capacity(PAGE_SIZE as usize); + test.serialize(&mut test_serial)?; let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?; match test_parse { diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs index 304ebe0..604d045 100644 --- a/src/engine/io/index_manager.rs +++ b/src/engine/io/index_manager.rs @@ -1,13 +1,9 @@ //! TODO #24 Fix the index implementation to use the locking layer -use std::collections::BTreeMap; -use std::ops::Range; -use std::sync::Arc; - use super::index_formats::{BTreeLeafError, BTreeNode, BTreeNodeError}; use super::page_formats::PageOffset; use super::page_formats::{ItemIdData, PageId, PageType}; -use super::{FileManager, FileManagerError}; +use super::{FileManager, FileManagerError, LockCacheManager, LockCacheManagerError}; use crate::engine::io::SelfEncodedSize; use crate::{ constants::PAGE_SIZE, @@ -16,21 +12,26 @@ use crate::{ objects::{Index, SqlTuple}, }, }; -use bytes::BytesMut; +use bytes::{Buf, BufMut, BytesMut}; use std::collections::BTreeMap; use std::convert::TryFrom; use std::mem::size_of; use std::num::TryFromIntError; use std::ops::Range; use std::sync::Arc; use thiserror::Error; -use uuid::Uuid; //TODO Support something other than btrees //TODO Support searching on a non primary index column #[derive(Clone, Debug)] pub struct IndexManager { - file_manager: Arc<FileManager>, + lock_cache_manager: LockCacheManager, } impl IndexManager { - pub fn new(file_manager: Arc<FileManager>) -> IndexManager { - IndexManager { file_manager } + pub fn new(lock_cache_manager: LockCacheManager) -> IndexManager { + IndexManager { lock_cache_manager } } pub async fn add( @@ -90,10 +91,11 @@ impl IndexManager { }; if l.can_fit(&new_key) { - return Ok(self - .file_manager - .update_page(&page_id, &current_node.1, l.serialize()?) - .await?); + + //return Ok(self + // .file_manager + // .update_page(&page_id, &current_node.1, l.serialize()?) 
+ // .await?); } //If we're here, we have a key that doesn't fit into the leaf so we need to split it. @@ -154,8 +156,11 @@ impl IndexManager { page_type: PageType::Data, }; - match self.file_manager.get_page(&page_id, offset).await? { - Some(mut page) => Ok(BTreeNode::parse(&mut page, index_def)?), + let page_handle = self.lock_cache_manager.get_page(page_id, *offset).await?; + let page_buffer = page_handle.clone(); + + match page_buffer { + Some(page) => Ok(BTreeNode::parse(&mut page.freeze(), index_def)?), None => Err(IndexManagerError::NoSuchNode(*offset)), } } @@ -167,47 +172,87 @@ impl IndexManager { &self, index_def: &Index, ) -> Result<(BTreeNode, PageOffset), IndexManagerError> { - match self.get_node(index_def, &PageOffset(1)).await { - Ok(o) => Ok((o, PageOffset(1))), - Err(IndexManagerError::NoSuchNode(_)) => { - let page_id = PageId { - resource_key: index_def.id, - page_type: PageType::Data, - }; - - //Page zero with no data in it - self.make_root_page(&page_id).await?; - - let root_node = BTreeLeaf { - parent_node: None, - left_node: None, - right_node: None, - nodes: BTreeMap::new(), - }; - - self.file_manager - .add_page(&page_id, &PageOffset(1), root_node.serialize()?) 
- .await?; - - Ok((BTreeNode::Leaf(root_node), PageOffset(1))) - } - Err(e) => Err(e), + let page_id = PageId { + resource_key: index_def.id, + page_type: PageType::Data, + }; + let first_page_handle = self + .lock_cache_manager + .get_page(page_id, PageOffset(0)) + .await?; + + if let Some(s) = first_page_handle.as_ref() { + let mut first_page = s.clone(); + return self + .parse_root_page(index_def, &mut first_page, page_id) + .await; } - } - async fn make_root_page(&self, index: &PageId) -> Result<(), IndexManagerError> { - let mut root_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - let root_page = vec![0; PAGE_SIZE as usize]; - root_page_buffer.extend_from_slice(&root_page); - self.file_manager - .add_page(index, &PageOffset(0), root_page_buffer.freeze()) + //We have to make it and handle the race window + drop(first_page_handle); + + let mut new_first_page_handle = self + .lock_cache_manager + .get_page_for_update(page_id, PageOffset(0)) .await?; - //if page_num != PageOffset(0) { - // return Err(IndexManagerError::ConcurrentCreationError()); - //} + if let Some(s) = new_first_page_handle.as_mut() { + return self.parse_root_page(index_def, s, page_id).await; + } - Ok(()) + let root_offset = self.lock_cache_manager.get_offset(page_id).await?; + + let mut new_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + new_page_buffer.put_uint_le(u64::try_from(root_offset.0)?, size_of::<usize>()); + let new_page = vec![0; PAGE_SIZE as usize - size_of::<usize>()]; + + new_page_buffer.extend_from_slice(&new_page); + new_first_page_handle.replace(new_page_buffer); + self.lock_cache_manager + .add_page(page_id, PageOffset(0), new_first_page_handle) + .await?; + + //Now make the root node and save it + let mut root_handle = self + .lock_cache_manager + .get_page_for_update(page_id, root_offset) + .await?; + if let Some(s) = root_handle.as_mut() { + return self.parse_root_page(index_def, s, page_id).await; + } + + let root_node = BTreeLeaf::new(); + + let mut 
root_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + root_node.serialize(&mut root_buffer)?; + root_handle.replace(root_buffer); + + self.lock_cache_manager + .update_page(page_id, root_offset, root_handle) + .await?; + return Ok((BTreeNode::Leaf(root_node), root_offset)); + } + + async fn parse_root_page( + &self, + index_def: &Index, + first_page: &mut BytesMut, + page_id: PageId, + ) -> Result<(BTreeNode, PageOffset), IndexManagerError> { + let root_offset = usize::try_from(first_page.get_uint_le(size_of::<usize>()))?; + let root_handle = self + .lock_cache_manager + .get_page(page_id, PageOffset(root_offset)) + .await?; + let mut root_page = root_handle + .as_ref() + .ok_or(IndexManagerError::RootNodeEmpty())? + .clone() + .freeze(); + return Ok(( + BTreeNode::parse(&mut root_page, index_def)?, + PageOffset(root_offset), + )); } } @@ -221,14 +266,18 @@ pub enum IndexManagerError { "Another process made the root index page first, maybe the developer should make locking." )] ConcurrentCreationError(), - #[error(transparent)] - FileManagerError(#[from] FileManagerError), #[error("Key too large size: {0}, maybe the developer should fix this.")] KeyTooLarge(usize), + #[error(transparent)] + LockCacheManagerError(#[from] LockCacheManagerError), #[error("Node does not exists {0}")] NoSuchNode(PageOffset), + #[error("Root Node Empty")] + RootNodeEmpty(), #[error("Unable to search, the stack is empty")] StackEmpty(), + #[error(transparent)] + TryFromIntError(#[from] TryFromIntError), #[error("Unable to split a node of size {0}")] UnableToSplit(usize), } diff --git a/src/engine/objects/attribute.rs b/src/engine/objects/attribute.rs index 9d77899..da2fb43 100644 --- a/src/engine/objects/attribute.rs +++ b/src/engine/objects/attribute.rs @@ -1,11 +1,8 @@ //!Postgres Doc: https://www.postgresql.org/docs/current/catalog-pg-attribute.html -use std::fmt; - -use crate::constants::Nullable; -use uuid::Uuid; - use super::types::BaseSqlTypesMapper; +use 
crate::constants::Nullable; +use std::fmt; #[derive(Clone, Debug, PartialEq)] pub struct Attribute { diff --git a/src/engine/objects/planned_statement.rs b/src/engine/objects/planned_statement.rs index 0e19431..683aa77 100644 --- a/src/engine/objects/planned_statement.rs +++ b/src/engine/objects/planned_statement.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use super::{types::SqlTypeDefinition, Attribute, SqlTuple, Table}; +use super::{types::SqlTypeDefinition, SqlTuple, Table}; pub struct PlannedStatement { pub common: PlannedCommon, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index c26b6e4..4e256da 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,4 +1,3 @@ -use feophantlib::engine::{transactions::TransactionManager, Engine}; use feophantlib::feophant::FeOphant; use tempfile::TempDir; use tokio::sync::oneshot;