Skip to content

Commit

Permalink
user-ingest: Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
raimannma committed Oct 15, 2024
1 parent 85d8e2b commit 2ad0414
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 30 deletions.
25 changes: 1 addition & 24 deletions user-ingest/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,6 @@ use tempfile::NamedTempFile;

const APP_ID: &str = "1422450";

pub async fn process_salts(salts: Salts) {
let handle = futures::try_join!(
process_data(&salts, DataType::Demo),
process_data(&salts, DataType::Meta)
);
if let Err(e) = handle {
match e {
ProcessError::Reqwest(e) => {
eprintln!("Reqwest error: {}", e);
}
ProcessError::S3(e) => {
eprintln!("S3 error: {}", e);
}
ProcessError::Io(e) => {
eprintln!("Io error: {}", e);
}
ProcessError::RmqError(e) => {
eprintln!("Rmq error: {}", e);
}
}
}
}

pub async fn process_data(salts: &Salts, data_type: DataType) -> Result<(), ProcessError> {
let local_file = NamedTempFile::new().map_err(ProcessError::Io)?;
let local_path = local_file.path().to_path_buf();
Expand Down Expand Up @@ -75,7 +52,7 @@ fn get_file_name(salts: &&Salts, data_type: DataType) -> Option<String> {
salt = salt,
data_type = data_type
)
.into()
.into()
}

async fn download_to_file(
Expand Down
16 changes: 10 additions & 6 deletions user-ingest/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::download::process_data;
use crate::models::DataType;
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::{get, head, post};
use axum::{Json, Router};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use log::{debug, error};
use models::Salts;
use serde::Serialize;
use std::future::IntoFuture;
use std::net::Ipv4Addr;
use std::sync::Arc;
use tokio::io;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
Expand All @@ -21,8 +21,6 @@ mod rmq;
mod s3;
mod utils;

const MAX_PARALLEL_DOWNLOADS: usize = 20;

#[derive(Debug, Clone)]
pub struct AppState {
salts_channel: mpsc::Sender<Salts>,
Expand Down Expand Up @@ -58,7 +56,13 @@ async fn main() -> Result<(), io::Error> {
debug!("Received metadata download task: {:?}", salts);
tokio::spawn(async move {
debug!("Received metadata download task: {:?}", salts);
download::process_salts(salts).await;
let demo_result = process_data(&salts, DataType::Demo).await;
let meta_result = process_data(&salts, DataType::Meta).await;
let result = demo_result.and(meta_result);
match result {
Ok(_) => debug!("Downloaded Match Data"),
Err(e) => error!("Failed to download Match Data: {:?}", e),
};
});
}
});
Expand Down

0 comments on commit 2ad0414

Please sign in to comment.