Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding Logs (#147)
Browse files Browse the repository at this point in the history
* logs to orchestrator

* Update/init logs (#148)

* update: adding ret and err to instrumentation

* update: added logs

* update: fixed info logs

* update: corrected debugs and trace

* fix: removing dummy code

* fix: linting

* update: fixed issues

* Update crates/orchestrator/src/jobs/proving_job/mod.rs

Co-authored-by: Mohit Dhattarwal <48082542+Mohiiit@users.noreply.github.com>

* update: fixed Reviews

* update: fixed PR review

* Update crates/orchestrator/src/jobs/snos_job/fact_topology.rs

Co-authored-by: Mohit Dhattarwal <48082542+Mohiiit@users.noreply.github.com>

* Update crates/orchestrator/src/jobs/snos_job/mod.rs

Co-authored-by: Mohit Dhattarwal <48082542+Mohiiit@users.noreply.github.com>

* Update crates/orchestrator/src/jobs/snos_job/fact_topology.rs

Co-authored-by: Mohit Dhattarwal <48082542+Mohiiit@users.noreply.github.com>

* update: lint fix

---------

Co-authored-by: Mohit Dhattarwal <48082542+Mohiiit@users.noreply.github.com>
heemankv and Mohiiit authored Oct 11, 2024
1 parent fb6f95c commit 59f6172
Showing 28 changed files with 851 additions and 213 deletions.
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ STARKNET_ACCOUNT_ADDRESS=0x3bb306a004034dba19e6cf7b161e7a4fef64bc1078419e8ad1876
## Instrumentation
OTEL_SERVICE_NAME="madara_orchestrator"


##### Tests #####

STARKNET_OPERATOR_ADDRESS="0x5b98B836969A60FEC50Fa925905Dd1D382a7db43"
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- added logs
- added MongoDB migrations using nodejs
- added dockerfile
- `SnosJob` implementation and e2e
3 changes: 1 addition & 2 deletions crates/orchestrator/src/controllers/errors.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ use axum::response::IntoResponse;
use axum::Json;
use color_eyre::eyre::ErrReport;
use serde_json::json;
use tracing::log;

/// Root level error which is sent back to the client
#[derive(thiserror::Error, Debug)]
@@ -15,7 +14,7 @@ pub enum AppError {
/// Convert the error into a response so that it can be sent back to the client
impl IntoResponse for AppError {
fn into_response(self) -> axum::http::Response<axum::body::Body> {
log::error!("Error: {:?}", self);
tracing::error!("Error: {:?}", self);
let (status, err_msg) = match self {
Self::InternalServerError(msg) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, msg.to_string()),
};
16 changes: 16 additions & 0 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
@@ -48,7 +48,16 @@ impl DataStorage for AWSS3 {
async fn get_data(&self, key: &str) -> Result<Bytes> {
let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
let data_bytes = data_stream.into_bytes();
tracing::debug!(
log_type = "DataStorage",
category = "data_storage_call",
data_bytes = data_bytes.len(),
"Successfully retrieved and converted data from {}, key={}",
self.bucket,
key
);
Ok(data_bytes)
}

@@ -63,6 +72,13 @@ impl DataStorage for AWSS3 {
.send()
.await?;

tracing::debug!(
log_type = "DataStorage",
category = "data_storage_call",
"Successfully put data into {}. key={}",
self.bucket,
key
);
Ok(())
}

44 changes: 25 additions & 19 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ impl MongoDb {
let client = Client::with_options(client_options).expect("Failed to create MongoDB client");
// Ping the server to see if you can connect to the cluster
client.database("admin").run_command(doc! {"ping": 1}, None).await.expect("Failed to ping MongoDB deployment");
log::debug!("Pinged your deployment. You successfully connected to MongoDB!");
tracing::debug!("Pinged your deployment. You successfully connected to MongoDB!");

Self { client, database_name: mongo_db_settings.database_name }
}
@@ -86,30 +86,33 @@ impl MongoDb {

#[async_trait]
impl Database for MongoDb {
#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn create_job(&self, job: JobItem) -> Result<JobItem> {
self.get_job_collection().insert_one(&job, None).await?;
tracing::debug!(job_id = %job.id, category = "db_call", "Job created successfully");
Ok(job)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>> {
let filter = doc! {
"id": id
};
tracing::debug!(job_id = %id, category = "db_call", "Fetched job by ID");
Ok(self.get_job_collection().find_one(filter, None).await?)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"internal_id": internal_id,
"job_type": mongodb::bson::to_bson(&job_type)?,
};
tracing::debug!(internal_id = %internal_id, job_type = ?job_type, category = "db_call", "Fetched job by internal ID and type");
Ok(self.get_job_collection().find_one(filter, None).await?)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<()> {
// Filters to search for the job
let filter = doc! {
@@ -126,18 +129,21 @@ impl Database for MongoDb {

let result = self.get_job_collection().update_one(filter, update, options).await?;
if result.modified_count == 0 {
tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}

tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
Ok(())
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type");
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

@@ -162,7 +168,7 @@ impl Database for MongoDb {
/// job_b_type : ProofCreation
///
/// TODO : For now Job B status implementation is pending so we can pass None
#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_jobs_without_successor(
&self,
job_a_type: JobType,
@@ -226,7 +232,6 @@ impl Database for MongoDb {
}
},
];

// TODO : Job B status code :
// // Conditionally add status matching for job_b_status
// if let Some(status) = job_b_status {
@@ -253,16 +258,17 @@ impl Database for MongoDb {
match result {
Ok(document) => match bson::from_bson(Bson::Document(document)) {
Ok(job_item) => vec_jobs.push(job_item),
Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),
Err(e) => tracing::error!(error = %e, category = "db_call", "Failed to deserialize JobItem"),
},
Err(e) => eprintln!("Error retrieving document: {:?}", e),
Err(e) => tracing::error!(error = %e, category = "db_call", "Error retrieving document"),
}
}

tracing::debug!(job_count = vec_jobs.len(), category = "db_call", "Retrieved jobs without successor");
Ok(vec_jobs)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_latest_job_by_type_and_status(
&self,
job_type: JobType,
@@ -274,10 +280,11 @@ impl Database for MongoDb {
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

tracing::debug!(job_type = ?job_type, job_status = ?job_status, category = "db_call", "Fetched latest job by type and status");
Ok(self.get_job_collection().find_one(filter, find_options).await?)
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_jobs_after_internal_id_by_job_type(
&self,
job_type: JobType,
@@ -287,15 +294,14 @@ impl Database for MongoDb {
let filter = doc! {
"job_type": bson::to_bson(&job_type)?,
"status": bson::to_bson(&job_status)?,
"internal_id": { "$gt": internal_id }
"internal_id": { "$gt": internal_id.clone() }
};

let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;

let jobs: Vec<JobItem> = self.get_job_collection().find(filter, None).await?.try_collect().await?;
tracing::debug!(job_type = ?job_type, job_status = ?job_status, internal_id = internal_id, category = "db_call", "Fetched jobs after internal ID by job type");
Ok(jobs)
}

#[tracing::instrument(skip(self, limit), fields(function_type = "db_call"))]
#[tracing::instrument(skip(self, limit), fields(function_type = "db_call"), ret, err)]
async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
let filter = doc! {
"status": {
@@ -306,8 +312,8 @@ impl Database for MongoDb {

let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build());

let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;

let jobs: Vec<JobItem> = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;
tracing::debug!(job_count = jobs.len(), category = "db_call", "Retrieved jobs by statuses");
Ok(jobs)
}
}
92 changes: 61 additions & 31 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ use starknet::core::types::{
};
use starknet::providers::Provider;
use thiserror::Error;
use tracing::log;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
@@ -65,73 +64,93 @@ pub struct DaJob;

#[async_trait]
impl Job for DaJob {
#[tracing::instrument(fields(category = "da"), skip(self, _config, metadata))]
#[tracing::instrument(fields(category = "da"), skip(self, _config, metadata), ret, err)]
async fn create_job(
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
let job_id = Uuid::new_v4();
tracing::info!(log_type = "starting", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation started.");
let job_item = JobItem {
id: job_id,
internal_id: internal_id.clone(),
job_type: JobType::DataSubmission,
status: JobStatus::Created,
external_id: String::new().into(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
};
tracing::info!(log_type = "completed", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation completed.");
Ok(job_item)
}

#[tracing::instrument(fields(category = "da"), skip(self, config))]
#[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let block_no = job
.internal_id
.parse::<u64>()
.wrap_err("Failed to parse u64".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "DA job processing started.");
let block_no = job.internal_id.parse::<u64>().wrap_err("Failed to parse u64".to_string()).map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to parse block number");
JobError::Other(OtherError(e))
})?;

let state_update = config
.starknet_client()
.get_state_update(BlockId::Number(block_no))
.await
.wrap_err("Failed to get state Update.".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to get state update");
JobError::Other(OtherError(e))
})?;

let state_update = match state_update {
MaybePendingStateUpdate::PendingUpdate(_) => {
tracing::warn!(job_id = ?job.id, block_no = block_no, "Block is still pending");
Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })?
}
MaybePendingStateUpdate::Update(state_update) => state_update,
};
tracing::debug!(job_id = ?job.id, "Retrieved state update");
// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert state update to blob data");
JobError::Other(OtherError(e))
})?;
// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

// data transformation on the data
let transformed_data = fft_transformation(blob_data_biguint);
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;

tracing::trace!(job_id = ?job.id, max_bytes_per_blob = max_bytes_per_blob, max_blob_per_txn = max_blob_per_txn, "Retrieved DA client configuration");
// converting BigUints to Vec<u8>, one Vec<u8> represents one blob data

let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
let current_blob_length: u64 = blob_array
.len()
.try_into()
.wrap_err("Unable to convert the blob length into u64 format.".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert blob length to u64");
JobError::Other(OtherError(e))
})?;
tracing::debug!(job_id = ?job.id, blob_count = current_blob_length, "Converted data to blobs");

// there is a limit on number of blobs per txn, checking that here
if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
Err(DaError::MaxBlobsLimitExceeded {
max_blob_per_txn,
current_blob_length,
@@ -141,23 +160,34 @@ impl Job for DaJob {
}

// making the txn to the DA layer
let external_id = config
.da_client()
.publish_state_diff(blob_array, &[0; 32])
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to publish state diff to DA layer");
JobError::Other(OtherError(e))
})?;

tracing::info!(log_type = "completed", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, external_id = ?external_id, "Successfully published state diff to DA layer.");
Ok(external_id)
}

#[tracing::instrument(fields(category = "da"), skip(self, config))]
#[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
Ok(config
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "da", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "DA job verification started.");
let verification_status = config
.da_client()
.verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
.verify_inclusion(job.external_id.unwrap_string().map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to unwrap external ID");
JobError::Other(OtherError(e))
})?)
.await
.map_err(|e| JobError::Other(OtherError(e)))?
.into())
.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Job verification failed");
JobError::Other(OtherError(e))
})?
.into();

tracing::info!(log_type = "completed", category = "da", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, verification_status = ?verification_status, "DA job verification completed.");
Ok(verification_status)
}

fn max_process_attempts(&self) -> u64 {
@@ -173,7 +203,7 @@ impl Job for DaJob {
}
}

#[tracing::instrument(skip(elements))]
#[tracing::instrument(skip(elements), ret)]
pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
let xs: Vec<BigUint> = (0..*BLOB_LEN)
.map(|i| {
@@ -234,7 +264,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
let mut blob = chunk.to_vec();
if blob.len() < chunk_size {
blob.resize(chunk_size, 0);
log::debug!("Warning: Last chunk of {} bytes was padded to full blob size", chunk.len());
tracing::debug!("Warning: Last chunk of {} bytes was padded to full blob size", chunk.len());
}
blobs.push(blob);
}
134 changes: 100 additions & 34 deletions crates/orchestrator/src/jobs/mod.rs

Large diffs are not rendered by default.

83 changes: 60 additions & 23 deletions crates/orchestrator/src/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
@@ -7,8 +7,6 @@ use chrono::{SubsecRound, Utc};
use color_eyre::eyre::{eyre, WrapErr};
use prover_client_interface::{Task, TaskStatus};
use thiserror::Error;
use tracing::log::log;
use tracing::log::Level::Error;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
@@ -36,67 +34,106 @@ pub struct ProvingJob;

#[async_trait]
impl Job for ProvingJob {
#[tracing::instrument(fields(category = "proving"), skip(self, _config, metadata))]
#[tracing::instrument(fields(category = "proving"), skip(self, _config, metadata), ret, err)]
async fn create_job(
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
Ok(JobItem {
tracing::info!(log_type = "starting", category = "proving", function_type = "create_job", block_no = %internal_id, "Proving job creation started.");
let job_item = JobItem {
id: Uuid::new_v4(),
internal_id,
internal_id: internal_id.clone(),
job_type: JobType::ProofCreation,
status: JobStatus::Created,
external_id: String::new().into(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
};
tracing::info!(log_type = "completed", category = "proving", function_type = "create_job", block_no = %internal_id, "Proving job created.");
Ok(job_item)
}

#[tracing::instrument(fields(category = "proving"), skip(self, config))]
#[tracing::instrument(fields(category = "proving"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "Proving job processing started.");

// Cairo Pie path in s3 storage client
let block_number: String = job.internal_id.to_string();
let cairo_pie_path = block_number + "/" + CAIRO_PIE_FILE_NAME;
let cairo_pie_file = config
.storage()
.get_data(&cairo_pie_path)
.await
.map_err(|e| ProvingError::CairoPIEFileFetchFailed(e.to_string()))?;
tracing::debug!(job_id = %job.internal_id, %cairo_pie_path, "Fetching Cairo PIE file");

let cairo_pie_file = config.storage().get_data(&cairo_pie_path).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to fetch Cairo PIE file");
ProvingError::CairoPIEFileFetchFailed(e.to_string())
})?;

let cairo_pie = CairoPie::from_bytes(cairo_pie_file.to_vec().as_slice())
.map_err(|e| ProvingError::CairoPIENotReadable(e.to_string()))?;
tracing::debug!(job_id = %job.internal_id, "Parsing Cairo PIE file");
let cairo_pie = CairoPie::from_bytes(cairo_pie_file.to_vec().as_slice()).map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to parse Cairo PIE file");
ProvingError::CairoPIENotReadable(e.to_string())
})?;

tracing::debug!(job_id = %job.internal_id, "Submitting task to prover client");
let external_id = config
.prover_client()
.submit_task(Task::CairoPie(cairo_pie))
.await
.wrap_err("Prover Client Error".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to submit task to prover client");
JobError::Other(OtherError(e))
})?;

tracing::info!(log_type = "completed", category = "proving", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, %external_id, "Proving job processed successfully.");
Ok(external_id)
}

#[tracing::instrument(fields(category = "proving"), skip(self, config))]
#[tracing::instrument(fields(category = "proving"), skip(self, config), ret, err)]
async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
let task_id: String = job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?.into();

let fact = job.metadata.get(JOB_METADATA_SNOS_FACT).ok_or(OtherError(eyre!("Fact not available in job")))?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "proving", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proving job verification started.");

let task_id: String = job
.external_id
.unwrap_string()
.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to unwrap external_id");
JobError::Other(OtherError(e))
})?
.into();

let fact = job.metadata.get(JOB_METADATA_SNOS_FACT).ok_or_else(|| {
tracing::error!(job_id = %job.internal_id, "Fact not available in job metadata");
OtherError(eyre!("Fact not available in job"))
})?;

tracing::debug!(job_id = %job.internal_id, %task_id, "Getting task status from prover client");
let task_status = config
.prover_client()
.get_task_status(&task_id, fact)
.await
.wrap_err("Prover Client Error".to_string())
.map_err(|e| JobError::Other(OtherError(e)))?;
.map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to get task status from prover client");
JobError::Other(OtherError(e))
})?;

match task_status {
TaskStatus::Processing => Ok(JobVerificationStatus::Pending),
TaskStatus::Succeeded => Ok(JobVerificationStatus::Verified),
TaskStatus::Processing => {
tracing::info!(log_type = "pending", category = "proving", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proving job verification pending.");
Ok(JobVerificationStatus::Pending)
}
TaskStatus::Succeeded => {
tracing::info!(log_type = "completed", category = "proving", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proving job verification completed.");
Ok(JobVerificationStatus::Verified)
}
TaskStatus::Failed(err) => {
log!(Error, "Prover job #{} failed: {}", job.internal_id, err);
tracing::info!(log_type = "failed", category = "proving", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proving job verification failed.");
Ok(JobVerificationStatus::Rejected(format!(
"Prover job #{} failed with error: {}",
job.internal_id, err
20 changes: 13 additions & 7 deletions crates/orchestrator/src/jobs/register_proof_job/mod.rs
Original file line number Diff line number Diff line change
@@ -15,16 +15,17 @@ pub struct RegisterProofJob;

#[async_trait]
impl Job for RegisterProofJob {
#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config, metadata))]
#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config, metadata), ret, err)]
async fn create_job(
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
Ok(JobItem {
tracing::info!(log_type = "starting", category = "proof_registry", function_type = "create_job", block_no = %internal_id, "Proof registration job creation started.");
let job_item = JobItem {
id: Uuid::new_v4(),
internal_id,
internal_id: internal_id.clone(),
job_type: JobType::ProofRegistration,
status: JobStatus::Created,
external_id: String::new().into(),
@@ -34,20 +35,25 @@ impl Job for RegisterProofJob {
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
};
tracing::info!(log_type = "completed", category = "proof_registry", function_type = "create_job", block_no = %internal_id, "Proof registration job created.");
Ok(job_item)
}

#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config))]
#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config), ret, err)]
async fn process_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<String, JobError> {
// Get proof from storage and submit on chain for verification
// We need to implement a generic trait for this to support multiple
// base layers
todo!()
}

#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config))]
async fn verify_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
#[tracing::instrument(fields(category = "proof_registry"), skip(self, _config), ret, err)]
async fn verify_job(&self, _config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "proof_registry", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proof registration job verification started.");
// verify that the proof transaction has been included on chain
tracing::info!(log_type = "completed", category = "proof_registry", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proof registration job verification completed.");
todo!()
}

79 changes: 73 additions & 6 deletions crates/orchestrator/src/jobs/snos_job/fact_info.rs
Original file line number Diff line number Diff line change
@@ -27,19 +27,86 @@ pub struct FactInfo {
}

pub fn get_fact_info(cairo_pie: &CairoPie, program_hash: Option<Felt>) -> Result<FactInfo, FactError> {
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Starting get_fact_info function"
);

tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Getting program output"
);
let program_output = get_program_output(cairo_pie)?;
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Program output length: {}",
program_output.len()
);

tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Getting fact topology"
);
let fact_topology = get_fact_topology(cairo_pie, program_output.len())?;

let program_hash = match program_hash {
Some(hash) => hash,
None => Felt::from_bytes_be(
&compute_program_hash_chain(&cairo_pie.metadata.program, BOOTLOADER_VERSION)
.map_err(|e| FactError::ProgramHashCompute(e.to_string()))?
.to_bytes_be(),
),
Some(hash) => {
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Using provided program hash"
);
hash
}
None => {
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Computing program hash"
);
Felt::from_bytes_be(
&compute_program_hash_chain(&cairo_pie.metadata.program, BOOTLOADER_VERSION)
.map_err(|e| {
tracing::error!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Failed to compute program hash: {}",
e
);
FactError::ProgramHashCompute(e.to_string())
})?
.to_bytes_be(),
)
}
};
tracing::trace!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Program hash: {:?} and now generating merkle root",
program_hash
);
let output_root = generate_merkle_root(&program_output, &fact_topology)?;
let fact = keccak256([program_hash.to_bytes_be(), *output_root.node_hash].concat());
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Fact computed successfully: {:?}",
fact
);

Ok(FactInfo { program_output, fact_topology, fact })
}

66 changes: 62 additions & 4 deletions crates/orchestrator/src/jobs/snos_job/fact_node.rs
Original file line number Diff line number Diff line change
@@ -50,29 +50,71 @@ pub struct FactNode {
/// Basically it transforms the flat fact topology into a non-binary Merkle tree and then computes
/// its root, enriching the nodes with metadata such as page sizes and hashes.
pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopology) -> Result<FactNode, FactError> {
tracing::info!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Starting generate_merkle_root function"
);
let FactTopology { tree_structure, mut page_sizes } = fact_topology.clone();

let mut end_offset: usize = 0;
let mut node_stack: Vec<FactNode> = Vec::with_capacity(page_sizes.len());
let mut output_iter = program_output.iter();

tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Processing tree structure: {:?}",
tree_structure.len()
);
for (n_pages, n_nodes) in tree_structure.into_iter().tuples() {
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"(n_pages: {}, n_nodes: {})",
n_pages,
n_nodes
);
ensure!(n_pages <= page_sizes.len(), FactError::TreeStructurePagesCountOutOfRange(n_pages, page_sizes.len()));

// Push n_pages (leaves) to the stack
for _ in 0..n_pages {
let page_size = page_sizes.remove(0);
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Processing page with size: {}",
page_size
);
// Page size is already validated upon retrieving the topology
let page = output_iter.by_ref().take(page_size).map(|felt| felt.to_bytes_be().to_vec()).concat();
let node_hash = keccak256(&page);
end_offset += page_size;
// Add lead node (no children)
node_stack.push(FactNode { node_hash, end_offset, page_size, children: vec![] })
node_stack.push(FactNode { node_hash, end_offset, page_size, children: vec![] });
tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Added leaf node with hash: {:?}",
node_hash.to_string()
);
}

ensure!(n_nodes <= node_stack.len(), FactError::TreeStructureNodesCountOutOfRange(n_nodes, node_stack.len()));

if n_nodes > 0 {
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Creating parent node for {} children",
n_nodes
);
// Create a parent node to the last n_nodes in the head of the stack.
let children: Vec<FactNode> = node_stack.drain(node_stack.len() - n_nodes..).collect();
let mut node_data = Vec::with_capacity(2 * 32 * children.len());
@@ -87,12 +129,20 @@ pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopo
child_end_offset = node.end_offset;
}

node_stack.push(FactNode {
let parent_node = FactNode {
node_hash: calculate_node_hash(node_data.as_slice()),
end_offset: child_end_offset,
page_size: total_page_size,
children,
})
};
node_stack.push(parent_node.clone());
tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Added parent node with hash: {:?}",
parent_node.node_hash
);
}
}

@@ -107,7 +157,15 @@ pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopo
FactError::TreeStructureRootOffsetInvalid(node_stack[0].end_offset, program_output.len(),)
);

Ok(node_stack.remove(0))
let root = node_stack.remove(0);
tracing::info!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Successfully generated Merkle root with hash: {:?}",
root.node_hash
);
Ok(root)
}

/// Calculates the keccak hash and adds 1 to it.
40 changes: 40 additions & 0 deletions crates/orchestrator/src/jobs/snos_job/fact_topology.rs
Original file line number Diff line number Diff line change
@@ -23,34 +23,60 @@ pub struct FactTopology {

/// Extracts the [`FactTopology`] of a Cairo run from the output builtin's
/// additional data inside the given Cairo PIE.
///
/// When the `GPS_FACT_TOPOLOGY` attribute is present it is validated
/// (non-empty, even length, at most 10 entries, every entry < 2 << 30) and
/// used as the tree structure; otherwise — provided the output builtin has
/// no pages — the trivial structure `[1, 0]` is assumed. Page sizes are
/// then derived via `get_page_sizes`.
///
/// Errors with `FactError::OutputBuiltinNoAdditionalData` when the PIE
/// carries no `Output` additional data, or with the corresponding
/// validation error otherwise.
pub fn get_fact_topology(cairo_pie: &CairoPie, output_size: usize) -> Result<FactTopology, FactError> {
    tracing::debug!(
        log_type = "FactTopology",
        category = "get_fact_topology",
        function_type = "get_fact_topology",
        "Starting get_fact_topology function"
    );
    // Bail out early unless the output builtin contributed additional data.
    let additional_data = match cairo_pie.additional_data.0.get(&BuiltinName::output) {
        Some(BuiltinAdditionalData::Output(data)) => data,
        _ => {
            tracing::error!("Failed to get Output additional data");
            return Err(FactError::OutputBuiltinNoAdditionalData);
        }
    };
    tracing::trace!("Found Output additional data");

    let tree_structure = if let Some(structure) = additional_data.attributes.get(GPS_FACT_TOPOLOGY) {
        tracing::debug!("GPS_FACT_TOPOLOGY found in additional data attributes");
        ensure!(!structure.is_empty(), FactError::TreeStructureEmpty);
        ensure!(structure.len() % 2 == 0, FactError::TreeStructureLenOdd);
        ensure!(structure.len() <= 10, FactError::TreeStructureTooLarge);
        ensure!(structure.iter().all(|&x| x < 2 << 30), FactError::TreeStructureInvalid);
        tracing::trace!("Tree structure validation passed");
        structure.clone()
    } else {
        // No explicit topology: only legal when there are no output pages.
        tracing::warn!("GPS_FACT_TOPOLOGY not found in additional data attributes");
        ensure!(additional_data.pages.is_empty(), FactError::OutputPagesLenUnexpected);
        tracing::debug!("Using default tree structure");
        vec![1, 0]
    };

    tracing::debug!("Retrieving page sizes");
    let page_sizes = get_page_sizes(&additional_data.pages, output_size)?;
    tracing::debug!(
        log_type = "FactTopology",
        category = "get_fact_topology",
        function_type = "get_fact_topology",
        "FactTopology successfully created"
    );
    Ok(FactTopology { tree_structure, page_sizes })
}

/// Returns the sizes of the program output pages, given the pages dictionary that appears
/// in the additional attributes of the output builtin.
pub fn get_page_sizes(pages: &HashMap<usize, PublicMemoryPage>, output_size: usize) -> Result<Vec<usize>, FactError> {
tracing::debug!(
log_type = "FactTopology",
category = "get_page_sizes",
function_type = "get_page_sizes",
"Starting get_page_sizes function"
);
let mut pages_list: Vec<(usize, usize, usize)> =
pages.iter().map(|(&id, page)| (id, page.start, page.size)).collect();
pages_list.sort();
tracing::debug!("FactTopology Sorted pages list: {:?}", pages_list);

// The first page id is expected to be 1.
let mut expected_page_id = 1;
@@ -60,17 +86,22 @@ pub fn get_page_sizes(pages: &HashMap<usize, PublicMemoryPage>, output_size: usi
let mut page_sizes = Vec::with_capacity(pages_list.len() + 1);
// The size of page 0 is output_size if there are no other pages, or the start of page 1 otherwise.
page_sizes.push(output_size);
tracing::debug!("FactTopology Initialized page_sizes with output_size: {}", output_size);

for (page_id, page_start, page_size) in pages_list {
tracing::debug!("FactTopology Processing page: id={}, start={}, size={}", page_id, page_start, page_size);
ensure!(page_id == expected_page_id, FactError::OutputPagesUnexpectedId(page_id, expected_page_id));

if page_id == 1 {
tracing::trace!("FactTopology Processing first page");
ensure!(
page_start > 0 && page_start < output_size,
FactError::OutputPagesInvalidStart(page_id, page_start, output_size)
);
page_sizes[0] = page_start;
tracing::debug!("FactTopology Updated page_sizes[0] to {}", page_start);
} else {
tracing::trace!("FactTopology Processing non-first page");
ensure!(
Some(page_start) == expected_page_start,
FactError::OutputPagesUnexpectedStart(page_id, page_start, expected_page_start.unwrap_or_default(),)
@@ -85,11 +116,20 @@ pub fn get_page_sizes(pages: &HashMap<usize, PublicMemoryPage>, output_size: usi
expected_page_id += 1;

page_sizes.push(page_size);
tracing::trace!("FactTopology Added page_size {} to page_sizes", page_size);
}

tracing::debug!("FactTopology Final page_sizes: {:?}", page_sizes);

ensure!(
pages.is_empty() || expected_page_start == Some(output_size),
FactError::OutputPagesUncoveredOutput(expected_page_start.unwrap_or_default(), output_size)
);

tracing::debug!(
log_type = "FactTopology",
category = "get_page_sizes",
"FactTopology Successfully generated page sizes"
);
Ok(page_sizes)
}
30 changes: 21 additions & 9 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
@@ -69,16 +69,17 @@ pub struct SnosJob;

#[async_trait]
impl Job for SnosJob {
#[tracing::instrument(fields(category = "snos"), skip(self, _config, metadata))]
#[tracing::instrument(fields(category = "snos"), skip(self, _config, metadata), ret, err)]
async fn create_job(
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
tracing::info!(log_type = "starting", category = "snos", function_type = "create_job", block_no = %internal_id, "SNOS job creation started.");
let mut metadata = metadata;
metadata.insert(JOB_METADATA_SNOS_BLOCK.to_string(), internal_id.clone());
Ok(JobItem {
let job_item = JobItem {
id: Uuid::new_v4(),
internal_id: internal_id.clone(),
job_type: JobType::SnosRun,
@@ -88,37 +89,48 @@ impl Job for SnosJob {
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
};
tracing::info!(log_type = "completed", category = "snos", function_type = "create_job", block_no = %internal_id, "SNOS job creation completed.");
Ok(job_item)
}

#[tracing::instrument(fields(category = "snos"), skip(self, config))]
/// Runs SNOS for the block recorded in the job's metadata and persists the
/// resulting artifacts.
///
/// Steps, each of which short-circuits with a `JobError` on failure:
/// 1. Read the block number from the job metadata.
/// 2. Run `prove_block` against the configured SNOS endpoint, yielding the
///    Cairo PIE and the SNOS output.
/// 3. Compute the fact info (fact hash + program output) from the PIE.
/// 4. Store cairo_pie / snos_output / program_output via the storage client.
/// 5. Record the fact hash under `JOB_METADATA_SNOS_FACT` on the job.
///
/// Returns the processed block number as a string (presumably consumed as
/// the job's external id by the generic job handler — TODO confirm).
#[tracing::instrument(fields(category = "snos"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
    let internal_id = job.internal_id.clone();
    tracing::info!(log_type = "starting", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processing started.");
    let block_number = self.get_block_number_from_metadata(job)?;
    tracing::debug!(job_id = %job.internal_id, block_number = %block_number, "Retrieved block number from metadata");

    // Drop any trailing '/' from the configured URL (presumably so downstream
    // path building doesn't produce a double slash — TODO confirm in prove_block).
    let snos_url = config.snos_url().to_string();
    let snos_url = snos_url.trim_end_matches('/');
    tracing::debug!(job_id = %job.internal_id, "Calling prove_block function");
    let (cairo_pie, snos_output) =
        prove_block(block_number, snos_url, LayoutName::all_cairo, false).await.map_err(|e| {
            tracing::error!(job_id = %job.internal_id, error = %e, "SNOS execution failed");
            SnosError::SnosExecutionError { internal_id: job.internal_id.clone(), message: e.to_string() }
        })?;
    tracing::debug!(job_id = %job.internal_id, "prove_block function completed successfully");

    let fact_info = get_fact_info(&cairo_pie, None)?;
    let program_output = fact_info.program_output;
    tracing::debug!(job_id = %job.internal_id, "Fact info calculated successfully");

    // snos output = output returned by SNOS
    // program output = output written on the output segment
    tracing::debug!(job_id = %job.internal_id, "Storing SNOS outputs");
    self.store(config.storage(), &job.internal_id, block_number, cairo_pie, snos_output, program_output).await?;

    // store the fact
    job.metadata.insert(JOB_METADATA_SNOS_FACT.into(), fact_info.fact.to_string());
    tracing::info!(log_type = "completed", category = "snos", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "SNOS job processed successfully.");

    Ok(block_number.to_string())
}

#[tracing::instrument(fields(category = "snos"), skip(self, _config))]
async fn verify_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
/// SNOS runs locally, so there is nothing to poll: this always reports the
/// job as verified. Should SNOS execution ever be outsourced to an external
/// service, this hook is where its status would be polled.
#[tracing::instrument(fields(category = "snos"), skip(self, _config), ret, err)]
async fn verify_job(&self, _config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
    tracing::info!(log_type = "starting", category = "snos", function_type = "verify_job", job_id = %job.id, block_no = %job.internal_id, "SNOS job verification started.");
    tracing::info!(log_type = "completed", category = "snos", function_type = "verify_job", job_id = %job.id, block_no = %job.internal_id, "SNOS job verification completed.");
    Ok(JobVerificationStatus::Verified)
}

46 changes: 32 additions & 14 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
@@ -73,22 +73,24 @@ pub enum StateUpdateError {
pub struct StateUpdateJob;
#[async_trait]
impl Job for StateUpdateJob {
#[tracing::instrument(fields(category = "state_update"), skip(self, _config, metadata))]
#[tracing::instrument(fields(category = "state_update"), skip(self, _config, metadata), ret, err)]
async fn create_job(
&self,
_config: Arc<Config>,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
tracing::info!(log_type = "starting", category = "state_update", function_type = "create_job", block_no = %internal_id, "State update job creation started.");
// Inserting the metadata (If it doesn't exist)
let mut metadata = metadata.clone();
if !metadata.contains_key(JOB_PROCESS_ATTEMPT_METADATA_KEY) {
tracing::debug!(job_id = %internal_id, "Inserting initial process attempt metadata");
metadata.insert(JOB_PROCESS_ATTEMPT_METADATA_KEY.to_string(), "0".to_string());
}

Ok(JobItem {
let job_item = JobItem {
id: Uuid::new_v4(),
internal_id,
internal_id: internal_id.clone(),
job_type: JobType::StateTransition,
status: JobStatus::Created,
external_id: String::new().into(),
@@ -98,11 +100,15 @@ impl Job for StateUpdateJob {
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
};
tracing::info!(log_type = "completed", category = "state_update", function_type = "create_job", block_no = %internal_id, "State update job created.");
Ok(job_item)
}

#[tracing::instrument(fields(category = "state_update"), skip(self, config))]
#[tracing::instrument(fields(category = "state_update"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "state_update", function_type = "process_job", job_id = %job.id, block_no = %internal_id, "State update job processing started.");
let attempt_no = job
.metadata
.get(JOB_PROCESS_ATTEMPT_METADATA_KEY)
@@ -114,24 +120,21 @@ impl Job for StateUpdateJob {
let mut block_numbers = self.get_block_numbers_from_metadata(job)?;
self.validate_block_numbers(config.clone(), &block_numbers).await?;

// If we had a block state update failing last run, we recover from this block
if let Some(last_failed_block) = job.metadata.get(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO) {
let last_failed_block =
last_failed_block.parse().map_err(|_| StateUpdateError::LastFailedBlockNonPositive)?;

block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::<Vec<u64>>();
}

let mut nonce = config.settlement_client().get_nonce().await.map_err(|e| JobError::Other(OtherError(e)))?;

let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len());
for block_no in block_numbers.iter() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");
let snos = self.fetch_snos_for_block(*block_no, config.clone()).await;
let tx_hash = self.update_state_for_block(config.clone(), *block_no, snos, nonce).await.map_err(|e| {
tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block");
job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string());

self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes);

StateUpdateError::Other(OtherError(eyre!(
"Block #{block_no} - Error occurred during the state update: {e}"
)))
@@ -142,8 +145,8 @@ impl Job for StateUpdateJob {

self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes);

// external_id returned corresponds to the last block number settled
let val = block_numbers.last().ok_or_else(|| StateUpdateError::LastNumberReturnedError)?;
tracing::info!(log_type = "completed", category = "state_update", function_type = "process_job", job_id = %job.id, block_no = %internal_id, last_settled_block = %val, "State update job processed successfully.");

Ok(val.to_string())
}
@@ -153,12 +156,15 @@ impl Job for StateUpdateJob {
/// 1. the last settlement tx hash is successful,
/// 2. the expected last settled block from our configuration is indeed the one found in the
/// provider.
#[tracing::instrument(fields(category = "state_update"), skip(self, config))]
#[tracing::instrument(fields(category = "state_update"), skip(self, config), ret, err)]
async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "state_update", function_type = "verify_job", job_id = %job.id, block_no = %internal_id, "State update job verification started.");
let attempt_no = job
.metadata
.get(JOB_PROCESS_ATTEMPT_METADATA_KEY)
.ok_or_else(|| StateUpdateError::AttemptNumberNotFound)?;
tracing::debug!(job_id = %job.internal_id, attempt_no = %attempt_no, "Retrieved attempt number");

// We are doing attempt_no - 1 because the attempt number is increased in the
// global process job function and the transaction hash is stored with attempt
@@ -172,18 +178,22 @@ impl Job for StateUpdateJob {

let tx_hashes: Vec<&str> = metadata_tx_hashes.split(',').collect();
let block_numbers = self.get_block_numbers_from_metadata(job)?;
tracing::debug!(job_id = %job.internal_id, "Retrieved block numbers from metadata");
let settlement_client = config.settlement_client();

for (tx_hash, block_no) in tx_hashes.iter().zip(block_numbers.iter()) {
tracing::trace!(job_id = %job.internal_id, tx_hash = %tx_hash, block_no = %block_no, "Verifying transaction inclusion");
let tx_inclusion_status =
settlement_client.verify_tx_inclusion(tx_hash).await.map_err(|e| JobError::Other(OtherError(e)))?;
match tx_inclusion_status {
SettlementVerificationStatus::Rejected(_) => {
tracing::warn!(job_id = %job.internal_id, tx_hash = %tx_hash, block_no = %block_no, "Transaction rejected");
job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string());
return Ok(tx_inclusion_status.into());
}
// If the tx is still pending, we wait for it to be finalized and check again the status.
SettlementVerificationStatus::Pending => {
tracing::debug!(job_id = %job.internal_id, tx_hash = %tx_hash, "Transaction pending, waiting for finality");
settlement_client
.wait_for_tx_finality(tx_hash)
.await
@@ -194,17 +204,23 @@ impl Job for StateUpdateJob {
.map_err(|e| JobError::Other(OtherError(e)))?;
match new_status {
SettlementVerificationStatus::Rejected(_) => {
tracing::warn!(job_id = %job.internal_id, tx_hash = %tx_hash, block_no = %block_no, "Transaction rejected after finality");
job.metadata
.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string());
return Ok(new_status.into());
}
SettlementVerificationStatus::Pending => {
tracing::error!(job_id = %job.internal_id, tx_hash = %tx_hash, "Transaction still pending after finality check");
Err(StateUpdateError::TxnShouldNotBePending { tx_hash: tx_hash.to_string() })?
}
SettlementVerificationStatus::Verified => {}
SettlementVerificationStatus::Verified => {
tracing::debug!(job_id = %job.internal_id, tx_hash = %tx_hash, "Transaction verified after finality");
}
}
}
SettlementVerificationStatus::Verified => {}
SettlementVerificationStatus::Verified => {
tracing::debug!(job_id = %job.internal_id, tx_hash = %tx_hash, "Transaction verified");
}
}
}
// verify that the last settled block is indeed the one we expect to be
@@ -213,8 +229,10 @@ impl Job for StateUpdateJob {
let out_last_block_number =
settlement_client.get_last_settled_block().await.map_err(|e| JobError::Other(OtherError(e)))?;
let block_status = if out_last_block_number == *expected_last_block_number {
tracing::info!(log_type = "completed", category = "state_update", function_type = "verify_job", job_id = %job.id, block_no = %internal_id, last_settled_block = %out_last_block_number, "Last settled block verified.");
SettlementVerificationStatus::Verified
} else {
tracing::warn!(log_type = "failed/rejected", category = "state_update", function_type = "verify_job", job_id = %job.id, block_no = %internal_id, expected = %expected_last_block_number, actual = %out_last_block_number, "Last settled block mismatch.");
SettlementVerificationStatus::Rejected(format!(
"Last settle bock expected was {} but found {}",
expected_last_block_number, out_last_block_number
20 changes: 16 additions & 4 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
@@ -12,25 +12,37 @@ use utils::env_utils::get_env_var_or_default;
#[allow(clippy::needless_return)]
async fn main() {
    dotenv().ok();

    // Analytics (metrics/traces/logs) must be initialized before the first
    // tracing call so that nothing is dropped.
    let meter_provider = setup_analytics();
    tracing::info!(service = "orchestrator", "Starting orchestrator service");

    // initial config setup
    let config = init_config().await;
    tracing::debug!(service = "orchestrator", "Configuration initialized");

    let host = get_env_var_or_default("HOST", "127.0.0.1");
    let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
    let address = format!("{}:{}", host, port);
    let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener");

    let app = app_router();
    tracing::debug!(service = "orchestrator", "Application router initialized");

    // init consumer — abort startup if the queues cannot be consumed.
    // NOTE: the stale pre-refactor `init_consumers(...).expect(...)` line was
    // removed; keeping it would have initialized the consumers twice.
    match init_consumers(config).await {
        Ok(_) => tracing::info!(service = "orchestrator", "Consumers initialized successfully"),
        Err(e) => {
            tracing::error!(service = "orchestrator", error = %e, "Failed to initialize consumers");
            panic!("Failed to init consumers: {}", e);
        }
    }

    tracing::info!("Listening on http://{}", address);
    // Serve until the server stops; the stale duplicate
    // `axum::serve(listener, app).await.expect(...)` line was removed — it
    // used `listener`/`app` after they were already moved into this call.
    if let Err(e) = axum::serve(listener, app).await {
        tracing::error!(service = "orchestrator", error = %e, "Server failed to start");
        panic!("Failed to start axum server: {}", e);
    }

    // Analytics Shutdown — flushes pending telemetry before exit.
    shutdown_analytics(meter_provider);
    tracing::info!(service = "orchestrator", "Orchestrator service shutting down");
}
43 changes: 30 additions & 13 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ use omniqueue::{Delivery, QueueError};
use serde::{Deserialize, Deserializer, Serialize};
use thiserror::Error;
use tokio::time::sleep;
use tracing::log;
use uuid::Uuid;

use crate::config::Config;
@@ -104,12 +103,12 @@ enum DeliveryReturnType {
}

pub async fn add_job_to_process_queue(id: Uuid, config: Arc<Config>) -> EyreResult<()> {
log::info!("Adding job with id {:?} to processing queue", id);
tracing::info!("Adding job with id {:?} to processing queue", id);
add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None, config).await
}

pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration, config: Arc<Config>) -> EyreResult<()> {
log::info!("Adding job with id {:?} to verification queue", id);
tracing::info!("Adding job with id {:?} to verification queue", id);
add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay), config).await
}

@@ -123,26 +122,36 @@ where
F: Send + 'static,
Fut: Future<Output = Result<(), JobError>> + Send,
{
log::debug!("Consuming from queue {:?}", queue);
tracing::info!(queue = %queue, "Attempting to consume job from queue");

let delivery = get_delivery_from_queue(&queue, config.clone()).await?;

let message = match delivery {
DeliveryReturnType::Message(message) => message,
DeliveryReturnType::NoMessage => return Ok(()),
DeliveryReturnType::Message(message) => {
tracing::debug!(queue = %queue, "Message received from queue");
message
}
DeliveryReturnType::NoMessage => {
tracing::debug!(queue = %queue, "No message in queue");
return Ok(());
}
};

let job_message = parse_job_message(&message)?;

if let Some(job_message) = job_message {
tracing::info!(queue = %queue, job_id = %job_message.id, "Processing job message");
tokio::spawn(async move {
match handle_job_message(job_message, message, handler, config).await {
Ok(_) => {}
Err(e) => log::error!("Failed to handle job message. Error: {:?}", e),
}
});
} else {
tracing::warn!(queue = %queue, "Received empty job message");
}

tracing::info!(queue = %queue, "Job consumption completed successfully");
Ok(())
}

@@ -158,7 +167,7 @@ where
F: Send + 'static,
Fut: Future<Output = color_eyre::Result<()>> + Send,
{
log::debug!("Consuming from queue {:?}", queue);
tracing::debug!("Consuming from queue {:?}", queue);
let delivery = get_delivery_from_queue(&queue, config.clone()).await?;

let message = match delivery {
@@ -172,7 +181,7 @@ where
tokio::spawn(async move {
match handle_worker_message(job_message, message, handler, config).await {
Ok(_) => {}
Err(e) => log::error!("Failed to handle worker message. Error: {:?}", e),
Err(e) => tracing::error!("Failed to handle worker message. Error: {:?}", e),
}
});
}
@@ -204,7 +213,7 @@ where
F: FnOnce(Uuid, Arc<Config>) -> Fut,
Fut: Future<Output = Result<(), JobError>>,
{
log::info!("Handling job with id {:?}", job_message.id);
tracing::info!("Handling job with id {:?}", job_message.id);

match handler(job_message.id, config.clone()).await {
Ok(_) => {
@@ -217,7 +226,7 @@ where
Ok(())
}
Err(e) => {
log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
tracing::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
config
.alerts()
.send_alert_message(e.to_string())
@@ -261,7 +270,7 @@ where
Ok(())
}
Err(e) => {
log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
tracing::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
config
.alerts()
.send_alert_message(e.to_string())
@@ -304,7 +313,7 @@ macro_rules! spawn_consumer {
loop {
match $consume_function($queue_type, $handler, config_clone.clone()).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
Err(e) => tracing::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
sleep(Duration::from_millis(500)).await;
}
@@ -327,6 +336,14 @@ async fn spawn_worker(worker: Box<dyn Worker>, config: Arc<Config>) -> color_eyr
}
/// Serializes a `JobQueueMessage` for `id` and pushes it onto `queue`,
/// optionally delayed by `delay`.
///
/// Serialization errors and queue-client errors are propagated to the
/// caller; a success is logged with the job id and queue name.
/// NOTE: the stale pre-refactor duplicate send (`send_message_to_queue(queue, ...)`)
/// was removed — it sent the message twice and used `queue` after the move.
async fn add_job_to_queue(id: Uuid, queue: String, delay: Option<Duration>, config: Arc<Config>) -> EyreResult<()> {
    let message = JobQueueMessage { id };
    // `queue` is cloned because the send consumes the name but the log below still needs it.
    config.queue().send_message_to_queue(queue.clone(), serde_json::to_string(&message)?, delay).await?;
    tracing::info!(
        log_type = "JobQueue",
        category = "add_job_to_queue",
        function_type = "add_job_to_queue",
        "Added job with id {:?} to {:?} queue",
        id,
        queue
    );
    Ok(())
}
24 changes: 23 additions & 1 deletion crates/orchestrator/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,9 @@ use std::time::Duration;
use lazy_static::lazy_static;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::metrics::reader::{DefaultAggregationSelector, DefaultTemporalitySelector};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::trace::{BatchConfigBuilder, Config, Tracer};
@@ -32,7 +34,16 @@ pub fn setup_analytics() -> Option<SdkMeterProvider> {
if let Some(otel_endpoint) = otel_endpoint {
let meter_provider = init_metric_provider(&otel_endpoint);
let tracer = init_tracer_provider(&otel_endpoint);
tracing_subscriber.with(OpenTelemetryLayer::new(tracer)).init();

// Opentelemetry will not provide a global API to manage the logger
// provider. Application users must manage the lifecycle of the logger
// provider on their own. Dropping logger providers will disable log
// emitting.
let logger_provider = init_logs(&otel_endpoint).unwrap();
// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
let layer = OpenTelemetryTracingBridge::new(&logger_provider);

tracing_subscriber.with(OpenTelemetryLayer::new(tracer)).with(layer).init();
Some(meter_provider)
} else {
tracing_subscriber.init();
@@ -103,6 +114,17 @@ pub fn init_metric_provider(otel_endpoint: &str) -> SdkMeterProvider {
provider
}

/// Builds and installs the OTLP log pipeline (batched, on the Tokio runtime),
/// exporting over gRPC to `otel_endpoint` under the service name
/// `<OTEL_SERVICE_NAME>_logs_service`. The returned `LoggerProvider` must be
/// kept alive by the caller — dropping it disables log emission.
fn init_logs(otel_endpoint: &str) -> Result<LoggerProvider, opentelemetry::logs::LogError> {
    let service_name = format!("{}{}", *OTEL_SERVICE_NAME, "_logs_service");
    let resource = Resource::new(vec![KeyValue::new(
        opentelemetry_semantic_conventions::resource::SERVICE_NAME,
        service_name,
    )]);
    let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(otel_endpoint.to_string());

    opentelemetry_otlp::new_pipeline()
        .logging()
        .with_resource(resource)
        .with_exporter(exporter)
        .install_batch(runtime::Tokio)
}

#[cfg(test)]
mod tests {
use std::env;
6 changes: 3 additions & 3 deletions crates/orchestrator/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -73,11 +73,11 @@ pub async fn create_sqs_queues(provider_config: Arc<ProviderConfig>) -> color_ey
// Dropping sqs queues
let list_queues_output = sqs_client.list_queues().send().await?;
let queue_urls = list_queues_output.queue_urls();
log::debug!("Found {} queues", queue_urls.len());
tracing::debug!("Found {} queues", queue_urls.len());
for queue_url in queue_urls {
match sqs_client.delete_queue().queue_url(queue_url).send().await {
Ok(_) => log::debug!("Successfully deleted queue: {}", queue_url),
Err(e) => eprintln!("Error deleting queue {}: {:?}", queue_url, e),
Ok(_) => tracing::debug!("Successfully deleted queue: {}", queue_url),
Err(e) => tracing::error!("Error deleting queue {}: {:?}", queue_url, e),
}
}

35 changes: 32 additions & 3 deletions crates/orchestrator/src/workers/data_submission_worker.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ impl Worker for DataSubmissionWorker {
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the lastest DA job already created till latest completed proving job.
async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
tracing::info!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started.");

// provides latest completed proof creation job id
let latest_proven_job_id = config
.database()
@@ -27,6 +29,8 @@ impl Worker for DataSubmissionWorker {
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

tracing::debug!(latest_proven_job_id, "Fetched latest completed ProofCreation job");

// provides latest triggered data submission job id
let latest_data_submission_job_id = config
.database()
@@ -36,15 +40,40 @@ impl Worker for DataSubmissionWorker {
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;
tracing::debug!(latest_data_submission_job_id, "Fetched latest DataSubmission job");

let latest_data_submission_id: u64 = match latest_data_submission_job_id.parse() {
Ok(id) => id,
Err(e) => {
tracing::error!(error = ?e, "Failed to parse latest_data_submission_job_id");
return Err(Box::new(e));
}
};

let latest_proven_id: u64 = match latest_proven_job_id.parse() {
Ok(id) => id,
Err(e) => {
tracing::error!(error = ?e, "Failed to parse latest_proven_job_id");
return Err(Box::new(e));
}
};

tracing::debug!(latest_data_submission_id, latest_proven_id, "Parsed job IDs");

// creating data submission jobs for latest blocks that don't have existing data submission jobs
// yet.
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new(), config.clone()).await?;
tracing::debug!(new_job_id, "Creating new DataSubmission job");
match create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new(), config.clone()).await {
Ok(_) => tracing::info!(job_id = new_job_id, "Successfully created DataSubmission job"),
Err(e) => {
tracing::error!(job_id = new_job_id, error = ?e, "Failed to create DataSubmission job");
return Err(Box::new(e));
}
}
}

tracing::info!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed.");
Ok(())
}
}
13 changes: 12 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
@@ -15,15 +15,26 @@ impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
///
/// Failures while creating an individual proving job are logged but do not
/// abort the loop, so one bad job cannot starve the rest of the batch.
/// NOTE: the stale pre-refactor line `create_job(...).await?` was removed —
/// it duplicated the match-based call below (creating each job twice) and
/// consumed `job.metadata` before the `.clone()` in the replacement call.
async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
    tracing::info!(log_type = "starting", category = "ProvingWorker", "ProvingWorker started.");

    // Completed SNOS runs that do not yet have a ProofCreation successor job.
    let successful_snos_jobs = config
        .database()
        .get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation)
        .await?;

    tracing::debug!("Found {} successful SNOS jobs without proving jobs", successful_snos_jobs.len());

    for job in successful_snos_jobs {
        tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job");
        match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata.clone(), config.clone())
            .await
        {
            Ok(_) => tracing::info!(block_no = %job.internal_id, "Successfully created proof creation job"),
            Err(e) => tracing::error!(job_id = %job.internal_id, error = %e, "Failed to create proof creation job"),
        }
    }

    tracing::info!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed.");
    Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
@@ -18,8 +18,11 @@ impl Worker for SnosWorker {
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
tracing::info!(log_type = "starting", category = "SnosWorker", "SnosWorker started.");

let provider = config.starknet_client();
let latest_block_number = provider.block_number().await?;
tracing::debug!(latest_block_number = %latest_block_number, "Fetched latest block number from Starknet");

let latest_block_processed_data = config
.database()
@@ -28,6 +31,7 @@ impl Worker for SnosWorker {
.unwrap()
.map(|item| item.internal_id)
.unwrap_or("0".to_string());
tracing::debug!(latest_processed_block = %latest_block_processed_data, "Fetched latest processed block from database");

// Check if job does not exist
// TODO: fetching all SNOS jobs with internal id > latest_block_processed_data
@@ -39,22 +43,31 @@ impl Worker for SnosWorker {
.unwrap();

if job_in_db.is_some() {
tracing::trace!(block_number = %latest_block_number, "SNOS job already exists for the latest block");
return Ok(());
}

let latest_block_processed: u64 = latest_block_processed_data.parse()?;
let latest_block_processed: u64 = match latest_block_processed_data.parse() {
Ok(block) => block,
Err(e) => {
tracing::error!(error = %e, block_no = %latest_block_processed_data, "Failed to parse latest processed block number");
return Err(Box::new(e));
}
};

let block_diff = latest_block_number - latest_block_processed;
tracing::debug!(block_diff = %block_diff, "Calculated block difference");

// if all blocks are processed
if block_diff == 0 {
tracing::info!("All blocks are already processed");
return Ok(());
}

for x in latest_block_processed + 1..latest_block_number + 1 {
create_job(JobType::SnosRun, x.to_string(), HashMap::new(), config.clone()).await?;
}

tracing::info!(log_type = "completed", category = "SnosWorker", "SnosWorker completed.");
Ok(())
}
}
56 changes: 39 additions & 17 deletions crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
@@ -17,59 +17,81 @@ impl Worker for UpdateStateWorker {
/// 2. Fetch all successful proving jobs covering blocks after the last state update
/// 3. Create state updates for all the blocks that don't have a state update job
async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
tracing::info!(log_type = "starting", category = "UpdateStateWorker", "UpdateStateWorker started.");

let latest_successful_job =
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;

match latest_successful_job {
Some(job) => {
tracing::debug!(job_id = %job.id, "Found latest successful state transition job");
let successful_da_jobs_without_successor = config
.database()
.get_jobs_without_successor(JobType::DataSubmission, JobStatus::Completed, JobType::StateTransition)
.await?;

if successful_da_jobs_without_successor.is_empty() {
tracing::debug!("No new data submission jobs to process");
return Ok(());
}

let mut metadata = job.metadata;
metadata.insert(
JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(),
Self::parse_job_items_into_block_number_list(successful_da_jobs_without_successor.clone()),
tracing::debug!(
count = successful_da_jobs_without_successor.len(),
"Found data submission jobs without state transition"
);

let mut metadata = job.metadata;
let blocks_to_settle =
Self::parse_job_items_into_block_number_list(successful_da_jobs_without_successor.clone());
metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), blocks_to_settle.clone());

tracing::trace!(blocks_to_settle = %blocks_to_settle, "Prepared blocks to settle for state transition");

// Creating a single job for all the pending blocks.
create_job(
JobType::StateTransition,
successful_da_jobs_without_successor[0].internal_id.clone(),
metadata,
config,
)
.await?;
let new_job_id = successful_da_jobs_without_successor[0].internal_id.clone();
match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config).await {
Ok(_) => tracing::info!(job_id = %new_job_id, "Successfully created new state transition job"),
Err(e) => {
tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job");
return Err(e.into());
}
}

tracing::info!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed.");
Ok(())
}
None => {
tracing::warn!("No previous state transition job found, fetching latest data submission job");
// Getting latest DA job in case no latest state update job is present
let latest_successful_jobs_without_successor = config
.database()
.get_jobs_without_successor(JobType::DataSubmission, JobStatus::Completed, JobType::StateTransition)
.await?;

if latest_successful_jobs_without_successor.is_empty() {
tracing::debug!("No data submission jobs found to process");
return Ok(());
}

let job = latest_successful_jobs_without_successor[0].clone();
let mut metadata = job.metadata;

metadata.insert(
JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(),
Self::parse_job_items_into_block_number_list(latest_successful_jobs_without_successor.clone()),
);
let blocks_to_settle =
Self::parse_job_items_into_block_number_list(latest_successful_jobs_without_successor.clone());
metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), blocks_to_settle.clone());

create_job(JobType::StateTransition, job.internal_id, metadata, config).await?;
tracing::trace!(job_id = %job.id, blocks_to_settle = %blocks_to_settle, "Prepared blocks to settle for initial state transition");

return Ok(());
match create_job(JobType::StateTransition, job.internal_id.clone(), metadata, config).await {
Ok(_) => tracing::info!(job_id = %job.id, "Successfully created initial state transition job"),
Err(e) => {
tracing::error!(job_id = %job.id, error = %e, "Failed to create initial state transition job");
return Err(e.into());
}
}

tracing::info!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed.");
Ok(())
}
}
}
66 changes: 62 additions & 4 deletions crates/prover-services/sharp-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -26,20 +26,38 @@ pub struct SharpProverService {

#[async_trait]
impl ProverClient for SharpProverService {
#[tracing::instrument(skip(self, task))]
#[tracing::instrument(skip(self, task), ret, err)]
async fn submit_task(&self, task: Task) -> Result<String, ProverClientError> {
tracing::info!(
log_type = "starting",
category = "submit_task",
function_type = "cairo_pie",
"Submitting Cairo PIE task."
);
match task {
Task::CairoPie(cairo_pie) => {
let encoded_pie =
starknet_os::sharp::pie::encode_pie_mem(cairo_pie).map_err(ProverClientError::PieEncoding)?;
let (_, job_key) = self.sharp_client.add_job(&encoded_pie).await?;
tracing::info!(
log_type = "completed",
category = "submit_task",
function_type = "cairo_pie",
"Cairo PIE task submitted."
);
Ok(job_key.to_string())
}
}
}

#[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self), ret, err)]
async fn get_task_status(&self, job_key: &str, fact: &str) -> Result<TaskStatus, ProverClientError> {
tracing::info!(
log_type = "starting",
category = "get_task_status",
function_type = "cairo_pie",
"Getting Cairo PIE task status."
);
let job_key = Uuid::from_str(job_key)
.map_err(|e| ProverClientError::InvalidJobKey(format!("Failed to convert {} to UUID {}", job_key, e)))?;
let res = self.sharp_client.get_job_status(&job_key).await?;
@@ -48,19 +66,59 @@ impl ProverClient for SharpProverService {
// TODO : We would need to remove the FAILED, UNKNOWN, NOT_CREATED status as it is not in the sharp client
// response specs : https://docs.google.com/document/d/1-9ggQoYmjqAtLBGNNR2Z5eLreBmlckGYjbVl0khtpU0
// We are waiting for the official public API spec before making changes
CairoJobStatus::FAILED => Ok(TaskStatus::Failed(res.error_log.unwrap_or_default())),
CairoJobStatus::FAILED => {
tracing::error!(
log_type = "failed",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: FAILED."
);
Ok(TaskStatus::Failed(res.error_log.unwrap_or_default()))
}
CairoJobStatus::INVALID => {
tracing::warn!(
log_type = "completed",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: INVALID."
);
Ok(TaskStatus::Failed(format!("Task is invalid: {:?}", res.invalid_reason.unwrap_or_default())))
}
CairoJobStatus::UNKNOWN => Ok(TaskStatus::Failed(format!("Task not found: {}", job_key))),
CairoJobStatus::UNKNOWN => {
tracing::warn!(
log_type = "unknown",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: UNKNOWN."
);
Ok(TaskStatus::Failed(format!("Task not found: {}", job_key)))
}
CairoJobStatus::IN_PROGRESS | CairoJobStatus::NOT_CREATED | CairoJobStatus::PROCESSED => {
tracing::info!(
log_type = "in_progress",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: IN_PROGRESS, NOT_CREATED, or PROCESSED."
);
Ok(TaskStatus::Processing)
}
CairoJobStatus::ONCHAIN => {
let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;
if self.fact_checker.is_valid(&fact).await? {
tracing::info!(
log_type = "onchain",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: ONCHAIN and fact is valid."
);
Ok(TaskStatus::Succeeded)
} else {
tracing::error!(
log_type = "onchain_failed",
category = "get_task_status",
function_type = "cairo_pie",
"Cairo PIE task status: ONCHAIN and fact is not valid."
);
Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact))))
}
}
57 changes: 56 additions & 1 deletion crates/settlement-clients/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -154,11 +154,24 @@ impl SettlementClient for EthereumSettlementClient {
onchain_data_hash: [u8; 32],
onchain_data_size: [u8; 32],
) -> Result<String> {
tracing::info!(
log_type = "starting",
category = "update_state",
function_type = "calldata",
"Updating state with calldata."
);
let program_output: Vec<U256> = vec_u8_32_to_vec_u256(program_output.as_slice())?;
let onchain_data_hash: U256 = slice_u8_to_u256(&onchain_data_hash)?;
let onchain_data_size = U256::from_be_bytes(onchain_data_size);
let tx_receipt =
self.core_contract_client.update_state(program_output, onchain_data_hash, onchain_data_size).await?;
tracing::info!(
log_type = "completed",
category = "update_state",
function_type = "calldata",
tx_hash = %tx_receipt.transaction_hash,
"State updated with calldata."
);
Ok(format!("0x{:x}", tx_receipt.transaction_hash))
}

@@ -169,6 +182,12 @@ impl SettlementClient for EthereumSettlementClient {
state_diff: Vec<Vec<u8>>,
nonce: u64,
) -> Result<String> {
tracing::info!(
log_type = "starting",
category = "update_state",
function_type = "blobs",
"Updating state with blobs."
);
let (sidecar_blobs, sidecar_commitments, sidecar_proofs) = prepare_sidecar(&state_diff, &KZG_SETTINGS).await?;
let sidecar = BlobTransactionSidecar::new(sidecar_blobs, sidecar_commitments, sidecar_proofs);

@@ -223,22 +242,58 @@ impl SettlementClient for EthereumSettlementClient {
{ test_config::configure_transaction(self.provider.clone(), tx_envelope, self.impersonate_account).await };

let pending_transaction = self.provider.send_transaction(txn_request).await?;
tracing::info!(
log_type = "completed",
category = "update_state",
function_type = "blobs",
"State updated with blobs."
);
return Ok(pending_transaction.tx_hash().to_string());
}

/// Should verify the inclusion of a tx in the settlement layer
async fn verify_tx_inclusion(&self, tx_hash: &str) -> Result<SettlementVerificationStatus> {
tracing::info!(
log_type = "starting",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_hash,
"Verifying tx inclusion."
);
let tx_hash = B256::from_str(tx_hash)?;
let maybe_tx_status: Option<TransactionReceipt> = self.provider.get_transaction_receipt(tx_hash).await?;
match maybe_tx_status {
Some(tx_status) => {
if tx_status.status() {
tracing::info!(
log_type = "completed",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_status.transaction_hash,
"Tx inclusion verified."
);
Ok(SettlementVerificationStatus::Verified)
} else {
tracing::info!(
log_type = "pending",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_status.transaction_hash,
"Tx inclusion pending."
);
Ok(SettlementVerificationStatus::Pending)
}
}
None => Ok(SettlementVerificationStatus::Rejected(format!("Could not find status of tx: {}", tx_hash))),
None => {
tracing::info!(
log_type = "pending",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_hash,
"Tx inclusion pending."
);
Ok(SettlementVerificationStatus::Pending)
}
}
}

4 changes: 2 additions & 2 deletions crates/settlement-clients/starknet/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ fn test_u64_from_from_felt_panic() {
let number = Felt::MAX;
let number = u64_from_felt(number);
match number {
Ok(n) => log::info!("Nonce value from get_nonce: {:?}", n),
Err(e) => log::error!("Error getting nonce: {:?}", e),
Ok(n) => tracing::debug!("Nonce value from get_nonce: {:?}", n),
Err(e) => tracing::error!("Error getting nonce: {:?}", e),
}
}
54 changes: 48 additions & 6 deletions crates/settlement-clients/starknet/src/lib.rs
Original file line number Diff line number Diff line change
@@ -116,32 +116,74 @@ impl SettlementClient for StarknetSettlementClient {
onchain_data_hash: [u8; 32],
onchain_data_size: [u8; 32],
) -> Result<String> {
tracing::info!(
log_type = "starting",
category = "update_state",
function_type = "calldata",
"Updating state with calldata."
);
let program_output = slice_slice_u8_to_vec_field(program_output.as_slice());
let onchain_data_hash = slice_u8_to_field(&onchain_data_hash);
let core_contract: &CoreContract = self.starknet_core_contract_client.as_ref();
let onchain_data_size = crypto_bigint::U256::from_be_bytes(onchain_data_size).into();
let invoke_result = core_contract.update_state(program_output, onchain_data_hash, onchain_data_size).await?;

tracing::info!(
log_type = "completed",
category = "update_state",
function_type = "calldata",
"State updated with calldata."
);
Ok(invoke_result.transaction_hash.to_hex_string())
}

/// Should verify the inclusion of a tx in the settlement layer
async fn verify_tx_inclusion(&self, tx_hash: &str) -> Result<SettlementVerificationStatus> {
tracing::info!(
log_type = "starting",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_hash,
"Verifying tx inclusion."
);
let tx_hash = Felt::from_hex(tx_hash)?;
let tx_receipt = self.account.provider().get_transaction_receipt(tx_hash).await?;
let execution_result = tx_receipt.receipt.execution_result();
let status = execution_result.status();

match status {
TransactionExecutionStatus::Reverted => Ok(SettlementVerificationStatus::Rejected(format!(
"Transaction {} has been reverted: {}",
tx_hash,
execution_result.revert_reason().unwrap()
))),
TransactionExecutionStatus::Reverted => {
tracing::info!(
log_type = "completed",
category = "verify_tx",
tx_hash = %tx_hash,
function_type = "inclusion",
revert_reason = %execution_result.revert_reason().unwrap(),
"Tx inclusion verified."
);
Ok(SettlementVerificationStatus::Rejected(format!(
"Transaction {} has been reverted: {}",
tx_hash,
execution_result.revert_reason().unwrap()
)))
}
TransactionExecutionStatus::Succeeded => {
if tx_receipt.block.is_pending() {
tracing::info!(
log_type = "pending",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_hash,
"Tx inclusion pending."
);
Ok(SettlementVerificationStatus::Pending)
} else {
tracing::info!(
log_type = "completed",
category = "verify_tx",
function_type = "inclusion",
tx_hash = %tx_hash,
"Tx inclusion verified."
);
Ok(SettlementVerificationStatus::Verified)
}
}
10 changes: 5 additions & 5 deletions crates/settlement-clients/starknet/src/tests/test.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ use crate::{LocalWalletSignerMiddleware, StarknetSettlementClient};
#[fixture]
pub async fn spin_up_madara() -> MadaraCmd {
dotenvy::from_filename_override(".env.test").expect("Failed to load the .env file");
log::trace!("Spinning up Madara");
tracing::debug!("Spinning up Madara");
let mut node = MadaraCmdBuilder::new()
.args([
"--no-sync-polling",
@@ -122,7 +122,7 @@ async fn test_settle(#[future] setup: (LocalWalletSignerMiddleware, MadaraCmd))

let DeclareTransactionResult { transaction_hash: declare_tx_hash, class_hash: _ } =
account.declare_v2(Arc::new(flattened_class.clone()), compiled_class_hash).send().await.unwrap();
log::debug!("declare tx hash {:?}", declare_tx_hash);
tracing::debug!("declare tx hash {:?}", declare_tx_hash);

let is_success = wait_for_tx(&account, declare_tx_hash, Duration::from_secs(2)).await;
assert!(is_success, "Declare trasactiion failed");
@@ -148,7 +148,7 @@ async fn test_settle(#[future] setup: (LocalWalletSignerMiddleware, MadaraCmd))
.await
.expect("Sending Update state");

log::debug!("update state tx hash {:?}", update_state_tx_hash);
tracing::debug!("update state tx hash {:?}", update_state_tx_hash);

let is_success = wait_for_tx(
&account,
@@ -179,8 +179,8 @@ async fn test_get_nonce_works(#[future] setup: (LocalWalletSignerMiddleware, Mad
let (account, _madara_process) = setup.await;
let nonce = account.get_nonce().await;
match &nonce {
Ok(n) => log::info!("Nonce value from get_nonce: {:?}", n),
Err(e) => log::error!("Error getting nonce: {:?}", e),
Ok(n) => tracing::debug!("Nonce value from get_nonce: {:?}", n),
Err(e) => tracing::error!("Error getting nonce: {:?}", e),
}
assert!(nonce.is_ok(), "Failed to get nonce");
}
4 changes: 2 additions & 2 deletions e2e-tests/src/localstack.rs
Original file line number Diff line number Diff line change
@@ -45,10 +45,10 @@ impl LocalStack {
pub async fn setup_sqs(&self) -> color_eyre::Result<()> {
let list_queues_output = self.sqs_client.list_queues().send().await?;
let queue_urls = list_queues_output.queue_urls();
log::debug!("Found {} queues", queue_urls.len());
println!("Found {} queues", queue_urls.len());
for queue_url in queue_urls {
match self.sqs_client.delete_queue().queue_url(queue_url).send().await {
Ok(_) => log::debug!("Successfully deleted queue: {}", queue_url),
Ok(_) => println!("Successfully deleted queue: {}", queue_url),
Err(e) => eprintln!("Error deleting queue {}: {:?}", queue_url, e),
}
}

0 comments on commit 59f6172

Please sign in to comment.