From 6f0996e5bbf03c98beb501dfdaa68ced717591ef Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Mon, 23 Oct 2017 19:06:15 +0100 Subject: [PATCH] Streaming Command and fixes for streaming protocol. Closes intecture/api#40. --- agent/src/errors.rs | 32 ++++++ agent/src/main.rs | 34 +++--- core/examples/basic.rs | 43 ++++++- core/examples/remote_host.rs | 16 +++ core/src/command/mod.rs | 14 ++- core/src/command/providers/generic.rs | 146 +++++++++++++++++++----- core/src/command/providers/mod.rs | 9 +- core/src/errors.rs | 5 + core/src/host/mod.rs | 2 +- core/src/host/remote.rs | 145 +++++++++++++---------- core/src/remote.rs | 6 +- core/src/telemetry/providers/centos.rs | 15 ++- core/src/telemetry/providers/debian.rs | 17 ++- core/src/telemetry/providers/fedora.rs | 17 ++- core/src/telemetry/providers/freebsd.rs | 17 ++- core/src/telemetry/providers/macos.rs | 17 ++- core/src/telemetry/providers/nixos.rs | 17 ++- core/src/telemetry/providers/ubuntu.rs | 17 ++- 18 files changed, 410 insertions(+), 159 deletions(-) diff --git a/agent/src/errors.rs b/agent/src/errors.rs index 4470628..c297fbb 100644 --- a/agent/src/errors.rs +++ b/agent/src/errors.rs @@ -4,10 +4,42 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. +use futures::Future; use intecture_api; +use std::{convert, error, io}; error_chain! { links { Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind); } } + +impl convert::From for io::Error { + fn from(e: Error) -> io::Error { + // @todo Return whole error chain + io::Error::new(io::ErrorKind::Other, e.description()) + } +} + +// @todo This should disappear once Futures are officially supported +// by error_chain. +// See: https://github.com/rust-lang-nursery/error-chain/issues/90 +pub type SFuture = Box>; + +pub trait FutureChainErr { + fn chain_err(self, callback: F) -> SFuture + where F: FnOnce() -> E + 'static, + E: Into; +} + +impl FutureChainErr for F + where F: Future + 'static, + F::Error: error::Error + Send + 'static, +{ + fn chain_err(self, callback: C) -> SFuture + where C: FnOnce() -> E + 'static, + E: Into, + { + Box::new(self.then(|r| r.chain_err(callback))) + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 0cc5022..29d34e8 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -20,9 +20,9 @@ mod errors; use errors::*; use futures::{future, Future}; -use intecture_api::remote::{Executable, Runnable}; use intecture_api::host::local::Local; use intecture_api::host::remote::{JsonLineProto, LineMessage}; +use intecture_api::remote::{Executable, Request}; use std::fs::File; use std::io::{self, Read}; use std::net::SocketAddr; @@ -40,7 +40,7 @@ pub struct Api { impl Service for Api { type Request = LineMessage; type Response = LineMessage; - type Error = io::Error; + type Error = Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { @@ -49,15 +49,9 @@ impl Service for Api { Message::WithoutBody(req) => req, }; - let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") { + let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") { Ok(r) => r, - Err(e) => return Box::new( - future::err( - io::Error::new( - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - io::ErrorKind::Other, e.description() - ))), + Err(e) => return Box::new(future::err(e)) }; // XXX Danger zone! If we're running multiple threads, this `unwrap()` @@ -66,13 +60,17 @@ impl Service for Api { // only safe for the current thread. // See https://github.com/alexcrichton/tokio-process/issues/23 let handle = self.remote.handle().unwrap(); - Box::new(runnable.exec(&self.host, &handle) - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.description())) - .and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") { - Ok(v) => future::ok(Message::WithoutBody(v)), - Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())), + Box::new(request.exec(&self.host, &handle) + .chain_err(|| "Failed to execute Request") + .and_then(|mut msg| { + let body = msg.take_body(); + match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") { + Ok(v) => match body { + Some(b) => future::ok(Message::WithBody(v, b)), + None => future::ok(Message::WithoutBody(v)), + }, + Err(e) => future::err(e), + } })) } } @@ -80,7 +78,7 @@ impl Service for Api { impl NewService for Api { type Request = LineMessage; type Response = LineMessage; - type Error = io::Error; + type Error = Error; type Instance = Api; fn new_service(&self) -> io::Result { Ok(Api { diff --git a/core/examples/basic.rs b/core/examples/basic.rs index 85e408b..098526e 100644 --- a/core/examples/basic.rs +++ b/core/examples/basic.rs @@ -8,21 +8,58 @@ extern crate futures; extern crate intecture_api; extern crate tokio_core; -use futures::Future; +use futures::{Future, Stream}; use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to our local machine, so we use the `Local` host type. let host = Local::new().and_then(|host| { + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. + + // Let's start with something basic - a shell command. Command::new(&host, "whoami", None).and_then(|cmd| { - cmd.exec(&handle).map(|out| { - println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim()); + // Now that we have our `Command` instance, let's run it. + cmd.exec(&handle).and_then(|(stream, status)| { + // At this point, our command is running. As the API + // is asynchronous, we don't have to wait for it to + // finish before inspecting its output. This is called + // "streaming". + + // Our first argument, `stream`, is a stream of strings, + // each of which represents a line of output. We can use + // the `for_each` combinator to print these lines to + // stdout. + // + // If printing isn't your thing, you are also + // free to lick them or whatever you're into. I'm not + // here to judge. + stream.for_each(|line| { println!("{}", line); Ok(()) }) + + // The second argument is a `Future` that represents the + // command's exit status. Let's print that too*. + // + // * Same caveat as above RE: printing. This is a safe + // place. + .join(status.map(|s| println!("This command {} {}", + if s.success { "succeeded" } else { "failed" }, + if let Some(e) = s.code { format!("with code {}", e) } else { String::new() }))) }) }) }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } diff --git a/core/examples/remote_host.rs b/core/examples/remote_host.rs index d62e5ef..d447cce 100644 --- a/core/examples/remote_host.rs +++ b/core/examples/remote_host.rs @@ -13,12 +13,28 @@ use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to a remote machine. You'll note that this is the `Plain` host + // type, where you might have been expecting `Remote` or some such. + // This is to signify that this host type sends data in the clear, + // rather than encrypting it. Thus the usual disclaimer about + // secure networks and trust applies. let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| { + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. See basic.rs for more usage. println!("Connected to {}", host.telemetry().hostname); }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs index ebc4818..1c4a5d5 100644 --- a/core/src/command/mod.rs +++ b/core/src/command/mod.rs @@ -10,6 +10,7 @@ pub mod providers; use errors::*; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; use self::providers::CommandProvider; use tokio_core::reactor::Handle; @@ -27,11 +28,9 @@ pub struct Command { } #[derive(Debug, Serialize, Deserialize)] -pub struct CommandResult { +pub struct ExitStatus { pub success: bool, - pub exit_code: Option, - pub stdout: Vec, - pub stderr: Vec, + pub code: Option, } impl Command { @@ -68,7 +67,12 @@ impl Command { } } - pub fn exec(&self, handle: &Handle) -> Box> { + pub fn exec(&self, handle: &Handle) -> + Box>, + Box> + ), Error = Error>> + { self.inner.exec(&self.host, handle, &self.cmd, &self.shell) } } diff --git a/core/src/command/providers/generic.rs b/core/src/command/providers/generic.rs index 51c48d5..87a9eb1 100644 --- a/core/src/command/providers/generic.rs +++ b/core/src/command/providers/generic.rs @@ -4,20 +4,27 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use command::CommandResult; +use command::providers::ExitStatus; use errors::*; use futures::{future, Future}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::{mpsc, oneshot}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use provider::Provider; use remote::{CommandRequest, CommandResponse, Executable, ExecutableResult, GenericRequest, Request, Response, ResponseResult}; -use std::process; +use serde_json; +use std::io::{self, BufReader}; +use std::process::{Command, Stdio}; +use std::result; use super::CommandProvider; use tokio_core::reactor::Handle; +use tokio_io::io::lines; use tokio_process::CommandExt; -use tokio_proto::streaming::Message; +use tokio_proto::streaming::{Body, Message}; #[derive(Clone)] pub struct Generic; @@ -46,7 +53,12 @@ impl Provider for Generic { } impl CommandProvider for Generic { - fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { match host.get_type() { HostType::Local(_) => LocalGeneric::exec(handle, cmd, shell), HostType::Remote(r) => RemoteGeneric::exec(r, cmd, shell), @@ -59,7 +71,12 @@ impl LocalGeneric { Box::new(future::ok(cfg!(unix))) } - fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { let cmd = cmd.to_owned(); let shell = shell.to_owned(); let (shell, shell_args) = match shell.split_first() { @@ -67,19 +84,34 @@ impl LocalGeneric { None => return Box::new(future::err("Invalid shell provided".into())), }; - Box::new(process::Command::new(shell) + let child = Command::new(shell) .args(shell_args) .arg(&cmd) - .output_async(handle) - .chain_err(|| "Command execution failed") - .and_then(|output| { - future::ok(CommandResult { - success: output.status.success(), - exit_code: output.status.code(), - stdout: output.stdout, - stderr: output.stderr, - }) - })) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn_async(handle) + .chain_err(|| "Command execution failed"); + let mut child = match child { + Ok(c) => c, + Err(e) => return Box::new(future::err(e)), + }; + + let stdout = child.stdout().take().unwrap(); + let outbuf = BufReader::new(stdout); + let stderr = child.stderr().take().unwrap(); + let errbuf = BufReader::new(stderr); + let lines = lines(outbuf).select(lines(errbuf)); + + Box::new(future::ok( + (Box::new(lines.then(|r| r.chain_err(|| "Command execution failed"))) as Box>, + Box::new(child.then(|r| match r.chain_err(|| "Command execution failed") { + Ok(c) => future::ok(ExitStatus { + success: c.success(), + code: c.code(), + }), + Err(e) => future::err(e) + })) as Box>) + )) } } @@ -88,16 +120,57 @@ impl RemoteGeneric { let runnable = Request::Command( CommandRequest::Generic( GenericRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Command(CommandResponse::Available(b)) => b, + _ => unreachable!(), + })) } - fn exec(host: &Plain, cmd: &str, shell: &[String]) -> Box> { + fn exec(host: &Plain, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { let runnable = Request::Command( CommandRequest::Generic( GenericRequest::Exec(cmd.into(), shell.to_owned()))); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "exec" }) + .map(|mut msg| { + let (tx, rx) = oneshot::channel::(); + let mut tx_share = Some(tx); + let mut found = false; + ( + Box::new(msg.take_body() + .expect("Command::exec reply missing body stream") + .filter_map(move |v| { + let s = String::from_utf8_lossy(&v).to_string(); + + // @todo This is a heuristical approach which is fallible + if !found && s.starts_with("ExitStatus:") { + let (_, json) = s.split_at(11); + match serde_json::from_str(json) { + Ok(status) => { + // @todo What should happen if this fails? + let _ = tx_share.take().unwrap().send(status); + found = true; + return None; + }, + _ => (), + } + } + + Some(s) + }) + .then(|r| r.chain_err(|| "Command execution failed")) + ) as Box>, + Box::new(rx.chain_err(|| "Buffer dropped before ExitStatus was sent")) + as Box> + ) + })) } } @@ -110,13 +183,34 @@ impl Executable for GenericRequest { ResponseResult::Ok( Response::Command( CommandResponse::Available(b)))))), - GenericRequest::Exec(cmd, shell) => Box::new( - LocalGeneric::exec(handle, &cmd, &shell) - .map(|t| Message::WithoutBody( - ResponseResult::Ok( - Response::Command( - CommandResponse::Exec(t.into())))) - )) + GenericRequest::Exec(cmd, shell) => { + let handle = handle.clone(); + Box::new(LocalGeneric::exec(&handle, &cmd, &shell) + .and_then(move |(lines, status)| { + let (tx1, body) = Body::pair(); + let tx2 = tx1.clone(); + let msg = Message::WithBody(ResponseResult::Ok(Response::Command(CommandResponse::Exec)), body); + let stream = lines.map(|s| Ok(s.into_bytes())) + .forward(tx1.sink_map_err(|e| Error::with_chain(e, "Could not forward command output to Body"))) + .join(status.and_then(|s| match serde_json::to_string(&s) + .chain_err(|| "Could not serialize `ExitStatus` struct") + { + Ok(s) => { + let mut frame = "ExitStatus:".to_owned(); + frame.push_str(&s); + Box::new(tx2.send(Ok(frame.into_bytes())) + .map_err(|e| Error::with_chain(e, "Could not forward command output to Body")) + ) as Box, io::Error>>, Error = Error>> + }, + Err(e) => Box::new(future::err(e)), + })) + // @todo We should repatriate these errors somehow + .map(|_| ()) + .map_err(|_| ()); + handle.spawn(stream); + future::ok(msg) + })) + }, } } } diff --git a/core/src/command/providers/mod.rs b/core/src/command/providers/mod.rs index a1f0ef6..58f54ce 100644 --- a/core/src/command/providers/mod.rs +++ b/core/src/command/providers/mod.rs @@ -6,16 +6,21 @@ mod generic; +use command::ExitStatus; use errors::*; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; use provider::Provider; pub use self::generic::Generic; -use super::CommandResult; use tokio_core::reactor::Handle; pub trait CommandProvider: Provider { - fn exec(&self, &H, &Handle, &str, &[String]) -> Box>; + fn exec(&self, &H, &Handle, &str, &[String]) -> + Box>, + Box> + ), Error = Error>>; } pub fn factory(host: &H) -> Box>, Error = Error>> { diff --git a/core/src/errors.rs b/core/src/errors.rs index e85effc..7a0efb4 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -34,6 +34,11 @@ error_chain! { display("Could not run {}::{}() on host", endpoint, func), } + Remote(e: String) { + description("Error running command on remote host"), + display("Error running command on remote host: {}", e), + } + SystemCommand(c: &'static str) { description("Error running system command"), display("Error running system command '{}'", c), diff --git a/core/src/host/mod.rs b/core/src/host/mod.rs index 2d17b27..c8dc589 100644 --- a/core/src/host/mod.rs +++ b/core/src/host/mod.rs @@ -15,7 +15,7 @@ use futures::Future; use telemetry::Telemetry; pub trait Host: Clone { - //// Retrieve Telemetry + /// Retrieve Telemetry fn telemetry(&self) -> &Telemetry; #[doc(hidden)] fn get_type<'a>(&'a self) -> HostType<'a>; diff --git a/core/src/host/remote.rs b/core/src/host/remote.rs index 22d3ae5..85ce4fe 100644 --- a/core/src/host/remote.rs +++ b/core/src/host/remote.rs @@ -7,8 +7,7 @@ use bytes::BytesMut; use errors::*; use futures::{future, Future}; -use remote::Request; -use serde::Deserialize; +use remote::{Request, Response, ResponseResult}; use serde_json; use std::{io, result}; use std::sync::Arc; @@ -81,34 +80,8 @@ impl Plain { } #[doc(hidden)] - pub fn run(&self, provider: Request) -> Box> - where for<'de> D: Deserialize<'de> - { - Box::new(self.run_msg::(provider) - .map(|msg| msg.into_inner())) - } - - #[doc(hidden)] - pub fn run_msg(&self, provider: Request) -> Box, io::Error>>, Error = Error>> - where for<'de> D: Deserialize<'de> - { - let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") { - Ok(v) => v, - Err(e) => return Box::new(future::err(e)) - }; - Box::new(self.inner.inner.call(Message::WithoutBody(value)) - .chain_err(|| "Error while running provider on host") - .and_then(|mut msg| { - let body = msg.take_body(); - let msg = match serde_json::from_value::(msg.into_inner()).chain_err(|| "Could not understand response from host") { - Ok(d) => d, - Err(e) => return Box::new(future::err(e)), - }; - Box::new(future::ok(match body { - Some(b) => Message::WithBody(msg, b), - None => Message::WithoutBody(msg), - })) - })) + pub fn call_req(&self, request: Request) -> Box, io::Error>>, Error = Error>> { + self.call(Message::WithoutBody(request)) } } @@ -123,13 +96,49 @@ impl Host for Plain { } impl Service for Plain { - type Request = LineMessage; - type Response = LineMessage; - type Error = io::Error; + type Request = Message, io::Error>>; + type Response = Message, io::Error>>; + type Error = Error; type Future = Box>; - fn call(&self, req: Self::Request) -> Self::Future { - Box::new(self.inner.inner.call(req)) as Self::Future + fn call(&self, mut req: Self::Request) -> Self::Future { + let body = req.take_body(); + let request = req.into_inner(); + + let value = match serde_json::to_value(request).chain_err(|| "Could not encode provider to send to host") { + Ok(v) => v, + Err(e) => return Box::new(future::err(e)) + }; + + debug!("Sending JSON request: {}", value); + + let json_msg = match body { + Some(b) => Message::WithBody(value, b), + None => Message::WithoutBody(value), + }; + + Box::new(self.inner.inner.call(json_msg) + .chain_err(|| "Error while running provider on host") + .and_then(|mut msg| { + let body = msg.take_body(); + let header = msg.into_inner(); + + debug!("Received JSON response: {}", header); + + let result: ResponseResult = match serde_json::from_value(header).chain_err(|| "Could not understand response from host") { + Ok(d) => d, + Err(e) => return Box::new(future::err(e)), + }; + + let msg = match result { + ResponseResult::Ok(msg) => msg, + ResponseResult::Err(e) => return Box::new(future::err(ErrorKind::Remote(e).into())), + }; + Box::new(future::ok(match body { + Some(b) => Message::WithBody(msg, b), + None => Message::WithoutBody(msg), + })) + })) } } @@ -145,33 +154,45 @@ impl Decoder for JsonLineCodec { buf.split_to(1); - if line.is_empty() { - let decoding_head = self.decoding_head; - self.decoding_head = !decoding_head; + if self.decoding_head { + debug!("Decoding header: {:?}", line); - if decoding_head { - Ok(Some(Frame::Message { - message: serde_json::Value::Null, - body: true, - })) - } else { - Ok(Some(Frame::Body { - chunk: None - })) + // The last byte in this frame is a bool that indicates + // whether we have a body stream following or not. + // This byte must exist, or our codec is buggered and + // panicking is appropriate. + let (has_body, line) = line.split_last() + .expect("Missing body byte at end of message frame"); + + debug!("Body byte: {:?}", has_body); + + if *has_body == 1 { + self.decoding_head = false; } + + let frame = Frame::Message { + message: serde_json::from_slice(&line).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + })?, + body: *has_body == 1, + }; + + debug!("Decoded header: {:?}", frame); + + Ok(Some(frame)) } else { - if self.decoding_head { - Ok(Some(Frame::Message { - message: serde_json::from_slice(&line).map_err(|e| { - io::Error::new(io::ErrorKind::Other, e) - })?, - body: false, - })) + debug!("Decoding body chunk: {:?}", line); + + let frame = if line.is_empty() { + self.decoding_head = true; + Frame::Body { chunk: None } } else { - Ok(Some(Frame::Body { - chunk: Some(line.to_vec()), - })) - } + Frame::Body { chunk: Some(line.to_vec()) } + }; + + debug!("Decoded body chunk: {:?}", frame); + + Ok(Some(frame)) } } } @@ -183,15 +204,17 @@ impl Encoder for JsonLineCodec { fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { match msg { Frame::Message { message, body } => { - // Our protocol dictates that a message head that - // includes a streaming body is an empty string. - assert!(message.is_null() == body); + debug!("Encoding header: {:?}, {:?}", message, body); let json = serde_json::to_vec(&message) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; buf.extend(&json); + // Add 'has-body' flag + buf.extend(if body { &[1] } else { &[0] }); } Frame::Body { chunk } => { + debug!("Encoding chunk: {:?}", chunk); + if let Some(chunk) = chunk { buf.extend(&chunk); } diff --git a/core/src/remote.rs b/core/src/remote.rs index f712faf..f02def4 100644 --- a/core/src/remote.rs +++ b/core/src/remote.rs @@ -6,15 +6,15 @@ // Hopefully in the near future this will be auto-generated from `derive` attributes. -use command::CommandResult; use errors::*; use futures::Future; use host::local::Local; +use std::io; use telemetry::serializable::Telemetry; use tokio_core::reactor::Handle; use tokio_proto::streaming::{Body, Message}; -pub type ExecutableResult = Box, Error>>, Error = Error>>; +pub type ExecutableResult = Box, io::Error>>, Error = Error>>; pub trait Executable { fn exec(self, &Local, &Handle) -> ExecutableResult; @@ -65,7 +65,7 @@ pub enum GenericRequest { #[derive(Serialize, Deserialize)] pub enum CommandResponse { Available(bool), - Exec(CommandResult), + Exec, } impl Executable for CommandRequest { diff --git a/core/src/telemetry/providers/centos.rs b/core/src/telemetry/providers/centos.rs index 3c68192..23f0e4c 100644 --- a/core/src/telemetry/providers/centos.rs +++ b/core/src/telemetry/providers/centos.rs @@ -17,7 +17,7 @@ use std::env; use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,17 +73,24 @@ impl RemoteCentos { let runnable = Request::Telemetry( TelemetryRequest::Centos( CentosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Centos( CentosRequest::Load)); - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/debian.rs b/core/src/telemetry/providers/debian.rs index 3395e70..846bdf1 100644 --- a/core/src/telemetry/providers/debian.rs +++ b/core/src/telemetry/providers/debian.rs @@ -17,7 +17,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteDebian { let runnable = Request::Telemetry( TelemetryRequest::Debian( DebianRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Debian( DebianRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/fedora.rs b/core/src/telemetry/providers/fedora.rs index d3561d6..806aa08 100644 --- a/core/src/telemetry/providers/fedora.rs +++ b/core/src/telemetry/providers/fedora.rs @@ -17,7 +17,7 @@ use std::env; use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteFedora { let runnable = Request::Telemetry( TelemetryRequest::Fedora( FedoraRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Fedora( FedoraRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/freebsd.rs b/core/src/telemetry/providers/freebsd.rs index 950b182..e3ef5dc 100644 --- a/core/src/telemetry/providers/freebsd.rs +++ b/core/src/telemetry/providers/freebsd.rs @@ -18,7 +18,7 @@ use std::{env, fs}; use std::io::Read; use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -74,19 +74,24 @@ impl RemoteFreebsd { let runnable = Request::Telemetry( TelemetryRequest::Freebsd( FreebsdRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Freebsd( FreebsdRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/macos.rs b/core/src/telemetry/providers/macos.rs index 000497c..a646368 100644 --- a/core/src/telemetry/providers/macos.rs +++ b/core/src/telemetry/providers/macos.rs @@ -16,7 +16,7 @@ use remote::{Executable, ExecutableResult, MacosRequest, Request, Response, use std::{env, process, str}; use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -72,19 +72,24 @@ impl RemoteMacos { let runnable = Request::Telemetry( TelemetryRequest::Macos( MacosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Macos( MacosRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/nixos.rs b/core/src/telemetry/providers/nixos.rs index 1576e1c..d220137 100644 --- a/core/src/telemetry/providers/nixos.rs +++ b/core/src/telemetry/providers/nixos.rs @@ -17,7 +17,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteNixos { let runnable = Request::Telemetry( TelemetryRequest::Nixos( NixosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Nixos( NixosRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/ubuntu.rs b/core/src/telemetry/providers/ubuntu.rs index 9a97b9c..43cb432 100644 --- a/core/src/telemetry/providers/ubuntu.rs +++ b/core/src/telemetry/providers/ubuntu.rs @@ -18,7 +18,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -74,19 +74,24 @@ impl RemoteUbuntu { let runnable = Request::Telemetry( TelemetryRequest::Ubuntu( UbuntuRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Ubuntu( UbuntuRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } }