Skip to content

Commit

Permalink
Improve requirements endpoint + refac
Browse files Browse the repository at this point in the history
Signed-off-by: Fabricio Aguiar <[email protected]>
  • Loading branch information
fao89 committed Oct 15, 2023
1 parent b5bea50 commit 0a881da
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 49 deletions.
47 changes: 28 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 12 additions & 23 deletions src/sync/collections.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{build_service, get_json, request};
use super::{get_json, request};
use crate::models::{self, CollectionNew, CollectionVersionNew};
use crate::schema::collection_versions;
use actix_web::web;
Expand Down Expand Up @@ -31,10 +31,9 @@ pub struct CollectionData {

pub async fn get_version(
url: String,
client: Client,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> Result<Value> {
let (service, resp) = request(url, &client, service).await;
let (service, resp) = request(url, service).await;
let status = resp.status().as_str().to_string();
let json_response = resp.json::<Value>().await.unwrap();
if status != "404" {
Expand All @@ -52,7 +51,6 @@ pub async fn get_version(
info!("Downloading {}", filename);
let (_, resp) = request(
json_response["download_url"].as_str().unwrap().to_string(),
&client,
service,
)
.await;
Expand All @@ -70,7 +68,6 @@ pub async fn get_version(
pub async fn sync_collections(
pool: web::Data<Pool<ConnectionManager<PgConnection>>>,
response: &Value,
client: Client,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> Result<()> {
let results = response.as_object().unwrap()["data"].as_array().unwrap();
Expand All @@ -95,7 +92,6 @@ pub async fn sync_collections(
"{}api/v3/plugin/ansible/content/published/collections/index/{}/{}/versions/{}/",
galaxy_url, nspace, n, vs
),
client.clone(),
service.clone(),
)
})
Expand Down Expand Up @@ -169,29 +165,19 @@ pub async fn sync_collections(
Ok(())
}

pub async fn fetch_collection(data: &Value) -> Result<Vec<CollectionData>> {
fetch_versions(&data["versions_url"])
.await
.with_context(|| {
format!(
"Failed to fetch collection versions from {}",
data["versions_url"]
)
})
}

async fn fetch_versions(url: &Value) -> Result<Vec<CollectionData>> {
pub async fn fetch_versions(
mut service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
url: &Value,
) -> Result<Vec<CollectionData>> {
let mut versions: Vec<CollectionData> = Vec::new();
let galaxy_url = dotenv::var("GALAXY_URL").unwrap_or("https://galaxy.ansible.com/".to_string());
let mut versions_url = format!(
"{}{}?limit=100",
galaxy_url.strip_suffix('/').unwrap(),
url.as_str().unwrap()
);
let client = reqwest::Client::new();
let mut service = build_service(client.clone());
loop {
let (svc, resp) = request(versions_url, &client, service).await;
let (svc, resp) = request(versions_url, service).await;
service = svc;
let json_response = resp.json::<Value>().await.unwrap();
let results = json_response.as_object().unwrap()["data"]
Expand All @@ -208,7 +194,6 @@ async fn fetch_versions(url: &Value) -> Result<Vec<CollectionData>> {
galaxy_url.strip_suffix('/').unwrap(),
v["href"].as_str().unwrap()
),
client.clone(),
service.clone(),
)
})
Expand Down Expand Up @@ -245,6 +230,7 @@ async fn fetch_versions(url: &Value) -> Result<Vec<CollectionData>> {

pub async fn process_collection_data(
pool: web::Data<Pool<ConnectionManager<PgConnection>>>,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
data: Vec<Vec<CollectionData>>,
fetch_dependencies: bool,
) -> Result<()> {
Expand Down Expand Up @@ -339,7 +325,10 @@ pub async fn process_collection_data(
info!("Fetching collection dependencies");
let dependencies: Vec<_> = deps.keys().map(|url| get_json(url)).collect();
let deps_json = try_join_all(dependencies).await.unwrap();
let to_fetch: Vec<_> = deps_json.iter().map(fetch_collection).collect();
let to_fetch: Vec<_> = deps_json
.iter()
.map(|c| fetch_versions(service.clone(), &c["versions_url"]))
.collect();
to_process = try_join_all(to_fetch).await.unwrap();
} else {
break;
Expand Down
15 changes: 10 additions & 5 deletions src/sync/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
a2b_base64, build_service, fetch_collection, get_json, process_collection_data,
sync_collections, sync_roles,
a2b_base64, build_service, fetch_versions, get_json, process_collection_data, sync_collections,
sync_roles,
};
use crate::models;
use actix_web::{http::header::HeaderMap, web};
Expand Down Expand Up @@ -100,9 +100,14 @@ pub async fn process_requirements(
try_join_all(to_fetch).await?;
} else {
info!("Syncing collections");
let to_fetch: Vec<_> = responses.iter().map(fetch_collection).collect();
let client = reqwest::Client::new();
let service = build_service(client.clone());
let to_fetch: Vec<_> = responses
.iter()
.map(|c| fetch_versions(service.clone(), &c["versions_url"]))
.collect();
let data = try_join_all(to_fetch).await?;
process_collection_data(pool.clone(), data, true).await?
process_collection_data(pool.clone(), service.clone(), data, true).await?
};
}
}
Expand Down Expand Up @@ -152,7 +157,7 @@ pub async fn mirror_content(
.context("Failed to join next_link")?
} else if content_type == "collections" {
info!("Syncing collections");
sync_collections(pool.clone(), &results, client.clone(), service.clone()).await?;
sync_collections(pool.clone(), &results, service.clone()).await?;
if results.as_object().unwrap()["links"]["next"]
.as_str()
.is_none()
Expand Down
2 changes: 1 addition & 1 deletion src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
mod decode;
mod roles;
mod utils;
pub use collections::{fetch_collection, process_collection_data, sync_collections};
pub use collections::{fetch_versions, process_collection_data, sync_collections};
pub use common::{import_task, mirror_content, process_requirements};
pub use decode::a2b_base64;
pub use roles::sync_roles;
Expand Down
2 changes: 1 addition & 1 deletion src/sync/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ pub fn build_service(client: Client) -> Buffer<ConcurrencyLimit<RateLimit<Client

pub async fn request(
url: String,
client: &Client,
mut service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> (
Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
Response,
) {
let client = reqwest::Client::new();
let http_request = client.get(url).build().unwrap();
let mut is_ready = service.ready().await.is_ok();
while !is_ready {
Expand Down

0 comments on commit 0a881da

Please sign in to comment.