From 759b978c0f601e46ebd8852ee2570b4c0d5a96d1 Mon Sep 17 00:00:00 2001 From: Christopher Hotchkiss Date: Sun, 22 Aug 2021 22:55:15 -0400 Subject: [PATCH] Implemented #16 --- src/engine/io/free_space_manager.rs | 51 +++++++++++++----- src/engine/io/lock_cache_manager.rs | 80 +++++++++++++++++++++++++++++ src/engine/io/row_manager.rs | 25 ++++++--- 3 files changed, 136 insertions(+), 20 deletions(-) diff --git a/src/engine/io/free_space_manager.rs b/src/engine/io/free_space_manager.rs index e9e2e2b..6ed7f08 100644 --- a/src/engine/io/free_space_manager.rs +++ b/src/engine/io/free_space_manager.rs @@ -2,30 +2,24 @@ //! add new tuples. It is designed to be extremely space efficent since it only uses 1 bit per //! page to say the space is availible. This means each page here can cover 134MB of free space. -use crate::constants::PAGE_SIZE; - use super::{ - file_manager, page_formats::{PageId, PageOffset, PageType}, - FileManager, FileManagerError, LockCacheManager, LockCacheManagerError, LockManager, + LockCacheManager, LockCacheManagerError, }; -use bytes::{Buf, Bytes, BytesMut}; -use lru::LruCache; +use crate::constants::PAGE_SIZE; +use bytes::{Buf, BytesMut}; use thiserror::Error; const MAX_FREESPACE_COUNT: usize = 32; +#[derive(Clone, Debug)] pub struct FreeSpaceManager { - freespace_cache: LruCache<(PageId, PageOffset), Bytes>, lock_cache_manager: LockCacheManager, } impl FreeSpaceManager { pub fn new(lock_cache_manager: LockCacheManager) -> FreeSpaceManager { - FreeSpaceManager { - freespace_cache: LruCache::new(MAX_FREESPACE_COUNT), - lock_cache_manager, - } + FreeSpaceManager { lock_cache_manager } } pub async fn get_next_free_page( @@ -38,7 +32,7 @@ impl FreeSpaceManager { page_type: PageType::FreeSpaceMap, }; loop { - let mut page_handle = self.lock_cache_manager.get_page(free_id, offset).await?; + let page_handle = self.lock_cache_manager.get_page(free_id, offset).await?; match page_handle.as_ref() { Some(s) => { let mut page_frozen = s.clone().freeze(); @@ -55,10 +49,10 @@ impl FreeSpaceManager { } } None => { + drop(page_handle); //Get the next offset, BUT since there could be a gap, we're going to blindly write all free //and loop to get it again. let next_po = self.lock_cache_manager.get_offset(free_id).await?; - let mut new_page_handle = self .lock_cache_manager .get_page_for_update(free_id, next_po) @@ -178,7 +172,13 @@ pub enum FreeSpaceManagerError { #[cfg(test)] mod tests { + use std::sync::Arc; + use bytes::BufMut; + use tempfile::TempDir; + use uuid::Uuid; + + use crate::engine::io::FileManager; use super::*; @@ -227,4 +227,29 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_get_next() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm); + let fsm = FreeSpaceManager::new(lm); + + let page_id = PageId { + resource_key: Uuid::new_v4(), + page_type: PageType::Data, + }; + + let first_free = fsm.get_next_free_page(page_id).await?; + assert_eq!(first_free, PageOffset(0)); + + fsm.mark_page(page_id, first_free, FreeStat::InUse).await?; + + let second_free = fsm.get_next_free_page(page_id).await?; + assert_eq!(second_free, PageOffset(1)); + + Ok(()) + } } diff --git a/src/engine/io/lock_cache_manager.rs b/src/engine/io/lock_cache_manager.rs index 73a7488..b2f45e8 100644 --- a/src/engine/io/lock_cache_manager.rs +++ b/src/engine/io/lock_cache_manager.rs @@ -127,3 +127,83 @@ pub enum LockCacheManagerError { #[error("Cannot update a page without contents")] PageMissing(), } + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + use uuid::Uuid; + + use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType}; + + use super::*; + + fn get_test_page(fill: u8) -> BytesMut { + let mut test_page = BytesMut::with_capacity(PAGE_SIZE as usize); + let free_space = vec![fill; PAGE_SIZE as usize]; + test_page.extend_from_slice(&free_space); + test_page + } + + #[tokio::test] + async fn test_roundtrip() -> Result<(), Box> { + let tmp = TempDir::new()?; + let tmp_dir = tmp.path().as_os_str().to_os_string(); + + let fm = Arc::new(FileManager::new(tmp_dir)?); + let lm = LockCacheManager::new(fm); + + let page_id = PageId { + resource_key: Uuid::new_v4(), + page_type: PageType::Data, + }; + + let first_offset = lm.get_offset(page_id).await?; + assert_eq!(first_offset, PageOffset(0)); + + let first_handle = lm.get_page(page_id, first_offset).await?; + assert_eq!(first_handle.as_ref(), None); + drop(first_handle); + + let mut second_handle = lm.get_page_for_update(page_id, first_offset).await?; + assert_eq!(second_handle.as_ref(), None); + + let page = get_test_page(1); + second_handle.replace(page); + + lm.update_page(page_id, first_offset, second_handle).await?; + + let third_handle = lm.get_page(page_id, first_offset).await?; + let page2 = get_test_page(1); + assert_eq!(third_handle.as_ref(), Some(&page2)); + + let fourth_offset = lm.get_offset(page_id).await?; + assert_eq!(fourth_offset, PageOffset(1)); + + let mut fourth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + assert_eq!(fourth_handle.as_ref(), None); + + let page3 = get_test_page(2); + fourth_handle.replace(page3); + lm.add_page(page_id, fourth_offset, fourth_handle).await?; + + let mut fifth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + let fifth_page = fifth_handle + .as_mut() + .ok_or(LockCacheManagerError::PageMissing())?; + fifth_page.clear(); + + let page4 = get_test_page(3); + fifth_page.extend_from_slice(&page4[0..page4.len()]); + lm.update_page(page_id, fourth_offset, fifth_handle).await?; + + let mut sixth_handle = lm.get_page_for_update(page_id, fourth_offset).await?; + let sixth_page = sixth_handle + .as_mut() + .ok_or(LockCacheManagerError::PageMissing())?; + + let test_page = get_test_page(3); + assert_eq!(sixth_page, &test_page); + + Ok(()) + } +} diff --git a/src/engine/io/row_manager.rs b/src/engine/io/row_manager.rs index 7cf6997..cfd03a2 100644 --- a/src/engine/io/row_manager.rs +++ b/src/engine/io/row_manager.rs @@ -1,8 +1,11 @@ use super::super::objects::Table; use super::super::transactions::TransactionId; +use super::free_space_manager::{FreeSpaceManagerError, FreeStat}; use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12}; use super::row_formats::{ItemPointer, RowData, RowDataError}; -use super::{EncodedSize, FileManagerError, LockCacheManager, LockCacheManagerError}; +use super::{ + EncodedSize, FileManagerError, FreeSpaceManager, LockCacheManager, LockCacheManagerError, +}; use crate::constants::PAGE_SIZE; use crate::engine::objects::SqlTuple; use async_stream::try_stream; @@ -16,12 +19,16 @@ use thiserror::Error; /// It operates at the lowest level, no visibility checks are done. #[derive(Clone, Debug)] pub struct RowManager { + free_space_manager: FreeSpaceManager, lock_cache_manager: LockCacheManager, } impl RowManager { pub fn new(lock_cache_manager: LockCacheManager) -> RowManager { - RowManager { lock_cache_manager } + RowManager { + free_space_manager: FreeSpaceManager::new(lock_cache_manager.clone()), + lock_cache_manager, + } } pub async fn insert_row( @@ -221,25 +228,27 @@ impl RowManager { page_type: PageType::Data, }; - let mut page_num = PageOffset(0); loop { + let next_free_page = self.free_space_manager.get_next_free_page(page_id).await?; let mut page_bytes = self .lock_cache_manager - .get_page_for_update(page_id, page_num) + .get_page_for_update(page_id, next_free_page) .await?; match page_bytes.as_mut() { Some(p) => { - let mut page = PageData::parse(table.clone(), page_num, p)?; + let mut page = PageData::parse(table.clone(), next_free_page, p)?; if page.can_fit(RowData::encoded_size(&user_data)) { p.clear(); //We're going to reuse the buffer let new_row_pointer = page.insert(current_tran_id, &table, user_data)?; page.serialize(p); self.lock_cache_manager - .update_page(page_id, page_num, page_bytes) + .update_page(page_id, next_free_page, page_bytes) .await?; return Ok(new_row_pointer); } else { - page_num += PageOffset(1); + self.free_space_manager + .mark_page(page_id, next_free_page, FreeStat::InUse) + .await?; continue; } } @@ -278,6 +287,8 @@ pub enum RowManagerError { #[error(transparent)] FileManagerError(#[from] FileManagerError), #[error(transparent)] + FreeSpaceManagerError(#[from] FreeSpaceManagerError), + #[error(transparent)] LockCacheManagerError(#[from] LockCacheManagerError), #[error(transparent)] RowDataError(#[from] RowDataError),