Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of PID of process running job #1

Merged
merged 6 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ log = "0.4"
pretty_env_logger = "0.4"
serde_json = "1.0"
uuid = { version = "0.8", features = ["v4"] }
nix = "0.17"
68 changes: 43 additions & 25 deletions src/lib/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,82 @@
use std::fs::File;
use std::process::{Command, Stdio};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::lib::job;

use super::config;

fn remote_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<(), String> {
async fn remote_cmd(args: &mut Vec<&str>, output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
// TODO(azhng): let's finger cross this works.
let output_file = output.try_clone().unwrap();
let error_file = output.try_clone().unwrap();
let cmd = Command::new("ssh")
let cmd_process = Command::new("ssh")
.arg(config::SEXXI_REMOTE_HOST)
.args(args)
.args(args.clone())
roxelo marked this conversation as resolved.
Show resolved Hide resolved
.stdout(Stdio::from(output_file))
.stderr(Stdio::from(error_file))
.output()
.expect("Ok");
.spawn();

match cmd_process {
Ok(child) => {
info!("Server process with pid {} is running command `{:?}`", child.id(), args);
{
let mut job_info = curr_job.write().await;
job_info.pid = child.id();
}

let result = child.wait_with_output().expect("Ok");
if !result.status.success() {
return Err(format!("remote command failed: {}", result.status));
}
},
Err(e) => {
error!("Command `{:?}` failed, server process didn't start: {}", args, e);
std::process::exit(1);
roxelo marked this conversation as resolved.
Show resolved Hide resolved
},

if !cmd.status.success() {
return Err(format!("remote command failed: {}", cmd.status));
}

Ok(())
}

fn remote_git_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<(), String> {
async fn remote_git_cmd(args: &mut Vec<&str>, output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
roxelo marked this conversation as resolved.
Show resolved Hide resolved
let mut git_cmd = vec!["git", "-C", config::SEXXI_WORK_TREE];
git_cmd.append(args);
remote_cmd(&mut git_cmd, output)
remote_cmd(&mut git_cmd, output, curr_job).await
}

pub fn remote_git_reset_branch(output: &mut File) -> Result<(), String> {
pub async fn remote_git_reset_branch(output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["checkout", "master"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_git_fetch_upstream(output: &mut File) -> Result<(), String> {
let mut cmd = vec!["fetch", "--all"];
remote_git_cmd(&mut cmd, output)
pub async fn remote_git_fetch_upstream(output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["fetch", "--all", "-p"];
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_git_checkout_sha(sha: &str, bot_ref: &str, output: &mut File) -> Result<(), String> {
let mut cmd = vec!["checkout", sha, "-b", bot_ref];
remote_git_cmd(&mut cmd, output)
pub async fn remote_git_checkout_sha(sha: &str, bot_ref: &str, output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["checkout", sha, "-B", bot_ref];
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_git_rebase_upstream(output: &mut File) -> Result<(), String> {
pub async fn remote_git_rebase_upstream(output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["rebase", "upstream/master"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_git_push(bot_ref: &str, output: &mut File) -> Result<(), String> {
pub async fn remote_git_push(bot_ref: &str, output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["push", "origin", bot_ref, "-f"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_git_delete_branch(bot_ref: &str, output: &mut File) -> Result<(), String> {
pub async fn remote_git_delete_branch(bot_ref: &str, output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec!["branch", "-D", bot_ref];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(&mut cmd, output, curr_job).await
}

pub fn remote_test_rust_repo(output: &mut File) -> Result<(), String> {
pub async fn remote_test_rust_repo(output: &mut File, curr_job: &Arc<RwLock<job::JobInfo>>) -> Result<(), String> {
let mut cmd = vec![
"cd",
config::SEXXI_WORK_TREE,
Expand All @@ -68,5 +86,5 @@ pub fn remote_test_rust_repo(output: &mut File) -> Result<(), String> {
"-i",
"-j32",
];
remote_cmd(&mut cmd, output)
remote_cmd(&mut cmd, output, curr_job).await
}
6 changes: 4 additions & 2 deletions src/lib/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ pub const SEXXI_GIT_DIR: &'static str = "$HOME/scratch/sexxi-rust/.git";
pub const SEXXI_WORK_TREE: &'static str = "$HOME/scratch/sexxi-rust";
pub const SEXXI_PROJECT: &'static str = "rust";

// Default testing repo
pub const SEXXI_TEST_PROJECT: &'static str = "sexxi-webhook-test";

pub const SEXXI_REMOTE_HOST: &'static str = "sorbitol";
pub const SEXXI_LOG_FILE_DIR: &'static str = "www/build-logs";

// TODO(azhng): we need to template out the user name here.
pub const BUILD_LOG_BASE_URL: &'static str = "https://csclub.uwaterloo.ca/~z577zhan/build-logs";
pub const BUILD_LOG_BASE_URL: &'static str = "https://csclub.uwaterloo.ca/~rlmmfruy/build-logs";
roxelo marked this conversation as resolved.
Show resolved Hide resolved

pub const COMMENT_JOB_START: &'static str = ":running_man: Start running build job";
pub const COMMENT_JOB_DONE: &'static str = "✅ Job Completed";
17 changes: 16 additions & 1 deletion src/lib/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use tokio::sync::{
};
use uuid::Uuid;
use super::{config, job};
use nix::unistd::Pid;
use nix::sys::signal::{self, Signal};

pub fn gen_response(code: u16) -> Response<Body> {
let mut resp = Response::default();
Expand All @@ -23,13 +25,14 @@ pub async fn handle_webhook(
req: Request<Body>,
jobs: Arc<RwLock<job::JobRegistry>>,
sender: &mut mpsc::Sender<Uuid>,
curr_job: Arc<RwLock<job::JobInfo>>
) -> Result<Response<Body>, hyper::Error> {
let mut body = hyper::body::aggregate::<Request<Body>>(req).await?;
let bytes = body.to_bytes();
let blob: Result<serde_json::Value, serde_json::Error> = serde_json::from_slice(&bytes);

match blob {
Ok(json) => parse_and_handle(json, jobs, sender).await,
Ok(json) => parse_and_handle(json, jobs, sender, &curr_job).await,
Err(e) => {
error!("parsing error: {}", e);
Ok::<_, hyper::Error>(gen_response(400))
Expand Down Expand Up @@ -93,6 +96,7 @@ async fn parse_and_handle(
json: serde_json::Value,
jobs: Arc<RwLock<job::JobRegistry>>,
sender: &mut mpsc::Sender<Uuid>,
curr_job: &Arc<RwLock<job::JobInfo>>
) -> Result<Response<Body>, hyper::Error> {


Expand All @@ -114,6 +118,17 @@ async fn parse_and_handle(
let mut jobs = jobs.write().await;
jobs.insert(job_id.clone(), Arc::new(RwLock::new(job)));

if head_ref == curr_job.read().await.head_ref {
info!("New job on same head_ref, killing current job");
// [To-Do] Fix, could potentially kill a process which is already dead
signal::kill(Pid::from_raw(curr_job.read().await.pid as i32), Signal::SIGKILL);
} else {
{
let mut w_curr_job = curr_job.write().await;
w_curr_job.head_ref = String::from(head_ref);
}
}

if let Err(e) = sender.send(job_id).await {
error!("failed to send job id to job runner: {}", e);
return Ok::<_, hyper::Error>(gen_response(500));
Expand Down
35 changes: 20 additions & 15 deletions src/lib/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ pub struct JobDesc {
pub status: JobStatus,
}

pub struct JobInfo {
roxelo marked this conversation as resolved.
Show resolved Hide resolved
pub head_ref: String,
pub pid: u32,
}

impl JobDesc {
pub fn new(action: &str, reviewer: &str, sha: &str, pr_num: u64, head_ref: &str) -> JobDesc {
JobDesc {
Expand All @@ -50,7 +55,7 @@ impl JobDesc {

pub type JobRegistry = HashMap<Uuid, Arc<RwLock<JobDesc>>>;

pub async fn process_job(job_id: &Uuid, jobs: Arc<RwLock<JobRegistry>>) {
pub async fn process_job(job_id: &Uuid, jobs: Arc<RwLock<JobRegistry>>, curr_job: Arc<RwLock<JobInfo>>) {
info!("Starting job {}", &job_id);

let mut succeed = false;
Expand All @@ -73,7 +78,7 @@ pub async fn process_job(job_id: &Uuid, jobs: Arc<RwLock<JobRegistry>>) {

{
let job = &*job.read().await;
if let Err(e) = start_build_job(job).await {
if let Err(e) = start_build_job(job, &curr_job).await {
error!("job {} failed due to: {}", &job_id, e);
} else {
succeed = true;
Expand Down Expand Up @@ -111,7 +116,7 @@ async fn job_failure_handler<T: std::fmt::Display>(
Ok(())
}

async fn run_and_build(job: &JobDesc) -> Result<(), String> {
async fn run_and_build(job: &JobDesc, curr_job: &Arc<RwLock<JobInfo>>) -> Result<(), String> {
// TODO(azhng): figure out how to perform additional cleanup.

let log_file_name = format!("{}/{}/{}", env::var("HOME").unwrap(), config::SEXXI_LOG_FILE_DIR, &job.id);
Expand All @@ -125,39 +130,39 @@ async fn run_and_build(job: &JobDesc) -> Result<(), String> {
}


if let Err(e) = cmd::remote_git_reset_branch(&mut log_file) {
if let Err(e) = cmd::remote_git_reset_branch(&mut log_file, curr_job).await {
return job_failure_handler("unable to reset branch", &job, e).await;
}

if let Err(e) = cmd::remote_git_fetch_upstream(&mut log_file) {
if let Err(e) = cmd::remote_git_fetch_upstream(&mut log_file, curr_job).await {
return job_failure_handler("unable to fetch upstream", &job, e).await;
}

if let Err(e) = cmd::remote_git_checkout_sha(&job.sha, &bot_ref, &mut log_file) {
if let Err(e) = cmd::remote_git_checkout_sha(&job.sha, &bot_ref, &mut log_file, curr_job).await {
return job_failure_handler("unable to check out commit", &job, e).await;
}

if let Err(e) = cmd::remote_git_rebase_upstream(&mut log_file) {
if let Err(e) = cmd::remote_git_rebase_upstream(&mut log_file, curr_job).await {
return job_failure_handler("unable to rebase against upstream", &job, e).await;
}

// TODO(azhng): make this a runtime decision.
//info!("Skipping running test for development");
if let Err(e) = cmd::remote_test_rust_repo(&mut log_file) {
cmd::remote_git_reset_branch(&mut log_file).expect("Ok");
cmd::remote_git_delete_branch(&bot_ref, &mut log_file).expect("Ok");
if let Err(e) = cmd::remote_test_rust_repo(&mut log_file, curr_job).await {
cmd::remote_git_reset_branch(&mut log_file, curr_job).await.expect("Ok");
cmd::remote_git_delete_branch(&bot_ref, &mut log_file, curr_job).await.expect("Ok");
return job_failure_handler("unit test failed", &job, e).await;
}

if let Err(e) = cmd::remote_git_push(&bot_ref, &mut log_file) {
if let Err(e) = cmd::remote_git_push(&bot_ref, &mut log_file, curr_job).await {
return job_failure_handler("unable to push bot branch", &job, e).await;
}

if let Err(e) = cmd::remote_git_reset_branch(&mut log_file) {
if let Err(e) = cmd::remote_git_reset_branch(&mut log_file, curr_job).await {
return job_failure_handler("unable to reset branch for clean up", &job, e).await;
}

if let Err(e) = cmd::remote_git_delete_branch(&bot_ref, &mut log_file) {
if let Err(e) = cmd::remote_git_delete_branch(&bot_ref, &mut log_file, curr_job).await {
return job_failure_handler("unable to delete bot branch", &job, e).await;
}

Expand All @@ -177,10 +182,10 @@ async fn run_and_build(job: &JobDesc) -> Result<(), String> {
}


async fn start_build_job(job: &JobDesc) -> Result<(), String> {
async fn start_build_job(job: &JobDesc, curr_job: &Arc<RwLock<JobInfo>>) -> Result<(), String> {
let comment = format!("{}, job id: {}", config::COMMENT_JOB_START, &job.id);
if let Err(e) = api::post_comment(&comment, job.pr_num).await {
return Err(format!("failed to post comment to pr {}: {}", &job.pr_num, e));
}
run_and_build(job).await
run_and_build(job, curr_job).await
}