Skip to content

Commit

Permalink
[EXPERIMENT] begin adding "async" versions of trait/impls
Browse files Browse the repository at this point in the history
This is an experimentation branch.
The way I duplicated code to """quickly""" have an async impl
is a terrible way to do it.
Full code duplication is never a good option.

My goal is to reach a state of rustic/rustic_core/rustic_backend
to experiment on the async implementations.
  • Loading branch information
nardoor committed Aug 31, 2024
1 parent 28c015a commit 46a8f9b
Show file tree
Hide file tree
Showing 20 changed files with 2,720 additions and 197 deletions.
2 changes: 2 additions & 0 deletions crates/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ semver = { version = "1.0.23", optional = true }
bytesize = "1.3.0"
rayon = { version = "1.10.0", optional = true }
tokio = { version = "1.39.3", optional = true, default-features = false }
futures = { version = "0.3", optional = true, default-features = false }
async-trait = "0.1.81"

[target.'cfg(not(windows))'.dependencies]
# opendal backend - sftp is not supported on windows, see https://github.com/apache/incubator-opendal/issues/2963
Expand Down
52 changes: 52 additions & 0 deletions crates/backend/src/choose.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module contains [`BackendOptions`] and helpers to choose a backend from a given url.
use anyhow::{anyhow, Result};
use derive_setters::Setters;
use rustic_core::{AsyncRepositoryBackends, AsyncWriteBackend};
use std::{collections::HashMap, sync::Arc};
use strum_macros::{Display, EnumString};

Expand All @@ -10,6 +11,7 @@ use rustic_core::{RepositoryBackends, WriteBackend};
use crate::{
error::BackendAccessErrorKind,
local::LocalBackend,
opendal::AsyncOpenDALBackend,
util::{location_to_type_and_path, BackendLocation},
};

Expand Down Expand Up @@ -95,6 +97,19 @@ impl BackendOptions {
Ok(RepositoryBackends::new(be, be_hot))
}

pub fn to_async_backends(&self) -> Result<AsyncRepositoryBackends> {
let mut options = self.options.clone();
options.extend(self.options_cold.clone());
let be = self
.get_async_backed(self.repository.as_ref(), options)?
.ok_or_else(|| anyhow!("No repository given."))?;
let mut options = self.options.clone();
options.extend(self.options_hot.clone());
let be_hot = self.get_async_backed(self.repo_hot.as_ref(), options)?;

Ok(AsyncRepositoryBackends::new(be, be_hot))
}

/// Get the backend for the given repository.
///
/// # Arguments
Expand Down Expand Up @@ -125,6 +140,25 @@ impl BackendOptions {
})
.transpose()
}

fn get_async_backed(
&self,
repo_string: Option<&String>,
options: HashMap<String, String>,
) -> Result<Option<Arc<dyn AsyncWriteBackend>>> {
repo_string
.map(|string| {
let (be_type, location) = location_to_type_and_path(string)?;
match be_type.to_async_backends(location, options.into()) {
Ok(e) => Ok(e),
Err(e) if e.downcast_ref::<BackendAccessErrorKind>().is_some() => Err(e.into()),
Err(e) => {
Err(BackendAccessErrorKind::BackendLoadError(be_type.to_string(), e).into())
}
}
})
.transpose()
}
}

/// Trait which can be implemented to choose a backend from a backend type, a backend path and options given as `HashMap`.
Expand All @@ -146,6 +180,12 @@ pub trait BackendChoice {
location: BackendLocation,
options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn WriteBackend>>;

fn to_async_backends(
&self,
location: BackendLocation,
options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn AsyncWriteBackend>>;
}

/// The supported backend types.
Expand Down Expand Up @@ -196,6 +236,18 @@ impl BackendChoice for SupportedBackend {
Self::OpenDAL => Arc::new(OpenDALBackend::new(location, options)?),
})
}

fn to_async_backends(
&self,
location: BackendLocation,
options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn AsyncWriteBackend>> {
let options = options.unwrap_or_default();
match self {
Self::OpenDAL => Ok(Arc::new(AsyncOpenDALBackend::new(location, options)?)),
_ => Err(BackendAccessErrorKind::BackendNoAsync(location.to_string()).into()),
}
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions crates/backend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use thiserror::Error;
/// [`BackendAccessErrorKind`] describes the errors that can be returned by the various Backends
#[derive(Error, Debug, Display)]
pub enum BackendAccessErrorKind {
/// no async variant implemented for backend {0:1}
BackendNoAsync(String),
/// backend {0:?} is not supported!
BackendNotSupported(String),
/// backend {0} cannot be loaded: {1:?}
Expand Down
240 changes: 234 additions & 6 deletions crates/backend/src/opendal.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
/// `OpenDAL` backend for rustic.
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock};
use std::{borrow::Borrow, collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock};

use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use bytes::Bytes;
use bytesize::ByteSize;
use log::trace;
use log::{debug, trace};
use opendal::{
layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer, ThrottleLayer},
BlockingOperator, ErrorKind, Metakey, Operator, Scheme,
};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::runtime::Runtime;
use tokio::runtime::{EnterGuard, Handle, Runtime};

use rustic_core::{FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES};
use rustic_core::{
AsyncReadBackend, AsyncWriteBackend, FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES,
};

mod constants {
/// Default number of retries
pub(super) const DEFAULT_RETRY: usize = 5;
}

/// `OpenDALBackend` contains a wrapper around an blocking operator of the `OpenDAL` library.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct OpenDALBackend {
operator: BlockingOperator,
}

/// Async implementation of [OpenDALBackend].
#[derive(Debug)]
pub struct AsyncOpenDALBackend {
operator: Operator,
}

fn runtime() -> &'static Runtime {
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
Expand Down Expand Up @@ -59,7 +68,226 @@ impl FromStr for Throttle {
}
}

impl AsyncOpenDALBackend {
// TODO - factorize code with OpenDALBackend::new()
pub fn new(path: impl AsRef<str>, options: HashMap<String, String>) -> Result<Self> {
let max_retries = match options.get("retry").map(String::as_str) {
Some("false" | "off") => 0,
None | Some("default") => constants::DEFAULT_RETRY,
Some(value) => usize::from_str(value)?,
};
let connections = options
.get("connections")
.map(|c| usize::from_str(c))
.transpose()?;

let throttle = options
.get("throttle")
.map(|t| Throttle::from_str(t))
.transpose()?;

let schema = Scheme::from_str(path.as_ref())?;
let mut operator = Operator::via_iter(schema, options)?
.layer(RetryLayer::new().with_max_times(max_retries).with_jitter());

if let Some(Throttle { bandwidth, burst }) = throttle {
operator = operator.layer(ThrottleLayer::new(bandwidth, burst));
}

if let Some(connections) = connections {
operator = operator.layer(ConcurrentLimitLayer::new(connections));
}

Ok(Self { operator })
}

/// Return a path for the given file type and id.
///
/// # Arguments
///
/// * `tpe` - The type of the file.
/// * `id` - The id of the file.
///
/// # Returns
///
/// The path for the given file type and id.
// Let's keep this for now, as it's being used in the trait implementations.
#[allow(clippy::unused_self)]
fn path(&self, tpe: FileType, id: &Id) -> String {
let hex_id = id.to_hex();
match tpe {
FileType::Config => PathBuf::from("config"),
FileType::Pack => PathBuf::from("data").join(&hex_id[0..2]).join(hex_id),
_ => PathBuf::from(tpe.dirname()).join(hex_id),
}
.to_string_lossy()
.to_string()
}
}

#[async_trait]
impl AsyncReadBackend for AsyncOpenDALBackend {
/// Returns the location of the backend.
///
/// This is `local:<path>`.
fn location(&self) -> String {
let mut location = "opendal:".to_string();
location.push_str(self.operator.info().name());
location
}

/// Lists all files of the given type.
///
/// # Arguments
///
/// * `tpe` - The type of the files to list.
///
/// # Notes
///
/// If the file type is `FileType::Config`, this will return a list with a single default id.
async fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
trace!("listing tpe: {tpe:?}");
if tpe == FileType::Config {
return Ok(if self.operator.is_exist("config").await? {
vec![Id::default()]
} else {
Vec::new()
});
}

Ok(self
.operator
.list_with(&(tpe.dirname().to_string() + "/"))
.recursive(true)
.await?
.into_iter()
.filter(|e| e.metadata().is_file())
.map(|e| Id::from_hex(e.name()))
.filter_map(Result::ok)
.collect())
}

/// Lists all files with their size of the given type.
///
/// # Arguments
///
/// * `tpe` - The type of the files to list.
///
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
trace!("listing tpe: {tpe:?}");
if tpe == FileType::Config {
return match self.operator.stat("config").await {
Ok(entry) => Ok(vec![(Id::default(), entry.content_length().try_into()?)]),
Err(err) if err.kind() == ErrorKind::NotFound => Ok(Vec::new()),
Err(err) => Err(err.into()),
};
}

Ok(self
.operator
.list_with(&(tpe.dirname().to_string() + "/"))
.recursive(true)
.metakey(Metakey::ContentLength)
.await?
.into_iter()
.filter(|e| e.metadata().is_file())
.map(|e| -> Result<(Id, u32)> {
Ok((
Id::from_hex(e.name())?,
e.metadata().content_length().try_into()?,
))
})
.filter_map(Result::ok)
.collect())
}

async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
trace!("reading tpe: {tpe:?}, id: {id}");

Ok(self.operator.read(&self.path(tpe, id)).await?.to_bytes())
}

async fn read_partial(
&self,
tpe: FileType,
id: &Id,
_cacheable: bool,
offset: u32,
length: u32,
) -> Result<Bytes> {
trace!("reading tpe: {tpe:?}, id: {id}, offset: {offset}, length: {length}");
let range = u64::from(offset)..u64::from(offset + length);
Ok(self
.operator
.read_with(&self.path(tpe, id))
.range(range)
.await?
.to_bytes())
}
}

#[async_trait]
impl AsyncWriteBackend for AsyncOpenDALBackend {
async fn create(&self) -> Result<()> {
trace!("creating repo at {:?}", self.location());

for tpe in ALL_FILE_TYPES {
self.operator
.create_dir(&(tpe.dirname().to_string() + "/"))
.await?;
}

// TODO - use futures::stream::Stream;
for i in 0u8..=255 {
self.operator
.create_dir(
&(PathBuf::from("data")
.join(hex::encode([i]))
.to_string_lossy()
.to_string()
+ "/"),
)
.await?
}

Ok(())
}

async fn write_bytes(
&self,
tpe: FileType,
id: &Id,
_cacheable: bool,
buf: Bytes,
) -> Result<()> {
trace!("writing tpe: {:?}, id: {}", &tpe, &id);
let filename = self.path(tpe, id);
self.operator.write(&filename, buf).await?;
Ok(())
}

/// Remove the given file.
///
/// # Arguments
///
/// * `tpe` - The type of the file.
/// * `id` - The id of the file.
/// * `cacheable` - Whether the file is cacheable.
async fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> Result<()> {
trace!("removing tpe: {:?}, id: {}", &tpe, &id);
let filename = self.path(tpe, id);
self.operator.delete(&filename).await?;
Ok(())
}
}

impl OpenDALBackend {
/// TODO have some shared trait with such a method
/// otherwise the knowledge of this async safety could be in this match method in choose.rs
fn safe_in_async_context() -> bool {
false
}

/// Create a new openDAL backend.
///
/// # Arguments
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ itertools = "0.13.0"
quick_cache = "0.6.2"
strum = { version = "0.26.3", features = ["derive"] }
zstd = "0.13.2"
async-trait = "0.1.81"

[target.'cfg(not(windows))'.dependencies]
sha2 = { version = "0.10.8", features = ["asm"] }
Expand Down
Loading

0 comments on commit 46a8f9b

Please sign in to comment.