Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Fixed #17 and #23 by implementing a caching/locking layer.
Browse files Browse the repository at this point in the history
  • Loading branch information
chotchki committed Aug 22, 2021
1 parent 12f5690 commit a53cf69
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 259 deletions.
5 changes: 3 additions & 2 deletions benches/feophant_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use criterion::{criterion_group, criterion_main};
use feophantlib::constants::Nullable;
use feophantlib::engine::io::row_formats::RowData;
use feophantlib::engine::io::FileManager;
use feophantlib::engine::io::LockCacheManager;
use feophantlib::engine::io::LockManager;
use feophantlib::engine::io::RowManager;
use feophantlib::engine::objects::types::BaseSqlTypes;
Expand Down Expand Up @@ -63,7 +64,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box<dyn std::er

let table = get_table();
let fm = Arc::new(FileManager::new(tmp_dir.clone())?);
let rm = RowManager::new(fm, LockManager::new());
let rm = RowManager::new(LockCacheManager::new(fm));

let tran_id = TransactionId::new(1);

Expand All @@ -77,7 +78,7 @@ async fn row_manager_mass_insert(row_count: usize) -> Result<(), Box<dyn std::er

//Now let's make sure they're really in the table, persisting across restarts
let fm = Arc::new(FileManager::new(tmp_dir)?);
let rm = RowManager::new(fm, LockManager::new());
let rm = RowManager::new(LockCacheManager::new(fm));

pin_mut!(rm);
let result_rows: Vec<RowData> = rm
Expand Down
9 changes: 5 additions & 4 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use transactions::{TransactionId, TransactionManager};

use self::io::ConstraintManager;
use self::io::FileManager;
use self::io::LockManager;
use self::io::LockCacheManager;
use self::objects::QueryResult;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -44,9 +44,10 @@ pub struct Engine {

impl Engine {
pub fn new(file_manager: Arc<FileManager>, tran_manager: TransactionManager) -> Engine {
let lock_man = LockManager::new();
let vis_row_man =
VisibleRowManager::new(RowManager::new(file_manager, lock_man), tran_manager);
let vis_row_man = VisibleRowManager::new(
RowManager::new(LockCacheManager::new(file_manager)),
tran_manager,
);
let con_man = ConstraintManager::new(vis_row_man.clone());
Engine {
analyzer: Analyzer::new(vis_row_man),
Expand Down
8 changes: 4 additions & 4 deletions src/engine/analyzer/definition_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub enum DefinitionLookupError {
mod tests {
use tempfile::TempDir;

use crate::engine::io::{FileManager, LockManager};
use crate::engine::io::{FileManager, LockCacheManager};

// Note this useful idiom: importing names from outer (for mod tests) scope.
use super::super::super::io::RowManager;
Expand All @@ -205,7 +205,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let tm = TransactionManager::new();
let rm = RowManager::new(fm, LockManager::new());
let rm = RowManager::new(LockCacheManager::new(fm));
let vm = VisibleRowManager::new(rm, tm);
let dl = DefinitionLookup::new(vm);

Expand All @@ -224,7 +224,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let tm = TransactionManager::new();
let rm = RowManager::new(fm, LockManager::new());
let rm = RowManager::new(LockCacheManager::new(fm));
let vm = VisibleRowManager::new(rm, tm);
let dl = DefinitionLookup::new(vm);

Expand All @@ -248,7 +248,7 @@ mod tests {

let fm = Arc::new(FileManager::new(tmp_dir)?);
let mut tm = TransactionManager::new();
let rm = RowManager::new(fm.clone(), LockManager::new());
let rm = RowManager::new(LockCacheManager::new(fm.clone()));
let vm = VisibleRowManager::new(rm.clone(), tm.clone());
let dl = DefinitionLookup::new(vm);
let mut engine = Engine::new(fm, tm.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/engine/io/constraint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ConstraintManager {
tran_id: TransactionId,
table: Arc<Table>,
row_pointer: ItemPointer,
) -> Result<(PageData, RowData), ConstraintManagerError> {
) -> Result<RowData, ConstraintManagerError> {
Ok(self.vis_row_man.get(tran_id, table, row_pointer).await?)
}

Expand Down
70 changes: 24 additions & 46 deletions src/engine/io/file_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,21 @@ impl FileManager {
Ok(rev_shutdown.await?)
}

pub async fn get_offset(&self, page_id: &PageId) -> Result<PageOffset, FileManagerError> {
let (res_request, res_receiver) = oneshot::channel();

self.request_queue
.send((*page_id, RequestType::GetOffset(res_request)))?;

Ok(res_receiver.await??)
}

pub async fn add_page(
&self,
page_id: &PageId,
offset: &PageOffset,
page: Bytes,
) -> Result<PageOffset, FileManagerError> {
) -> Result<(), FileManagerError> {
let size = UInt12::try_from(page.len() - 1)?;
if size != UInt12::max() {
return Err(FileManagerError::InvalidPageSize(page.len()));
Expand All @@ -66,7 +76,7 @@ impl FileManager {
let (res_request, res_receiver) = oneshot::channel();

self.request_queue
.send((*page_id, RequestType::Add((page, res_request))))?;
.send((*page_id, RequestType::Add((*offset, page, res_request))))?;

Ok(res_receiver.await??)
}
Expand All @@ -84,31 +94,6 @@ impl FileManager {
Ok(res_receiver.await??)
}

//TODO I'm chewing on if this stream implementation is way too low level especially considering locking
pub fn get_stream(
&self,
page_id: &PageId,
) -> impl Stream<Item = Result<Option<BytesMut>, FileManagerError>> {
let request_queue = self.request_queue.clone();
let page_id = *page_id;

try_stream! {
let mut page_num = PageOffset(0);
loop {
let (res_request, res_receiver) = oneshot::channel();

request_queue
.send((page_id, RequestType::Read((page_num, res_request))))?;

let page = res_receiver.await??;

yield page;

page_num += PageOffset(1);
}
}
}

pub async fn update_page(
&self,
page_id: &PageId,
Expand All @@ -134,7 +119,7 @@ impl Drop for FileManager {
if !self.request_queue.is_closed() {
return;
}
error!("File Manager wasn't shutdown cleanly!");
error!("File Manager wasn't shutdown cleanly! This is a bug, please report!");
}
}

Expand Down Expand Up @@ -172,11 +157,8 @@ pub enum FileManagerError {

#[cfg(test)]
mod tests {
use std::time::Duration;

use bytes::BytesMut;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;

use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};
Expand All @@ -203,37 +185,33 @@ mod tests {
};

let test_page = get_test_page(1);
let test_page_num = fm.add_page(&page_id, test_page.clone()).await?;
let test_po = fm.get_offset(&page_id).await?;
fm.add_page(&page_id, &test_po, test_page.clone()).await?;

assert_eq!(test_page_num, PageOffset(0));
assert_eq!(test_po, PageOffset(0));

let test_page_get = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
.await??
.unwrap();
let test_page_get = fm.get_page(&page_id, &test_po).await?.unwrap();

assert_eq!(test_page, test_page_get);

let test_page2 = get_test_page(2);
fm.update_page(&page_id, &test_page_num, test_page2.clone())
fm.update_page(&page_id, &test_po, test_page2.clone())
.await?;

let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
.await??
.unwrap();
let test_page_get2 = fm.get_page(&page_id, &test_po).await?.unwrap();

assert_eq!(test_page2, test_page_get2);

fm.shutdown().await.unwrap();

let fm2 = FileManager::new(tmp_dir.as_os_str().to_os_string())?;
let test_page3 = get_test_page(3);
let test_page_num3 = fm2.add_page(&page_id, test_page3.clone()).await?;
println!("{0}", test_page_num3);
assert!(test_page_num3 > test_page_num);
let test_po3 = fm2.get_offset(&page_id).await?;
fm2.add_page(&page_id, &test_po3, test_page3.clone())
.await?;
assert!(test_po3 > test_po);

let test_page_get2 = timeout(Duration::new(10, 0), fm2.get_page(&page_id, &test_page_num))
.await??
.unwrap();
let test_page_get2 = fm2.get_page(&page_id, &test_po).await?.unwrap();

assert_eq!(test_page2, test_page_get2);

Expand Down
Loading

0 comments on commit a53cf69

Please sign in to comment.