Skip to content

Commit

Permalink
refactor: move fusio-log to fusio
Browse files Browse the repository at this point in the history
  • Loading branch information
crwen committed Jan 20, 2025
1 parent c0db165 commit 859a6c6
Show file tree
Hide file tree
Showing 18 changed files with 892 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ jobs:
command: build
args: --package fusio-parquet

- name: Run cargo build on fusio-log
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio-log

- name: Run cargo test on tokio
uses: actions-rs/cargo@v1
with:
Expand All @@ -88,6 +94,12 @@ jobs:
with:
command: test
args: --package fusio --features=tokio-uring

- name: Run cargo test on fusio-log
uses: actions-rs/cargo@v1
with:
command: test
args: --package fusio-log --features=tokio
# 2
fmt:
name: Rust fmt
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

Cargo.lock
parquet/Cargo.lock
.DS_Store
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"examples/opfs",
"fusio",
"fusio-dispatch",
"fusio-log",
"fusio-object-store",
"fusio-opendal",
"fusio-parquet",
Expand Down
43 changes: 43 additions & 0 deletions fusio-log/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
description = "The append only log implementations."
edition.workspace = true
license.workspace = true
name = "fusio-log"
repository.workspace = true
version.workspace = true

[features]
aws = ["fusio-dispatch/aws", "fusio/aws"]
bytes = ["dep:bytes"]
default = ["aws", "bytes", "tokio", "tokio-http"]
tokio = [
"fusio-dispatch/tokio",
"fusio/tokio",
]
tokio-http = ["fusio/tokio-http"]
web = ["fusio-dispatch/opfs", "fusio/opfs"]
web-http = ["fusio/wasm-http"]

[dependencies]
async-stream = "0.3"
bytes = { version = "1.7", optional = true }
crc32fast = "1"
fusio = { version = "0.3.4", path = "../fusio", features = [
"dyn",
"fs",
"bytes"
] }
fusio-dispatch = { version = "0.3.4", path = "../fusio-dispatch" }
futures-core = "0.3"
futures-util = "0.3"
thiserror = "2.0.3"

[dev-dependencies]
tempfile = "3"

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1", features = ["full"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen = "0.2.99"
wasm-bindgen-test = "0.3.49"
119 changes: 119 additions & 0 deletions fusio-log/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@

# Fusio Log
Fusio-log is an append only log library for Rust. Leveraging [fusio](https://github.com/tonbo-io/fusio), fusio-log supports various backends like disk, wasm, and S3.
You can use it in various scenarios that, such as [write-ahead logging](https://en.wikipedia.org/wiki/Write-ahead_logging), [manifest file](https://en.wikipedia.org/wiki/Manifest_file), distributed log.


## Usage

1. Define data structure.
```rust
struct User {
id: u64,
name: String,
email: Option<String>,
}
```

2. Implement `Encode` and `Decode` trait for it
3. Start to use fusio-log

```rust
#[tokio::main]
async fn main() {
let temp_dir = TempDir::new().unwrap();
let path = Path::from_filesystem_path(temp_dir.path())
.unwrap()
.child("log");

let mut logger = Options::new(path).build::<User>().await.unwrap();
logger.write(&User {id: 1, name: "fusio-log".to_string(), None}).await.unwrap();
logger.write_batch([
User {id: 2, name: "fusio".to_string(), "[email protected]"}
User {id: 3, name: "tonbo".to_string(), None}
].into_iter()).await.unwrap();

logger.flush().await.unwrap();
logger.close().await.unwrap();

}
```

Recover from log file:
```rust
let stream = Options::new(path)
.recover::<User>()
.await
.unwrap();
while let Ok(users) = stream.try_next().await {
match users {
Some(users) => {
for user in users {
println("{}" user.id)
}
};
None => println();
}
}
```

### Use with S3
```rust

let path = Path::from_url_path("log").unwrap();
let option = Options::new(path).fs(FsOptions::S3 {
bucket: "data".to_string(),
credential: Some(fusio::remotes::aws::AwsCredential {
key_id: "key_id".to_string(),
secret_key: "secret_key".to_string(),
token: None,
}),
endpoint: None,
region: Some("region".to_string()),
sign_payload: None,
checksum: None,
});

let mut logger = option.build::<User>().await.unwrap();
logger
.write(&User {
id: 100,
name: "Name".to_string(),
email: None,
})
.await
.unwrap();
```

### Use in Wasm

Please make sure disable default features and enable `web` feature.

```toml
fusio-log = {git = "https://github.com/tonbo-io/fusio-log", default-features = false, features = ["bytes", "web"]}
```

Then, use `Path::from_opfs_path` to replace `Path::from_filesystem_path`
```rust
let temp_dir = TempDir::new().unwrap();
let path = Path::from_opfs_path(temp_dir.path())
.unwrap()
.child("log");

let mut logger = Options::new(path).build::<User>().await.unwrap();
logger.write(&User {id: 1, name: "fusio-log".to_string(), None}).await.unwrap();
```

## Build
### Build in Rust
```sh
cargo build
```

### Build in Wasm

Build with [wasm-pack](https://github.com/rustwasm/wasm-pack)

```sh
wasm-pack build --no-default-features --features web,bytes
```
Empty file added fusio-log/src/error.rs
Empty file.
117 changes: 117 additions & 0 deletions fusio-log/src/fs/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::{future::Future, hash::Hasher};

use fusio::{Error, IoBuf, IoBufMut, MaybeSend, SeqRead, Write};

use crate::{
error::{parse_fusio_error, LogError},
serdes::{Decode, Encode},
};

pub(crate) struct HashWriter<W: Write> {
hasher: crc32fast::Hasher,
writer: W,
}

impl<W: Write + Unpin> HashWriter<W> {
pub(crate) fn new(writer: W) -> Self {
Self {
hasher: crc32fast::Hasher::new(),
writer,
}
}

pub(crate) async fn eol(mut self) -> Result<(), fusio::Error> {
let i = self.hasher.finalize();
i.encode(&mut self.writer).await
}
}

impl<W: Write> Write for HashWriter<W> {
async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
let (result, buf) = self.writer.write_all(buf).await;
self.hasher.write(buf.as_slice());

(result, buf)
}

fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + MaybeSend {
self.writer.flush()
}

fn close(&mut self) -> impl Future<Output = Result<(), fusio::Error>> + MaybeSend {
self.writer.close()
}
}

pub(crate) struct HashReader<R: SeqRead> {
hasher: crc32fast::Hasher,
reader: R,
pos: u64,
}

impl<R: SeqRead> HashReader<R> {
pub(crate) fn new(reader: R) -> Self {
Self {
hasher: crc32fast::Hasher::new(),
reader,
pos: 0,
}
}

pub(crate) async fn checksum(mut self) -> Result<bool, LogError> {
let checksum = u32::decode(&mut self.reader)
.await
.map_err(parse_fusio_error)?;

self.pos += size_of::<u32>() as u64;
Ok(self.hasher.finalize() == checksum)
}

pub(crate) fn position(&self) -> u64 {
self.pos
}
}

impl<R: SeqRead> SeqRead for HashReader<R> {
async fn read_exact<B: IoBufMut>(&mut self, buf: B) -> (Result<(), Error>, B) {
let (result, buf) = self.reader.read_exact(buf).await;
self.pos += buf.bytes_init() as u64;
if result.is_ok() {
self.hasher.write(buf.as_slice());
}
(result, buf)
}
}

#[cfg(test)]
pub(crate) mod tests {
use std::io::Cursor;

use tokio::io::AsyncSeekExt;

use crate::{
fs::hash::{HashReader, HashWriter},
serdes::{Decode, Encode},
};

#[tokio::test]
async fn test_encode_decode() {
let mut bytes = Vec::new();
let mut cursor = Cursor::new(&mut bytes);

let mut writer = HashWriter::new(&mut cursor);
4_u64.encode(&mut writer).await.unwrap();
3_u32.encode(&mut writer).await.unwrap();
2_u16.encode(&mut writer).await.unwrap();
1_u8.encode(&mut writer).await.unwrap();
writer.eol().await.unwrap();

cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap();
let mut reader = HashReader::new(&mut cursor);
assert_eq!(u64::decode(&mut reader).await.unwrap(), 4);
assert_eq!(u32::decode(&mut reader).await.unwrap(), 3);
assert_eq!(u16::decode(&mut reader).await.unwrap(), 2);
assert_eq!(u8::decode(&mut reader).await.unwrap(), 1);
assert!(reader.checksum().await.unwrap());
}
}
Empty file added fusio-log/src/fs/mod.rs
Empty file.
Loading

0 comments on commit 859a6c6

Please sign in to comment.