Skip to content

Commit

Permalink
Merge pull request #3 from sexxi-goose/arc-clone
Browse files Browse the repository at this point in the history
webhook: remove deep clone of JobDesc in the worker.
  • Loading branch information
Azhng authored Jun 25, 2020
2 parents 5c17ee1 + 0fb3bd3 commit 47a0b13
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 141 deletions.
87 changes: 30 additions & 57 deletions src/lib/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
use std::fs::File;
use std::process::{Command, Stdio};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use nix::sys::signal::{self, Signal};

use crate::lib::job;
use std::process::{Child, Command, Stdio};

use super::config;

async fn remote_cmd(job_id: &Uuid, args: &mut Vec<&str>, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
async fn remote_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<Child, String> {
let output_file = output.try_clone().unwrap();
let error_file = output.try_clone().unwrap();
let cmd_process = Command::new("ssh")
Expand All @@ -21,82 +15,61 @@ async fn remote_cmd(job_id: &Uuid, args: &mut Vec<&str>, output: &mut File, job_

match cmd_process {
Ok(child) => {
info!("Server process with pid {} is running command `{:?}`", child.id(), args);
{
let job_registry = job_registry.read().await;
let job: Arc<RwLock<job::JobDesc>>;
if let Some(j) = job_registry.jobs.get(job_id) {
job = j.clone();
let mut job = job.write().await;
job.pid = child.id();
}
}

let result = child.wait_with_output().expect("Ok");
if !result.status.success() {
/*
Check if Signal::SIGINT was received. If it was the case
this is probably because the job was canceled.
exit code 130 is a result of Control-C/SIGINT
*/
if result.status.to_string() == "exit code: 130" {
let job_registry = job_registry.read().await;
let job: Arc<RwLock<job::JobDesc>>;
if let Some(j) = job_registry.jobs.get(job_id) {
job = j.clone();
let mut job = job.write().await;
job.status = job::JobStatus::Canceled;
}
}
return Err(format!("remote command failed: {}", result.status));
}
info!("Server process with pid {} is running command `{:?}`", &child.id(), args);
return Ok(child);
},
Err(e) => {
return Err(format!("Command `{:?}` failed, server process didn't start: {}", args, e));
return Err(format!("command `{:?}` failed, server process didn't start: {}", args, e));
},

}

Ok(())
}

async fn remote_git_cmd(job_id: &Uuid, args: &mut Vec<&str>, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
async fn remote_git_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<(), String> {
let mut git_cmd = vec!["git", "-C", config::SEXXI_WORK_TREE];
git_cmd.append(args);
remote_cmd(job_id, &mut git_cmd, output, job_registry).await
match remote_cmd(&mut git_cmd, output).await {
Ok(child) => {
let result = child.wait_with_output().expect("Ok");
if !result.status.success() {
return Err(format!("remote command failed: {}", result.status));
}
Ok(())
},
Err(e) => Err(format!("remote command failed: {}", e))
}
}

pub async fn remote_git_reset_branch(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_reset_branch(output: &mut File) -> Result<(), String> {
let mut cmd = vec!["checkout", "master"];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_git_fetch_upstream(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_fetch_upstream(output: &mut File) -> Result<(), String> {
let mut cmd = vec!["fetch", "--all", "-p"];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_git_checkout_sha(job_id: &Uuid, sha: &str, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_checkout_sha(sha: &str, bot_ref: &str, output: &mut File) -> Result<(), String> {
let mut cmd = vec!["checkout", sha, "-B", bot_ref];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_git_rebase_upstream(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_rebase_upstream(output: &mut File) -> Result<(), String> {
let mut cmd = vec!["rebase", "upstream/master"];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_git_push(job_id: &Uuid, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_push(bot_ref: &str, output: &mut File) -> Result<(), String> {
let mut cmd = vec!["push", "origin", bot_ref, "-f"];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_git_delete_branch(job_id: &Uuid, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_git_delete_branch(bot_ref: &str, output: &mut File) -> Result<(), String> {
let mut cmd = vec!["branch", "-D", bot_ref];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
remote_git_cmd(&mut cmd, output).await
}

pub async fn remote_test_rust_repo(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
pub async fn remote_test_rust_repo(output: &mut File) -> Result<Child, String> {
let mut cmd = vec![
"cd",
config::SEXXI_WORK_TREE,
Expand All @@ -106,5 +79,5 @@ pub async fn remote_test_rust_repo(job_id: &Uuid, output: &mut File, job_registr
"-i",
"-j32",
];
remote_cmd(job_id, &mut cmd, output, job_registry).await
remote_cmd(&mut cmd, output).await
}
7 changes: 5 additions & 2 deletions src/lib/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::env;
use lazy_static::lazy_static;
use std::env;

pub const REVIEW_REQUESTED: &'static str = "review_requested";
pub const REVIEW_REQUESTED_REMOVED: &'static str = "review_requested_removed";

pub const REVIEWER: &'static str = "sexxi-bot";
// TODO(azhng): const REPO: &'static str = "rust";

Expand All @@ -20,7 +22,8 @@ pub const SEXXI_LOG_FILE_DIR: &'static str = "www/build-logs";

lazy_static! {
pub static ref MACHINE_USER: String = env::var("USER").unwrap();
pub static ref BUILD_LOG_BASE_URL: String = format!("https://csclub.uwaterloo.ca/~{}/build-logs", *MACHINE_USER);
pub static ref BUILD_LOG_BASE_URL: String =
format!("https://csclub.uwaterloo.ca/~{}/build-logs", *MACHINE_USER);
}

pub const COMMENT_JOB_START: &'static str = ":running_man: Start running build job";
Expand Down
15 changes: 8 additions & 7 deletions src/lib/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,20 @@ async fn parse_and_handle(

let job = job::JobDesc::new(&action, &reviewer, &sha, pr_number, &head_ref);
let job_id = job.id.clone();
let job = Arc::new(RwLock::new(job));

{
let mut job_registry = job_registry.write().await;
job_registry.jobs.insert(job_id.clone(), Arc::new(RwLock::new(job)));
let c_job: Arc<RwLock<job::JobDesc>>;
job_registry.jobs.insert(job_id.clone(), job.clone());
let c_job_id: Uuid;
if let Some(c_id) = job_registry.running_jobs.get(head_ref.clone()) {
c_job_id = c_id.clone();
if let Some(j) = job_registry.jobs.get(&c_job_id) {
c_job = j.clone();
let curr_job = c_job.read().await;
if head_ref == curr_job.head_ref {
info!("New job on same head_ref, killing current job");
if let Some(curr_job) = job_registry.jobs.get(&c_job_id) {
let curr_job = curr_job.read().await;
// TODO(azhng): stopgap solution to avoid accidentally killing the server
// process
if head_ref == curr_job.head_ref && curr_job.pid != 0 {
info!("New job on same head_ref, killing current job pid: {}", &curr_job.pid);
// [To-Do] Fix, could potentially kill a process which is already dead
signal::kill(Pid::from_raw(curr_job.pid as i32), Signal::SIGINT);
}
Expand Down
138 changes: 63 additions & 75 deletions src/lib/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl JobRegistry {
pub async fn process_job(job_id: &Uuid, job_registry: Arc<RwLock<JobRegistry>>) {
info!("Starting job {}", &job_id);

let mut succeed = false;
let job: Arc<RwLock<JobDesc>>;

{
Expand All @@ -81,54 +80,14 @@ pub async fn process_job(job_id: &Uuid, job_registry: Arc<RwLock<JobRegistry>>)
}
}

{
let mut job = job.write().await;
job.status = JobStatus::Running;
}

let copy_job: JobDesc;
{
let job = &*job.read().await;

/*
Important: We can't pass job directly into start_build_job, otherwise we will
have a deadlock. We want to update the JobDesc for the job with the PID of the
running process, but since we have the read lock, we can't acquire the write lock.
job.write().await.status = JobStatus::Running;

If we want to avoid having to request the read lock every time we need it, we should
duplicate the current state (info) of the JobDesc. We may want to change this later
if we change more than the PID info while running the job.
*/
copy_job = job.clone();
}

if let Err(e) = start_build_job(&copy_job, &job_registry).await {
if let Err(e) = start_build_job(&job).await {
error!("job {} failed due to: {}", &job_id, e);
} else {
succeed = true;
}

{
let mut job = job.write().await;

/*
We don't want to remove the head_ref key from the canceled
job if the JobStatus is Canceled, because when a job is Canceled,
a new job is started with the same key; i.e., removing the key would
remove the running job, not the canceled job.
*/
if job.status != JobStatus::Canceled {
{
let mut job_registry = job_registry.write().await;
job_registry.running_jobs.remove(&job.head_ref.clone());
}
}

if succeed {
job.status = JobStatus::Finished;
} else {
job.status = JobStatus::Failed;
}
if job.read().await.status == JobStatus::Canceled {
job_registry.write().await.running_jobs.remove(&job.read().await.head_ref.clone());
}
}

Expand All @@ -153,63 +112,89 @@ async fn job_failure_handler<T: std::fmt::Display>(
Ok(())
}

async fn run_and_build(job: &JobDesc, job_registry: &Arc<RwLock<JobRegistry>>) -> Result<(), String> {
async fn run_and_build(job: &Arc<RwLock<JobDesc>>) -> Result<(), String> {
// TODO(azhng): figure out how to perform additional cleanup.

let log_file_name = format!("{}/{}/{}", env::var("HOME").unwrap(), config::SEXXI_LOG_FILE_DIR, &job.id);
let log_file_name = format!("{}/{}/{}", env::var("HOME").unwrap(), config::SEXXI_LOG_FILE_DIR, &job.read().await.id);
let log_file_path = Path::new(&log_file_name);
info!("Creating log file at: {}", &log_file_name);
let mut log_file = File::create(&log_file_path).unwrap();
let bot_ref = format!("bot-{}", &job.head_ref);
let bot_ref = format!("bot-{}", &job.read().await.head_ref);

if let Err(e) = api::update_commit_status(&job.sha, api::COMMIT_PENDING, "Building job started", &job.build_log_url()).await {
if let Err(e) = api::update_commit_status(&job.read().await.sha, api::COMMIT_PENDING, "Building job started", &job.read().await.build_log_url()).await {
return Err(format!("unable to update commit status for failed job: {}", e));
}


if let Err(e) = cmd::remote_git_reset_branch(&job.id, &mut log_file, job_registry).await {
return job_failure_handler("unable to reset branch", &job, e).await;
if let Err(e) = cmd::remote_git_reset_branch(&mut log_file).await {
return job_failure_handler("unable to reset branch", &*job.read().await, e).await;
}

if let Err(e) = cmd::remote_git_fetch_upstream(&job.id, &mut log_file, job_registry).await {
return job_failure_handler("unable to fetch upstream", &job, e).await;
if let Err(e) = cmd::remote_git_fetch_upstream(&mut log_file).await {
return job_failure_handler("unable to fetch upstream", &*job.read().await, e).await;
}

if let Err(e) = cmd::remote_git_checkout_sha(&job.id, &job.sha, &bot_ref, &mut log_file, job_registry).await {
return job_failure_handler("unable to check out commit", &job, e).await;
if let Err(e) = cmd::remote_git_checkout_sha(&job.read().await.sha, &bot_ref, &mut log_file).await {
return job_failure_handler("unable to check out commit", &*job.read().await, e).await;
}

if let Err(e) = cmd::remote_git_rebase_upstream(&job.id, &mut log_file, job_registry).await {
return job_failure_handler("unable to rebase against upstream", &job, e).await;
if let Err(e) = cmd::remote_git_rebase_upstream(&mut log_file).await {
return job_failure_handler("unable to rebase against upstream", &*job.read().await, e).await;
}

// TODO(azhng): make this a runtime decision.
//info!("Skipping running test for development");
if let Err(e) = cmd::remote_test_rust_repo(&job.id, &mut log_file, job_registry).await {
cmd::remote_git_reset_branch(&job.id, &mut log_file, job_registry).await.expect("Ok");
cmd::remote_git_delete_branch(&job.id, &bot_ref, &mut log_file, job_registry).await.expect("Ok");
return job_failure_handler("unit test failed", &job, e).await;
if job.read().await.status != JobStatus::Canceled {
let mut succeed = false;
let mut err = String::from("");
match cmd::remote_test_rust_repo(&mut log_file).await {
Ok(test_process) => {
job.write().await.pid = test_process.id();
let result = test_process.wait_with_output().expect("Ok");
if !result.status.success() {
if result.status.to_string() == "exit code: 130" {
err = format!("job canceled");
job.write().await.status = JobStatus::Canceled;
} else {
err = format!("unit test failed: {}", result.status);
job.write().await.status = JobStatus::Failed;
}
} else {
job.write().await.status = JobStatus::Finished;
succeed = true;
}
},
Err(e) => {
err = format!("unable to start testing process: {}", e);
}
}

if !succeed {
cmd::remote_git_reset_branch(&mut log_file).await.expect("Ok");
cmd::remote_git_delete_branch(&bot_ref, &mut log_file).await.expect("Ok");
return job_failure_handler("unit test failed", &*job.read().await, err).await;
}
}

if let Err(e) = cmd::remote_git_push(&job.id, &bot_ref, &mut log_file, job_registry).await {
return job_failure_handler("unable to push bot branch", &job, e).await;
if let Err(e) = cmd::remote_git_push(&bot_ref, &mut log_file).await {
return job_failure_handler("unable to push bot branch", &*job.read().await, e).await;
}

if let Err(e) = cmd::remote_git_reset_branch(&job.id, &mut log_file, job_registry).await {
return job_failure_handler("unable to reset branch for clean up", &job, e).await;
if let Err(e) = cmd::remote_git_reset_branch(&mut log_file).await {
return job_failure_handler("unable to reset branch for clean up", &*job.read().await, e).await;
}

if let Err(e) = cmd::remote_git_delete_branch(&job.id, &bot_ref, &mut log_file, job_registry).await {
return job_failure_handler("unable to delete bot branch", &job, e).await;
if let Err(e) = cmd::remote_git_delete_branch(&bot_ref, &mut log_file).await {
return job_failure_handler("unable to delete bot branch", &*job.read().await, e).await;
}

let msg = format!("{}, access build log [here]({})", config::COMMENT_JOB_DONE, job.build_log_url());
let msg = format!("{}, access build log [here]({})", config::COMMENT_JOB_DONE, job.read().await.build_log_url());
info!("{}", &msg);
if let Err(e) = api::post_comment(&msg, job.pr_num).await {
if let Err(e) = api::post_comment(&msg, job.read().await.pr_num).await {
warn!("failed to post comment for job completion: {}", e);
}

if let Err(e) = api::update_commit_status(&job.sha, api::COMMIT_SUCCESS, &msg, &job.build_log_url()).await {
if let Err(e) = api::update_commit_status(&job.read().await.sha, api::COMMIT_SUCCESS, &msg, &job.read().await.build_log_url()).await {
return Err(format!("unable to update commit status for failed job: {}", e));
}

Expand All @@ -219,10 +204,13 @@ async fn run_and_build(job: &JobDesc, job_registry: &Arc<RwLock<JobRegistry>>) -
}


async fn start_build_job(job: &JobDesc, job_registry: &Arc<RwLock<JobRegistry>>) -> Result<(), String> {
let comment = format!("{}, job id: {}", config::COMMENT_JOB_START, &job.id);
if let Err(e) = api::post_comment(&comment, job.pr_num).await {
return Err(format!("failed to post comment to pr {}: {}", &job.pr_num, e));
async fn start_build_job(job: &Arc<RwLock<JobDesc>>) -> Result<(), String> {
{
let job = &*job.read().await;
let comment = format!("{}, job id: [{}]({})", config::COMMENT_JOB_START, &job.id, job.build_log_url());
if let Err(e) = api::post_comment(&comment, job.pr_num).await {
return Err(format!("failed to post comment to pr {}: {}", &job.pr_num, e));
}
}
run_and_build(job, job_registry).await
run_and_build(job).await
}

0 comments on commit 47a0b13

Please sign in to comment.