
refactor: job isolation done #204

Open
wants to merge 13 commits into main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- Added metadata serialization and deserialization
- Added retry job endpoint for failed jobs
- readme: setup instructions added
- Added : Grafana dashboard
@@ -51,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: job isolation added; each job now carries the information it needs from its worker
- verify_job now handles VerificationTimeout status
- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -54,7 +54,7 @@ axum-macros = "0.4.1"
bincode = "1.3.3"
bytes = "1.7.2"
color-eyre = "0.6.2"
chrono = "0.4.0"
chrono = { version = "0.4", features = ["serde"] }
c-kzg = "1.0.3"
dotenvy = "0.15.7"
futures = "0.3.30"
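For context on the chrono bump above: enabling the `serde` feature is presumably what allows timestamp fields on job records to derive Serialize/Deserialize. A minimal sketch of the kind of code this unlocks (the struct and field names are illustrative, not taken from the orchestrator):

```rust
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

// Without chrono's "serde" feature, deriving these traits for DateTime<Utc>
// fields would fail to compile.
#[derive(Serialize, Deserialize)]
struct JobTimestamps {
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
}
```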
7 changes: 7 additions & 0 deletions crates/orchestrator/src/constants.rs
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
Contributor: We have a lot of metadata constants here. Should we reuse these?

Contributor: +1

pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
24 changes: 18 additions & 6 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -216,14 +216,26 @@ impl Database for MongoDb {

tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type");

// Get the first (and only) result if it exists
match cursor.try_next().await? {
Some(doc) => {
let job: JobItem = mongodb::bson::from_document(doc)?;
Contributor: IMO we should use .map_err here? For example:

let job: JobItem = mongodb::bson::from_document(doc).map_err(|e| {
    tracing::error!(error = %e, document = ?doc, "Failed to deserialize document into JobItem");
    e
})?;

let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {
Contributor: We're cloning here so that we can log it in the error later? If yes, should we add a comment explaining that's why we're cloning it?
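As an editorial aside on the clone question: one way to avoid cloning the whole document is to capture just an identifier before handing ownership to the deserializer, and log that on failure. A rough sketch, assuming the documents carry a standard `_id` field (metrics recording from the Ok branch omitted for brevity):

```rust
// Grab the id up front so the error branch can still identify the document
// without keeping a full clone of it around.
let doc_id = doc.get_object_id("_id").ok();
match mongodb::bson::from_document::<JobItem>(doc) {
    Ok(job) => Ok(Some(job)),
    Err(e) => {
        tracing::error!(error = %e, document_id = ?doc_id, "Failed to deserialize document into JobItem");
        Err(eyre!("Failed to deserialize document: {}", e))
    }
}
```

Whether that trade-off (losing the full document in the log) is acceptable is up to the authors; the clone plus an explanatory comment is also reasonable.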

Ok(job) => {
tracing::debug!(deserialized_job = ?job, "Successfully deserialized job");
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
}
Err(e) => {
tracing::error!(
error = %e,
document = ?doc,
"Failed to deserialize document into JobItem"
);
Err(eyre!("Failed to deserialize document: {}", e))
}
}
}
None => Ok(None),
}
75 changes: 59 additions & 16 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::WrapErr;
use color_eyre::eyre::{eyre, WrapErr};
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
@@ -19,7 +19,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::jobs::metadata::{DaMetadata, JobMetadata, JobSpecificMetadata};
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
@@ -69,7 +69,7 @@ impl Job for DaJob {
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
metadata: JobMetadata,
) -> Result<JobItem, JobError> {
let job_id = Uuid::new_v4();
tracing::info!(log_type = "starting", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation started.");
@@ -91,7 +91,21 @@ impl Job for DaJob {
#[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "DA job processing started.");
tracing::info!(
log_type = "starting",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
"DA job processing started."
);

// Get DA-specific metadata
let mut da_metadata: DaMetadata = job.metadata.specific.clone().try_into().map_err(|e| {
Contributor: I am not sure about this idea of "specific" and "common" inside metadata. What is your opinion on two fields within the job itself? I.e., instead of metadata.specific and metadata.common we would have commonMetaData and JobMetadata directly on the job. Do you feel the current structure is necessary?
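For readers following along, a rough sketch of the two shapes being debated. The real definitions live in the crate::jobs::metadata module, which is not part of this diff, so every type and field below is an assumption, apart from the DaMetadata fields that appear later in this file:

```rust
// Shape used in this PR (assumed): one nested metadata value on the job.
pub struct CommonMetadata {
    pub process_attempt_no: u64, // placeholder field
}

pub struct DaMetadata {
    pub blob_data_path: Option<String>,
    pub tx_hash: Option<String>,
}

pub enum JobSpecificMetadata {
    Da(DaMetadata),
    // Snos(...), Proving(...), etc.
}

pub struct JobMetadata {
    pub common: CommonMetadata,
    pub specific: JobSpecificMetadata,
}

// Shape the reviewer is proposing (assumed): both parts sit directly on the job item.
pub struct JobItemFlat {
    pub internal_id: String,
    pub common_metadata: CommonMetadata,
    pub specific_metadata: JobSpecificMetadata,
}
```

The nested form keeps JobItem's field count stable and makes the common/specific split explicit; the flat form saves one level of indirection at every call site.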

tracing::error!(job_id = ?job.id, error = ?e, "Invalid metadata type for DA job");
JobError::Other(OtherError(e))
})?;

let block_no = job.internal_id.parse::<u64>().wrap_err("Failed to parse u64".to_string()).map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to parse block number");
JobError::Other(OtherError(e))
@@ -115,13 +129,14 @@ impl Job for DaJob {
MaybePendingStateUpdate::Update(state_update) => state_update,
};
tracing::debug!(job_id = ?job.id, "Retrieved state update");

// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert state update to blob data");
JobError::Other(OtherError(e))
})?;

// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

@@ -130,16 +145,26 @@ impl Job for DaJob {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
// Get blob data path from metadata
let blob_data_path = da_metadata.blob_data_path.as_ref().ok_or_else(|| {
tracing::error!(job_id = ?job.id, "Blob data path not found in metadata");
JobError::Other(OtherError(eyre!("Blob data path not found in metadata")))
})?;

// Store the transformed data
store_blob_data(transformed_data.clone(), blob_data_path, config.clone()).await?;
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;
tracing::trace!(job_id = ?job.id, max_bytes_per_blob = max_bytes_per_blob, max_blob_per_txn = max_blob_per_txn, "Retrieved DA client configuration");
// converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
tracing::trace!(
job_id = ?job.id,
max_bytes_per_blob = max_bytes_per_blob,
max_blob_per_txn = max_blob_per_txn,
"Retrieved DA client configuration"
);

let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
let current_blob_length: u64 = blob_array
@@ -152,9 +177,14 @@ impl Job for DaJob {
})?;
tracing::debug!(job_id = ?job.id, blob_count = current_blob_length, "Converted data to blobs");

// there is a limit on number of blobs per txn, checking that here
// Check blob limit
if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
tracing::warn!(
Contributor: This should be an error only if we are throwing an error after this, shouldn't it?

job_id = ?job.id,
current_blob_length = current_blob_length,
max_blob_per_txn = max_blob_per_txn,
"Exceeded maximum number of blobs per transaction"
);
Err(DaError::MaxBlobsLimitExceeded {
max_blob_per_txn,
current_blob_length,
@@ -163,13 +193,24 @@ impl Job for DaJob {
})?
}

// making the txn to the DA layer
// Publish to DA layer
let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to publish state diff to DA layer");
JobError::Other(OtherError(e))
})?;

tracing::info!(log_type = "completed", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, external_id = ?external_id, "Successfully published state diff to DA layer.");
da_metadata.tx_hash = Some(external_id.clone());
job.metadata.specific = JobSpecificMetadata::Da(da_metadata);
Contributor: I like the idea that we don't have to send the new metadata to the database right away, but I remember an issue with this not working properly.

For example, JOB_METADATA_PROCESSING_COMPLETED_AT is never written back to the job when we add it after external_id is returned to the general_process_job.

We can discuss this more offline, but make sure to validate that these values actually get persisted to the database.
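To make the persistence concern concrete: process_job only mutates job.metadata in memory, so the changes are durable only if the caller writes the updated job back to the database afterwards. A hypothetical sketch of that caller; the update_job method and the external_id conversion are assumptions, not the orchestrator's actual API:

```rust
async fn process_and_persist(
    handler: &dyn Job,
    config: Arc<Config>,
    db: &dyn Database,
    job: &mut JobItem,
) -> Result<(), JobError> {
    let external_id = handler.process_job(config, job).await?;
    // process_job updated job.metadata in memory (e.g. DaMetadata.tx_hash);
    // without this write-back those updates never reach the database.
    job.external_id = external_id.into(); // assumed conversion into the external id type
    db.update_job(job).await.map_err(|e| JobError::Other(OtherError(e)))?; // assumed method
    Ok(())
}
```

Validating that tx_hash (and similar fields) actually show up in the stored job after a run would address the reviewer's concern directly.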


tracing::info!(
log_type = "completed",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
external_id = ?external_id,
"Successfully published state diff to DA layer."
);
Ok(external_id)
}

@@ -344,14 +385,16 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;

let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
storage_client
.put_data(blob_data_vec_u8.into(), blob_data_path)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
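A small usage note on the new store_blob_data signature: the caller now passes the destination path instead of a block number, presumably built from the block number and the existing BLOB_DATA_FILE_NAME constant when the job's metadata is populated. A hedged sketch of what that construction might look like (the helper itself is hypothetical):

```rust
use crate::constants::BLOB_DATA_FILE_NAME;

// Hypothetical helper; the orchestrator may well build this path elsewhere,
// e.g. when DaMetadata is first created for the job.
fn blob_data_path_for_block(block_number: u64) -> String {
    format!("{}/{}", block_number, BLOB_DATA_FILE_NAME)
}
```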