diff --git a/optd-persistent/init.db b/optd-persistent/init.db deleted file mode 100644 index fe10513..0000000 Binary files a/optd-persistent/init.db and /dev/null differ diff --git a/optd-persistent/src/cost_model/interface.rs b/optd-persistent/src/cost_model/interface.rs index b4e27c7..d8dfcdf 100644 --- a/optd-persistent/src/cost_model/interface.rs +++ b/optd-persistent/src/cost_model/interface.rs @@ -42,6 +42,14 @@ pub enum StatType { Max, } +#[derive(PartialEq)] +pub enum EpochOption { + // TODO(lanlou): Could I make i32 -> EpochId? + Existed(i32), + New(String, String), +} + +#[derive(Clone)] pub struct Stat { pub stat_type: i32, pub stat_value: Json, @@ -72,7 +80,11 @@ pub trait CostModelStorageLayer { epoch_id: Self::EpochId, ) -> StorageResult<()>; - async fn update_stats(&self, stat: Stat, epoch_id: Self::EpochId) -> StorageResult<()>; + async fn update_stats( + &mut self, + stat: Stat, + epoch_option: EpochOption, + ) -> StorageResult>; async fn store_cost( &self, diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index c43c809..0c47117 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -13,7 +13,7 @@ use sea_orm::{ }; use super::catalog::mock_catalog::{self, MockCatalog}; -use super::interface::{CatalogSource, Stat}; +use super::interface::{CatalogSource, EpochOption, Stat}; impl BackendManager { fn get_description_from_attr_ids( @@ -154,40 +154,47 @@ impl CostModelStorageLayer for BackendManager { } } - // **IMPORTANT**: It is the caller's responsibility to ensure that the updated stat is not the same with the last stored stat if - // if it is already exists. - async fn update_stats(&self, stat: Stat, epoch_id: Self::EpochId) -> StorageResult<()> { - // let transaction = self.db.begin().await?; + /* Update the statistics in the database. + * The statistic can be newly inserted or updated. If the statistic value + * is the same as the latest existing one, the update will be ignored, and + * the return value will be None. + * If `epoch_option` is `EpochOption::Existed(epoch_id)`, the new statistic + * will be associated with the given epoch_id. If `epoch_option` is + * `EpochOption::New(source, data)`, a new epoch will be created with the + * given source and data, and the new statistic will be associated with the + * new epoch. And return the new epoch_id. + * If the statistic value is the same as the latest existing one, this function + * won't create a new epoch. + * + * For batch updates, if the caller can directly call this function with + * New epoch option at the first time, and if the epoch_id is returned, the + * caller can use the returned epoch_id for the rest of the updates. + * But if the epoch_id is not returned, the caller should continue using + * the New epoch option for the next statistic update. + */ + async fn update_stats( + &mut self, + stat: Stat, + epoch_option: EpochOption, + ) -> StorageResult> { + let transaction = self.db.begin().await?; // 0. Check if the stat already exists. If exists, get stat_id, else insert into statistic table. let stat_id = match stat.table_id { Some(table_id) => { // TODO(lanlou): only select needed fields let res = Statistic::find() .filter(statistic::Column::TableId.eq(table_id)) - .filter(statistic::Column::VariantTag.eq(stat.stat_type)) - /* - TODO(FIX_ME, lanlou): Do we need the following filter? - I am really not sure although I add the top comment... - Since we already increase the epoch, so we should update the stat anyway. - (In theory, we can increase the epoch without updating the stat, but it is not - a straightforward design, and the epoch table will be very large.) - But it will increase the overhead, since the caller will need to make another - query to check if the stat is the same with the last one. We cannot put everything - in one query. - Let us assume we should update the stat anyway for now. - */ - // .inner_join(versioned_statistic::Entity) - // .select_also(versioned_statistic::Entity) - // .order_by_desc(versioned_statistic::Column::EpochId) - .one(&self.db) + .inner_join(versioned_statistic::Entity) + .select_also(versioned_statistic::Entity) + .order_by_desc(versioned_statistic::Column::EpochId) + .one(&transaction) .await?; match res { Some(stat_data) => { - // if stat_data.1.unwrap().statistic_value == stat.stat_value { - // return Ok(()); - // } - // stat_data.0.id - stat_data.id + if stat_data.1.unwrap().statistic_value == stat.stat_value { + return Ok(None); + } + stat_data.0.id } None => { let new_stat = statistic::ActiveModel { @@ -201,7 +208,7 @@ impl CostModelStorageLayer for BackendManager { description: sea_orm::ActiveValue::Set("".to_string()), ..Default::default() }; - let res = Statistic::insert(new_stat).exec(&self.db).await; + let res = Statistic::insert(new_stat).exec(&transaction).await; match res { Ok(insert_res) => insert_res.last_insert_id, Err(_) => { @@ -221,18 +228,17 @@ impl CostModelStorageLayer for BackendManager { .filter(statistic::Column::NumberOfAttributes.eq(stat.attr_ids.len() as i32)) .filter(statistic::Column::Description.eq(description.clone())) .filter(statistic::Column::VariantTag.eq(stat.stat_type)) - // .inner_join(versioned_statistic::Entity) - // .select_also(versioned_statistic::Entity) - // .order_by_desc(versioned_statistic::Column::EpochId) - .one(&self.db) + .inner_join(versioned_statistic::Entity) + .select_also(versioned_statistic::Entity) + .order_by_desc(versioned_statistic::Column::EpochId) + .one(&transaction) .await?; match res { Some(stat_data) => { - // if stat_data.1.unwrap().statistic_value == stat.stat_value { - // return Ok(()); - // } - // stat_data.0.id - stat_data.id + if stat_data.1.unwrap().statistic_value == stat.stat_value { + return Ok(None); + } + stat_data.0.id } None => { let new_stat = statistic::ActiveModel { @@ -246,23 +252,17 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() }; // TODO(lanlou): we should not clone here maybe... - let insert_res = Statistic::insert(new_stat.clone()).exec(&self.db).await?; + let insert_res = Statistic::insert(new_stat.clone()) + .exec(&transaction) + .await?; for attr_id in stat.attr_ids { let new_junction = statistic_to_attribute_junction::ActiveModel { statistic_id: sea_orm::ActiveValue::Set(insert_res.last_insert_id), attribute_id: sea_orm::ActiveValue::Set(attr_id), }; let res = StatisticToAttributeJunction::insert(new_junction) - .exec(&self.db) - .await; - if res.is_err() { - let _ = new_stat.delete(&self.db).await; - return Err(BackendError::Database(DbErr::Exec( - RuntimeErr::Internal( - "Failed to insert into statistic_to_attribute_junction table".to_string(), - ), - ))); - } + .exec(&transaction) + .await?; } insert_res.last_insert_id } @@ -270,13 +270,30 @@ impl CostModelStorageLayer for BackendManager { } }; // 1. Insert into attr_stats and related junction tables. + let mut insert_new_epoch = false; + let epoch_id = match epoch_option { + EpochOption::Existed(e) => e, + EpochOption::New(source, data) => { + insert_new_epoch = true; + let new_event = event::ActiveModel { + source_variant: sea_orm::ActiveValue::Set(source), + timestamp: sea_orm::ActiveValue::Set(Utc::now()), + data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)), + ..Default::default() + }; + let insert_res = Event::insert(new_event).exec(&transaction).await?; + insert_res.last_insert_id + } + }; let new_stats = versioned_statistic::ActiveModel { epoch_id: sea_orm::ActiveValue::Set(epoch_id), statistic_id: sea_orm::ActiveValue::Set(stat_id), statistic_value: sea_orm::ActiveValue::Set(stat.stat_value), ..Default::default() }; - let _ = VersionedStatistic::insert(new_stats).exec(&self.db).await; + let _ = VersionedStatistic::insert(new_stats) + .exec(&transaction) + .await?; // 2. Invalidate all the related cost. let _ = plan_cost::Entity::update_many() @@ -297,11 +314,17 @@ impl CostModelStorageLayer for BackendManager { .to_owned(), ), ) - .exec(&self.db) - .await; + .exec(&transaction) + .await?; - // transaction.commit().await?; - Ok(()) + // TODO(lanlou): consider the update conflict for latest_epoch_id in multiple threads + if insert_new_epoch { + self.latest_epoch_id + .store(epoch_id as usize, std::sync::atomic::Ordering::Relaxed); + } + + transaction.commit().await?; + Ok(Some(epoch_id)) } async fn store_expr_stats_mappings( @@ -455,10 +478,11 @@ impl CostModelStorageLayer for BackendManager { #[cfg(test)] mod tests { - use crate::cost_model::interface::StatType; + use crate::cost_model::interface::{EpochOption, StatType}; use crate::{cost_model::interface::Stat, migrate, CostModelStorageLayer}; use crate::{get_sqlite_url, TEST_DATABASE_FILE}; use sea_orm::sqlx::database; + use sea_orm::sqlx::types::chrono::Utc; use sea_orm::Statement; use sea_orm::{ ColumnTrait, ConnectionTrait, Database, DbBackend, EntityTrait, ModelTrait, QueryFilter, @@ -497,8 +521,7 @@ mod tests { async fn copy_init_db(db_file: &str) -> String { let _ = std::fs::copy(TEST_DATABASE_FILE.clone(), db_file); - let database_url = get_sqlite_url(db_file); - database_url + get_sqlite_url(db_file) } #[tokio::test] @@ -566,7 +589,9 @@ mod tests { table_id: None, name: "countattr1".to_string(), }; - let res = backend_manager.update_stats(stat, epoch_id1).await; + let res = backend_manager + .update_stats(stat, EpochOption::Existed(epoch_id1)) + .await; assert!(res.is_ok()); let stat_res = Statistic::find() .filter(statistic::Column::Name.eq("countattr1")) @@ -636,7 +661,9 @@ mod tests { table_id: None, name: "countattr1".to_string(), }; - let res = backend_manager.update_stats(stat2, epoch_id2).await; + let res = backend_manager + .update_stats(stat2, EpochOption::Existed(epoch_id2)) + .await; assert!(res.is_ok()); // 2.3 Check statistic table let stat_res = Statistic::find() @@ -645,9 +672,7 @@ mod tests { .await .unwrap(); assert_eq!(stat_res.len(), 1); - assert_eq!(stat_res[0].number_of_attributes, 1); assert_eq!(stat_res[0].description, "1".to_string()); - assert_eq!(stat_res[0].variant_tag, StatType::Count as i32); // 2.4 Check statistic_to_attribute_junction table let stat_attr_res = StatisticToAttributeJunction::find() .filter(statistic_to_attribute_junction::Column::StatisticId.eq(stat_res[0].id)) @@ -681,11 +706,121 @@ mod tests { assert_eq!(cost_res[0].epoch_id, epoch_id1); assert!(!cost_res[0].is_valid); + // 3. Update existed stat with the same value + let epoch_num = Event::find().all(&backend_manager.db).await.unwrap().len(); + let stat3 = Stat { + stat_type: StatType::Count as i32, + stat_value: json!(200), + attr_ids: vec![1], + table_id: None, + name: "CountAttr1".to_string(), + }; + let res = backend_manager + .update_stats( + stat3, + EpochOption::New("source".to_string(), "data".to_string()), + ) + .await; + assert!(res.is_ok()); + assert!(res.unwrap().is_none()); + let epoch_num2 = Event::find().all(&backend_manager.db).await.unwrap().len(); + assert_eq!(epoch_num, epoch_num2); + remove_db_file(DATABASE_FILE); } #[tokio::test] - async fn test_update_table_stats() {} + async fn test_update_table_stats() { + // Simulate batch updates, first insert an existed same stat with none epoch_id, + // then insert some non-existed or different stats with New epoch_option. + const DATABASE_FILE: &str = "test_update_table_stats.db"; + let database_url = copy_init_db(DATABASE_FILE).await; + let mut binding = super::BackendManager::new(Some(&database_url)).await; + let backend_manager = binding.as_mut().unwrap(); + + let table_inserted_res = TableMetadata::insert(table_metadata::ActiveModel { + name: sea_orm::ActiveValue::Set("Table2".to_string()), + namespace_id: sea_orm::ActiveValue::Set(1), + creation_time: sea_orm::ActiveValue::Set(Utc::now()), + ..Default::default() + }) + .exec(&backend_manager.db) + .await + .unwrap(); + + let statistics: Vec = vec![ + Stat { + stat_type: StatType::Count as i32, + stat_value: json!(0), + attr_ids: vec![], + table_id: Some(1), + name: "row_count".to_string(), + }, + Stat { + stat_type: StatType::Count as i32, + stat_value: json!(20), + attr_ids: vec![], + table_id: Some(1), + name: "row_count".to_string(), + }, + Stat { + stat_type: StatType::Count as i32, + stat_value: json!(100), + attr_ids: vec![], + table_id: Some(table_inserted_res.last_insert_id), + name: "Table2Count1".to_string(), + }, + ]; + + let mut epoch_id: Option = None; + for stat in statistics { + match epoch_id { + Some(e) => { + let res = backend_manager + .update_stats(stat.clone(), EpochOption::Existed(e)) + .await; + assert!(res.is_ok()); + assert!(stat.name == "Table2Count1"); + let res = res.unwrap(); + assert!(res.is_some()); + assert!(res.unwrap() == e); + let stat_res = Statistic::find() + .filter(statistic::Column::Name.eq(stat.name.clone())) + .all(&backend_manager.db) + .await + .unwrap(); + assert_eq!(stat_res.len(), 1); + let versioned_stat_res = VersionedStatistic::find() + .filter(versioned_statistic::Column::StatisticId.eq(stat_res[0].id)) + .all(&backend_manager.db) + .await + .unwrap(); + assert_eq!(versioned_stat_res.len(), 1); + assert_eq!(versioned_stat_res[0].statistic_value, stat.stat_value); + assert_eq!(versioned_stat_res[0].epoch_id, e); + } + None => { + let res = backend_manager + .update_stats( + stat.clone(), + EpochOption::New("source".to_string(), "data".to_string()), + ) + .await; + assert!(res.is_ok()); + if stat.stat_value == json!(0) { + assert!(res.unwrap().is_none()); + } else { + assert!(stat.stat_value == json!(20)); + let res = res.unwrap(); + assert!(res.is_some()); + epoch_id = Some(res.unwrap()); + } + } + } + } + + remove_db_file(DATABASE_FILE); + } #[tokio::test] async fn test_store_cost() { @@ -699,7 +834,7 @@ mod tests { .unwrap(); let physical_expression_id = 1; let cost = 42; - let res = backend_manager + backend_manager .store_cost(physical_expression_id, cost, epoch_id) .await .unwrap(); @@ -816,7 +951,10 @@ mod tests { table_id: Some(table_id), name: "row_count".to_string(), }; - backend_manager.update_stats(stat, epoch_id2).await.unwrap(); + backend_manager + .update_stats(stat, EpochOption::Existed(epoch_id2)) + .await + .unwrap(); // Get updated stats let res = backend_manager @@ -873,7 +1011,10 @@ mod tests { table_id: None, name: "cardinality".to_string(), }; - backend_manager.update_stats(stat, epoch_id2).await.unwrap(); + backend_manager + .update_stats(stat, EpochOption::Existed(epoch_id2)) + .await + .unwrap(); // Get updated stats let res = backend_manager @@ -930,7 +1071,10 @@ mod tests { table_id: None, name: "cardinality".to_string(), }; - backend_manager.update_stats(stat, epoch_id2).await.unwrap(); + backend_manager + .update_stats(stat, EpochOption::Existed(epoch_id2)) + .await + .unwrap(); // Get updated stats let res = backend_manager diff --git a/optd-persistent/src/db/init.db b/optd-persistent/src/db/init.db index 1fc7eb9..d88b92d 100644 Binary files a/optd-persistent/src/db/init.db and b/optd-persistent/src/db/init.db differ