Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: version compatibility check for MonoVertex #1912

Closed
wants to merge 10 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
init version
Signed-off-by: Sidhant Kohli <sidhant.kohli@gmail.com>
kohlisid committed Aug 8, 2024
commit 7882add289d67d6b15fc96a282a5550acc1172a7
7 changes: 7 additions & 0 deletions serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -23,6 +23,13 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
hyper-util = "0.1.6"
tower = "0.4.13"
uuid = { version = "1.10.0", features = ["v4"] }
anyhow = "1.0.86"
serde_json = "1.0.122"
serde = { version = "1.0.204", features = ["derive"] }
lazy_static = "1.5.0"
semver = "1.0"
#pep440 = "0.2"
pep440_rs = "0.6.6"

[dev-dependencies]
tower = "0.4.13"
3 changes: 3 additions & 0 deletions serving/source-sink/src/error.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@ pub enum Error {

#[error("gRPC Error - {0}")]
GRPCError(String),

#[error("ServerInfoError Error - {0}")]
ServerInfoError(String),
}

impl From<tonic::Status> for Error {
61 changes: 42 additions & 19 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use std::fs;
use std::time::Duration;

use anyhow::Context;
use serde::Deserialize;
use tokio::signal;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::info;
use tracing::{info, warn};

pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
use crate::sink::{SinkClient, SinkConfig};
use crate::source::{SourceClient, SourceConfig};
use crate::transformer::{TransformerClient, TransformerConfig};

pub(crate) use self::error::Result;
pub(crate) use crate::error::Error;

/// SourcerSinker orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received:
@@ -33,6 +36,11 @@ pub mod transformer;
pub mod forwarder;

pub mod message;

pub mod server_info;

pub mod version;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these need not be pub also mod version should be inside server_info


pub(crate) mod shared;

const TIMEOUT_IN_MS: u32 = 1000;
@@ -46,14 +54,14 @@ pub async fn run_forwarder(
transformer_config: Option<TransformerConfig>,
custom_shutdown_rx: Option<oneshot::Receiver<()>>,
) -> Result<()> {
wait_for_server_info(&source_config.server_info_file).await?;
server_info::wait_for_server_info(&source_config.server_info_file).await?;
let mut source_client = SourceClient::connect(source_config).await?;

wait_for_server_info(&sink_config.server_info_file).await?;
server_info::wait_for_server_info(&sink_config.server_info_file).await?;
let mut sink_client = SinkClient::connect(sink_config).await?;

let mut transformer_client = if let Some(config) = transformer_config {
wait_for_server_info(&config.server_info_file).await?;
server_info::wait_for_server_info(&config.server_info_file).await?;
Some(TransformerClient::connect(config).await?)
} else {
None
@@ -100,17 +108,17 @@ pub async fn run_forwarder(
Ok(())
}

async fn wait_for_server_info(file_path: &str) -> Result<()> {
loop {
if let Ok(metadata) = fs::metadata(file_path) {
if metadata.len() > 0 {
return Ok(());
}
}
info!("Server info file {} is not ready, waiting...", file_path);
sleep(Duration::from_secs(1)).await;
}
}
// async fn wait_for_server_info(file_path: &str) -> Result<()> {
// loop {
// if let Ok(metadata) = fs::metadata(file_path) {
// if metadata.len() > 0 {
// return Ok(());
// }
// }
// info!("Server info file {} is not ready, waiting...", file_path);
// sleep(Duration::from_secs(1)).await;
// }
// }

async fn wait_until_ready(
source_client: &mut SourceClient,
@@ -184,13 +192,15 @@ async fn shutdown_signal(shutdown_rx: Option<oneshot::Receiver<()>>) {

#[cfg(test)]
mod tests {
use crate::sink::SinkConfig;
use crate::source::SourceConfig;
use std::env;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source};
use std::env;
use tokio::sync::mpsc::Sender;

use crate::sink::SinkConfig;
use crate::source::SourceConfig;

struct SimpleSource;
#[tonic::async_trait]
impl source::Sourcer for SimpleSource {
@@ -241,6 +251,19 @@ mod tests {
max_message_size: 100,
};

// Write to the server info file with sample data, use the write_server_info
// function from the server_info module
// println!("file_path_1: {:?}", src_info_file.clone());
// let dummy_info = server_info::ServerInfo::dummy();
// server_info::write_server_info(&dummy_info, &src_info_file.to_str().unwrap())
// .await.expect("Could not write to server info file");

// // read the server info file and check if the data is correct
// let s = fs::read_to_string(src_info_file.clone());
// println!("Server info file: {:?}", s);
//
// println!("file_path_1: {:?}", src_info_file);

let (sink_shutdown_tx, sink_shutdown_rx) = tokio::sync::oneshot::channel();
let tmp_dir = tempfile::TempDir::new().unwrap();
let sink_sock_file = tmp_dir.path().join("sink.sock");
Loading