Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: added termination queue #56

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
- Fixtures for testing.
- Implement DL queue for handling failed jobs.

## Changed

Expand Down
26 changes: 26 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,32 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
/// Throws error if the job status `Completed` is existing on DL queue.
pub async fn handle_job_failure(id: Uuid) -> Result<()> {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let config = config().await;

let mut job = get_job(id).await?.clone();
let mut metadata = job.metadata.clone();

if job.status == JobStatus::Completed {
return Err(eyre!("Invalid state exists on DL queue: {}", job.status.to_string()));
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
}
// We assume that a Failure status wil only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await?;

Ok(())
}

fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
Expand Down
18 changes: 17 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use color_eyre::eyre::eyre;
use color_eyre::Result;
Expand Down Expand Up @@ -99,6 +99,22 @@ pub enum JobStatus {
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
/// The job failed completing
Failed,
}

impl fmt::Display for JobStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
JobStatus::Created => write!(f, "Created"),
JobStatus::LockedForProcessing => write!(f, "Locked for Processing"),
JobStatus::PendingVerification => write!(f, "Pending Verification"),
JobStatus::Completed => write!(f, "Completed"),
JobStatus::VerificationTimeout => write!(f, "Verification Timeout"),
JobStatus::VerificationFailed(reason) => write!(f, "Verification Failed: {}", reason),
JobStatus::Failed => write!(f, "Failed"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
41 changes: 21 additions & 20 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, verify_job};
use crate::jobs::{handle_job_failure, process_job, verify_job};

pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
// Below is the Dead Letter Queue (DLQ) for the above two job queues.
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -68,26 +70,25 @@ where
Ok(())
}

/// Spawns a detached tokio task that consumes from a queue in an endless loop.
///
/// * `$queue_type` - expression producing the queue name; it is evaluated again on every
///   iteration (once for the consume call and once more if the error is logged).
/// * `$handler` - handler passed to `consume_job_from_queue` for each consumed message.
///
/// A failed consume is logged and does not stop the loop; the task sleeps for one second
/// between iterations.
macro_rules! spawn_consumer {
    ($queue_type:expr, $handler:expr) => {
        tokio::spawn(async move {
            loop {
                if let Err(e) = consume_job_from_queue($queue_type, $handler).await {
                    log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e);
                }
                sleep(Duration::from_secs(1)).await;
            }
        });
    };
}

/// Starts the queue consumers by invoking `spawn_consumer!` once per queue, pairing each
/// queue name with its handler: processing -> `process_job`, verification -> `verify_job`,
/// failure handling -> `handle_job_failure`.
///
/// Returns `Ok(())` once the three invocations have run.
pub async fn init_consumers() -> Result<()> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);

Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;

use super::common::drop_database;
use httpmock::MockServer;
// Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html
// TestConfigBuilder allows to heavily customise the global configs based on the test's requirement.
Expand Down Expand Up @@ -132,6 +133,8 @@ impl TestConfigBuilder {
self.storage.unwrap(),
);

drop_database().await.unwrap();

config_force_init(config).await;

server
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn test_database_create_job() -> color_eyre::Result<()> {
// Test Util Functions
// ==========================================

fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
JobItem {
id: Uuid::new_v4(),
internal_id: internal_id.to_string(),
Expand Down
105 changes: 105 additions & 0 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use super::database::build_job_item;
use crate::config::config;
use crate::jobs::handle_job_failure;
use crate::jobs::types::JobType;
use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder};
use rstest::rstest;

#[cfg(test)]
Expand All @@ -20,3 +25,103 @@ async fn create_job_fails_job_already_exists() {
async fn create_job_fails_works_new_job() {
// TODO
}

use std::str::FromStr;

impl FromStr for JobStatus {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"Created" => Ok(JobStatus::Created),
"LockedForProcessing" => Ok(JobStatus::LockedForProcessing),
"PendingVerification" => Ok(JobStatus::PendingVerification),
"Completed" => Ok(JobStatus::Completed),
"VerificationTimeout" => Ok(JobStatus::VerificationTimeout),
"Failed" => Ok(JobStatus::Failed),
s if s.starts_with("VerificationFailed(") && s.ends_with(')') => {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let reason = s[19..s.len() - 1].to_string();
Ok(JobStatus::VerificationFailed(reason))
}
_ => Err(format!("Invalid job status: {}", s)),
}
}
}

impl FromStr for JobType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"SnosRun" => Ok(JobType::SnosRun),
"DataSubmission" => Ok(JobType::DataSubmission),
"ProofCreation" => Ok(JobType::ProofCreation),
"ProofRegistration" => Ok(JobType::ProofRegistration),
"StateTransition" => Ok(JobType::StateTransition),
_ => Err(format!("Invalid job type: {}", s)),
}
}
}

#[rstest]
#[case("DataSubmission", "Completed")] // code should panic here, how can completed move to dl queue ?
#[case("SnosRun", "PendingVerification")]
#[case("ProofCreation", "LockedForProcessing")]
#[case("ProofRegistration", "Created")]
#[case("DataSubmission", "Failed")]
#[case("StateTransition", "Completed")]
#[case("ProofCreation", "VerificationTimeout")]
#[case("DataSubmission", "VerificationFailed()")]
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
#[tokio::test]
async fn test_handle_job_failure(#[case] job_type: JobType, #[case] job_status: JobStatus) -> color_eyre::Result<()> {
use color_eyre::eyre::eyre;

TestConfigBuilder::new().build().await;
dotenvy::from_filename("../.env.test")?;
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved

let internal_id = 1;

let config = config().await;
let database_client = config.database();

// create a job
let mut job = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job.id;

// if testcase is for Failure, add last_job_status to job's metadata
if job_status == JobStatus::Failed {
let mut metadata = job.metadata.clone();
metadata.insert("last_job_status".to_string(), "VerificationTimeout".to_string());
job.metadata = metadata;
}

// feeding the job to DB
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
let response = handle_job_failure(job_id).await;

match response {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
Ok(()) => {
// check job in db
let job = config.database().get_job_by_id(job_id).await?;

if let Some(job_item) = job {
// check if job status is Failure
assert_eq!(job_item.status, JobStatus::Failed);
// check if job metadata has `last_job_status`
assert_ne!(None, job_item.metadata.get("last_job_status"));
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved

println!("Handle Job Failure for ID {} was handled successfully", job_id);
} else {
return Err(eyre!("Unable to fetch Job Data"));
}
}
Err(err) => {
let expected = eyre!("Invalid state exists on DL queue: Completed");
// Should only fail for Completed case, anything else : raise error
assert_eq!(err.to_string(), expected.to_string());
}
}
Ok(())
}