Commit

remove all unwraps (#153)
* remove all unwraps

* update changelog

* fix toml lint

* Apply suggestions from code review

Co-authored-by: Mohit Dhattarwal <[email protected]>

* made error consistent

* resolve comments

* fix small comment

* move to conversion.rs

* fix: borrow issue

* fix: clippy warning

---------

Co-authored-by: Heemank Verma <[email protected]>
Co-authored-by: Mohit Dhattarwal <[email protected]>
3 people authored Oct 11, 2024
1 parent 59f6172 commit 3b04d66
Showing 24 changed files with 163 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- Fixes all unwraps() in code to improve error logging
- Simplified Update_Job for Database.
- Simplified otel setup.
- Added new_with_settings to SharpClient.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

28 changes: 15 additions & 13 deletions crates/orchestrator/src/config.rs
@@ -90,7 +90,7 @@ pub async fn get_aws_config(settings_provider: &impl Settings) -> SdkConfig {
}

/// Initializes the app config
pub async fn init_config() -> Arc<Config> {
pub async fn init_config() -> color_eyre::Result<Arc<Config>> {
dotenv().ok();

let settings_provider = EnvSettingsProvider {};
@@ -104,7 +104,7 @@ pub async fn init_config() -> Arc<Config> {
// init database
let database = build_database_client(&settings_provider).await;
let da_client = build_da_client(&settings_provider).await;
let settlement_client = build_settlement_client(&settings_provider).await;
let settlement_client = build_settlement_client(&settings_provider).await?;
let prover_client = build_prover_service(&settings_provider);
let storage_client = build_storage_client(&settings_provider, provider_config.clone()).await;
let alerts_client = build_alert_client(&settings_provider, provider_config.clone()).await;
@@ -115,7 +115,7 @@ pub async fn init_config() -> Arc<Config> {
// us stop using the generic omniqueue abstractions for message ack/nack
let queue = build_queue_client();

Arc::new(Config::new(
Ok(Arc::new(Config::new(
rpc_url,
snos_url,
Arc::new(provider),
Expand All @@ -126,7 +126,7 @@ pub async fn init_config() -> Arc<Config> {
queue,
storage_client,
alerts_client,
))
)))
}

impl Config {
@@ -230,24 +230,26 @@ pub fn build_prover_service(settings_provider: &impl Settings) -> Box<dyn Prover
}

/// Builds the settlement client depending on the env variable SETTLEMENT_LAYER
pub async fn build_settlement_client(settings_provider: &impl Settings) -> Box<dyn SettlementClient + Send + Sync> {
pub async fn build_settlement_client(
settings_provider: &impl Settings,
) -> color_eyre::Result<Box<dyn SettlementClient + Send + Sync>> {
match get_env_var_or_panic("SETTLEMENT_LAYER").as_str() {
"ethereum" => {
#[cfg(not(feature = "testing"))]
{
Box::new(EthereumSettlementClient::new_with_settings(settings_provider))
Ok(Box::new(EthereumSettlementClient::new_with_settings(settings_provider)))
}
#[cfg(feature = "testing")]
{
Box::new(EthereumSettlementClient::with_test_settings(
RootProvider::new_http(get_env_var_or_panic("SETTLEMENT_RPC_URL").as_str().parse().unwrap()),
Address::from_str(&get_env_var_or_panic("L1_CORE_CONTRACT_ADDRESS")).unwrap(),
Url::from_str(get_env_var_or_panic("SETTLEMENT_RPC_URL").as_str()).unwrap(),
Some(Address::from_str(get_env_var_or_panic("STARKNET_OPERATOR_ADDRESS").as_str()).unwrap()),
))
Ok(Box::new(EthereumSettlementClient::with_test_settings(
RootProvider::new_http(get_env_var_or_panic("SETTLEMENT_RPC_URL").as_str().parse()?),
Address::from_str(&get_env_var_or_panic("L1_CORE_CONTRACT_ADDRESS"))?,
Url::from_str(get_env_var_or_panic("SETTLEMENT_RPC_URL").as_str())?,
Some(Address::from_str(get_env_var_or_panic("STARKNET_OPERATOR_ADDRESS").as_str())?),
)))
}
}
"starknet" => Box::new(StarknetSettlementClient::new_with_settings(settings_provider).await),
"starknet" => Ok(Box::new(StarknetSettlementClient::new_with_settings(settings_provider).await)),
_ => panic!("Unsupported Settlement layer"),
}
}
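For context, the build_settlement_client change above follows the common "fallible builder" pattern: a constructor keyed off an environment variable now returns color_eyre::Result<Box<dyn Trait>>, so parse failures propagate with `?` instead of panicking. Below is a minimal, self-contained sketch of that pattern; SettlementClient and EthereumClient here are simplified stand-ins rather than the crate's real types, and note the commit itself still panics on an unsupported layer rather than returning an error.

    use color_eyre::eyre::{bail, Result};

    trait SettlementClient {
        fn name(&self) -> &'static str;
    }

    struct EthereumClient;
    impl SettlementClient for EthereumClient {
        fn name(&self) -> &'static str {
            "ethereum"
        }
    }

    // Builder returns Result so a bad env value becomes an error the caller can propagate with `?`.
    fn build_settlement_client(layer: &str) -> Result<Box<dyn SettlementClient>> {
        match layer {
            "ethereum" => Ok(Box::new(EthereumClient)),
            other => bail!("Unsupported settlement layer: {other}"),
        }
    }

    fn main() -> Result<()> {
        let client = build_settlement_client("ethereum")?;
        println!("built {} settlement client", client.name());
        Ok(())
    }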
10 changes: 10 additions & 0 deletions crates/orchestrator/src/jobs/conversion.rs
@@ -0,0 +1,10 @@
use std::str::FromStr;

use crate::jobs::{JobError, OtherError};

pub fn parse_string<T: FromStr>(value: &str) -> Result<T, JobError>
where
T::Err: std::fmt::Display,
{
value.parse::<T>().map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))
}
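The new helper is used later in jobs/mod.rs to replace internal_id.parse::<f64>().unwrap() in the metrics calls. A self-contained sketch of the same idea, with a simplified error type standing in for the crate's JobError/OtherError:

    use std::str::FromStr;

    // Stand-in for JobError::Other(OtherError(..)) — illustrative only.
    #[derive(Debug)]
    struct ParseError(String);

    fn parse_string<T: FromStr>(value: &str) -> Result<T, ParseError>
    where
        T::Err: std::fmt::Display,
    {
        value.parse::<T>().map_err(|e| ParseError(format!("Could not parse string: {e}")))
    }

    fn main() -> Result<(), ParseError> {
        // Same call shape as the metrics code below: an internal_id parsed as f64.
        let block_no: f64 = parse_string("1234")?;
        let attempt: u32 = parse_string("3")?;
        println!("block_no = {block_no}, attempt = {attempt}");
        Ok(())
    }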
6 changes: 3 additions & 3 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -31,13 +31,13 @@ lazy_static! {
pub static ref GENERATOR: BigUint = BigUint::from_str(
"39033254847818212395286706435128746857159659164139250548781411570340225835782",
)
.unwrap();
.expect("Failed to convert to biguint");

pub static ref BLS_MODULUS: BigUint = BigUint::from_str(
"52435875175126190479447740508185965837690552500527637822603658699938581184513",
)
.unwrap();
pub static ref TWO: BigUint = 2u32.to_biguint().unwrap();
.expect("Failed to convert to biguint");
pub static ref TWO: BigUint = 2u32.to_biguint().expect("Failed to convert to biguint");

pub static ref BLOB_LEN: usize = 4096;
}
15 changes: 9 additions & 6 deletions crates/orchestrator/src/jobs/mod.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::eyre::{eyre, Context};
use conversion::parse_string;
use da_job::DaError;
use mockall::automock;
use mockall_double::double;
@@ -25,6 +26,7 @@ use crate::metrics::ORCHESTRATOR_METRICS;
use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError};

pub mod constants;
pub mod conversion;
pub mod da_job;
pub mod job_handler_factory;
pub mod proving_job;
@@ -173,7 +175,7 @@ pub async fn create_job(
KeyValue::new("job", format!("{:?}", job_item)),
];

ORCHESTRATOR_METRICS.block_gauge.record(internal_id.parse::<f64>().unwrap(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
Ok(())
}
@@ -260,8 +262,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
KeyValue::new("job", format!("{:?}", job)),
];

ORCHESTRATOR_METRICS.block_gauge.record(job.internal_id.parse::<f64>().unwrap(), &attributes);

ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
Ok(())
}
@@ -394,8 +395,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
KeyValue::new("job", format!("{:?}", job)),
];

ORCHESTRATOR_METRICS.block_gauge.record(job.internal_id.parse::<f64>().unwrap(), &attributes);

ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
Ok(())
}
@@ -460,7 +460,10 @@ pub fn increment_key_in_metadata(
let attempt = get_u64_from_metadata(metadata, key).map_err(|e| JobError::Other(OtherError(e)))?;
let incremented_value = attempt.checked_add(1);
incremented_value.ok_or_else(|| JobError::KeyOutOfBounds { key: key.to_string() })?;
new_metadata.insert(key.to_string(), incremented_value.unwrap().to_string());
new_metadata.insert(
key.to_string(),
incremented_value.ok_or(JobError::Other(OtherError(eyre!("Overflow while incrementing attempt"))))?.to_string(),
);
Ok(new_metadata)
}

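The checked_add/ok_or combination above replaces the old incremented_value.unwrap(); the same overflow check can also be written once, as in this small standalone sketch (a plain String error stands in for JobError::KeyOutOfBounds):

    // Overflow-safe increment: returns an error instead of panicking at u64::MAX.
    fn increment_attempt(attempt: u64) -> Result<u64, String> {
        attempt.checked_add(1).ok_or_else(|| format!("Overflow while incrementing attempt {attempt}"))
    }

    fn main() {
        assert_eq!(increment_attempt(3), Ok(4));
        assert!(increment_attempt(u64::MAX).is_err());
    }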
4 changes: 2 additions & 2 deletions crates/orchestrator/src/jobs/snos_job/fact_topology.rs
@@ -104,7 +104,7 @@ pub fn get_page_sizes(pages: &HashMap<usize, PublicMemoryPage>, output_size: usi
tracing::trace!("FactTopology Processing non-first page");
ensure!(
Some(page_start) == expected_page_start,
FactError::OutputPagesUnexpectedStart(page_id, page_start, expected_page_start.unwrap_or_default(),)
FactError::OutputPagesUnexpectedStart(page_id, page_start, expected_page_start.unwrap_or_default(),) /* The unwrap here is fine as the assert is exactly for this reason */
);
}

@@ -123,7 +123,7 @@ pub fn get_page_sizes(pages: &HashMap<usize, PublicMemoryPage>, output_size: usi

ensure!(
pages.is_empty() || expected_page_start == Some(output_size),
FactError::OutputPagesUncoveredOutput(expected_page_start.unwrap_or_default(), output_size)
FactError::OutputPagesUncoveredOutput(expected_page_start.unwrap_or_default(), output_size) /* The unwrap here is fine as the assert is exactly for this reason */
);

tracing::debug!(
6 changes: 5 additions & 1 deletion crates/orchestrator/src/jobs/state_update_job/mod.rs
@@ -171,7 +171,11 @@ impl Job for StateUpdateJob {
// number : 0
let metadata_tx_hashes = job
.metadata
.get(&format!("{}{}", JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, attempt_no.parse::<u32>().unwrap() - 1))
.get(&format!(
"{}{}",
JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX,
attempt_no.parse::<u32>().map_err(|e| JobError::Other(OtherError(eyre!(e))))? - 1
))
.ok_or_else(|| StateUpdateError::TxnHashMetadataNotFound)?
.clone()
.replace(' ', "");
12 changes: 7 additions & 5 deletions crates/orchestrator/src/jobs/state_update_job/utils.rs
@@ -23,7 +23,7 @@ pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
let blob_data = storage_client.get_data(&key).await?;
let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref());
let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref())?;
Ok(transformed_blob_vec_u8)
}

@@ -48,7 +48,7 @@ pub fn hex_string_to_u8_vec(hex_str: &str) -> color_eyre::Result<Vec<u8>> {
Ok(result)
}

pub fn bytes_to_vec_u8(bytes: &[u8]) -> Vec<[u8; 32]> {
pub fn bytes_to_vec_u8(bytes: &[u8]) -> color_eyre::Result<Vec<[u8; 32]>> {
let cursor = Cursor::new(bytes);
let reader = std::io::BufReader::new(cursor);

@@ -62,11 +62,13 @@ pub fn bytes_to_vec_u8(bytes: &[u8]) -> Vec<[u8; 32]> {
let result = U256::from_str(trimmed).expect("Unable to convert line");
let res_vec = result.to_be_bytes_vec();
let hex = to_padded_hex(res_vec.as_slice());
let vec_hex = hex_string_to_u8_vec(&hex).unwrap();
program_output.push(vec_hex.try_into().unwrap());
let vec_hex = hex_string_to_u8_vec(&hex)
.map_err(|e| eyre!(format!("Failed converting hex string to Vec<u8>, {:?}", e)))?;
program_output
.push(vec_hex.try_into().map_err(|e| eyre!(format!("Failed to convert Vec<u8> to [u8; 32] : {:?}", e)))?);
}

program_output
Ok(program_output)
}

fn to_padded_hex(slice: &[u8]) -> String {
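In bytes_to_vec_u8 the remaining failure points (hex_string_to_u8_vec and the Vec<u8> to [u8; 32] conversion) are now mapped into eyre errors rather than unwrapped. A minimal sketch of the fallible fixed-size conversion in isolation, with a plain String error for brevity:

    // Vec<u8> -> [u8; 32] via TryFrom; the error carries the offending length instead of panicking.
    fn to_word(bytes: Vec<u8>) -> Result<[u8; 32], String> {
        bytes.try_into().map_err(|v: Vec<u8>| format!("expected 32 bytes, got {}", v.len()))
    }

    fn main() {
        assert!(to_word(vec![0u8; 32]).is_ok());
        assert!(to_word(vec![0u8; 5]).is_err());
    }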
4 changes: 3 additions & 1 deletion crates/orchestrator/src/main.rs
@@ -16,8 +16,10 @@ async fn main() {
let meter_provider = setup_analytics();
tracing::info!(service = "orchestrator", "Starting orchestrator service");

color_eyre::install().expect("Unable to isntall color_eyre");

// initial config setup
let config = init_config().await;
let config = init_config().await.expect("Config instantiation failed");
tracing::debug!(service = "orchestrator", "Configuration initialized");

let host = get_env_var_or_default("HOST", "127.0.0.1");
4 changes: 3 additions & 1 deletion crates/orchestrator/src/tests/config.rs
@@ -285,7 +285,9 @@ pub mod implement_client {
) -> Box<dyn SettlementClient> {
match service {
ConfigType::Mock(client) => client.into(),
ConfigType::Actual => build_settlement_client(settings_provider).await,
ConfigType::Actual => {
build_settlement_client(settings_provider).await.expect("Failed to initialise settlement_client")
}
ConfigType::Dummy => Box::new(MockSettlementClient::new()),
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/state_update_job/mod.rs
@@ -30,7 +30,7 @@ use crate::tests::common::default_job_item;
use crate::tests::config::TestConfigBuilder;

lazy_static! {
pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().unwrap();
pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().expect("Failed to get Current Path");
}

pub const X_0_FILE_NAME: &str = "x_0.txt";
6 changes: 2 additions & 4 deletions crates/orchestrator/src/workers/data_submission_worker.rs
@@ -24,8 +24,7 @@ impl Worker for DataSubmissionWorker {
let latest_proven_job_id = config
.database()
.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
.await
.unwrap()
.await?
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

@@ -35,8 +34,7 @@ impl Worker for DataSubmissionWorker {
let latest_data_submission_job_id = config
.database()
.get_latest_job_by_type(JobType::DataSubmission)
.await
.unwrap()
.await?
.map(|item| item.internal_id)
.unwrap_or("0".to_string());

6 changes: 2 additions & 4 deletions crates/orchestrator/src/workers/snos.rs
@@ -27,8 +27,7 @@ impl Worker for SnosWorker {
let latest_block_processed_data = config
.database()
.get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed)
.await
.unwrap()
.await?
.map(|item| item.internal_id)
.unwrap_or("0".to_string());
tracing::debug!(latest_processed_block = %latest_block_processed_data, "Fetched latest processed block from database");
@@ -39,8 +38,7 @@ impl Worker for SnosWorker {
let job_in_db = config
.database()
.get_job_by_internal_id_and_type(&latest_block_number.to_string(), &JobType::SnosRun)
.await
.unwrap();
.await?;

if job_in_db.is_some() {
tracing::trace!(block_number = %latest_block_number, "SNOS job already exists for the latest block");
23 changes: 17 additions & 6 deletions crates/prover-services/sharp-service/src/client.rs
@@ -31,9 +31,15 @@ impl SharpClient {
pub fn new_with_settings(url: Url, settings: &impl Settings) -> Self {
// Getting the cert files from the .env and then decoding it from base64

let cert = general_purpose::STANDARD.decode(settings.get_settings_or_panic("SHARP_USER_CRT")).unwrap();
let key = general_purpose::STANDARD.decode(settings.get_settings_or_panic("SHARP_USER_KEY")).unwrap();
let server_cert = general_purpose::STANDARD.decode(settings.get_settings_or_panic("SHARP_SERVER_CRT")).unwrap();
let cert = general_purpose::STANDARD
.decode(settings.get_settings_or_panic("SHARP_USER_CRT"))
.expect("Failed to decode certificate");
let key = general_purpose::STANDARD
.decode(settings.get_settings_or_panic("SHARP_USER_KEY"))
.expect("Failed to decode sharp user key");
let server_cert = general_purpose::STANDARD
.decode(settings.get_settings_or_panic("SHARP_SERVER_CRT"))
.expect("Failed to decode sharp server certificate");

// Adding Customer ID to the url

Expand All @@ -44,10 +50,15 @@ impl SharpClient {
Self {
base_url: url_mut,
client: ClientBuilder::new()
.identity(Identity::from_pkcs8_pem(&cert, &key).unwrap())
.add_root_certificate(Certificate::from_pem(server_cert.as_slice()).unwrap())
.identity(
Identity::from_pkcs8_pem(&cert, &key)
.expect("Failed to build the identity from certificate and key"),
)
.add_root_certificate(
Certificate::from_pem(server_cert.as_slice()).expect("Failed to add root certificate"),
)
.build()
.unwrap(),
.expect("Failed to build the reqwest client with provided configs"),
}
}

15 changes: 12 additions & 3 deletions crates/prover-services/sharp-service/src/config.rs
@@ -17,9 +17,18 @@ pub struct SharpConfig {
impl SharpConfig {
pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
Ok(Self {
service_url: settings.get_settings_or_panic("SHARP_URL").parse().unwrap(),
rpc_node_url: settings.get_settings_or_panic("SETTLEMENT_RPC_URL").parse().unwrap(),
verifier_address: settings.get_settings_or_panic("GPS_VERIFIER_CONTRACT_ADDRESS").parse().unwrap(),
service_url: settings
.get_settings_or_panic("SHARP_URL")
.parse()
.expect("Failed to parse to service url for SharpConfig"),
rpc_node_url: settings
.get_settings_or_panic("SETTLEMENT_RPC_URL")
.parse()
.expect("Failed to parse to rpc_node_url for SharpConfig"),
verifier_address: settings
.get_settings_or_panic("GPS_VERIFIER_CONTRACT_ADDRESS")
.parse()
.expect("Failed to parse to verifier_address for SharpConfig"),
})
}
}