From 55c0bfab8667d66cd039e45ff507bf9b9d20db7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Hlusi=C4=8Dka?= <Limeth@users.noreply.github.com>
Date: Sun, 4 Aug 2024 18:23:25 +0200
Subject: [PATCH] Add support for the manual splitting strategy

---
 Cargo.toml            |   2 +-
 src/lib.rs            |  18 ++---
 src/owned/record.rs   | 152 ++++++++++++++++++++++++++++++++++++------
 src/owned/registry.rs |   3 +-
 4 files changed, 143 insertions(+), 32 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 33502cb..af82ece 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,7 @@ derive_more = "0.99.17"
 ed25519-dalek = { version = "2.1.1", features = ["rand_core", "pem"] }
 futures = "0.3.30"
 include_dir = { version = "0.7.4", features = ["nightly"] }
-rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "fbdd4c2953f78796b85914162b0ec592e8d2e5b6" }
+rrr = { git = "https://github.com/recursive-record-registry/rrr.git", rev = "f2dfeeba77a386d05a7182584a32c508aa831567" }
 serde = { version = "1.0.203", features = ["derive"] }
 serde_bytes = "0.11.14"
 serde_with = "3.8.1"
diff --git a/src/lib.rs b/src/lib.rs
index 2a4b60b..d2cea2f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(array_windows)]
+
 use chrono::DateTime;
 use color_eyre::eyre::OptionExt;
 use futures::{future::BoxFuture, FutureExt};
@@ -46,6 +48,7 @@ pub async fn save_record_versioned<L: FileLock>(
     record_path: &RecordPath,
     output_record: &Record,
     hashed_key: &HashedRecordKey,
+    split_at: &[usize],
     stats: &mut MakeRecursiveStatistics,
 ) -> color_eyre::Result<()> {
     let existing_versions = output_registry
@@ -91,7 +94,7 @@ pub async fn save_record_versioned<L: FileLock>(
                     output_record,
                     new_version,
                     max_collision_resolution_attempts,
-                    &[], // TODO
+                    split_at,
                     encryption.as_ref(),
                     false,
                 )
@@ -113,7 +116,7 @@ pub async fn save_record_versioned<L: FileLock>(
                 output_record,
                 0.into(), // This is the first version of the record, as no other versions have been found.
                 max_collision_resolution_attempts,
-                &[], // TODO
+                split_at,
                 encryption.as_ref(),
                 false,
             )
@@ -139,13 +142,8 @@ pub fn make_recursive<'a, L: FileLock>(
 ) -> BoxFuture<'a, color_eyre::Result<()>> {
     async move {
         let mut data = Vec::new();
-
-        input_record
-            .read()
-            .await?
-            .expect("Data not found.")
-            .read_to_end(&mut data)
-            .await?;
+        let mut read_result = input_record.read().await?.expect("Data not found.");
+        read_result.read.read_to_end(&mut data).await?;
 
         let output_record = Record {
             metadata: {
@@ -181,6 +179,8 @@ pub fn make_recursive<'a, L: FileLock>(
             &record_path,
             &output_record,
             &hashed_key,
+            // TODO: Handle `SplittingStrategy::Fill`
+            read_result.split_at.as_deref().unwrap_or_default(),
             stats,
         )
         .await?;
diff --git a/src/owned/record.rs b/src/owned/record.rs
index 0c04792..4e33f7e 100644
--- a/src/owned/record.rs
+++ b/src/owned/record.rs
@@ -1,20 +1,20 @@
 use chrono::{DateTime, Utc};
 use color_eyre::{
-    eyre::{bail, eyre, OptionExt},
+    eyre::{bail, eyre},
     Result,
 };
+use core::str;
 use futures::future::{BoxFuture, FutureExt};
 use rrr::{crypto::encryption::EncryptionAlgorithm, record::segment::SegmentEncryption};
 use serde::{Deserialize, Serialize};
 use serde_bytes::ByteBuf;
 use std::{
     collections::HashSet,
-    ffi::OsString,
     fmt::Debug,
     path::{Path, PathBuf},
     str::FromStr,
 };
-use tokio::io::{AsyncRead, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
 
 use crate::{error::Error, registry::OwnedRegistryConfig, util::serde::DoubleOption};
 
@@ -70,6 +70,7 @@ impl From<OwnedRecordConfigEncryption> for OwnedRecordConfigEncryptionUnresolved
 
 #[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct OwnedRecordConfigParametersUnresolved {
+    pub splitting_strategy: Option<SplittingStrategy>,
     pub encryption: DoubleOption<OwnedRecordConfigEncryptionUnresolved>,
 }
 
@@ -78,12 +79,14 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {
 
     fn or(self, fallback: Self) -> Self {
         Self {
+            splitting_strategy: self.splitting_strategy.or(fallback.splitting_strategy),
             encryption: self.encryption.or(fallback.encryption),
         }
     }
 
     fn resolve(self) -> Result<Self::Resolved, Self> {
         if let Self {
+            splitting_strategy: Some(splitting_strategy),
             encryption: Some(encryption),
         } = self
         {
@@ -92,9 +95,11 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {
                 .transpose()
             {
                 Ok(resolved) => Ok(Self::Resolved {
+                    splitting_strategy,
                     encryption: resolved,
                 }),
                 Err(unresolved) => Err(Self {
+                    splitting_strategy: Some(splitting_strategy),
                     encryption: Some(Some(unresolved).into()),
                 }),
             }
@@ -107,6 +112,7 @@ impl Unresolved for OwnedRecordConfigParametersUnresolved {
 impl From<OwnedRecordConfigParameters> for OwnedRecordConfigParametersUnresolved {
     fn from(value: OwnedRecordConfigParameters) -> Self {
         Self {
+            splitting_strategy: Some(value.splitting_strategy),
             encryption: Some(
                 value
                     .encryption
@@ -175,8 +181,24 @@ pub struct OwnedRecordConfigEncryption {
     pub segment_padding_to_bytes: u64,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub enum SplittingStrategy {
+    /// Automatically split the data into segments of maximum size.
+    Fill {},
+    /// Each segment is created from its corresponding numbered data file.
+    Manual {},
+}
+
+impl Default for SplittingStrategy {
+    fn default() -> Self {
+        Self::Fill {}
+    }
+}
+
+/// Parameters can be defined in the registry config, and individually overwritten in each record config.
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct OwnedRecordConfigParameters {
+    pub splitting_strategy: SplittingStrategy,
     pub encryption: Option<OwnedRecordConfigEncryption>,
 }
 
@@ -187,6 +209,11 @@ pub struct OwnedRecordConfig {
     pub parameters: OwnedRecordConfigParameters,
 }
 
+pub struct OwnedRecordReadSuccess<R: AsyncRead + Unpin + Send + Sync> {
+    pub read: R,
+    pub split_at: Option<Vec<usize>>,
+}
+
 #[derive(Debug)]
 pub struct OwnedRecord {
     pub directory_path: PathBuf,
@@ -299,16 +326,46 @@ impl OwnedRecord {
         }
     }
 
-    pub async fn read(&self) -> Result<Option<impl AsyncRead>> {
-        match tokio::fs::OpenOptions::new()
-            .read(true)
-            .open(self.get_data_path().await?)
-            .await
-        {
-            Ok(file) => Ok(Some(file)),
-            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
-            Err(error) => Err(error.into()),
+    pub async fn read(
+        &self,
+    ) -> Result<Option<OwnedRecordReadSuccess<impl AsyncRead + Unpin + Send + Sync>>> {
+        let data_paths = self.get_data_paths().await?;
+        let Some((data_paths_first, data_paths_rest)) = data_paths.split_first() else {
+            return Ok(None);
+        };
+        let mut split_at = match self.config.parameters.splitting_strategy {
+            SplittingStrategy::Fill {} => None,
+            SplittingStrategy::Manual {} => Some(Vec::new()),
+        };
+        let mut read: Box<dyn AsyncRead + Unpin + Send + Sync> = {
+            let file_first = tokio::fs::OpenOptions::new()
+                .read(true)
+                .open(data_paths_first)
+                .await?;
+            if let Some(split_at) = split_at.as_mut() {
+                let metadata = file_first.metadata().await?;
+                split_at.push(metadata.len() as usize);
+            }
+            Box::new(file_first)
+        };
+
+        for data_path in data_paths_rest.iter() {
+            let file = tokio::fs::OpenOptions::new()
+                .read(true)
+                .open(data_path)
+                .await?;
+            if let Some(split_at) = split_at.as_mut() {
+                let metadata = file.metadata().await?;
+                split_at.push(metadata.len() as usize);
+            }
+            read = Box::new(read.chain(file));
+        }
+
+        if let Some(split_at) = split_at.as_mut() {
+            split_at.pop();
         }
+
+        Ok(Some(OwnedRecordReadSuccess { read, split_at }))
     }
 
     pub fn get_config_path_from_record_directory_path(directory_path: impl AsRef<Path>) -> PathBuf {
@@ -319,26 +376,79 @@ impl OwnedRecord {
         Self::get_config_path_from_record_directory_path(&self.directory_path)
     }
 
-    pub async fn get_data_path(&self) -> Result<PathBuf> {
-        const FILE_STEM_DATA: &str = "data";
+    pub async fn get_data_paths(&self) -> Result<Vec<PathBuf>> {
+        const FILE_STEM_DATA: &[u8] = b"data";
 
         let mut read_dir = tokio::fs::read_dir(&self.directory_path).await?;
-        let mut candidate = None;
+        let mut results = Vec::<(Option<usize>, PathBuf)>::new();
 
         while let Some(dir_entry) = read_dir.next_entry().await? {
             if dir_entry.file_type().await?.is_file() {
                 let path = dir_entry.path();
+                let file_name = path.file_name().expect("regular file expected");
+                let mut iter = file_name.as_encoded_bytes().splitn(2, |byte| *byte == b'.');
+                let stem_bytes = iter.next().expect("expected a non-empty file name");
 
-                if path.file_stem() == Some(&OsString::from(FILE_STEM_DATA)) {
-                    if candidate.is_none() {
-                        candidate = Some(path);
-                    } else {
-                        bail!("Multiple data files available, but only one may exist");
+                if stem_bytes != FILE_STEM_DATA {
+                    continue;
+                }
+
+                if let Some(extensions_bytes) = iter.next() {
+                    let mut iter = extensions_bytes.splitn(2, |byte| *byte == b'.');
+                    let first = iter.next().unwrap();
+
+                    if iter.next().is_some() {
+                        if let Ok(first) = str::from_utf8(first) {
+                            if let Ok(index) = first.parse::<usize>() {
+                                results.push((Some(index), path));
+                                continue;
+                            }
+                        }
+                    }
+                }
+
+                results.push((None, path));
+            }
+        }
+
+        if results.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        results.sort_unstable();
+
+        let indexed = matches!(results.first(), Some((Some(_), _)));
+
+        // Ensure indexing is not mixed.
+        if !results.iter().all(|(index, _)| index.is_some() == indexed) {
+            bail!("cannot mix non-indexed and indexed record data files");
+        }
+
+        // Ensure uniqueness of indexes.
+        for [(index_a, path_a), (index_b, path_b)] in results.array_windows::<2>() {
+            if index_a == index_b {
+                if let Some(index) = index_a {
+                    bail!("multiple (conflicting) record data files with index {index} exist: {path_a:?}, {path_b:?}");
+                } else {
+                    bail!("multiple (conflicting) record data files exist: {path_a:?}, {path_b:?}");
+                }
+            }
+        }
+
+        // Ensure contiguity of indexes.
+        if indexed {
+            for [(index_a, _), (index_b, _)] in results.array_windows::<2>() {
+                if let (Some(index_a), Some(index_b)) = (index_a, index_b) {
+                    if *index_a + 1 != *index_b {
+                        bail!(
+                            "indexed record data files are not contiguous, missing index {}",
+                            *index_a + 1
+                        );
                     }
                 }
             }
         }
 
-        candidate.ok_or_eyre("No data file found")
+        Ok(results.into_iter().map(|(_, path)| path).collect())
     }
 }
diff --git a/src/owned/registry.rs b/src/owned/registry.rs
index 1e24000..73857aa 100644
--- a/src/owned/registry.rs
+++ b/src/owned/registry.rs
@@ -27,7 +27,7 @@ use crate::record::{
     OwnedRecordConfigEncryption, OwnedRecordConfigParameters, OwnedRecordConfigParametersUnresolved,
 };
 
-use super::record::OwnedRecord;
+use super::record::{OwnedRecord, SplittingStrategy};
 
 /// Represents a registry with cryptographic credentials for editing.
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -276,6 +276,7 @@ impl OwnedRegistry<WriteLock> {
                 .with_algorithm(KdfAlgorithm::Hkdf(HkdfParams::default()))
                 .build_with_random_root_predecessor_nonce(csprng)?,
             default_record_parameters: OwnedRecordConfigParameters {
+                splitting_strategy: SplittingStrategy::Fill {},
                 encryption: Some(OwnedRecordConfigEncryption {
                     algorithm: EncryptionAlgorithm::Aes256Gcm,
                     segment_padding_to_bytes: 1024, // 1 KiB