diff --git a/RELEASES.md b/RELEASES.md index cb871fc..b1c225c 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -5,6 +5,6 @@ Version 0.4 is a complete rewrite of Intecture's core API. If we had been runnin Anyway, there are some big headlines in this release: - **Hello Tokio!** This is an exciting change to our socket API, as we bid a fond farewell to ZeroMQ. The change to Tokio will bring loads of benefits, such as substantially reducing our non-Rust dependencies, and introducing strongly typed messaging between servers and clients. This will make our communications more robust and less open to exploitation. Woo hoo! -- **Asynchronous endpoints with `Futures`!** This is another huge change to the API, allowing us to multitask our configurations. It will also pave the way for a streaming API, which has been sorely lacking in Intecture until now. +- **Asynchronous, streaming endpoints with `Futures`!** This is another huge change to the API, which allows configuration tasks to be run in parallel; moreover it allows users to stream task output in realtime. Gone are the days of waiting in the dark for tasks to complete! - **Greater emphasis on composability.** Each endpoint (formerly called _primitives_) is organised into a collection of _providers_ that will (you guessed it) provide target-specific implementations of the endpoint. Users will be able to select an endpoint provider manually or let the system choose the best provider for the target platform. - **Separation of duties.** In the previous versions of Intecture, the API had been organised into a single project, making it cluttered and unwieldy. For the next release, things like the FFI, language bindings and project boilerplate will be moved into separate child projects under the same Cargo workspace. diff --git a/agent/src/errors.rs b/agent/src/errors.rs index 4470628..9c3b6ef 100644 --- a/agent/src/errors.rs +++ b/agent/src/errors.rs @@ -4,10 +4,42 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. +use error_chain::ChainedError; +use futures::Future; use intecture_api; +use std::{convert, error, io}; error_chain! { links { Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind); } } + +impl convert::From for io::Error { + fn from(e: Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, format!("{}", e.display_chain())) + } +} + +// @todo This should disappear once Futures are officially supported +// by error_chain. +// See: https://github.com/rust-lang-nursery/error-chain/issues/90 +pub type SFuture = Box>; + +pub trait FutureChainErr { + fn chain_err(self, callback: F) -> SFuture + where F: FnOnce() -> E + 'static, + E: Into; +} + +impl FutureChainErr for F + where F: Future + 'static, + F::Error: error::Error + Send + 'static, +{ + fn chain_err(self, callback: C) -> SFuture + where C: FnOnce() -> E + 'static, + E: Into, + { + Box::new(self.then(|r| r.chain_err(callback))) + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 6092e0c..134fce2 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -18,16 +18,18 @@ extern crate toml; mod errors; +use error_chain::ChainedError; use errors::*; use futures::{future, Future}; -use intecture_api::remote::{Executable, Runnable}; use intecture_api::host::local::Local; -use intecture_api::host::remote::JsonProto; +use intecture_api::host::remote::{JsonLineProto, LineMessage}; +use intecture_api::remote::{Executable, Request, ResponseResult}; use std::fs::File; use std::io::{self, Read}; use std::net::SocketAddr; use std::sync::Arc; use tokio_core::reactor::Remote; +use tokio_proto::streaming::Message; use tokio_proto::TcpServer; use tokio_service::{NewService, Service}; @@ -37,21 +39,20 @@ pub struct Api { } impl Service for Api { - type Request = serde_json::Value; - type Response = serde_json::Value; - type Error = io::Error; + type Request = LineMessage; + type Response = LineMessage; + type Error = Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { - let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") { + let req = match req { + Message::WithBody(req, _) => req, + Message::WithoutBody(req) => req, + }; + + let request: Request = match serde_json::from_value(req).chain_err(|| "Could not deserialize Request") { Ok(r) => r, - Err(e) => return Box::new( - future::err( - io::Error::new( - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - io::ErrorKind::Other, e.description() - ))), + Err(e) => return Box::new(future::ok(error_to_msg(e))), }; // XXX Danger zone! If we're running multiple threads, this `unwrap()` @@ -60,21 +61,30 @@ impl Service for Api { // only safe for the current thread. // See https://github.com/alexcrichton/tokio-process/issues/23 let handle = self.remote.handle().unwrap(); - Box::new(runnable.exec(&self.host, &handle) - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.description())) - .and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") { - Ok(v) => future::ok(v), - Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())), + Box::new(request.exec(&self.host, &handle) + .chain_err(|| "Failed to execute Request") + .then(|req| { + match req { + Ok(mut msg) => { + let body = msg.take_body(); + match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize Result") { + Ok(v) => match body { + Some(b) => future::ok(Message::WithBody(v, b)), + None => future::ok(Message::WithoutBody(v)), + }, + Err(e) => future::ok(error_to_msg(e)), + } + }, + Err(e) => future::ok(error_to_msg(e)), + } })) } } impl NewService for Api { - type Request = serde_json::Value; - type Response = serde_json::Value; - type Error = io::Error; + type Request = LineMessage; + type Response = LineMessage; + type Error = Error; type Instance = Api; fn new_service(&self) -> io::Result { Ok(Api { @@ -129,7 +139,7 @@ quick_main!(|| -> Result<()> { // Currently we force the issue (`unwrap()`), which is only safe // for the current thread. // See https://github.com/alexcrichton/tokio-process/issues/23 - let server = TcpServer::new(JsonProto, config.address); + let server = TcpServer::new(JsonLineProto, config.address); server.with_handle(move |handle| { let api = Api { host: host.clone(), @@ -139,3 +149,12 @@ quick_main!(|| -> Result<()> { }); Ok(()) }); + +fn error_to_msg(e: Error) -> LineMessage { + let response = ResponseResult::Err(format!("{}", e.display_chain())); + // If we can't serialize this, we can't serialize anything, so + // panicking is appropriate. + let value = serde_json::to_value(response) + .expect("Cannot serialize ResponseResult::Err. This is bad..."); + Message::WithoutBody(value) +} diff --git a/core/examples/basic.rs b/core/examples/basic.rs index 3588606..098526e 100644 --- a/core/examples/basic.rs +++ b/core/examples/basic.rs @@ -8,22 +8,58 @@ extern crate futures; extern crate intecture_api; extern crate tokio_core; -use futures::Future; +use futures::{Future, Stream}; use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to our local machine, so we use the `Local` host type. let host = Local::new().and_then(|host| { - Command::new(&host, "whoami", None).and_then(|mut cmd| { - cmd.exec(&handle).map(|out| { - println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim()); + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. + + // Let's start with something basic - a shell command. + Command::new(&host, "whoami", None).and_then(|cmd| { + // Now that we have our `Command` instance, let's run it. + cmd.exec(&handle).and_then(|(stream, status)| { + // At this point, our command is running. As the API + // is asynchronous, we don't have to wait for it to + // finish before inspecting its output. This is called + // "streaming". + + // Our first argument, `stream`, is a stream of strings, + // each of which represents a line of output. We can use + // the `for_each` combinator to print these lines to + // stdout. + // + // If printing isn't your thing, you are also + // free to lick them or whatever you're into. I'm not + // here to judge. + stream.for_each(|line| { println!("{}", line); Ok(()) }) + + // The second argument is a `Future` that represents the + // command's exit status. Let's print that too*. + // + // * Same caveat as above RE: printing. This is a safe + // place. + .join(status.map(|s| println!("This command {} {}", + if s.success { "succeeded" } else { "failed" }, + if let Some(e) = s.code { format!("with code {}", e) } else { String::new() }))) }) }) }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } - diff --git a/core/examples/remote_host.rs b/core/examples/remote_host.rs index d62e5ef..d447cce 100644 --- a/core/examples/remote_host.rs +++ b/core/examples/remote_host.rs @@ -13,12 +13,28 @@ use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to a remote machine. You'll note that this is the `Plain` host + // type, where you might have been expecting `Remote` or some such. + // This is to signify that this host type sends data in the clear, + // rather than encrypting it. Thus the usual disclaimer about + // secure networks and trust applies. let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| { + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. See basic.rs for more usage. println!("Connected to {}", host.telemetry().hostname); }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs index c1892fd..1c4a5d5 100644 --- a/core/src/command/mod.rs +++ b/core/src/command/mod.rs @@ -10,6 +10,7 @@ pub mod providers; use errors::*; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; use self::providers::CommandProvider; use tokio_core::reactor::Handle; @@ -27,11 +28,9 @@ pub struct Command { } #[derive(Debug, Serialize, Deserialize)] -pub struct CommandResult { +pub struct ExitStatus { pub success: bool, - pub exit_code: Option, - pub stdout: Vec, - pub stderr: Vec, + pub code: Option, } impl Command { @@ -68,7 +67,12 @@ impl Command { } } - pub fn exec(&mut self, handle: &Handle) -> Box> { + pub fn exec(&self, handle: &Handle) -> + Box>, + Box> + ), Error = Error>> + { self.inner.exec(&self.host, handle, &self.cmd, &self.shell) } } diff --git a/core/src/command/providers/generic.rs b/core/src/command/providers/generic.rs index f63ae3b..87a9eb1 100644 --- a/core/src/command/providers/generic.rs +++ b/core/src/command/providers/generic.rs @@ -4,32 +4,33 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use command::CommandResult; -use erased_serde::Serialize; +use command::providers::ExitStatus; use errors::*; use futures::{future, Future}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::{mpsc, oneshot}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use provider::Provider; -use remote::{Executable, Runnable}; -use std::process; -use super::{CommandProvider, CommandRunnable}; +use remote::{CommandRequest, CommandResponse, Executable, ExecutableResult, + GenericRequest, Request, Response, ResponseResult}; +use serde_json; +use std::io::{self, BufReader}; +use std::process::{Command, Stdio}; +use std::result; +use super::CommandProvider; use tokio_core::reactor::Handle; +use tokio_io::io::lines; use tokio_process::CommandExt; +use tokio_proto::streaming::{Body, Message}; #[derive(Clone)] pub struct Generic; struct LocalGeneric; struct RemoteGeneric; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum GenericRunnable { - Available, - Exec(String, Vec), -} - impl Provider for Generic { fn available(host: &H) -> Box> { match host.get_type() { @@ -52,7 +53,12 @@ impl Provider for Generic { } impl CommandProvider for Generic { - fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { match host.get_type() { HostType::Local(_) => LocalGeneric::exec(handle, cmd, shell), HostType::Remote(r) => RemoteGeneric::exec(r, cmd, shell), @@ -65,7 +71,12 @@ impl LocalGeneric { Box::new(future::ok(cfg!(unix))) } - fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { let cmd = cmd.to_owned(); let shell = shell.to_owned(); let (shell, shell_args) = match shell.split_first() { @@ -73,45 +84,133 @@ impl LocalGeneric { None => return Box::new(future::err("Invalid shell provided".into())), }; - Box::new(process::Command::new(shell) + let child = Command::new(shell) .args(shell_args) .arg(&cmd) - .output_async(handle) - .chain_err(|| "Command execution failed") - .and_then(|output| { - future::ok(CommandResult { - success: output.status.success(), - exit_code: output.status.code(), - stdout: output.stdout, - stderr: output.stderr, - }) - })) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn_async(handle) + .chain_err(|| "Command execution failed"); + let mut child = match child { + Ok(c) => c, + Err(e) => return Box::new(future::err(e)), + }; + + let stdout = child.stdout().take().unwrap(); + let outbuf = BufReader::new(stdout); + let stderr = child.stderr().take().unwrap(); + let errbuf = BufReader::new(stderr); + let lines = lines(outbuf).select(lines(errbuf)); + + Box::new(future::ok( + (Box::new(lines.then(|r| r.chain_err(|| "Command execution failed"))) as Box>, + Box::new(child.then(|r| match r.chain_err(|| "Command execution failed") { + Ok(c) => future::ok(ExitStatus { + success: c.success(), + code: c.code(), + }), + Err(e) => future::err(e) + })) as Box>) + )) } } impl RemoteGeneric { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Command( - CommandRunnable::Generic( - GenericRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "available" }) + let runnable = Request::Command( + CommandRequest::Generic( + GenericRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Command(CommandResponse::Available(b)) => b, + _ => unreachable!(), + })) } - fn exec(host: &Plain, cmd: &str, shell: &[String]) -> Box> { - let runnable = Runnable::Command( - CommandRunnable::Generic( - GenericRunnable::Exec(cmd.into(), shell.to_owned()))); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "exec" }) + fn exec(host: &Plain, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { + let runnable = Request::Command( + CommandRequest::Generic( + GenericRequest::Exec(cmd.into(), shell.to_owned()))); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "exec" }) + .map(|mut msg| { + let (tx, rx) = oneshot::channel::(); + let mut tx_share = Some(tx); + let mut found = false; + ( + Box::new(msg.take_body() + .expect("Command::exec reply missing body stream") + .filter_map(move |v| { + let s = String::from_utf8_lossy(&v).to_string(); + + // @todo This is a heuristical approach which is fallible + if !found && s.starts_with("ExitStatus:") { + let (_, json) = s.split_at(11); + match serde_json::from_str(json) { + Ok(status) => { + // @todo What should happen if this fails? + let _ = tx_share.take().unwrap().send(status); + found = true; + return None; + }, + _ => (), + } + } + + Some(s) + }) + .then(|r| r.chain_err(|| "Command execution failed")) + ) as Box>, + Box::new(rx.chain_err(|| "Buffer dropped before ExitStatus was sent")) + as Box> + ) + })) } } -impl Executable for GenericRunnable { - fn exec(self, _: &Local, handle: &Handle) -> Box, Error = Error>> { +impl Executable for GenericRequest { + fn exec(self, _: &Local, handle: &Handle) -> ExecutableResult { match self { - GenericRunnable::Available => Box::new(LocalGeneric::available().map(|b| Box::new(b) as Box)), - GenericRunnable::Exec(cmd, shell) => Box::new(LocalGeneric::exec(handle, &cmd, &shell).map(|r| Box::new(r) as Box)), + GenericRequest::Available => Box::new( + LocalGeneric::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Command( + CommandResponse::Available(b)))))), + GenericRequest::Exec(cmd, shell) => { + let handle = handle.clone(); + Box::new(LocalGeneric::exec(&handle, &cmd, &shell) + .and_then(move |(lines, status)| { + let (tx1, body) = Body::pair(); + let tx2 = tx1.clone(); + let msg = Message::WithBody(ResponseResult::Ok(Response::Command(CommandResponse::Exec)), body); + let stream = lines.map(|s| Ok(s.into_bytes())) + .forward(tx1.sink_map_err(|e| Error::with_chain(e, "Could not forward command output to Body"))) + .join(status.and_then(|s| match serde_json::to_string(&s) + .chain_err(|| "Could not serialize `ExitStatus` struct") + { + Ok(s) => { + let mut frame = "ExitStatus:".to_owned(); + frame.push_str(&s); + Box::new(tx2.send(Ok(frame.into_bytes())) + .map_err(|e| Error::with_chain(e, "Could not forward command output to Body")) + ) as Box, io::Error>>, Error = Error>> + }, + Err(e) => Box::new(future::err(e)), + })) + // @todo We should repatriate these errors somehow + .map(|_| ()) + .map_err(|_| ()); + handle.spawn(stream); + future::ok(msg) + })) + }, } } } diff --git a/core/src/command/providers/mod.rs b/core/src/command/providers/mod.rs index 427d0b9..58f54ce 100644 --- a/core/src/command/providers/mod.rs +++ b/core/src/command/providers/mod.rs @@ -6,33 +6,21 @@ mod generic; +use command::ExitStatus; use errors::*; -use erased_serde::Serialize; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; -use host::local::Local; use provider::Provider; -use remote::Executable; -pub use self::generic::{Generic, GenericRunnable}; -use super::CommandResult; +pub use self::generic::Generic; use tokio_core::reactor::Handle; pub trait CommandProvider: Provider { - fn exec(&self, &H, &Handle, &str, &[String]) -> Box>; -} - -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum CommandRunnable { - Generic(GenericRunnable) -} - -impl Executable for CommandRunnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { - match self { - CommandRunnable::Generic(p) => p.exec(host, handle) - } - } + fn exec(&self, &H, &Handle, &str, &[String]) -> + Box>, + Box> + ), Error = Error>>; } pub fn factory(host: &H) -> Box>, Error = Error>> { diff --git a/core/src/errors.rs b/core/src/errors.rs index 1e82d60..7a0efb4 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -26,7 +26,7 @@ error_chain! { display("No providers available for {}", p), } - Runnable { + Request { endpoint: &'static str, func: &'static str, } { @@ -34,6 +34,11 @@ error_chain! { display("Could not run {}::{}() on host", endpoint, func), } + Remote(e: String) { + description("Error running command on remote host"), + display("Error running command on remote host: {}", e), + } + SystemCommand(c: &'static str) { description("Error running system command"), display("Error running system command '{}'", c), diff --git a/core/src/host/mod.rs b/core/src/host/mod.rs index 2d17b27..c8dc589 100644 --- a/core/src/host/mod.rs +++ b/core/src/host/mod.rs @@ -15,7 +15,7 @@ use futures::Future; use telemetry::Telemetry; pub trait Host: Clone { - //// Retrieve Telemetry + /// Retrieve Telemetry fn telemetry(&self) -> &Telemetry; #[doc(hidden)] fn get_type<'a>(&'a self) -> HostType<'a>; diff --git a/core/src/host/remote.rs b/core/src/host/remote.rs index 4af2985..85ce4fe 100644 --- a/core/src/host/remote.rs +++ b/core/src/host/remote.rs @@ -4,25 +4,28 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use bytes::{BufMut, BytesMut}; +use bytes::BytesMut; use errors::*; use futures::{future, Future}; -use remote::Runnable; -use serde::Deserialize; +use remote::{Request, Response, ResponseResult}; use serde_json; use std::{io, result}; use std::sync::Arc; use std::net::SocketAddr; use super::{Host, HostType}; use telemetry::{self, Telemetry}; -use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::{Encoder, Decoder, Framed}; -use tokio_proto::pipeline::{ClientProto, ClientService, ServerProto}; +use tokio_proto::streaming::{Body, Message}; +use tokio_proto::streaming::pipeline::{ClientProto, Frame, ServerProto}; use tokio_proto::TcpClient; +use tokio_proto::util::client_proxy::ClientProxy; use tokio_service::Service; +#[doc(hidden)] +pub type LineMessage = Message, io::Error>>; + /// A `Host` type that uses an unencrypted socket. /// /// *Warning! An unencrypted host is susceptible to eavesdropping and MITM @@ -34,12 +37,14 @@ pub struct Plain { } struct Inner { - inner: ClientService, + inner: ClientProxy, telemetry: Option, } -pub struct JsonCodec; -pub struct JsonProto; +pub struct JsonLineCodec { + decoding_head: bool, +} +pub struct JsonLineProto; impl Plain { /// Create a new Host connected to addr. @@ -51,7 +56,7 @@ impl Plain { info!("Connecting to host {}", addr); - Box::new(TcpClient::new(JsonProto) + Box::new(TcpClient::new(JsonLineProto) .connect(&addr, handle) .chain_err(|| "Could not connect to host") .and_then(|client_service| { @@ -75,19 +80,8 @@ impl Plain { } #[doc(hidden)] - pub fn run(&self, provider: Runnable) -> Box> - where for<'de> D: Deserialize<'de> - { - let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") { - Ok(v) => v, - Err(e) => return Box::new(future::err(e)) - }; - Box::new(self.call(value) - .chain_err(|| "Error while running provider on host") - .and_then(|v| match serde_json::from_value::(v).chain_err(|| "Could not understand response from host") { - Ok(d) => future::ok(d), - Err(e) => future::err(e) - })) + pub fn call_req(&self, request: Request) -> Box, io::Error>>, Error = Error>> { + self.call(Message::WithoutBody(request)) } } @@ -102,68 +96,173 @@ impl Host for Plain { } impl Service for Plain { - type Request = serde_json::Value; - type Response = serde_json::Value; - type Error = io::Error; + type Request = Message, io::Error>>; + type Response = Message, io::Error>>; + type Error = Error; type Future = Box>; - fn call(&self, req: Self::Request) -> Self::Future { - Box::new(self.inner.inner.call(req)) as Self::Future + fn call(&self, mut req: Self::Request) -> Self::Future { + let body = req.take_body(); + let request = req.into_inner(); + + let value = match serde_json::to_value(request).chain_err(|| "Could not encode provider to send to host") { + Ok(v) => v, + Err(e) => return Box::new(future::err(e)) + }; + + debug!("Sending JSON request: {}", value); + + let json_msg = match body { + Some(b) => Message::WithBody(value, b), + None => Message::WithoutBody(value), + }; + + Box::new(self.inner.inner.call(json_msg) + .chain_err(|| "Error while running provider on host") + .and_then(|mut msg| { + let body = msg.take_body(); + let header = msg.into_inner(); + + debug!("Received JSON response: {}", header); + + let result: ResponseResult = match serde_json::from_value(header).chain_err(|| "Could not understand response from host") { + Ok(d) => d, + Err(e) => return Box::new(future::err(e)), + }; + + let msg = match result { + ResponseResult::Ok(msg) => msg, + ResponseResult::Err(e) => return Box::new(future::err(ErrorKind::Remote(e).into())), + }; + Box::new(future::ok(match body { + Some(b) => Message::WithBody(msg, b), + None => Message::WithoutBody(msg), + })) + })) } } -impl Decoder for JsonCodec { - type Item = serde_json::Value; +impl Decoder for JsonLineCodec { + type Item = Frame, io::Error>; type Error = io::Error; - fn decode(&mut self, buf: &mut BytesMut) -> result::Result, Self::Error> { - // Check to see if the frame contains a new line - if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') { - // remove the serialized frame from the buffer. - let line = buf.split_to(n); + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { + let line = match buf.iter().position(|b| *b == b'\n') { + Some(n) => buf.split_to(n), + None => return Ok(None), + }; + + buf.split_to(1); - // Also remove the '\n' - buf.split_to(1); + if self.decoding_head { + debug!("Decoding header: {:?}", line); - return Ok(Some(serde_json::from_slice(&line).unwrap())); - } + // The last byte in this frame is a bool that indicates + // whether we have a body stream following or not. + // This byte must exist, or our codec is buggered and + // panicking is appropriate. + let (has_body, line) = line.split_last() + .expect("Missing body byte at end of message frame"); + + debug!("Body byte: {:?}", has_body); + + if *has_body == 1 { + self.decoding_head = false; + } + + let frame = Frame::Message { + message: serde_json::from_slice(&line).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + })?, + body: *has_body == 1, + }; + + debug!("Decoded header: {:?}", frame); + + Ok(Some(frame)) + } else { + debug!("Decoding body chunk: {:?}", line); + + let frame = if line.is_empty() { + self.decoding_head = true; + Frame::Body { chunk: None } + } else { + Frame::Body { chunk: Some(line.to_vec()) } + }; + + debug!("Decoded body chunk: {:?}", frame); - Ok(None) + Ok(Some(frame)) + } } } -impl Encoder for JsonCodec { - type Item = serde_json::Value; +impl Encoder for JsonLineCodec { + type Item = Frame, io::Error>; type Error = io::Error; - fn encode(&mut self, value: Self::Item, buf: &mut BytesMut) -> io::Result<()> { - let json = serde_json::to_string(&value).unwrap(); - buf.reserve(json.len() + 1); - buf.extend(json.as_bytes()); - buf.put_u8(b'\n'); + fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { + match msg { + Frame::Message { message, body } => { + debug!("Encoding header: {:?}, {:?}", message, body); + + let json = serde_json::to_vec(&message) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + buf.extend(&json); + // Add 'has-body' flag + buf.extend(if body { &[1] } else { &[0] }); + } + Frame::Body { chunk } => { + debug!("Encoding chunk: {:?}", chunk); + + if let Some(chunk) = chunk { + buf.extend(&chunk); + } + } + Frame::Error { error } => { + // @todo Support error frames + return Err(error) + } + } + + buf.extend(b"\n"); Ok(()) } } -impl ClientProto for JsonProto { +impl ClientProto for JsonLineProto { type Request = serde_json::Value; + type RequestBody = Vec; type Response = serde_json::Value; - type Transport = Framed; - type BindTransport = result::Result; + type ResponseBody = Vec; + type Error = io::Error; + type Transport = Framed; + type BindTransport = result::Result; fn bind_transport(&self, io: T) -> Self::BindTransport { - Ok(io.framed(JsonCodec)) + let codec = JsonLineCodec { + decoding_head: true, + }; + + Ok(io.framed(codec)) } } -impl ServerProto for JsonProto { +impl ServerProto for JsonLineProto { type Request = serde_json::Value; + type RequestBody = Vec; type Response = serde_json::Value; - type Transport = Framed; - type BindTransport = result::Result; + type ResponseBody = Vec; + type Error = io::Error; + type Transport = Framed; + type BindTransport = result::Result; fn bind_transport(&self, io: T) -> Self::BindTransport { - Ok(io.framed(JsonCodec)) + let codec = JsonLineCodec { + decoding_head: true, + }; + + Ok(io.framed(codec)) } } diff --git a/core/src/remote.rs b/core/src/remote.rs index 589015e..f02def4 100644 --- a/core/src/remote.rs +++ b/core/src/remote.rs @@ -4,29 +4,151 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use command::providers::CommandRunnable; -use erased_serde::Serialize; +// Hopefully in the near future this will be auto-generated from `derive` attributes. + use errors::*; use futures::Future; use host::local::Local; -use telemetry::providers::TelemetryRunnable; +use std::io; +use telemetry::serializable::Telemetry; use tokio_core::reactor::Handle; +use tokio_proto::streaming::{Body, Message}; + +pub type ExecutableResult = Box, io::Error>>, Error = Error>>; pub trait Executable { - fn exec(self, &Local, &Handle) -> Box, Error = Error>>; + fn exec(self, &Local, &Handle) -> ExecutableResult; +} + +#[derive(Serialize, Deserialize)] +pub enum Request { + Command(CommandRequest), + Telemetry(TelemetryRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum Response { + Command(CommandResponse), + Telemetry(TelemetryResponse), +} + +#[derive(Serialize, Deserialize)] +pub enum ResponseResult { + Ok(Response), + Err(String), +} + +impl Executable for Request { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { + match self { + Request::Command(p) => p.exec(host, handle), + Request::Telemetry(p) => p.exec(host, handle), + } + } +} + +// +// Command +// + +#[derive(Serialize, Deserialize)] +pub enum CommandRequest { + Generic(GenericRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum GenericRequest { + Available, + Exec(String, Vec), +} + +#[derive(Serialize, Deserialize)] +pub enum CommandResponse { + Available(bool), + Exec, +} + +impl Executable for CommandRequest { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { + match self { + CommandRequest::Generic(p) => p.exec(host, handle) + } + } +} + +// +// Telemetry +// + +#[derive(Serialize, Deserialize)] +pub enum TelemetryRequest { + Centos(CentosRequest), + Debian(DebianRequest), + Fedora(FedoraRequest), + Freebsd(FreebsdRequest), + Macos(MacosRequest), + Nixos(NixosRequest), + Ubuntu(UbuntuRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum CentosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum DebianRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum FedoraRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum FreebsdRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum MacosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum NixosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum UbuntuRequest { + Available, + Load, } #[derive(Serialize, Deserialize)] -pub enum Runnable { - Command(CommandRunnable), - Telemetry(TelemetryRunnable), +pub enum TelemetryResponse { + Available(bool), + Load(Telemetry), } -impl Executable for Runnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { +impl Executable for TelemetryRequest { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { match self { - Runnable::Command(p) => p.exec(host, handle), - Runnable::Telemetry(p) => p.exec(host, handle), + TelemetryRequest::Centos(p) => p.exec(host, handle), + TelemetryRequest::Debian(p) => p.exec(host, handle), + TelemetryRequest::Fedora(p) => p.exec(host, handle), + TelemetryRequest::Freebsd(p) => p.exec(host, handle), + TelemetryRequest::Macos(p) => p.exec(host, handle), + TelemetryRequest::Nixos(p) => p.exec(host, handle), + TelemetryRequest::Ubuntu(p) => p.exec(host, handle), } } } diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 80307ac..afe944f 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -7,7 +7,8 @@ //! System generated data about your host. pub mod providers; -mod serializable; +#[doc(hidden)] +pub mod serializable; use pnet::datalink::NetworkInterface; diff --git a/core/src/telemetry/providers/centos.rs b/core/src/telemetry/providers/centos.rs index 7c8af0e..23f0e4c 100644 --- a/core/src/telemetry/providers/centos.rs +++ b/core/src/telemetry/providers/centos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; -use std::{env, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use remote::{CentosRequest, Executable, ExecutableResult, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::env; +use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Centos; struct LocalCentos; struct RemoteCentos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum CentosRunnable { - Available, - Load, -} - impl Provider for Centos { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,31 +70,46 @@ impl LocalCentos { impl RemoteCentos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Centos( - CentosRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Centos", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Centos( + CentosRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Centos( - CentosRunnable::Load)); - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Centos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Centos( + CentosRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for CentosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for CentosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - CentosRunnable::Available => Box::new(LocalCentos::available().map(|b| Box::new(b) as Box)), - CentosRunnable::Load => Box::new(LocalCentos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + CentosRequest::Available => Box::new( + LocalCentos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + CentosRequest::Load => Box::new( + LocalCentos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/debian.rs b/core/src/telemetry/providers/debian.rs index 9ebcca3..846bdf1 100644 --- a/core/src/telemetry/providers/debian.rs +++ b/core/src/telemetry/providers/debian.rs @@ -4,33 +4,27 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; -use remote::{Executable, Runnable}; use futures::{future, Future}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; +use remote::{DebianRequest, Executable, ExecutableResult, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Debian; struct LocalDebian; struct RemoteDebian; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum DebianRunnable { - Available, - Load, -} - impl Provider for Debian { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,46 @@ impl LocalDebian { impl RemoteDebian { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Debian( - DebianRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Debian", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Debian( + DebianRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Debian( - DebianRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Debian", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Debian( + DebianRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for DebianRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for DebianRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - DebianRunnable::Available => Box::new(LocalDebian::available().map(|b| Box::new(b) as Box)), - DebianRunnable::Load => Box::new(LocalDebian::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + DebianRequest::Available => Box::new( + LocalDebian::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + DebianRequest::Load => Box::new( + LocalDebian::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/fedora.rs b/core/src/telemetry/providers/fedora.rs index 2d407ca..806aa08 100644 --- a/core/src/telemetry/providers/fedora.rs +++ b/core/src/telemetry/providers/fedora.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; -use std::{env, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use remote::{Executable, ExecutableResult, FedoraRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::env; +use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Fedora; struct LocalFedora; struct RemoteFedora; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum FedoraRunnable { - Available, - Load, -} - impl Provider for Fedora { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,46 @@ impl LocalFedora { impl RemoteFedora { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Fedora( - FedoraRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Fedora", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Fedora( + FedoraRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Fedora( - FedoraRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Fedora", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Fedora( + FedoraRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for FedoraRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for FedoraRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - FedoraRunnable::Available => Box::new(LocalFedora::available().map(|b| Box::new(b) as Box)), - FedoraRunnable::Load => Box::new(LocalFedora::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + FedoraRequest::Available => Box::new( + LocalFedora::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + FedoraRequest::Load => Box::new( + LocalFedora::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/freebsd.rs b/core/src/telemetry/providers/freebsd.rs index 863b71c..e3ef5dc 100644 --- a/core/src/telemetry/providers/freebsd.rs +++ b/core/src/telemetry/providers/freebsd.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -13,25 +12,20 @@ use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; use regex::Regex; -use remote::{Executable, Runnable}; -use std::{env, fs, str}; +use remote::{Executable, ExecutableResult, FreebsdRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::{env, fs}; use std::io::Read; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Freebsd; struct LocalFreebsd; struct RemoteFreebsd; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum FreebsdRunnable { - Available, - Load, -} - impl Provider for Freebsd { fn available(host: &H) -> Box> { match host.get_type() { @@ -77,33 +71,46 @@ impl LocalFreebsd { impl RemoteFreebsd { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Freebsd( - FreebsdRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Freebsd", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Freebsd( + FreebsdRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Freebsd( - FreebsdRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Freebsd", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Freebsd( + FreebsdRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for FreebsdRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for FreebsdRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - FreebsdRunnable::Available => Box::new(LocalFreebsd::available().map(|b| Box::new(b) as Box)), - FreebsdRunnable::Load => Box::new(LocalFreebsd::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + FreebsdRequest::Available => Box::new( + LocalFreebsd::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + FreebsdRequest::Load => Box::new( + LocalFreebsd::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/macos.rs b/core/src/telemetry/providers/macos.rs index 71a61dc..a646368 100644 --- a/core/src/telemetry/providers/macos.rs +++ b/core/src/telemetry/providers/macos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,24 +11,19 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, MacosRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Macos; struct LocalMacos; struct RemoteMacos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum MacosRunnable { - Available, - Load, -} - impl Provider for Macos { fn available(host: &H) -> Box> { match host.get_type() { @@ -75,33 +69,46 @@ impl LocalMacos { impl RemoteMacos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Macos( - MacosRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Macos", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Macos( + MacosRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Macos( - MacosRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Macos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Macos( + MacosRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for MacosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for MacosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - MacosRunnable::Available => Box::new(LocalMacos::available().map(|b| Box::new(b) as Box)), - MacosRunnable::Load => Box::new(LocalMacos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + MacosRequest::Available => Box::new( + LocalMacos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + MacosRequest::Load => Box::new( + LocalMacos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/mod.rs b/core/src/telemetry/providers/mod.rs index d8d114c..e421cef 100644 --- a/core/src/telemetry/providers/mod.rs +++ b/core/src/telemetry/providers/mod.rs @@ -12,54 +12,24 @@ mod macos; mod nixos; mod ubuntu; -pub use self::centos::{Centos, CentosRunnable}; -pub use self::debian::{Debian, DebianRunnable}; -pub use self::fedora::{Fedora, FedoraRunnable}; -pub use self::freebsd::{Freebsd, FreebsdRunnable}; -pub use self::macos::{Macos, MacosRunnable}; -pub use self::nixos::{Nixos, NixosRunnable}; -pub use self::ubuntu::{Ubuntu, UbuntuRunnable}; +pub use self::centos::Centos; +pub use self::debian::Debian; +pub use self::fedora::Fedora; +pub use self::freebsd::Freebsd; +pub use self::macos::Macos; +pub use self::nixos::Nixos; +pub use self::ubuntu::Ubuntu; -use erased_serde::Serialize; use errors::*; use futures::future::{self, Future}; use host::Host; -use host::local::Local; use provider::Provider; -use remote::Executable; use super::Telemetry; -use tokio_core::reactor::Handle; pub trait TelemetryProvider: Provider { fn load(&self, host: &H) -> Box>; } -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum TelemetryRunnable { - Centos(CentosRunnable), - Debian(DebianRunnable), - Fedora(FedoraRunnable), - Freebsd(FreebsdRunnable), - Macos(MacosRunnable), - Nixos(NixosRunnable), - Ubuntu(UbuntuRunnable), -} - -impl Executable for TelemetryRunnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { - match self { - TelemetryRunnable::Centos(p) => p.exec(host, handle), - TelemetryRunnable::Debian(p) => p.exec(host, handle), - TelemetryRunnable::Fedora(p) => p.exec(host, handle), - TelemetryRunnable::Freebsd(p) => p.exec(host, handle), - TelemetryRunnable::Macos(p) => p.exec(host, handle), - TelemetryRunnable::Nixos(p) => p.exec(host, handle), - TelemetryRunnable::Ubuntu(p) => p.exec(host, handle), - } - } -} - pub fn factory(host: &H) -> Box> { let mut providers: Vec>> = Vec::new(); diff --git a/core/src/telemetry/providers/nixos.rs b/core/src/telemetry/providers/nixos.rs index 39af561..d220137 100644 --- a/core/src/telemetry/providers/nixos.rs +++ b/core/src/telemetry/providers/nixos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, NixosRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Nixos; struct LocalNixos; struct RemoteNixos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum NixosRunnable { - Available, - Load, -} - impl Provider for Nixos { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,46 @@ impl LocalNixos { impl RemoteNixos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Nixos( - NixosRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Nixos", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Nixos( + NixosRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Nixos( - NixosRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Nixos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Nixos( + NixosRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for NixosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for NixosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - NixosRunnable::Available => Box::new(LocalNixos::available().map(|b| Box::new(b) as Box)), - NixosRunnable::Load => Box::new(LocalNixos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + NixosRequest::Available => Box::new( + LocalNixos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + NixosRequest::Load => Box::new( + LocalNixos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/ubuntu.rs b/core/src/telemetry/providers/ubuntu.rs index 61b7b11..43cb432 100644 --- a/core/src/telemetry/providers/ubuntu.rs +++ b/core/src/telemetry/providers/ubuntu.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -13,25 +12,20 @@ use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; use regex::Regex; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, Request, Response, ResponseResult, + TelemetryRequest, TelemetryResponse, UbuntuRequest}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Ubuntu; struct LocalUbuntu; struct RemoteUbuntu; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum UbuntuRunnable { - Available, - Load, -} - impl Provider for Ubuntu { fn available(host: &H) -> Box> { match host.get_type() { @@ -77,33 +71,46 @@ impl LocalUbuntu { impl RemoteUbuntu { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Ubuntu( - UbuntuRunnable::Available)); - host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Ubuntu", func: "available" }) + let runnable = Request::Telemetry( + TelemetryRequest::Ubuntu( + UbuntuRequest::Available)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Ubuntu( - UbuntuRunnable::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Ubuntu", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + let runnable = Request::Telemetry( + TelemetryRequest::Ubuntu( + UbuntuRequest::Load)); + Box::new(host.call_req(runnable) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "load" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } -impl Executable for UbuntuRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for UbuntuRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - UbuntuRunnable::Available => Box::new(LocalUbuntu::available().map(|b| Box::new(b) as Box)), - UbuntuRunnable::Load => Box::new(LocalUbuntu::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + UbuntuRequest::Available => Box::new( + LocalUbuntu::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + UbuntuRequest::Load => Box::new( + LocalUbuntu::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } }