diff --git a/Cargo.lock b/Cargo.lock index ad60376e6..6761f252b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "hyper 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonwebtoken 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", @@ -27,6 +28,7 @@ dependencies = [ "lru-disk-cache 0.1.0", "mio-named-pipes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -414,6 +416,15 @@ name = "itoa" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "jobserver" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "jsonwebtoken" version = "2.0.1" @@ -1385,6 +1396,7 @@ dependencies = [ "checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7" "checksum itertools 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "772a0928a97246167d59a2a4729df5871f1327ab8b36fd24c4224b229cb47b99" "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" +"checksum jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "443ae8bc0af6c106e6e8b77e04684faecc1a5ce94e058f4c2b0a037b0ea1b133" "checksum jsonwebtoken 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ded1c69eb0de78a0abce9f7987dc832613abb168029868271e8cba843f45a3b3" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" diff --git a/Cargo.toml b/Cargo.toml index 2369b65d9..c91df8b49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,14 @@ futures = "0.1.11" futures-cpupool = "0.1" hyper = { version = "0.11", optional = true } hyper-tls = { version = "0.1", optional = true } +jobserver = "0.1" jsonwebtoken = { version = "2.0", optional = true } libc = "0.2.10" local-encoding = "0.2.0" log = "0.3.6" lru-disk-cache = { path = "lru-disk-cache", version = "0.1.0" } native-tls = "0.1" +num_cpus = "1.0" number_prefix = "0.2.5" openssl = { version = "0.9", optional = true } redis = { version = "0.8.0", optional = true } diff --git a/src/cache/disk.rs b/src/cache/disk.rs index b5ee2de3b..a71c68b51 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -18,7 +18,6 @@ use cache::{ CacheWrite, Storage, }; -use futures::Future; use futures_cpupool::CpuPool; use lru_disk_cache::LruDiskCache; use lru_disk_cache::Error as LruError; @@ -62,7 +61,7 @@ impl Storage for DiskCache { let path = make_key_path(key); let lru = self.lru.clone(); let key = key.to_owned(); - self.pool.spawn_fn(move || { + Box::new(self.pool.spawn_fn(move || { let mut lru = lru.lock().unwrap(); let f = match lru.get(&path) { Ok(f) => f, @@ -78,7 +77,7 @@ impl Storage for DiskCache { }; let hit = CacheRead::from(f)?; Ok(Cache::Hit(hit)) - }).boxed() + })) } fn put(&self, key: &str, entry: CacheWrite) -> SFuture { @@ -87,12 +86,12 @@ impl Storage for DiskCache { trace!("DiskCache::finish_put({})", key); let lru = self.lru.clone(); let key = make_key_path(key); - self.pool.spawn_fn(move || { + Box::new(self.pool.spawn_fn(move || { let start = Instant::now(); let v = entry.finish()?; lru.lock().unwrap().insert_bytes(key, &v)?; Ok(start.elapsed()) - }).boxed() + })) } fn location(&self) -> String { diff --git a/src/commands.rs b/src/commands.rs index d5d5e08ed..23ba817df 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -18,6 +18,7 @@ use client::{ ServerConnection, }; use cmdline::{Command, StatsFormat}; +use jobserver::Client; use log::LogLevel::Trace; use mock_command::{ CommandCreatorSync, @@ -601,9 +602,10 @@ pub fn run_command(cmd: Command) -> Result { } Command::Compile { exe, cmdline, cwd, env_vars } => { trace!("Command::Compile {{ {:?}, {:?}, {:?} }}", exe, cmdline, cwd); + let jobserver = unsafe { Client::new() }; let conn = connect_or_start_server(get_port())?; let mut core = Core::new()?; - let res = do_compile(ProcessCommandCreator::new(&core.handle()), + let res = do_compile(ProcessCommandCreator::new(&core.handle(), &jobserver), &mut core, conn, exe.as_ref(), diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index 72c2c34ff..730049e41 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -470,14 +470,13 @@ fn detect_compiler(creator: &T, executable: &Path, pool: &CpuPool) }; let is_rustc = if filename.to_string_lossy().to_lowercase() == "rustc" { // Sanity check that it's really rustc. + let executable = executable.to_path_buf(); let child = creator.clone().new_command_sync(&executable) .stdout(Stdio::piped()) .stderr(Stdio::null()) .args(&["--version"]) - .spawn().chain_err(|| { - format!("failed to execute {:?}", executable) - }); - let output = child.into_future().and_then(move |child| { + .spawn(); + let output = child.and_then(move |child| { child.wait_with_output() .chain_err(|| "failed to read child output") }); @@ -530,10 +529,7 @@ gcc let output = write.and_then(move |(tempdir, src)| { cmd.arg("-E").arg(src); trace!("compiler {:?}", cmd); - let child = cmd.spawn().chain_err(|| { - format!("failed to execute {:?}", cmd) - }); - child.into_future().and_then(|child| { + cmd.spawn().and_then(|child| { child.wait_with_output().chain_err(|| "failed to read child output") }).map(|e| { drop(tempdir); @@ -724,11 +720,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -805,11 +799,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -887,11 +879,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -954,11 +944,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); } let cwd = f.tempdir.path(); diff --git a/src/jobserver.rs b/src/jobserver.rs new file mode 100644 index 000000000..b3ce0ac55 --- /dev/null +++ b/src/jobserver.rs @@ -0,0 +1,71 @@ +extern crate jobserver; + +use std::io; +use std::process::Command; +use std::sync::Arc; + +use futures::prelude::*; +use futures::sync::mpsc; +use futures::sync::oneshot; +use num_cpus; + +use errors::*; + +pub use self::jobserver::Acquired; + +#[derive(Clone)] +pub struct Client { + helper: Arc, + inner: jobserver::Client, + tx: mpsc::UnboundedSender>> +} + +impl Client { + // unsafe because `from_env` is unsafe (can use the wrong fds) + pub unsafe fn new() -> Client { + match jobserver::Client::from_env() { + Some(c) => Client::_new(c), + None => Client::new_num(num_cpus::get()), + } + } + + pub fn new_num(num: usize) -> Client { + let inner = jobserver::Client::new(num) + .expect("failed to create jobserver"); + Client::_new(inner) + } + + fn _new(inner: jobserver::Client) -> Client { + let (tx, rx) = mpsc::unbounded::>(); + let mut rx = rx.wait(); + let helper = inner.clone().into_helper_thread(move |token| { + if let Some(Ok(sender)) = rx.next() { + drop(sender.send(token)); + } + }).expect("failed to spawn helper thread"); + + Client { + inner: inner, + helper: Arc::new(helper), + tx: tx, + } + } + + /// Configures this jobserver to be inherited by the specified command + pub fn configure(&self, cmd: &mut Command) { + self.inner.configure(cmd) + } + + /// Returns a future that represents an acquired jobserver token. + /// + /// This should be invoked before any "work" is spawend (for whatever the + /// defnition of "work" is) to ensure that the system is properly + /// rate-limiting itself. + pub fn acquire(&self) -> SFuture { + let (tx, rx) = oneshot::channel(); + self.helper.request_token(); + self.tx.unbounded_send(tx).unwrap(); + Box::new(rx.chain_err(|| "jobserver helper panicked") + .and_then(|t| t.chain_err(|| "failed to acquire jobserver token"))) + } +} diff --git a/src/main.rs b/src/main.rs index e26d65e5a..1d780c77a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,7 @@ extern crate libc; #[cfg(windows)] extern crate mio_named_pipes; extern crate native_tls; +extern crate num_cpus; extern crate number_prefix; #[cfg(feature = "openssl")] extern crate openssl; @@ -93,6 +94,7 @@ mod client; mod cmdline; mod commands; mod compiler; +mod jobserver; mod mock_command; mod protocol; mod server; diff --git a/src/mock_command.rs b/src/mock_command.rs index a131594c5..b282424fa 100644 --- a/src/mock_command.rs +++ b/src/mock_command.rs @@ -47,7 +47,9 @@ #[cfg(unix)] use libc; +use errors::*; use futures::future::{self, Future}; +use jobserver::{Acquired, Client}; use std::boxed::Box; use std::ffi::{OsStr, OsString}; use std::fmt; @@ -61,7 +63,7 @@ use std::process::{ }; use std::sync::{Arc,Mutex}; use tokio_process::{ - Child, + self, ChildStderr, ChildStdin, ChildStdout, @@ -120,7 +122,7 @@ pub trait RunCommand: fmt::Debug { /// Set the process' stderr from `cfg`. fn stderr(&mut self, cfg: Stdio) -> &mut Self; /// Execute the process and return a process object. - fn spawn(&mut self) -> io::Result; + fn spawn(&mut self) -> SFuture; } /// A trait that provides a means to create objects implementing `RunCommand`. @@ -133,7 +135,7 @@ pub trait CommandCreator { type Cmd: RunCommand; /// Create a new instance of this type. - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; /// Create a new object that implements `RunCommand` that can be used /// to create a new process. fn new_command>(&mut self, program: S) -> Self::Cmd; @@ -143,42 +145,63 @@ pub trait CommandCreator { pub trait CommandCreatorSync: Clone + 'static { type Cmd: RunCommand; - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; fn new_command_sync>(&mut self, program: S) -> Self::Cmd; } +pub struct Child { + inner: tokio_process::Child, + token: Acquired, +} + /// Trivial implementation of `CommandChild` for `std::process::Child`. impl CommandChild for Child { type I = ChildStdin; type O = ChildStdout; type E = ChildStderr; - fn take_stdin(&mut self) -> Option { self.stdin().take() } - fn take_stdout(&mut self) -> Option { self.stdout().take() } - fn take_stderr(&mut self) -> Option { self.stderr().take() } + fn take_stdin(&mut self) -> Option { self.inner.stdin().take() } + fn take_stdout(&mut self) -> Option { self.inner.stdout().take() } + fn take_stderr(&mut self) -> Option { self.inner.stderr().take() } fn wait(self) -> Box> { - Box::new(self) + let Child { inner, token } = self; + Box::new(inner.map(|ret| { + drop(token); + ret + })) } fn wait_with_output(self) -> Box> { - Box::new(self.wait_with_output()) + let Child { inner, token } = self; + Box::new(inner.wait_with_output().map(|ret| { + drop(token); + ret + })) } } pub struct AsyncCommand { - inner: Command, + inner: Option, handle: Handle, + jobserver: Client, } impl AsyncCommand { - pub fn new>(program: S, handle: Handle) -> AsyncCommand { + pub fn new>(program: S, + handle: Handle, + jobserver: Client) -> AsyncCommand { AsyncCommand { - inner: Command::new(program), + inner: Some(Command::new(program)), handle: handle, + jobserver: jobserver, } } + + fn inner(&mut self) -> &mut Command { + self.inner.as_mut().expect("can't reuse commands") + } } /// Trivial implementation of `RunCommand` for `std::process::Command`. @@ -186,18 +209,18 @@ impl RunCommand for AsyncCommand { type C = Child; fn arg>(&mut self, arg: S) -> &mut AsyncCommand { - self.inner.arg(arg); + self.inner().arg(arg); self } fn args>(&mut self, args: &[S]) -> &mut AsyncCommand { - self.inner.args(args); + self.inner().args(args); self } fn env(&mut self, key: K, val: V) -> &mut AsyncCommand where K: AsRef, V: AsRef, { - self.inner.env(key, val); + self.inner().env(key, val); self } fn envs(&mut self, vars: I) -> &mut Self @@ -206,16 +229,16 @@ impl RunCommand for AsyncCommand { //TODO: when Command::envs stabilizes, use that: // https://github.com/rust-lang/rust/issues/38526 for (k, v) in vars { - self.inner.env(k, v); + self.inner().env(k, v); } self } fn env_clear(&mut self) -> &mut AsyncCommand { - self.inner.env_clear(); + self.inner().env_clear(); self } fn current_dir>(&mut self, dir: P) -> &mut AsyncCommand { - self.inner.current_dir(dir); + self.inner().current_dir(dir); self } @@ -223,7 +246,7 @@ impl RunCommand for AsyncCommand { fn no_console(&mut self) -> &mut AsyncCommand { use std::os::windows::process::CommandExt; const CREATE_NO_WINDOW: u32 = 0x08000000; - self.inner.creation_flags(CREATE_NO_WINDOW); + self.inner().creation_flags(CREATE_NO_WINDOW); self } @@ -233,19 +256,30 @@ impl RunCommand for AsyncCommand { } fn stdin(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdin(cfg); + self.inner().stdin(cfg); self } fn stdout(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdout(cfg); + self.inner().stdout(cfg); self } fn stderr(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stderr(cfg); + self.inner().stderr(cfg); self } - fn spawn(&mut self) -> io::Result { - self.inner.spawn_async(&self.handle) + fn spawn(&mut self) -> SFuture { + let mut inner = self.inner.take().unwrap(); + inner.env_remove("MAKEFLAGS"); + inner.env_remove("MFLAGS"); + inner.env_remove("CARGO_MAKEFLAGS"); + self.jobserver.configure(&mut inner); + let handle = self.handle.clone(); + Box::new(self.jobserver.acquire().and_then(move |token| { + let child = inner.spawn_async(&handle).chain_err(|| { + format!("failed to spawn {:?}", inner) + })?; + Ok(Child { inner: child, token: token }) + })) } } @@ -259,20 +293,22 @@ impl fmt::Debug for AsyncCommand { #[derive(Clone)] pub struct ProcessCommandCreator { handle: Handle, + jobserver: Client, } /// Trivial implementation of `CommandCreator` for `ProcessCommandCreator`. impl CommandCreator for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { ProcessCommandCreator { handle: handle.clone(), + jobserver: client.clone(), } } fn new_command>(&mut self, program: S) -> AsyncCommand { - AsyncCommand::new(program, self.handle.clone()) + AsyncCommand::new(program, self.handle.clone(), self.jobserver.clone()) } } @@ -280,8 +316,8 @@ impl CommandCreator for ProcessCommandCreator { impl CommandCreatorSync for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { - CommandCreator::new(handle) + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { + CommandCreator::new(handle, client) } fn new_command_sync>(&mut self, program: S) -> AsyncCommand { @@ -357,7 +393,7 @@ impl CommandChild for MockChild { fn take_stderr(&mut self) -> Option>> { self.stderr.take() } fn wait(mut self) -> Box> { - future::result(self.wait_result.take().unwrap()).boxed() + Box::new(future::result(self.wait_result.take().unwrap())) } @@ -370,13 +406,13 @@ impl CommandChild for MockChild { stderr: stderr.map(|c| c.into_inner()).unwrap_or(vec!()), }) }); - future::result(result).boxed() + Box::new(future::result(result)) } } pub enum ChildOrCall { - Child(io::Result), - Call(Box io::Result + Send>), + Child(Result), + Call(Box Result + Send>), } impl fmt::Debug for ChildOrCall { @@ -437,10 +473,10 @@ impl RunCommand for MockCommand { fn stderr(&mut self, _cfg: Stdio) -> &mut MockCommand { self } - fn spawn(&mut self) -> io::Result { + fn spawn(&mut self) -> SFuture { match self.child.take().unwrap() { - ChildOrCall::Child(c) => c, - ChildOrCall::Call(f) => f(&self.args), + ChildOrCall::Child(c) => Box::new(future::result(c)), + ChildOrCall::Call(f) => Box::new(future::result(f(&self.args))), } } } @@ -455,7 +491,7 @@ pub struct MockCommandCreator { impl MockCommandCreator { /// The next `MockCommand` created will return `child` from `RunCommand::spawn`. #[allow(dead_code)] - pub fn next_command_spawns(&mut self, child: io::Result) { + pub fn next_command_spawns(&mut self, child: Result) { self.children.push(ChildOrCall::Child(child)); } @@ -463,7 +499,7 @@ impl MockCommandCreator { /// arguments passed to the command. #[allow(dead_code)] pub fn next_command_calls(&mut self, call: C) - where C: Fn(&[OsString]) -> io::Result + Send + 'static, + where C: Fn(&[OsString]) -> Result + Send + 'static, { self.children.push(ChildOrCall::Call(Box::new(call))); } @@ -472,7 +508,7 @@ impl MockCommandCreator { impl CommandCreator for MockCommandCreator { type Cmd = MockCommand; - fn new(_handle: &Handle) -> MockCommandCreator { + fn new(_handle: &Handle, _client: &Client) -> MockCommandCreator { MockCommandCreator { children: Vec::new(), } @@ -492,8 +528,8 @@ impl CommandCreator for MockCommandCreator { impl CommandCreatorSync for Arc> { type Cmd = T::Cmd; - fn new(handle: &Handle) -> Arc> { - Arc::new(Mutex::new(T::new(handle))) + fn new(handle: &Handle, client: &Client) -> Arc> { + Arc::new(Mutex::new(T::new(handle, client))) } fn new_command_sync>(&mut self, program: S) -> T::Cmd { @@ -507,6 +543,7 @@ mod test { use std::error::Error; use std::ffi::OsStr; use std::io; + use jobserver::Client; use futures::Future; use std::process::{ ExitStatus, @@ -517,24 +554,24 @@ mod test { use test::utils::*; use tokio_core::reactor::Core; - fn spawn_command>(creator : &mut T, program: S) -> io::Result<<::Cmd as RunCommand>::C> { - creator.new_command(program).spawn() + fn spawn_command>(creator : &mut T, program: S) -> Result<<::Cmd as RunCommand>::C> { + creator.new_command(program).spawn().wait() } - fn spawn_wait_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait().wait()) + fn spawn_wait_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait().wait()?) } - fn spawn_output_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait_with_output().wait()) + fn spawn_output_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait_with_output().wait()?) } fn spawn_on_thread(mut t : T, really : bool) -> ExitStatus { thread::spawn(move || { if really { - t.new_command_sync("foo").spawn().and_then(|c| { - c.wait().wait() - }).unwrap() + t.new_command_sync("foo") + .spawn().wait().unwrap() + .wait().wait().unwrap() } else { exit_status(1) } @@ -544,7 +581,8 @@ mod test { #[test] fn test_mock_command_wait() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(0, spawn_wait_command(&mut creator, "foo").unwrap().code().unwrap()); } @@ -555,14 +593,16 @@ mod test { // If next_command_spawns hasn't been called enough times, // new_command should panic. let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.new_command("foo").spawn().unwrap(); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.new_command("foo").spawn().wait().unwrap(); } #[test] fn test_mock_command_output() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); let output = spawn_output_command(&mut creator, "foo").unwrap(); assert_eq!(0, output.status.code().unwrap()); @@ -573,7 +613,8 @@ mod test { #[test] fn test_mock_command_calls() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_calls(|_| { Ok(MockChild::new(exit_status(0), "hello", "error")) }); @@ -586,27 +627,28 @@ mod test { #[test] fn test_mock_spawn_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.next_command_spawns(Err(io::Error::new(io::ErrorKind::Other, "error"))); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.next_command_spawns(Err("error".into())); let e = spawn_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_wait_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::with_error(io::Error::new(io::ErrorKind::Other, "error")))); let e = spawn_wait_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_command_sync() { let core = Core::new().unwrap(); - let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))); + let client = Client::new_num(1); + let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))); next_command(&creator, Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(exit_status(0), spawn_on_thread(creator.clone(), true)); } diff --git a/src/server.rs b/src/server.rs index e85ecac1e..f70ff62e7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -31,6 +31,7 @@ use futures::sync::mpsc; use futures::task::{self, Task}; use futures::{Stream, Sink, Async, AsyncSink, Poll, StartSend, Future}; use futures_cpupool::CpuPool; +use jobserver::Client; use mock_command::{ CommandCreatorSync, ProcessCommandCreator, @@ -121,10 +122,11 @@ fn get_signal(_status: ExitStatus) -> i32 { /// requests a shutdown. pub fn start_server(port: u16) -> Result<()> { trace!("start_server"); + let client = unsafe { Client::new() }; let core = Core::new()?; let pool = CpuPool::new(20); let storage = storage_from_environment(&pool, &core.handle()); - let res = SccacheServer::::new(port, pool, core, storage); + let res = SccacheServer::::new(port, pool, core, client, storage); let notify = env::var_os("SCCACHE_STARTUP_NOTIFY"); match res { Ok(srv) => { @@ -152,6 +154,7 @@ impl SccacheServer { pub fn new(port: u16, pool: CpuPool, core: Core, + client: Client, storage: Arc) -> Result> { let handle = core.handle(); let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port); @@ -161,7 +164,12 @@ impl SccacheServer { // connections. let (tx, rx) = mpsc::channel(1); let (wait, info) = WaitUntilZero::new(); - let service = SccacheService::new(storage, core.handle(), pool, tx, info); + let service = SccacheService::new(storage, + core.handle(), + &client, + pool, + tx, + info); Ok(SccacheServer { core: core, @@ -389,6 +397,7 @@ impl SccacheService { pub fn new(storage: Arc, handle: Handle, + client: &Client, pool: CpuPool, tx: mpsc::Sender, info: ActiveInfo) -> SccacheService { @@ -397,7 +406,7 @@ impl SccacheService storage: storage, compilers: Rc::new(RefCell::new(HashMap::new())), pool: pool, - creator: C::new(&handle), + creator: C::new(&handle, client), handle: handle, tx: tx, info: info, diff --git a/src/simples3/credential.rs b/src/simples3/credential.rs index d8e3c5f22..9aebb30a8 100644 --- a/src/simples3/credential.rs +++ b/src/simples3/credential.rs @@ -87,7 +87,7 @@ pub struct EnvironmentProvider; impl ProvideAwsCredentials for EnvironmentProvider { fn credentials(&self) -> SFuture { - future::result(credentials_from_environment()).boxed() + Box::new(future::result(credentials_from_environment())) } } @@ -189,7 +189,7 @@ impl ProvideAwsCredentials for ProfileProvider { let result = result.and_then(|mut profiles| { profiles.remove(self.profile()).ok_or("profile not found".into()) }); - future::result(result).boxed() + Box::new(future::result(result)) } } diff --git a/src/test/tests.rs b/src/test/tests.rs index 8f7009b6a..4c0e9a02f 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -202,11 +202,9 @@ fn test_server_compile() { let obj = f.tempdir.path().join("file.o"); c.next_command_calls(move |_| { // Pretend to compile something. - match File::create(&obj) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), STDOUT, STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&obj)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), STDOUT, STDERR)) }); } // Ask the server to compile something. diff --git a/src/test/utils.rs b/src/test/utils.rs index 33d3ef172..122289daf 100644 --- a/src/test/utils.rs +++ b/src/test/utils.rs @@ -26,6 +26,9 @@ use std::sync::{Arc,Mutex}; use tempdir::TempDir; use tokio_core::reactor::Core; +use jobserver::Client; +use errors::*; + /// Return a `Vec` with each listed entry converted to an owned `String`. macro_rules! stringvec { ( $( $x:expr ),* ) => { @@ -69,15 +72,16 @@ macro_rules! assert_map_contains { pub fn new_creator() -> Arc> { let core = Core::new().unwrap(); - Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))) + let client = unsafe { Client::new() }; + Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))) } pub fn next_command(creator : &Arc>, - child: io::Result) { + child: Result) { creator.lock().unwrap().next_command_spawns(child); } -pub fn next_command_calls io::Result + Send + 'static>(creator: &Arc>, call: C) { +pub fn next_command_calls Result + Send + 'static>(creator: &Arc>, call: C) { creator.lock().unwrap().next_command_calls(call); } diff --git a/src/util.rs b/src/util.rs index d19e830d1..f1c1039be 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,7 +13,6 @@ // limitations under the License. use futures::Future; -use futures::future; use futures_cpupool::CpuPool; use mock_command::{CommandChild, RunCommand}; use ring::digest::{SHA512, Context}; @@ -139,10 +138,9 @@ pub fn run_input_output(mut command: C, input: Option>) .stdin(if input.is_some() { Stdio::piped() } else { Stdio::inherit() }) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn() - .chain_err(|| "failed to spawn child"); + .spawn(); - Box::new(future::result(child) + Box::new(child .and_then(|child| { wait_with_input_output(child, input).and_then(|output| { if output.status.success() {