Skip to content

Commit

Permalink
Implement record version listing
Browse files Browse the repository at this point in the history
  • Loading branch information
Limeth committed Jul 29, 2024
1 parent 0740d6f commit 068785b
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 97 deletions.
37 changes: 36 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl From<String> for GenericError {
}
}

pub trait OptionExt<T> {
pub(crate) trait OptionExt<T> {
fn unwrap_builder_parameter(
&self,
label: &'static str,
Expand All @@ -119,3 +119,38 @@ impl<T: Clone> OptionExt<T> for Option<T> {
})
}
}

pub(crate) trait IoResultExt<T, E> {
fn map_err_not_found_to_none(self) -> std::result::Result<Option<T>, E>;
}

impl<T> IoResultExt<T, std::io::Error> for std::io::Result<T> {
fn map_err_not_found_to_none(self) -> std::io::Result<Option<T>> {
match self {
Ok(ok) => Ok(Some(ok)),
Err(err) => {
if err.kind() == std::io::ErrorKind::NotFound {
Ok(None)
} else {
Err(err)
}
}
}
}
}

impl<T> IoResultExt<T, Error> for std::result::Result<T, Error> {
fn map_err_not_found_to_none(self) -> std::result::Result<Option<T>, Error> {
match self {
Ok(ok) => Ok(Some(ok)),
Err(Error::Io(io_err)) => {
if io_err.kind() == std::io::ErrorKind::NotFound {
Ok(None)
} else {
Err(Error::Io(io_err))
}
}
Err(err) => Err(err),
}
}
}
249 changes: 154 additions & 95 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cbor::{self, DateTimeParseError, TAG_RRR_RECORD};
use crate::crypto::encryption::EncryptionAlgorithm;
use crate::crypto::signature::SigningKey;
use crate::error::{Error, Result};
use crate::error::{Error, IoResultExt, Result};
use crate::registry::Registry;
use crate::utils::fd_lock::{FileLock, WriteLock};
use crate::utils::serde::{BytesOrAscii, BytesOrHexString, Secret};
Expand All @@ -20,9 +20,9 @@ use segment::{
RecordNonce, RecordParameters, RecordVersion, Segment, SegmentEncryption, SegmentMetadata,
};
use serde::{Deserialize, Serialize};
use std::iter;
use std::ops::{Deref, DerefMut};
use std::{borrow::Cow, fmt::Debug, io::Cursor};
use std::{io, iter};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tracing::{debug, info, instrument, trace};
Expand Down Expand Up @@ -172,121 +172,131 @@ pub struct RecordReadVersionSuccess {
pub segments: Vec<RecordReadVersionSuccessSegment>,
}

impl Record {
// pub async fn list_versions<L>(
// registry: &Registry<L>,
// hashed_key: &HashedRecordKey,
// ) -> Result<impl TryStream<Ok = u64, Error = Error>> {
// todo!()
// }
pub struct RecordListVersionsItem {
pub record_metadata: RecordMetadata,
pub record_version: RecordVersion,
pub record_nonce: RecordNonce,
pub segments: Vec<RecordReadVersionSuccessSegment>,
}

impl Record {
/// Attempts to read a record with the specified record version and record nonce.
#[instrument]
pub async fn read_version<L>(
pub async fn read_version_with_nonce<L>(
registry: &Registry<L>,
hash_record_path: &(impl HashRecordPath + Debug),
record_version: RecordVersion,
max_collision_resolution_attempts: u64,
record_nonce: RecordNonce,
) -> Result<Option<RecordReadVersionSuccess>>
where
L: FileLock,
{
let hashed_key = hash_record_path.hash_record_path(registry).await?;
let mut errors = Vec::<Error>::new();

'collision_resolution_loop: for record_nonce in
0..max_collision_resolution_attempts.checked_add(1).unwrap()
{
let record_parameters = RecordParameters {
version: record_version,
nonce: record_nonce.into(),
let record_parameters = RecordParameters {
version: record_version,
nonce: record_nonce,
};
let mut segments = Vec::new();
let mut data_buffer = Vec::<u8>::new();

'segment_loop: loop {
let fragment_key = FragmentKey {
hashed_record_key: hashed_key.clone(),
fragment_parameters: KdfUsageFragmentParameters {
record_parameters: record_parameters.clone(),
segment_index: (segments.len() as u64).into(),
},
};
let mut segments = Vec::new();
let mut data_buffer = Vec::<u8>::new();

'segment_loop: loop {
let fragment_key = FragmentKey {
hashed_record_key: hashed_key.clone(),
fragment_parameters: KdfUsageFragmentParameters {
record_parameters: record_parameters.clone(),
segment_index: (segments.len() as u64).into(),
},
};
let fragment_file_name =
fragment_key.derive_file_name(&registry.config.kdf).await?;
let fragment_file_tag = fragment_key.derive_file_tag(&registry.config.kdf).await?;
let fragment_path = registry.get_fragment_path(&fragment_file_name);

let segment_result: Result<FragmentReadSuccess> = try {
let fragment_file = File::open(&fragment_path).await?;
let fragment_file_guard = fragment_file.lock_read().await?;
let segment = Segment::read_fragment(
&registry.config.verifying_keys,
&registry.config.kdf,
fragment_file_guard,
&fragment_key,
)
.await?;
let found_file_tag = segment.metadata.get_file_tag()?;

if found_file_tag != fragment_file_tag {
Err(Error::FileTagMismatch)?;
}
let fragment_file_name = fragment_key.derive_file_name(&registry.config.kdf).await?;
let fragment_file_tag = fragment_key.derive_file_tag(&registry.config.kdf).await?;
let fragment_path = registry.get_fragment_path(&fragment_file_name);

let segment_result: Result<FragmentReadSuccess> = try {
let fragment_file = File::open(&fragment_path).await?;
let fragment_file_guard = fragment_file.lock_read().await?;
let segment = Segment::read_fragment(
&registry.config.verifying_keys,
&registry.config.kdf,
fragment_file_guard,
&fragment_key,
)
.await?;
let found_file_tag = segment.metadata.get_file_tag()?;

segment
};
if found_file_tag != fragment_file_tag {
Err(Error::FileTagMismatch)?;
}

trace!(
?fragment_key,
?fragment_file_name,
?fragment_path,
error = ?segment_result.as_ref().err(),
"Attempted to load a record fragment"
);

let segment = match segment_result {
Ok(segment) => segment,
Err(error) => {
if segments.is_empty() {
errors.push(error);
continue 'collision_resolution_loop;
} else {
return Err(error);
}
}
};
segment
};

data_buffer.extend_from_slice(&segment.data);
trace!(
?fragment_key,
?fragment_file_name,
?fragment_path,
error = ?segment_result.as_ref().err(),
"Attempted to load a record fragment"
);

let segment = match segment_result.map_err_not_found_to_none() {
Ok(Some(segment)) => segment,
Ok(None) => return Ok(None),
Err(err) => return Err(err),
};

segments.push(RecordReadVersionSuccessSegment {
segment_bytes: segment.data.len(),
fragment_file_name,
fragment_encryption_algorithm: segment.encryption_algorithm,
});
data_buffer.extend_from_slice(&segment.data);

if segment.metadata.get_last()? {
break 'segment_loop;
}
segments.push(RecordReadVersionSuccessSegment {
segment_bytes: segment.data.len(),
fragment_file_name,
fragment_encryption_algorithm: segment.encryption_algorithm,
});

if segment.metadata.get_last()? {
break 'segment_loop;
}
}

let record = coset::cbor::from_reader::<Self, _>(Cursor::new(&data_buffer))
.map_err(Error::CborDe)?;
let record = coset::cbor::from_reader::<Self, _>(Cursor::new(&data_buffer))
.map_err(Error::CborDe)?;

info!(%record_nonce, "Record read successfully");
info!(record_nonce = %*record_nonce, "Record read successfully");

return Ok(Some(RecordReadVersionSuccess {
record,
record_nonce: record_nonce.into(),
segments,
}));
}
Ok(Some(RecordReadVersionSuccess {
record,
record_nonce,
segments,
}))
}

errors.retain(|error| {
if let Error::Io(error) = error {
error.kind() != io::ErrorKind::NotFound
} else {
!matches!(error, &Error::FileTagMismatch)
}
});
#[instrument]
pub async fn read_version<L>(
registry: &Registry<L>,
hash_record_path: &(impl HashRecordPath + Debug),
record_version: RecordVersion,
max_collision_resolution_attempts: u64,
) -> Result<Option<RecordReadVersionSuccess>>
where
L: FileLock,
{
let hashed_key = hash_record_path.hash_record_path(registry).await?;
let mut errors = Vec::<Error>::new();

for record_nonce in (0..=max_collision_resolution_attempts).map(RecordNonce) {
let record_result =
Self::read_version_with_nonce(registry, &hashed_key, record_version, record_nonce)
.await;

match record_result {
Ok(Some(record)) => return Ok(Some(record)),
Ok(None) | Err(Error::FileTagMismatch) => continue,
Err(error) => {
errors.push(error);
continue;
}
};
}

if let Some(error) = errors.into_iter().next() {
Err(error)
Expand All @@ -295,6 +305,55 @@ impl Record {
}
}

#[instrument]
pub async fn list_versions<L>(
registry: &Registry<L>,
hash_record_path: &(impl HashRecordPath + Debug),
max_version_lookahead: u64,
max_collision_resolution_attempts: u64,
) -> Result<Vec<RecordListVersionsItem>>
where
L: FileLock,
{
let hashed_key = hash_record_path.hash_record_path(registry).await?;
let mut versions = Vec::<RecordListVersionsItem>::new();

'version_loop: loop {
let version_lookahead_start = versions
.last()
.map(|item| RecordVersion(*item.record_version + 1))
.unwrap_or(RecordVersion(0));

'version_lookahead_loop: for version_lookahead in 0..=max_version_lookahead {
let record_version = RecordVersion(version_lookahead_start.0 + version_lookahead);
let record_success = Self::read_version(
registry,
&hashed_key,
record_version,
max_collision_resolution_attempts,
)
.await?;

if let Some(record_success) = record_success {
versions.push(RecordListVersionsItem {
// TODO/FIXME: Currently reads the entire record, even though just the metadata would be sufficient.
record_metadata: record_success.record.metadata,
record_version,
record_nonce: record_success.record_nonce,
segments: record_success.segments,
});
continue 'version_loop;
} else {
continue 'version_lookahead_loop;
}
}

break;
}

Ok(versions)
}

#[instrument]
pub async fn write_version(
&self,
Expand Down
19 changes: 18 additions & 1 deletion src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::error::{Error, InvalidParameterError, OptionExt, Result};
use crate::record::segment::{
FragmentFileNameBytes, RecordNonce, RecordVersion, SegmentEncryption,
};
use crate::record::{HashRecordPath, Record, RecordReadVersionSuccess, SuccessionNonce};
use crate::record::{
HashRecordPath, Record, RecordListVersionsItem, RecordReadVersionSuccess, SuccessionNonce,
};
use crate::utils::fd_lock::{FileLock, ReadLock, WriteLock};
use crate::utils::serde::{BytesOrHexString, Secret};
use async_scoped::TokioScope;
Expand Down Expand Up @@ -539,6 +541,21 @@ impl<L: FileLock> Registry<L> {
)
.await
}

pub async fn list_record_versions(
&self,
hash_record_path: &(impl HashRecordPath + Debug),
max_version_lookahead: u64,
max_collision_resolution_attempts: u64,
) -> Result<Vec<RecordListVersionsItem>> {
Record::list_versions(
self,
hash_record_path,
max_version_lookahead,
max_collision_resolution_attempts,
)
.await
}
}

impl<L: FileLock> PartialEq for Registry<L> {
Expand Down

0 comments on commit 068785b

Please sign in to comment.