diff --git a/README.md b/README.md index 0e03cff..b6bdcbd 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,9 @@ Implement Free Space Maps so mutation of data doesn't need a linear scan/parse n Did more thinking, I should implement postgres's streams concept so that I don't need to do lookups to find associated metadata on an object. I thought I was going to get to use Uuid+page offset. I think its now going to be uuid+page offset+ type. -struct PageId + enum PageType should do it +struct PageId + enum PageType should do it (done). + +So I have a fully ready free space map but I can't avoid the locking issue anymore despite it also being the next item on the todo list. **TODO** Implement page level locks that are ordered to avoid deadlocking. diff --git a/src/engine/io.rs b/src/engine/io.rs index b40f5c2..e8001ad 100644 --- a/src/engine/io.rs +++ b/src/engine/io.rs @@ -16,10 +16,13 @@ mod file_manager; pub use file_manager::FileManager; pub use file_manager::FileManagerError; +mod free_space_manager; +pub use free_space_manager::FreeSpaceManager; + mod lock_manager; pub use lock_manager::LockManager; -mod page_formats; +pub mod page_formats; pub mod row_formats; diff --git a/src/engine/io/file_manager.rs b/src/engine/io/file_manager.rs index bc5e035..03f0700 100644 --- a/src/engine/io/file_manager.rs +++ b/src/engine/io/file_manager.rs @@ -2,7 +2,7 @@ //! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources. 
use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error}; use async_stream::try_stream; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::Stream; use std::convert::TryFrom; use std::ffi::OsString; @@ -75,7 +75,7 @@ impl FileManager { &self, page_id: &PageId, offset: &PageOffset, - ) -> Result, FileManagerError> { + ) -> Result, FileManagerError> { let (res_request, res_receiver) = oneshot::channel(); self.request_queue @@ -87,7 +87,7 @@ impl FileManager { pub fn get_stream( &self, page_id: &PageId, - ) -> impl Stream, FileManagerError>> { + ) -> impl Stream, FileManagerError>> { let request_queue = self.request_queue.clone(); let page_id = *page_id; diff --git a/src/engine/io/file_manager/file_operations.rs b/src/engine/io/file_manager/file_operations.rs index 633b8b2..73af02e 100644 --- a/src/engine/io/file_manager/file_operations.rs +++ b/src/engine/io/file_manager/file_operations.rs @@ -69,7 +69,7 @@ impl FileOperations { pub async fn read_chunk( mut file: File, page_offset: &PageOffset, - ) -> Result<(File, Option), FileOperationsError> { + ) -> Result<(File, Option), FileOperationsError> { let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); let file_meta = file.metadata().await?; @@ -89,7 +89,7 @@ impl FileOperations { } } - Ok((file, Some(buffer.freeze()))) + Ok((file, Some(buffer))) } pub async fn update_chunk( diff --git a/src/engine/io/file_manager/request_type.rs b/src/engine/io/file_manager/request_type.rs index 2ea5c34..510628f 100644 --- a/src/engine/io/file_manager/request_type.rs +++ b/src/engine/io/file_manager/request_type.rs @@ -1,5 +1,5 @@ use crate::engine::io::page_formats::PageOffset; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use tokio::sync::oneshot::Sender; use super::file_executor::FileExecutorError; @@ -7,6 +7,11 @@ use super::file_executor::FileExecutorError; #[derive(Debug)] pub enum RequestType { Add((Bytes, Sender>)), - Read((PageOffset, Sender, FileExecutorError>>)), + Read( + ( + PageOffset, + 
Sender, FileExecutorError>>, + ), + ), Update((PageOffset, Bytes, Sender>)), } diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs index a82e6c0..f07a641 100644 --- a/src/engine/io/free_space_manager.rs +++ b/src/engine/io/free_space_manager.rs @@ -1,2 +1,215 @@ //! This struct provides a lookup service to tell row_manager where / if there is free space to -//! add new tuples. +//! add new tuples. It is designed to be extremely space efficient since it only uses 1 bit per +//! page to say the space is available. This means each page here can cover 134MB of free space. + +use crate::constants::PAGE_SIZE; + +use super::{ + file_manager, + page_formats::{PageId, PageOffset, PageType}, + FileManager, FileManagerError, LockManager, +}; +use bytes::{Buf, Bytes, BytesMut}; +use lru::LruCache; +use thiserror::Error; + +const MAX_FREESPACE_COUNT: usize = 32; + +pub struct FreeSpaceManager { + file_manager: FileManager, + freespace_cache: LruCache<(PageId, PageOffset), Bytes>, + lock_manager: LockManager, +} + +impl FreeSpaceManager { + pub fn new(file_manager: FileManager, lock_manager: LockManager) -> FreeSpaceManager { + FreeSpaceManager { + file_manager, + freespace_cache: LruCache::new(MAX_FREESPACE_COUNT), + lock_manager, + } + } + + pub async fn get_next_free_page( + &self, + page_id: PageId, + ) -> Result { + let mut offset = PageOffset(0); + let free_id = PageId { + resource_key: page_id.resource_key, + page_type: PageType::FreeSpaceMap, + }; + loop { + let lock = self.lock_manager.get_lock(&free_id, &offset).await; + let lock_read = lock.read(); + match self.file_manager.get_page(&free_id, &offset).await?
{ + Some(mut s) => match Self::find_first_free_page_in_page(&mut s) { + Some(s) => { + let full_offset = + PageOffset(s) + offset * PageOffset(PAGE_SIZE as usize) * PageOffset(8); + return Ok(full_offset); + } + None => { + offset += PageOffset(1); + continue; + } + }, + None => { + let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize); + let new_page = vec![0; PAGE_SIZE as usize]; + buffer.extend_from_slice(&new_page); + + //TODO: So I have the read lock now for the add, need to figure out how to do this sanely in code + let new_page = self + .file_manager + .add_page(&page_id, buffer.freeze()) + .await?; + let new_offset = new_page * PageOffset(PAGE_SIZE as usize) * PageOffset(8); + return Ok(new_offset); + } + } + } + } + + pub async fn mark_page( + &self, + page_id: PageId, + po: PageOffset, + status: FreeStat, + ) -> Result<(), FreeSpaceManagerError> { + let (po, offset) = po.get_bitmask_offset(); + match self.file_manager.get_page(&page_id, &po).await? { + Some(mut page) => { + Self::set_status_inside_page(&mut page, offset, status); + self.file_manager + .update_page(&page_id, page.freeze(), &po) + .await?; + Ok(()) + } + None => Err(FreeSpaceManagerError::PageDoesNotExist(page_id)), + } + } + + fn find_first_free_page_in_page(buffer: &mut impl Buf) -> Option { + let mut i = 0; + while buffer.has_remaining() { + let mut val = buffer.get_u8(); + if val == 0xFF { + i += 1; + continue; + } + for j in 0..8 { + if val & 0x1 == 0x0 { + return Some(i * 8 + j); + } + val >>= 1; + } + i += 1; + } + None + } + + /// Gets the status of a field inside a page, you MUST pass an offset + /// that fits in the buffer.
+ fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat { + let offset_index = offset / 8; + let offset_subindex = offset % 8; + + let offset_value = buffer[offset_index]; + let bit_value = (offset_value >> offset_subindex) & 0x1; + if bit_value == 0 { + FreeStat::Free + } else { + FreeStat::InUse + } + } + + /// Sets the status of a field inside a page, you MUST pass an offset + /// that fits in the buffer. + fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) { + let offset_index = offset / 8; + let offset_subindex = offset % 8; + + let current_value = buffer[offset_index]; + let mut pre_load = 0x1 << offset_subindex; + let new_value; + match status { + FreeStat::Free => { + pre_load = !pre_load; + new_value = current_value & pre_load; + } + FreeStat::InUse => { + new_value = current_value | pre_load; + } + } + + buffer[offset_index] = new_value; + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum FreeStat { + Free = 0, + InUse = 1, +} + +#[derive(Debug, Error)] +pub enum FreeSpaceManagerError { + #[error(transparent)] + FileManagerError(#[from] FileManagerError), + #[error("Page Offset {0} doesn't exist")] + PageDoesNotExist(PageId), +} + +#[cfg(test)] +mod tests { + use bytes::BufMut; + + use super::*; + + ///This test works by toggling each bit repeatedly and making sure it gives the correct result each time. 
+ #[test] + fn test_get_and_set() -> Result<(), Box> { + let mut test = BytesMut::with_capacity(2); + test.put_u16(0x0); + + for i in 0..test.capacity() * 8 { + assert_eq!( + FreeSpaceManager::get_status_inside_page(&test, i), + FreeStat::Free + ); + FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse); + assert_eq!( + FreeSpaceManager::get_status_inside_page(&test, i), + FreeStat::InUse + ); + FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free); + assert_eq!( + FreeSpaceManager::get_status_inside_page(&test, i), + FreeStat::Free + ); + } + + Ok(()) + } + + #[test] + fn test_find_and_fill_pages() -> Result<(), Box> { + let mut test = BytesMut::with_capacity(2); + test.put_u8(0x0); + test.put_u8(0x0); + + for i in 0..test.len() * 8 { + let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone()); + assert_eq!(free_page, Some(i)); + + FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse); + } + assert_eq!( + FreeSpaceManager::find_first_free_page_in_page(&mut test.clone()), + None + ); + + Ok(()) + } +} diff --git a/src/engine/io/page_formats/page_id.rs b/src/engine/io/page_formats/page_id.rs index b1e416c..5ee8b7f 100644 --- a/src/engine/io/page_formats/page_id.rs +++ b/src/engine/io/page_formats/page_id.rs @@ -12,16 +12,25 @@ use std::{ use thiserror::Error; use uuid::Uuid; +use crate::engine::io::file_manager::ResourceFormatter; + #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct PageId { pub resource_key: Uuid, pub page_type: PageType, } +impl fmt::Display for PageId { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + writeln!(f, "{}", ResourceFormatter::format_uuid(&self.resource_key))?; + writeln!(f, "{}", self.page_type) + } +} + #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub enum PageType { Data, - //FreeSpaceMap, + FreeSpaceMap, //VisibilityMap } @@ -44,9 +53,8 @@ impl PageType { impl Display for PageType { fn fmt(&self, f: &mut Formatter<'_>) -> 
fmt::Result { match self { - PageType::Data => { - write!(f, "data") - } + PageType::Data => write!(f, "data"), + PageType::FreeSpaceMap => write!(f, "fs"), } } } diff --git a/src/engine/io/page_formats/page_offset.rs b/src/engine/io/page_formats/page_offset.rs index 4a3ed2d..11196f0 100644 --- a/src/engine/io/page_formats/page_offset.rs +++ b/src/engine/io/page_formats/page_offset.rs @@ -2,7 +2,12 @@ use crate::{ constants::{PAGES_PER_FILE, PAGE_SIZE}, engine::io::ConstEncodedSize, }; -use std::{fmt, mem::size_of, num::TryFromIntError, ops::AddAssign}; +use std::{ + fmt, + mem::size_of, + num::TryFromIntError, + ops::{Add, AddAssign, Mul}, +}; use thiserror::Error; #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] @@ -33,6 +38,18 @@ impl PageOffset { self.get_file_chunk_size() - PAGE_SIZE as usize } + /// Gets the position of the free/visibility mask for an offset + /// ``` + /// # use crate::feophantlib::engine::io::page_formats::PageOffset; + /// let page = PageOffset(100); + /// assert_eq!(page.get_bitmask_offset(), (PageOffset(0), 100)); + /// ``` + pub fn get_bitmask_offset(&self) -> (PageOffset, usize) { + let offset = self.0 / (PAGE_SIZE as usize * 8); + let inside_offset = self.0 % (PAGE_SIZE as usize * 8); + (PageOffset(offset), inside_offset) + } + /// Determines if a given offset will be the same file or not pub fn is_same_file(&self, rhs: &PageOffset) -> bool { let diff; @@ -50,6 +67,14 @@ impl PageOffset { } } +impl Add for PageOffset { + type Output = PageOffset; + + fn add(self, rhs: Self) -> Self::Output { + PageOffset(self.0 + rhs.0) + } +} + impl AddAssign for PageOffset { fn add_assign(&mut self, other: Self) { self.0.add_assign(other.0); @@ -68,6 +93,14 @@ impl fmt::Display for PageOffset { } } +impl Mul for PageOffset { + type Output = PageOffset; + + fn mul(self, rhs: Self) -> Self::Output { + PageOffset(self.0 * rhs.0) + } +} + #[derive(Debug, Error)] pub enum PageOffsetError { #[error(transparent)] @@ -82,6 +115,12 @@ mod 
tests { use super::*; + #[test] + fn test_add() -> Result<(), Box> { + assert_eq!(PageOffset(1) + PageOffset(2), PageOffset(3)); + Ok(()) + } + #[test] fn test_add_assign() -> Result<(), Box> { let mut test = PageOffset(1); @@ -90,6 +129,12 @@ mod tests { Ok(()) } + #[test] + fn test_mul() -> Result<(), Box> { + assert_eq!(PageOffset(2) * PageOffset(3), PageOffset(6)); + Ok(()) + } + #[test] fn test_calculate_page_offset() -> Result<(), Box> { assert_eq!(PageOffset::calculate_page_offset(0, 0), PageOffset(0)); diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs index eaf8044..818d550 100644 --- a/src/engine/io/row_manager.rs +++ b/src/engine/io/row_manager.rs @@ -128,7 +128,7 @@ impl RowManager { .get_page(&page_id, &row_pointer.page) .await? .ok_or(RowManagerError::NonExistentPage(row_pointer.page))?; - let page = PageData::parse(table, row_pointer.page, page_bytes)?; + let page = PageData::parse(table, row_pointer.page, page_bytes.freeze())?; let row = page .get_row(row_pointer.count) @@ -157,7 +157,7 @@ impl RowManager { let page_bytes = page_bytes?; match page_bytes { Some(s) => { - let page = PageData::parse(table.clone(), page_num, s)?; + let page = PageData::parse(table.clone(), page_num, s.freeze())?; for await row in page.get_stream() { yield row; } @@ -187,7 +187,7 @@ impl RowManager { let page_bytes = self.file_manager.get_page(&page_id, &page_num).await?; match page_bytes { Some(p) => { - let mut page = PageData::parse(table.clone(), page_num, p)?; + let mut page = PageData::parse(table.clone(), page_num, p.freeze())?; if page.can_fit(RowData::encoded_size(&user_data)) { let new_row_pointer = page.insert(current_tran_id, &table, user_data)?; let new_page_bytes = page.serialize();