Skip to content

Commit

Permalink
Add basic record updating
Browse files Browse the repository at this point in the history
  • Loading branch information
Limeth committed Aug 1, 2024
1 parent 8378e4a commit 7e8d25c
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 32 deletions.
5 changes: 5 additions & 0 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ use tracing_error::ErrorLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

async fn setup_tracing() -> Result<()> {
// Enable logging by default.
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", format!("{}=info", env!("CARGO_CRATE_NAME")));
}

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(ErrorLayer::default())
Expand Down
24 changes: 22 additions & 2 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::path::PathBuf;

use crate::{make_recursive, registry::OwnedRegistry};
use crate::{make_recursive, registry::OwnedRegistry, MakeRecursiveStatistics};
use clap::Parser;
use color_eyre::eyre::Result;
use rrr::{
registry::{Registry, RegistryConfig},
utils::fd_lock::WriteLock,
};
use tracing::info;

#[derive(Parser)]
#[command(version, about)]
Expand Down Expand Up @@ -60,15 +61,34 @@ impl Command {
.clone();

// TODO: Verify target registry keys
let mut stats = MakeRecursiveStatistics::default();

make_recursive(
&mut output_registry,
&input_registry,
&input_root_record,
&root_predecessor_nonce,
force,
0, // TODO
0, // TODO
&mut Vec::new(),
&mut stats,
)
.await?;

if stats.records_created == 0 && stats.records_updated == 0 {
info! {
"Target registry unchanged. Checked {} records in total.",
stats.records_created + stats.records_updated + stats.records_unchanged,
};
} else {
info! {
"Target registry updated. Checked {} records in total. {} new records created, {} existing records updated, {} existing records unchanged.",
stats.records_created + stats.records_updated + stats.records_unchanged,
stats.records_created,
stats.records_updated,
stats.records_unchanged,
};
}
}
}

Expand Down
172 changes: 143 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use chrono::DateTime;
use color_eyre::eyre::OptionExt;
use futures::{future::BoxFuture, FutureExt};
use record::OwnedRecord;
use registry::OwnedRegistry;
use rrr::{
record::{
segment::SegmentEncryption, Record, RecordKey, RecordMetadata, RecordName, SuccessionNonce,
segment::{RecordVersion, SegmentEncryption},
HashedRecordKey, Record, RecordKey, RecordMetadata, RecordName, RecordPath,
SuccessionNonce,
},
registry::Registry,
utils::{
Expand All @@ -23,13 +26,116 @@ pub mod util;
pub mod cmd;

pub use owned::*;
use tracing::{debug, info};

#[derive(Default)]
pub struct MakeRecursiveStatistics {
pub records_created: usize,
pub records_updated: usize,
pub records_unchanged: usize,
}

/// If `output_record` differs from the latest version of the record in the `output_registry`, saves
/// the `output_record` as a new version.
pub async fn save_record_versioned<L: FileLock>(
output_registry: &mut Registry<WriteLock>,
input_registry: &OwnedRegistry<L>,
input_record: &OwnedRecord,
max_version_lookahead: u64,
max_collision_resolution_attempts: u64,
record_path: &RecordPath,
output_record: &Record,
hashed_key: &HashedRecordKey,
stats: &mut MakeRecursiveStatistics,
) -> color_eyre::Result<()> {
let existing_versions = output_registry
.list_record_versions(
hashed_key,
max_version_lookahead,
max_collision_resolution_attempts,
)
.await?;
let encryption = input_record
.config
.parameters
.encryption
.as_ref()
.map(SegmentEncryption::from);

if let Some(latest_existing_version) = existing_versions.last() {
let latest_existing_version_record = Record::read_version_with_nonce(
output_registry,
hashed_key,
latest_existing_version.record_version,
latest_existing_version.record_nonce,
)
.await?
.ok_or_eyre("Failed to load the latest version of a record.")?;

if &latest_existing_version_record.record == output_record {
debug!(version = %latest_existing_version.record_version.0, %record_path, "Record unchanged, skipping.");
stats.records_unchanged += 1;
} else {
let new_version = RecordVersion(latest_existing_version.record_version.0 + 1);

debug!(
version_previous = latest_existing_version.record_version.0,
version_current = new_version.0,
%record_path,
"Record changed, writing new version."
);
output_registry
.save_record(
&input_registry.signing_keys,
hashed_key,
output_record,
new_version,
max_collision_resolution_attempts,
&[], // TODO
encryption.as_ref(),
false,
)
.await?;
stats.records_updated += 1;

info!(
version_previous = latest_existing_version.record_version.0,
version_current = new_version.0,
%record_path,
"New version of record created."
);
}
} else {
output_registry
.save_record(
&input_registry.signing_keys,
hashed_key,
output_record,
0.into(), // This is the first version of the record, as no other versions have been found.
max_collision_resolution_attempts,
&[], // TODO
encryption.as_ref(),
false,
)
.await?;
stats.records_created += 1;

info!(%record_path, "New record created.");
}

Ok(())
}

pub fn make_recursive<'a, L: FileLock>(
output_registry: &'a mut Registry<WriteLock>,
input_registry: &'a OwnedRegistry<L>,
input_record: &'a OwnedRecord,
predecessor_nonce: &'a SuccessionNonce,
force: bool,
max_version_lookahead: u64,
max_collision_resolution_attempts: u64,
// Record path excluding the `input_record`.
path_to_parent_record: &'a mut Vec<RecordName>,
stats: &'a mut MakeRecursiveStatistics,
) -> BoxFuture<'a, color_eyre::Result<()>> {
async move {
let mut data = Vec::new();
Expand Down Expand Up @@ -60,39 +166,47 @@ pub fn make_recursive<'a, L: FileLock>(
predecessor_nonce: predecessor_nonce.clone(),
};
let hashed_key = key.hash(&input_registry.hash).await?;
let record_path = {
let mut record_path = path_to_parent_record.clone();
record_path.push(key.record_name.clone());
RecordPath::try_from(record_path).unwrap()
};

output_registry
.save_record(
&input_registry.signing_keys,
&hashed_key,
&output_record,
0.into(), // TODO
0, // TODO
&[], // TODO
input_record
.config
.parameters
.encryption
.as_ref()
.map(SegmentEncryption::from)
.as_ref(),
force,
)
.await?;
save_record_versioned(
output_registry,
input_registry,
input_record,
max_version_lookahead,
max_collision_resolution_attempts,
&record_path,
&output_record,
&hashed_key,
stats,
)
.await?;

let succession_nonce = hashed_key
.derive_succession_nonce(&input_registry.config.kdf)
.await?;

for successive_record in &input_record.successive_records {
make_recursive(
output_registry,
input_registry,
successive_record,
&succession_nonce,
force,
)
.await?;
{
path_to_parent_record.push(key.record_name.clone());

for successive_record in &input_record.successive_records {
make_recursive(
output_registry,
input_registry,
successive_record,
&succession_nonce,
max_version_lookahead,
max_collision_resolution_attempts,
path_to_parent_record,
stats,
)
.await?;
}

path_to_parent_record.pop();
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion src/owned/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ impl OwnedRecord {
let config = config_unresolved
.try_resolve_with(registry_config.default_record_parameters.clone() /* TODO: cloning seems excessive */)
.map_err(|_| eyre!("incomplete record parameters"))?;
dbg!(&config);
let mut successive_records_stream = tokio::fs::read_dir(&directory_path).await?;
let mut successive_records = Vec::new();
let mut successive_record_names = HashSet::new();
Expand Down

0 comments on commit 7e8d25c

Please sign in to comment.