Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Switched the index manager over to the locking system.
Browse files Browse the repository at this point in the history
However the index code is NOT working yet.
  • Loading branch information
chotchki committed Aug 25, 2021
1 parent 0285fcd commit ea6ba39
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 73 deletions.
33 changes: 21 additions & 12 deletions src/engine/io/index_formats/btree_leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
objects::{types::BaseSqlTypesError, SqlTuple},
},
};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{BufMut, BytesMut};
use std::{collections::BTreeMap, num::TryFromIntError};
use thiserror::Error;

Expand All @@ -27,6 +27,15 @@ pub struct BTreeLeaf {
}

impl BTreeLeaf {
pub fn new() -> BTreeLeaf {
BTreeLeaf {
parent_node: None,
left_node: None,
right_node: None,
nodes: BTreeMap::new(),
}
}

pub fn add(&mut self, key: SqlTuple, item_ptr: ItemIdData) -> Result<(), BTreeLeafError> {
if !self.can_fit(&key) {
return Err(BTreeLeafError::KeyTooLarge(key.encoded_size()));
Expand Down Expand Up @@ -75,22 +84,21 @@ impl BTreeLeaf {
new_size <= PAGE_SIZE as usize
}

pub fn serialize(&self) -> Result<Bytes, BTreeLeafError> {
let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
pub fn serialize(&self, buffer: &mut BytesMut) -> Result<(), BTreeLeafError> {
buffer.put_u8(NodeType::Leaf as u8);

BTreeNode::write_node(&mut buffer, self.parent_node)?;
BTreeNode::write_node(&mut buffer, self.left_node)?;
BTreeNode::write_node(&mut buffer, self.right_node)?;
BTreeNode::write_node(buffer, self.parent_node)?;
BTreeNode::write_node(buffer, self.left_node)?;
BTreeNode::write_node(buffer, self.right_node)?;

encode_size(&mut buffer, self.nodes.len());
encode_size(buffer, self.nodes.len());

for (key, iids) in self.nodes.iter() {
BTreeNode::write_sql_tuple(&mut buffer, key);
BTreeNode::write_sql_tuple(buffer, key);

encode_size(&mut buffer, iids.len());
encode_size(buffer, iids.len());
for iid in iids {
iid.serialize(&mut buffer);
iid.serialize(buffer);
}
}

Expand All @@ -100,7 +108,7 @@ impl BTreeLeaf {
buffer.extend_from_slice(&free_space);
}

Ok(buffer.freeze())
Ok(())
}
}

Expand Down Expand Up @@ -191,7 +199,8 @@ mod tests {
nodes,
};

let mut test_serial = test.clone().serialize()?;
let mut test_serial = BytesMut::with_capacity(PAGE_SIZE as usize);
test.serialize(&mut test_serial)?;
let test_parse = BTreeNode::parse(&mut test_serial, &get_index())?;

match test_parse {
Expand Down
157 changes: 103 additions & 54 deletions src/engine/io/index_manager.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
//! TODO #24 Fix the index implementation to use the locking layer
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;

use super::index_formats::{BTreeLeafError, BTreeNode, BTreeNodeError};
use super::page_formats::PageOffset;
use super::page_formats::{ItemIdData, PageId, PageType};
use super::{FileManager, FileManagerError};
use super::{FileManager, FileManagerError, LockCacheManager, LockCacheManagerError};
use crate::engine::io::SelfEncodedSize;
use crate::{
constants::PAGE_SIZE,
Expand All @@ -16,21 +12,26 @@ use crate::{
objects::{Index, SqlTuple},
},
};
use bytes::BytesMut;
use bytes::{Buf, BufMut, BytesMut};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::mem::size_of;
use std::num::TryFromIntError;
use std::ops::Range;
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;

//TODO Support something other than btrees
//TODO Support searching on a non primary index column

#[derive(Clone, Debug)]
pub struct IndexManager {
file_manager: Arc<FileManager>,
lock_cache_manager: LockCacheManager,
}

impl IndexManager {
pub fn new(file_manager: Arc<FileManager>) -> IndexManager {
IndexManager { file_manager }
pub fn new(lock_cache_manager: LockCacheManager) -> IndexManager {
IndexManager { lock_cache_manager }
}

pub async fn add(
Expand Down Expand Up @@ -90,10 +91,11 @@ impl IndexManager {
};

if l.can_fit(&new_key) {
return Ok(self
.file_manager
.update_page(&page_id, &current_node.1, l.serialize()?)
.await?);

//return Ok(self
// .file_manager
// .update_page(&page_id, &current_node.1, l.serialize()?)
// .await?);
}

//If we're here, we have a key that doesn't fit into the leaf so we need to split it.
Expand Down Expand Up @@ -154,8 +156,11 @@ impl IndexManager {
page_type: PageType::Data,
};

match self.file_manager.get_page(&page_id, offset).await? {
Some(mut page) => Ok(BTreeNode::parse(&mut page, index_def)?),
let page_handle = self.lock_cache_manager.get_page(page_id, *offset).await?;
let page_buffer = page_handle.clone();

match page_buffer {
Some(page) => Ok(BTreeNode::parse(&mut page.freeze(), index_def)?),
None => Err(IndexManagerError::NoSuchNode(*offset)),
}
}
Expand All @@ -167,47 +172,87 @@ impl IndexManager {
&self,
index_def: &Index,
) -> Result<(BTreeNode, PageOffset), IndexManagerError> {
match self.get_node(index_def, &PageOffset(1)).await {
Ok(o) => Ok((o, PageOffset(1))),
Err(IndexManagerError::NoSuchNode(_)) => {
let page_id = PageId {
resource_key: index_def.id,
page_type: PageType::Data,
};

//Page zero with no data in it
self.make_root_page(&page_id).await?;

let root_node = BTreeLeaf {
parent_node: None,
left_node: None,
right_node: None,
nodes: BTreeMap::new(),
};

self.file_manager
.add_page(&page_id, &PageOffset(1), root_node.serialize()?)
.await?;

Ok((BTreeNode::Leaf(root_node), PageOffset(1)))
}
Err(e) => Err(e),
let page_id = PageId {
resource_key: index_def.id,
page_type: PageType::Data,
};
let first_page_handle = self
.lock_cache_manager
.get_page(page_id, PageOffset(0))
.await?;

if let Some(s) = first_page_handle.as_ref() {
let mut first_page = s.clone();
return self
.parse_root_page(index_def, &mut first_page, page_id)
.await;
}
}

async fn make_root_page(&self, index: &PageId) -> Result<(), IndexManagerError> {
let mut root_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
let root_page = vec![0; PAGE_SIZE as usize];
root_page_buffer.extend_from_slice(&root_page);
self.file_manager
.add_page(index, &PageOffset(0), root_page_buffer.freeze())
//We have to make it and handle the race window
drop(first_page_handle);

let mut new_first_page_handle = self
.lock_cache_manager
.get_page_for_update(page_id, PageOffset(0))
.await?;

//if page_num != PageOffset(0) {
// return Err(IndexManagerError::ConcurrentCreationError());
//}
if let Some(s) = new_first_page_handle.as_mut() {
return self.parse_root_page(index_def, s, page_id).await;
}

Ok(())
let root_offset = self.lock_cache_manager.get_offset(page_id).await?;

let mut new_page_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
new_page_buffer.put_uint_le(u64::try_from(root_offset.0)?, size_of::<usize>());
let new_page = vec![0; PAGE_SIZE as usize - size_of::<usize>()];

new_page_buffer.extend_from_slice(&new_page);
new_first_page_handle.replace(new_page_buffer);
self.lock_cache_manager
.add_page(page_id, PageOffset(0), new_first_page_handle)
.await?;

//Now make the root node and save it
let mut root_handle = self
.lock_cache_manager
.get_page_for_update(page_id, root_offset)
.await?;
if let Some(s) = root_handle.as_mut() {
return self.parse_root_page(index_def, s, page_id).await;
}

let root_node = BTreeLeaf::new();

let mut root_buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
root_node.serialize(&mut root_buffer)?;
root_handle.replace(root_buffer);

self.lock_cache_manager
.update_page(page_id, root_offset, root_handle)
.await?;
return Ok((BTreeNode::Leaf(root_node), root_offset));
}

async fn parse_root_page(
&self,
index_def: &Index,
first_page: &mut BytesMut,
page_id: PageId,
) -> Result<(BTreeNode, PageOffset), IndexManagerError> {
let root_offset = usize::try_from(first_page.get_uint_le(size_of::<usize>()))?;
let root_handle = self
.lock_cache_manager
.get_page(page_id, PageOffset(root_offset))
.await?;
let mut root_page = root_handle
.as_ref()
.ok_or(IndexManagerError::RootNodeEmpty())?
.clone()
.freeze();
return Ok((
BTreeNode::parse(&mut root_page, index_def)?,
PageOffset(root_offset),
));
}
}

Expand All @@ -221,14 +266,18 @@ pub enum IndexManagerError {
"Another process made the root index page first, maybe the developer should make locking."
)]
ConcurrentCreationError(),
#[error(transparent)]
FileManagerError(#[from] FileManagerError),
#[error("Key too large size: {0}, maybe the developer should fix this.")]
KeyTooLarge(usize),
#[error(transparent)]
LockCacheManagerError(#[from] LockCacheManagerError),
#[error("Node does not exists {0}")]
NoSuchNode(PageOffset),
#[error("Root Node Empty")]
RootNodeEmpty(),
#[error("Unable to search, the stack is empty")]
StackEmpty(),
#[error(transparent)]
TryFromIntError(#[from] TryFromIntError),
#[error("Unable to split a node of size {0}")]
UnableToSplit(usize),
}
7 changes: 2 additions & 5 deletions src/engine/objects/attribute.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
//!Postgres Doc: https://www.postgresql.org/docs/current/catalog-pg-attribute.html
use std::fmt;

use crate::constants::Nullable;
use uuid::Uuid;

use super::types::BaseSqlTypesMapper;
use crate::constants::Nullable;
use std::fmt;

#[derive(Clone, Debug, PartialEq)]
pub struct Attribute {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/objects/planned_statement.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use super::{types::SqlTypeDefinition, Attribute, SqlTuple, Table};
use super::{types::SqlTypeDefinition, SqlTuple, Table};

pub struct PlannedStatement {
pub common: PlannedCommon,
Expand Down
1 change: 0 additions & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use feophantlib::engine::{transactions::TransactionManager, Engine};
use feophantlib::feophant::FeOphant;
use tempfile::TempDir;
use tokio::sync::oneshot;
Expand Down

0 comments on commit ea6ba39

Please sign in to comment.