Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
We're functional again minus the primary key constraint issue!!!
Browse files Browse the repository at this point in the history
  • Loading branch information
chotchki committed Oct 9, 2021
1 parent a4bea6f commit e18cfd1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/engine/io/constraint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ConstraintManager {
for c in &table.constraints {
match c {
crate::engine::objects::Constraint::PrimaryKey(p) => {
//So for a primary key we have to check for no other dups in the table
//TODO So for a primary key we have to check for no other dups in the table

//So what I want to do is ask the index manager to to get active rows matching the key
}
Expand All @@ -82,13 +82,13 @@ impl ConstraintManager {

Ok(self
.vis_row_man
.insert_row(current_tran_id, &table, user_data)
.insert_row(current_tran_id, table, user_data)
.await?)
}

/// Gets a specific tuple from below, at the moment just a passthrough
pub async fn get(
&self,
&mut self,
tran_id: TransactionId,
table: &Arc<Table>,
row_pointer: ItemPointer,
Expand Down
15 changes: 5 additions & 10 deletions src/engine/io/index_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use super::page_formats::{PageId, PageType};
use super::row_formats::ItemPointer;
use crate::engine::io::format_traits::Serializable;
use crate::engine::io::index_formats::BTreeBranch;
use crate::engine::io::SelfEncodedSize;
use crate::engine::objects::{Index, SqlTuple};
use std::num::TryFromIntError;
use std::sync::Arc;
Expand Down Expand Up @@ -77,8 +76,6 @@ impl IndexManager {
return Ok(());
}

debug!("expand offset {0}", page_offset);

//Lock the leafs left and right if they exist
let left_neighbor = leaf.left_node;
let left_page = match left_neighbor {
Expand Down Expand Up @@ -157,12 +154,12 @@ impl IndexManager {
return Ok(());
} else {
//Need to split the branch and move up a level
let (middle_key, new_right) =
b.add_and_split(new_left_offset, split_key, new_right_offset)?;

let (new_right_offset, new_right_guard) =
self.file_manager.get_next_offset_non_zero(&page_id).await?;

let (middle_key, new_right) =
b.add_and_split(new_left_offset, split_key, new_right_offset)?;

self.file_manager
.update_page(new_right_guard, new_right.serialize_and_pad())
.await?;
Expand Down Expand Up @@ -192,15 +189,13 @@ impl IndexManager {
page_type: PageType::Data,
};

let (mut first_page, first_guard) =
let (mut first_page, _first_guard) =
self.file_manager.get_page(&page_id, &PageOffset(0)).await?;
let first_node = BTreeFirstPage::parse(&mut first_page)?;

let mut current_offset = first_node.root_offset;
loop {
debug!("scan {0}", current_offset);

let (mut current_page, current_guard) = self
let (mut current_page, _current_guard) = self
.file_manager
.get_page(&page_id, &current_offset)
.await?;
Expand Down
12 changes: 12 additions & 0 deletions src/engine/io/page_formats/page_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ mod tests {

use super::*;

#[test]
fn sizes_match() -> Result<(), Box<dyn std::error::Error>> {
let mut test = PageHeader::new();
let calc_len = PageHeader::encoded_size();

let mut buffer = BytesMut::new();
test.serialize(&mut buffer);

assert_eq!(calc_len, buffer.freeze().len());
Ok(())
}

#[test]
fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
let test = PageHeader::new();
Expand Down
31 changes: 14 additions & 17 deletions src/engine/io/row_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RowManager {
}

pub async fn insert_row(
self,
&self,
current_tran_id: TransactionId,
table: &Arc<Table>,
user_data: SqlTuple,
Expand All @@ -45,7 +45,7 @@ impl RowManager {
//Note this is a logical delete
//TODO debating if this should respect the visibility map, probably yes just trying to limit the pain
pub async fn delete_row(
self,
&self,
current_tran_id: TransactionId,
table: &Arc<Table>,
row_pointer: ItemPointer,
Expand Down Expand Up @@ -179,15 +179,15 @@ impl RowManager {

// Provides an unfiltered view of the underlying table
pub fn get_stream(
self,
&self,
table: &Arc<Table>,
) -> impl Stream<Item = Result<RowData, RowManagerError>> {
let page_id = PageId {
resource_key: table.id,
page_type: PageType::Data,
};

let file_manager = self.file_manager;
let file_manager = self.file_manager.clone();
let table = table.clone();

try_stream! {
Expand Down Expand Up @@ -248,7 +248,7 @@ impl RowManager {
Err(_) => {
//We got here because we asked for an offset that didn't exist yet.
let (new_page_offset, new_page_guard) =
self.file_manager.get_next_offset_non_zero(&page_id).await?;
self.file_manager.get_next_offset(&page_id).await?;

let mut new_page = PageData::new(new_page_offset);
let new_row_pointer = new_page.insert(current_tran_id, table, user_data)?; //TODO Will NOT handle overly large rows
Expand Down Expand Up @@ -291,6 +291,12 @@ mod tests {
use crate::engine::get_row;
use crate::engine::get_table;
use futures::pin_mut;
use log::LevelFilter;
use simplelog::ColorChoice;
use simplelog::CombinedLogger;
use simplelog::Config;
use simplelog::TermLogger;
use simplelog::TerminalMode;
use tempfile::TempDir;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -349,19 +355,17 @@ mod tests {
let table = get_table();
let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
let fsm = FreeSpaceManager::new(fm.clone());
let rm = RowManager::new(fm, fsm);
let mut rm = RowManager::new(fm, fsm);

let tran_id = TransactionId::new(1);

let insert_pointer = rm
.clone()
.insert_row(tran_id, &table, get_row("test".to_string()))
.await?;

let tran_id_2 = TransactionId::new(3);

let update_pointer = rm
.clone()
.update_row(
tran_id_2,
&table,
Expand All @@ -372,19 +376,12 @@ mod tests {

//Now let's make sure the update took
pin_mut!(rm);
let result_rows: Vec<RowData> = rm
.clone()
.get_stream(&table)
.map(Result::unwrap)
.collect()
.await;
let result_rows: Vec<RowData> = rm.get_stream(&table).map(Result::unwrap).collect().await;
assert_eq!(result_rows.len(), 2);

let tran_id_3 = TransactionId::new(3);

rm.clone()
.delete_row(tran_id_3, &table, update_pointer)
.await?;
rm.delete_row(tran_id_3, &table, update_pointer).await?;

Ok(())
}
Expand Down
21 changes: 9 additions & 12 deletions src/engine/io/visible_row_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl VisibleRowManager {
}

pub async fn insert_row(
self,
&self,
current_tran_id: TransactionId,
table: &Arc<Table>,
user_data: SqlTuple,
Expand All @@ -45,14 +45,14 @@ impl VisibleRowManager {
}

pub async fn get(
&self,
&mut self,
tran_id: TransactionId,
table: &Arc<Table>,
row_pointer: ItemPointer,
) -> Result<RowData, VisibleRowManagerError> {
let row = self.row_manager.get(table, row_pointer).await?;

if VisibleRowManager::is_visible(self.tran_manager.clone(), tran_id, &row).await? {
if VisibleRowManager::is_visible(&mut self.tran_manager, tran_id, &row).await? {
Ok(row)
} else {
Err(VisibleRowManagerError::NotVisibleRow(row))
Expand All @@ -61,30 +61,27 @@ impl VisibleRowManager {

// Provides a filtered view that respects transaction visability
pub fn get_stream(
self,
&self,
tran_id: TransactionId,
table: &Arc<Table>,
) -> impl Stream<Item = Result<RowData, VisibleRowManagerError>> {
let rm = self.row_manager.clone();
let mut tm = self.tran_manager.clone();
let table = table.clone();

try_stream! {
let tm = self.tran_manager;

for await row in self.row_manager.get_stream(&table) {
for await row in rm.get_stream(&table) {
let unwrap_row = row?;
if VisibleRowManager::is_visible(tm.clone(), tran_id, &unwrap_row).await? {
debug!("Found visible row {:?}", unwrap_row);
if VisibleRowManager::is_visible(&mut tm, tran_id, &unwrap_row).await? {
yield unwrap_row;
} else {
debug!("Found not visible row {:?}", unwrap_row);
}
}
}
}

//TODO I want to find a way to NOT depend on tm
async fn is_visible(
mut tm: TransactionManager,
tm: &mut TransactionManager,
tran_id: TransactionId,
row_data: &RowData,
) -> Result<bool, VisibleRowManagerError> {
Expand Down

0 comments on commit e18cfd1

Please sign in to comment.