From 957e0a2773e7cafd87efed457096aaf4baa0b79c Mon Sep 17 00:00:00 2001 From: Clay McLeod Date: Fri, 7 Feb 2025 17:03:32 -0600 Subject: [PATCH] WIP --- Cargo.lock | 70 ++++---- Cargo.toml | 2 +- crankshaft-config/src/backend.rs | 3 +- crankshaft-config/src/backend/tes.rs | 6 + .../src/service/runner/backend/docker.rs | 2 +- .../src/service/runner/backend/tes.rs | 153 ++++++++---------- crankshaft-engine/src/task.rs | 78 ++++++++- crankshaft-engine/src/task/execution.rs | 23 +++ crankshaft-engine/src/task/input.rs | 32 ++++ crankshaft-engine/src/task/input/contents.rs | 13 ++ crankshaft-engine/src/task/output.rs | 24 +++ crankshaft-engine/src/task/resources.rs | 16 ++ 12 files changed, 304 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74e8036..c32ae70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -423,7 +423,7 @@ dependencies = [ "encode_unicode", "libc", "once_cell", - "unicode-width", + "unicode-width 0.2.0", "windows-sys 0.59.0", ] @@ -1335,7 +1335,7 @@ dependencies = [ "console", "number_prefix", "portable-atomic", - "unicode-width", + "unicode-width 0.2.0", "web-time", ] @@ -1482,6 +1482,29 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "miette" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a955165f87b37fd1862df2a59547ac542c77ef6d17c666f619d1ad22dd89484" +dependencies = [ + "cfg-if", + "miette-derive", + "thiserror 1.0.69", + "unicode-width 0.1.14", +] + +[[package]] +name = "miette-derive" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf45bf44ab49be92fd1227a3be6fc6f617f1a337c06af54981048574d8783147" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "mime" version = "0.3.17" @@ -1589,9 +1612,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.2" +version = "1.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" [[package]] name = "openssl" @@ -1643,17 +1666,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "ordered-float" -version = "4.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" -dependencies = [ - "num-traits", - "rand 0.8.5", - "serde", -] - [[package]] name = "ordered-multimap" version = "0.7.3" @@ -1857,7 +1869,6 @@ dependencies = [ "libc", "rand_chacha 0.3.1", "rand_core 0.6.4", - "serde", ] [[package]] @@ -1868,7 +1879,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.0", - "zerocopy 0.8.16", + "zerocopy 0.8.17", ] [[package]] @@ -1898,7 +1909,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom 0.2.15", - "serde", ] [[package]] @@ -1908,7 +1918,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" dependencies = [ "getrandom 0.3.1", - "zerocopy 0.8.16", + "zerocopy 0.8.17", ] [[package]] @@ -2496,14 +2506,14 @@ dependencies = [ [[package]] name = "tes" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3faa059a151f94ccfecf89d7b092e3bd508c606d5a34421abfef1d9d038cc688" +checksum = "438921ffd4bf0917edb3e314d9fc05c667b31ba614480a2da5038ea6f8310e6d" dependencies = [ "anyhow", "base64 0.21.7", "chrono", - "ordered-float", + "miette", "reqwest", "reqwest-middleware", "reqwest-retry", @@ -2862,6 +2872,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-width" version = "0.2.0" @@ -3400,11 +3416,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b8c07a70861ce02bad1607b5753ecb2501f67847b9f9ada7c160fff0ec6300c" +checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" dependencies = [ - "zerocopy-derive 0.8.16", + "zerocopy-derive 0.8.17", ] [[package]] @@ -3420,9 +3436,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5226bc9a9a9836e7428936cde76bb6b22feea1a8bfdbc0d241136e4d13417e25" +checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 838e4b3..f0667f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ shlex = "1.3.0" ssh2 = "0.9.4" tar = "0.4.41" tempfile = "3.12.0" -tes = { version = "0.3.0", features = ["client", "serde"] } +tes = { version = "0.4.0", features = ["client", "serde"] } thiserror = "2.0.11" tokio = { version = "1.40.0", features = ["full", "time", "tracing"] } tokio-metrics = "0.3.1" diff --git a/crankshaft-config/src/backend.rs b/crankshaft-config/src/backend.rs index 6abdd4e..0d3a70e 100644 --- a/crankshaft-config/src/backend.rs +++ b/crankshaft-config/src/backend.rs @@ -57,7 +57,8 @@ impl Config { self.defaults.as_ref() } - /// Consumes `self` returns the constituent parts of the [`Config`]. + /// Consumes `self` returns the constituent, owned parts of the + /// configuration. pub fn into_parts(self) -> (String, Kind, usize, Option) { (self.name, self.kind, self.max_tasks, self.defaults) } diff --git a/crankshaft-config/src/backend/tes.rs b/crankshaft-config/src/backend/tes.rs index ffc511f..0be9e7f 100644 --- a/crankshaft-config/src/backend/tes.rs +++ b/crankshaft-config/src/backend/tes.rs @@ -31,4 +31,10 @@ impl Config { pub fn http(&self) -> &http::Config { &self.http } + + /// Consumes `self` and returns the constituent, owned parts of the + /// configuration. + pub fn into_parts(self) -> (Url, http::Config) { + (self.url, self.http) + } } diff --git a/crankshaft-engine/src/service/runner/backend/docker.rs b/crankshaft-engine/src/service/runner/backend/docker.rs index 995aa3f..e2f9861 100644 --- a/crankshaft-engine/src/service/runner/backend/docker.rs +++ b/crankshaft-engine/src/service/runner/backend/docker.rs @@ -98,7 +98,7 @@ impl crate::Backend for Backend { // (2) Upload inputs to the container. // // TODO(clay): these could be cached. - for task in task.inputs().cloned().map(|i| { + for task in task.inputs().map(|i| { let container = container.clone(); tokio::spawn(async move { let contents = i.fetch().await; diff --git a/crankshaft-engine/src/service/runner/backend/tes.rs b/crankshaft-engine/src/service/runner/backend/tes.rs index 1fad2b3..c6286ee 100644 --- a/crankshaft-engine/src/service/runner/backend/tes.rs +++ b/crankshaft-engine/src/service/runner/backend/tes.rs @@ -23,6 +23,7 @@ use tes::v1::Client; use tes::v1::client::tasks::View; use tracing::debug; use tracing::error; +use tracing::trace; use crate::Task; @@ -35,16 +36,25 @@ pub struct Backend { impl Backend { /// AttemptsCreates a new [`Backend`]. + /// + /// # Examples + /// + /// ``` + /// use crankshaft::engine; + /// ``` pub fn initialize(config: Config) -> Self { - let mut builder = Client::builder().url(config.url().to_owned()); + let (url, config) = config.into_parts(); - if let Some(token) = config.http().basic_auth_token() { + let mut builder = Client::builder().url(url); + + if let Some(token) = config.basic_auth_token() { builder = builder.insert_header("Authorization", format!("Basic {}", token)); } Self { - // SAFETY: this is manually constructed to always build. - client: Arc::new(builder.try_build().expect("client did not build")), + // SAFETY: the only required field of `builder` is the `url`, which + // we provided earlier. + client: Arc::new(builder.try_build().expect("client to build")), } } } @@ -58,7 +68,7 @@ impl crate::Backend for Backend { /// Runs a task in a backend. fn run(&self, task: Task) -> Result>>> { let client = self.client.clone(); - let task = to_tes_task(task); + let task = tes::v1::types::Task::from(task); Ok(async move { let task_id = client.create_task(task).await?.id; @@ -76,99 +86,70 @@ impl crate::Backend for Backend { debug!("state was found for {task_id}"); if !state.is_executing() { debug!("task is completed for {task_id}"); - // let mut results = task - // .logs - // .unwrap() - // .into_iter() - // .flat_map(|task| task.logs) - // .map(|log| { - // let status = - // log.exit_code.expect("exit code to be present"); - - // #[cfg(unix)] - // let output = Output { - // status: ExitStatus::from_raw(status as i32), - // stdout: log - // .stdout - // .unwrap_or_default() - // .as_bytes() - // .to_vec(), - // stderr: log - // .stderr - // .unwrap_or_default() - // .as_bytes() - // .to_vec(), - // }; - - // #[cfg(windows)] - // let output = Output { - // status: ExitStatus::from_raw(status), - // stdout: log - // .stdout - // .unwrap_or_default() - // .as_bytes() - // .to_vec(), - // stderr: log - // .stderr - // .unwrap_or_default() - // .as_bytes() - // .to_vec(), - // }; - - // output - // }); - - // let mut outputs = NonEmpty::new(results.next().unwrap()); - // outputs.extend(results); - let outputs = NonEmpty::new(Output { - status: ExitStatus::from_raw(0), - stdout: Vec::new(), - stderr: Vec::new(), - }); + let mut results = task + .logs + .unwrap() + .into_iter() + .flat_map(|task| task.logs) + .map(|log| { + let status = + log.exit_code.expect("exit code to be present"); + + #[cfg(unix)] + let output = Output { + status: ExitStatus::from_raw(status as i32), + stdout: log + .stdout + .unwrap_or_default() + .as_bytes() + .to_vec(), + stderr: log + .stderr + .unwrap_or_default() + .as_bytes() + .to_vec(), + }; + + #[cfg(windows)] + let output = Output { + status: ExitStatus::from_raw(status), + stdout: log + .stdout + .unwrap_or_default() + .as_bytes() + .to_vec(), + stderr: log + .stderr + .unwrap_or_default() + .as_bytes() + .to_vec(), + }; + + output + }); + + // SAFETY: at least one set of logs is always + // expected to be returned from the server. + // TODO(clay): we should probably change this to a + // recoverable error. + let mut outputs = NonEmpty::new(results.next().unwrap()); + outputs.extend(results); return Ok(outputs); } else { - debug!("task was NOT completed for {task_id}; looping..."); + trace!("task was NOT completed for {task_id}; looping..."); } } else { - debug!("state was NOT set for {task_id}; looping..."); + trace!("state was NOT set for {task_id}; looping..."); } + // TODO(clay): make this configurable. tokio::time::sleep(Duration::from_millis(200)).await; } - Err(err) => error!("error: {err}"), + Err(err) => error!("{err}"), } } } .boxed()) } } - -/// Translates a [`Task`] to a [TES Task](tes::v1::types::Task) for submission. -fn to_tes_task(task: Task) -> tes::v1::types::Task { - // NOTE: a name is not required by the TES specification, so it is kept as - // empty if no name is provided. - let name = task.name().map(|v| v.to_owned()); - let description = task.description().map(|v| v.to_owned()); - - let executors = task - .executions() - .map(|execution| { - let mut command = Vec::with_capacity(1 + execution.args().len()); - command.push(execution.program().to_string()); - command.extend(execution.args().iter().cloned()); - tes::v1::types::task::Executor { - image: execution.image().to_owned(), - command, - ..Default::default() - } - }) - .collect::>(); - - tes::v1::types::Task { - name, - description, - executors, - ..Default::default() - } -} diff --git a/crankshaft-engine/src/task.rs b/crankshaft-engine/src/task.rs index 4d53215..b805dc5 100644 --- a/crankshaft-engine/src/task.rs +++ b/crankshaft-engine/src/task.rs @@ -4,6 +4,10 @@ use std::sync::Arc; use bon::Builder; use nonempty::NonEmpty; +use tes::v1::types::task::Executor; +use tes::v1::types::task::Input as TesInput; +use tes::v1::types::task::Output as TesOutput; +use tes::v1::types::task::Resources as TesResources; pub mod execution; pub mod input; @@ -65,8 +69,8 @@ impl Task { } /// Gets the inputs for the task (if any exist). - pub fn inputs(&self) -> impl Iterator> { - self.inputs.iter() + pub fn inputs(&self) -> impl Iterator> + use<'_> { + self.inputs.iter().cloned() } /// Gets the outputs for the task (if any exist). @@ -88,4 +92,74 @@ impl Task { pub fn shared_volumes(&self) -> impl Iterator { self.volumes.iter().map(|v| v.as_str()) } + + /// Consumes `self` and returns the constituent, owned parts of the task. + #[allow(clippy::type_complexity)] + pub fn into_parts( + self, + ) -> ( + Option, + Option, + Vec>, + Vec, + Option, + NonEmpty, + Vec, + ) { + ( + self.name, + self.description, + self.inputs, + self.outputs, + self.resources, + self.executions, + self.volumes, + ) + } +} + +impl From for tes::v1::types::Task { + fn from(task: Task) -> Self { + let (name, description, inputs, outputs, resources, executions, volumes) = + task.into_parts(); + + let inputs = inputs + .into_iter() + .map(|input| TesInput::from((*input).clone())) + .collect::>(); + + let inputs = if inputs.is_empty() { + None + } else { + Some(inputs) + }; + + let outputs = outputs + .into_iter() + .map(|output| TesOutput::from(output.clone())) + .collect::>(); + + let outputs = if outputs.is_empty() { + None + } else { + Some(outputs) + }; + + let executors = executions.map(Executor::from).into_iter().collect(); + let resources = resources.map(TesResources::from); + + if !volumes.is_empty() { + todo!("volumes are not yet supported within Crankshaft"); + } + + tes::v1::types::Task { + name, + description, + inputs, + outputs, + executors, + resources, + ..Default::default() + } + } } diff --git a/crankshaft-engine/src/task/execution.rs b/crankshaft-engine/src/task/execution.rs index d5cc702..78d5003 100644 --- a/crankshaft-engine/src/task/execution.rs +++ b/crankshaft-engine/src/task/execution.rs @@ -1,5 +1,7 @@ //! A unit of executable work. +use std::collections::HashMap; + use bon::Builder; use indexmap::IndexMap; @@ -84,3 +86,24 @@ impl Execution { &self.env } } + +impl From for tes::v1::types::task::Executor { + fn from(execution: Execution) -> Self { + let env = execution + .env + .into_iter() + .collect::>(); + + let env = if env.is_empty() { None } else { Some(env) }; + + tes::v1::types::task::Executor { + image: execution.image.to_owned(), + command: execution.args.to_vec(), + workdir: execution.work_dir, + stdin: execution.stdin, + stdout: execution.stdout, + stderr: execution.stderr, + env, + } + } +} diff --git a/crankshaft-engine/src/task/input.rs b/crankshaft-engine/src/task/input.rs index eb048b5..9890c5d 100644 --- a/crankshaft-engine/src/task/input.rs +++ b/crankshaft-engine/src/task/input.rs @@ -69,6 +69,17 @@ impl Input { &self.ty } + /// Consumes `self` and returns the constituent, owned parts of the input. + pub fn into_parts(self) -> (Option, Option, Contents, String, Type) { + ( + self.name, + self.description, + self.contents, + self.path, + self.ty, + ) + } + /// Fetches the file contents via an [`AsyncRead`]er. pub async fn fetch(&self) -> Vec { match &self.contents { @@ -90,3 +101,24 @@ impl Input { } } } + +impl From for tes::v1::types::task::Input { + fn from(input: Input) -> Self { + let (name, description, contents, path, ty) = input.into_parts(); + let (url, content) = contents.one_hot(); + + let r#type = match ty { + Type::File => tes::v1::types::task::file::Type::File, + Type::Directory => tes::v1::types::task::file::Type::Directory, + }; + + tes::v1::types::task::Input { + name, + description, + url: url.map(|url| url.to_string()), + path, + r#type, + content, + } + } +} diff --git a/crankshaft-engine/src/task/input/contents.rs b/crankshaft-engine/src/task/input/contents.rs index 1e2128c..d7ea30a 100644 --- a/crankshaft-engine/src/task/input/contents.rs +++ b/crankshaft-engine/src/task/input/contents.rs @@ -29,4 +29,17 @@ impl Contents { pub fn url_from_str(url: impl AsRef) -> Result { url.as_ref().parse().map(Self::Url).map_err(Error::ParseUrl) } + + /// Consumes `self` and one hot encodes the inner contents. + /// + /// * The first value is the [`Url`] if the type is [`Contents::Url`]. Else, + /// the value is [`None`]. + /// * The second value is the literal contents as a [`String`] if the type + /// is [`Contents::Literal`]. Else, the value is [`None`]. + pub fn one_hot(self) -> (Option, Option) { + match self { + Contents::Url(url) => (Some(url), None), + Contents::Literal(value) => (None, Some(value)), + } + } } diff --git a/crankshaft-engine/src/task/output.rs b/crankshaft-engine/src/task/output.rs index 905efee..0695896 100644 --- a/crankshaft-engine/src/task/output.rs +++ b/crankshaft-engine/src/task/output.rs @@ -63,4 +63,28 @@ impl Output { pub fn ty(&self) -> &Type { &self.ty } + + /// Consumes `self` and returns the constituent, owned parts of the output. + pub fn into_parts(self) -> (Option, Option, Url, String, Type) { + (self.name, self.description, self.url, self.path, self.ty) + } +} + +impl From for tes::v1::types::task::Output { + fn from(output: Output) -> Self { + let (name, description, url, path, ty) = output.into_parts(); + + let r#type = match ty { + Type::File => tes::v1::types::task::file::Type::File, + Type::Directory => tes::v1::types::task::file::Type::Directory, + }; + + tes::v1::types::task::Output { + name, + description, + url: url.to_string(), + path, + r#type, + } + } } diff --git a/crankshaft-engine/src/task/resources.rs b/crankshaft-engine/src/task/resources.rs index 62a7897..4e89f10 100644 --- a/crankshaft-engine/src/task/resources.rs +++ b/crankshaft-engine/src/task/resources.rs @@ -162,3 +162,19 @@ impl From<&Resources> for HostConfig { host_config } } + +impl From for tes::v1::types::task::Resources { + fn from(resources: Resources) -> Self { + if !resources.zones.is_empty() { + todo!("zones within resources are not yet implemented in Crankshaft"); + } + + tes::v1::types::task::Resources { + cpu_cores: resources.cpu().map(|inner| inner as i64), + preemptible: resources.preemptible(), + ram_gb: resources.ram(), + disk_gb: resources.disk(), + zones: None, + } + } +}