Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flatbuffer tonic demo #3

Merged
merged 21 commits into from
May 8, 2024
18 changes: 14 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
[package]
[workspace.package]
name = "serialization-benchmark-rs"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[workspace]
resolver = "2"
# In alphabetical order
members = [
"fb_tonic_demo",
]

[dependencies]
[workspace.dependencies]
bytes = "1.6.0"
clap = { version = "4.5.4", features = ["derive", "env"] }
flatbuffers = "24.3.25"
fury = { git= "https://github.com/apache/incubator-fury.git", branch = "main" }
lazy_static = "1.4.0"
prettytable-rs = "0.10.0"
prost = "0.12.4"
sysinfo = "0.30.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
tonic = "0.11.0"

[build-dependencies]
[workspace.build-dependencies]
tonic-build = "0.11.0"
32 changes: 27 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,44 @@ Benchmark for serialization in Rust, https://github.com/apache/incubator-horaedb

# Usage
```sh
%> serialization-benchmark-rs --help
[]# serialization-benchmark-rs --help
Usage: serialization-benchmark-rs [OPTIONS]

Options:
--batch-size <BATCH_SIZE> [env: BATCH_SIZE=] [default: 1000000]
--unsafe use unsafe feature within flatbuffer [env: UNSAFE=]
-h, --help Print help
-V, --version Print version


%> cargo run
[]# ./serialization-benchmark-rs
Benchmark test, batch_size=1000000, result:

+------------+----------------+------------------+-------------+
| name | serialize time | deserialize time | cpu_utility |
| flatbuffer | 0.32837144(s) | 1.5406052(s) | 11.836201 |
| fury | 2.7501004(s) | 2.738932(s) | 20.819456 |
| protobuf | 1.4705708(s) | 2.3939092(s) | 20.486391 |
| flatbuffer | 0.049742587(s) | 0.2990468(s) | 27.905426 |
| fury | 0.4078921(s) | 0.31441632(s) | 26.46583 |
| protobuf | 0.22009465(s) | 0.3065254(s) | 28.004875 |
+------------+----------------+------------------+-------------+

[]# ./serialization-benchmark-rs --batch-size 5000000
Benchmark test, batch_size=5000000, result:

+------------+----------------+------------------+-------------+
| name | serialize time | deserialize time | cpu_utility |
| flatbuffer | 0.21562137(s) | 1.4061759(s) | 29.822226 |
| fury | 2.0300736(s) | 1.6155167(s) | 25.778494 |
| protobuf | 1.065279(s) | 1.5093864(s) | 25.547447 |
+------------+----------------+------------------+-------------+

[]# ./serialization-benchmark-rs --enable-unsafe
Benchmark test, batch_size=1000000, result:

+------------+----------------+------------------+-------------+
| name | serialize time | deserialize time | cpu_utility |
| flatbuffer | 0.055235952(s) | 0.02805682(s) | 29.246042 |
| fury | 0.4071525(s) | 0.31315032(s) | 26.696833 |
| protobuf | 0.216035(s) | 0.30267346(s) | 25.301205 |
+------------+----------------+------------------+-------------+
```

26 changes: 26 additions & 0 deletions fb_tonic_demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "fb_tonic_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = { workspace = true }
flatbuffers = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }

[build-dependencies]
tonic-build = "0.11.0"

[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/client.rs"
6 changes: 6 additions & 0 deletions fb_tonic_demo/READEME.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# How to run flatbuffer tonic demo?

```sh
cargo run --bin server
cargo run --bin client
```
22 changes: 22 additions & 0 deletions fb_tonic_demo/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::io::Result;

fn main() -> Result<()> {
let greeter_service = tonic_build::manual::Service::builder()
.name("Greeter")
.package("fb.helloworld")
.method(
tonic_build::manual::Method::builder()
.name("say_hello")
.route_name("SayHello")
.input_type("crate::util::common::FlatBufferBytes")
.output_type("crate::util::common::FlatBufferBytes")
.codec_path("crate::util::common::FlatBufferCodec")
.build(),
)
.build();

tonic_build::manual::Builder::new()
.out_dir("src/util")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种构建生成的一般不放到 git 里面,一般都是 target 里面,我们之前生成 fb 代码是这么写的:

    println!("cargo:rerun-if-changed=fbs/prom_write.fbs");
    flatc_rust::run(flatc_rust::Args {
        inputs: &[Path::new("fbs/prom_write.fbs")],
        out_dir: Path::new("target/flatbuffers/"),
        ..Default::default()
    })
    .expect("flatc");

然后在 lib.rs 里这样写

#[path = "../target/flatbuffers/prom_write_generated.rs"]
mod prom_write;

pub use prom_write::*;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best Practice. Done!

.compile(&[greeter_service]);
Ok(())
}
42 changes: 42 additions & 0 deletions fb_tonic_demo/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#![allow(unused)]
pub mod util;
use util::common::FlatBufferBytes;
use util::fbgreeting_generated::fbdemo::Greetings;
use util::fbgreeting_generated::fbdemo::GreetingsArgs;

pub mod hello_world {
include!("util/fb.helloworld.Greeter.rs");
}
use hello_world::greeter_client::GreeterClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051").await?;

let mut builder: flatbuffers::FlatBufferBuilder<'_> =
flatbuffers::FlatBufferBuilder::with_capacity(1024);

let name = builder.create_string("Alice");
let words = builder.create_string("Hello~~");

let root_offset: flatbuffers::WIPOffset<Greetings<'_>> = Greetings::create(
&mut builder,
&GreetingsArgs {
name: Some(name),
words: Some(words),
},
);
let request = tonic::Request::new(FlatBufferBytes::serialize(builder, root_offset));
let response = client.say_hello(request).await?;

let r = response.into_inner();
if let Ok(greetings) = r.deserialize::<Greetings>() {
println!(
"Greetings from {:?}: {:?}",
greetings.name().unwrap(),
greetings.words().unwrap()
);
}

Ok(())
}
61 changes: 61 additions & 0 deletions fb_tonic_demo/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#![allow(unused)]
use tonic::{transport::Server, Request, Response, Status};

pub mod util;
use util::common::FlatBufferBytes;
use util::fbgreeting_generated::fbdemo::Greetings;
use util::fbgreeting_generated::fbdemo::GreetingsArgs;

pub mod hello_world {
include!("util/fb.helloworld.Greeter.rs");
}
use hello_world::greeter_server::{Greeter, GreeterServer};

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<FlatBufferBytes>,
) -> Result<Response<FlatBufferBytes>, Status> {
let req = request.into_inner();
if let Ok(greetings) = req.deserialize::<Greetings>() {
println!(
"Greetings from {:?}: {:?}",
greetings.name().unwrap(),
greetings.words().unwrap()
);
}

let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);
let name = builder.create_string("bob");
let words = builder.create_string("world~~");

let root_offset: flatbuffers::WIPOffset<Greetings<'_>> = Greetings::create(
&mut builder,
&GreetingsArgs {
name: Some(name),
words: Some(words),
},
);
let resp = FlatBufferBytes::serialize(builder, root_offset);
Ok(Response::new(resp))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

println!("GreeterServer listening on {}", addr);

Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;

Ok(())
}
98 changes: 98 additions & 0 deletions fb_tonic_demo/src/util/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! This module defines common request/response types as well as the JsonCodec that is used by the
//! json.helloworld.Greeter service which is defined manually (instead of via proto files) by the
//! `build_json_codec_service` function in the `examples/build.rs` file.
extern crate flatbuffers;
use bytes::{Buf, BufMut};
use std::io::Read;
use tonic::{
codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
Status,
};

// TODO: Any better solutions to manage the flatbuffer objects?
// As the associated type Encode / Decode of Trait Codec has a 'static lifetime bound which means
// items been encoded or decoded shall not have any non static references.
// However flatbuffer related types always have a 'fbb lifetime bound, I found no way to implement
// something like serde do.
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
pub struct FlatBufferBytes {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里没必要单独定义这么一个结构吧?直接用 Vec<u8> 是不是也够了?拿到字节流后,上层自己去做反序列化

Copy link
Contributor Author

@zealchen zealchen May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Done.

data: Vec<u8>,
head: usize,
}

impl FlatBufferBytes {
pub fn new(data: Vec<u8>, head: usize) -> Self {
Self { data, head }
}

pub fn valid_slice(&self) -> &[u8] {
&(self.data[self.head..])
}

pub fn serialize<'buf, T: flatbuffers::Follow<'buf> + 'buf>(
mut builder: flatbuffers::FlatBufferBuilder<'buf>,
root_offset: flatbuffers::WIPOffset<T>,
) -> Self {
builder.finish(root_offset, None);
let (data, head) = builder.collapse();
Self { data, head }
}

pub fn deserialize<'buf, T: flatbuffers::Follow<'buf> + flatbuffers::Verifiable + 'buf>(
&'buf self,
) -> Result<T::Inner, Box<dyn std::error::Error>> {
let data = self.valid_slice();
flatbuffers::root::<T>(data).map_err(|x| Box::new(x) as Box<dyn std::error::Error>)
}
}

#[derive(Debug)]
pub struct FlatBufferEncoder();

impl Encoder for FlatBufferEncoder {
type Item = FlatBufferBytes;
type Error = Status;

fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
buf.put_slice(item.valid_slice());
Ok(())
}
}

#[derive(Debug)]
pub struct FlatBufferDecoder();

impl Decoder for FlatBufferDecoder {
type Item = FlatBufferBytes;
type Error = Status;

fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
if !buf.has_remaining() {
return Ok(None);
}
let mut data: Vec<u8> = Vec::new();
buf.reader()
.read_to_end(&mut data)
.map_err(|e| Status::internal(e.to_string()))?;
let item = FlatBufferBytes::new(data, 0);
Ok(Some(item))
}
}

/// A [`Codec`] that implements `application/grpc+json` via the serde library.
#[derive(Debug, Clone, Default)]
pub struct FlatBufferCodec();

impl Codec for FlatBufferCodec {
type Encode = FlatBufferBytes;
type Decode = FlatBufferBytes;
type Encoder = FlatBufferEncoder;
type Decoder = FlatBufferDecoder;

fn encoder(&mut self) -> Self::Encoder {
FlatBufferEncoder()
}

fn decoder(&mut self) -> Self::Decoder {
FlatBufferDecoder()
}
}
Loading