Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Have a free space manager now but I need to figure out locking in a s…
Browse files Browse the repository at this point in the history
…ane way.
  • Loading branch information
chotchki committed Aug 20, 2021
1 parent b05b44a commit 0edd6b6
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 18 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Implement Free Space Maps so mutation of data doesn't need a linear scan/parse n
Did more thinking, I should implement postgres's streams concept so that I don't need to do lookups to find associated metadata on an object.
I thought I was going to get to use Uuid+page offset. I think its now going to be uuid+page offset+ type.

struct PageId + enum PageType should do it
struct PageId + enum PageType should do it (done).

So I have a fully ready free space map but I can't avoid the locking issue anymore despite it also being the next item on the todo list.

**TODO**
Implement page level locks that are ordered to avoid deadlocking.
Expand Down
5 changes: 4 additions & 1 deletion src/engine/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ mod file_manager;
pub use file_manager::FileManager;
pub use file_manager::FileManagerError;

mod free_space_manager;
pub use free_space_manager::FreeSpaceManager;

mod lock_manager;
pub use lock_manager::LockManager;

mod page_formats;
pub mod page_formats;

pub mod row_formats;

Expand Down
6 changes: 3 additions & 3 deletions src/engine/io/file_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources.
use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
use async_stream::try_stream;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::Stream;
use std::convert::TryFrom;
use std::ffi::OsString;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl FileManager {
&self,
page_id: &PageId,
offset: &PageOffset,
) -> Result<Option<Bytes>, FileManagerError> {
) -> Result<Option<BytesMut>, FileManagerError> {
let (res_request, res_receiver) = oneshot::channel();

self.request_queue
Expand All @@ -87,7 +87,7 @@ impl FileManager {
pub fn get_stream(
&self,
page_id: &PageId,
) -> impl Stream<Item = Result<Option<Bytes>, FileManagerError>> {
) -> impl Stream<Item = Result<Option<BytesMut>, FileManagerError>> {
let request_queue = self.request_queue.clone();
let page_id = *page_id;

Expand Down
4 changes: 2 additions & 2 deletions src/engine/io/file_manager/file_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl FileOperations {
pub async fn read_chunk(
mut file: File,
page_offset: &PageOffset,
) -> Result<(File, Option<Bytes>), FileOperationsError> {
) -> Result<(File, Option<BytesMut>), FileOperationsError> {
let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);

let file_meta = file.metadata().await?;
Expand All @@ -89,7 +89,7 @@ impl FileOperations {
}
}

Ok((file, Some(buffer.freeze())))
Ok((file, Some(buffer)))
}

pub async fn update_chunk(
Expand Down
9 changes: 7 additions & 2 deletions src/engine/io/file_manager/request_type.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use crate::engine::io::page_formats::PageOffset;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use tokio::sync::oneshot::Sender;

use super::file_executor::FileExecutorError;

#[derive(Debug)]
pub enum RequestType {
Add((Bytes, Sender<Result<PageOffset, FileExecutorError>>)),
Read((PageOffset, Sender<Result<Option<Bytes>, FileExecutorError>>)),
Read(
(
PageOffset,
Sender<Result<Option<BytesMut>, FileExecutorError>>,
),
),
Update((PageOffset, Bytes, Sender<Result<(), FileExecutorError>>)),
}
215 changes: 214 additions & 1 deletion src/engine/io/free_space_manager.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,215 @@
//! This struct provides a lookup service to tell row_manager where / if there is free space to
//! add new tuples.
//! add new tuples. It is designed to be extremely space efficent since it only uses 1 bit per
//! page to say the space is availible. This means each page here can cover 134MB of free space.
use crate::constants::PAGE_SIZE;

use super::{
file_manager,
page_formats::{PageId, PageOffset, PageType},
FileManager, FileManagerError, LockManager,
};
use bytes::{Buf, Bytes, BytesMut};
use lru::LruCache;
use thiserror::Error;

const MAX_FREESPACE_COUNT: usize = 32;

pub struct FreeSpaceManager {
file_manager: FileManager,
freespace_cache: LruCache<(PageId, PageOffset), Bytes>,
lock_manager: LockManager,
}

impl FreeSpaceManager {
pub fn new(file_manager: FileManager, lock_manager: LockManager) -> FreeSpaceManager {
FreeSpaceManager {
file_manager,
freespace_cache: LruCache::new(MAX_FREESPACE_COUNT),
lock_manager,
}
}

pub async fn get_next_free_page(
&self,
page_id: PageId,
) -> Result<PageOffset, FreeSpaceManagerError> {
let mut offset = PageOffset(0);
let free_id = PageId {
resource_key: page_id.resource_key,
page_type: PageType::FreeSpaceMap,
};
loop {
let lock = self.lock_manager.get_lock(&free_id, &offset).await;
let lock_read = lock.read();
match self.file_manager.get_page(&free_id, &offset).await? {
Some(mut s) => match Self::find_first_free_page_in_page(&mut s) {
Some(s) => {
let full_offset =
PageOffset(s) + offset * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
return Ok(full_offset);
}
None => {
offset += PageOffset(1);
continue;
}
},
None => {
let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
let new_page = vec![0; PAGE_SIZE as usize];
buffer.extend_from_slice(&new_page);

//TODO: So I have the read lock now for the add, need figure out how do this sanely in code
let new_page = self
.file_manager
.add_page(&page_id, buffer.freeze())
.await?;
let new_offset = new_page * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
return Ok(new_offset);
}
}
}
}

pub async fn mark_page(
&self,
page_id: PageId,
po: PageOffset,
status: FreeStat,
) -> Result<(), FreeSpaceManagerError> {
let (po, offset) = po.get_bitmask_offset();
match self.file_manager.get_page(&page_id, &po).await? {
Some(mut page) => {
Self::set_status_inside_page(&mut page, offset, status);
self.file_manager
.update_page(&page_id, page.freeze(), &po)
.await?;
Ok(())
}
None => Err(FreeSpaceManagerError::PageDoesNotExist(page_id)),
}
}

fn find_first_free_page_in_page(buffer: &mut impl Buf) -> Option<usize> {
let mut i = 0;
while buffer.has_remaining() {
let mut val = buffer.get_u8();
if val == 0xFF {
i += 1;
continue;
}
for j in 0..8 {
if val & 0x1 == 0x0 {
return Some(i * 8 + j);
}
val >>= 1;
}
i += 1;
}
None
}

/// Gets the status of a field inside a page, you MUST pass an offset
/// that fits in the buffer.
fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat {
let offset_index = offset / 8;
let offset_subindex = offset % 8;

let offset_value = buffer[offset_index];
let bit_value = (offset_value >> offset_subindex) & 0x1;
if bit_value == 0 {
FreeStat::Free
} else {
FreeStat::InUse
}
}

/// Sets the status of a field inside a page, you MUST pass an offset
/// that fits in the buffer.
fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) {
let offset_index = offset / 8;
let offset_subindex = offset % 8;

let current_value = buffer[offset_index];
let mut pre_load = 0x1 << offset_subindex;
let new_value;
match status {
FreeStat::Free => {
pre_load = !pre_load;
new_value = current_value & pre_load;
}
FreeStat::InUse => {
new_value = current_value | pre_load;
}
}

buffer[offset_index] = new_value;
}
}

#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FreeStat {
Free = 0,
InUse = 1,
}

#[derive(Debug, Error)]
pub enum FreeSpaceManagerError {
#[error(transparent)]
FileManagerError(#[from] FileManagerError),
#[error("Page Offset {0} doesn't exist")]
PageDoesNotExist(PageId),
}

#[cfg(test)]
mod tests {
use bytes::BufMut;

use super::*;

///This test works by toggling each bit repeatedly and making sure it gives the correct result each time.
#[test]
fn test_get_and_set() -> Result<(), Box<dyn std::error::Error>> {
let mut test = BytesMut::with_capacity(2);
test.put_u16(0x0);

for i in 0..test.capacity() * 8 {
assert_eq!(
FreeSpaceManager::get_status_inside_page(&test, i),
FreeStat::Free
);
FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
assert_eq!(
FreeSpaceManager::get_status_inside_page(&test, i),
FreeStat::InUse
);
FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::Free);
assert_eq!(
FreeSpaceManager::get_status_inside_page(&test, i),
FreeStat::Free
);
}

Ok(())
}

#[test]
fn test_find_and_fill_pages() -> Result<(), Box<dyn std::error::Error>> {
let mut test = BytesMut::with_capacity(2);
test.put_u8(0x0);
test.put_u8(0x0);

for i in 0..test.len() * 8 {
let free_page = FreeSpaceManager::find_first_free_page_in_page(&mut test.clone());
assert_eq!(free_page, Some(i));

FreeSpaceManager::set_status_inside_page(&mut test, i, FreeStat::InUse);
}
assert_eq!(
FreeSpaceManager::find_first_free_page_in_page(&mut test.clone()),
None
);

Ok(())
}
}
16 changes: 12 additions & 4 deletions src/engine/io/page_formats/page_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@ use std::{
use thiserror::Error;
use uuid::Uuid;

use crate::engine::io::file_manager::ResourceFormatter;

#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct PageId {
pub resource_key: Uuid,
pub page_type: PageType,
}

impl fmt::Display for PageId {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
writeln!(f, "{}", ResourceFormatter::format_uuid(&self.resource_key))?;
writeln!(f, "{}", self.page_type)
}
}

#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum PageType {
Data,
//FreeSpaceMap,
FreeSpaceMap,
//VisibilityMap
}

Expand All @@ -44,9 +53,8 @@ impl PageType {
impl Display for PageType {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
PageType::Data => {
write!(f, "data")
}
PageType::Data => write!(f, "data"),
PageType::FreeSpaceMap => write!(f, "fs"),
}
}
}
Expand Down
Loading

0 comments on commit 0edd6b6

Please sign in to comment.