diff --git a/benches/feophant_benchmark.rs b/benches/feophant_benchmark.rs
index f75f36d..439d327 100644
--- a/benches/feophant_benchmark.rs
+++ b/benches/feophant_benchmark.rs
@@ -3,9 +3,9 @@ use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use feophantlib::engine::get_row;
use feophantlib::engine::get_table;
+use feophantlib::engine::io::block_layer::file_manager::FileManager;
+use feophantlib::engine::io::block_layer::lock_cache_manager::LockCacheManager;
use feophantlib::engine::io::row_formats::RowData;
-use feophantlib::engine::io::FileManager;
-use feophantlib::engine::io::LockCacheManager;
use feophantlib::engine::io::RowManager;
use feophantlib::engine::transactions::TransactionId;
use futures::pin_mut;
diff --git a/src/engine.rs b/src/engine.rs
index 16e1d6c..967403b 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -33,10 +33,10 @@ pub use test_objects::get_table;
pub mod transactions;
use transactions::{TransactionId, TransactionManager};
+use self::io::block_layer::file_manager::FileManager;
+use self::io::block_layer::lock_cache_manager::LockCacheManager;
use self::io::ConstraintManager;
-use self::io::FileManager;
use self::io::IndexManager;
-use self::io::LockCacheManager;
use self::objects::QueryResult;
use std::ops::Deref;
use std::sync::Arc;
diff --git a/src/engine/analyzer/definition_lookup.rs b/src/engine/analyzer/definition_lookup.rs
index 7d1992a..ae94c5e 100644
--- a/src/engine/analyzer/definition_lookup.rs
+++ b/src/engine/analyzer/definition_lookup.rs
@@ -322,7 +322,8 @@ pub enum DefinitionLookupError {
mod tests {
use tempfile::TempDir;
- use crate::engine::io::{FileManager, LockCacheManager};
+ use crate::engine::io::block_layer::file_manager::FileManager;
+ use crate::engine::io::block_layer::lock_cache_manager::LockCacheManager;
// Note this useful idiom: importing names from outer (for mod tests) scope.
use super::super::super::io::RowManager;
diff --git a/src/engine/io.rs b/src/engine/io.rs
index 1096029..eac8c02 100644
--- a/src/engine/io.rs
+++ b/src/engine/io.rs
@@ -1,3 +1,5 @@
+pub mod block_layer;
+
mod constraint_manager;
pub use constraint_manager::ConstraintManager;
pub use constraint_manager::ConstraintManagerError;
@@ -15,17 +17,6 @@ pub use index_manager::IndexManager;
mod index_row_manager;
pub use index_row_manager::IndexRowManager;
-mod file_manager;
-pub use file_manager::FileManager;
-pub use file_manager::FileManagerError;
-
-mod free_space_manager;
-pub use free_space_manager::FreeSpaceManager;
-
-mod lock_cache_manager;
-pub use lock_cache_manager::LockCacheManager;
-pub use lock_cache_manager::LockCacheManagerError;
-
pub mod page_formats;
pub mod row_formats;
diff --git a/src/engine/io/block_layer.rs b/src/engine/io/block_layer.rs
new file mode 100644
index 0000000..e4778de
--- /dev/null
+++ b/src/engine/io/block_layer.rs
@@ -0,0 +1,36 @@
+/*
+
+ Addressing:
+ Uuid / Page Type / Page Offset
+
+ Locking:
+ Reading
+ Writing
+
+ Free Space:
+ In Use
+ Free
+
+ I am most concerned about lost writes.
+
+ Caching can move into the file layer, but locking stays out.
+
+ File Manager handles I/O operations
+
+ Free Space Manager guides what pages are usable
+
+ Lock Cache Manager Handles locking
+
+
+ Process:
+
+ let page = get_page_for_read()
+*/
+
+pub mod file_manager;
+
+pub mod free_space_manager;
+
+pub mod lock_cache_manager;
+
+pub mod lock_manager;
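
The module comment above splits the block layer into a FileManager for I/O, a FreeSpaceManager for page availability, and a LockCacheManager for locking, with pages handed out behind locks. A minimal stand-alone sketch of the page-handle pattern the rest of this diff builds on, assuming only the tokio and bytes crates used elsewhere here (types are simplified stand-ins, not the crate's own API):

    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[tokio::main]
    async fn main() {
        // One cached page slot; None means "not loaded / does not exist yet".
        let slot: Arc<RwLock<Option<Bytes>>> = Arc::new(RwLock::new(None));

        // Writer path: take an owned write guard, build a fresh immutable page,
        // and swap it into the slot (updates replace whole pages).
        let mut guard = slot.clone().write_owned().await;
        let mut page = BytesMut::with_capacity(16);
        page.extend_from_slice(b"page contents");
        guard.replace(page.freeze());
        drop(guard);

        // Reader path: cloning a Bytes handle is cheap and leaves the cached page intact.
        let read = slot.read().await;
        assert_eq!(read.as_ref().unwrap().as_ref(), &b"page contents"[..]);
    }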
diff --git a/src/engine/io/file_manager.rs b/src/engine/io/block_layer/file_manager.rs
similarity index 97%
rename from src/engine/io/file_manager.rs
rename to src/engine/io/block_layer/file_manager.rs
index bb93847..07a1b28 100644
--- a/src/engine/io/file_manager.rs
+++ b/src/engine/io/block_layer/file_manager.rs
@@ -1,7 +1,7 @@
//! This is a different approach than I had done before. This file manager runs its own loop based on a spawned task
//! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources.
-use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
-use bytes::{Bytes, BytesMut};
+use super::super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
+use bytes::Bytes;
use std::convert::TryFrom;
use std::ffi::OsString;
use std::num::TryFromIntError;
@@ -83,7 +83,7 @@ impl FileManager {
&self,
page_id: &PageId,
offset: &PageOffset,
- ) -> Result<Option<BytesMut>, FileManagerError> {
+ ) -> Result<Option<Bytes>, FileManagerError> {
let (res_request, res_receiver) = oneshot::channel();
self.request_queue
diff --git a/src/engine/io/file_manager/file_executor.rs b/src/engine/io/block_layer/file_manager/file_executor.rs
similarity index 99%
rename from src/engine/io/file_manager/file_executor.rs
rename to src/engine/io/block_layer/file_manager/file_executor.rs
index 9f76fc3..7cab0bd 100644
--- a/src/engine/io/file_manager/file_executor.rs
+++ b/src/engine/io/block_layer/file_manager/file_executor.rs
@@ -2,8 +2,8 @@ use super::file_operations::{FileOperations, FileOperationsError};
/// Inner type that implements the actual I/O operations so the outer type can
/// handle queue management.
use super::request_type::RequestType;
+use super::ResourceFormatter;
use crate::constants::PAGE_SIZE;
-use crate::engine::io::file_manager::ResourceFormatter;
use crate::engine::io::page_formats::{PageId, PageOffset};
use bytes::{Bytes, BytesMut};
use lru::LruCache;
@@ -599,7 +599,7 @@ mod tests {
use crate::{
constants::PAGES_PER_FILE,
- engine::io::{page_formats::PageType, FileManager},
+ engine::io::{block_layer::file_manager::FileManager, page_formats::PageType},
};
use super::*;
diff --git a/src/engine/io/file_manager/file_operations.rs b/src/engine/io/block_layer/file_manager/file_operations.rs
similarity index 96%
rename from src/engine/io/file_manager/file_operations.rs
rename to src/engine/io/block_layer/file_manager/file_operations.rs
index fcdd991..290a92f 100644
--- a/src/engine/io/file_manager/file_operations.rs
+++ b/src/engine/io/block_layer/file_manager/file_operations.rs
@@ -14,7 +14,7 @@ use tokio::{
};
use crate::constants::PAGE_SIZE;
-use crate::engine::io::file_manager::ResourceFormatter;
+use crate::engine::io::block_layer::file_manager::ResourceFormatter;
use crate::engine::io::page_formats::{PageId, PageOffset};
pub struct FileOperations {}
@@ -74,7 +74,7 @@ impl FileOperations {
pub async fn read_chunk(
mut file: File,
page_offset: &PageOffset,
- ) -> Result<(File, Option<BytesMut>), FileOperationsError> {
+ ) -> Result<(File, Option<Bytes>), FileOperationsError> {
let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
let file_meta = file.metadata().await?;
@@ -94,7 +94,7 @@ impl FileOperations {
}
}
- Ok((file, Some(buffer)))
+ Ok((file, Some(buffer.freeze())))
}
pub async fn update_chunk(
diff --git a/src/engine/io/file_manager/request_type.rs b/src/engine/io/block_layer/file_manager/request_type.rs
similarity index 70%
rename from src/engine/io/file_manager/request_type.rs
rename to src/engine/io/block_layer/file_manager/request_type.rs
index b1a0f10..bff4ce4 100644
--- a/src/engine/io/file_manager/request_type.rs
+++ b/src/engine/io/block_layer/file_manager/request_type.rs
@@ -1,5 +1,5 @@
use crate::engine::io::page_formats::PageOffset;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
use tokio::sync::oneshot::Sender;
use super::file_executor::FileExecutorError;
@@ -8,11 +8,6 @@ use super::file_executor::FileExecutorError;
pub enum RequestType {
GetOffset(Sender<Result<PageOffset, FileExecutorError>>),
Add((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
- Read(
- (
- PageOffset,
- Sender<Result<Option<BytesMut>, FileExecutorError>>,
- ),
- ),
+ Read((PageOffset, Sender<Result<Option<Bytes>, FileExecutorError>>)),
Update((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
}
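
RequestType pairs every request with a oneshot::Sender so the single file-executor task can answer each caller directly over its own channel. A stand-alone sketch of that request/response shape, assuming only tokio and bytes as used elsewhere in this diff (the enum and names here are simplified stand-ins, not the crate's types):

    use bytes::Bytes;
    use std::collections::HashMap;
    use tokio::sync::{mpsc, oneshot};

    enum Request {
        Read(u64, oneshot::Sender<Option<Bytes>>),
        Write(u64, Bytes, oneshot::Sender<()>),
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<Request>();

        // Executor loop: owns the backing store and serializes all access to it.
        let _executor = tokio::spawn(async move {
            let mut storage: HashMap<u64, Bytes> = HashMap::new();
            while let Some(req) = rx.recv().await {
                match req {
                    Request::Write(offset, page, reply) => {
                        storage.insert(offset, page);
                        let _ = reply.send(());
                    }
                    Request::Read(offset, reply) => {
                        let _ = reply.send(storage.get(&offset).cloned());
                    }
                }
            }
        });

        // Caller side: send a request carrying a oneshot sender, await the answer.
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(Request::Write(0, Bytes::from_static(b"page"), reply_tx)).unwrap();
        reply_rx.await.unwrap();

        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(Request::Read(0, reply_tx)).unwrap();
        assert_eq!(reply_rx.await.unwrap(), Some(Bytes::from_static(b"page")));
    }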
diff --git a/src/engine/io/file_manager/resource_formatter.rs b/src/engine/io/block_layer/file_manager/resource_formatter.rs
similarity index 100%
rename from src/engine/io/file_manager/resource_formatter.rs
rename to src/engine/io/block_layer/file_manager/resource_formatter.rs
diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/block_layer/free_space_manager.rs
similarity index 86%
rename from src/engine/io/free_space_manager.rs
rename to src/engine/io/block_layer/free_space_manager.rs
index 23be17f..1431ac3 100644
--- a/src/engine/io/free_space_manager.rs
+++ b/src/engine/io/block_layer/free_space_manager.rs
@@ -3,11 +3,11 @@
//! page to say the space is available. This means each page here can cover 134MB of free space.
use super::{
- page_formats::{PageId, PageOffset, PageType},
- LockCacheManager, LockCacheManagerError,
+ super::page_formats::{PageId, PageOffset, PageType},
+ lock_cache_manager::{LockCacheManager, LockCacheManagerError},
};
use crate::constants::PAGE_SIZE;
-use bytes::{Buf, BytesMut};
+use bytes::{Buf, Bytes, BytesMut};
use thiserror::Error;
#[derive(Clone, Debug)]
@@ -33,7 +33,7 @@ impl FreeSpaceManager {
let page_handle = self.lock_cache_manager.get_page(free_id, &offset).await?;
match page_handle.as_ref() {
Some(s) => {
- let mut page_frozen = s.clone().freeze();
+ let mut page_frozen = s.clone();
match Self::find_first_free_page_in_page(&mut page_frozen) {
Some(s) => {
let full_offset = PageOffset(s)
@@ -59,7 +59,7 @@ impl FreeSpaceManager {
let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize];
buffer.extend_from_slice(&new_page);
- new_page_handle.replace(buffer);
+ new_page_handle.replace(buffer.freeze());
self.lock_cache_manager
.add_page(free_id, next_po, new_page_handle)
@@ -86,10 +86,11 @@ impl FreeSpaceManager {
.lock_cache_manager
.get_page_for_update(free_id, &po)
.await?;
- let mut page = page_handle
+ let page = page_handle
.as_mut()
.ok_or(FreeSpaceManagerError::PageDoesNotExist(page_id))?;
- Self::set_status_inside_page(&mut page, offset, status);
+ let new_page = Self::set_status_inside_page(page, offset, status);
+ page_handle.replace(new_page);
Ok(self
.lock_cache_manager
@@ -118,7 +119,9 @@ impl FreeSpaceManager {
/// Sets the status of a field inside a page, you MUST pass an offset
/// that fits in the buffer.
- fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) {
+ fn set_status_inside_page(src: &Bytes, offset: usize, status: FreeStat) -> Bytes {
+ let mut buffer = BytesMut::with_capacity(src.len());
+ buffer.extend_from_slice(&src[..]);
let offset_index = offset / 8;
let offset_subindex = offset % 8;
@@ -136,6 +139,8 @@ impl FreeSpaceManager {
}
buffer[offset_index] = new_value;
+
+ buffer.freeze()
}
}
@@ -161,14 +166,14 @@ mod tests {
use tempfile::TempDir;
use uuid::Uuid;
- use crate::engine::io::FileManager;
+ use crate::engine::io::block_layer::file_manager::FileManager;
use super::*;
/// Gets the status of a field inside a page, you MUST pass an offset
/// that fits in the buffer.
//This was in the implementation, I just only needed it for unit tests
- fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat {
+ fn get_status_inside_page(buffer: &Bytes, offset: usize) -> FreeStat {
let offset_index = offset / 8;
let offset_subindex = offset % 8;
@@ -187,11 +192,13 @@ mod tests {
let mut test = BytesMut::with_capacity(2);
test.put_u16(0x0);
- for i in 0..test.capacity() * 8 {
+ let mut test = test.freeze();
+
+ for i in 0..test.len() * 8 {
assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
- FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
+ test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
assert_eq!(get_status_inside_page(&test, i), FreeStat::InUse);
- FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free);
+ test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::Free);
assert_eq!(get_status_inside_page(&test, i), FreeStat::Free);
}
@@ -204,11 +211,13 @@ mod tests {
test.put_u8(0x0);
test.put_u8(0x0);
+ let mut test = test.freeze();
+
for i in 0..test.len() * 8 {
let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone());
assert_eq!(free_page, Some(i));
- FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
+ test = FreeSpaceManager::set_status_inside_page(&test, i, FreeStat::InUse);
}
assert_eq!(
FreeSpaceManager::find_first_free_page_in_page(&mut test),
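
The free space page is a plain bitmap: offset / 8 selects the byte and offset % 8 the bit, which is the arithmetic set_status_inside_page and the test helper above both use. A stand-alone sketch of that math (bit order here is illustrative; the crate's exact layout may differ):

    // Each byte of the free space page tracks eight pages.
    fn set_bit(buffer: &mut [u8], offset: usize, in_use: bool) {
        let byte = offset / 8; // which byte holds this page's bit
        let bit = offset % 8;  // which bit inside that byte
        if in_use {
            buffer[byte] |= 1 << bit;
        } else {
            buffer[byte] &= !(1 << bit);
        }
    }

    fn first_free(buffer: &[u8]) -> Option<usize> {
        for (i, b) in buffer.iter().enumerate() {
            if *b != u8::MAX {
                // trailing_ones() counts the low bits already marked in use.
                return Some(i * 8 + b.trailing_ones() as usize);
            }
        }
        None
    }

    fn main() {
        let mut page = vec![0u8; 2];
        assert_eq!(first_free(&page), Some(0));
        set_bit(&mut page, 0, true);
        set_bit(&mut page, 1, true);
        assert_eq!(first_free(&page), Some(2));
        set_bit(&mut page, 1, false);
        assert_eq!(first_free(&page), Some(1));
    }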
diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/block_layer/lock_cache_manager.rs
similarity index 71%
rename from src/engine/io/lock_cache_manager.rs
rename to src/engine/io/block_layer/lock_cache_manager.rs
index cb06563..cb3107e 100644
--- a/src/engine/io/lock_cache_manager.rs
+++ b/src/engine/io/block_layer/lock_cache_manager.rs
@@ -1,8 +1,6 @@
-use super::{
- page_formats::{PageId, PageOffset},
- FileManager, FileManagerError,
-};
-use bytes::BytesMut;
+use super::file_manager::{FileManager, FileManagerError};
+use crate::engine::io::page_formats::{PageId, PageOffset};
+use bytes::Bytes;
use lru::LruCache;
use std::sync::Arc;
use thiserror::Error;
@@ -13,7 +11,7 @@ pub struct LockCacheManager {
//TODO I don't like these massive single hashes protected with a single lock
// Long term I need to make a fixed hashmap and evict them myself.
// Holding on this since I might be able to work around it
- cache: Arc<Mutex<LruCache<(PageId, PageOffset), Arc<RwLock<Option<BytesMut>>>>>>,
+ cache: Arc<Mutex<LruCache<(PageId, PageOffset), Arc<RwLock<Option<Bytes>>>>>>,
file_manager: Arc,
}
@@ -45,7 +43,7 @@ impl LockCacheManager {
&self,
page_id: PageId,
offset: &PageOffset,
- ) -> Result<OwnedRwLockReadGuard<Option<BytesMut>>, LockCacheManagerError> {
+ ) -> Result<OwnedRwLockReadGuard<Option<Bytes>>, LockCacheManagerError> {
Ok(self
.get_page_internal(page_id, offset)
.await?
@@ -57,7 +55,7 @@ impl LockCacheManager {
&self,
page_id: PageId,
offset: &PageOffset,
- ) -> Result<OwnedRwLockWriteGuard<Option<BytesMut>>, LockCacheManagerError> {
+ ) -> Result<OwnedRwLockWriteGuard<Option<Bytes>>, LockCacheManagerError> {
Ok(self
.get_page_internal(page_id, offset)
.await?
@@ -69,7 +67,7 @@ impl LockCacheManager {
&self,
page_id: PageId,
offset: &PageOffset,
- ) -> Result<Arc<RwLock<Option<BytesMut>>>, LockCacheManagerError> {
+ ) -> Result<Arc<RwLock<Option<Bytes>>>, LockCacheManagerError> {
let mut cache = self.cache.lock().await;
match cache.get(&(page_id, *offset)) {
Some(s) => Ok(s.clone()),
@@ -95,7 +93,7 @@ impl LockCacheManager {
&self,
page_id: PageId,
offset: PageOffset,
- guard: OwnedRwLockWriteGuard<Option<BytesMut>>,
+ guard: OwnedRwLockWriteGuard<Option<Bytes>>,
) -> Result<(), LockCacheManagerError> {
let page = match guard.as_ref() {
Some(s) => s.clone(),
@@ -105,7 +103,7 @@ impl LockCacheManager {
};
Ok(self
.file_manager
- .update_page(&page_id, &offset, page.freeze())
+ .update_page(&page_id, &offset, page)
.await?)
}
@@ -113,7 +111,7 @@ impl LockCacheManager {
&self,
page_id: PageId,
offset: PageOffset,
- guard: OwnedRwLockWriteGuard<Option<BytesMut>>,
+ guard: OwnedRwLockWriteGuard<Option<Bytes>>,
) -> Result<(), LockCacheManagerError> {
let page = match guard.as_ref() {
Some(s) => s.clone(),
@@ -121,10 +119,7 @@ impl LockCacheManager {
return Err(LockCacheManagerError::PageMissing());
}
};
- Ok(self
- .file_manager
- .add_page(&page_id, &offset, page.freeze())
- .await?)
+ Ok(self.file_manager.add_page(&page_id, &offset, page).await?)
}
}
@@ -138,18 +133,26 @@ pub enum LockCacheManagerError {
#[cfg(test)]
mod tests {
+ use bytes::BytesMut;
use tempfile::TempDir;
use uuid::Uuid;
- use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};
+ use crate::{
+ constants::PAGE_SIZE,
+ engine::io::{
+ format_traits::{Parseable, Serializable},
+ index_formats::BTreeFirstPage,
+ page_formats::PageType,
+ },
+ };
use super::*;
- fn get_test_page(fill: u8) -> BytesMut {
+ fn get_test_page(fill: u8) -> Bytes {
let mut test_page = BytesMut::with_capacity(PAGE_SIZE as usize);
let free_space = vec![fill; PAGE_SIZE as usize];
test_page.extend_from_slice(&free_space);
- test_page
+ test_page.freeze()
}
#[tokio::test]
@@ -198,10 +201,9 @@ mod tests {
let fifth_page = fifth_handle
.as_mut()
.ok_or(LockCacheManagerError::PageMissing())?;
- fifth_page.clear();
let page4 = get_test_page(3);
- fifth_page.extend_from_slice(&page4[0..page4.len()]);
+ fifth_handle.replace(page4);
lm.update_page(page_id, fourth_offset, fifth_handle).await?;
let mut sixth_handle = lm.get_page_for_update(page_id, &fourth_offset).await?;
@@ -233,4 +235,49 @@ mod tests {
Ok(())
}
+
+ /// This is reproducing an interesting bug that the cache seems to be remembering
+ /// that someone had previously read data out of the buffer. I think a clone is
+ /// missing.
+ #[tokio::test]
+ async fn test_repeated_read() -> Result<(), Box<dyn std::error::Error>> {
+ let tmp = TempDir::new()?;
+ let tmp_dir = tmp.path().as_os_str().to_os_string();
+
+ let fm = Arc::new(FileManager::new(tmp_dir)?);
+ let lm = LockCacheManager::new(fm);
+
+ let page_id = PageId {
+ resource_key: Uuid::new_v4(),
+ page_type: PageType::Data,
+ };
+
+ let offset = lm.get_offset_non_zero(page_id).await?;
+ let mut page = lm.get_page_for_update(page_id, &offset).await?;
+
+ let first = BTreeFirstPage {
+ root_offset: PageOffset(4000),
+ };
+ first.serialize_and_pad(&mut page);
+ assert_eq!(page.as_ref().unwrap().len(), PAGE_SIZE as usize);
+ lm.update_page(page_id, offset, page).await?;
+
+ let mut page2 = lm.get_page_for_update(page_id, &offset).await?;
+ assert_eq!(page2.as_ref().unwrap().len(), PAGE_SIZE as usize);
+ match page2.as_mut() {
+ Some(mut s) => {
+ let mut change = BTreeFirstPage::parse(&mut s)?;
+ }
+ None => panic!("Foo"),
+ }
+ drop(page2);
+
+ let mut page3 = lm.get_page_for_update(page_id, &offset).await?;
+ assert_eq!(page3.as_ref().unwrap().len(), PAGE_SIZE as usize);
+ let node3 = BTreeFirstPage::parse(&mut page3.as_ref().unwrap().clone())?;
+ assert_eq!(node3.root_offset, PageOffset(4000));
+ drop(page3);
+
+ Ok(())
+ }
}
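
The new test_repeated_read test documents why the cache switches from BytesMut to Bytes: parsing a cached BytesMut in place advances the cached buffer itself, so later readers see a "shortened" page, while a Bytes clone is a cheap reference-counted handle and the cached page stays whole. A small sketch of that difference (this is a plausible reading of the bug described in the test comment, not code taken from the crate):

    use bytes::{Buf, Bytes, BytesMut};

    fn main() {
        // Old shape: handing out the cached BytesMut lets a parser consume it.
        let mut cached_mut = BytesMut::from(&[1u8, 2, 3, 4][..]);
        let _first = cached_mut.get_u16(); // advances the cached buffer itself
        assert_eq!(cached_mut.len(), 2);   // later readers see a shorter page

        // New shape: clone the Bytes handle and advance only the clone.
        let cached = Bytes::from_static(&[1, 2, 3, 4]);
        let mut view = cached.clone();
        let _first = view.get_u16();
        assert_eq!(view.len(), 2);
        assert_eq!(cached.len(), 4); // the cached page is untouched
    }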
diff --git a/src/engine/io/block_layer/lock_manager.rs b/src/engine/io/block_layer/lock_manager.rs
new file mode 100644
index 0000000..8a879dc
--- /dev/null
+++ b/src/engine/io/block_layer/lock_manager.rs
@@ -0,0 +1,3 @@
+pub struct LockManager;
+
+impl LockManager {}
diff --git a/src/engine/io/format_traits/serializable.rs b/src/engine/io/format_traits/serializable.rs
index 021f593..8f83380 100644
--- a/src/engine/io/format_traits/serializable.rs
+++ b/src/engine/io/format_traits/serializable.rs
@@ -1,6 +1,7 @@
//! Serializes a given struct to a given ByteMut
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
+use tokio::sync::OwnedRwLockWriteGuard;
use crate::constants::PAGE_SIZE;
@@ -8,37 +9,26 @@ pub trait Serializable {
/// Transforms the structure to a byte stream
fn serialize(&self, buffer: &mut impl BufMut);
- /// Handles updating the page from the I/O sub system
- fn serialize_and_pad(&self, page: &mut Option<BytesMut>) {
- match page.as_mut() {
- Some(mut s) => {
- s.clear();
+ /// Produces a new page to support the change to how the I/O subsystem works
+ fn serialize_and_pad(&self, buffer: &mut OwnedRwLockWriteGuard<Option<Bytes>>) {
+ let mut page = BytesMut::with_capacity(PAGE_SIZE as usize);
+ self.serialize(&mut page);
- self.serialize(&mut s);
-
- if s.len() != PAGE_SIZE as usize {
- let padding = vec![0; PAGE_SIZE as usize - s.len()];
- s.extend_from_slice(&padding);
- }
- }
- None => {
- let mut new_page = BytesMut::with_capacity(PAGE_SIZE as usize);
- self.serialize(&mut new_page);
-
- if new_page.len() != PAGE_SIZE as usize {
- let padding = vec![0; PAGE_SIZE as usize - new_page.len()];
- new_page.extend_from_slice(&padding);
- }
-
- page.replace(new_page);
- }
+ if page.len() != PAGE_SIZE as usize {
+ let padding = vec![0; PAGE_SIZE as usize - page.len()];
+ page.extend_from_slice(&padding);
}
+
+ buffer.replace(page.freeze());
}
}
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use bytes::Buf;
+ use tokio::sync::RwLock;
use super::*;
@@ -51,34 +41,24 @@ mod tests {
}
}
- #[test]
- fn test_none() -> Result<(), Box<dyn std::error::Error>> {
- let test = Test { inner: 2000 };
-
- let mut page = None;
- test.serialize_and_pad(&mut page);
-
- assert!(page.is_some());
-
- let mut page = page.unwrap();
- assert_eq!(page.len(), PAGE_SIZE as usize);
- assert_eq!(test.inner, page.get_u32_le());
-
- Ok(())
- }
-
- #[test]
- fn test_some() -> Result<(), Box<dyn std::error::Error>> {
+ #[tokio::test]
+ async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
let test = Test { inner: 2000 };
- let mut page = Some(BytesMut::with_capacity(PAGE_SIZE as usize));
- test.serialize_and_pad(&mut page);
+ let page_lock = Arc::new(RwLock::new(None));
+ let mut guard = page_lock.clone().write_owned().await;
- assert!(page.is_some());
+ test.serialize_and_pad(&mut guard);
+ drop(guard);
- let mut page = page.unwrap();
- assert_eq!(page.len(), PAGE_SIZE as usize);
- assert_eq!(test.inner, page.get_u32_le());
+ let page = page_lock.read_owned().await;
+ if let Some(s) = page.as_ref() {
+ let mut s = s.clone();
+ assert_eq!(s.len(), PAGE_SIZE as usize);
+ assert_eq!(test.inner, s.get_u32_le());
+ } else {
+ panic!("None found!");
+ }
Ok(())
}
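
The reworked serialize_and_pad now always builds a fresh page, zero-pads it to PAGE_SIZE, and replaces the contents behind an owned write guard instead of mutating a cached buffer. A stand-alone sketch of that trait shape, with PAGE_SIZE and the Header type as local stand-ins mirroring the signatures in this diff:

    use bytes::{BufMut, Bytes, BytesMut};
    use std::sync::Arc;
    use tokio::sync::{OwnedRwLockWriteGuard, RwLock};

    const PAGE_SIZE: u16 = 4096;

    trait Serializable {
        fn serialize(&self, buffer: &mut impl BufMut);

        // Build a fresh page, pad it, and swap it into the locked slot.
        fn serialize_and_pad(&self, buffer: &mut OwnedRwLockWriteGuard<Option<Bytes>>) {
            let mut page = BytesMut::with_capacity(PAGE_SIZE as usize);
            self.serialize(&mut page);
            if page.len() != PAGE_SIZE as usize {
                page.extend_from_slice(&vec![0; PAGE_SIZE as usize - page.len()]);
            }
            buffer.replace(page.freeze());
        }
    }

    struct Header { magic: u32 }

    impl Serializable for Header {
        fn serialize(&self, buffer: &mut impl BufMut) {
            buffer.put_u32_le(self.magic);
        }
    }

    #[tokio::main]
    async fn main() {
        let slot: Arc<RwLock<Option<Bytes>>> = Arc::new(RwLock::new(None));
        let mut guard = slot.clone().write_owned().await;
        Header { magic: 2000 }.serialize_and_pad(&mut guard);
        assert_eq!(guard.as_ref().unwrap().len(), PAGE_SIZE as usize);
    }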
diff --git a/src/engine/io/index_formats/btree_branch.rs b/src/engine/io/index_formats/btree_branch.rs
index d2f43fa..1aa0120 100644
--- a/src/engine/io/index_formats/btree_branch.rs
+++ b/src/engine/io/index_formats/btree_branch.rs
@@ -8,6 +8,7 @@ use crate::{
engine::{
io::{
encode_size, expected_encoded_size,
+ format_traits::Serializable,
page_formats::{ItemIdData, ItemIdDataError, PageOffset},
row_formats::{NullMask, NullMaskError},
ConstEncodedSize, EncodedSize, SelfEncodedSize, SizeError,
@@ -15,8 +16,8 @@ use crate::{
objects::{types::BaseSqlTypesError, SqlTuple},
},
};
-use bytes::{BufMut, Bytes, BytesMut};
-use std::{convert::TryFrom, num::TryFromIntError, ops::RangeBounds};
+use bytes::BufMut;
+use std::{num::TryFromIntError, ops::RangeBounds};
use thiserror::Error;
#[derive(Clone, Debug, PartialEq)]
@@ -42,7 +43,6 @@ impl BTreeBranch {
pub fn add(
&mut self,
- old_pointer: PageOffset,
left_pointer: PageOffset,
key: SqlTuple,
right_pointer: PageOffset,
@@ -78,7 +78,6 @@ impl BTreeBranch {
/// **WARNING** If this function fails the branch should be considered poisoned and not used.
pub fn add_and_split(
&mut self,
- old_pointer: PageOffset,
left_pointer: PageOffset,
key: SqlTuple,
right_pointer: PageOffset,
@@ -141,32 +140,6 @@ impl BTreeBranch {
Ok(index_search_start(&self.keys, &self.pointers, range)?)
}
-
- pub fn serialize(&self) -> Result<Bytes, BTreeBranchError> {
- let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- buffer.put_u8(NodeType::Branch as u8);
-
- BTreeNode::write_node(&mut buffer, Some(self.parent_node))?;
-
- encode_size(&mut buffer, self.keys.len());
-
- for key in self.keys.iter() {
- BTreeNode::write_sql_tuple(&mut buffer, key);
- }
-
- for pointer in self.pointers.iter() {
- let pointer_u64 = u64::try_from(pointer.0)?;
- buffer.put_uint_le(pointer_u64, PageOffset::encoded_size());
- }
-
- //Zero pad to page size
- if buffer.len() < PAGE_SIZE as usize {
- let free_space = vec![0; PAGE_SIZE as usize - buffer.len()];
- buffer.extend_from_slice(&free_space);
- }
-
- Ok(buffer.freeze())
- }
}
impl SelfEncodedSize for BTreeBranch {
@@ -179,14 +152,28 @@ impl SelfEncodedSize for BTreeBranch {
new_size += tup.encoded_size();
}
- for point in self.pointers.iter() {
- new_size += PageOffset::encoded_size();
- }
+ new_size += self.pointers.len() * PageOffset::encoded_size();
new_size
}
}
+impl Serializable for BTreeBranch {
+ fn serialize(&self, buffer: &mut impl BufMut) {
+ buffer.put_u8(NodeType::Branch as u8);
+
+ BTreeNode::write_node(buffer, Some(self.parent_node));
+
+ encode_size(buffer, self.keys.len());
+
+ for key in self.keys.iter() {
+ BTreeNode::write_sql_tuple(buffer, key);
+ }
+
+ self.pointers.iter().for_each(|p| p.serialize(buffer));
+ }
+}
+
#[derive(Debug, Error)]
pub enum BTreeBranchError {
#[error(transparent)]
@@ -233,6 +220,7 @@ mod tests {
Attribute, Index,
},
};
+ use bytes::BytesMut;
use uuid::Uuid;
fn get_index() -> Index {
@@ -278,8 +266,9 @@ mod tests {
pointers,
};
- let mut test_serial = test.serialize()?;
- let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?;
+ let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
+ test.serialize(&mut buffer);
+ let test_parse = BTreeNode::parse(&mut buffer, &get_index())?;
match test_parse {
BTreeNode::Branch(b) => assert_eq!(test, b),
diff --git a/src/engine/io/index_formats/btree_first_page.rs b/src/engine/io/index_formats/btree_first_page.rs
index c5eba72..da5023c 100644
--- a/src/engine/io/index_formats/btree_first_page.rs
+++ b/src/engine/io/index_formats/btree_first_page.rs
@@ -33,9 +33,19 @@ pub enum BTreeFirstPageError {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use bytes::BytesMut;
+ use tempfile::TempDir;
+ use uuid::Uuid;
- use crate::constants::PAGE_SIZE;
+ use crate::{
+ constants::PAGE_SIZE,
+ engine::io::{
+ block_layer::{file_manager::FileManager, lock_cache_manager::LockCacheManager},
+ page_formats::{PageId, PageType},
+ },
+ };
use super::*;
@@ -54,4 +64,52 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_on_disk() -> Result<(), Box<dyn std::error::Error>> {
+ let tmp = TempDir::new()?;
+ let tmp_dir = tmp.path().as_os_str().to_os_string();
+
+ let fm = Arc::new(FileManager::new(tmp_dir)?);
+ let lm = LockCacheManager::new(fm.clone());
+
+ let page_id = PageId {
+ resource_key: Uuid::new_v4(),
+ page_type: PageType::Data,
+ };
+
+ let first_offset = lm.get_offset(page_id).await?;
+ assert_eq!(first_offset, PageOffset(0));
+
+ let mut first_page = lm.get_page_for_update(page_id, &first_offset).await?;
+ let root_offset = lm.get_offset_non_zero(page_id).await?;
+ assert_ne!(root_offset, PageOffset(0));
+
+ let btfp = BTreeFirstPage { root_offset };
+ btfp.serialize_and_pad(&mut first_page);
+ lm.update_page(page_id, first_offset, first_page).await?;
+
+ // Okay now it's time to actually test, without drop
+ let mut new_first_page = lm.get_page(page_id, &PageOffset(0)).await?.clone();
+ if let Some(s) = new_first_page.as_mut() {
+ let btfp2 = BTreeFirstPage::parse(&mut s.clone())?;
+ assert_ne!(btfp2.root_offset, PageOffset(0));
+ } else {
+ panic!("That page should exist!");
+ }
+
+ // Test again with a drop
+ drop(lm);
+ let lm2 = LockCacheManager::new(fm);
+
+ let mut new_first_page2 = lm2.get_page(page_id, &PageOffset(0)).await?.clone();
+ if let Some(s) = new_first_page2.as_mut() {
+ let btfp2 = BTreeFirstPage::parse(&mut s.clone())?;
+ assert_ne!(btfp2.root_offset, PageOffset(0));
+ } else {
+ panic!("That page should exist!");
+ }
+
+ Ok(())
+ }
}
diff --git a/src/engine/io/index_formats/btree_node.rs b/src/engine/io/index_formats/btree_node.rs
index cc9ed4e..ef0b551 100644
--- a/src/engine/io/index_formats/btree_node.rs
+++ b/src/engine/io/index_formats/btree_node.rs
@@ -20,7 +20,8 @@
//! Note: Min size for all indexes is 2x PAGE_SIZE since the root page is used to mean None. This will change
//! since the root page will have a pointer so we can lock and split the root node.
-use crate::engine::io::page_formats::PageOffset;
+use crate::engine::io::format_traits::{Parseable, Serializable};
+use crate::engine::io::page_formats::{PageOffset, PageOffsetError};
use crate::engine::io::row_formats::{ItemPointer, ItemPointerError, NullMaskError};
use crate::engine::io::{parse_size, ConstEncodedSize, SizeError};
use crate::engine::objects::types::{BaseSqlTypes, BaseSqlTypesError};
@@ -49,18 +50,11 @@ pub enum NodeType {
}
impl BTreeNode {
- pub fn write_node(
- buffer: &mut impl BufMut,
- node: Option<PageOffset>,
- ) -> Result<(), BTreeNodeError> {
+ pub fn write_node(buffer: &mut impl BufMut, node: Option<PageOffset>) {
match node {
- Some(pn) => {
- let pn_u64 = u64::try_from(pn.0)?;
- buffer.put_uint_le(pn_u64, size_of::<PageOffset>())
- }
+ Some(pn) => pn.serialize(buffer),
None => buffer.put_uint_le(0, size_of::<PageOffset>()),
}
- Ok(())
}
pub fn write_sql_tuple(buffer: &mut impl BufMut, tuple: &SqlTuple) {
@@ -79,7 +73,7 @@ impl BTreeNode {
}
let node_type = buffer.get_u8();
- let parent_node = Self::parse_page(buffer)?;
+ let parent_node = PageOffset::parse(buffer)?;
if node_type == NodeType::Leaf as u8 {
let left_node = Self::parse_page(buffer)?;
@@ -108,8 +102,6 @@ impl BTreeNode {
nodes: buckets,
}));
} else {
- let parent = parent_node.ok_or_else(BTreeNodeError::ParentNull)?;
-
let keys_count = parse_size(buffer)?;
let mut keys = Vec::with_capacity(keys_count);
@@ -135,7 +127,7 @@ impl BTreeNode {
}
return Ok(BTreeNode::Branch(BTreeBranch {
- parent_node: parent,
+ parent_node,
keys,
pointers,
}));
@@ -192,6 +184,8 @@ pub enum BTreeNodeError {
MissingPointerData(usize, usize),
#[error(transparent)]
NullMaskError(#[from] NullMaskError),
+ #[error(transparent)]
+ PageOffsetError(#[from] PageOffsetError),
#[error("Parent cannot be 0")]
ParentNull(),
#[error(transparent)]
diff --git a/src/engine/io/index_manager.rs b/src/engine/io/index_manager.rs
index 65c2b49..bd9c2ef 100644
--- a/src/engine/io/index_manager.rs
+++ b/src/engine/io/index_manager.rs
@@ -2,32 +2,40 @@
// Okay so more thinking, my approach needs to change
/*
- For adds, I'll find the leaf page using write locks, dropping as I go.
+ lock leaf
- Once found, I'll add and then follow the parents up until everything fits
+ split _ cow
+ lock left and right leaves
+
+ check up splitting as we go
+
+ write down, unlocking as we go
*/
-use super::index_formats::{BTreeBranchError, BTreeLeafError, BTreeNode, BTreeNodeError};
+use super::block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError};
+use super::format_traits::Parseable;
+use super::index_formats::{
+ BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeafError, BTreeNode,
+ BTreeNodeError,
+};
use super::page_formats::PageOffset;
-use super::page_formats::{ItemIdData, PageId, PageType};
+use super::page_formats::{PageId, PageType};
use super::row_formats::ItemPointer;
-use super::{LockCacheManager, LockCacheManagerError, SelfEncodedSize};
-use crate::{
- constants::PAGE_SIZE,
- engine::{
- io::index_formats::BTreeLeaf,
- objects::{Index, SqlTuple},
- },
-};
-use bytes::{Buf, BufMut, BytesMut};
-use std::convert::TryFrom;
-use std::mem::size_of;
+use crate::engine::io::format_traits::Serializable;
+use crate::engine::io::index_formats::BTreeBranch;
+use crate::engine::objects::{Index, SqlTuple};
use std::num::TryFromIntError;
-use std::ops::Range;
use thiserror::Error;
-use tokio::sync::OwnedRwLockWriteGuard;
+
+mod find_leaf;
+use find_leaf::find_leaf;
+use find_leaf::FindLeafError;
+
+mod split_leaf;
+use split_leaf::split_leaf;
+use split_leaf::SplitLeafError;
//TODO Support something other than btrees
//TODO Support searching on a non primary index column
@@ -53,213 +61,199 @@ impl IndexManager {
page_type: PageType::Data,
};
- //Initial Special Case of an Empty Root
- let (mut current_page, mut current_offset) =
- self.get_root_page_for_write(index_def).await?;
- if let None = current_page.as_mut() {
- let root = BTreeLeaf::new();
- if !root.can_fit(&new_key) {
- return Err(IndexManagerError::KeyTooLarge(new_key.encoded_size()));
- }
+ debug!("Adding {:?}", new_key);
+
+ //Find the target leaf
+ let (mut page, page_offset, mut leaf) =
+ find_leaf(&self.lock_cache_manager, index_def, &new_key).await?;
+
+ debug!("target offset {0}", page_offset);
+ //If the key fits in the leaf, we add it and are done
+ if leaf.can_fit(&new_key) {
+ debug!("fits");
+
+ leaf.add(new_key, item_ptr)?;
+
+ leaf.serialize_and_pad(&mut page);
- let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- root.serialize(&mut buffer);
- current_page.replace(buffer);
self.lock_cache_manager
- .update_page(page_id, current_offset, current_page)
+ .update_page(page_id, page_offset, page)
.await?;
return Ok(());
}
- //Next the goal is to get to the leaf
- if let Some(s) = current_page.as_mut() {
- let mut current_node = BTreeNode::parse(s, index_def)?;
+ debug!("expand");
- let mut found_leaf;
- loop {
- match current_node {
- BTreeNode::Branch(b) => {
- let next_page_offset = b.search(&new_key..&new_key)?;
- current_page = self
- .lock_cache_manager
- .get_page_for_update(page_id, next_page_offset)
- .await?;
- current_offset = *next_page_offset;
+ //Lock the leafs left and right if they exist
+ let left_neighbor = leaf.left_node;
+ let left_page = match left_neighbor {
+ Some(s) => Some(
+ self.lock_cache_manager
+ .get_page_for_update(page_id, &s)
+ .await?,
+ ),
+ None => None,
+ };
- let s = current_page
- .as_mut()
- .ok_or_else(|| IndexManagerError::NodeEmpty(current_offset))?;
- current_node = BTreeNode::parse(s, index_def)?;
- continue;
- }
- BTreeNode::Leaf(mut l) => {
- found_leaf = l;
- break;
- }
+ let right_neighbor = leaf.right_node;
+ let right_page = match right_neighbor {
+ Some(s) => Some(
+ self.lock_cache_manager
+ .get_page_for_update(page_id, &s)
+ .await?,
+ ),
+ None => None,
+ };
+
+ //Doesn't fit so we have to split and work back up to the loop
+ let (mut split_key, mut parent_node_offset, new_left_offset, new_right_offset) =
+ split_leaf(&self.lock_cache_manager, index_def, leaf, new_key, item_ptr).await?;
+
+ if let Some(mut s) = left_page {
+ if let Some(s2) = s.as_mut() {
+ if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut s2.clone(), index_def)? {
+ l.right_node = Some(new_left_offset);
+ l.serialize_and_pad(&mut s);
+ self.lock_cache_manager
+ .update_page(page_id, left_neighbor.unwrap(), s)
+ .await?;
+ } else {
+ return Err(IndexManagerError::UnexpectedBranch(left_neighbor.unwrap()));
}
}
+ }
- //If the key fits in the leaf, we add it and are done
- if found_leaf.can_fit(&new_key) {
- found_leaf.add(new_key, item_ptr);
+ if let Some(mut s) = right_page {
+ if let Some(s2) = s.as_mut() {
+ if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut s2.clone(), index_def)? {
+ l.left_node = Some(new_right_offset);
+ l.serialize_and_pad(&mut s);
+ self.lock_cache_manager
+ .update_page(page_id, right_neighbor.unwrap(), s)
+ .await?;
+ } else {
+ return Err(IndexManagerError::UnexpectedBranch(right_neighbor.unwrap()));
+ }
+ }
+ }
- let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- found_leaf.serialize(&mut buffer);
+ //Now it's time to fix the tree
+ loop {
+ let mut parent_page = self
+ .lock_cache_manager
+ .get_page_for_update(page_id, &parent_node_offset)
+ .await?;
+ if parent_node_offset == PageOffset(0) {
+ //We've hit the top of the system so we'll have to remake the root page
+ let new_root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?;
- current_page.replace(buffer);
+ let mut new_root_page = self
+ .lock_cache_manager
+ .get_page_for_update(page_id, &new_root_offset)
+ .await?;
+
+ let new_root =
+ BTreeBranch::new(PageOffset(0), new_left_offset, split_key, new_right_offset);
+
+ new_root.serialize_and_pad(&mut new_root_page);
+ self.lock_cache_manager
+ .update_page(page_id, new_root_offset, new_root_page)
+ .await?;
+
+ let first_page = BTreeFirstPage {
+ root_offset: new_root_offset,
+ };
+ first_page.serialize_and_pad(&mut parent_page);
self.lock_cache_manager
- .update_page(page_id, current_offset, current_page)
+ .update_page(page_id, PageOffset(0), parent_page)
.await?;
+
return Ok(());
}
+ if let Some(s) = parent_page.as_mut() {
+ if let BTreeNode::Branch(mut b) = BTreeNode::parse(&mut s.clone(), index_def)? {
+ if b.can_fit(&split_key) {
+ b.add(new_left_offset, split_key, new_right_offset)?;
- //Doesn't fit so we have to split and work back up to the loop
- let left_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?;
- let right_node_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?;
+ b.serialize_and_pad(&mut parent_page);
- let mut left_node_page = self
- .lock_cache_manager
- .get_page_for_update(page_id, &left_node_offset)
- .await?;
- let mut right_node_page = self
- .lock_cache_manager
- .get_page_for_update(page_id, &right_node_offset)
- .await?;
-
- let (new_split, new_right_node) =
- found_leaf.add_and_split(left_node_offset, right_node_offset, new_key, item_ptr)?;
+ self.lock_cache_manager
+ .update_page(page_id, parent_node_offset, parent_page)
+ .await?;
- let mut parent_node_offset = found_leaf
- .parent_node
- .ok_or_else(IndexManagerError::ParentNodeEmpty)?;
+ return Ok(());
+ } else {
+ //Need to split the branch and move up a level
+ let (middle_key, new_right) =
+ b.add_and_split(new_left_offset, split_key, new_right_offset)?;
- let mut left_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- found_leaf.serialize(&mut left_node_buffer);
+ let new_right_offset =
+ self.lock_cache_manager.get_offset_non_zero(page_id).await?;
+ let mut new_right_page = self
+ .lock_cache_manager
+ .get_page_for_update(page_id, &new_right_offset)
+ .await?;
+ new_right.serialize_and_pad(&mut new_right_page);
+ self.lock_cache_manager
+ .update_page(page_id, new_right_offset, new_right_page)
+ .await?;
- let mut right_node_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- new_right_node.serialize(&mut right_node_buffer);
+ b.serialize_and_pad(&mut parent_page);
+ self.lock_cache_manager
+ .update_page(page_id, PageOffset(0), parent_page)
+ .await?;
- left_node_page.replace(left_node_buffer);
- right_node_page.replace(right_node_buffer);
+ parent_node_offset = b.parent_node;
+ split_key = middle_key;
- self.lock_cache_manager
- .update_page(page_id, left_node_offset, left_node_page)
- .await?;
- self.lock_cache_manager
- .update_page(page_id, right_node_offset, right_node_page)
- .await?;
-
- //Now its time to fix the tree
- loop {
- let parent_page = self
- .lock_cache_manager
- .get_page_for_update(page_id, &parent_node_offset)
- .await?;
- if parent_node_offset == PageOffset(0) {
- //We've hit the top of the system so we'll have to remake the root page
+ continue;
+ }
+ } else {
+ return Err(IndexManagerError::UnexpectedLeaf(parent_node_offset));
}
+ } else {
+ return Err(IndexManagerError::NodeEmpty(parent_node_offset));
}
}
- Ok(())
}
- async fn get_root_page_for_write(
+ pub async fn search_for_key(
&self,
index_def: &Index,
- ) -> Result<(OwnedRwLockWriteGuard<Option<BytesMut>>, PageOffset), IndexManagerError> {
+ key: &SqlTuple,
+ ) -> Result<Option<Vec<ItemPointer>>, IndexManagerError> {
let page_id = PageId {
resource_key: index_def.id,
page_type: PageType::Data,
};
-
- let mut first_page_handle = self
+ let first_page = self
.lock_cache_manager
- .get_page_for_update(page_id, &PageOffset(0))
+ .get_page(page_id, &PageOffset(0))
.await?;
- let (root_offset, changed) = match first_page_handle.as_mut() {
- Some(mut s) => {
- let root_offset = usize::try_from(s.get_uint_le(size_of::<PageOffset>()))?;
- if root_offset == 0 {
- //This is wrong, recreate it
- let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?;
-
- s.clear();
- root_offset.serialize(&mut s);
+ if let Some(s) = first_page.as_ref() {
+ let first_node = BTreeFirstPage::parse(&mut s.clone())?;
- (root_offset, true)
- } else {
- (PageOffset(root_offset), false)
+ let mut current_offset = first_node.root_offset;
+ loop {
+ debug!("scan {0}", current_offset);
+ let node = self.get_node(index_def, ¤t_offset).await?;
+ match node {
+ BTreeNode::Branch(b) => {
+ current_offset = *b.search(key..key)?;
+ continue;
+ }
+ BTreeNode::Leaf(l) => match l.nodes.get(key) {
+ Some(s) => return Ok(Some(s.clone())),
+ None => {
+ return Ok(None);
+ }
+ },
}
}
- None => {
- let root_offset = self.lock_cache_manager.get_offset_non_zero(page_id).await?;
-
- let mut first_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- root_offset.serialize(&mut first_page_buffer);
- let new_page = vec![0; PAGE_SIZE as usize - size_of::<PageOffset>()];
- first_page_buffer.extend_from_slice(&new_page);
-
- first_page_handle.replace(first_page_buffer);
-
- (root_offset, true)
- }
- };
-
- //Now we know where root is, let's get it
- let root_page_handle = self
- .lock_cache_manager
- .get_page_for_update(page_id, &root_offset)
- .await?;
-
- if changed {
- self.lock_cache_manager
- .update_page(page_id, PageOffset(0), first_page_handle)
- .await?
- }
-
- Ok((root_page_handle, root_offset))
- }
- /*
- pub async fn search_for_key(
- &self,
- index_def: &Index,
- key: &SqlTuple,
- ) -> Result<Option<Vec<ItemPointer>>, IndexManagerError> {
- let (root_node, root_offset) = self.get_root_node(index_def).await?;
- match root_node {
- BTreeNode::Branch(b) => {
- todo!("blah")
- }
- BTreeNode::Leaf(l) => match l.nodes.get(key) {
- Some(s) => Ok(Some(s.clone())),
- None => Ok(None),
- },
- }
- }*/
-
- ///This function provides a mapping given an oversized bucket of how the leaf should be split
- /// Returns:
- /// * Left node range
- /// * Node for lifting up to the parent (will be the same as the last left entry in the list)
- /// * Right node range
- fn map_split_node(
- old_nodes_count: usize,
- ) -> Result<(Range<usize>, usize, Range<usize>), IndexManagerError> {
- if old_nodes_count < 2 {
- return Err(IndexManagerError::UnableToSplit(old_nodes_count));
+ } else {
+ Ok(None)
}
-
- let mut midpoint = old_nodes_count / 2;
- if old_nodes_count % 2 == 0 {
- midpoint += 1;
- }
-
- Ok((
- (0..midpoint - 1),
- midpoint - 1,
- (midpoint..old_nodes_count - 1),
- ))
}
/// This provides the requested node based on the page, if it exists
@@ -277,7 +271,7 @@ impl IndexManager {
let page_buffer = page_handle.clone();
match page_buffer {
- Some(page) => Ok(BTreeNode::parse(&mut page.freeze(), index_def)?),
+ Some(mut page) => Ok(BTreeNode::parse(&mut page, index_def)?),
None => Err(IndexManagerError::NoSuchNode(*offset)),
}
}
@@ -288,6 +282,8 @@ pub enum IndexManagerError {
#[error(transparent)]
BTreeBranchError(#[from] BTreeBranchError),
#[error(transparent)]
+ BTreeFirstPageError(#[from] BTreeFirstPageError),
+ #[error(transparent)]
BTreeLeafError(#[from] BTreeLeafError),
#[error(transparent)]
BTreeNodeError(#[from] BTreeNodeError),
@@ -295,6 +291,8 @@ pub enum IndexManagerError {
"Another process made the root index page first, maybe the developer should make locking."
)]
ConcurrentCreationError(),
+ #[error(transparent)]
+ FindLeafError(#[from] FindLeafError),
#[error("Key too large size: {0}, maybe the developer should fix this.")]
KeyTooLarge(usize),
#[error(transparent)]
@@ -307,12 +305,18 @@ pub enum IndexManagerError {
ParentNodeEmpty(),
#[error("Root Node Empty")]
RootNodeEmpty(),
+ #[error(transparent)]
+ SplitLeafError(#[from] SplitLeafError),
#[error("Unable to search, the stack is empty")]
StackEmpty(),
#[error(transparent)]
TryFromIntError(#[from] TryFromIntError),
#[error("Unable to split a node of size {0}")]
UnableToSplit(usize),
+ #[error("Unexpect Branch at offset {0}")]
+ UnexpectedBranch(PageOffset),
+ #[error("Unexpect Leaf at offset {0}")]
+ UnexpectedLeaf(PageOffset),
}
#[cfg(test)]
@@ -325,7 +329,7 @@ mod tests {
use crate::{
constants::Nullable,
engine::{
- io::{page_formats::UInt12, FileManager},
+ io::{block_layer::file_manager::FileManager, page_formats::UInt12},
objects::{
types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition},
Attribute,
@@ -334,6 +338,8 @@ mod tests {
};
use super::*;
+ use log::LevelFilter;
+ use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode};
fn get_key_and_ptr(num: usize) -> (SqlTuple, ItemPointer) {
(
@@ -347,11 +353,18 @@ mod tests {
#[tokio::test]
async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
+ CombinedLogger::init(vec![TermLogger::new(
+ LevelFilter::Debug,
+ Config::default(),
+ TerminalMode::Mixed,
+ ColorChoice::Auto,
+ )])?;
+
let tmp = TempDir::new()?;
let tmp_dir = tmp.path().as_os_str().to_os_string();
let fm = Arc::new(FileManager::new(tmp_dir)?);
- let lm = LockCacheManager::new(fm);
+ let lm = LockCacheManager::new(fm.clone());
let im = IndexManager::new(lm);
let index = Index {
@@ -374,11 +387,16 @@ mod tests {
unique: true,
};
- for i in 0..5000 {
+ for i in 0..1000 {
let (key, ptr) = get_key_and_ptr(i);
im.add(&index, key, ptr).await?;
}
+ let (key, ptr) = get_key_and_ptr(999);
+ assert_eq!(Some(vec![ptr]), im.search_for_key(&index, &key).await?);
+
+ fm.shutdown().await?;
+
Ok(())
}
}
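
The new search_for_key reads the BTreeFirstPage for the root offset, then walks branch nodes by key until it lands in a leaf and looks the key up there. A simplified in-memory sketch of that descent (stand-in types only, not the crate's BTreeNode/BTreeBranch/BTreeLeaf):

    use std::collections::BTreeMap;

    enum Node {
        Branch { keys: Vec<u32>, pointers: Vec<usize> },
        Leaf(BTreeMap<u32, Vec<u64>>),
    }

    fn search(pages: &[Node], root: usize, key: u32) -> Option<Vec<u64>> {
        let mut current = root;
        loop {
            match &pages[current] {
                Node::Branch { keys, pointers } => {
                    // First key greater than the target decides which child to follow.
                    let child = keys.iter().position(|k| key < *k).unwrap_or(keys.len());
                    current = pointers[child];
                }
                Node::Leaf(entries) => return entries.get(&key).cloned(),
            }
        }
    }

    fn main() {
        let pages = vec![
            Node::Branch { keys: vec![10], pointers: vec![1, 2] },
            Node::Leaf(BTreeMap::from([(1, vec![100]), (9, vec![900])])),
            Node::Leaf(BTreeMap::from([(10, vec![1000]), (42, vec![4200])])),
        ];
        assert_eq!(search(&pages, 0, 42), Some(vec![4200]));
        assert_eq!(search(&pages, 0, 7), None);
    }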
diff --git a/src/engine/io/index_manager/find_leaf.rs b/src/engine/io/index_manager/find_leaf.rs
new file mode 100644
index 0000000..05430bd
--- /dev/null
+++ b/src/engine/io/index_manager/find_leaf.rs
@@ -0,0 +1,171 @@
+use crate::engine::{
+ io::{
+ block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError},
+ format_traits::{Parseable, Serializable},
+ index_formats::{
+ BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeaf, BTreeNode,
+ BTreeNodeError,
+ },
+ page_formats::{PageId, PageOffset, PageType},
+ },
+ objects::{Index, SqlTuple},
+};
+use bytes::{Bytes, BytesMut};
+use thiserror::Error;
+use tokio::sync::OwnedRwLockWriteGuard;
+
+pub async fn find_leaf(
+ lcm: &LockCacheManager,
+ index_def: &Index,
+ new_key: &SqlTuple,
+) -> Result<(OwnedRwLockWriteGuard<Option<Bytes>>, PageOffset, BTreeLeaf), FindLeafError> {
+ let page_id = PageId {
+ resource_key: index_def.id,
+ page_type: PageType::Data,
+ };
+
+ let mut prior_offset = PageOffset(0);
+ let mut offset = PageOffset(0);
+
+ loop {
+ let mut page = lcm.get_page_for_update(page_id, &offset).await?;
+
+ //Handle the first page
+ if offset == PageOffset(0) {
+ offset = match page.as_mut() {
+ Some(s) => {
+ let mut page_node = BTreeFirstPage::parse(&mut s.clone())?;
+ if page_node.root_offset == PageOffset(0) {
+ debug!("root is zero");
+ page_node.root_offset = lcm.get_offset_non_zero(page_id).await?;
+
+ page_node.serialize_and_pad(&mut page);
+ lcm.update_page(page_id, offset, page).await?;
+ }
+ page_node.root_offset
+ }
+ None => {
+ debug!("page doesn't exist");
+ let root_offset = lcm.get_offset_non_zero(page_id).await?;
+ let page_node = BTreeFirstPage { root_offset };
+
+ page_node.serialize_and_pad(&mut page);
+ lcm.update_page(page_id, offset, page).await?;
+
+ page_node.root_offset
+ }
+ };
+ continue;
+ }
+
+ match page.as_mut() {
+ None => {
+ //Special case, should only be due to root not existing
+ return Ok((page, offset, BTreeLeaf::new(prior_offset)));
+ }
+ Some(s) => {
+ let node = BTreeNode::parse(s, index_def)?;
+
+ prior_offset = offset;
+ match node {
+ BTreeNode::Branch(b) => {
+ offset = *b.search(new_key..new_key)?;
+ continue;
+ }
+ BTreeNode::Leaf(l) => {
+ return Ok((page, offset, l));
+ }
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug, Error)]
+pub enum FindLeafError {
+ #[error(transparent)]
+ BTreeBranchError(#[from] BTreeBranchError),
+ #[error(transparent)]
+ BTreeFirstPageError(#[from] BTreeFirstPageError),
+ #[error(transparent)]
+ BTreeNodeError(#[from] BTreeNodeError),
+ #[error(transparent)]
+ LockCacheManagerError(#[from] LockCacheManagerError),
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use tempfile::TempDir;
+ use uuid::Uuid;
+
+ use crate::engine::{
+ io::{
+ block_layer::file_manager::FileManager, index_manager::find_leaf, page_formats::UInt12,
+ row_formats::ItemPointer,
+ },
+ objects::types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition},
+ };
+
+ use super::*;
+
+ //Super unsafe function to get test data, just don't count too high
+ fn get_key(index: usize) -> (SqlTuple, ItemPointer) {
+ (
+ SqlTuple(vec![Some(BaseSqlTypes::Integer(index as u32))]),
+ ItemPointer::new(PageOffset(index), UInt12::new(index as u16).unwrap()),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_find() -> Result<(), Box<dyn std::error::Error>> {
+ let tmp = TempDir::new()?;
+ let tmp_dir = tmp.path().as_os_str().to_os_string();
+
+ let fm = Arc::new(FileManager::new(tmp_dir)?);
+ let lm = LockCacheManager::new(fm.clone());
+
+ let index = Index {
+ id: Uuid::new_v4(),
+ name: "test".to_string(),
+ columns: Arc::new(SqlTypeDefinition(vec![(
+ "foo".to_string(),
+ BaseSqlTypesMapper::Integer,
+ )])),
+ unique: false,
+ };
+ let page_id = PageId {
+ resource_key: index.id,
+ page_type: PageType::Data,
+ };
+
+ let first_offset = lm.get_offset(page_id).await?;
+ assert_eq!(first_offset, PageOffset(0));
+
+ let mut first_page = lm.get_page_for_update(page_id, &first_offset).await?;
+ let root_offset = lm.get_offset_non_zero(page_id).await?;
+ assert_ne!(root_offset, PageOffset(0));
+ let mut root_page = lm.get_page_for_update(page_id, &root_offset).await?;
+
+ let btfp = BTreeFirstPage { root_offset };
+ btfp.serialize_and_pad(&mut first_page);
+ lm.update_page(page_id, first_offset, first_page).await?;
+
+ let mut root = BTreeLeaf::new(first_offset);
+ let (key, ptr) = get_key(42);
+ root.add(key.clone(), ptr)?;
+ root.serialize_and_pad(&mut root_page);
+ lm.update_page(page_id, root_offset, root_page).await?;
+
+ // Okay now it's time to actually test
+ let (_, offset, leaf) = find_leaf(&lm, &index, &key).await?;
+ assert_eq!(leaf, root);
+ assert_ne!(offset, PageOffset(0));
+
+ let (_, offset2, leaf2) = find_leaf(&lm, &index, &key).await?;
+ assert_eq!(leaf2, root);
+ assert_eq!(offset, offset2);
+ Ok(())
+ }
+}
diff --git a/src/engine/io/index_manager/split_leaf.rs b/src/engine/io/index_manager/split_leaf.rs
new file mode 100644
index 0000000..5d8da11
--- /dev/null
+++ b/src/engine/io/index_manager/split_leaf.rs
@@ -0,0 +1,150 @@
+use crate::engine::{
+ io::{
+ block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError},
+ format_traits::Serializable,
+ index_formats::{BTreeLeaf, BTreeLeafError},
+ page_formats::{PageId, PageOffset, PageType},
+ row_formats::ItemPointer,
+ },
+ objects::{Index, SqlTuple},
+};
+use thiserror::Error;
+
+/// Takes a leaf node and produces a new right node
+pub async fn split_leaf(
+ lcm: &LockCacheManager,
+ index_def: &Index,
+ mut leaf: BTreeLeaf,
+ new_key: SqlTuple,
+ item_ptr: ItemPointer,
+) -> Result<(SqlTuple, PageOffset, PageOffset, PageOffset), SplitLeafError> {
+ let page_id = PageId {
+ resource_key: index_def.id,
+ page_type: PageType::Data,
+ };
+
+ let left_node_offset = lcm.get_offset_non_zero(page_id).await?;
+ let right_node_offset = lcm.get_offset_non_zero(page_id).await?;
+
+ let mut left_node_page = lcm.get_page_for_update(page_id, &left_node_offset).await?;
+ let mut right_node_page = lcm.get_page_for_update(page_id, &right_node_offset).await?;
+
+ let (new_split_key, new_right_node) =
+ leaf.add_and_split(left_node_offset, right_node_offset, new_key, item_ptr)?;
+
+ let parent_node_offset = leaf.parent_node;
+
+ leaf.serialize_and_pad(&mut left_node_page);
+ new_right_node.serialize_and_pad(&mut right_node_page);
+
+ lcm.update_page(page_id, left_node_offset, left_node_page)
+ .await?;
+ lcm.update_page(page_id, right_node_offset, right_node_page)
+ .await?;
+
+ Ok((
+ new_split_key,
+ parent_node_offset,
+ left_node_offset,
+ right_node_offset,
+ ))
+}
+
+#[derive(Debug, Error)]
+pub enum SplitLeafError {
+ #[error(transparent)]
+ BTreeLeafError(#[from] BTreeLeafError),
+ #[error(transparent)]
+ LockCacheManagerError(#[from] LockCacheManagerError),
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use tempfile::TempDir;
+ use uuid::Uuid;
+
+ use crate::engine::{
+ io::{
+ block_layer::file_manager::FileManager, index_formats::BTreeNode, page_formats::UInt12,
+ },
+ objects::types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition},
+ };
+
+ use super::*;
+
+ //Super unsafe function to get test data, just don't count too high
+ fn get_key(index: usize) -> (SqlTuple, ItemPointer) {
+ (
+ SqlTuple(vec![Some(BaseSqlTypes::Integer(index as u32))]),
+ ItemPointer::new(PageOffset(index), UInt12::new(index as u16).unwrap()),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_split_leaf() -> Result<(), Box<dyn std::error::Error>> {
+ let tmp = TempDir::new()?;
+ let tmp_dir = tmp.path().as_os_str().to_os_string();
+
+ let fm = Arc::new(FileManager::new(tmp_dir)?);
+ let lcm = LockCacheManager::new(fm);
+
+ let index = Index {
+ id: Uuid::new_v4(),
+ name: "test".to_string(),
+ columns: Arc::new(SqlTypeDefinition(vec![(
+ "foo".to_string(),
+ BaseSqlTypesMapper::Integer,
+ )])),
+ unique: false,
+ };
+ let page_id = PageId {
+ resource_key: index.id,
+ page_type: PageType::Data,
+ };
+
+ let parent_offset = lcm.get_offset_non_zero(page_id).await?;
+ let mut leaf = BTreeLeaf::new(parent_offset);
+ let leaf_size = leaf.nodes.len();
+
+ for i in 0..10 {
+ let (key, ptr) = get_key(i);
+ if leaf.can_fit(&key) {
+ leaf.add(key, ptr)?;
+ }
+ }
+
+ let (key, ptr) = get_key(11);
+
+ let (split_key, parent_node, left_offset, right_offset) =
+ split_leaf(&lcm, &index, leaf, key, ptr).await?;
+
+ let left_page = lcm.get_page(page_id, &left_offset).await?;
+ let mut left_buffer = left_page.as_ref().unwrap().clone();
+ let left_node = match BTreeNode::parse(&mut left_buffer, &index)? {
+ BTreeNode::Branch(_) => panic!("Unexpected branch"),
+ BTreeNode::Leaf(l) => l,
+ };
+
+ let right_page = lcm.get_page(page_id, &right_offset).await?;
+ let mut right_buffer = right_page.as_ref().unwrap().clone();
+ let right_node = match BTreeNode::parse(&mut right_buffer, &index)? {
+ BTreeNode::Branch(_) => panic!("Unexpected branch"),
+ BTreeNode::Leaf(l) => l,
+ };
+
+ assert_eq!(parent_node, left_node.parent_node);
+ assert_eq!(parent_node, right_node.parent_node);
+
+ for n in left_node.nodes {
+ assert!(n.0 <= split_key);
+ }
+
+ for n in right_node.nodes {
+ assert!(n.0 > split_key);
+ }
+
+ Ok(())
+ }
+}
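
The asserts at the end of test_split_leaf pin down the contract split_leaf must keep: every key in the left node is less than or equal to the split key and every key in the right node is greater. A tiny sketch of that invariant on plain integers (the crate's actual midpoint rule may differ):

    fn split(mut entries: Vec<u32>) -> (u32, Vec<u32>, Vec<u32>) {
        entries.sort_unstable();
        let mid = entries.len() / 2;
        // Everything after the midpoint moves to the new right node.
        let right = entries.split_off(mid + 1);
        // The last key kept on the left is lifted up as the split key.
        let split_key = *entries.last().expect("left side is never empty here");
        (split_key, entries, right)
    }

    fn main() {
        let (split_key, left, right) = split(vec![5, 1, 9, 3, 7]);
        assert!(left.iter().all(|k| *k <= split_key));
        assert!(right.iter().all(|k| *k > split_key));
    }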
diff --git a/src/engine/io/page_formats/page_data.rs b/src/engine/io/page_formats/page_data.rs
index c1d297e..500d51b 100644
--- a/src/engine/io/page_formats/page_data.rs
+++ b/src/engine/io/page_formats/page_data.rs
@@ -9,7 +9,7 @@ use super::{
ItemIdData, ItemIdDataError, PageHeader, PageHeaderError, PageOffset, UInt12, UInt12Error,
};
use async_stream::stream;
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes};
use futures::stream::Stream;
use std::convert::TryFrom;
use std::sync::Arc;
@@ -99,7 +99,7 @@ impl PageData {
pub fn parse(
table: Arc,
page: PageOffset,
- buffer: &BytesMut,
+ buffer: &Bytes,
) -> Result {
//Note since we need random access, everything MUST work off slices otherwise counts will be off
@@ -202,7 +202,7 @@ mod tests {
pd.serialize(&mut serial);
assert_eq!(PAGE_SIZE as usize, serial.len());
- let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap();
+ let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial.freeze()).unwrap();
pin_mut!(pg_parsed);
let result_rows: Vec = pg_parsed.get_stream().collect().await;
@@ -241,7 +241,7 @@ mod tests {
}
let mut serial = BytesMut::with_capacity(PAGE_SIZE as usize);
pd.serialize(&mut serial);
- let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial).unwrap();
+ let pg_parsed = PageData::parse(table.clone(), PageOffset(0), &serial.freeze()).unwrap();
pin_mut!(pg_parsed);
let result_rows: Vec = pg_parsed.get_stream().collect().await;
diff --git a/src/engine/io/page_formats/page_id.rs b/src/engine/io/page_formats/page_id.rs
index 5ee8b7f..187da1c 100644
--- a/src/engine/io/page_formats/page_id.rs
+++ b/src/engine/io/page_formats/page_id.rs
@@ -1,5 +1,6 @@
//! A struct to uniquely identify a page in all operations. This replaces adding additional arguments everywhere.
+use crate::engine::io::block_layer::file_manager::ResourceFormatter;
use nom::{
bytes::complete::tag_no_case,
error::{convert_error, make_error, ContextError, ErrorKind, ParseError, VerboseError},
@@ -12,8 +13,6 @@ use std::{
use thiserror::Error;
use uuid::Uuid;
-use crate::engine::io::file_manager::ResourceFormatter;
-
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct PageId {
pub resource_key: Uuid,
diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs
index 5e830f5..f3c8540 100644
--- a/src/engine/io/row_manager.rs
+++ b/src/engine/io/row_manager.rs
@@ -1,16 +1,14 @@
use super::super::objects::Table;
use super::super::transactions::TransactionId;
+use super::block_layer::file_manager::FileManagerError;
+use super::block_layer::free_space_manager::{FreeSpaceManager, FreeSpaceManagerError, FreeStat};
+use super::block_layer::lock_cache_manager::{LockCacheManager, LockCacheManagerError};
use super::format_traits::Serializable;
-use super::free_space_manager::{FreeSpaceManagerError, FreeStat};
use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
use super::row_formats::{ItemPointer, RowData, RowDataError};
-use super::{
- EncodedSize, FileManagerError, FreeSpaceManager, LockCacheManager, LockCacheManagerError,
-};
-use crate::constants::PAGE_SIZE;
+use super::EncodedSize;
use crate::engine::objects::SqlTuple;
use async_stream::try_stream;
-use bytes::BytesMut;
use futures::stream::Stream;
use std::sync::Arc;
use thiserror::Error;
@@ -60,9 +58,10 @@ impl RowManager {
.await?;
let page_buffer = page_handle
.as_mut()
- .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
+ .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?
+ .clone();
- let mut page = PageData::parse(table, row_pointer.page, page_buffer)?;
+ let mut page = PageData::parse(table, row_pointer.page, &page_buffer)?;
let mut row = page
.get_row(row_pointer.count)
.ok_or(RowManagerError::NonExistentRow(
@@ -81,8 +80,7 @@ impl RowManager {
row.max = Some(current_tran_id);
page.update(row, row_pointer.count)?;
- page_buffer.clear();
- page.serialize(page_buffer);
+ page.serialize_and_pad(&mut page_handle);
self.lock_cache_manager
.update_page(page_id, row_pointer.page, page_handle)
@@ -110,9 +108,10 @@ impl RowManager {
.await?;
let old_page_buffer = old_page_handle
.as_mut()
- .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?;
+ .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?
+ .clone();
- let mut old_page = PageData::parse(table.clone(), row_pointer.page, old_page_buffer)?;
+ let mut old_page = PageData::parse(table.clone(), row_pointer.page, &old_page_buffer)?;
let mut old_row = old_page
.get_row(row_pointer.count)
@@ -145,9 +144,7 @@ impl RowManager {
old_row.max = Some(current_tran_id);
old_row.item_pointer = new_row_pointer;
old_page.update(old_row, row_pointer.count)?;
-
- old_page_buffer.clear();
- old_page.serialize(old_page_buffer);
+ old_page.serialize_and_pad(&mut old_page_handle);
self.lock_cache_manager
.update_page(page_id, row_pointer.page, old_page_handle)
@@ -239,9 +236,8 @@ impl RowManager {
Some(p) => {
let mut page = PageData::parse(table.clone(), next_free_page, p)?;
if page.can_fit(RowData::encoded_size(&user_data)) {
- p.clear(); //We're going to reuse the buffer
let new_row_pointer = page.insert(current_tran_id, &table, user_data)?;
- page.serialize(p);
+ page.serialize_and_pad(&mut page_bytes);
self.lock_cache_manager
.update_page(page_id, next_free_page, page_bytes)
.await?;
@@ -266,10 +262,7 @@ impl RowManager {
let mut new_page = PageData::new(new_page_offset);
let new_row_pointer = new_page.insert(current_tran_id, &table, user_data)?; //TODO Will NOT handle overly large rows
- let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
- new_page.serialize(&mut buffer);
-
- new_page_handle.replace(buffer);
+ new_page.serialize_and_pad(&mut new_page_handle);
self.lock_cache_manager
.add_page(page_id, new_page_offset, new_page_handle)
@@ -308,7 +301,7 @@ mod tests {
use super::*;
use crate::engine::get_row;
use crate::engine::get_table;
- use crate::engine::io::FileManager;
+ use crate::engine::io::block_layer::file_manager::FileManager;
use futures::pin_mut;
use tempfile::TempDir;
use tokio_stream::StreamExt;
diff --git a/src/feophant.rs b/src/feophant.rs
index 66ca3ed..44a8281 100644
--- a/src/feophant.rs
+++ b/src/feophant.rs
@@ -1,7 +1,7 @@
use crate::{
codec::{NetworkFrame, PgCodec},
engine::{
- io::{FileManager, FileManagerError},
+ io::block_layer::file_manager::{FileManager, FileManagerError},
transactions::TransactionManager,
Engine,
},
diff --git a/tests/visibility_tests.rs b/tests/visibility_tests.rs
index 2a63d06..54d49a3 100644
--- a/tests/visibility_tests.rs
+++ b/tests/visibility_tests.rs
@@ -1,6 +1,10 @@
use feophantlib::engine::{
get_row, get_table,
- io::{row_formats::RowData, FileManager, LockCacheManager, RowManager, VisibleRowManager},
+ io::{
+ block_layer::{file_manager::FileManager, lock_cache_manager::LockCacheManager},
+ row_formats::RowData,
+ RowManager, VisibleRowManager,
+ },
transactions::TransactionManager,
};
use futures::stream::StreamExt;