Skip to content

Commit

Permalink
Add support for the manual splitting strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Limeth committed Aug 4, 2024
1 parent 7e8d25c commit 55c0bfa
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ derive_more = "0.99.17"
ed25519-dalek = { version = "2.1.1", features = ["rand_core", "pem"] }
futures = "0.3.30"
include_dir = { version = "0.7.4", features = ["nightly"] }
rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "fbdd4c2953f78796b85914162b0ec592e8d2e5b6" }
rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "f2dfeeba77a386d05a7182584a32c508aa831567" }
serde = { version = "1.0.203", features = ["derive"] }
serde_bytes = "0.11.14"
serde_with = "3.8.1"
Expand Down
18 changes: 9 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(array_windows)]

use chrono::DateTime;
use color_eyre::eyre::OptionExt;
use futures::{future::BoxFuture, FutureExt};
Expand Down Expand Up @@ -46,6 +48,7 @@ pub async fn save_record_versioned<L: FileLock>(
record_path: &RecordPath,
output_record: &Record,
hashed_key: &HashedRecordKey,
split_at: &[usize],
stats: &mut MakeRecursiveStatistics,
) -> color_eyre::Result<()> {
let existing_versions = output_registry
Expand Down Expand Up @@ -91,7 +94,7 @@ pub async fn save_record_versioned<L: FileLock>(
output_record,
new_version,
max_collision_resolution_attempts,
&[], // TODO
split_at,
encryption.as_ref(),
false,
)
Expand All @@ -113,7 +116,7 @@ pub async fn save_record_versioned<L: FileLock>(
output_record,
0.into(), // This is the first version of the record, as no other versions have been found.
max_collision_resolution_attempts,
&[], // TODO
split_at,
encryption.as_ref(),
false,
)
Expand All @@ -139,13 +142,8 @@ pub fn make_recursive<'a, L: FileLock>(
) -> BoxFuture<'a, color_eyre::Result<()>> {
async move {
let mut data = Vec::new();

input_record
.read()
.await?
.expect("Data not found.")
.read_to_end(&mut data)
.await?;
let mut read_result = input_record.read().await?.expect("Data not found.");
read_result.read.read_to_end(&mut data).await?;

let output_record = Record {
metadata: {
Expand Down Expand Up @@ -181,6 +179,8 @@ pub fn make_recursive<'a, L: FileLock>(
&record_path,
&output_record,
&hashed_key,
// TODO: Handle `SplittingStrategy::Fill`
read_result.split_at.as_deref().unwrap_or_default(),
stats,
)
.await?;
Expand Down
152 changes: 131 additions & 21 deletions src/owned/record.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use chrono::{DateTime, Utc};
use color_eyre::{
eyre::{bail, eyre, OptionExt},
eyre::{bail, eyre},
Result,
};
use core::str;
use futures::future::{BoxFuture, FutureExt};
use rrr::{crypto::encryption::EncryptionAlgorithm, record::segment::SegmentEncryption};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use std::{
collections::HashSet,
ffi::OsString,
fmt::Debug,
path::{Path, PathBuf},
str::FromStr,
};
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};

use crate::{error::Error, registry::OwnedRegistryConfig, util::serde::DoubleOption};

Expand Down Expand Up @@ -70,6 +70,7 @@ impl From<OwnedRecordConfigEncryption> for OwnedRecordConfigEncryptionUnresolved

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnedRecordConfigParametersUnresolved {
pub splitting_strategy: Option<SplittingStrategy>,
pub encryption: DoubleOption<OwnedRecordConfigEncryptionUnresolved>,
}

Expand All @@ -78,12 +79,14 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {

fn or(self, fallback: Self) -> Self {
Self {
splitting_strategy: self.splitting_strategy.or(fallback.splitting_strategy),
encryption: self.encryption.or(fallback.encryption),
}
}

fn resolve(self) -> Result<Self::Resolved, Self> {
if let Self {
splitting_strategy: Some(splitting_strategy),
encryption: Some(encryption),
} = self
{
Expand All @@ -92,9 +95,11 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {
.transpose()
{
Ok(resolved) => Ok(Self::Resolved {
splitting_strategy,
encryption: resolved,
}),
Err(unresolved) => Err(Self {
splitting_strategy: Some(splitting_strategy),
encryption: Some(Some(unresolved).into()),
}),
}
Expand All @@ -107,6 +112,7 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {
impl From<OwnedRecordConfigParameters> for OwnedRecordConfigParametersUnresolved {
fn from(value: OwnedRecordConfigParameters) -> Self {
Self {
splitting_strategy: Some(value.splitting_strategy),
encryption: Some(
value
.encryption
Expand Down Expand Up @@ -175,8 +181,24 @@ pub struct OwnedRecordConfigEncryption {
pub segment_padding_to_bytes: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SplittingStrategy {
/// Automatically split the data into segments of maximum size.
Fill {},
/// Each segment is created from its corresponding numbered data file.
Manual {},
}

impl Default for SplittingStrategy {
fn default() -> Self {
Self::Fill {}
}
}

/// Parameters can be defined in the registry config, and individually overwritten in each record config.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OwnedRecordConfigParameters {
pub splitting_strategy: SplittingStrategy,
pub encryption: Option<OwnedRecordConfigEncryption>,
}

Expand All @@ -187,6 +209,11 @@ pub struct OwnedRecordConfig {
pub parameters: OwnedRecordConfigParameters,
}

pub struct OwnedRecordReadSuccess<R: AsyncRead + Unpin + Send + Sync> {
pub read: R,
pub split_at: Option<Vec<usize>>,
}

#[derive(Debug)]
pub struct OwnedRecord {
pub directory_path: PathBuf,
Expand Down Expand Up @@ -299,16 +326,46 @@ impl OwnedRecord {
}
}

pub async fn read(&self) -> Result<Option<impl AsyncRead>> {
match tokio::fs::OpenOptions::new()
.read(true)
.open(self.get_data_path().await?)
.await
{
Ok(file) => Ok(Some(file)),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(error) => Err(error.into()),
pub async fn read(
&self,
) -> Result<Option<OwnedRecordReadSuccess<impl AsyncRead + Unpin + Send + Sync>>> {
let data_paths = self.get_data_paths().await?;
let Some((data_paths_first, data_paths_rest)) = data_paths.split_first() else {
return Ok(None);
};
let mut split_at = match self.config.parameters.splitting_strategy {
SplittingStrategy::Fill {} => None,
SplittingStrategy::Manual {} => Some(Vec::new()),
};
let mut read: Box<dyn AsyncRead + Unpin + Send + Sync> = {
let file_first = tokio::fs::OpenOptions::new()
.read(true)
.open(data_paths_first)
.await?;
if let Some(split_at) = split_at.as_mut() {
let metadata = file_first.metadata().await?;
split_at.push(metadata.len() as usize);
}
Box::new(file_first)
};

for data_path in data_paths_rest.iter() {
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(data_path)
.await?;
if let Some(split_at) = split_at.as_mut() {
let metadata = file.metadata().await?;
split_at.push(metadata.len() as usize);
}
read = Box::new(read.chain(file));
}

if let Some(split_at) = split_at.as_mut() {
split_at.pop();
}

Ok(Some(OwnedRecordReadSuccess { read, split_at }))
}

pub fn get_config_path_from_record_directory_path(directory_path: impl AsRef<Path>) -> PathBuf {
Expand All @@ -319,26 +376,79 @@ impl OwnedRecord {
Self::get_config_path_from_record_directory_path(&self.directory_path)
}

pub async fn get_data_path(&self) -> Result<PathBuf> {
const FILE_STEM_DATA: &str = "data";
pub async fn get_data_paths(&self) -> Result<Vec<PathBuf>> {
const FILE_STEM_DATA: &[u8] = b"data";

let mut read_dir = tokio::fs::read_dir(&self.directory_path).await?;
let mut candidate = None;
let mut results = Vec::<(Option<usize>, PathBuf)>::new();

while let Some(dir_entry) = read_dir.next_entry().await? {
if dir_entry.file_type().await?.is_file() {
let path = dir_entry.path();
let file_name = path.file_name().expect("regular file expected");
let mut iter = file_name.as_encoded_bytes().splitn(2, |byte| *byte == b'.');
let stem_bytes = iter.next().expect("expected a non-empty file name");

if path.file_stem() == Some(&OsString::from(FILE_STEM_DATA)) {
if candidate.is_none() {
candidate = Some(path);
} else {
bail!("Multiple data files available, but only one may exist");
if stem_bytes != FILE_STEM_DATA {
continue;
}

if let Some(extensions_bytes) = iter.next() {
let mut iter = extensions_bytes.splitn(2, |byte| *byte == b'.');
let first = iter.next().unwrap();

if iter.next().is_some() {
if let Ok(first) = str::from_utf8(first) {
if let Ok(index) = first.parse::<usize>() {
results.push((Some(index), path));
continue;
}
}
}
}

results.push((None, path));
}
}

if results.is_empty() {
return Ok(Vec::new());
}

results.sort_unstable();

let indexed = matches!(results.first(), Some((Some(_), _)));

// Ensure indexing is not mixed.
if !results.iter().all(|(index, _)| index.is_some() == indexed) {
bail!("cannot mix non-indexed and indexed record data files");
}

// Ensure uniqueness of indexes.
for [(index_a, path_a), (index_b, path_b)] in results.array_windows::<2>() {
if index_a == index_b {
if let Some(index) = index_a {
bail!("multiple (conflicting) record data files with index {index} exist: {path_a:?}, {path_b:?}");
} else {
bail!("multiple (conflicting) record data files exist: {path_a:?}, {path_b:?}");
}
}
}

// Ensure contiguity of indexes.
if indexed {
for [(index_a, _), (index_b, _)] in results.array_windows::<2>() {
if let (Some(index_a), Some(index_b)) = (index_a, index_b) {
if *index_a + 1 != *index_b {
bail!(
"indexed record data files are not contiguous, missing index {}",
*index_a + 1
);
}
}
}
}

candidate.ok_or_eyre("No data file found")
Ok(results.into_iter().map(|(_, path)| path).collect())
}
}
3 changes: 2 additions & 1 deletion src/owned/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::record::{
OwnedRecordConfigEncryption, OwnedRecordConfigParameters, OwnedRecordConfigParametersUnresolved,
};

use super::record::OwnedRecord;
use super::record::{OwnedRecord, SplittingStrategy};

/// Represents a registry with cryptographic credentials for editing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -276,6 +276,7 @@ impl OwnedRegistry<WriteLock> {
.with_algorithm(KdfAlgorithm::Hkdf(HkdfParams::default()))
.build_with_random_root_predecessor_nonce(csprng)?,
default_record_parameters: OwnedRecordConfigParameters {
splitting_strategy: SplittingStrategy::Fill {},
encryption: Some(OwnedRecordConfigEncryption {
algorithm: EncryptionAlgorithm::Aes256Gcm,
segment_padding_to_bytes: 1024, // 1 KiB
Expand Down

0 comments on commit 55c0bfa

Please sign in to comment.