diff --git a/rocks-db/src/migrations/clitems_v2.rs b/rocks-db/src/migrations/clitems_v2.rs index da66ae1e..82ba6c5c 100644 --- a/rocks-db/src/migrations/clitems_v2.rs +++ b/rocks-db/src/migrations/clitems_v2.rs @@ -1,7 +1,7 @@ use entities::models::Updated; use crate::{ - columns::cl_items::{ClItemDeprecated, ClItemV2}, + columns::cl_items::{ClItemDeprecated, ClItemKey, ClItemV2}, migrator::{RocksMigration, SerializationType}, }; @@ -10,6 +10,7 @@ impl RocksMigration for ClItemsV2Migration { const VERSION: u64 = 5; const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; const SERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; + type KeyType = ClItemKey; type NewDataType = ClItemV2; type OldDataType = ClItemDeprecated; } diff --git a/rocks-db/src/migrations/offchain_data.rs b/rocks-db/src/migrations/offchain_data.rs index 21db0d9b..dc6a51d0 100644 --- a/rocks-db/src/migrations/offchain_data.rs +++ b/rocks-db/src/migrations/offchain_data.rs @@ -38,6 +38,7 @@ impl RocksMigration for OffChainDataMigration { const VERSION: u64 = 4; const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; const SERIALIZATION_TYPE: SerializationType = SerializationType::Flatbuffers; + type KeyType = String; type NewDataType = OffChainData; type OldDataType = OffChainDataDeprecated; } diff --git a/rocks-db/src/migrator.rs b/rocks-db/src/migrator.rs index da4e4b2d..ce02836b 100644 --- a/rocks-db/src/migrator.rs +++ b/rocks-db/src/migrator.rs @@ -38,12 +38,17 @@ pub trait RocksMigration { const VERSION: u64; const DESERIALIZATION_TYPE: SerializationType; const SERIALIZATION_TYPE: SerializationType; - type NewDataType: Sync + Serialize + DeserializeOwned + Send + TypedColumn; + type KeyType: 'static + Hash + Eq + std::fmt::Debug; + type NewDataType: Sync + + Serialize + + DeserializeOwned + + Send + + TypedColumn; type OldDataType: Sync + Serialize + DeserializeOwned + Send - + TypedColumn + + TypedColumn + Into<::ValueType>; } @@ -242,7 +247,7 @@ impl<'a> MigrationApplier<'a> { { let mut batch = HashMap::new(); for (key, value) in Self::migration_column_iter::(&temporary_migration_storage.db)? { - let key_decoded = match column.decode_key(key.to_vec()) { + let key_decoded = match M::OldDataType::decode_key(key.to_vec()) { Ok(key_decoded) => key_decoded, Err(e) => { error!("migration data decode_key: {:?}, {}", key.to_vec(), e); diff --git a/rocks-db/tests/migration_tests.rs b/rocks-db/tests/migration_tests.rs index b2d9aec9..d18eb14c 100644 --- a/rocks-db/tests/migration_tests.rs +++ b/rocks-db/tests/migration_tests.rs @@ -1,14 +1,19 @@ #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{str::FromStr, sync::Arc}; + use entities::models::Updated; use metrics_utils::red::RequestErrorDurationMetrics; use rocks_db::{ column::TypedColumn, - columns::offchain_data::{OffChainData, OffChainDataDeprecated}, + columns::{ + cl_items::{ClItemDeprecated, ClItemKey, ClItemV2}, + offchain_data::{OffChainData, OffChainDataDeprecated}, + }, migrator::MigrationState, Storage, }; + use solana_sdk::pubkey::Pubkey; use tempfile::TempDir; use tokio::{sync::Mutex, task::JoinSet}; @@ -112,6 +117,92 @@ mod tests { assert_eq!(migrated_v2.last_read_at, 0); } + #[tokio::test] + async fn test_clitems_v2_migration() { + let dir = TempDir::new().unwrap(); + let node_id = 32782; + let tree_id = Pubkey::from_str("6EdzmXrunmS1gqkuWzDuP94o1YPNc2cb8z45G1eQaMpp") + .expect("a valid pubkey"); + let hash = [ + 93, 208, 232, 135, 101, 117, 109, 249, 149, 77, 57, 114, 173, 168, 145, 196, 185, 190, + 21, 121, 205, 253, 143, 155, 82, 119, 9, 143, 73, 176, 233, 179, + ] + .to_vec(); + let seq = 32; + let v1 = ClItemDeprecated { + cli_node_idx: node_id, + cli_tree_key: tree_id, + cli_leaf_idx: None, + cli_seq: seq, + cli_level: 2, + cli_hash: hash.clone(), + slot_updated: 239021690, + }; + let key = ClItemKey::new(node_id, tree_id); + let path = dir.path().to_str().unwrap(); + let old_storage = Storage::open( + path, + Arc::new(Mutex::new(JoinSet::new())), + Arc::new(RequestErrorDurationMetrics::new()), + MigrationState::Version(0), + ) + .unwrap(); + old_storage.cl_items_deprecated.put(key.clone(), v1.clone()).expect("should put"); + drop(old_storage); + let secondary_storage_dir = TempDir::new().unwrap(); + let migration_version_manager = Storage::open_secondary( + path, + secondary_storage_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + Arc::new(RequestErrorDurationMetrics::new()), + MigrationState::Version(4), + ) + .unwrap(); + let binding = TempDir::new().unwrap(); + let migration_storage_path = binding.path().to_str().unwrap(); + Storage::apply_all_migrations( + path, + migration_storage_path, + Arc::new(migration_version_manager), + ) + .await + .unwrap(); + + let new_storage = Storage::open( + path, + Arc::new(Mutex::new(JoinSet::new())), + Arc::new(RequestErrorDurationMetrics::new()), + MigrationState::Version(4), + ) + .unwrap(); + let migrated_v1 = new_storage + .db + .get_pinned_cf( + &new_storage.db.cf_handle(ClItemV2::NAME).unwrap(), + ClItemV2::encode_key(key.clone()), + ) + .expect("expect to get value successfully") + .expect("value to be present"); + + print!("migrated is {:?}", migrated_v1.to_vec()); + let migrated_v1 = new_storage + .cl_items + .get(key.clone()) + .expect("should get value successfully") + .expect("the value should be not empty"); + assert_eq!(migrated_v1.finalized_hash, None); + assert_eq!(migrated_v1.leaf_idx, None); + assert_eq!(migrated_v1.level, 2); + assert_eq!( + migrated_v1.pending_hash, + Some(Updated::new( + 239021690, + Some(entities::models::UpdateVersion::Sequence(seq)), + hash + )) + ); + } + #[test] #[ignore = "TODO: test migrations on relevant columns"] fn test_merge_fail() {