diff --git a/user-ingest/src/download.rs b/user-ingest/src/download.rs index 7b52bf8..f17f429 100644 --- a/user-ingest/src/download.rs +++ b/user-ingest/src/download.rs @@ -9,29 +9,6 @@ use tempfile::NamedTempFile; const APP_ID: &str = "1422450"; -pub async fn process_salts(salts: Salts) { - let handle = futures::try_join!( - process_data(&salts, DataType::Demo), - process_data(&salts, DataType::Meta) - ); - if let Err(e) = handle { - match e { - ProcessError::Reqwest(e) => { - eprintln!("Reqwest error: {}", e); - } - ProcessError::S3(e) => { - eprintln!("S3 error: {}", e); - } - ProcessError::Io(e) => { - eprintln!("Io error: {}", e); - } - ProcessError::RmqError(e) => { - eprintln!("Rmq error: {}", e); - } - } - } -} - pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), ProcessError> { let local_file = NamedTempFile::new().map_err(ProcessError::Io)?; let local_path = local_file.path().to_path_buf(); @@ -75,7 +52,7 @@ fn get_file_name(salts: &&Salts, data_type: DataType) -> Option { salt = salt, data_type = data_type ) - .into() + .into() } async fn download_to_file( diff --git a/user-ingest/src/main.rs b/user-ingest/src/main.rs index bd70fad..0af1968 100644 --- a/user-ingest/src/main.rs +++ b/user-ingest/src/main.rs @@ -1,16 +1,16 @@ +use crate::download::process_data; +use crate::models::DataType; use axum::extract::State; use axum::http::StatusCode; use axum::routing::{get, head, post}; use axum::{Json, Router}; use futures::future::join_all; -use futures::stream::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use log::{debug, error}; use models::Salts; use serde::Serialize; use std::future::IntoFuture; use std::net::Ipv4Addr; -use std::sync::Arc; use tokio::io; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -21,8 +21,6 @@ mod rmq; mod s3; mod utils; -const MAX_PARALLEL_DOWNLOADS: usize = 20; - #[derive(Debug, Clone)] pub struct AppState { salts_channel: mpsc::Sender, @@ -58,7 +56,13 @@ async fn main() -> Result<(), io::Error> { debug!("Received metadata download task: {:?}", salts); tokio::spawn(async move { debug!("Received metadata download task: {:?}", salts); - download::process_salts(salts).await; + let demo_result = process_data(&salts, DataType::Demo).await; + let meta_result = process_data(&salts, DataType::Meta).await; + let result = demo_result.and(meta_result); + match result { + Ok(_) => debug!("Downloaded Match Data"), + Err(e) => error!("Failed to download Match Data: {:?}", e), + }; }); } });