Skip to content

Commit

Permalink
add fatbuffer tonic demo
Browse files Browse the repository at this point in the history
  • Loading branch information
zealchen committed Apr 26, 2024
1 parent 0bfe53b commit 7b4549e
Show file tree
Hide file tree
Showing 12 changed files with 1,440 additions and 0 deletions.
26 changes: 26 additions & 0 deletions fb_tonic_demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "fb_tonic_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "1.6.0"
flatbuffers = "24.3.25"
prost = "0.12.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
tonic = "0.11.0"

[build-dependencies]
tonic-build = "0.11.0"

[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/client.rs"
26 changes: 26 additions & 0 deletions fb_tonic_demo/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::io::Result;

fn main() -> Result<()> {
// tonic_build::configure()
// .out_dir("src/util")
// .compile(&["src/util/service.proto"], &["src/util"])?;
// Ok(())
let greeter_service = tonic_build::manual::Service::builder()
.name("Greeter")
.package("fb.helloworld")
.method(
tonic_build::manual::Method::builder()
.name("say_hello")
.route_name("SayHello")
.input_type("crate::util::common::FlatBuffersObject<'static>")
.output_type("crate::util::common::FlatBuffersObject<'static>")
.codec_path("crate::util::common::FlatBufferCodec")
.build(),
)
.build();

tonic_build::manual::Builder::new()
.out_dir("src/util")
.compile(&[greeter_service]);
Ok(())
}
72 changes: 72 additions & 0 deletions fb_tonic_demo/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// // main.rs (客户端)
// extern crate flatbuffers;
// use tonic::Request;
// mod util;

// #[tokio::main]
// async fn main() -> Result<(), Box<dyn std::error::Error>> {
// // 连接到服务端
// let mut client = util::MonsterServiceClient::connect("http://[::1]:8000").await?;

// // 创建请求
// let request = tonic::Request::new(util::GetMonsterRequest { id: 123 });

// // 发起请求并等待响应
// let response = client.get_monster(request).await?;

// // 获取 MonsterResponse 中的 monster_data 字段
// let monster_data_bytes = response.get_ref().monster_data.as_slice();

// // 使用 flatbuffers 的逻辑来解析字节数据
// // let monster = match flatbuffers::root::<util::Monster>(monster_data_bytes) {
// // Ok(monster): {},
// // Error(_): {}
// // };
// if let Ok(monster) = flatbuffers::root::<util::Monster>(monster_data_bytes) {
// println!("Received Monster with ID: {}", monster.id());
// println!("Received Monster with Name: {:?}", monster.name());
// } else {
// println!("bad ass.");
// }

// // 处理 monster

// Ok(())
// }

pub mod util;
use util::common::FlatBuffersObject;
use util::fbperson_generated::fbdemo::FBPerson;
use util::fbperson_generated::fbdemo::FBPersonArgs;

pub mod hello_world {
include!("util/fb.helloworld.Greeter.rs");
}
use hello_world::greeter_client::GreeterClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051").await?;

let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);

let name = builder.create_string("from_sender");

let data: flatbuffers::WIPOffset<FBPerson<'_>> = FBPerson::create(
&mut builder,
&FBPersonArgs {
name: Some(name),
age: 11,
pets: None,
},
);
let req = FlatBuffersObject::new(data, builder);

let request = tonic::Request::new(req);

let response = client.say_hello(request).await?;

// println!("RESPONSE={:?}", response);

Ok(())
}
102 changes: 102 additions & 0 deletions fb_tonic_demo/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// use flatbuffers::FlatBufferBuilder;
// use tonic::{transport::Server, Request, Response, Status};
// mod util;

// #[derive(Debug, Default)]
// struct MyMonsterService;

// #[tonic::async_trait]
// impl util::monster_service_server::MonsterService for MyMonsterService {
// async fn get_monster(
// &self,
// request: Request<util::GetMonsterRequest>,
// ) -> Result<Response<util::MonsterResponse>, Status> {
// let mut builder = FlatBufferBuilder::new();
// let name = builder.create_string("bob");
// let data: flatbuffers::WIPOffset<util::Monster<'_>> = util::Monster::create(
// &mut builder,
// &util::MonsterArgs {
// name: Some(name),
// id: 11,
// },
// );
// builder.finish(data, None);
// let monster_data_bytes = builder.finished_data();

// Ok(Response::new(util::MonsterResponse {
// monster_data: monster_data_bytes.to_vec(),
// }))
// }
// }

// #[tokio::main]
// async fn main() -> Result<(), Box<dyn std::error::Error>> {
// let address = "[::1]:8000".parse().unwrap();
// let voting_service = MyMonsterService::default();

// Server::builder()
// .add_service(util::MonsterServiceServer::new(voting_service))
// .serve(address)
// .await?;
// Ok(())
// }

use tonic::{transport::Server, Request, Response, Status};

pub mod util;
use util::common::FlatBuffersObject;
use util::fbperson_generated::fbdemo::FBPerson;
use util::fbperson_generated::fbdemo::FBPersonArgs;

pub mod hello_world {
include!("util/fb.helloworld.Greeter.rs");
}
use hello_world::greeter_server::{Greeter, GreeterServer};

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<FlatBuffersObject<'static>>,
) -> Result<Response<FlatBuffersObject<'static>>, Status> {
// println!("Got a request from {:?}", request.remote_addr());

let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);

let name = builder.create_string("from_receiver");

let data: flatbuffers::WIPOffset<FBPerson<'_>> = FBPerson::create(
&mut builder,
&FBPersonArgs {
name: Some(name),
age: 11,
pets: None,
},
);

let resp: FlatBuffersObject<'_> = FlatBuffersObject::new(data, builder);

// let reply = HelloResponse {
// message: format!("Hello {}!", request.into_inner().name),
// };
Ok(Response::new(resp))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

println!("GreeterServer listening on {}", addr);

Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;

Ok(())
}
113 changes: 113 additions & 0 deletions fb_tonic_demo/src/util/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! This module defines common request/response types as well as the JsonCodec that is used by the
//! json.helloworld.Greeter service which is defined manually (instead of via proto files) by the
//! `build_json_codec_service` function in the `examples/build.rs` file.
extern crate flatbuffers;
use crate::util::fbperson_generated::fbdemo::FBPerson;
use bytes::{Buf, BufMut};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tonic::{
codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
Status,
};

#[derive(Debug, Deserialize, Serialize)]
pub struct HelloRequest {
pub name: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct HelloResponse {
pub message: String,
}

pub trait FlatBufferSerializable {
fn serialize(&self) -> &[u8];
}

pub struct FlatBuffersObject<'a> {
data: flatbuffers::WIPOffset<FBPerson<'a>>,
builder: flatbuffers::FlatBufferBuilder<'a>,
}

impl<'a> FlatBuffersObject<'a> {
pub fn new(
data: flatbuffers::WIPOffset<FBPerson<'a>>,
builder: flatbuffers::FlatBufferBuilder<'a>,
) -> Self {
Self { data, builder }
}
}

impl FlatBufferSerializable for FlatBuffersObject<'_> {
fn serialize(&self) -> &[u8] {
self.builder.finish(self.data, None);
let buf = self.builder.finished_data();
buf
}
}

#[derive(Debug)]
pub struct FlatBufferEncoder<T>(PhantomData<T>);

impl<T: FlatBufferSerializable> Encoder for FlatBufferEncoder<T> {
type Item = T;
type Error = Status;

fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
let out = item.serialize();
buf.put_slice(out);
Ok(())
//serde_json::to_writer(buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
}
}

#[derive(Debug)]
pub struct FlatBufferDecoder<U: 'static>(PhantomData<&'static U>);

impl<U: 'static + flatbuffers::Follow<'static> + flatbuffers::Verifiable> Decoder
for FlatBufferDecoder<U>
{
type Item = U::Inner;
type Error = Status;

fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
if !buf.has_remaining() {
return Ok(None);
}

let item =
flatbuffers::root::<U>(buf.chunk()).map_err(|e| Status::internal(e.to_string()))?;
Ok(Some(item))
}
}

/// A [`Codec`] that implements `application/grpc+json` via the serde library.
#[derive(Debug, Clone)]
pub struct FlatBufferCodec<T, U: 'static>(PhantomData<(T, &'static U)>);

impl<T, U> Default for FlatBufferCodec<T, U> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<T, U> Codec for FlatBufferCodec<T, U>
where
T: FlatBufferSerializable + Send + 'static,
U: flatbuffers::Follow<'static> + flatbuffers::Verifiable + Sync + Send + 'static,
U::Inner: Send + 'static,
{
type Encode = T;
type Decode = U::Inner;
type Encoder = FlatBufferEncoder<T>;
type Decoder = FlatBufferDecoder<U>;

fn encoder(&mut self) -> Self::Encoder {
FlatBufferEncoder(PhantomData)
}

fn decoder(&mut self) -> Self::Decoder {
FlatBufferDecoder(PhantomData)
}
}
Loading

0 comments on commit 7b4549e

Please sign in to comment.