Skip to content

Commit

Permalink
hotfix for migrations when the key encoding changes (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
StanChe authored Jan 27, 2025
1 parent 0e29a10 commit 1adc382
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 6 deletions.
3 changes: 2 additions & 1 deletion rocks-db/src/migrations/clitems_v2.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use entities::models::Updated;

use crate::{
columns::cl_items::{ClItemDeprecated, ClItemV2},
columns::cl_items::{ClItemDeprecated, ClItemKey, ClItemV2},
migrator::{RocksMigration, SerializationType},
};

Expand All @@ -10,6 +10,7 @@ impl RocksMigration for ClItemsV2Migration {
const VERSION: u64 = 5;
const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode;
const SERIALIZATION_TYPE: SerializationType = SerializationType::Bincode;
type KeyType = ClItemKey;
type NewDataType = ClItemV2;
type OldDataType = ClItemDeprecated;
}
Expand Down
1 change: 1 addition & 0 deletions rocks-db/src/migrations/offchain_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl RocksMigration for OffChainDataMigration {
const VERSION: u64 = 4;
const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode;
const SERIALIZATION_TYPE: SerializationType = SerializationType::Flatbuffers;
type KeyType = String;
type NewDataType = OffChainData;
type OldDataType = OffChainDataDeprecated;
}
11 changes: 8 additions & 3 deletions rocks-db/src/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@ pub trait RocksMigration {
const VERSION: u64;
const DESERIALIZATION_TYPE: SerializationType;
const SERIALIZATION_TYPE: SerializationType;
type NewDataType: Sync + Serialize + DeserializeOwned + Send + TypedColumn;
type KeyType: 'static + Hash + Eq + std::fmt::Debug;
type NewDataType: Sync
+ Serialize
+ DeserializeOwned
+ Send
+ TypedColumn<KeyType = Self::KeyType>;
type OldDataType: Sync
+ Serialize
+ DeserializeOwned
+ Send
+ TypedColumn
+ TypedColumn<KeyType = Self::KeyType>
+ Into<<Self::NewDataType as TypedColumn>::ValueType>;
}

Expand Down Expand Up @@ -242,7 +247,7 @@ impl<'a> MigrationApplier<'a> {
{
let mut batch = HashMap::new();
for (key, value) in Self::migration_column_iter::<M>(&temporary_migration_storage.db)? {
let key_decoded = match column.decode_key(key.to_vec()) {
let key_decoded = match M::OldDataType::decode_key(key.to_vec()) {
Ok(key_decoded) => key_decoded,
Err(e) => {
error!("migration data decode_key: {:?}, {}", key.to_vec(), e);
Expand Down
95 changes: 93 additions & 2 deletions rocks-db/tests/migration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{str::FromStr, sync::Arc};

use entities::models::Updated;
use metrics_utils::red::RequestErrorDurationMetrics;
use rocks_db::{
column::TypedColumn,
columns::offchain_data::{OffChainData, OffChainDataDeprecated},
columns::{
cl_items::{ClItemDeprecated, ClItemKey, ClItemV2},
offchain_data::{OffChainData, OffChainDataDeprecated},
},
migrator::MigrationState,
Storage,
};
use solana_sdk::pubkey::Pubkey;
use tempfile::TempDir;
use tokio::{sync::Mutex, task::JoinSet};

Expand Down Expand Up @@ -112,6 +117,92 @@ mod tests {
assert_eq!(migrated_v2.last_read_at, 0);
}

#[tokio::test]
async fn test_clitems_v2_migration() {
let dir = TempDir::new().unwrap();
let node_id = 32782;
let tree_id = Pubkey::from_str("6EdzmXrunmS1gqkuWzDuP94o1YPNc2cb8z45G1eQaMpp")
.expect("a valid pubkey");
let hash = [
93, 208, 232, 135, 101, 117, 109, 249, 149, 77, 57, 114, 173, 168, 145, 196, 185, 190,
21, 121, 205, 253, 143, 155, 82, 119, 9, 143, 73, 176, 233, 179,
]
.to_vec();
let seq = 32;
let v1 = ClItemDeprecated {
cli_node_idx: node_id,
cli_tree_key: tree_id,
cli_leaf_idx: None,
cli_seq: seq,
cli_level: 2,
cli_hash: hash.clone(),
slot_updated: 239021690,
};
let key = ClItemKey::new(node_id, tree_id);
let path = dir.path().to_str().unwrap();
let old_storage = Storage::open(
path,
Arc::new(Mutex::new(JoinSet::new())),
Arc::new(RequestErrorDurationMetrics::new()),
MigrationState::Version(0),
)
.unwrap();
old_storage.cl_items_deprecated.put(key.clone(), v1.clone()).expect("should put");
drop(old_storage);
let secondary_storage_dir = TempDir::new().unwrap();
let migration_version_manager = Storage::open_secondary(
path,
secondary_storage_dir.path().to_str().unwrap(),
Arc::new(Mutex::new(JoinSet::new())),
Arc::new(RequestErrorDurationMetrics::new()),
MigrationState::Version(4),
)
.unwrap();
let binding = TempDir::new().unwrap();
let migration_storage_path = binding.path().to_str().unwrap();
Storage::apply_all_migrations(
path,
migration_storage_path,
Arc::new(migration_version_manager),
)
.await
.unwrap();

let new_storage = Storage::open(
path,
Arc::new(Mutex::new(JoinSet::new())),
Arc::new(RequestErrorDurationMetrics::new()),
MigrationState::Version(4),
)
.unwrap();
let migrated_v1 = new_storage
.db
.get_pinned_cf(
&new_storage.db.cf_handle(ClItemV2::NAME).unwrap(),
ClItemV2::encode_key(key.clone()),
)
.expect("expect to get value successfully")
.expect("value to be present");

print!("migrated is {:?}", migrated_v1.to_vec());
let migrated_v1 = new_storage
.cl_items
.get(key.clone())
.expect("should get value successfully")
.expect("the value should be not empty");
assert_eq!(migrated_v1.finalized_hash, None);
assert_eq!(migrated_v1.leaf_idx, None);
assert_eq!(migrated_v1.level, 2);
assert_eq!(
migrated_v1.pending_hash,
Some(Updated::new(
239021690,
Some(entities::models::UpdateVersion::Sequence(seq)),
hash
))
);
}

#[test]
#[ignore = "TODO: test migrations on relevant columns"]
fn test_merge_fail() {
Expand Down

0 comments on commit 1adc382

Please sign in to comment.