diff --git a/src/engine.rs b/src/engine.rs index 07c6d77..16e1d6c 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -35,6 +35,7 @@ use transactions::{TransactionId, TransactionManager}; use self::io::ConstraintManager; use self::io::FileManager; +use self::io::IndexManager; use self::io::LockCacheManager; use self::objects::QueryResult; use std::ops::Deref; @@ -50,11 +51,10 @@ pub struct Engine { impl Engine { pub fn new(file_manager: Arc, tran_manager: TransactionManager) -> Engine { - let vis_row_man = VisibleRowManager::new( - RowManager::new(LockCacheManager::new(file_manager)), - tran_manager, - ); - let con_man = ConstraintManager::new(vis_row_man.clone()); + let lock_cache = LockCacheManager::new(file_manager); + let vis_row_man = VisibleRowManager::new(RowManager::new(lock_cache.clone()), tran_manager); + let index_manager = IndexManager::new(lock_cache); + let con_man = ConstraintManager::new(index_manager, vis_row_man.clone()); Engine { analyzer: Analyzer::new(vis_row_man), executor: Executor::new(con_man), diff --git a/src/engine/io.rs b/src/engine/io.rs index 5ed67ad..1096029 100644 --- a/src/engine/io.rs +++ b/src/engine/io.rs @@ -12,6 +12,9 @@ mod index_formats; mod index_manager; pub use index_manager::IndexManager; +mod index_row_manager; +pub use index_row_manager::IndexRowManager; + mod file_manager; pub use file_manager::FileManager; pub use file_manager::FileManagerError; diff --git a/src/engine/io/constraint_manager.rs b/src/engine/io/constraint_manager.rs index 348d7b6..283a966 100644 --- a/src/engine/io/constraint_manager.rs +++ b/src/engine/io/constraint_manager.rs @@ -16,7 +16,7 @@ use crate::{ use super::{ row_formats::{ItemPointer, RowData}, - VisibleRowManager, VisibleRowManagerError, + IndexManager, VisibleRowManager, VisibleRowManagerError, }; /// The goal of the constraint manager is to ensure all constrainst are satisfied @@ -24,12 +24,16 @@ use super::{ /// of RowData #[derive(Clone, Debug)] pub struct ConstraintManager { + index_manager: IndexManager, vis_row_man: VisibleRowManager, } impl ConstraintManager { - pub fn new(vis_row_man: VisibleRowManager) -> ConstraintManager { - ConstraintManager { vis_row_man } + pub fn new(index_manager: IndexManager, vis_row_man: VisibleRowManager) -> ConstraintManager { + ConstraintManager { + index_manager, + vis_row_man, + } } pub async fn insert_row( @@ -38,12 +42,15 @@ impl ConstraintManager { table: Arc, user_data: SqlTuple, ) -> Result { + //column count check if table.attributes.len() != user_data.0.len() { return Err(ConstraintManagerError::TableRowSizeMismatch( table.attributes.len(), user_data.0.len(), )); } + + //null checks for (data, column) in user_data.0.iter().zip(table.attributes.clone()) { match data { Some(d) => { @@ -62,6 +69,17 @@ impl ConstraintManager { } } + //constraint check + for c in &table.constraints { + match c { + crate::engine::objects::Constraint::PrimaryKey(p) => { + //So for a primary key we have to check for no other dups in the table + + //So what I want to do is ask the index manager to to get active rows matching the key + } + } + } + Ok(self .vis_row_man .insert_row(current_tran_id, table, user_data) diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs index 5d75fbc..23be17f 100644 --- a/src/engine/io/free_space_manager.rs +++ b/src/engine/io/free_space_manager.rs @@ -30,7 +30,7 @@ impl FreeSpaceManager { page_type: PageType::FreeSpaceMap, }; loop { - let page_handle = self.lock_cache_manager.get_page(free_id, offset).await?; + let page_handle = self.lock_cache_manager.get_page(free_id, &offset).await?; match page_handle.as_ref() { Some(s) => { let mut page_frozen = s.clone().freeze(); @@ -53,7 +53,7 @@ impl FreeSpaceManager { let next_po = self.lock_cache_manager.get_offset(free_id).await?; let mut new_page_handle = self .lock_cache_manager - .get_page_for_update(free_id, next_po) + .get_page_for_update(free_id, &next_po) .await?; let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); @@ -84,7 +84,7 @@ impl FreeSpaceManager { let (po, offset) = po.get_bitmask_offset(); let mut page_handle = self .lock_cache_manager - .get_page_for_update(free_id, po) + .get_page_for_update(free_id, &po) .await?; let mut page = page_handle .as_mut() diff --git a/src/engine/io/index_formats.rs b/src/engine/io/index_formats.rs index 78c1967..1a4eb5b 100644 --- a/src/engine/io/index_formats.rs +++ b/src/engine/io/index_formats.rs @@ -1,3 +1,7 @@ +mod split_branch; +pub use split_branch::split_branch; +pub use split_branch::SplitBranchError; + mod btree_branch; pub use btree_branch::BTreeBranch; pub use btree_branch::BTreeBranchError; @@ -9,3 +13,7 @@ pub use btree_leaf::BTreeLeafError; mod btree_node; pub use btree_node::BTreeNode; pub use btree_node::BTreeNodeError; + +mod index_search; +pub use index_search::index_search_start; +pub use index_search::IndexSearchError; diff --git a/src/engine/io/index_formats/btree_branch.rs b/src/engine/io/index_formats/btree_branch.rs index 1b1562b..d2f43fa 100644 --- a/src/engine/io/index_formats/btree_branch.rs +++ b/src/engine/io/index_formats/btree_branch.rs @@ -1,7 +1,8 @@ use super::{ btree_node::{BTreeNodeError, NodeType}, - BTreeNode, + split_branch, BTreeNode, SplitBranchError, }; +use super::{index_search_start, IndexSearchError}; use crate::{ constants::PAGE_SIZE, engine::{ @@ -15,42 +16,137 @@ use crate::{ }, }; use bytes::{BufMut, Bytes, BytesMut}; -use std::{convert::TryFrom, num::TryFromIntError}; +use std::{convert::TryFrom, num::TryFromIntError, ops::RangeBounds}; use thiserror::Error; #[derive(Clone, Debug, PartialEq)] pub struct BTreeBranch { - pub parent_node: Option, - pub left_node: Option, - pub right_node: Option, + pub parent_node: PageOffset, pub keys: Vec, pub pointers: Vec, } impl BTreeBranch { - //TODO An add function doesn't seem to make sense + pub fn new( + parent_node: PageOffset, + left_pointer: PageOffset, + key: SqlTuple, + right_pointer: PageOffset, + ) -> BTreeBranch { + BTreeBranch { + parent_node, + keys: vec![key], + pointers: vec![left_pointer, right_pointer], + } + } + + pub fn add( + &mut self, + old_pointer: PageOffset, + left_pointer: PageOffset, + key: SqlTuple, + right_pointer: PageOffset, + ) -> Result<(), BTreeBranchError> { + if !self.can_fit(&key) { + return Err(BTreeBranchError::KeyTooLarge(key.encoded_size())); + } + + // 0 2 3 4 5 + // a c d e f g + + // 0 1 2 3 4 5 + // a b c d e f g + + //Find where the new key fits + let mut new_key_loc = self.keys.len(); + for i in 0..self.keys.len() { + if key > self.keys[i] { + new_key_loc = i + 1; + break; + } + } + self.keys.insert(new_key_loc, key); + + self.pointers.remove(new_key_loc); + self.pointers.insert(new_key_loc, right_pointer); + self.pointers.insert(new_key_loc, left_pointer); + + Ok(()) + } + + /// This function is used when the branch is full and we need to split the contents into two new branches + /// **WARNING** If this function fails the branch should be considered poisoned and not used. + pub fn add_and_split( + &mut self, + old_pointer: PageOffset, + left_pointer: PageOffset, + key: SqlTuple, + right_pointer: PageOffset, + ) -> Result<(SqlTuple, BTreeBranch), BTreeBranchError> { + let key_size = key.encoded_size(); + + //Unchecked add + let mut new_key_loc = self.keys.len(); + for i in 0..self.keys.len() { + if key > self.keys[i] { + new_key_loc = i + 1; + break; + } + } + self.keys.insert(new_key_loc, key); + + self.pointers.remove(new_key_loc); + self.pointers.insert(new_key_loc, right_pointer); + self.pointers.insert(new_key_loc, left_pointer); + + //Now we split + let (middle, right_keys, right_pointers) = + split_branch(&mut self.keys, &mut self.pointers)?; + + let new_right = BTreeBranch { + parent_node: self.parent_node, + keys: right_keys, + pointers: right_pointers, + }; - pub fn can_fit(&self, new_keys: SqlTuple) -> bool { + if self.encoded_size() > PAGE_SIZE.into() || new_right.encoded_size() > PAGE_SIZE.into() { + return Err(BTreeBranchError::KeyTooLarge(key_size)); + } + + Ok((middle, new_right)) + } + + pub fn can_fit(&self, new_key: &SqlTuple) -> bool { let current_size = 1 + //Type - (PageOffset::encoded_size() * 3) + //Pointers + (PageOffset::encoded_size()) + //Parent Pointer expected_encoded_size(self.keys.len() + 1) + //Length assuming inserted self.keys.iter().fold(0, |acc, tup| acc + NullMask::encoded_size(&tup) + tup.encoded_size()) + //Keys - NullMask::encoded_size(&new_keys) + //New key null mask - new_keys.encoded_size() + //New Key + NullMask::encoded_size(new_key) + //New key null mask + new_key.encoded_size() + //New Key ItemIdData::encoded_size() * (self.keys.len() + 1); //Pointers to nodes current_size <= PAGE_SIZE as usize } + /// Finds the first PageOffset that satisfys the range + pub fn search<'a, R>(&'a self, range: R) -> Result<&'a PageOffset, BTreeBranchError> + where + R: RangeBounds, + { + if self.keys.is_empty() { + return Err(BTreeBranchError::MissingKeys()); + } + + Ok(index_search_start(&self.keys, &self.pointers, range)?) + } + pub fn serialize(&self) -> Result { let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); buffer.put_u8(NodeType::Branch as u8); - BTreeNode::write_node(&mut buffer, self.parent_node)?; - BTreeNode::write_node(&mut buffer, self.left_node)?; - BTreeNode::write_node(&mut buffer, self.right_node)?; + BTreeNode::write_node(&mut buffer, Some(self.parent_node))?; encode_size(&mut buffer, self.keys.len()); @@ -73,6 +169,24 @@ impl BTreeBranch { } } +impl SelfEncodedSize for BTreeBranch { + fn encoded_size(&self) -> usize { + let mut new_size = 1 + (PageOffset::encoded_size()); //Type plus pointer + + new_size += expected_encoded_size(self.keys.len()); + for tup in self.keys.iter() { + new_size += NullMask::encoded_size(tup); + new_size += tup.encoded_size(); + } + + for point in self.pointers.iter() { + new_size += PageOffset::encoded_size(); + } + + new_size + } +} + #[derive(Debug, Error)] pub enum BTreeBranchError { #[error(transparent)] @@ -82,9 +196,13 @@ pub enum BTreeBranchError { #[error("Buffer too short to parse")] BufferTooShort(), #[error(transparent)] + IndexSearchError(#[from] IndexSearchError), + #[error(transparent)] ItemIdDataError(#[from] ItemIdDataError), #[error("Key too large size: {0}")] KeyTooLarge(usize), + #[error("No keys to search")] + MissingKeys(), #[error("Missing Data for Node Type need {0}, have {1}")] MissingNodeTypeData(usize, usize), #[error("Missing Data for Pointer need {0}, have {1}")] @@ -94,7 +212,13 @@ pub enum BTreeBranchError { #[error(transparent)] SizeError(#[from] SizeError), #[error(transparent)] + SplitBranchError(#[from] SplitBranchError), + #[error(transparent)] TryFromIntError(#[from] TryFromIntError), + #[error("Unable to split")] + UnableToSplit(), + #[error("Unable to find split point")] + UnableToFindSplit(), } #[cfg(test)] @@ -106,7 +230,7 @@ mod tests { constants::Nullable, engine::objects::{ types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, - Attribute, Index, Table, + Attribute, Index, }, }; use uuid::Uuid; @@ -149,19 +273,17 @@ mod tests { let pointers = vec![PageOffset(3), PageOffset(3), PageOffset(3)]; let test = BTreeBranch { - parent_node: None, - left_node: Some(PageOffset(1)), - right_node: Some(PageOffset(2)), + parent_node: PageOffset(1), keys, pointers, }; - let mut test_serial = test.clone().serialize()?; + let mut test_serial = test.serialize()?; let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?; match test_parse { BTreeNode::Branch(b) => assert_eq!(test, b), - _ => assert!(false), + _ => panic!("Not a branch"), } Ok(()) diff --git a/src/engine/io/index_formats/btree_leaf.rs b/src/engine/io/index_formats/btree_leaf.rs index 71546ce..067660b 100644 --- a/src/engine/io/index_formats/btree_leaf.rs +++ b/src/engine/io/index_formats/btree_leaf.rs @@ -8,14 +8,14 @@ use crate::{ io::{ encode_size, expected_encoded_size, page_formats::{ItemIdData, ItemIdDataError, PageOffset}, - row_formats::{NullMask, NullMaskError}, + row_formats::{ItemPointer, NullMask, NullMaskError}, ConstEncodedSize, EncodedSize, SelfEncodedSize, SizeError, }, objects::{types::BaseSqlTypesError, SqlTuple}, }, }; use bytes::{BufMut, BytesMut}; -use std::{collections::BTreeMap, num::TryFromIntError}; +use std::{collections::BTreeMap, num::TryFromIntError, ops::RangeBounds}; use thiserror::Error; #[derive(Clone, Debug, PartialEq)] @@ -23,7 +23,7 @@ pub struct BTreeLeaf { pub parent_node: Option, pub left_node: Option, pub right_node: Option, - pub nodes: BTreeMap>, + pub nodes: BTreeMap>, } impl BTreeLeaf { @@ -36,7 +36,7 @@ impl BTreeLeaf { } } - pub fn add(&mut self, key: SqlTuple, item_ptr: ItemIdData) -> Result<(), BTreeLeafError> { + pub fn add(&mut self, key: SqlTuple, item_ptr: ItemPointer) -> Result<(), BTreeLeafError> { if !self.can_fit(&key) { return Err(BTreeLeafError::KeyTooLarge(key.encoded_size())); } @@ -51,8 +51,60 @@ impl BTreeLeaf { Ok(()) } + /// This function is used when the leaf is full and we need to split the contents into a new leaf, to the right. + /// **WARNING** If this function fails the leaf should be considered poisoned and not used. + pub fn add_and_split( + &mut self, + current_node: PageOffset, + new_node: PageOffset, + key: SqlTuple, + item_ptr: ItemPointer, + ) -> Result<(SqlTuple, BTreeLeaf), BTreeLeafError> { + let key_size = key.encoded_size(); + + //Unchecked add + match self.nodes.get_mut(&key) { + Some(iids) => iids.push(item_ptr), + None => { + self.nodes.insert(key, vec![item_ptr]); + } + } + + //Now we split + let middle_entry = self + .nodes + .iter() + .nth(self.nodes.len() / 2) + .ok_or_else(BTreeLeafError::UnableToSplit)? + .0 + .clone(); + + let new_right_nodes = self.nodes.split_off(&middle_entry); + let new_right = BTreeLeaf { + parent_node: self.parent_node, + left_node: Some(current_node), + right_node: self.right_node, + nodes: new_right_nodes, + }; + + self.right_node = Some(new_node); + + if self.encoded_size() > PAGE_SIZE.into() || new_right.encoded_size() > PAGE_SIZE.into() { + return Err(BTreeLeafError::KeyTooLarge(key_size)); + } + + let new_split_point = self + .nodes + .iter() + .rev() + .next() + .ok_or_else(BTreeLeafError::UnableToFindSplit)?; + + Ok((new_split_point.0.clone(), new_right)) + } + pub fn can_fit(&self, new_key: &SqlTuple) -> bool { - let mut new_key_present = self.nodes.contains_key(&new_key); + let new_key_present = self.nodes.contains_key(new_key); let mut new_size = 1 + (PageOffset::encoded_size() * 3); //Type plus pointers @@ -62,14 +114,14 @@ impl BTreeLeaf { } else { new_size += expected_encoded_size(self.nodes.len() + 1); - new_size += NullMask::encoded_size(&new_key); + new_size += NullMask::encoded_size(new_key); new_size += new_key.encoded_size(); new_size += expected_encoded_size(1); //New Item Id new_size += ItemIdData::encoded_size() } for (tup, iids) in self.nodes.iter() { - new_size += NullMask::encoded_size(&tup); + new_size += NullMask::encoded_size(tup); new_size += tup.encoded_size(); if new_key_present && tup == new_key { @@ -84,6 +136,16 @@ impl BTreeLeaf { new_size <= PAGE_SIZE as usize } + pub fn search(&self, range: R) -> Vec + where + R: RangeBounds, + { + self.nodes + .range(range) + .flat_map(|(k, v)| v.clone()) + .collect() + } + pub fn serialize(&self, buffer: &mut BytesMut) -> Result<(), BTreeLeafError> { buffer.put_u8(NodeType::Leaf as u8); @@ -112,6 +174,24 @@ impl BTreeLeaf { } } +impl SelfEncodedSize for BTreeLeaf { + fn encoded_size(&self) -> usize { + let mut new_size = 1 + (PageOffset::encoded_size() * 3); //Type plus pointers + + new_size += expected_encoded_size(self.nodes.len()); + + for (tup, iids) in self.nodes.iter() { + new_size += NullMask::encoded_size(tup); + new_size += tup.encoded_size(); + + new_size += expected_encoded_size(iids.len()); + new_size += ItemIdData::encoded_size() * iids.len(); + } + + new_size + } +} + #[derive(Debug, Error)] pub enum BTreeLeafError { #[error(transparent)] @@ -134,6 +214,10 @@ pub enum BTreeLeafError { SizeError(#[from] SizeError), #[error(transparent)] TryFromIntError(#[from] TryFromIntError), + #[error("Unable to split")] + UnableToSplit(), + #[error("Unable to find split point")] + UnableToFindSplit(), } #[cfg(test)] @@ -147,14 +231,13 @@ mod tests { io::page_formats::UInt12, objects::{ types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, - Attribute, Index, Table, + Attribute, Index, }, }, }; use uuid::Uuid; fn get_index() -> Index { - let tbl_uuid = Uuid::new_v4(); let attrs = vec![ Attribute::new( "foo".to_string(), @@ -180,31 +263,40 @@ mod tests { #[test] fn test_btree_leaf_roundtrip() -> Result<(), Box> { - let mut nodes = BTreeMap::new(); - nodes.insert( - SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test".to_string()))]), - vec![ItemIdData::new(UInt12::new(1)?, UInt12::new(2)?)], - ); - nodes.insert( - SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test2".to_string()))]), - vec![ItemIdData::new(UInt12::new(3)?, UInt12::new(4)?)], - ); - - let test = BTreeLeaf { + let mut test = BTreeLeaf { parent_node: None, left_node: Some(PageOffset(1)), right_node: Some(PageOffset(2)), - nodes, + nodes: BTreeMap::new(), }; + let first_key = SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test".to_string()))]); + let first_val = ItemPointer::new(PageOffset(1), UInt12::new(2)?); + test.add(first_key, first_val)?; + test.add( + SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test2".to_string()))]), + ItemPointer::new(PageOffset(3), UInt12::new(4)?), + )?; + + let found = test.search( + SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test".to_string()))]) + ..=SqlTuple(vec![None, Some(BaseSqlTypes::Text("Test".to_string()))]), + ); + + assert_eq!( + found, + vec![ItemPointer::new(PageOffset(1), UInt12::new(2)?)] + ); let mut test_serial = BytesMut::with_capacity(PAGE_SIZE as usize); test.serialize(&mut test_serial)?; - let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?; + let test_parse = match BTreeNode::parse(&mut test_serial, &get_index())? { + BTreeNode::Leaf(l) => l, + _ => { + panic!("That's not a leaf!"); + } + }; - match test_parse { - BTreeNode::Leaf(l) => assert_eq!(test, l), - _ => assert!(false), - } + assert_eq!(test, test_parse); Ok(()) } diff --git a/src/engine/io/index_formats/btree_node.rs b/src/engine/io/index_formats/btree_node.rs index 8a19862..cc9ed4e 100644 --- a/src/engine/io/index_formats/btree_node.rs +++ b/src/engine/io/index_formats/btree_node.rs @@ -20,12 +20,12 @@ //! Note: Min size for all indexes is 2x PAGE_SIZE since the root page is used to mean None. This will change //! since the root page will have a pointer so we can lock and split the root node. -use crate::engine::io::page_formats::{ItemIdDataError, PageOffset}; -use crate::engine::io::row_formats::NullMaskError; +use crate::engine::io::page_formats::PageOffset; +use crate::engine::io::row_formats::{ItemPointer, ItemPointerError, NullMaskError}; use crate::engine::io::{parse_size, ConstEncodedSize, SizeError}; use crate::engine::objects::types::{BaseSqlTypes, BaseSqlTypesError}; use crate::engine::{ - io::{page_formats::ItemIdData, row_formats::NullMask}, + io::row_formats::NullMask, objects::{Index, SqlTuple}, }; use bytes::{Buf, BufMut}; @@ -80,10 +80,11 @@ impl BTreeNode { let node_type = buffer.get_u8(); let parent_node = Self::parse_page(buffer)?; - let left_node = Self::parse_page(buffer)?; - let right_node = Self::parse_page(buffer)?; if node_type == NodeType::Leaf as u8 { + let left_node = Self::parse_page(buffer)?; + let right_node = Self::parse_page(buffer)?; + let bucket_count = parse_size(buffer)?; let mut buckets = BTreeMap::new(); @@ -93,8 +94,8 @@ impl BTreeNode { let item_count = parse_size(buffer)?; let mut items = vec![]; for _ in 0..item_count { - let item_id = ItemIdData::parse(buffer)?; - items.push(item_id); + let item_ptr = ItemPointer::parse(buffer)?; + items.push(item_ptr); } buckets.insert(bucket, items); @@ -107,6 +108,8 @@ impl BTreeNode { nodes: buckets, })); } else { + let parent = parent_node.ok_or_else(BTreeNodeError::ParentNull)?; + let keys_count = parse_size(buffer)?; let mut keys = Vec::with_capacity(keys_count); @@ -132,9 +135,7 @@ impl BTreeNode { } return Ok(BTreeNode::Branch(BTreeBranch { - parent_node, - left_node, - right_node, + parent_node: parent, keys, pointers, })); @@ -182,7 +183,7 @@ pub enum BTreeNodeError { #[error("Buffer too short to parse")] BufferTooShort(), #[error(transparent)] - ItemIdDataError(#[from] ItemIdDataError), + ItemPointerError(#[from] ItemPointerError), #[error("Key too large size: {0}")] KeyTooLarge(usize), #[error("Missing Data for Node Type need {0}, have {1}")] @@ -191,6 +192,8 @@ pub enum BTreeNodeError { MissingPointerData(usize, usize), #[error(transparent)] NullMaskError(#[from] NullMaskError), + #[error("Parent cannot be 0")] + ParentNull(), #[error(transparent)] SizeError(#[from] SizeError), #[error(transparent)] diff --git a/src/engine/io/index_formats/index_search.rs b/src/engine/io/index_formats/index_search.rs new file mode 100644 index 0000000..d61afec --- /dev/null +++ b/src/engine/io/index_formats/index_search.rs @@ -0,0 +1,79 @@ +use std::ops::{Bound, RangeBounds}; +use thiserror::Error; + +pub fn index_search_start<'a, K, R, T>( + keys: &Vec, + pointers: &'a Vec, + range: R, +) -> Result<&'a T, IndexSearchError> +where + K: PartialOrd, + R: RangeBounds, + T: Clone, +{ + //Sanity checks + if keys.is_empty() || pointers.is_empty() { + return Err(IndexSearchError::Empty(keys.len(), pointers.len())); + } else if keys.len() + 1 != pointers.len() { + return Err(IndexSearchError::WrongCount(keys.len(), pointers.len())); + } + + if let (Bound::Unbounded, Bound::Unbounded) = (range.start_bound(), range.end_bound()) { + Ok(&pointers[0]) + } else if let Bound::Included(b) | Bound::Excluded(b) = range.start_bound() { + for i in 0..keys.len() { + if b <= &keys[i] { + return Ok(&pointers[i]); + } + } + Ok(&pointers[keys.len()]) + } else if let Bound::Included(b) | Bound::Excluded(b) = range.end_bound() { + for i in (0..keys.len()).rev() { + if &keys[i] <= b { + return Ok(&pointers[i]); + } + } + Ok(&pointers[0]) + } else { + Err(IndexSearchError::UnreachableState()) + } +} + +#[derive(Debug, Error)] +pub enum IndexSearchError { + #[error("Either keys {0}, or pointers {1} are empty")] + Empty(usize, usize), + #[error("You should never get here")] + UnreachableState(), + #[error("Wrong count keys {0} must be one less than pointers {1}")] + WrongCount(usize, usize), +} + +#[cfg(test)] +mod tests { + use std::ops::RangeFull; + + use super::*; + + #[test] + fn test_roundtrip() -> Result<(), Box> { + // 0 1 2 3 4 5 + // a b c d e f g + + let keys = vec![0, 1, 2, 3, 4, 5]; + let pointers = vec!["a", "b", "c", "d", "e", "f", "g"]; + + assert_eq!(index_search_start(&keys, &pointers, 1..3)?, &"b"); + assert_eq!(index_search_start(&keys, &pointers, 1..)?, &"b"); + assert_eq!(index_search_start(&keys, &pointers, ..4)?, &"e"); + assert_eq!(index_search_start(&keys, &pointers, RangeFull)?, &"a"); + assert_eq!(index_search_start(&keys, &pointers, 1..=3)?, &"b"); + assert_eq!(index_search_start(&keys, &pointers, ..=3)?, &"d"); + assert_eq!(index_search_start(&keys, &pointers, 6..9)?, &"g"); + + //Edge cases + assert_eq!(index_search_start(&keys, &pointers, ..0)?, &"a"); + + Ok(()) + } +} diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs index 604d045..65c2b49 100644 --- a/src/engine/io/index_manager.rs +++ b/src/engine/io/index_manager.rs @@ -1,10 +1,19 @@ //! TODO #24 Fix the index implementation to use the locking layer -use super::index_formats::{BTreeLeafError, BTreeNode, BTreeNodeError}; +// Okay so more thinking, my approach needs to change +/* + For adds, I'll find the leaf page using write locks, dropping as I go. + + Once found, I'll add and then follow the parents up until everything fits + + +*/ + +use super::index_formats::{BTreeBranchError, BTreeLeafError, BTreeNode, BTreeNodeError}; use super::page_formats::PageOffset; use super::page_formats::{ItemIdData, PageId, PageType}; -use super::{FileManager, FileManagerError, LockCacheManager, LockCacheManagerError}; -use crate::engine::io::SelfEncodedSize; +use super::row_formats::ItemPointer; +use super::{LockCacheManager, LockCacheManagerError, SelfEncodedSize}; use crate::{ constants::PAGE_SIZE, engine::{ @@ -13,13 +22,12 @@ use crate::{ }, }; use bytes::{Buf, BufMut, BytesMut}; -use std::collections::BTreeMap; use std::convert::TryFrom; use std::mem::size_of; use std::num::TryFromIntError; use std::ops::Range; -use std::sync::Arc; use thiserror::Error; +use tokio::sync::OwnedRwLockWriteGuard; //TODO Support something other than btrees //TODO Support searching on a non primary index column @@ -38,82 +46,197 @@ impl IndexManager { &self, index_def: &Index, new_key: SqlTuple, - item_ptr: ItemIdData, + item_ptr: ItemPointer, ) -> Result<(), IndexManagerError> { - let mut search_stack = vec![]; - search_stack.push(self.get_root_node(index_def).await?); - - loop { - let current_node = search_stack - .pop() - .ok_or_else(IndexManagerError::StackEmpty)?; - - match current_node.0 { - BTreeNode::Branch(b) => { - //Now we need to scan to find our next traversal, the keys MUST be in order - for k in 0..=b.keys.len() { - let target_page = b.pointers[k]; - if k == b.keys.len() { - //Prior loops hit all the other options, so down the right pointer we go - let next_node = self.get_node(index_def, &target_page).await?; - search_stack.push((BTreeNode::Branch(b), current_node.1)); - search_stack.push((next_node, target_page)); - break; - } else if new_key <= b.keys[k] { - let next_node = self.get_node(index_def, &target_page).await?; - search_stack.push((BTreeNode::Branch(b), current_node.1)); - search_stack.push((next_node, target_page)); - break; - } + let page_id = PageId { + resource_key: index_def.id, + page_type: PageType::Data, + }; + + //Initial Special Case of an Empty Root + let (mut current_page, mut current_offset) = + self.get_root_page_for_write(index_def).await?; + if let None = current_page.as_mut() { + let root = BTreeLeaf::new(); + if !root.can_fit(&new_key) { + return Err(IndexManagerError::KeyTooLarge(new_key.encoded_size())); + } + + let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + root.serialize(&mut buffer); + current_page.replace(buffer); + self.lock_cache_manager + .update_page(page_id, current_offset, current_page) + .await?; + return Ok(()); + } + + //Next the goal is to get to the leaf + if let Some(s) = current_page.as_mut() { + let mut current_node = BTreeNode::parse(s, index_def)?; + + let mut found_leaf; + loop { + match current_node { + BTreeNode::Branch(b) => { + let next_page_offset = b.search(&new_key..&new_key)?; + current_page = self + .lock_cache_manager + .get_page_for_update(page_id, next_page_offset) + .await?; + current_offset = *next_page_offset; + + let s = current_page + .as_mut() + .ok_or_else(|| IndexManagerError::NodeEmpty(current_offset))?; + current_node = BTreeNode::parse(s, index_def)?; + continue; + } + BTreeNode::Leaf(mut l) => { + found_leaf = l; + break; } } - BTreeNode::Leaf(l) => { - //Okay we've gotten to the bottom of the tree, so time to do the adds - search_stack.push((BTreeNode::Leaf(l), current_node.1)); - break; + } + + //If the key fits in the leaf, we add it and are done + if found_leaf.can_fit(&new_key) { + found_leaf.add(new_key, item_ptr); + + let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + found_leaf.serialize(&mut buffer); + + current_page.replace(buffer); + self.lock_cache_manager + .update_page(page_id, current_offset, current_page) + .await?; + return Ok(()); + } + + //Doesn't fit so we have to split and work back up to the loop + let left_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; + let right_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; + + let mut left_node_page = self + .lock_cache_manager + .get_page_for_update(page_id, &left_node_offset) + .await?; + let mut right_node_page = self + .lock_cache_manager + .get_page_for_update(page_id, &right_node_offset) + .await?; + + let (new_split, new_right_node) = + found_leaf.add_and_split(left_node_offset, right_node_offset, new_key, item_ptr)?; + + let mut parent_node_offset = found_leaf + .parent_node + .ok_or_else(IndexManagerError::ParentNodeEmpty)?; + + let mut left_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + found_leaf.serialize(&mut left_node_buffer); + + let mut right_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + new_right_node.serialize(&mut right_node_buffer); + + left_node_page.replace(left_node_buffer); + right_node_page.replace(right_node_buffer); + + self.lock_cache_manager + .update_page(page_id, left_node_offset, left_node_page) + .await?; + self.lock_cache_manager + .update_page(page_id, right_node_offset, right_node_page) + .await?; + + //Now its time to fix the tree + loop { + let parent_page = self + .lock_cache_manager + .get_page_for_update(page_id, &parent_node_offset) + .await?; + if parent_node_offset == PageOffset(0) { + //We've hit the top of the system so we'll have to remake the root page } } } + Ok(()) + } - //At this point we should have a vec that traverses root->child->leaf of some depth - //Now we do the add and whatever adjustments we need to do to fix the parents - loop { - let current_node = search_stack - .pop() - .ok_or_else(IndexManagerError::StackEmpty)?; - - match current_node.0 { - BTreeNode::Branch(b) => {} - BTreeNode::Leaf(mut l) => { - let page_id = PageId { - resource_key: index_def.id, - page_type: PageType::Data, - }; - - if l.can_fit(&new_key) { - - //return Ok(self - // .file_manager - // .update_page(&page_id, ¤t_node.1, l.serialize()?) - // .await?); - } + async fn get_root_page_for_write( + &self, + index_def: &Index, + ) -> Result<(OwnedRwLockWriteGuard>, PageOffset), IndexManagerError> { + let page_id = PageId { + resource_key: index_def.id, + page_type: PageType::Data, + }; - //If we're here, we have a key that doesn't fit into the leaf so we need to split it. - let mut new_nodes = l.nodes; - match new_nodes.get_mut(&new_key) { - Some(iids) => iids.push(item_ptr), - None => { - new_nodes.insert(new_key, vec![item_ptr]); - } - } + let mut first_page_handle = self + .lock_cache_manager + .get_page_for_update(page_id, &PageOffset(0)) + .await?; + + let (root_offset, changed) = match first_page_handle.as_mut() { + Some(mut s) => { + let root_offset = usize::try_from(s.get_uint_le(size_of::()))?; + if root_offset == 0 { + //This is wrong, recreate it + let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; + + s.clear(); + root_offset.serialize(&mut s); - break; + (root_offset, true) + } else { + (PageOffset(root_offset), false) } } + None => { + let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?; + + let mut first_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + root_offset.serialize(&mut first_page_buffer); + let new_page = vec![0; PAGE_SIZE as usize - size_of::()]; + first_page_buffer.extend_from_slice(&new_page); + + first_page_handle.replace(first_page_buffer); + + (root_offset, true) + } + }; + + //Now we know where root is, let's get it + let root_page_handle = self + .lock_cache_manager + .get_page_for_update(page_id, &root_offset) + .await?; + + if changed { + self.lock_cache_manager + .update_page(page_id, PageOffset(0), first_page_handle) + .await? } - Ok(()) + Ok((root_page_handle, root_offset)) } + /* + pub async fn search_for_key( + &self, + index_def: &Index, + key: &SqlTuple, + ) -> Result>, IndexManagerError> { + let (root_node, root_offset) = self.get_root_node(index_def).await?; + match root_node { + BTreeNode::Branch(b) => { + todo!("blah") + } + BTreeNode::Leaf(l) => match l.nodes.get(key) { + Some(s) => Ok(Some(s.clone())), + None => Ok(None), + }, + } + }*/ ///This function provides a mapping given an oversized bucket of how the leaf should be split /// Returns: @@ -139,12 +262,6 @@ impl IndexManager { )) } - // 1 2 3 4 5 - // 2 + 1? - - // 1 2 3 4 5 6 - // 3 + 1 - /// This provides the requested node based on the page, if it exists async fn get_node( &self, @@ -156,7 +273,7 @@ impl IndexManager { page_type: PageType::Data, }; - let page_handle = self.lock_cache_manager.get_page(page_id, *offset).await?; + let page_handle = self.lock_cache_manager.get_page(page_id, offset).await?; let page_buffer = page_handle.clone(); match page_buffer { @@ -164,100 +281,12 @@ impl IndexManager { None => Err(IndexManagerError::NoSuchNode(*offset)), } } - - /// This provides the root node and makes it if its doesn't exist - /// TODO - When locking is implemented, postgres has a far more elegant way to handle this - /// The first page becomes a pointer to root since root might not be page 1. - async fn get_root_node( - &self, - index_def: &Index, - ) -> Result<(BTreeNode, PageOffset), IndexManagerError> { - let page_id = PageId { - resource_key: index_def.id, - page_type: PageType::Data, - }; - let first_page_handle = self - .lock_cache_manager - .get_page(page_id, PageOffset(0)) - .await?; - - if let Some(s) = first_page_handle.as_ref() { - let mut first_page = s.clone(); - return self - .parse_root_page(index_def, &mut first_page, page_id) - .await; - } - - //We have to make it and handle the race window - drop(first_page_handle); - - let mut new_first_page_handle = self - .lock_cache_manager - .get_page_for_update(page_id, PageOffset(0)) - .await?; - - if let Some(s) = new_first_page_handle.as_mut() { - return self.parse_root_page(index_def, s, page_id).await; - } - - let root_offset = self.lock_cache_manager.get_offset(page_id).await?; - - let mut new_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - new_page_buffer.put_uint_le(u64::try_from(root_offset.0)?, size_of::()); - let new_page = vec![0; PAGE_SIZE as usize - size_of::()]; - - new_page_buffer.extend_from_slice(&new_page); - new_first_page_handle.replace(new_page_buffer); - self.lock_cache_manager - .add_page(page_id, PageOffset(0), new_first_page_handle) - .await?; - - //Now make the root node and save it - let mut root_handle = self - .lock_cache_manager - .get_page_for_update(page_id, root_offset) - .await?; - if let Some(s) = root_handle.as_mut() { - return self.parse_root_page(index_def, s, page_id).await; - } - - let root_node = BTreeLeaf::new(); - - let mut root_buffer = BytesMut::with_capacity(PAGE_SIZE as usize); - root_node.serialize(&mut root_buffer)?; - root_handle.replace(root_buffer); - - self.lock_cache_manager - .update_page(page_id, root_offset, root_handle) - .await?; - return Ok((BTreeNode::Leaf(root_node), root_offset)); - } - - async fn parse_root_page( - &self, - index_def: &Index, - first_page: &mut BytesMut, - page_id: PageId, - ) -> Result<(BTreeNode, PageOffset), IndexManagerError> { - let root_offset = usize::try_from(first_page.get_uint_le(size_of::()))?; - let root_handle = self - .lock_cache_manager - .get_page(page_id, PageOffset(root_offset)) - .await?; - let mut root_page = root_handle - .as_ref() - .ok_or(IndexManagerError::RootNodeEmpty())? - .clone() - .freeze(); - return Ok(( - BTreeNode::parse(&mut root_page, index_def)?, - PageOffset(root_offset), - )); - } } #[derive(Debug, Error)] pub enum IndexManagerError { + #[error(transparent)] + BTreeBranchError(#[from] BTreeBranchError), #[error(transparent)] BTreeLeafError(#[from] BTreeLeafError), #[error(transparent)] @@ -272,6 +301,10 @@ pub enum IndexManagerError { LockCacheManagerError(#[from] LockCacheManagerError), #[error("Node does not exists {0}")] NoSuchNode(PageOffset), + #[error("Node {0} empty")] + NodeEmpty(PageOffset), + #[error("Parent Node empty")] + ParentNodeEmpty(), #[error("Root Node Empty")] RootNodeEmpty(), #[error("Unable to search, the stack is empty")] @@ -281,3 +314,71 @@ pub enum IndexManagerError { #[error("Unable to split a node of size {0}")] UnableToSplit(usize), } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tempfile::TempDir; + use uuid::Uuid; + + use crate::{ + constants::Nullable, + engine::{ + io::{page_formats::UInt12, FileManager}, + objects::{ + types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition}, + Attribute, + }, + }, + }; + + use super::*; + + fn get_key_and_ptr(num: usize) -> (SqlTuple, ItemPointer) { + ( + SqlTuple(vec![ + Some(BaseSqlTypes::Text("test".to_string())), + Some(BaseSqlTypes::Integer(num as u32)), + ]), + ItemPointer::new(PageOffset(num), UInt12::new(0).unwrap()), + ) + } + + #[tokio::test] + async fn test_roundtrip() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm); + let im = IndexManager::new(lm); + + let index = Index { + id: Uuid::new_v4(), + name: "test".to_string(), + columns: Arc::new(SqlTypeDefinition::new(&[ + Attribute::new( + "foo".to_string(), + BaseSqlTypesMapper::Text, + Nullable::NotNull, + None, + ), + Attribute::new( + "bar".to_string(), + BaseSqlTypesMapper::Integer, + Nullable::NotNull, + None, + ), + ])), + unique: true, + }; + + for i in 0..5000 { + let (key, ptr) = get_key_and_ptr(i); + im.add(&index, key, ptr).await?; + } + + Ok(()) + } +} diff --git a/src/engine/io/index_row_manager.rs b/src/engine/io/index_row_manager.rs new file mode 100644 index 0000000..4a45f19 --- /dev/null +++ b/src/engine/io/index_row_manager.rs @@ -0,0 +1,29 @@ +//!This is a layer above the index manager to handle finding active rows based on an index + +use std::sync::Arc; + +use crate::engine::objects::{Index, SqlTuple, Table}; + +use super::{IndexManager, VisibleRowManager}; + +pub struct IndexRowManager { + index_manager: IndexManager, + vis_row_man: VisibleRowManager, +} + +impl IndexRowManager { + pub fn new(index_manager: IndexManager, vis_row_man: VisibleRowManager) -> IndexRowManager { + IndexRowManager { + index_manager, + vis_row_man, + } + } + + pub fn get_rows_matching_key( + table: Arc
, + index: Arc, + key: SqlTuple, + ) -> Vec { + todo!("Don't call me yet!") + } +} diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/lock_cache_manager.rs index b1d97af..cb06563 100644 --- a/src/engine/io/lock_cache_manager.rs +++ b/src/engine/io/lock_cache_manager.rs @@ -30,10 +30,21 @@ impl LockCacheManager { Ok(self.file_manager.get_offset(&page_id).await?) } + pub async fn get_offset_non_zero( + &self, + page_id: PageId, + ) -> Result { + let mut offset = PageOffset(0); + while offset == PageOffset(0) { + offset = self.file_manager.get_offset(&page_id).await?; + } + Ok(offset) + } + pub async fn get_page( &self, page_id: PageId, - offset: PageOffset, + offset: &PageOffset, ) -> Result>, LockCacheManagerError> { Ok(self .get_page_internal(page_id, offset) @@ -45,7 +56,7 @@ impl LockCacheManager { pub async fn get_page_for_update( &self, page_id: PageId, - offset: PageOffset, + offset: &PageOffset, ) -> Result>, LockCacheManagerError> { Ok(self .get_page_internal(page_id, offset) @@ -57,16 +68,16 @@ impl LockCacheManager { async fn get_page_internal( &self, page_id: PageId, - offset: PageOffset, + offset: &PageOffset, ) -> Result>>, LockCacheManagerError> { let mut cache = self.cache.lock().await; - match cache.get(&(page_id, offset)) { + match cache.get(&(page_id, *offset)) { Some(s) => Ok(s.clone()), None => { //Cache miss, let's make the RwLock and drop the mutex let page_lock = Arc::new(RwLock::new(None)); let mut page_lock_write = page_lock.write().await; - cache.put((page_id, offset), page_lock.clone()); + cache.put((page_id, *offset), page_lock.clone()); drop(cache); //Now we can load the underlying page without blocking everyone @@ -157,11 +168,11 @@ mod tests { let first_offset = lm.get_offset(page_id).await?; assert_eq!(first_offset, PageOffset(0)); - let first_handle = lm.get_page(page_id, first_offset).await?; + let first_handle = lm.get_page(page_id, &first_offset).await?; assert_eq!(first_handle.as_ref(), None); drop(first_handle); - let mut second_handle = lm.get_page_for_update(page_id, first_offset).await?; + let mut second_handle = lm.get_page_for_update(page_id, &first_offset).await?; assert_eq!(second_handle.as_ref(), None); let page = get_test_page(1); @@ -169,21 +180,21 @@ mod tests { lm.update_page(page_id, first_offset, second_handle).await?; - let third_handle = lm.get_page(page_id, first_offset).await?; + let third_handle = lm.get_page(page_id, &first_offset).await?; let page2 = get_test_page(1); assert_eq!(third_handle.as_ref(), Some(&page2)); let fourth_offset = lm.get_offset(page_id).await?; assert_eq!(fourth_offset, PageOffset(1)); - let mut fourth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + let mut fourth_handle = lm.get_page_for_update(page_id, &fourth_offset).await?; assert_eq!(fourth_handle.as_ref(), None); let page3 = get_test_page(2); fourth_handle.replace(page3); lm.add_page(page_id, fourth_offset, fourth_handle).await?; - let mut fifth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + let mut fifth_handle = lm.get_page_for_update(page_id, &fourth_offset).await?; let fifth_page = fifth_handle .as_mut() .ok_or(LockCacheManagerError::PageMissing())?; @@ -193,7 +204,7 @@ mod tests { fifth_page.extend_from_slice(&page4[0..page4.len()]); lm.update_page(page_id, fourth_offset, fifth_handle).await?; - let mut sixth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + let mut sixth_handle = lm.get_page_for_update(page_id, &fourth_offset).await?; let sixth_page = sixth_handle .as_mut() .ok_or(LockCacheManagerError::PageMissing())?; @@ -203,4 +214,23 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_nonzero() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm); + + let page_id = PageId { + resource_key: Uuid::new_v4(), + page_type: PageType::Data, + }; + + let offset = lm.get_offset_non_zero(page_id).await?; + assert_ne!(offset, PageOffset(0)); + + Ok(()) + } } diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs index f4a3b57..d52b123 100644 --- a/src/engine/io/row_manager.rs +++ b/src/engine/io/row_manager.rs @@ -55,7 +55,7 @@ impl RowManager { }; let mut page_handle = self .lock_cache_manager - .get_page_for_update(page_id, row_pointer.page) + .get_page_for_update(page_id, &row_pointer.page) .await?; let page_buffer = page_handle .as_mut() @@ -105,7 +105,7 @@ impl RowManager { }; let mut old_page_handle = self .lock_cache_manager - .get_page_for_update(page_id, row_pointer.page) + .get_page_for_update(page_id, &row_pointer.page) .await?; let old_page_buffer = old_page_handle .as_mut() @@ -167,7 +167,7 @@ impl RowManager { let page_handle = self .lock_cache_manager - .get_page(page_id, row_pointer.page) + .get_page(page_id, &row_pointer.page) .await?; let page_bytes = page_handle .as_ref() @@ -201,7 +201,7 @@ impl RowManager { let mut page_num = PageOffset(0); loop { - let page_handle = lock_cache_manager.get_page(page_id, page_num).await?; + let page_handle = lock_cache_manager.get_page(page_id, &page_num).await?; match page_handle.as_ref() { Some(s) => { let page = PageData::parse(table.clone(), page_num, s)?; @@ -232,7 +232,7 @@ impl RowManager { let next_free_page = self.free_space_manager.get_next_free_page(page_id).await?; let mut page_bytes = self .lock_cache_manager - .get_page_for_update(page_id, next_free_page) + .get_page_for_update(page_id, &next_free_page) .await?; match page_bytes.as_mut() { Some(p) => { @@ -259,7 +259,7 @@ impl RowManager { let new_page_offset = self.lock_cache_manager.get_offset(page_id).await?; let mut new_page_handle = self .lock_cache_manager - .get_page_for_update(page_id, new_page_offset) + .get_page_for_update(page_id, &new_page_offset) .await?; let mut new_page = PageData::new(new_page_offset);