Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of PID of process running job #1

Merged
merged 6 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ log = "0.4"
pretty_env_logger = "0.4"
serde_json = "1.0"
uuid = { version = "0.8", features = ["v4"] }
nix = "0.17"
lazy_static = "1.4"
89 changes: 63 additions & 26 deletions src/lib/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,101 @@
use std::fs::File;
use std::process::{Command, Stdio};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use nix::sys::signal::{self, Signal};

use crate::lib::job;

use super::config;

fn remote_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<(), String> {
// TODO(azhng): let's finger cross this works.
async fn remote_cmd(job_id: &Uuid, args: &mut Vec<&str>, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let output_file = output.try_clone().unwrap();
let error_file = output.try_clone().unwrap();
let cmd = Command::new("ssh")
let cmd_process = Command::new("ssh")
.arg(config::SEXXI_REMOTE_HOST)
.args(args)
.args(&*args)
.stdout(Stdio::from(output_file))
.stderr(Stdio::from(error_file))
.output()
.expect("Ok");
.spawn();

match cmd_process {
Ok(child) => {
info!("Server process with pid {} is running command `{:?}`", child.id(), args);
{
let job_registry = job_registry.read().await;
let job: Arc<RwLock<job::JobDesc>>;
if let Some(j) = job_registry.jobs.get(job_id) {
job = j.clone();
let mut job = job.write().await;
job.pid = child.id();
}
}

let result = child.wait_with_output().expect("Ok");
if !result.status.success() {
/*
Check if Signal::SIGKILL was received. If it was the case
this is probably because the the job was canceled.
*/
if result.status.to_string() == "signal: 9" {
roxelo marked this conversation as resolved.
Show resolved Hide resolved
let job_registry = job_registry.read().await;
let job: Arc<RwLock<job::JobDesc>>;
if let Some(j) = job_registry.jobs.get(job_id) {
job = j.clone();
let mut job = job.write().await;
job.status = job::JobStatus::Canceled;
roxelo marked this conversation as resolved.
Show resolved Hide resolved
}
}
return Err(format!("remote command failed: {}", result.status));
}
},
Err(e) => {
return Err(format!("Command `{:?}` failed, server process didn't start: {}", args, e));
},

if !cmd.status.success() {
return Err(format!("remote command failed: {}", cmd.status));
}

Ok(())
}

fn remote_git_cmd(args: &mut Vec<&str>, output: &mut File) -> Result<(), String> {
async fn remote_git_cmd(job_id: &Uuid, args: &mut Vec<&str>, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut git_cmd = vec!["git", "-C", config::SEXXI_WORK_TREE];
git_cmd.append(args);
remote_cmd(&mut git_cmd, output)
remote_cmd(job_id, &mut git_cmd, output, job_registry).await
}

pub fn remote_git_reset_branch(output: &mut File) -> Result<(), String> {
pub async fn remote_git_reset_branch(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["checkout", "master"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_git_fetch_upstream(output: &mut File) -> Result<(), String> {
let mut cmd = vec!["fetch", "--all"];
remote_git_cmd(&mut cmd, output)
pub async fn remote_git_fetch_upstream(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["fetch", "--all", "-p"];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_git_checkout_sha(sha: &str, bot_ref: &str, output: &mut File) -> Result<(), String> {
let mut cmd = vec!["checkout", sha, "-b", bot_ref];
remote_git_cmd(&mut cmd, output)
pub async fn remote_git_checkout_sha(job_id: &Uuid, sha: &str, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["checkout", sha, "-B", bot_ref];
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_git_rebase_upstream(output: &mut File) -> Result<(), String> {
pub async fn remote_git_rebase_upstream(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["rebase", "upstream/master"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_git_push(bot_ref: &str, output: &mut File) -> Result<(), String> {
pub async fn remote_git_push(job_id: &Uuid, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["push", "origin", bot_ref, "-f"];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_git_delete_branch(bot_ref: &str, output: &mut File) -> Result<(), String> {
pub async fn remote_git_delete_branch(job_id: &Uuid, bot_ref: &str, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec!["branch", "-D", bot_ref];
remote_git_cmd(&mut cmd, output)
remote_git_cmd(job_id, &mut cmd, output, job_registry).await
}

pub fn remote_test_rust_repo(output: &mut File) -> Result<(), String> {
pub async fn remote_test_rust_repo(job_id: &Uuid, output: &mut File, job_registry: &Arc<RwLock<job::JobRegistry>>) -> Result<(), String> {
let mut cmd = vec![
"cd",
config::SEXXI_WORK_TREE,
Expand All @@ -68,5 +105,5 @@ pub fn remote_test_rust_repo(output: &mut File) -> Result<(), String> {
"-i",
"-j32",
];
remote_cmd(&mut cmd, output)
remote_cmd(job_id, &mut cmd, output, job_registry).await
}
12 changes: 10 additions & 2 deletions src/lib/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::env;
use lazy_static::lazy_static;

pub const REVIEW_REQUESTED: &'static str = "review_requested";
pub const REVIEWER: &'static str = "sexxi-bot";
// TODO(azhng): const REPO: &'static str = "rust";
Expand All @@ -9,11 +12,16 @@ pub const SEXXI_GIT_DIR: &'static str = "$HOME/scratch/sexxi-rust/.git";
pub const SEXXI_WORK_TREE: &'static str = "$HOME/scratch/sexxi-rust";
pub const SEXXI_PROJECT: &'static str = "rust";

// Default testing repo
pub const SEXXI_TEST_PROJECT: &'static str = "sexxi-webhook-test";

pub const SEXXI_REMOTE_HOST: &'static str = "sorbitol";
pub const SEXXI_LOG_FILE_DIR: &'static str = "www/build-logs";

// TODO(azhng): we need to template out the user name here.
pub const BUILD_LOG_BASE_URL: &'static str = "https://csclub.uwaterloo.ca/~z577zhan/build-logs";
lazy_static! {
roxelo marked this conversation as resolved.
Show resolved Hide resolved
pub static ref MACHINE_USER: String = env::var("USER").unwrap();
pub static ref BUILD_LOG_BASE_URL: String = format!("https://csclub.uwaterloo.ca/~{}/build-logs", *MACHINE_USER);
}

pub const COMMENT_JOB_START: &'static str = ":running_man: Start running build job";
pub const COMMENT_JOB_DONE: &'static str = "✅ Job Completed";
40 changes: 33 additions & 7 deletions src/lib/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use tokio::sync::{
};
use uuid::Uuid;
use super::{config, job};
use nix::unistd::Pid;
use nix::sys::signal::{self, Signal};

pub fn gen_response(code: u16) -> Response<Body> {
let mut resp = Response::default();
Expand All @@ -21,15 +23,15 @@ pub fn gen_response(code: u16) -> Response<Body> {

pub async fn handle_webhook(
req: Request<Body>,
jobs: Arc<RwLock<job::JobRegistry>>,
job_registry: Arc<RwLock<job::JobRegistry>>,
sender: &mut mpsc::Sender<Uuid>,
) -> Result<Response<Body>, hyper::Error> {
let mut body = hyper::body::aggregate::<Request<Body>>(req).await?;
let bytes = body.to_bytes();
let blob: Result<serde_json::Value, serde_json::Error> = serde_json::from_slice(&bytes);

match blob {
Ok(json) => parse_and_handle(json, jobs, sender).await,
Ok(json) => parse_and_handle(json, job_registry, sender).await,
Err(e) => {
error!("parsing error: {}", e);
Ok::<_, hyper::Error>(gen_response(400))
Expand All @@ -39,9 +41,10 @@ pub async fn handle_webhook(

pub async fn handle_jobs(
_req: Request<Body>,
jobs: Arc<RwLock<job::JobRegistry>>,
job_registry: Arc<RwLock<job::JobRegistry>>,
) -> Result<Response<Body>, hyper::Error> {
let jobs = &*jobs.read().await;
let registry = job_registry.read().await;
let jobs = &registry.jobs;
let mut output = String::new();

output.push_str("<table style=\"width:100%;border:1px solid black;margin-left:auto;margin-right:auto;\">");
Expand Down Expand Up @@ -91,7 +94,7 @@ pub async fn handle_jobs(

async fn parse_and_handle(
json: serde_json::Value,
jobs: Arc<RwLock<job::JobRegistry>>,
job_registry: Arc<RwLock<job::JobRegistry>>,
sender: &mut mpsc::Sender<Uuid>,
) -> Result<Response<Body>, hyper::Error> {

Expand All @@ -111,11 +114,34 @@ async fn parse_and_handle(
let job = job::JobDesc::new(&action, &reviewer, &sha, pr_number, &head_ref);
let job_id = job.id.clone();

let mut jobs = jobs.write().await;
jobs.insert(job_id.clone(), Arc::new(RwLock::new(job)));
{
let mut job_registry = job_registry.write().await;
job_registry.jobs.insert(job_id.clone(), Arc::new(RwLock::new(job)));
let c_job: Arc<RwLock<job::JobDesc>>;
let c_job_id: Uuid;
if let Some(c_id) = job_registry.running_jobs.get(head_ref.clone()) {
c_job_id = c_id.clone();
if let Some(j) = job_registry.jobs.get(&c_job_id) {
c_job = j.clone();
let curr_job = c_job.read().await;
if head_ref == curr_job.head_ref {
info!("New job on same head_ref, killing current job");
// [To-Do] Fix, could potentially kill a process which is already dead
signal::kill(Pid::from_raw(curr_job.pid as i32), Signal::SIGKILL);
}
}
}

job_registry.running_jobs.insert(head_ref.clone().to_string(), job_id.clone());
}

if let Err(e) = sender.send(job_id).await {
error!("failed to send job id to job runner: {}", e);
{
let mut job_registry = job_registry.write().await;
job_registry.running_jobs.remove(head_ref.clone());
}

return Ok::<_, hyper::Error>(gen_response(500));
};
},
Expand Down
Loading