Skip to content

Commit

Permalink
Add support for listening on vsock to oak_functions_containers_app
Browse files Browse the repository at this point in the history
  • Loading branch information
andrisaar committed Mar 7, 2024
1 parent e214b20 commit a2b3065
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 30 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,6 @@ prost-types = "*"
tokio = "*"
tonic = "*"
tonic-build = { version = "*", default-features = false }

[patch.crates-io]
tokio-vsock = { path = "third_party/tokio-vsock" }
1 change: 1 addition & 0 deletions oak_functions_containers_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tempfile = { version = "*", optional = true }
tikv-jemallocator = "*"
tokio = { version = "*", features = ["rt-multi-thread", "macros", "sync"] }
tokio-stream = { version = "*", features = ["net"] }
tokio-vsock = { version = "*", features = ["tonic-conn"] }
tonic = { workspace = true, features = ["gzip"] }
tower = { version = "*", features = ["load-shed"] }
tower-http = { version = "*", features = ["trace"] }
Expand Down
117 changes: 87 additions & 30 deletions oak_functions_containers_app/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// limitations under the License.

use std::{
error::Error,
fmt::Display,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
Expand All @@ -23,20 +25,31 @@ use anyhow::{anyhow, Context};
use clap::Parser;
use oak_containers_orchestrator::launcher_client::LauncherClient;
use oak_containers_sdk::{InstanceEncryptionKeyHandle, OrchestratorClient};
use oak_crypto::encryption_key::AsyncEncryptionKeyHandle;
#[cfg(feature = "native")]
use oak_functions_containers_app::native_handler::NativeHandler;
use oak_functions_containers_app::serve;
use oak_functions_containers_app::serve as app_serve;
use oak_functions_service::{
proto::oak::functions::config::{
application_config::CommunicationChannel, ApplicationConfig, HandlerType,
TcpCommunicationChannel,
},
wasm::wasmtime::WasmtimeHandler,
};
use opentelemetry::{global::set_error_handler, metrics::MeterProvider, KeyValue};
use opentelemetry::{
global::set_error_handler,
metrics::{Meter, MeterProvider},
KeyValue,
};
use prost::Message;
use tokio::{net::TcpListener, runtime::Handle};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
runtime::Handle,
};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_vsock::{VsockAddr, VsockListener};
use tonic::transport::server::Connected;

const OAK_FUNCTIONS_CONTAINERS_APP_PORT: u16 = 8080;

Expand All @@ -49,6 +62,40 @@ struct Args {
launcher_addr: String,
}

async fn serve<S>(
addr: S,
handler_type: HandlerType,
stream: Box<
dyn tokio_stream::Stream<
Item = Result<
impl Connected + AsyncRead + AsyncWrite + Send + Unpin + 'static,
impl Error + Send + Sync + 'static,
>,
> + Send
+ Unpin,
>,
encryption_key_handle: Box<dyn AsyncEncryptionKeyHandle + Send + Sync>,
meter: Meter,
) -> anyhow::Result<()>
where
S: Display,
{
eprintln!("Running Oak Functions on Oak Containers at address: {addr}");

match handler_type {
HandlerType::HandlerUnspecified | HandlerType::HandlerWasm => {
app_serve::<WasmtimeHandler>(stream, encryption_key_handle, meter).await
}
HandlerType::HandlerNative => {
if cfg!(feature = "native") {
app_serve::<NativeHandler>(stream, encryption_key_handle, meter).await
} else {
panic!("Application config specified `native` handler type, but this binary does not support that feature");
}
}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
Expand Down Expand Up @@ -159,40 +206,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};

let default_channel = CommunicationChannel::TcpChannel(TcpCommunicationChannel::default());
let communication_config = application_config
.communication_channel
.as_ref()
.unwrap_or(&default_channel);

let (addr, stream) = match communication_config {
CommunicationChannel::TcpChannel(config) => {
let mut config = config.clone();
if config.port == 0 {
config.port = OAK_FUNCTIONS_CONTAINERS_APP_PORT.into();
}
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), config.port.try_into()?);
let listener = TcpListener::bind(addr).await?;
(addr, Box::new(TcpListenerStream::new(listener)))
}
};

let server_handle = tokio::spawn(async move {
match application_config.handler_type() {
HandlerType::HandlerUnspecified | HandlerType::HandlerWasm => {
serve::<WasmtimeHandler>(stream, encryption_key_handle, meter).await
let default_channel = CommunicationChannel::TcpChannel(TcpCommunicationChannel::default());
let communication_config = application_config
.communication_channel
.as_ref()
.unwrap_or(&default_channel);

match communication_config {
CommunicationChannel::TcpChannel(config) => {
let mut config = config.clone();
if config.port == 0 {
config.port = OAK_FUNCTIONS_CONTAINERS_APP_PORT.into();
}
let addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), config.port.try_into()?);
let listener = TcpListener::bind(addr).await?;
serve(
addr,
application_config.handler_type(),
Box::new(TcpListenerStream::new(listener)),
encryption_key_handle,
meter,
)
.await
}
HandlerType::HandlerNative => {
if cfg!(feature = "native") {
serve::<NativeHandler>(stream, encryption_key_handle, meter).await
} else {
panic!("Application config specified `native` handler type, but this binary does not support that feature");
CommunicationChannel::VsockChannel(config) => {
let mut config = config.clone();
if config.port == 0 {
config.port = OAK_FUNCTIONS_CONTAINERS_APP_PORT.into();
}
let addr = VsockAddr::new(tokio_vsock::VMADDR_CID_ANY, config.port);
let listener = VsockListener::bind(addr)?;
serve(
addr,
application_config.handler_type(),
Box::new(listener.incoming()),
encryption_key_handle,
meter,
)
.await
}
}
});

eprintln!("Running Oak Functions on Oak Containers at address: {addr}");
client
.notify_app_ready()
.await
Expand Down
6 changes: 6 additions & 0 deletions proto/oak_functions/application_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ message TcpCommunicationChannel {
uint32 port = 1;
}

message VsockCommunicationChannel {
// Port to listen on. If not specified, defaults to 8080.
uint32 port = 1;
}

message ApplicationConfig {
// How to load the provided module.
HandlerType handler_type = 1;
Expand All @@ -45,5 +50,6 @@ message ApplicationConfig {
// - on Oak Containers, if not specified, the default communication channel is TCP.
oneof communication_channel {
TcpCommunicationChannel tcp_channel = 2;
VsockCommunicationChannel vsock_channel = 3;
}
}

0 comments on commit a2b3065

Please sign in to comment.