From 082d521eb1f6f6cf2e72e62be84bd6c5b72fe09e Mon Sep 17 00:00:00 2001 From: Nipun Kumar Date: Thu, 25 Feb 2021 22:26:56 +0530 Subject: [PATCH] V0.2 (#1) Thingy 0.2: - move to Actix actor system - add web UI and REST API --- .cargo/config | 2 + .dockerignore | 4 + .vscode/launch.json | 49 ++++ Cargo.toml | 10 +- Dockerfile | 15 + README.md | 84 ++++-- build.sh | 7 + src/branch_actor.rs | 285 +++++++++++++++++++ src/build_actor.rs | 165 +++++++++++ src/git_utils.rs | 88 ++++++ src/job_actor.rs | 118 ++++++++ src/main.rs | 629 +++++++++++++++++++---------------------- src/models.rs | 134 ++++++++- src/thingy.rs | 144 ++++++++++ static/index.html | 673 ++++++++++++++++++++++++++++++++++++++++++++ 15 files changed, 2032 insertions(+), 375 deletions(-) create mode 100644 .cargo/config create mode 100644 .dockerignore create mode 100644 .vscode/launch.json create mode 100644 Dockerfile create mode 100644 src/branch_actor.rs create mode 100644 src/build_actor.rs create mode 100644 src/git_utils.rs create mode 100644 src/job_actor.rs create mode 100644 src/thingy.rs create mode 100644 static/index.html diff --git a/.cargo/config b/.cargo/config new file mode 100644 index 0000000..abc711e --- /dev/null +++ b/.cargo/config @@ -0,0 +1,2 @@ +#[build] +#target = "x86_64-unknown-linux-musl" diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3ce01b4 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +.idea +.vscode +target +tmp diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..03b6c25 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,49 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. 
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'thingy'", + "cargo": { + "args": [ + "build", + "--bin=thingy", + "--package=thingy" + ], + "filter": { + "name": "thingy", + "kind": "bin" + } + }, + "args": ["./tmp"], + "env": { + "RUST_BACKTRACE": "1", + "SERVE_STATIC": "true" + }, + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'thingy'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=thingy", + "--package=thingy" + ], + "filter": { + "name": "thingy", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 617e0ab..fc44d7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "thingy" -version = "0.1.1" +version = "0.2.0" authors = ["n-k "] description = "Lightweight build server and thing-doer" homepage = "https://github.com/n-k/thingy" @@ -13,9 +13,13 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" serde_yaml = "0.8" git2 = "0.13" failure = "0.1" chrono = "0.4" -tempfile = "3.1" -file-lock = "1.1" +tempfile = "3" +actix = "0.10" +actix-web = "3" +actix-web-actors = "3" +actix-files = "0.5.0" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6ed9310 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +# Build Stage +FROM clux/muslrust AS builder +WORKDIR /usr/src/thingy +COPY . . +RUN cargo install --target x86_64-unknown-linux-musl --path . + +# Bundle Stage +FROM alpine +WORKDIR /app +COPY --from=builder /root/.cargo/bin/thingy . 
+ENV LISTEN_ADDRESS=0.0.0.0 +WORKDIR /workspace +EXPOSE 8080 +USER 1000 +CMD ["/app/thingy", "/workspace"] diff --git a/README.md b/README.md index f990dee..24a6e24 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,91 @@ # thingy Lightweight build server and thing-doer -## Using thingy -Installation: `cargo install --force thingy` +## Installation +`cargo install --force thingy` -Run: `thingy ` +## Usage +Run: `thingy path/to/workspace/folder`, then go to `http://localhost:8080/` -Thingy is a command line application and works inside a 'workspace' folder. A thingy workspace is a plain folder with a `thingy.yaml` file in it. This file's structure is based on [this struct](./src/models.rs#L4). This file lists build jobs and configurations. +### Configuration +Thingy has few configuration options, which are provided as optional environment variables: + +|Environment Variable|Default value| | +|-|-|-| +|`LISTEN_ADDRESS`|`127.0.0.1`|Address to bind web server to| +|`LISTEN_PORT`|`8080`|Port web server listens on| + + +Thingy works inside a 'workspace' folder. A thingy workspace is a plain folder with a `thingy.yaml` file in it. This file's structure is based on [this struct](./src/models.rs#L7). This file lists build jobs and configurations. If this file does not exist, an empty config with no jobs will be created. Jobs can then be added from web UI. 
An example of a workspace file: ```yaml jobs: - - name: "test" + - name: "test" # names must be unique within workspace repo_url: "git@github.com:n-k/thingy.git" - branch: "master" - build_script: "build.sh" # should be an executable file present in the repository - poll_interval_seconds: 300 + build_script: "build.sh" # should be an executable file present in the repository, see build.sh in this repository for example + poll_interval_seconds: 300 # optional auth: # optional - PrivateKey: # currently only supported method, besides no auth + PrivateKey: path: "/path/to/your/ssh/private/key" passphrase: "optional - if key has passphrase" - + - name: "test2" + repo_url: "../../some/path/to/repo.git" + build_script: "build.sh" + auth: # optional + UserPass: + username: "username" + password: "password" ``` -In this example, it is assumed that the repository contains an executable file `build.sh`. When a new commit is being built, thingy will pull the code, set CWD to the checkout directory, and run `build.sh` with a few special envronment variables. See next section for list of additional environment variables. -### List of environment variables provided to build scripts +In this example, it is assumed that the repository contains an executable file `build.sh`. When a new commit is being built, thingy will pull the code, and run `build.sh` in the checkout directory with a few special environment variables. See next section for list of additional environment variables. + +### List of additional environment variables provided to build scripts - `BRANCH`: name of branch being built - `COMMIT_HASH`: current commit hash being built +Any environment variables passed to the thingy executable are also passed to the build processes. 
+ +## Features +- Multi-branch Git poll/build +- REST API +- Simple, but functional web interface +- Log viewer, with tailing support for running builds ## Roadmap -- [x] Single branch Git poll/build -- [ ] Multi-branch Git poll/build -- [ ] Web hooks -- [ ] Secrets (other than auth) +- Github account support - allow authenticating with github API token, and listing repositories. +- Support docker builds. It would be good to have more support for docker builds, but for now, having docker commands in the build scripts works well enough. +- Secrets. It will be good to have better support for secret management. For now, the thingy.yaml file can have git credentials. This file is not expected to be shared or be public, so at least for my setup, it is safe to put credentials in it. + For other secrets, any environment variables passed to the thingy executable are passed on to build processes, which can be used to, e.g., provide paths to files containing other secrets. ## FAQ 1. Why? - This has the minimal set of features which I need for my personal projects, and home-lab automation things. Every other alternative seemed overkill for my needs. I also run this on Raspberry Pi's, and this project will always focus on low resource consumption. 2. Is this going to be maintained? Will you add features? - - I use this myself, so I will maintain at least the current features and a few more (please see roadmap section). If you would like to see some additional features, please open a Github issue, or send a PR. + - I use this myself, so I will maintain at least the current features and a few more (please see roadmap section). If you would like to see some additional features, please open a Github issue, or send a PR - both are very welcome. 3. Why only Git support? - - I only have Git repositories. PRs are very welcome for supporting others. + - I only have Git repositories. + + +# Design +Thingy works on top of Actix actors, and a REST API made with Actix-web. 
Each component in thingy is an actor. + +Actors in thingy form a tree, with one root. The organization looks like this: + + - Thingy actor (root) + - 1 Actor per job + - 1 Actor per branch of the job's repository + - Temporary actors for each build + +## Structure of workspace directories +``` +workspace_directory/ + thingy.yaml (job definitions) + job_1/ (directory name is same as job name) + branch_1/ + data.json (saved state for this branch, contains past/ongoing builds, last seen commit hash) + build_num.txt (number of latest build to have been started, keeps increasing by 1) + 1/ + repo/ (directory where this build cloned the repository) + ... files from repo ... + log.txt (build logs, both stdout and stderr are captured, and prefixed by [out] or [err]) +``` diff --git a/build.sh b/build.sh index c7bd784..1de34b5 100755 --- a/build.sh +++ b/build.sh @@ -1,3 +1,10 @@ #!/bin/bash +echo "=== Sample build script ===" +echo "= Note that this can be any executable file, like a python script =" cargo test cargo build +# also make musl build +mkdir -p ./tmp +docker build -t thingy:${BRANCH} . +docker run --rm -it -v "$PWD"/tmp:/tmp thingy:${BRANCH} sh -c "cp -fv /app/thingy /tmp/" +# distribute/deploy builds? 
diff --git a/src/branch_actor.rs b/src/branch_actor.rs new file mode 100644 index 0000000..4f987f9 --- /dev/null +++ b/src/branch_actor.rs @@ -0,0 +1,285 @@ +use actix::prelude::*; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{create_dir_all, remove_dir_all, File}, + io::{BufRead, BufReader}, + path::PathBuf, +}; + +use crate::{build_actor::BuildActor, git_utils::clone_commit, models::Job}; + +#[derive(Debug)] +pub struct BranchActor { + job: Job, + branch: String, + dir: PathBuf, + builds: Vec, + state: BranchDetails, +} + +impl BranchActor { + pub fn new(job: Job, branch: String, dir: PathBuf, last_seen_commit: Option) -> Self { + BranchActor { + job, + branch, + dir, + builds: vec![], + state: BranchDetails { + last_seen_commit, + builds: vec![], + }, + } + } + + fn inc_build_num(&mut self) -> Result { + let build_num_file = self.dir.join("build_num.txt"); + let next_num: u64 = if build_num_file.exists() { + let n: u64 = std::fs::read_to_string(&build_num_file)? + .parse() + .unwrap_or_default(); + n + 1 + } else { + 1 + }; + std::fs::write(build_num_file, format!("{}", next_num).as_bytes())?; + Ok(next_num) + } + + fn start_build( + &mut self, + _ctx: &mut Context, + hash: Option, + ) -> Result<(), std::io::Error> { + let bn = self.inc_build_num()?; + // start a build, update last_seen + let build_dir = self.dir.join(&format!("{}", bn)); + if build_dir.exists() { + remove_dir_all(&build_dir)?; + } + create_dir_all(&build_dir)?; + let checkout_dir = build_dir.join("repo"); + create_dir_all(&checkout_dir)?; + // do build + if let Ok(_) = clone_commit( + &self.job.repo_url, + &self.branch, + hash.clone(), + &checkout_dir, + self.job.auth.as_ref(), + ) { + let h = BuildActor::new( + self.job.build_script.clone(), + checkout_dir.clone(), + hash.clone(), + _ctx.address(), + build_dir.join("log.txt"), + bn, + ) + .start(); + self.builds.push(BuildLink { + build_num: bn, + addr: h, + }); + } + if hash.is_some() { + self.state.last_seen_commit = hash.clone(); 
+ } + let build = BuildDetails { + build_num: bn, + commit_hash: hash, + status: "building".into(), + }; + self.state.builds.push(build); + self.write_data_file()?; + Ok(()) + } + + fn get_data_path(&self) -> PathBuf { + self.dir.join("data.json") + } + + fn write_data_file(&self) -> Result<(), std::io::Error> { + std::fs::write(self.get_data_path(), serde_json::to_string(&self.state)?)?; + Ok(()) + } +} + +impl Actor for BranchActor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Context) { + // create build num and data files + let build_num_path = self.dir.join("build_num.txt"); + if !build_num_path.exists() { + std::fs::write(build_num_path, "0".as_bytes()).unwrap(); + } + let data_path = self.get_data_path(); + if !data_path.exists() { + self.write_data_file().unwrap(); + } else { + let det: BranchDetails = + serde_json::from_str(std::fs::read_to_string(data_path).unwrap().as_str()).unwrap(); + self.state = det; + } + } + + fn stopped(&mut self, _ctx: &mut Context) {} +} + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct NewCommitMsg(pub String); + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct BuildStoppedMsg { + pub build_num: u64, + pub status: String, +} + +#[derive(Message, Debug)] +#[rtype(result = "Result")] +pub struct GetBranchDetailsMsg; + +#[derive(Debug, Clone)] +struct BuildLink { + build_num: u64, + addr: Addr, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BranchDetails { + last_seen_commit: Option, + builds: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BuildDetails { + build_num: u64, + commit_hash: Option, + status: String, +} + +#[derive(Message, Debug)] +#[rtype(result = "Result")] +pub struct GetBuildLogLinesMsg { + pub build_num: u64, + pub start: u32, + pub num_lines: u32, +} +#[derive(Debug, Serialize)] +pub struct LogResponse { + pub lines: Vec, + pub has_more: bool, + pub status: Option, +} + 
+#[derive(Message, Debug)] +#[rtype(result = "Result>, std::io::Error>")] +pub struct GetBuildActorMsg(pub u64); + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct BuildNowMsg; + +impl Handler for BranchActor { + type Result = Result<(), std::io::Error>; + + fn handle(&mut self, _msg: BuildNowMsg, ctx: &mut Self::Context) -> Self::Result { + self.start_build(ctx, None)?; + Ok(()) + } +} + +impl Handler for BranchActor { + type Result = Result<(), std::io::Error>; + + fn handle(&mut self, msg: NewCommitMsg, ctx: &mut Self::Context) -> Self::Result { + let hash = Some(msg.0.clone()); + if !self.state.last_seen_commit.eq(&hash) { + self.start_build(ctx, hash)?; + } + Ok(()) + } +} + +impl Handler for BranchActor { + type Result = Result<(), std::io::Error>; + + fn handle(&mut self, msg: BuildStoppedMsg, _ctx: &mut Self::Context) -> Self::Result { + self.builds = self + .builds + .clone() + .into_iter() + .filter(|b| b.build_num != msg.build_num) + .collect(); + self.state + .builds + .iter_mut() + .filter(|b| b.build_num == msg.build_num) + .for_each(|b| { + b.status = msg.status.clone(); + }); + self.write_data_file()?; + Ok(()) + } +} + +impl Handler for BranchActor { + type Result = Result; + + fn handle(&mut self, _msg: GetBranchDetailsMsg, _ctx: &mut Self::Context) -> Self::Result { + Ok(self.state.clone()) + } +} + +impl Handler for BranchActor { + type Result = Result; + + fn handle(&mut self, _msg: GetBuildLogLinesMsg, _ctx: &mut Self::Context) -> Self::Result { + let log_file = self.dir.join(format!("{}", _msg.build_num)).join("log.txt"); + if log_file.exists() { + let file = File::open(&log_file)?; + let reader = BufReader::new(file); + + let lines = reader + .lines() + .filter(|l| l.is_ok()) + .map(|l| l.unwrap()) + .skip(_msg.start as usize); + let mut batch: Vec = lines.take(_msg.num_lines as usize + 1).collect(); + let has_more = batch.len() >= _msg.num_lines as usize; + if has_more { + for _ in 
batch.drain(_msg.num_lines as usize..) {} + } + let status = self + .state + .builds + .iter() + .find(|b| b.build_num == _msg.build_num) + .map(|b| b.status.clone()); + return Ok(LogResponse { + lines: batch, + has_more, + status, + }); + } + Ok(LogResponse { + lines: vec![], + has_more: false, + status: None, + }) + } +} + +impl Handler for BranchActor { + type Result = Result>, std::io::Error>; + + fn handle(&mut self, msg: GetBuildActorMsg, _ctx: &mut Self::Context) -> Self::Result { + Ok(self + .builds + .iter() + .find(|l| l.build_num == msg.0) + .map(|a| a.addr.clone())) + } +} diff --git a/src/build_actor.rs b/src/build_actor.rs new file mode 100644 index 0000000..827c3cf --- /dev/null +++ b/src/build_actor.rs @@ -0,0 +1,165 @@ +use actix::prelude::*; +use std::io::prelude::*; +use std::{ + fs::OpenOptions, + io::{BufRead, BufReader}, + path::PathBuf, + process::{Child, Command, Stdio}, + writeln, +}; + +use crate::branch_actor::{BranchActor, BuildStoppedMsg}; + +#[derive(Debug)] +pub struct BuildActor { + command: String, + dir: PathBuf, + commit_hash: Option, + parent: Addr, + log_file_path: PathBuf, + process: Option, + num: u64, + status: String, +} + +impl BuildActor { + pub fn new( + command: String, + dir: PathBuf, + commit_hash: Option, + parent: Addr, + log_file_path: PathBuf, + num: u64, + ) -> Self { + BuildActor { + command, + dir, + commit_hash, + parent, + log_file_path, + process: None, + num, + status: "finished".into(), + } + } +} + +impl Actor for BuildActor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Context) { + println!("Build started"); + let mut args: Vec = self + .command + .as_str() + .split(" ") + .filter(|s| !s.is_empty()) + .map(|s| s.into()) + .collect(); + let cmd = args[0].clone(); + let cmd = self.dir.join(cmd); + args.drain(0..1); + + let mut command = Command::new(cmd); + command.args(args); + let spawn_result = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + // always adding this, in case 
the child process has something + // to do with python and has the insane behavior of not flushing + // std stream file deccriptors on print + .env("PYTHONUNBUFFERED", "1") + // .env("BRANCH", &self.branch) + .current_dir(&self.dir) + .spawn(); + if let Ok(mut child) = spawn_result { + let std_out = child.stdout.take().unwrap(); + let std_err = child.stderr.take().unwrap(); + // self.process.replace(child); + + // spawn threada to transfer buffers and notify actor + let reader = BufReader::new(std_out); + let log_file = self.log_file_path.clone(); + let h = std::thread::spawn(move || { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(log_file) + .unwrap(); + + reader + .lines() + .filter_map(|line| line.ok()) + .for_each(|line| { + let _ = writeln!(file, "[out] {}", line); + }); + }); + let reader = BufReader::new(std_err); + let log_file = self.log_file_path.clone(); + let h2 = std::thread::spawn(move || { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(log_file) + .unwrap(); + reader + .lines() + .filter_map(|line| line.ok()) + .for_each(|line| { + let _ = writeln!(file, "[err] {}", line); + }); + }); + let adr = _ctx.address(); + let _ = std::thread::spawn(move || { + let _ = h.join(); + let _ = h2.join(); + adr.do_send(StopBuildMessage); + }); + } else { + _ctx.stop(); + } + } + + fn stopped(&mut self, _ctx: &mut Context) { + match self.process.take() { + Some(mut ch) => { + if let Ok(Some(status)) = ch.try_wait() { + self.status = if status.success() { + "finished".into() + } else { + "error".into() + }; + } else { + let _ = ch.kill(); + if let Ok(Some(status)) = ch.try_wait() { + self.status = if status.success() { + "finished".into() + } else { + "error".into() + }; + } + } + } + None => {} + } + self.parent.do_send(BuildStoppedMsg { + build_num: self.num, + status: self.status.clone(), + }); + println!("Build finished"); + } +} + +#[derive(Message, Debug)] +#[rtype(result = 
"Result<(), std::io::Error>")] +pub struct StopBuildMessage; + +impl Handler for BuildActor { + type Result = Result<(), std::io::Error>; + + fn handle(&mut self, _msg: StopBuildMessage, _ctx: &mut Context) -> Self::Result { + Ok(_ctx.stop()) + } +} diff --git a/src/git_utils.rs b/src/git_utils.rs new file mode 100644 index 0000000..02aa643 --- /dev/null +++ b/src/git_utils.rs @@ -0,0 +1,88 @@ +use crate::models::*; +use git2::{build::RepoBuilder, Direction, FetchOptions, Repository}; +use std::{self, collections::HashMap, error::Error, path::PathBuf}; +use tempfile::TempDir; + +pub fn clone_commit( + url: &str, + branch: &str, + commit_hash: Option, + dir: &PathBuf, + auth: Option<&GitAuth>, +) -> Result<(), Box> { + let mut callbacks = git2::RemoteCallbacks::new(); + callbacks.credentials( + |_user: &str, user_from_url: Option<&str>, _cred: git2::CredentialType| match auth { + Some(a) => match a { + GitAuth::PrivateKey { path, passphrase } => git2::Cred::ssh_key( + user_from_url.unwrap_or("git"), + None, + std::path::Path::new(path), + passphrase.as_ref().map(|s| s.as_str()), + ), + GitAuth::UserPass { username, password } => { + git2::Cred::userpass_plaintext(username, password) + } + }, + None => git2::Cred::default(), + }, + ); + let mut fo = FetchOptions::new(); + fo.remote_callbacks(callbacks); + + let repo = RepoBuilder::new() + .fetch_options(fo) + .branch(branch) + .clone(url, dir)?; + + if let Some(commit_hash) = commit_hash { + let commit_hash = commit_hash.as_str(); + let oid = git2::Oid::from_str(commit_hash)?; + let commit = repo.find_commit(oid)?; + + repo.branch(commit_hash, &commit, false)?; + let obj = repo.revparse_single(&("refs/heads/".to_owned() + commit_hash))?; + repo.checkout_tree(&obj, None)?; + repo.set_head(&("refs/heads/".to_owned() + commit_hash))?; + } + + Ok(()) +} + +pub fn get_branch_hashes( + url: &str, + auth: Option<&GitAuth>, +) -> Result, Box> { + let tmp_dir = TempDir::new()?; + let repo = Repository::init(tmp_dir.path())?; 
+ + let mut callbacks = git2::RemoteCallbacks::new(); + callbacks.credentials( + |_user: &str, user_from_url: Option<&str>, _cred: git2::CredentialType| match auth { + Some(a) => match a { + GitAuth::PrivateKey { path, passphrase } => git2::Cred::ssh_key( + user_from_url.unwrap_or("git"), + None, + std::path::Path::new(path), + passphrase.as_ref().map(|s| s.as_str()), + ), + GitAuth::UserPass { username, password } => { + git2::Cred::userpass_plaintext(username, password) + } + }, + None => git2::Cred::default(), + }, + ); + + let mut remote = repo.remote("origin", url)?; + let connection = remote.connect_auth(Direction::Fetch, Some(callbacks), None)?; + let mut hashes: HashMap = HashMap::new(); + for b in connection.list()?.iter() { + if b.name().starts_with("refs/heads/") { + let bname = b.name()[11..].to_string(); + hashes.insert(bname, b.oid().to_string()); + } + } + + Ok(hashes) +} diff --git a/src/job_actor.rs b/src/job_actor.rs new file mode 100644 index 0000000..0a81781 --- /dev/null +++ b/src/job_actor.rs @@ -0,0 +1,118 @@ +use std::{collections::HashMap, fs::create_dir_all, path::PathBuf, time::Duration}; + +use crate::{ + branch_actor::{BranchActor, NewCommitMsg}, + git_utils::get_branch_hashes, + models::*, +}; +use actix::prelude::*; +use serde::Serialize; + +#[derive(Debug)] +pub struct JobActor { + pub job: Job, + pub dir: PathBuf, + pub branch_actors: HashMap>, +} + +impl JobActor { + pub fn new(job: Job, dir: PathBuf) -> Self { + JobActor { + job, + dir, + branch_actors: HashMap::new(), + } + } + + /// poll branches for a job + fn _poll(&mut self, context: &mut Context) { + context.address().do_send(JobPollMsg); + } +} + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct JobPollMsg; + +#[derive(Message, Debug)] +#[rtype(result = "Result")] +pub struct GetJobDetailsMsg; + +#[derive(Debug, Serialize)] +pub struct JobDetailsResponse { + name: String, + branches: Vec, +} + +#[derive(Message, Debug)] +#[rtype(result 
= "Result>, std::io::Error>")] +pub struct GetBranchActorMsg(pub String); + +impl Actor for JobActor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Context) { + if let Some(i) = self.job.poll_interval_seconds { + _ctx.run_interval(Duration::from_secs(i), Self::_poll); + } + _ctx.notify(JobPollMsg); + } + + fn stopped(&mut self, _ctx: &mut Context) {} +} + +impl Handler for JobActor { + type Result = Result<(), std::io::Error>; + + fn handle(&mut self, _msg: JobPollMsg, _ctx: &mut Self::Context) -> Self::Result { + if let Ok(hashes) = get_branch_hashes(&self.job.repo_url, self.job.auth.as_ref()) { + for (k, v) in hashes.iter() { + match self.branch_actors.get(k) { + Some(a) => { + a.do_send(NewCommitMsg(v.clone())); + } + _ => { + // ensure dir + let bpath = self.dir.join(k); + create_dir_all(&bpath)?; + let h = + BranchActor::new(self.job.clone(), k.clone(), bpath, None).start(); + self.branch_actors.insert(k.clone(), h); + self.branch_actors + .get(k) + .unwrap() + .do_send(NewCommitMsg(v.clone())); + } + } + } + // remove branches which are no longer present + self.branch_actors = self + .branch_actors + .clone() + .into_iter() + .filter(|(k, _)| hashes.contains_key(k)) + .collect(); + } + Ok(()) + } +} + +impl Handler for JobActor { + type Result = Result; + + fn handle(&mut self, _msg: GetJobDetailsMsg, _ctx: &mut Self::Context) -> Self::Result { + let branches: Vec = self.branch_actors.iter().map(|(k, _)| k.clone()).collect(); + Ok(JobDetailsResponse { + name: self.job.name.clone(), + branches, + }) + } +} + +impl Handler for JobActor { + type Result = Result>, std::io::Error>; + + fn handle(&mut self, msg: GetBranchActorMsg, _ctx: &mut Self::Context) -> Self::Result { + Ok(self.branch_actors.get(&msg.0).map(|a| a.clone())) + } +} diff --git a/src/main.rs b/src/main.rs index 198db7b..e9c5140 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,391 +1,340 @@ -use chrono::{DateTime, Utc}; -use git2::{build::RepoBuilder, Direction, FetchOptions, 
Repository}; -use serde_yaml; -use std::{ - self, - collections::HashSet, - error::Error, - io::{BufRead, BufReader}, - path::PathBuf, - process::{Command, Stdio}, - sync::mpsc::Sender, - thread::JoinHandle, - time::Duration, - time::SystemTime, +#![forbid(unsafe_code)] +use std::{collections::HashMap, fmt::Display, path::PathBuf}; + +use actix::prelude::*; +use branch_actor::{BuildNowMsg, GetBranchDetailsMsg, GetBuildActorMsg, GetBuildLogLinesMsg}; +use build_actor::StopBuildMessage; +use job_actor::{GetBranchActorMsg, GetJobDetailsMsg, JobPollMsg}; +use thingy::{ + AddJobMsg, GetJobActorMsg, GetJobActorResponse, GetJobsMsg, RemoveJobMsg, Thingy, +}; + +use actix_files as fs; +use actix_web::{ + delete, + dev::HttpResponseBuilder, + get, + http::{header, StatusCode}, + post, web, App, HttpResponse, HttpServer, Responder, }; -use tempfile::TempDir; +use serde::Deserialize; + +mod branch_actor; +mod build_actor; +mod git_utils; +mod job_actor; mod models; +mod thingy; use models::*; -pub fn main() { +#[actix_web::main] +async fn main() -> std::io::Result<()> { let mut args = std::env::args(); if args.len() < 2 { - eprintln!("Usage: thingy "); - return; + eprintln!("Usage: thingy "); + return Ok(()); } args.next(); let path = args.next(); if path.is_none() { - eprintln!("Usage: thingy "); - return; + eprintln!("Usage: thingy "); + return Ok(()); } let path = path.unwrap(); - println!("Starting thingy in '{}'", path); - match std::env::current_dir() { - Ok(p) => { - let path = p.join(path).canonicalize(); - if let Err(err) = &path { - eprintln!("Could not get canonical dir. Exiting. Error: {:?}", err); - return; - } - let path = path.unwrap(); - println!("Changing working directory to '{:?}' ...", path); - match std::env::set_current_dir(&path) { - Ok(_) => { - println!("Done."); - start(path); - return; - } - Err(err) => { - eprintln!("Could not change current dir. Exiting. Error: {:?}", err); - return; - } - } - } - _ => { - eprintln!("Could not get current dir. 
Exiting."); - return; + let path = PathBuf::from(path).canonicalize()?; + let ws = Workspace::from_dir_path(&path).unwrap(); + let state = ThingyState { + root: Thingy::new(ws, path.into()).start(), + }; + + let listen_addr: String = if let Ok(addr) = std::env::var("LISTEN_ADDRESS") { + addr + } else { + "127.0.0.1".into() + }; + let port: u16 = if let Ok(Ok(p)) = std::env::var("LISTEN_PORT").map(|pstr| pstr.parse()) { + p + } else { + 8080 + }; + HttpServer::new(move || { + let mut app = App::new() + .data(state.clone()) + .service(index) + .service(get_jobs) + .service(create_job) + .service(delete_job) + .service(poll) + .service(get_job) + .service(get_branch) + .service(force_build) + .service(get_build_log) + .service(abort_build); + if let Ok(_) = std::env::var("SERVE_STATIC") { + app = app.service(fs::Files::new("/", "./static/").show_files_listing()); } - } + app + }) + .bind((listen_addr, port))? + .run() + .await?; + println!("shutting down..."); + Ok(()) } -fn start(path: PathBuf) { - println!("Initing thingy in workspace {:?}", &path); +/// Process-wide state +#[derive(Clone)] +struct ThingyState { + /// address of the root actor + root: Addr, +} - let ws_yaml_path = path.clone().join("thingy.yaml"); +/// Contents of static/index.html , served at GET / +static HTML_BYTES: &[u8] = include_bytes!("../static/index.html"); - let md = std::fs::metadata(&ws_yaml_path); - if let Err(err) = &md { - eprintln!( - "Could not read config from {:?}. Exiting. Does the file exist? 
Error: {:?}", - &ws_yaml_path, &err - ); - return; +/// Struct for error messages +#[derive(Debug)] +struct ApiMessage { + status: StatusCode, + message: String, +} + +/// Converts to JSON string +impl Display for ApiMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut response: HashMap = HashMap::new(); + response.insert("message".into(), self.message.clone()); + f.write_str(&serde_json::to_string(&response).unwrap()) } - let md = md.unwrap(); - if !md.is_file() { - eprintln!("{:?} is not a regular file. Exiting.", &ws_yaml_path); - return; +} + +impl ApiMessage { + fn new() -> Self { + ApiMessage { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "Internal server error".into(), + } } - let contents = std::fs::read_to_string(&ws_yaml_path); - if let Err(err) = &contents { - eprintln!( - "Could not read {:?}. Exiting. Error: {:?}", - &ws_yaml_path, &err - ); - return; + fn new_with_status(status: StatusCode, message: &str) -> Self { + ApiMessage { + status, + message: message.into(), + } } - let contents = contents.unwrap(); - let ws = serde_yaml::from_str::(&contents); +} - if let Err(err) = &ws { - eprintln!( - "Could not read {:?}. Exiting. Does the file contain valid YAML? Error: {:?}", - &ws_yaml_path, &err - ); - return; +/// Convert ApiMessage to an actix error +impl actix_web::error::ResponseError for ApiMessage { + fn status_code(&self) -> actix_web::http::StatusCode { + self.status } - let ws = ws.unwrap(); - let names: Vec<&str> = ws.jobs.iter().map(|j| j.name.trim()).collect(); + fn error_response(&self) -> HttpResponse { + HttpResponseBuilder::new(self.status_code()) + .set_header(header::CONTENT_TYPE, "application/json; charset=utf-8") + .body(self.to_string()) + } +} - let mut uniq = HashSet::<&str>::new(); - for n in names { - if n.is_empty() { - eprintln!("Found job with empty name. Exiting."); - return; - } - if uniq.contains(n) { - eprintln!("Workspace config contains duplicate jobs with name '{}'. 
Note that names are trimmed when read. Exiting.", n); - return; - } - uniq.insert(n); +/// Convert actor error to ApiMessage +impl From for ApiMessage { + fn from(_: MailboxError) -> Self { + ApiMessage::new() } +} - for j in &ws.jobs { - if let Err(err) = j.validate() { - eprintln!("Configuration for {} is invalid: {}. Exiting.", j.name, err); - return; - } +/// Convert I/O error to ApiMessage +impl From for ApiMessage { + // TODO: get description from io error + fn from(_: std::io::Error) -> Self { + ApiMessage::new() } +} - let mut handles: Vec> = vec![]; - let (s, r) = std::sync::mpsc::channel::(); +/// Index page, serves contents of static/index.html +/// index.html contains all the ui code for thingy +#[get("/")] +async fn index() -> impl Responder { + HttpResponse::Ok() + .content_type("text/html") + .body(HTML_BYTES) +} - // ensure job dirs - for j in &ws.jobs { - let name = j.name.trim(); - let dir = path.join(name); +/// List jobs +#[get("/jobs")] +async fn get_jobs(data: web::Data) -> Result { + Ok(HttpResponse::Ok().json(data.root.send(GetJobsMsg).await??.0)) +} - if dir.is_file() { - eprintln!( - "{:?} is a regular file. Expected directory or nothing.", - &dir - ); - return; - } +/// Add new job to workspace, this updates the /thingy.yaml file +#[post("/jobs")] +async fn create_job( + req: web::Json, + data: web::Data, +) -> Result { + data.root.send(AddJobMsg(req.into_inner())).await??; + Ok(HttpResponse::NoContent().body("")) +} - if !dir.exists() { - if let Err(err) = std::fs::create_dir_all(&dir) { - eprintln!( - "Could not create job dir {:?}. Exiting. 
Error: {:?}", - &dir, &err - ); - return; - } - } +/// Remove a job from workspace, this updates the /thingy.yaml file +/// Any ongoing builds related to thsi job will not be stopped immediately +#[delete("/jobs/{jobId}")] +async fn delete_job( + path: web::Path<(String,)>, + data: web::Data, +) -> Result { + let id = path.into_inner().0; + data.root.send(RemoveJobMsg(id)).await??; + Ok(HttpResponse::NoContent().body("")) +} - // start a thread to handle this job - let job = j.clone(); - let sender = s.clone(); - let t = std::thread::spawn(move || { - job_work_loop(job, sender, dir); - }); - handles.push(t); +/// Poll a job's repository URL now. This overrides any poll interval set +/// or the job, and resets it so that next automatic poll will happen after +// the usual duration of this command +#[post("/jobs/{jobId}/poll")] +async fn poll( + path: web::Path<(String,)>, + data: web::Data, +) -> Result { + let id = path.into_inner().0; + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(id)).await?? { + addr.do_send(JobPollMsg); + Ok(HttpResponse::Ok() + .content_type("application/json") + .body("{\"status\": \"OK\"}")) + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } +} - while let Ok(je) = r.recv() { - match &je { - JobEvent::Log { - job, - line, - is_stderr, - } => { - let now = SystemTime::now(); - let now: DateTime = now.into(); - let now = now.to_rfc3339(); +/// Get details of a job +#[get("/jobs/{jobId}")] +async fn get_job( + path: web::Path<(String,)>, + data: web::Data, +) -> Result { + let id = path.into_inner().0; + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(id)).await?? 
{ + Ok(HttpResponse::Ok().json(addr.send(GetJobDetailsMsg).await??)) + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) + } +} - if *is_stderr { - eprintln!("{} [{}] {}", now, job, line); - } else { - println!("{} [{}] {}", now, job, line); - } - } +#[get("/jobs/{jobId}/branches/{branch}")] +async fn get_branch( + path: web::Path<(String, String)>, + data: web::Data, +) -> Result { + let path = path.into_inner(); + let job_id = path.0; + let branch = path.1; + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(job_id)).await?? { + if let Some(addr) = addr.send(GetBranchActorMsg(branch)).await?? { + Ok(HttpResponse::Ok().json(addr.send(GetBranchDetailsMsg).await??)) + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } } -fn job_work_loop(job: Job, sender: Sender, dir: PathBuf) { - let poll_interval = Duration::from_secs(job.poll_interval_seconds); - loop { - let _ = sender.send(JobEvent::Log { - job: job.name.clone(), - line: format!("Scanning repo..."), - is_stderr: true, - }); - let hash = get_branch_hash(&job.repo_url, &job.branch, job.auth.as_ref()); - match hash { - Ok(hash) => { - let hash_file = dir.clone().join("last_commit_hash.txt"); - let old_hash = std::fs::read_to_string(&hash_file).unwrap_or("".into()); - let old_hash = old_hash.trim().to_string(); - if !old_hash.eq(&hash) { - // build this commit - let clone_dir = dir.clone().join(&hash).join("checkout"); - if clone_dir.exists() { - let _ = std::fs::remove_dir_all(&clone_dir); - } - if let Err(err) = clone_commit( - &job.repo_url, - &job.branch, - &hash, - &clone_dir, - job.auth.as_ref(), - ) { - let _ = sender.send(JobEvent::Log { - job: job.name.clone(), - line: format!("Could not clone repo. 
Error: {}", &err), - is_stderr: true, - }); - } else { - // start the build script in clone dir - let cmd = &job.build_script; - let mut args: Vec = cmd - .split(" ") - .filter(|s| !s.is_empty()) - .map(|s| s.into()) - .collect(); - let cmd = args[0].clone(); - let cmd = clone_dir.clone().join(cmd); - args.drain(0..1); - - let mut command = Command::new(cmd); - command.args(args); - let spawn_result = command - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - // always adding this, in case the child process has something - // to do with python and has the insane behavior of not flushing - // std stream file deccriptors on print - .env("PYTHONUNBUFFERED", "1") - .env("BRANCH", &job.branch) - .env("COMMIT_HASH", &hash) - .current_dir(&clone_dir) - .spawn(); - - if let Err(err) = &spawn_result { - let je = JobEvent::Log { - job: job.name.clone(), - line: format!("Could not spawn build process. Error: {}", &err), - is_stderr: true, - }; - let _ = sender.send(je); - return; - } - let mut printers: Vec> = vec![]; - let mut child = spawn_result.unwrap(); - let std_out = child.stdout.take(); - let std_err = child.stderr.take(); - if let Some(std_out) = std_out { - let reader = BufReader::new(std_out); - let sender_clone = sender.clone(); - let name = job.name.clone(); - let h = std::thread::spawn(move || { - reader - .lines() - .filter_map(|line| line.ok()) - .for_each(|line| { - let je = JobEvent::Log { - job: name.clone(), - line, - is_stderr: false, - }; - let _ = sender_clone.send(je); - }); - }); - printers.push(h); - } - if let Some(std_err) = std_err { - let reader = BufReader::new(std_err); - let sender_clone = sender.clone(); - let name = job.name.clone(); - let h = std::thread::spawn(move || { - reader - .lines() - .filter_map(|line| line.ok()) - .for_each(|line| { - let je = JobEvent::Log { - job: name.clone(), - line, - is_stderr: true, - }; - let _ = sender_clone.send(je); - }); - }); - printers.push(h); - } - - for h in printers { - let _ = h.join(); - } 
+#[derive(Deserialize)] +struct LogRequest { + start: u32, + num_lines: u32, +} - let _ = std::fs::write(&hash_file, &hash); - } - } - } - Err(err) => { - let je = JobEvent::Log { - job: job.name.clone(), - line: format!("Could not get commit hash. Error: {}", &err), - is_stderr: true, - }; - if let Err(err) = sender.send(je) { - eprintln!( - "[{}] Could not send event. Exiting worker thread. {}", - &job.name, &err - ); - return; - } - } +#[get("/jobs/{jobId}/branches/{branch}/builds/{build_num}/log")] +async fn get_build_log( + path: web::Path<(String, String, u64)>, + req_info: web::Query, + data: web::Data, +) -> Result { + let path = path.into_inner(); + let job_id = path.0; + let branch = path.1; + let build_num = path.2; + let info = req_info.into_inner(); + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(job_id)).await?? { + if let Some(addr) = addr.send(GetBranchActorMsg(branch)).await?? { + Ok(HttpResponse::Ok().json( + addr.send(GetBuildLogLinesMsg { + build_num, + start: info.start, + num_lines: info.num_lines, + }) + .await??, + )) + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } - std::thread::sleep(poll_interval); + } else { + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } } -fn clone_commit( - url: &str, - branch: &str, - commit_hash: &str, - dir: &PathBuf, - auth: Option<&GitAuth>, -) -> Result<(), Box> { - let mut callbacks = git2::RemoteCallbacks::new(); - callbacks.credentials( - |_user: &str, user_from_url: Option<&str>, _cred: git2::CredentialType| match auth { - Some(a) => match a { - GitAuth::PrivateKey { path, passphrase } => git2::Cred::ssh_key( - user_from_url.unwrap_or("git"), - None, - std::path::Path::new(path), - passphrase.as_ref().map(|s| s.as_str()), - ), - }, - None => git2::Cred::default(), - }, - ); - let mut fo = FetchOptions::new(); - fo.remote_callbacks(callbacks); - - let repo = RepoBuilder::new() - .fetch_options(fo) - 
.branch(branch) - .clone(url, dir)?; - - let oid = git2::Oid::from_str(commit_hash)?; - let commit = repo.find_commit(oid)?; - - repo.branch(commit_hash, &commit, false)?; - let obj = repo.revparse_single(&("refs/heads/".to_owned() + commit_hash))?; - repo.checkout_tree(&obj, None)?; - repo.set_head(&("refs/heads/".to_owned() + commit_hash))?; - - Ok(()) +#[post("/jobs/{jobId}/branches/{branch}/builds")] +async fn force_build( + path: web::Path<(String, String)>, + data: web::Data, +) -> Result { + let path = path.into_inner(); + let job_id = path.0; + let branch = path.1; + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(job_id)).await?? { + if let Some(addr) = addr.send(GetBranchActorMsg(branch)).await?? { + addr.do_send(BuildNowMsg); + return Err(ApiMessage::new_with_status(StatusCode::OK, "OK")); + } + } + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } -fn get_branch_hash( - url: &str, - branch: &str, - auth: Option<&GitAuth>, -) -> Result> { - let tmp_dir = TempDir::new()?; - let repo = Repository::init(tmp_dir.path())?; - - let mut callbacks = git2::RemoteCallbacks::new(); - callbacks.credentials( - |_user: &str, user_from_url: Option<&str>, _cred: git2::CredentialType| match auth { - Some(a) => match a { - GitAuth::PrivateKey { path, passphrase } => git2::Cred::ssh_key( - user_from_url.unwrap_or("git"), - None, - std::path::Path::new(path), - passphrase.as_ref().map(|s| s.as_str()), - ), - }, - None => git2::Cred::default(), - }, - ); - - let mut remote = repo.remote("origin", url)?; - let connection = remote.connect_auth(Direction::Fetch, Some(callbacks), None)?; - let l = connection.list()?.iter().find(|head| { - let rf = head.name(); - rf == format!("refs/heads/{}", branch) - }); - match l { - Some(rf) => Ok(rf.oid().to_string()), - _ => Err("Could not find branch".into()), +#[delete("/jobs/{jobId}/branches/{branch}/builds/{build_num}")] +async fn abort_build( + path: web::Path<(String, String, 
u64)>, + data: web::Data, +) -> Result { + let path = path.into_inner(); + let job_id = path.0; + let branch = path.1; + let build_num = path.2; + if let GetJobActorResponse(Some(addr)) = data.root.send(GetJobActorMsg(job_id)).await?? { + if let Some(addr) = addr.send(GetBranchActorMsg(branch)).await?? { + if let Some(addr) = addr.send(GetBuildActorMsg(build_num)).await?? { + addr.do_send(StopBuildMessage); + return Err(ApiMessage::new_with_status(StatusCode::OK, "OK")); + } + } } + Err(ApiMessage::new_with_status( + StatusCode::NOT_FOUND, + "Not found", + )) } diff --git a/src/models.rs b/src/models.rs index f4f6d47..9aac243 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,39 +1,145 @@ -use serde::{Serialize, Deserialize}; +use std::{collections::HashSet, path::PathBuf}; +use serde::{Deserialize, Serialize}; + +/// A workspace containing build jobs #[derive(Serialize, Deserialize, Clone)] pub struct Workspace { pub jobs: Vec, } -#[derive(Serialize, Deserialize, Clone)] +impl Workspace { + pub fn from_dir_path( + path: &PathBuf, + ) -> std::result::Result> { + println!("Initing thingy in workspace {:?}", &path); + + let ws_yaml_path = path.clone().join("thingy.yaml"); + + if !ws_yaml_path.exists() { + std::fs::write(&ws_yaml_path, serde_yaml::to_string(&Workspace { + jobs: vec![], + })?)?; + } + + let md = std::fs::metadata(&ws_yaml_path); + if let Err(err) = &md { + return Err(format!( + "Could not read config from {:?}. Exiting. Does the file exist? Error: {:?}", + &ws_yaml_path, &err + ) + .into()); + } + let md = md.unwrap(); + if !md.is_file() { + return Err(format!("{:?} is not a regular file. Exiting.", &ws_yaml_path).into()); + } + let contents = std::fs::read_to_string(&ws_yaml_path); + if let Err(err) = &contents { + return Err(format!( + "Could not read {:?}. Exiting. 
Error: {:?}", + &ws_yaml_path, &err + ) + .into()); + } + let contents = contents.unwrap(); + let ws = serde_yaml::from_str::(&contents); + + if let Err(err) = &ws { + return Err(format!( + "Could not read {:?}. Exiting. Does the file contain valid YAML? Error: {:?}", + &ws_yaml_path, &err + ) + .into()); + } + + let mut ws = ws.unwrap(); + let names: Vec<&str> = ws.jobs.iter().map(|j| j.name.trim()).collect(); + + let mut uniq = HashSet::<&str>::new(); + for n in names { + if n.is_empty() { + return Err("Found job with empty name. Exiting.".into()); + } + if uniq.contains(n) { + return Err(format!("Workspace config contains duplicate jobs with name '{}'. Note that names are trimmed when read. Exiting.", n).into()); + } + uniq.insert(n); + } + + for j in &mut ws.jobs { + if let Err(err) = &j.validate() { + return Err( + format!("Configuration for {} is invalid: {}. Exiting.", j.name, err).into(), + ); + } + } + + // ensure job dirs + for j in &ws.jobs { + let name = j.name.trim(); + let dir = path.join(name); + + if dir.is_file() { + return Err(format!("{:?} is a file. Expected directory or nothing.", &dir).into()); + } + + if !dir.exists() { + if let Err(err) = std::fs::create_dir_all(&dir) { + return Err(format!( + "Could not create job dir {:?}. Exiting. 
Error: {:?}", + &dir, &err + ) + .into()); + } + } + } + + Ok(ws) + } +} + +/// A build job +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Job { + /// name of the job, must be unique within a workspace pub name: String, + /// Git fetch URL pub repo_url: String, - pub branch: String, + /// Path to script in repository which will be called pub build_script: String, - pub poll_interval_seconds: u64, + /// Interval in seconds to wait before polling for changes + pub poll_interval_seconds: Option, + /// Authentication for Git fetch, if required pub auth: Option, } impl Job { - pub fn validate(&self) -> Result<(), String> { + pub fn validate(&mut self) -> Result<(), String> { + if self.repo_url.trim().is_empty() { + return Err("Repository url is empty.".into()); + } + + if self.build_script.trim().is_empty() { + return Err("Build script path is empty.".into()); + } + + if self.poll_interval_seconds.eq(&Some(0)) { + return Err("Poll interval must be > 0.".into()); + } + Ok(()) } } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum GitAuth { PrivateKey { path: String, passphrase: Option, }, -} - -#[derive(Debug, Clone)] -pub enum JobEvent { - Log { - job: String, - line: String, - is_stderr: bool, + UserPass { + username: String, + password: String, }, } diff --git a/src/thingy.rs b/src/thingy.rs new file mode 100644 index 0000000..179faca --- /dev/null +++ b/src/thingy.rs @@ -0,0 +1,144 @@ +use std::{ + collections::HashMap, + fs::create_dir_all, + io::{Error, ErrorKind}, + path::PathBuf, +}; + +use crate::{job_actor::JobActor, models::*}; +use actix::prelude::*; + +pub struct Thingy { + pub workpace: Workspace, + pub dir: PathBuf, + pub job_actors: HashMap>, +} + +impl Thingy { + pub fn new(workpace: Workspace, dir: PathBuf) -> Self { + Thingy { + workpace, + dir, + job_actors: HashMap::new(), + } + } + + pub fn sync_ws_to_disk(&self) -> Result<(), std::io::Error> { + let file_path = 
self.dir.join("thingy.yaml"); + let yaml = serde_yaml::to_string(&self.workpace) + .map_err(|_e| Error::new(ErrorKind::Other, "Could not write yaml"))?; + std::fs::write(&file_path, yaml)?; + + Ok(()) + } +} + +#[derive(Message, Debug)] +#[rtype(result = "Result")] +pub struct GetJobsMsg; +#[derive(Debug)] +pub struct GetJobResponse(pub Vec); + +#[derive(Message, Debug)] +#[rtype(result = "Result")] +pub struct GetJobActorMsg(pub String); +#[derive(Debug)] +pub struct GetJobActorResponse(pub Option>); + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct AddJobMsg(pub Job); + +#[derive(Message, Debug)] +#[rtype(result = "Result<(), std::io::Error>")] +pub struct RemoveJobMsg(pub String); + +impl Actor for Thingy { + type Context = Context; + + fn started(&mut self, _ctx: &mut Context) { + for j in &self.workpace.jobs { + let d = self.dir.join(j.name.clone()); + let ja = JobActor::new(j.clone(), d).start(); + self.job_actors.insert(j.name.clone(), ja); + } + } + + fn stopped(&mut self, _ctx: &mut Context) {} +} + +impl Handler for Thingy { + type Result = Result; + + fn handle(&mut self, _msg: GetJobsMsg, _ctx: &mut Self::Context) -> Self::Result { + Ok(GetJobResponse(self.workpace.jobs.clone())) + } +} + +impl Handler for Thingy { + type Result = Result; + + fn handle(&mut self, msg: GetJobActorMsg, _ctx: &mut Self::Context) -> Self::Result { + let addr = self + .job_actors + .iter() + .find(|(k, _)| k.eq(&&msg.0)) + .map(|(_, v)| v.clone()); + Ok(GetJobActorResponse(addr)) + } +} + +impl Handler for Thingy { + type Result = Result<(), Error>; + + fn handle(&mut self, msg: AddJobMsg, _ctx: &mut Self::Context) -> Self::Result { + let mut job = msg.0; + if let Err(s) = job.validate() { + return Err(Error::new(ErrorKind::Other, s.as_str())); + } + if self + .workpace + .jobs + .iter() + .find(|j| j.name.eq(&job.name)) + .is_some() + { + return Err(Error::new( + ErrorKind::Other, + "Job with this name already exists", + )); + } + 
self.workpace.jobs.push(job.clone()); + + let d = self.dir.join(job.name.clone()); + create_dir_all(&d)?; + let ja = JobActor::new(job.clone(), d).start(); + self.job_actors.insert(job.name.clone(), ja); + + self.sync_ws_to_disk() + } +} + +impl Handler for Thingy { + type Result = Result<(), Error>; + + fn handle(&mut self, _msg: RemoveJobMsg, _ctx: &mut Self::Context) -> Self::Result { + // Remove the job actor's address from this actor. This is the only place to hold job actor's address, + // so removing it will stop the job actor. + self.job_actors = self + .job_actors + .clone() + .into_iter() + .filter(|ja| !ja.0.eq(&_msg.0)) + .collect(); + self.workpace.jobs = self + .workpace + .jobs + .clone() + .into_iter() + .filter(|j| !j.name.eq(&_msg.0)) + .collect(); + + self.sync_ws_to_disk() + } +} diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..9deff47 --- /dev/null +++ b/static/index.html @@ -0,0 +1,673 @@ + + + + + + thingy.rs + + + + + + + + + + + + +