Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Implemented type options per resource so I can implement free space maps
Browse files Browse the repository at this point in the history
  • Loading branch information
chotchki committed Aug 16, 2021
1 parent ece5a29 commit c700af4
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 169 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ Benchmark to aid in profiling

## Current TODO List - Subject to constant change!

**TODO**
Implement Free Space Maps so mutation of data doesn't need a linear scan/parse non stop.

Did more thinking, I should implement postgres's streams concept so that I don't need to do lookups to find associated metadata on an object.
I thought I was going to get to use Uuid+page offset. I think its now going to be uuid+page offset+ type.

struct PageId + enum PageType should do it

**TODO**
Implement page level locks that are ordered to avoid deadlocking.

Expand Down
5 changes: 3 additions & 2 deletions benches/feophant_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use criterion::{criterion_group, criterion_main};
use feophantlib::constants::Nullable;
use feophantlib::engine::io::row_formats::RowData;
use feophantlib::engine::io::FileManager;
use feophantlib::engine::io::LockManager;
use feophantlib::engine::io::RowManager;
use feophantlib::engine::objects::types::BaseSqlTypes;
use feophantlib::engine::objects::types::BaseSqlTypesMapper;
Expand Down Expand Up @@ -62,7 +63,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box<dyn std::er

let table = get_table();
let fm = Arc::new(FileManager::new(tmp_dir.clone())?);
let rm = RowManager::new(fm);
let rm = RowManager::new(fm, LockManager::new());

let tran_id = TransactionId::new(1);

Expand All @@ -76,7 +77,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box<dyn std::er

//Now let's make sure they're really in the table, persisting across restarts
let fm = Arc::new(FileManager::new(tmp_dir)?);
let rm = RowManager::new(fm);
let rm = RowManager::new(fm, LockManager::new());

pin_mut!(rm);
let result_rows: Vec<RowData> = rm
Expand Down
5 changes: 4 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use transactions::{TransactionId, TransactionManager};

use self::io::ConstraintManager;
use self::io::FileManager;
use self::io::LockManager;
use self::objects::QueryResult;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -43,7 +44,9 @@ pub struct Engine {

impl Engine {
pub fn new(file_manager: Arc<FileManager>, tran_manager: TransactionManager) -> Engine {
let vis_row_man = VisibleRowManager::new(RowManager::new(file_manager), tran_manager);
let lock_man = LockManager::new();
let vis_row_man =
VisibleRowManager::new(RowManager::new(file_manager, lock_man), tran_manager);
let con_man = ConstraintManager::new(vis_row_man.clone());
Engine {
analyzer: Analyzer::new(vis_row_man),
Expand Down
8 changes: 4 additions & 4 deletions src/engine/analyzer/definition_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub enum DefinitionLookupError {
mod tests {
use tempfile::TempDir;

use crate::engine::io::FileManager;
use crate::engine::io::{FileManager, LockManager};

// Note this useful idiom: importing names from outer (for mod tests) scope.
use super::super::super::io::RowManager;
Expand All @@ -205,7 +205,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let tm = TransactionManager::new();
let rm = RowManager::new(fm);
let rm = RowManager::new(fm, LockManager::new());
let vm = VisibleRowManager::new(rm, tm);
let dl = DefinitionLookup::new(vm);

Expand All @@ -224,7 +224,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let tm = TransactionManager::new();
let rm = RowManager::new(fm);
let rm = RowManager::new(fm, LockManager::new());
let vm = VisibleRowManager::new(rm, tm);
let dl = DefinitionLookup::new(vm);

Expand All @@ -248,7 +248,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let mut tm = TransactionManager::new();
let rm = RowManager::new(fm.clone());
let rm = RowManager::new(fm.clone(), LockManager::new());
let vm = VisibleRowManager::new(rm.clone(), tm.clone());
let dl = DefinitionLookup::new(vm);
let mut engine = Engine::new(fm, tm.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/engine/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use file_manager::FileManager;
pub use file_manager::FileManagerError;

mod lock_manager;
//pub use lock_manager::LockManager;
pub use lock_manager::LockManager;

mod page_formats;

Expand Down
82 changes: 34 additions & 48 deletions src/engine/io/file_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This is a different approach than I had done before. This file manager runs its own loop based on a spawned task
//! since the prior approach was too lock heavy and I couldn't figure out an approach that didn't starve resources.
use super::page_formats::{PageOffset, UInt12, UInt12Error};
use super::page_formats::{PageId, PageOffset, UInt12, UInt12Error};
use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
Expand All @@ -12,7 +12,6 @@ use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::oneshot::{self, Sender};
use uuid::Uuid;

//Inner Types
mod file_executor;
Expand All @@ -26,7 +25,7 @@ pub use resource_formatter::ResourceFormatter;

#[derive(Clone, Debug)]
pub struct FileManager {
request_queue: UnboundedSender<(Uuid, RequestType)>,
request_queue: UnboundedSender<(PageId, RequestType)>,
request_shutdown: UnboundedSender<Sender<()>>,
}

Expand Down Expand Up @@ -56,7 +55,7 @@ impl FileManager {

pub async fn add_page(
&self,
resource_key: &Uuid,
page_id: &PageId,
page: Bytes,
) -> Result<PageOffset, FileManagerError> {
let size = UInt12::try_from(page.len() - 1)?;
Expand All @@ -67,38 +66,38 @@ impl FileManager {
let (res_request, res_receiver) = oneshot::channel();

self.request_queue
.send((*resource_key, RequestType::Add((page, res_request))))?;
.send((*page_id, RequestType::Add((page, res_request))))?;

Ok(res_receiver.await??)
}

pub async fn get_page(
&self,
resource_key: &Uuid,
page_id: &PageId,
offset: &PageOffset,
) -> Result<Option<Bytes>, FileManagerError> {
let (res_request, res_receiver) = oneshot::channel();

self.request_queue
.send((*resource_key, RequestType::Read((*offset, res_request))))?;
.send((*page_id, RequestType::Read((*offset, res_request))))?;

Ok(res_receiver.await??)
}

pub fn get_stream(
&self,
resource_key: &Uuid,
page_id: &PageId,
) -> impl Stream<Item = Result<Option<Bytes>, FileManagerError>> {
let request_queue = self.request_queue.clone();
let resource_key = *resource_key;
let page_id = *page_id;

try_stream! {
let mut page_num = PageOffset(0);
loop {
let (res_request, res_receiver) = oneshot::channel();

request_queue
.send((resource_key, RequestType::Read((page_num, res_request))))?;
.send((page_id, RequestType::Read((page_num, res_request))))?;

let page = res_receiver.await??;

Expand All @@ -111,7 +110,7 @@ impl FileManager {

pub async fn update_page(
&self,
resource_key: &Uuid,
page_id: &PageId,
page: Bytes,
offset: &PageOffset,
) -> Result<(), FileManagerError> {
Expand All @@ -122,10 +121,8 @@ impl FileManager {

let (res_request, res_receiver) = oneshot::channel();

self.request_queue.send((
*resource_key,
RequestType::Update((*offset, page, res_request)),
))?;
self.request_queue
.send((*page_id, RequestType::Update((*offset, page, res_request))))?;

Ok(res_receiver.await??)
}
Expand Down Expand Up @@ -159,7 +156,7 @@ pub enum FileManagerError {
#[error(transparent)]
RecvError(#[from] RecvError),
#[error(transparent)]
SendError(#[from] SendError<(Uuid, RequestType)>),
SendError(#[from] SendError<(PageId, RequestType)>),
#[error(transparent)]
ShutdownSendError(#[from] SendError<Sender<()>>),
#[error(transparent)]
Expand All @@ -179,8 +176,9 @@ mod tests {
use bytes::BytesMut;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;

use crate::constants::PAGE_SIZE;
use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};

use super::*;

Expand All @@ -198,55 +196,43 @@ mod tests {

let fm = FileManager::new(tmp_dir.as_os_str().to_os_string())?;

let test_uuid = Uuid::new_v4();
let page_id = PageId {
resource_key: Uuid::new_v4(),
page_type: PageType::Data,
};

let test_page = get_test_page(1);
let test_page_num = timeout(
Duration::new(10, 0),
fm.add_page(&test_uuid, test_page.clone()),
)
.await??;
let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;

assert_eq!(test_page_num, PageOffset(0));

let test_page_get = timeout(
Duration::new(10, 0),
fm.get_page(&test_uuid, &test_page_num),
)
.await??
.unwrap();
let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
.await??
.unwrap();

assert_eq!(test_page, test_page_get);

let test_page2 = get_test_page(2);
timeout(
Duration::new(10, 0),
fm.update_page(&test_uuid, test_page2.clone(), &test_page_num),
)
.await??;

let test_page_get2 = timeout(
Duration::new(10, 0),
fm.get_page(&test_uuid, &test_page_num),
)
.await??
.unwrap();
fm.update_page(&page_id, test_page2.clone(), &test_page_num)
.await?;

let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
.await??
.unwrap();

assert_eq!(test_page2, test_page_get2);

fm.shutdown().await.unwrap();

let fm2 = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
let test_page3 = get_test_page(3);
let test_page_num3 = fm2.add_page(&test_uuid, test_page3.clone()).await?;
let test_page_num3 = fm2.add_page(&page_id, test_page3.clone()).await?;
println!("{0}", test_page_num3);
assert!(test_page_num3 > test_page_num);

let test_page_get2 = timeout(
Duration::new(10, 0),
fm2.get_page(&test_uuid, &test_page_num),
)
.await??
.unwrap();
let test_page_get2 = timeout(Duration::new(10, 0), fm2.get_page(&page_id, &test_page_num))
.await??
.unwrap();

assert_eq!(test_page2, test_page_get2);

Expand Down
Loading

0 comments on commit c700af4

Please sign in to comment.