Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
New lock and cache layer has been implemented for #17, need to hook i…
Browse files Browse the repository at this point in the history
…t up now.
  • Loading branch information
chotchki committed Aug 21, 2021
1 parent ebf78e1 commit 12f5690
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/engine/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub use file_manager::FileManagerError;
mod free_space_manager;
pub use free_space_manager::FreeSpaceManager;

mod lock_cache_manager;
pub use lock_cache_manager::LockCacheManager;
pub use lock_cache_manager::LockCacheManagerError;

mod lock_manager;
pub use lock_manager::LockManager;

Expand Down
5 changes: 3 additions & 2 deletions src/engine/io/file_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl FileManager {
Ok(res_receiver.await??)
}

//TODO I'm chewing on if this stream implementation is way too low level especially considering locking
pub fn get_stream(
&self,
page_id: &PageId,
Expand Down Expand Up @@ -111,8 +112,8 @@ impl FileManager {
pub async fn update_page(
&self,
page_id: &PageId,
page: Bytes,
offset: &PageOffset,
page: Bytes,
) -> Result<(), FileManagerError> {
let size = UInt12::try_from(page.len() - 1)?;
if size != UInt12::max() {
Expand Down Expand Up @@ -213,7 +214,7 @@ mod tests {
assert_eq!(test_page, test_page_get);

let test_page2 = get_test_page(2);
fm.update_page(&page_id, test_page2.clone(), &test_page_num)
fm.update_page(&page_id, &test_page_num, test_page2.clone())
.await?;

let test_page_get2 = timeout(Duration::new(10, 0), fm.get_page(&page_id, &test_page_num))
Expand Down
1 change: 0 additions & 1 deletion src/engine/io/file_manager/file_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl FileExecutor {
}
}

//Still haven't figured out incrementing file_handles
if file_handles_open < MAX_FILE_HANDLE_COUNT && !request_queue.is_empty() {
let mut new_request_queue = VecDeque::with_capacity(request_queue.len());
for (page_id, req_type) in request_queue.into_iter() {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/io/free_space_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl FreeSpaceManager {
Some(mut page) => {
Self::set_status_inside_page(&mut page, offset, status);
self.file_manager
.update_page(&page_id, page.freeze(), &po)
.update_page(&page_id, &po, page.freeze())
.await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/engine/io/index_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl IndexManager {
if l.can_fit(&new_key) {
return Ok(self
.file_manager
.update_page(&page_id, l.serialize()?, &current_node.1)
.update_page(&page_id, &current_node.1, l.serialize()?)
.await?);
}

Expand Down
130 changes: 130 additions & 0 deletions src/engine/io/lock_cache_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::{
    collections::{HashMap, VecDeque},
    //NOTE(review): `ptr::read`, `HashMap`, and `VecDeque` do not appear to be
    //used anywhere in this file — candidates for removal (rustc will warn).
    ptr::read,
    sync::Arc,
};

use super::{
    page_formats::{PageId, PageOffset},
    FileManager, FileManagerError,
};
use bytes::{Bytes, BytesMut};
use lru::LruCache;
use thiserror::Error;
//tokio's async-aware locks: the Mutex guards the cache map itself, while each
//page entry carries its own RwLock handed out as owned guards to callers.
use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};

/// Combined lock and cache layer over [`FileManager`]: every cached page is an
/// `Arc<RwLock<Option<BytesMut>>>`, so callers take per-page read/write locks
/// while the LRU map itself is guarded by a single async `Mutex`.
///
/// `None` inside a page's `RwLock` means the page is not present on disk.
//NOTE(review): open design questions from the author — the cache is unbounded
//(eviction is not yet managed) and the single mutex over the whole map is a
//known contention point.
pub struct LockCacheManager {
    //TODO I don't like these massive single hashes protected with a single lock
    // Long term I need to make a fixed hashmap and evict them myself.
    // Holding on this since I might be able to work around it
    cache: Arc<Mutex<LruCache<(PageId, PageOffset), Arc<RwLock<Option<BytesMut>>>>>>,
    file_manager: FileManager,
}

impl LockCacheManager {
    /// Wraps the given `FileManager` with an (initially empty) page cache.
    pub fn new(file_manager: FileManager) -> LockCacheManager {
        LockCacheManager {
            //The unbounded nature of the cache worries me. I think I'll have
            //to manage its eviction carefully.
            cache: Arc::new(Mutex::new(LruCache::unbounded())),
            file_manager,
        }
    }

    /// Takes a shared (read) lock on the requested page, loading it from disk
    /// on a cache miss. `None` inside the guard means the page does not exist.
    pub async fn get_page(
        &self,
        page_id: PageId,
        offset: PageOffset,
    ) -> Result<OwnedRwLockReadGuard<Option<BytesMut>>, LockCacheManagerError> {
        Ok(self
            .get_page_internal(page_id, offset)
            .await?
            .read_owned()
            .await)
    }

    /// Takes an exclusive (write) lock on the requested page, loading it from
    /// disk on a cache miss. Pass the guard back via [`Self::update_page`] to
    /// persist changes.
    pub async fn get_page_for_update(
        &self,
        page_id: PageId,
        offset: PageOffset,
    ) -> Result<OwnedRwLockWriteGuard<Option<BytesMut>>, LockCacheManagerError> {
        Ok(self
            .get_page_internal(page_id, offset)
            .await?
            .write_owned()
            .await)
    }

    /// Returns the lock entry for `(page_id, offset)`, inserting and loading
    /// it on a cache miss.
    ///
    /// On a miss the new entry is published into the cache already
    /// write-locked, and the cache mutex is dropped BEFORE the disk read:
    /// concurrent tasks for the same page block on the page lock (and see the
    /// loaded contents), while tasks for other pages are not serialized behind
    /// the I/O.
    async fn get_page_internal(
        &self,
        page_id: PageId,
        offset: PageOffset,
    ) -> Result<Arc<RwLock<Option<BytesMut>>>, LockCacheManagerError> {
        let mut cache = self.cache.lock().await;
        if let Some(existing) = cache.get(&(page_id, offset)) {
            return Ok(existing.clone());
        }

        //Cache miss: install a write-locked placeholder, then drop the mutex.
        let page_lock = Arc::new(RwLock::new(None));
        let mut page_lock_write = page_lock.write().await;
        cache.put((page_id, offset), page_lock.clone());
        drop(cache);

        //Now we can load the underlying page without blocking everyone.
        //A missing page leaves the entry as None.
        if let Some(loaded) = self.file_manager.get_page(&page_id, &offset).await? {
            page_lock_write.replace(loaded);
        }
        drop(page_lock_write);

        Ok(page_lock)
    }

    /// Persists the contents held by a write guard back to disk.
    ///
    /// Errors with [`LockCacheManagerError::PageMissing`] if the guard holds
    /// `None` (the page was never loaded or initialized).
    pub async fn update_page(
        &self,
        page_id: PageId,
        offset: PageOffset,
        guard: OwnedRwLockWriteGuard<Option<BytesMut>>,
    ) -> Result<(), LockCacheManagerError> {
        let page = guard
            .as_ref()
            .ok_or(LockCacheManagerError::PageMissing())?
            .clone();
        Ok(self
            .file_manager
            .update_page(&page_id, &offset, page.freeze())
            .await?)
    }

    //TODO Need to figure how to lock for add, without blocking everyone
    //for now doing the naive implementation since I can just hold the mutex during the add
    //would be easily solved if I disconnect offset generation from the I/O
    /// Appends a new page to disk and seeds the cache with its contents,
    /// returning the offset it was written at.
    pub async fn add_page(
        &self,
        page_id: PageId,
        page: Bytes,
    ) -> Result<PageOffset, FileManagerError> {
        let mut cache = self.cache.lock().await;
        let po = self.file_manager.add_page(&page_id, page.clone()).await?;

        //Bug fix: `BytesMut::with_capacity` returns a ZERO-length buffer, so the
        //previous `copy_from_slice` call would panic on the length mismatch.
        //`extend_from_slice` appends the bytes (and reserves nothing extra,
        //since capacity was pre-sized).
        let mut new_page = BytesMut::with_capacity(page.len());
        new_page.extend_from_slice(&page);
        cache.put((page_id, po), Arc::new(RwLock::new(Some(new_page))));
        Ok(po)
    }
}

/// Errors produced by [`LockCacheManager`] operations.
#[derive(Debug, Error)]
pub enum LockCacheManagerError {
    /// Underlying file I/O failure, passed through unchanged.
    #[error(transparent)]
    FileManagerError(#[from] FileManagerError),
    /// An update was requested with a guard whose page contents are `None`
    /// (the page was never loaded or initialized).
    #[error("Cannot update a page without contents")]
    PageMissing(),
}
6 changes: 3 additions & 3 deletions src/engine/io/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl LockManager {

self.cleanup(&mut lm).await;

lock
lock.clone()
}

async fn cleanup(
Expand Down Expand Up @@ -111,7 +111,7 @@ impl LockManagerEntry {

match le.get(offset) {
Some(s) => match s.upgrade() {
Some(s1) => s1,
Some(s1) => s1.clone(),
None => {
//Weak failed, need to recreate
drop(le);
Expand Down Expand Up @@ -155,7 +155,7 @@ impl LockManagerEntry {
};
self.cleanup(&mut le);

lock
lock.clone()
}

fn cleanup(&self, le: &mut RwLockWriteGuard<HashMap<PageOffset, Weak<RwLock<u8>>>>) {
Expand Down
1 change: 0 additions & 1 deletion src/engine/io/page_cache_manager.rs

This file was deleted.

6 changes: 3 additions & 3 deletions src/engine/io/row_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl RowManager {
page_type: PageType::Data,
};
self.file_manager
.update_page(&page_id, page.serialize(), &row_pointer.page)
.update_page(&page_id, &row_pointer.page, page.serialize())
.await?;
Ok(())
}
Expand Down Expand Up @@ -107,7 +107,7 @@ impl RowManager {
page_type: PageType::Data,
};
self.file_manager
.update_page(&page_id, old_page.serialize(), &row_pointer.page)
.update_page(&page_id, &row_pointer.page, old_page.serialize())
.await?;

Ok(new_row_pointer)
Expand Down Expand Up @@ -192,7 +192,7 @@ impl RowManager {
let new_row_pointer = page.insert(current_tran_id, &table, user_data)?;
let new_page_bytes = page.serialize();
self.file_manager
.update_page(&page_id, new_page_bytes, &page_num)
.update_page(&page_id, &page_num, new_page_bytes)
.await?;
return Ok(new_row_pointer);
} else {
Expand Down

0 comments on commit 12f5690

Please sign in to comment.