Skip to content

Commit

Permalink
init rucat engine
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jun 23, 2024
1 parent 9da4746 commit bb4b3a7
Show file tree
Hide file tree
Showing 21 changed files with 195 additions and 21 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[workspace]
resolver = "2"

members = [
"rucat_server",
"rucat_common"
"rucat_common",
"rucat_engine"
]
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Cluster driver will split the `Task` into several `subTasks` after receiving it

## TODO

Add rdd tests
what happens if get_first_parent returns None?
K8s support for rucat manager
Ballista deployment on K8s
Ballista deployment on K8s
surrealdb, support local process mode for rucat engine to connect
support specifying the path of rucat engine binary
7 changes: 6 additions & 1 deletion rucat_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ edition = "2021"
[dependencies]
surrealdb = {"version" = "1.4.2"}
axum = {"version" = "0.7.5"}
anyhow = {"version" = "1.0.83"}
anyhow = {"version" = "1.0.83"}
tonic = "0.11.0"
prost = "0.12.6"

[build-dependencies]
tonic-build = "0.11.0"
4 changes: 4 additions & 0 deletions rucat_common/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/engine.proto")?;
Ok(())
}
14 changes: 14 additions & 0 deletions rucat_common/proto/engine.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package engine_grpc;

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}
9 changes: 7 additions & 2 deletions rucat_common/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{fmt::Display, string::FromUtf8Error};

use axum::{
http::StatusCode,
Expand All @@ -17,6 +17,7 @@ pub enum RucatError {
NotAllowedError(String),
IOError(String),
DataStoreError(String),
FailedToStartEngine(String),
Other(String),
}

Expand All @@ -25,11 +26,14 @@ impl Display for RucatError {
// TODO: rewrite this in macro
match self {
Self::IllegalArgument(msg) => write!(f, "Illegal Argument error: {}", msg),
Self::NotFoundError(engine_id) => write!(f, "Not found error: engine {} not found.", engine_id),
Self::NotFoundError(engine_id) => {
write!(f, "Not found error: engine {} not found.", engine_id)
}
Self::UnauthorizedError(msg) => write!(f, "Unauthorized error: {}", msg),
Self::NotAllowedError(msg) => write!(f, "Not allowed error: {}", msg),
Self::IOError(msg) => write!(f, "IO error: {}", msg),
Self::DataStoreError(msg) => write!(f, "Data store error: {}", msg),
Self::FailedToStartEngine(msg) => write!(f, "Failed to start engine: {}", msg),
Self::Other(msg) => write!(f, "Other error: {}", msg),
}
}
Expand Down Expand Up @@ -67,4 +71,5 @@ macro_rules! convert_to_rucat_error {
convert_to_rucat_error!(std::io::Error, RucatError::IOError);
convert_to_rucat_error!(surrealdb::Error, RucatError::DataStoreError);
convert_to_rucat_error!(anyhow::Error, RucatError::Other);
convert_to_rucat_error!(FromUtf8Error, RucatError::Other);
convert_to_rucat_error!(String, RucatError::Other);
6 changes: 5 additions & 1 deletion rucat_common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
pub mod error;
//! Common types and utilities for the Rucat projects.
pub mod error;
pub mod engine_grpc {
tonic::include_proto!("engine_grpc");
}
/// Unique identifier for an engine.
pub type EngineId = String;
11 changes: 11 additions & 0 deletions rucat_engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "rucat_engine"
version = "0.1.0"
edition = "2021"

[dependencies]
rucat_common = {path = "../rucat_common"}
clap = { version = "4.5.7", features = ["derive"] }
clap_derive = "4.5.5"
tonic = "0.11.0"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
55 changes: 55 additions & 0 deletions rucat_engine/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use clap::Parser;
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
use tonic::{transport::Server, Request, Response, Status};

use rucat_common::engine_grpc::greeter_server::{Greeter, GreeterServer};
use rucat_common::engine_grpc::{HelloReply, HelloRequest};

#[derive(Debug, Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
println!("Got a request: {:?}", request);

let reply = HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};

Ok(Response::new(reply))
}
}

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// IPv6 address of the engine
#[arg(long, default_value_t = Ipv6Addr::LOCALHOST)]
ip: Ipv6Addr,

/// Port of the engine binding
#[arg(long)]
port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let Args { ip, port } = Args::parse();
let addr = SocketAddrV6::new(ip, port, 0, 0);
let greeter = MyGreeter::default();

println!("start from rucat engine!");

Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(SocketAddr::V6(addr))
.await?;

println!("Hello, world from rucat engine!");

Ok(())
}
3 changes: 3 additions & 0 deletions rucat_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ tower-http = { "version" = "0.5.0", features = ["trace"] }
tracing = {"version" = "0.1.40"}
tracing-subscriber = {"version" = "0.3.18"}
tokio = {"version" = "1.36.0", features = ["full"]}
tonic = "0.11.0"
clap = { version = "4.5.7", features = ["derive"] }
clap_derive = "4.5.5"

# test dependencies

Expand Down
2 changes: 0 additions & 2 deletions rucat_server/run.sh

This file was deleted.

5 changes: 5 additions & 0 deletions rucat_server/src/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Module that contains engine rest request handlers and
//! rpc between server and engine.
pub(crate) mod router;
mod rpc;
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Restful API for engine management.
use std::fmt::{Debug, Display};

use axum::{
Expand All @@ -14,6 +16,8 @@ use serde::{Deserialize, Serialize};
use crate::state::AppState;
use EngineState::*;

use super::rpc;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum EngineState {
/// Engine is pending to be started.
Expand Down Expand Up @@ -73,6 +77,7 @@ async fn create_engine(
State(state): State<AppState<'_>>,
Json(body): Json<CreateEngineRequest>,
) -> Result<EngineId> {
rpc::create_engine(state.get_engine_binary_path(), 3131).await?;
state.get_data_store().add_engine(body.into()).await
}

Expand Down
41 changes: 41 additions & 0 deletions rucat_server/src/engine/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! RPC between server and engine.
//!
//! Create a new process for the engine and start the gRPC server.
//! The engine will be listening on the given port. (localhost for now)
use rucat_common::{
engine_grpc::{greeter_client::GreeterClient, HelloRequest},
error::{Result, RucatError},
};

use tokio::process::Command;
use tracing::info;

/// Create a new process for the engine and start the gRPC server.
/// The engine will be listening on the given port. (localhost for now)
pub(super) async fn create_engine(engine_binary_path: &str, port: u16) -> Result<()> {
// Start the engine process.
Command::new(engine_binary_path)
.args(["--ip", "::1", "--port", port.to_string().as_str()])
.spawn()?;

// TODO: better way to wait for the engine to start.
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

let mut client = GreeterClient::connect(format!("http://[::1]:{}", port))
.await
.map_err(|e| RucatError::FailedToStartEngine(e.to_string()))?;

let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});

let response = client
.say_hello(request)
.await
.map_err(|e| RucatError::FailedToStartEngine(e.to_string()))?;

info!("RESPONSE={:?}", response.into_inner().message);

Ok(())
}
8 changes: 4 additions & 4 deletions rucat_server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use authentication::auth;
use axum::{extract::State, middleware, routing::get, Router};
use axum_extra::middleware::option_layer;
use engine_router::get_engine_router;
use engine::router::get_engine_router;
use rucat_common::error::Result;
use state::{data_store::DataStore, AppState};
use surrealdb::{engine::local::Mem, Surreal};
use tower_http::trace::TraceLayer;

pub(crate) mod authentication;
pub(crate) mod engine_router;
pub(crate) mod engine;
pub(crate) mod state;

/// This is the only entry for users to get the rucat server.
pub async fn get_server(auth_enable: bool) -> Result<Router> {
pub async fn get_server(auth_enable: bool, engine_binary_path: String) -> Result<Router> {
let db = Surreal::new::<Mem>(()).await?;
db.use_ns("test").use_db("test").await?;

let app_state = AppState::new(DataStore::connect_embedded_db(db));
let app_state = AppState::new(DataStore::connect_embedded_db(db), engine_binary_path);

// go through the router from outer to inner
Ok(Router::new()
Expand Down
12 changes: 11 additions & 1 deletion rucat_server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
use std::net::{Ipv6Addr, SocketAddrV6};

use clap::Parser;
use rucat_common::error::Result;
use rucat_server::get_server;
use tracing::info;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// path to the engine binary (local)
#[arg(long)]
engine_binary_path: String,
}

#[tokio::main]
/// Start Rucat server
async fn main() -> Result<()> {
let Args { engine_binary_path } = Args::parse();
// setup tracing
tracing_subscriber::fmt::init();

let endpoint = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 3000, 0, 0);
let app = get_server(true).await?;
let app = get_server(true, engine_binary_path).await?;

// run it
let listener = tokio::net::TcpListener::bind(endpoint).await?;
Expand Down
2 changes: 1 addition & 1 deletion rucat_server/src/state/data_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Datastore to record engines' infomation
use crate::engine_router::{EngineInfo, EngineState};
use crate::engine::router::{EngineInfo, EngineState};
use rucat_common::{
error::{Result, RucatError},
EngineId,
Expand Down
12 changes: 10 additions & 2 deletions rucat_server/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ pub(crate) mod data_store;
#[derive(Clone)]
pub(crate) struct AppState<'a> {
data_store: DataStore<'a>,
engine_binary_path: String,
}

impl<'a> AppState<'a> {
pub(crate) fn new(data_store: DataStore<'a>) -> Self {
Self { data_store }
pub(crate) fn new(data_store: DataStore<'a>, engine_binary_path: String) -> Self {
Self {
data_store,
engine_binary_path,
}
}

pub(crate) fn get_data_store(&self) -> &DataStore {
&self.data_store
}

pub(crate) fn get_engine_binary_path(&self) -> &str {
self.engine_binary_path.as_str()
}
}
2 changes: 1 addition & 1 deletion rucat_server/tests/authentication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rucat_server::get_server;

/// server with embedded datastore and authentication enabled
async fn get_test_server() -> Result<TestServer> {
let app = get_server(true).await?;
let app = get_server(true, "../target/debug/rucat_engine".to_owned()).await?;
TestServer::new(app).map_err(|e| e.into())
}

Expand Down
4 changes: 2 additions & 2 deletions rucat_server/tests/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use serde_json::json;

/// server with embedded datastore and authentication disabled
async fn get_test_server() -> Result<TestServer> {
let app = get_server(false).await?;
let app = get_server(false, "../target/debug/rucat_engine".to_owned()).await?;
TestServer::new(app).map_err(|e| e.into())
}

/// This is a helper function to create an engine.
///
/// **DO NOT** use this function when testing corner cases in create_engine
/// **DO NOT** use this function when testing failed cases in create_engine
async fn create_engine_helper(server: &TestServer) -> TestResponse {
server
.post("/engine")
Expand Down
4 changes: 4 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# build rucat server and rucat engine
cargo build --release
# run rucat server
RUST_LOG=debug cargo run --release --bin rucat_server -- --engine-binary-path target/release/rucat_engine

0 comments on commit bb4b3a7

Please sign in to comment.