Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mass-rebuilder: Redo the evaluator #701

Merged
merged 14 commits into from
Jan 4, 2025
2 changes: 1 addition & 1 deletion ofborg/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ fn name_to_parts(name: &str) -> Vec<String> {
parts.push(buf.to_owned());
buf = String::from("");
}
buf.push_str(&c.to_string());
buf.push(c);
}
if !buf.is_empty() {
parts.push(buf.to_owned());
Expand Down
2 changes: 1 addition & 1 deletion ofborg/src/bin/build-faker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let arg = env::args().nth(1).expect("usage: build-faker <config>");
let cfg = config::load(arg.as_ref());

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let conn = easylapin::from_config(&cfg.builder.unwrap().rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

let repo_msg = Repo {
Expand Down
19 changes: 7 additions & 12 deletions ofborg/src/bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,26 @@ use std::path::Path;

use async_std::task::{self, JoinHandle};
use futures_util::future;
use tracing::{info, warn};
use tracing::{error, info, warn};

use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::{checkout, config, tasks};

// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args().nth(1).expect("usage: builder <config>");
let arg = env::args()
.nth(1)
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let cfg = config::load(arg.as_ref());

if !cfg.feedback.full_logs {
warn!("Please define feedback.full_logs in your configuration to true!");
warn!("feedback.full_logs when true will cause the full build log to be sent back");
warn!("to the server, and be viewable by everyone.");
warn!("");
warn!("Builders are no longer allowed to operate with this off");
warn!("so your builder will no longer start.");
let Some(builder_cfg) = config::load(arg.as_ref()).builder else {
error!("No builder configuration found!");
panic!();
};

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let conn = easylapin::from_config(&builder_cfg.rabbitmq)?;
let mut handles = Vec::new();

for system in &cfg.nix.system {
Expand Down
15 changes: 10 additions & 5 deletions ofborg/src/bin/log-message-collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::error::Error;
use std::path::PathBuf;

use async_std::task;
use tracing::info;
use tracing::{error, info};

use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
Expand All @@ -15,10 +15,15 @@ fn main() -> Result<(), Box<dyn Error>> {

let arg = env::args()
.nth(1)
.expect("usage: log-message-collector <config>");
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let cfg = config::load(arg.as_ref());

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let Some(collector_cfg) = config::load(arg.as_ref()).log_message_collector else {
error!("No log message collector configuration found!");
panic!();
};

let conn = easylapin::from_config(&collector_cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

chan.declare_exchange(easyamqp::ExchangeConfig {
Expand All @@ -31,7 +36,7 @@ fn main() -> Result<(), Box<dyn Error>> {
internal: false,
})?;

let queue_name = "".to_owned();
let queue_name = "logs".to_owned();
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
Expand All @@ -51,7 +56,7 @@ fn main() -> Result<(), Box<dyn Error>> {
// Regular channel, we want prefetching here.
let handle = chan.consume(
tasks::log_message_collector::LogMessageCollector::new(
PathBuf::from(cfg.log_storage.clone().unwrap().path),
PathBuf::from(collector_cfg.logs_path),
100,
),
easyamqp::ConsumeConfig {
Expand Down
8 changes: 2 additions & 6 deletions ofborg/src/bin/logapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,14 @@ fn main() -> Result<(), Box<dyn Error>> {
warn!("attempt_id not found in file: {file_name}");
continue;
};
let attempt_obj = attempts
.entry(attempt_id)
.or_insert_with(Attempt::default);
let attempt_obj = attempts.entry(attempt_id).or_default();
if file_name.ends_with(".metadata.json") {
attempt_obj.metadata = Some(json);
} else {
attempt_obj.result = Some(json);
}
} else {
let attempt_obj = attempts
.entry(file_name.clone())
.or_insert_with(Attempt::default);
let attempt_obj = attempts.entry(file_name.clone()).or_default();
attempt_obj.log_url =
Some(format!("{}/{reqd}/{file_name}", &cfg.serve_root));
}
Expand Down
24 changes: 7 additions & 17 deletions ofborg/src/bin/mass-rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::env;
use std::error::Error;
use std::path::Path;
use std::process;

use async_std::task;
use tracing::{error, info};
Expand All @@ -13,31 +12,24 @@ use ofborg::easylapin;
use ofborg::stats;
use ofborg::tasks;

// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args().nth(1).expect("usage: mass-rebuilder <config>");
let arg = env::args()
.nth(1)
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let cfg = config::load(arg.as_ref());

let memory_info = sys_info::mem_info().expect("Unable to get memory information from OS");

if memory_info.avail < 8 * 1024 * 1024 {
// seems this stuff is in kilobytes?
error!(
"Less than 8Gb of memory available (got {:.2}Gb). Aborting.",
(memory_info.avail as f32) / 1024.0 / 1024.0
);
process::exit(1);
let Some(rebuilder_cfg) = config::load(arg.as_ref()).mass_rebuilder else {
error!("No mass rebuilder configuration found!");
panic!();
};

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let conn = easylapin::from_config(&rebuilder_cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;

let root = Path::new(&cfg.checkout.root);
let cloner = checkout::cached_cloner(&root.join(cfg.runner.instance.to_string()));
let nix = cfg.nix();

let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?);

Expand All @@ -54,8 +46,6 @@ fn main() -> Result<(), Box<dyn Error>> {
let handle = easylapin::WorkerChannel(chan).consume(
tasks::evaluate::EvaluationWorker::new(
cloner,
&nix,
cfg.github(),
cfg.github_app_vendingmachine(),
cfg.acl(),
cfg.runner.identity.clone(),
Expand Down
14 changes: 11 additions & 3 deletions ofborg/src/bin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,26 @@ use std::thread;

use async_std::task;
use hyper::server::{Request, Response, Server};
use tracing::info;
use tracing::{error, info};

use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, easylapin, stats, tasks};

fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args().nth(1).expect("usage: stats <config>");
let arg = env::args()
.nth(1)
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let cfg = config::load(arg.as_ref());

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let Some(stats_cfg) = config::load(arg.as_ref()).stats else {
error!("No stats configuration found!");
panic!();
};

let conn = easylapin::from_config(&stats_cfg.rabbitmq)?;

let mut chan = task::block_on(conn.create_channel())?;

let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?);
Expand Down
11 changes: 2 additions & 9 deletions ofborg/src/clone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ pub trait GitClonable {
let result = Command::new("git")
.arg("clone")
.args(self.extra_clone_args())
.arg(&self.clone_from())
.arg(&self.clone_to())
.arg(self.clone_from())
.arg(self.clone_to())
.stdout(Stdio::null())
.status()?;

Expand Down Expand Up @@ -147,13 +147,6 @@ pub trait GitClonable {
.stdout(Stdio::null())
.status()?;

debug!("git gc");
Command::new("git")
.arg("gc")
.current_dir(self.clone_to())
.stdout(Stdio::null())
.status()?;

lock.unlock();

Ok(())
Expand Down
63 changes: 52 additions & 11 deletions ofborg/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@ pub struct Config {
pub github_comment_filter: Option<GithubCommentFilter>,
/// Configuration for the GitHub comment poster
pub github_comment_poster: Option<GithubCommentPoster>,
/// Configuration for the mass rebuilder
pub mass_rebuilder: Option<MassRebuilder>,
/// Configuration for the builder
pub builder: Option<Builder>,
/// Configuration for the log message collector
pub log_message_collector: Option<LogMessageCollector>,
/// Configuration for the stats server
pub stats: Option<Stats>,
pub runner: RunnerConfig,
pub feedback: FeedbackConfig,
pub checkout: CheckoutConfig,
pub nix: NixConfig,
pub rabbitmq: RabbitMqConfig,
pub github_app: Option<GithubAppConfig>,
pub log_storage: Option<LogStorage>,
}

/// Configuration for the webhook receiver
Expand Down Expand Up @@ -90,13 +95,43 @@ pub struct GithubCommentPoster {
pub rabbitmq: RabbitMqConfig,
}

/// Configuration for the mass rebuilder
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct MassRebuilder {
/// RabbitMQ broker to connect to
pub rabbitmq: RabbitMqConfig,
}

/// Configuration for the builder
#[derive(Serialize, Deserialize, Debug)]
pub struct FeedbackConfig {
pub full_logs: bool,
#[serde(deny_unknown_fields)]
pub struct Builder {
/// RabbitMQ broker to connect to
pub rabbitmq: RabbitMqConfig,
}

/// Configuration for the log message collector
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogMessageCollector {
/// RabbitMQ broker to connect to
pub rabbitmq: RabbitMqConfig,
/// Path where the logs reside
pub logs_path: String,
}

/// Configuration for the stats exporter
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct Stats {
/// RabbitMQ broker to connect to
pub rabbitmq: RabbitMqConfig,
}

/// Configures the connection to a RabbitMQ instance
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct RabbitMqConfig {
/// Whether or not to use SSL
pub ssl: bool,
Expand All @@ -117,6 +152,12 @@ pub struct NixConfig {
pub remote: String,
pub build_timeout_seconds: u16,
pub initial_heap_size: Option<String>,
/// CPU cores for package listing
pub list_cores: Option<u64>,
/// Chunk size for package listing
pub list_chunk_size: Option<u64>,
/// System to evaluate when calculating package diff
pub list_system: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -127,11 +168,6 @@ pub struct GithubAppConfig {
pub oauth_client_secret_file: PathBuf,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LogStorage {
pub path: String,
}

const fn default_instance() -> u8 {
1
}
Expand Down Expand Up @@ -240,7 +276,12 @@ impl Config {

impl RabbitMqConfig {
pub fn as_uri(&self) -> Result<String, std::io::Error> {
let password = std::fs::read_to_string(&self.password_file)?;
let password = std::fs::read_to_string(&self.password_file).inspect_err(|_| {
error!(
"Unable to read RabbitMQ password file at {:?}",
self.password_file
);
})?;
let uri = format!(
"{}://{}:{}@{}/{}",
if self.ssl { "amqps" } else { "amqp" },
Expand Down
1 change: 0 additions & 1 deletion ofborg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub mod maintainers;
pub mod message;
pub mod nix;
pub mod nixenv;
pub mod nixstats;
pub mod notifyworker;
pub mod outpathdiff;
pub mod stats;
Expand Down
6 changes: 0 additions & 6 deletions ofborg/src/message/evaluationjob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ pub struct EvaluationJob {
pub pr: Pr,
}

impl EvaluationJob {
pub fn is_nixpkgs(&self) -> bool {
self.repo.name == "nixpkgs"
}
}

pub struct Actions {}

impl Actions {
Expand Down
Loading
Loading