diff --git a/Cargo.toml b/Cargo.toml index 33502cb..af82ece 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ derive_more = "0.99.17" ed25519-dalek = { version = "2.1.1", features = ["rand_core", "pem"] } futures = "0.3.30" include_dir = { version = "0.7.4", features = ["nightly"] } -rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "fbdd4c2953f78796b85914162b0ec592e8d2e5b6" } +rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "f2dfeeba77a386d05a7182584a32c508aa831567" } serde = { version = "1.0.203", features = ["derive"] } serde_bytes = "0.11.14" serde_with = "3.8.1" diff --git a/src/lib.rs b/src/lib.rs index 2a4b60b..d2cea2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(array_windows)] + use chrono::DateTime; use color_eyre::eyre::OptionExt; use futures::{future::BoxFuture, FutureExt}; @@ -46,6 +48,7 @@ pub async fn save_record_versioned( record_path: &RecordPath, output_record: &Record, hashed_key: &HashedRecordKey, + split_at: &[usize], stats: &mut MakeRecursiveStatistics, ) -> color_eyre::Result<()> { let existing_versions = output_registry @@ -91,7 +94,7 @@ pub async fn save_record_versioned( output_record, new_version, max_collision_resolution_attempts, - &[], // TODO + split_at, encryption.as_ref(), false, ) @@ -113,7 +116,7 @@ pub async fn save_record_versioned( output_record, 0.into(), // This is the first version of the record, as no other versions have been found. max_collision_resolution_attempts, - &[], // TODO + split_at, encryption.as_ref(), false, ) @@ -139,13 +142,8 @@ pub fn make_recursive<'a, L: FileLock>( ) -> BoxFuture<'a, color_eyre::Result<()>> { async move { let mut data = Vec::new(); - - input_record - .read() - .await? - .expect("Data not found.") - .read_to_end(&mut data) - .await?; + let mut read_result = input_record.read().await?.expect("Data not found."); + read_result.read.read_to_end(&mut data).await?; let output_record = Record { metadata: { @@ -181,6 +179,8 @@ pub fn make_recursive<'a, L: FileLock>( &record_path, &output_record, &hashed_key, + // TODO: Handle `SplittingStrategy::Fill` + read_result.split_at.as_deref().unwrap_or_default(), stats, ) .await?; diff --git a/src/owned/record.rs b/src/owned/record.rs index 0c04792..4e33f7e 100644 --- a/src/owned/record.rs +++ b/src/owned/record.rs @@ -1,20 +1,20 @@ use chrono::{DateTime, Utc}; use color_eyre::{ - eyre::{bail, eyre, OptionExt}, + eyre::{bail, eyre}, Result, }; +use core::str; use futures::future::{BoxFuture, FutureExt}; use rrr::{crypto::encryption::EncryptionAlgorithm, record::segment::SegmentEncryption}; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use std::{ collections::HashSet, - ffi::OsString, fmt::Debug, path::{Path, PathBuf}, str::FromStr, }; -use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use crate::{error::Error, registry::OwnedRegistryConfig, util::serde::DoubleOption}; @@ -70,6 +70,7 @@ impl From for OwnedRecordConfigEncryptionUnresolved #[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct OwnedRecordConfigParametersUnresolved { + pub splitting_strategy: Option, pub encryption: DoubleOption, } @@ -78,12 +79,14 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved { fn or(self, fallback: Self) -> Self { Self { + splitting_strategy: self.splitting_strategy.or(fallback.splitting_strategy), encryption: self.encryption.or(fallback.encryption), } } fn resolve(self) -> Result { if let Self { + splitting_strategy: Some(splitting_strategy), encryption: Some(encryption), } = self { @@ -92,9 +95,11 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved { .transpose() { Ok(resolved) => Ok(Self::Resolved { + splitting_strategy, encryption: resolved, }), Err(unresolved) => Err(Self { + splitting_strategy: Some(splitting_strategy), encryption: Some(Some(unresolved).into()), }), } @@ -107,6 +112,7 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved { impl From for OwnedRecordConfigParametersUnresolved { fn from(value: OwnedRecordConfigParameters) -> Self { Self { + splitting_strategy: Some(value.splitting_strategy), encryption: Some( value .encryption @@ -175,8 +181,24 @@ pub struct OwnedRecordConfigEncryption { pub segment_padding_to_bytes: u64, } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum SplittingStrategy { + /// Automatically split the data into segments of maximum size. + Fill {}, + /// Each segment is created from its corresponding numbered data file. + Manual {}, +} + +impl Default for SplittingStrategy { + fn default() -> Self { + Self::Fill {} + } +} + +/// Parameters can be defined in the registry config, and individually overwritten in each record config. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OwnedRecordConfigParameters { + pub splitting_strategy: SplittingStrategy, pub encryption: Option, } @@ -187,6 +209,11 @@ pub struct OwnedRecordConfig { pub parameters: OwnedRecordConfigParameters, } +pub struct OwnedRecordReadSuccess { + pub read: R, + pub split_at: Option>, +} + #[derive(Debug)] pub struct OwnedRecord { pub directory_path: PathBuf, @@ -299,16 +326,46 @@ impl OwnedRecord { } } - pub async fn read(&self) -> Result> { - match tokio::fs::OpenOptions::new() - .read(true) - .open(self.get_data_path().await?) - .await - { - Ok(file) => Ok(Some(file)), - Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), - Err(error) => Err(error.into()), + pub async fn read( + &self, + ) -> Result>> { + let data_paths = self.get_data_paths().await?; + let Some((data_paths_first, data_paths_rest)) = data_paths.split_first() else { + return Ok(None); + }; + let mut split_at = match self.config.parameters.splitting_strategy { + SplittingStrategy::Fill {} => None, + SplittingStrategy::Manual {} => Some(Vec::new()), + }; + let mut read: Box = { + let file_first = tokio::fs::OpenOptions::new() + .read(true) + .open(data_paths_first) + .await?; + if let Some(split_at) = split_at.as_mut() { + let metadata = file_first.metadata().await?; + split_at.push(metadata.len() as usize); + } + Box::new(file_first) + }; + + for data_path in data_paths_rest.iter() { + let file = tokio::fs::OpenOptions::new() + .read(true) + .open(data_path) + .await?; + if let Some(split_at) = split_at.as_mut() { + let metadata = file.metadata().await?; + split_at.push(metadata.len() as usize); + } + read = Box::new(read.chain(file)); + } + + if let Some(split_at) = split_at.as_mut() { + split_at.pop(); } + + Ok(Some(OwnedRecordReadSuccess { read, split_at })) } pub fn get_config_path_from_record_directory_path(directory_path: impl AsRef) -> PathBuf { @@ -319,26 +376,79 @@ impl OwnedRecord { Self::get_config_path_from_record_directory_path(&self.directory_path) } - pub async fn get_data_path(&self) -> Result { - const FILE_STEM_DATA: &str = "data"; + pub async fn get_data_paths(&self) -> Result> { + const FILE_STEM_DATA: &[u8] = b"data"; let mut read_dir = tokio::fs::read_dir(&self.directory_path).await?; - let mut candidate = None; + let mut results = Vec::<(Option, PathBuf)>::new(); while let Some(dir_entry) = read_dir.next_entry().await? { if dir_entry.file_type().await?.is_file() { let path = dir_entry.path(); + let file_name = path.file_name().expect("regular file expected"); + let mut iter = file_name.as_encoded_bytes().splitn(2, |byte| *byte == b'.'); + let stem_bytes = iter.next().expect("expected a non-empty file name"); - if path.file_stem() == Some(&OsString::from(FILE_STEM_DATA)) { - if candidate.is_none() { - candidate = Some(path); - } else { - bail!("Multiple data files available, but only one may exist"); + if stem_bytes != FILE_STEM_DATA { + continue; + } + + if let Some(extensions_bytes) = iter.next() { + let mut iter = extensions_bytes.splitn(2, |byte| *byte == b'.'); + let first = iter.next().unwrap(); + + if iter.next().is_some() { + if let Ok(first) = str::from_utf8(first) { + if let Ok(index) = first.parse::() { + results.push((Some(index), path)); + continue; + } + } + } + } + + results.push((None, path)); + } + } + + if results.is_empty() { + return Ok(Vec::new()); + } + + results.sort_unstable(); + + let indexed = matches!(results.first(), Some((Some(_), _))); + + // Ensure indexing is not mixed. + if !results.iter().all(|(index, _)| index.is_some() == indexed) { + bail!("cannot mix non-indexed and indexed record data files"); + } + + // Ensure uniqueness of indexes. + for [(index_a, path_a), (index_b, path_b)] in results.array_windows::<2>() { + if index_a == index_b { + if let Some(index) = index_a { + bail!("multiple (conflicting) record data files with index {index} exist: {path_a:?}, {path_b:?}"); + } else { + bail!("multiple (conflicting) record data files exist: {path_a:?}, {path_b:?}"); + } + } + } + + // Ensure contiguity of indexes. + if indexed { + for [(index_a, _), (index_b, _)] in results.array_windows::<2>() { + if let (Some(index_a), Some(index_b)) = (index_a, index_b) { + if *index_a + 1 != *index_b { + bail!( + "indexed record data files are not contiguous, missing index {}", + *index_a + 1 + ); } } } } - candidate.ok_or_eyre("No data file found") + Ok(results.into_iter().map(|(_, path)| path).collect()) } } diff --git a/src/owned/registry.rs b/src/owned/registry.rs index 1e24000..73857aa 100644 --- a/src/owned/registry.rs +++ b/src/owned/registry.rs @@ -27,7 +27,7 @@ use crate::record::{ OwnedRecordConfigEncryption, OwnedRecordConfigParameters, OwnedRecordConfigParametersUnresolved, }; -use super::record::OwnedRecord; +use super::record::{OwnedRecord, SplittingStrategy}; /// Represents a registry with cryptographic credentials for editing. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -276,6 +276,7 @@ impl OwnedRegistry { .with_algorithm(KdfAlgorithm::Hkdf(HkdfParams::default())) .build_with_random_root_predecessor_nonce(csprng)?, default_record_parameters: OwnedRecordConfigParameters { + splitting_strategy: SplittingStrategy::Fill {}, encryption: Some(OwnedRecordConfigEncryption { algorithm: EncryptionAlgorithm::Aes256Gcm, segment_padding_to_bytes: 1024, // 1 KiB