Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Implemented #16
Browse files Browse the repository at this point in the history
  • Loading branch information
chotchki committed Aug 23, 2021
1 parent a53cf69 commit 759b978
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 20 deletions.
51 changes: 38 additions & 13 deletions src/engine/io/free_space_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,24 @@
//! add new tuples. It is designed to be extremely space efficent since it only uses 1 bit per
//! page to say the space is availible. This means each page here can cover 134MB of free space.
use crate::constants::PAGE_SIZE;

use super::{
file_manager,
page_formats::{PageId, PageOffset, PageType},
FileManager, FileManagerError, LockCacheManager, LockCacheManagerError, LockManager,
LockCacheManager, LockCacheManagerError,
};
use bytes::{Buf, Bytes, BytesMut};
use lru::LruCache;
use crate::constants::PAGE_SIZE;
use bytes::{Buf, BytesMut};
use thiserror::Error;

const MAX_FREESPACE_COUNT: usize = 32;

#[derive(Clone, Debug)]
pub struct FreeSpaceManager {
freespace_cache: LruCache<(PageId, PageOffset), Bytes>,
lock_cache_manager: LockCacheManager,
}

impl FreeSpaceManager {
pub fn new(lock_cache_manager: LockCacheManager) -> FreeSpaceManager {
FreeSpaceManager {
freespace_cache: LruCache::new(MAX_FREESPACE_COUNT),
lock_cache_manager,
}
FreeSpaceManager { lock_cache_manager }
}

pub async fn get_next_free_page(
Expand All @@ -38,7 +32,7 @@ impl FreeSpaceManager {
page_type: PageType::FreeSpaceMap,
};
loop {
let mut page_handle = self.lock_cache_manager.get_page(free_id, offset).await?;
let page_handle = self.lock_cache_manager.get_page(free_id, offset).await?;
match page_handle.as_ref() {
Some(s) => {
let mut page_frozen = s.clone().freeze();
Expand All @@ -55,10 +49,10 @@ impl FreeSpaceManager {
}
}
None => {
drop(page_handle);
//Get the next offset, BUT since there could be a gap, we're going to blindly write all free
//and loop to get it again.
let next_po = self.lock_cache_manager.get_offset(free_id).await?;

let mut new_page_handle = self
.lock_cache_manager
.get_page_for_update(free_id, next_po)
Expand Down Expand Up @@ -178,7 +172,13 @@ pub enum FreeSpaceManagerError {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use bytes::BufMut;
use tempfile::TempDir;
use uuid::Uuid;

use crate::engine::io::FileManager;

use super::*;

Expand Down Expand Up @@ -227,4 +227,29 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_get_next() -> Result<(), Box<dyn std::error::Error>> {
let tmp = TempDir::new()?;
let tmp_dir = tmp.path().as_os_str().to_os_string();

let fm = Arc::new(FileManager::new(tmp_dir)?);
let lm = LockCacheManager::new(fm);
let fsm = FreeSpaceManager::new(lm);

let page_id = PageId {
resource_key: Uuid::new_v4(),
page_type: PageType::Data,
};

let first_free = fsm.get_next_free_page(page_id).await?;
assert_eq!(first_free, PageOffset(0));

fsm.mark_page(page_id, first_free, FreeStat::InUse).await?;

let second_free = fsm.get_next_free_page(page_id).await?;
assert_eq!(second_free, PageOffset(1));

Ok(())
}
}
80 changes: 80 additions & 0 deletions src/engine/io/lock_cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,83 @@ pub enum LockCacheManagerError {
#[error("Cannot update a page without contents")]
PageMissing(),
}

#[cfg(test)]
mod tests {
use tempfile::TempDir;
use uuid::Uuid;

use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};

use super::*;

fn get_test_page(fill: u8) -> BytesMut {
let mut test_page = BytesMut::with_capacity(PAGE_SIZE as usize);
let free_space = vec![fill; PAGE_SIZE as usize];
test_page.extend_from_slice(&free_space);
test_page
}

#[tokio::test]
async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
let tmp = TempDir::new()?;
let tmp_dir = tmp.path().as_os_str().to_os_string();

let fm = Arc::new(FileManager::new(tmp_dir)?);
let lm = LockCacheManager::new(fm);

let page_id = PageId {
resource_key: Uuid::new_v4(),
page_type: PageType::Data,
};

let first_offset = lm.get_offset(page_id).await?;
assert_eq!(first_offset, PageOffset(0));

let first_handle = lm.get_page(page_id, first_offset).await?;
assert_eq!(first_handle.as_ref(), None);
drop(first_handle);

let mut second_handle = lm.get_page_for_update(page_id, first_offset).await?;
assert_eq!(second_handle.as_ref(), None);

let page = get_test_page(1);
second_handle.replace(page);

lm.update_page(page_id, first_offset, second_handle).await?;

let third_handle = lm.get_page(page_id, first_offset).await?;
let page2 = get_test_page(1);
assert_eq!(third_handle.as_ref(), Some(&page2));

let fourth_offset = lm.get_offset(page_id).await?;
assert_eq!(fourth_offset, PageOffset(1));

let mut fourth_handle = lm.get_page_for_update(page_id, fourth_offset).await?;
assert_eq!(fourth_handle.as_ref(), None);

let page3 = get_test_page(2);
fourth_handle.replace(page3);
lm.add_page(page_id, fourth_offset, fourth_handle).await?;

let mut fifth_handle = lm.get_page_for_update(page_id, fourth_offset).await?;
let fifth_page = fifth_handle
.as_mut()
.ok_or(LockCacheManagerError::PageMissing())?;
fifth_page.clear();

let page4 = get_test_page(3);
fifth_page.extend_from_slice(&page4[0..page4.len()]);
lm.update_page(page_id, fourth_offset, fifth_handle).await?;

let mut sixth_handle = lm.get_page_for_update(page_id, fourth_offset).await?;
let sixth_page = sixth_handle
.as_mut()
.ok_or(LockCacheManagerError::PageMissing())?;

let test_page = get_test_page(3);
assert_eq!(sixth_page, &test_page);

Ok(())
}
}
25 changes: 18 additions & 7 deletions src/engine/io/row_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use super::super::objects::Table;
use super::super::transactions::TransactionId;
use super::free_space_manager::{FreeSpaceManagerError, FreeStat};
use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
use super::row_formats::{ItemPointer, RowData, RowDataError};
use super::{EncodedSize, FileManagerError, LockCacheManager, LockCacheManagerError};
use super::{
EncodedSize, FileManagerError, FreeSpaceManager, LockCacheManager, LockCacheManagerError,
};
use crate::constants::PAGE_SIZE;
use crate::engine::objects::SqlTuple;
use async_stream::try_stream;
Expand All @@ -16,12 +19,16 @@ use thiserror::Error;
/// It operates at the lowest level, no visibility checks are done.
#[derive(Clone, Debug)]
pub struct RowManager {
free_space_manager: FreeSpaceManager,
lock_cache_manager: LockCacheManager,
}

impl RowManager {
pub fn new(lock_cache_manager: LockCacheManager) -> RowManager {
RowManager { lock_cache_manager }
RowManager {
free_space_manager: FreeSpaceManager::new(lock_cache_manager.clone()),
lock_cache_manager,
}
}

pub async fn insert_row(
Expand Down Expand Up @@ -221,25 +228,27 @@ impl RowManager {
page_type: PageType::Data,
};

let mut page_num = PageOffset(0);
loop {
let next_free_page = self.free_space_manager.get_next_free_page(page_id).await?;
let mut page_bytes = self
.lock_cache_manager
.get_page_for_update(page_id, page_num)
.get_page_for_update(page_id, next_free_page)
.await?;
match page_bytes.as_mut() {
Some(p) => {
let mut page = PageData::parse(table.clone(), page_num, p)?;
let mut page = PageData::parse(table.clone(), next_free_page, p)?;
if page.can_fit(RowData::encoded_size(&user_data)) {
p.clear(); //We're going to reuse the buffer
let new_row_pointer = page.insert(current_tran_id, &table, user_data)?;
page.serialize(p);
self.lock_cache_manager
.update_page(page_id, page_num, page_bytes)
.update_page(page_id, next_free_page, page_bytes)
.await?;
return Ok(new_row_pointer);
} else {
page_num += PageOffset(1);
self.free_space_manager
.mark_page(page_id, next_free_page, FreeStat::InUse)
.await?;
continue;
}
}
Expand Down Expand Up @@ -278,6 +287,8 @@ pub enum RowManagerError {
#[error(transparent)]
FileManagerError(#[from] FileManagerError),
#[error(transparent)]
FreeSpaceManagerError(#[from] FreeSpaceManagerError),
#[error(transparent)]
LockCacheManagerError(#[from] LockCacheManagerError),
#[error(transparent)]
RowDataError(#[from] RowDataError),
Expand Down

0 comments on commit 759b978

Please sign in to comment.