From 17b5eb40decc92472052b08480cbe870ecfaa1d4 Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Tue, 17 Oct 2017 16:56:34 +0100 Subject: [PATCH 1/6] Implement streaming proto --- agent/src/main.rs | 20 +++-- core/src/command/mod.rs | 2 +- core/src/host/remote.rs | 158 +++++++++++++++++++++++++++++----------- 3 files changed, 131 insertions(+), 49 deletions(-) diff --git a/agent/src/main.rs b/agent/src/main.rs index 6092e0c..0cc5022 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -22,12 +22,13 @@ use errors::*; use futures::{future, Future}; use intecture_api::remote::{Executable, Runnable}; use intecture_api::host::local::Local; -use intecture_api::host::remote::JsonProto; +use intecture_api::host::remote::{JsonLineProto, LineMessage}; use std::fs::File; use std::io::{self, Read}; use std::net::SocketAddr; use std::sync::Arc; use tokio_core::reactor::Remote; +use tokio_proto::streaming::Message; use tokio_proto::TcpServer; use tokio_service::{NewService, Service}; @@ -37,12 +38,17 @@ pub struct Api { } impl Service for Api { - type Request = serde_json::Value; - type Response = serde_json::Value; + type Request = LineMessage; + type Response = LineMessage; type Error = io::Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { + let req = match req { + Message::WithBody(req, _) => req, + Message::WithoutBody(req) => req, + }; + let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") { Ok(r) => r, Err(e) => return Box::new( @@ -65,15 +71,15 @@ impl Service for Api { // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.description())) .and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") { - Ok(v) => future::ok(v), + Ok(v) => future::ok(Message::WithoutBody(v)), Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())), })) } } impl NewService for Api { - type Request = serde_json::Value; - type Response = serde_json::Value; + type Request = LineMessage; + type Response = LineMessage; type Error = io::Error; type Instance = Api; fn new_service(&self) -> io::Result { @@ -129,7 +135,7 @@ quick_main!(|| -> Result<()> { // Currently we force the issue (`unwrap()`), which is only safe // for the current thread. // See https://github.com/alexcrichton/tokio-process/issues/23 - let server = TcpServer::new(JsonProto, config.address); + let server = TcpServer::new(JsonLineProto, config.address); server.with_handle(move |handle| { let api = Api { host: host.clone(), diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs index c1892fd..ebc4818 100644 --- a/core/src/command/mod.rs +++ b/core/src/command/mod.rs @@ -68,7 +68,7 @@ impl Command { } } - pub fn exec(&mut self, handle: &Handle) -> Box> { + pub fn exec(&self, handle: &Handle) -> Box> { self.inner.exec(&self.host, handle, &self.cmd, &self.shell) } } diff --git a/core/src/host/remote.rs b/core/src/host/remote.rs index 4af2985..7f28f4c 100644 --- a/core/src/host/remote.rs +++ b/core/src/host/remote.rs @@ -4,7 +4,7 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use bytes::{BufMut, BytesMut}; +use bytes::BytesMut; use errors::*; use futures::{future, Future}; use remote::Runnable; @@ -15,14 +15,18 @@ use std::sync::Arc; use std::net::SocketAddr; use super::{Host, HostType}; use telemetry::{self, Telemetry}; -use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::{Encoder, Decoder, Framed}; -use tokio_proto::pipeline::{ClientProto, ClientService, ServerProto}; +use tokio_proto::streaming::{Body, Message}; +use tokio_proto::streaming::pipeline::{ClientProto, Frame, ServerProto}; use tokio_proto::TcpClient; +use tokio_proto::util::client_proxy::ClientProxy; use tokio_service::Service; +#[doc(hidden)] +pub type LineMessage = Message, io::Error>>; + /// A `Host` type that uses an unencrypted socket. /// /// *Warning! An unencrypted host is susceptible to eavesdropping and MITM @@ -34,12 +38,14 @@ pub struct Plain { } struct Inner { - inner: ClientService, + inner: ClientProxy, telemetry: Option, } -pub struct JsonCodec; -pub struct JsonProto; +pub struct JsonLineCodec { + decoding_head: bool, +} +pub struct JsonLineProto; impl Plain { /// Create a new Host connected to addr. @@ -51,7 +57,7 @@ impl Plain { info!("Connecting to host {}", addr); - Box::new(TcpClient::new(JsonProto) + Box::new(TcpClient::new(JsonLineProto) .connect(&addr, handle) .chain_err(|| "Could not connect to host") .and_then(|client_service| { @@ -77,16 +83,31 @@ impl Plain { #[doc(hidden)] pub fn run(&self, provider: Runnable) -> Box> where for<'de> D: Deserialize<'de> + { + Box::new(self.run_msg::(provider) + .map(|msg| msg.into_inner())) + } + + #[doc(hidden)] + pub fn run_msg(&self, provider: Runnable) -> Box, io::Error>>, Error = Error>> + where for<'de> D: Deserialize<'de> { let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") { Ok(v) => v, Err(e) => return Box::new(future::err(e)) }; - Box::new(self.call(value) + Box::new(self.inner.inner.call(Message::WithoutBody(value)) .chain_err(|| "Error while running provider on host") - .and_then(|v| match serde_json::from_value::(v).chain_err(|| "Could not understand response from host") { - Ok(d) => future::ok(d), - Err(e) => future::err(e) + .and_then(|mut msg| { + let body = msg.take_body(); + let msg = match serde_json::from_value::(msg.into_inner()).chain_err(|| "Could not understand response from host") { + Ok(d) => d, + Err(e) => return Box::new(future::err(e)), + }; + Box::new(future::ok(match body { + Some(b) => Message::WithBody(msg, b), + None => Message::WithoutBody(msg), + })) })) } } @@ -102,8 +123,8 @@ impl Host for Plain { } impl Service for Plain { - type Request = serde_json::Value; - type Response = serde_json::Value; + type Request = LineMessage; + type Response = LineMessage; type Error = io::Error; type Future = Box>; @@ -112,58 +133,113 @@ impl Service for Plain { } } -impl Decoder for JsonCodec { - type Item = serde_json::Value; +impl Decoder for JsonLineCodec { + type Item = Frame, io::Error>; type Error = io::Error; - fn decode(&mut self, buf: &mut BytesMut) -> result::Result, Self::Error> { - // Check to see if the frame contains a new line - if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') { - // remove the serialized frame from the buffer. - let line = buf.split_to(n); - - // Also remove the '\n' - buf.split_to(1); + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { + let line = match buf.iter().position(|b| *b == b'\n') { + Some(n) => buf.split_to(n), + None => return Ok(None), + }; - return Ok(Some(serde_json::from_slice(&line).unwrap())); + buf.split_to(1); + + if line.is_empty() { + let decoding_head = self.decoding_head; + self.decoding_head = !decoding_head; + + if decoding_head { + Ok(Some(Frame::Message { + message: serde_json::Value::Null, + body: true, + })) + } else { + Ok(Some(Frame::Body { + chunk: None + })) + } + } else { + if self.decoding_head { + Ok(Some(Frame::Message { + message: serde_json::from_slice(&line).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + })?, + body: false, + })) + } else { + Ok(Some(Frame::Body { + chunk: Some(line.to_vec()), + })) + } } - - Ok(None) } } -impl Encoder for JsonCodec { - type Item = serde_json::Value; +impl Encoder for JsonLineCodec { + type Item = Frame, io::Error>; type Error = io::Error; - fn encode(&mut self, value: Self::Item, buf: &mut BytesMut) -> io::Result<()> { - let json = serde_json::to_string(&value).unwrap(); - buf.reserve(json.len() + 1); - buf.extend(json.as_bytes()); - buf.put_u8(b'\n'); + fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { + match msg { + Frame::Message { message, body } => { + // Our protocol dictates that a message head that + // includes a streaming body is an empty string. + assert!(message.is_null() == body); + + let json = serde_json::to_vec(&message) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + buf.extend(&json); + } + Frame::Body { chunk } => { + if let Some(chunk) = chunk { + buf.extend(&chunk); + } + } + Frame::Error { error } => { + // @todo Support error frames + return Err(error) + } + } + + buf.extend(b"\n"); Ok(()) } } -impl ClientProto for JsonProto { +impl ClientProto for JsonLineProto { type Request = serde_json::Value; + type RequestBody = Vec; type Response = serde_json::Value; - type Transport = Framed; - type BindTransport = result::Result; + type ResponseBody = Vec; + type Error = io::Error; + type Transport = Framed; + type BindTransport = result::Result; fn bind_transport(&self, io: T) -> Self::BindTransport { - Ok(io.framed(JsonCodec)) + let codec = JsonLineCodec { + decoding_head: true, + }; + + Ok(io.framed(codec)) } } -impl ServerProto for JsonProto { +impl ServerProto for JsonLineProto { type Request = serde_json::Value; + type RequestBody = Vec; type Response = serde_json::Value; - type Transport = Framed; - type BindTransport = result::Result; + type ResponseBody = Vec; + type Error = io::Error; + type Transport = Framed; + type BindTransport = result::Result; fn bind_transport(&self, io: T) -> Self::BindTransport { - Ok(io.framed(JsonCodec)) + let codec = JsonLineCodec { + decoding_head: true, + }; + + Ok(io.framed(codec)) } } From 93ee18ef8c8dd80a7ab85296f98bc24968ed4c8d Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Wed, 18 Oct 2017 13:03:19 +0100 Subject: [PATCH 2/6] Remove unnecessary mutable ref --- core/examples/basic.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/examples/basic.rs b/core/examples/basic.rs index 3588606..85e408b 100644 --- a/core/examples/basic.rs +++ b/core/examples/basic.rs @@ -17,7 +17,7 @@ fn main() { let handle = core.handle(); let host = Local::new().and_then(|host| { - Command::new(&host, "whoami", None).and_then(|mut cmd| { + Command::new(&host, "whoami", None).and_then(|cmd| { cmd.exec(&handle).map(|out| { println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim()); }) @@ -26,4 +26,3 @@ fn main() { core.run(host).unwrap(); } - From c63cd0c049c77d6aa1211bc097b2f8c1123a5950 Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Sat, 21 Oct 2017 19:09:05 +0100 Subject: [PATCH 3/6] Add Response remote type, rename Runnable to Request --- core/src/command/providers/generic.rs | 49 ++++---- core/src/command/providers/mod.rs | 19 +--- core/src/errors.rs | 2 +- core/src/host/remote.rs | 6 +- core/src/remote.rs | 144 ++++++++++++++++++++++-- core/src/telemetry/mod.rs | 3 +- core/src/telemetry/providers/centos.rs | 54 ++++----- core/src/telemetry/providers/debian.rs | 52 +++++---- core/src/telemetry/providers/fedora.rs | 54 ++++----- core/src/telemetry/providers/freebsd.rs | 54 ++++----- core/src/telemetry/providers/macos.rs | 52 +++++---- core/src/telemetry/providers/mod.rs | 44 ++------ core/src/telemetry/providers/nixos.rs | 52 +++++---- core/src/telemetry/providers/ubuntu.rs | 52 +++++---- 14 files changed, 366 insertions(+), 271 deletions(-) diff --git a/core/src/command/providers/generic.rs b/core/src/command/providers/generic.rs index f63ae3b..51c48d5 100644 --- a/core/src/command/providers/generic.rs +++ b/core/src/command/providers/generic.rs @@ -5,31 +5,25 @@ // modified, or distributed except according to those terms. use command::CommandResult; -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use provider::Provider; -use remote::{Executable, Runnable}; +use remote::{CommandRequest, CommandResponse, Executable, ExecutableResult, + GenericRequest, Request, Response, ResponseResult}; use std::process; -use super::{CommandProvider, CommandRunnable}; +use super::CommandProvider; use tokio_core::reactor::Handle; use tokio_process::CommandExt; +use tokio_proto::streaming::Message; #[derive(Clone)] pub struct Generic; struct LocalGeneric; struct RemoteGeneric; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum GenericRunnable { - Available, - Exec(String, Vec), -} - impl Provider for Generic { fn available(host: &H) -> Box> { match host.get_type() { @@ -91,27 +85,38 @@ impl LocalGeneric { impl RemoteGeneric { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Command( - CommandRunnable::Generic( - GenericRunnable::Available)); + let runnable = Request::Command( + CommandRequest::Generic( + GenericRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "available" }) } fn exec(host: &Plain, cmd: &str, shell: &[String]) -> Box> { - let runnable = Runnable::Command( - CommandRunnable::Generic( - GenericRunnable::Exec(cmd.into(), shell.to_owned()))); + let runnable = Request::Command( + CommandRequest::Generic( + GenericRequest::Exec(cmd.into(), shell.to_owned()))); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "exec" }) + .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "exec" }) } } -impl Executable for GenericRunnable { - fn exec(self, _: &Local, handle: &Handle) -> Box, Error = Error>> { +impl Executable for GenericRequest { + fn exec(self, _: &Local, handle: &Handle) -> ExecutableResult { match self { - GenericRunnable::Available => Box::new(LocalGeneric::available().map(|b| Box::new(b) as Box)), - GenericRunnable::Exec(cmd, shell) => Box::new(LocalGeneric::exec(handle, &cmd, &shell).map(|r| Box::new(r) as Box)), + GenericRequest::Available => Box::new( + LocalGeneric::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Command( + CommandResponse::Available(b)))))), + GenericRequest::Exec(cmd, shell) => Box::new( + LocalGeneric::exec(handle, &cmd, &shell) + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Command( + CommandResponse::Exec(t.into())))) + )) } } } diff --git a/core/src/command/providers/mod.rs b/core/src/command/providers/mod.rs index 427d0b9..a1f0ef6 100644 --- a/core/src/command/providers/mod.rs +++ b/core/src/command/providers/mod.rs @@ -7,13 +7,10 @@ mod generic; use errors::*; -use erased_serde::Serialize; use futures::{future, Future}; use host::Host; -use host::local::Local; use provider::Provider; -use remote::Executable; -pub use self::generic::{Generic, GenericRunnable}; +pub use self::generic::Generic; use super::CommandResult; use tokio_core::reactor::Handle; @@ -21,20 +18,6 @@ pub trait CommandProvider: Provider { fn exec(&self, &H, &Handle, &str, &[String]) -> Box>; } -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum CommandRunnable { - Generic(GenericRunnable) -} - -impl Executable for CommandRunnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { - match self { - CommandRunnable::Generic(p) => p.exec(host, handle) - } - } -} - pub fn factory(host: &H) -> Box>, Error = Error>> { Box::new(Generic::try_new(host) .and_then(|opt| match opt { diff --git a/core/src/errors.rs b/core/src/errors.rs index 1e82d60..e85effc 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -26,7 +26,7 @@ error_chain! { display("No providers available for {}", p), } - Runnable { + Request { endpoint: &'static str, func: &'static str, } { diff --git a/core/src/host/remote.rs b/core/src/host/remote.rs index 7f28f4c..22d3ae5 100644 --- a/core/src/host/remote.rs +++ b/core/src/host/remote.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use errors::*; use futures::{future, Future}; -use remote::Runnable; +use remote::Request; use serde::Deserialize; use serde_json; use std::{io, result}; @@ -81,7 +81,7 @@ impl Plain { } #[doc(hidden)] - pub fn run(&self, provider: Runnable) -> Box> + pub fn run(&self, provider: Request) -> Box> where for<'de> D: Deserialize<'de> { Box::new(self.run_msg::(provider) @@ -89,7 +89,7 @@ impl Plain { } #[doc(hidden)] - pub fn run_msg(&self, provider: Runnable) -> Box, io::Error>>, Error = Error>> + pub fn run_msg(&self, provider: Request) -> Box, io::Error>>, Error = Error>> where for<'de> D: Deserialize<'de> { let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") { diff --git a/core/src/remote.rs b/core/src/remote.rs index 589015e..f712faf 100644 --- a/core/src/remote.rs +++ b/core/src/remote.rs @@ -4,29 +4,151 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use command::providers::CommandRunnable; -use erased_serde::Serialize; +// Hopefully in the near future this will be auto-generated from `derive` attributes. + +use command::CommandResult; use errors::*; use futures::Future; use host::local::Local; -use telemetry::providers::TelemetryRunnable; +use telemetry::serializable::Telemetry; use tokio_core::reactor::Handle; +use tokio_proto::streaming::{Body, Message}; + +pub type ExecutableResult = Box, Error>>, Error = Error>>; pub trait Executable { - fn exec(self, &Local, &Handle) -> Box, Error = Error>>; + fn exec(self, &Local, &Handle) -> ExecutableResult; +} + +#[derive(Serialize, Deserialize)] +pub enum Request { + Command(CommandRequest), + Telemetry(TelemetryRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum Response { + Command(CommandResponse), + Telemetry(TelemetryResponse), +} + +#[derive(Serialize, Deserialize)] +pub enum ResponseResult { + Ok(Response), + Err(String), +} + +impl Executable for Request { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { + match self { + Request::Command(p) => p.exec(host, handle), + Request::Telemetry(p) => p.exec(host, handle), + } + } +} + +// +// Command +// + +#[derive(Serialize, Deserialize)] +pub enum CommandRequest { + Generic(GenericRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum GenericRequest { + Available, + Exec(String, Vec), +} + +#[derive(Serialize, Deserialize)] +pub enum CommandResponse { + Available(bool), + Exec(CommandResult), +} + +impl Executable for CommandRequest { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { + match self { + CommandRequest::Generic(p) => p.exec(host, handle) + } + } +} + +// +// Telemetry +// + +#[derive(Serialize, Deserialize)] +pub enum TelemetryRequest { + Centos(CentosRequest), + Debian(DebianRequest), + Fedora(FedoraRequest), + Freebsd(FreebsdRequest), + Macos(MacosRequest), + Nixos(NixosRequest), + Ubuntu(UbuntuRequest), +} + +#[derive(Serialize, Deserialize)] +pub enum CentosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum DebianRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum FedoraRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum FreebsdRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum MacosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum NixosRequest { + Available, + Load, +} + +#[derive(Serialize, Deserialize)] +pub enum UbuntuRequest { + Available, + Load, } #[derive(Serialize, Deserialize)] -pub enum Runnable { - Command(CommandRunnable), - Telemetry(TelemetryRunnable), +pub enum TelemetryResponse { + Available(bool), + Load(Telemetry), } -impl Executable for Runnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { +impl Executable for TelemetryRequest { + fn exec(self, host: &Local, handle: &Handle) -> ExecutableResult { match self { - Runnable::Command(p) => p.exec(host, handle), - Runnable::Telemetry(p) => p.exec(host, handle), + TelemetryRequest::Centos(p) => p.exec(host, handle), + TelemetryRequest::Debian(p) => p.exec(host, handle), + TelemetryRequest::Fedora(p) => p.exec(host, handle), + TelemetryRequest::Freebsd(p) => p.exec(host, handle), + TelemetryRequest::Macos(p) => p.exec(host, handle), + TelemetryRequest::Nixos(p) => p.exec(host, handle), + TelemetryRequest::Ubuntu(p) => p.exec(host, handle), } } } diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 80307ac..afe944f 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -7,7 +7,8 @@ //! System generated data about your host. pub mod providers; -mod serializable; +#[doc(hidden)] +pub mod serializable; use pnet::datalink::NetworkInterface; diff --git a/core/src/telemetry/providers/centos.rs b/core/src/telemetry/providers/centos.rs index 7c8af0e..3c68192 100644 --- a/core/src/telemetry/providers/centos.rs +++ b/core/src/telemetry/providers/centos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; -use std::{env, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use remote::{CentosRequest, Executable, ExecutableResult, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::env; +use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Centos; struct LocalCentos; struct RemoteCentos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum CentosRunnable { - Available, - Load, -} - impl Provider for Centos { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,31 +70,39 @@ impl LocalCentos { impl RemoteCentos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Centos( - CentosRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Centos( + CentosRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Centos", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Centos( - CentosRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Centos( + CentosRequest::Load)); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Centos", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for CentosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for CentosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - CentosRunnable::Available => Box::new(LocalCentos::available().map(|b| Box::new(b) as Box)), - CentosRunnable::Load => Box::new(LocalCentos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + CentosRequest::Available => Box::new( + LocalCentos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + CentosRequest::Load => Box::new( + LocalCentos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/debian.rs b/core/src/telemetry/providers/debian.rs index 9ebcca3..3395e70 100644 --- a/core/src/telemetry/providers/debian.rs +++ b/core/src/telemetry/providers/debian.rs @@ -4,33 +4,27 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; -use remote::{Executable, Runnable}; use futures::{future, Future}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; +use remote::{DebianRequest, Executable, ExecutableResult, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Debian; struct LocalDebian; struct RemoteDebian; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum DebianRunnable { - Available, - Load, -} - impl Provider for Debian { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,41 @@ impl LocalDebian { impl RemoteDebian { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Debian( - DebianRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Debian( + DebianRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Debian", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Debian( - DebianRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Debian( + DebianRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Debian", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for DebianRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for DebianRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - DebianRunnable::Available => Box::new(LocalDebian::available().map(|b| Box::new(b) as Box)), - DebianRunnable::Load => Box::new(LocalDebian::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + DebianRequest::Available => Box::new( + LocalDebian::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + DebianRequest::Load => Box::new( + LocalDebian::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/fedora.rs b/core/src/telemetry/providers/fedora.rs index 2d407ca..d3561d6 100644 --- a/core/src/telemetry/providers/fedora.rs +++ b/core/src/telemetry/providers/fedora.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; -use std::{env, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use remote::{Executable, ExecutableResult, FedoraRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::env; +use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Fedora; struct LocalFedora; struct RemoteFedora; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum FedoraRunnable { - Available, - Load, -} - impl Provider for Fedora { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,41 @@ impl LocalFedora { impl RemoteFedora { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Fedora( - FedoraRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Fedora( + FedoraRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Fedora", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Fedora( - FedoraRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Fedora( + FedoraRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Fedora", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for FedoraRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for FedoraRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - FedoraRunnable::Available => Box::new(LocalFedora::available().map(|b| Box::new(b) as Box)), - FedoraRunnable::Load => Box::new(LocalFedora::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + FedoraRequest::Available => Box::new( + LocalFedora::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + FedoraRequest::Load => Box::new( + LocalFedora::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/freebsd.rs b/core/src/telemetry/providers/freebsd.rs index 863b71c..950b182 100644 --- a/core/src/telemetry/providers/freebsd.rs +++ b/core/src/telemetry/providers/freebsd.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -13,25 +12,20 @@ use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; use regex::Regex; -use remote::{Executable, Runnable}; -use std::{env, fs, str}; +use remote::{Executable, ExecutableResult, FreebsdRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; +use std::{env, fs}; use std::io::Read; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, unix}; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Freebsd; struct LocalFreebsd; struct RemoteFreebsd; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum FreebsdRunnable { - Available, - Load, -} - impl Provider for Freebsd { fn available(host: &H) -> Box> { match host.get_type() { @@ -77,33 +71,41 @@ impl LocalFreebsd { impl RemoteFreebsd { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Freebsd( - FreebsdRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Freebsd( + FreebsdRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Freebsd", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Freebsd( - FreebsdRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Freebsd( + FreebsdRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Freebsd", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for FreebsdRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for FreebsdRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - FreebsdRunnable::Available => Box::new(LocalFreebsd::available().map(|b| Box::new(b) as Box)), - FreebsdRunnable::Load => Box::new(LocalFreebsd::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + FreebsdRequest::Available => Box::new( + LocalFreebsd::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + FreebsdRequest::Load => Box::new( + LocalFreebsd::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/macos.rs b/core/src/telemetry/providers/macos.rs index 71a61dc..000497c 100644 --- a/core/src/telemetry/providers/macos.rs +++ b/core/src/telemetry/providers/macos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,24 +11,19 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, MacosRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, unix}; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Macos; struct LocalMacos; struct RemoteMacos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum MacosRunnable { - Available, - Load, -} - impl Provider for Macos { fn available(host: &H) -> Box> { match host.get_type() { @@ -75,33 +69,41 @@ impl LocalMacos { impl RemoteMacos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Macos( - MacosRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Macos( + MacosRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Macos", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Macos( - MacosRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Macos( + MacosRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Macos", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for MacosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for MacosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - MacosRunnable::Available => Box::new(LocalMacos::available().map(|b| Box::new(b) as Box)), - MacosRunnable::Load => Box::new(LocalMacos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + MacosRequest::Available => Box::new( + LocalMacos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + MacosRequest::Load => Box::new( + LocalMacos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/mod.rs b/core/src/telemetry/providers/mod.rs index d8d114c..e421cef 100644 --- a/core/src/telemetry/providers/mod.rs +++ b/core/src/telemetry/providers/mod.rs @@ -12,54 +12,24 @@ mod macos; mod nixos; mod ubuntu; -pub use self::centos::{Centos, CentosRunnable}; -pub use self::debian::{Debian, DebianRunnable}; -pub use self::fedora::{Fedora, FedoraRunnable}; -pub use self::freebsd::{Freebsd, FreebsdRunnable}; -pub use self::macos::{Macos, MacosRunnable}; -pub use self::nixos::{Nixos, NixosRunnable}; -pub use self::ubuntu::{Ubuntu, UbuntuRunnable}; +pub use self::centos::Centos; +pub use self::debian::Debian; +pub use self::fedora::Fedora; +pub use self::freebsd::Freebsd; +pub use self::macos::Macos; +pub use self::nixos::Nixos; +pub use self::ubuntu::Ubuntu; -use erased_serde::Serialize; use errors::*; use futures::future::{self, Future}; use host::Host; -use host::local::Local; use provider::Provider; -use remote::Executable; use super::Telemetry; -use tokio_core::reactor::Handle; pub trait TelemetryProvider: Provider { fn load(&self, host: &H) -> Box>; } -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum TelemetryRunnable { - Centos(CentosRunnable), - Debian(DebianRunnable), - Fedora(FedoraRunnable), - Freebsd(FreebsdRunnable), - Macos(MacosRunnable), - Nixos(NixosRunnable), - Ubuntu(UbuntuRunnable), -} - -impl Executable for TelemetryRunnable { - fn exec(self, host: &Local, handle: &Handle) -> Box, Error = Error>> { - match self { - TelemetryRunnable::Centos(p) => p.exec(host, handle), - TelemetryRunnable::Debian(p) => p.exec(host, handle), - TelemetryRunnable::Fedora(p) => p.exec(host, handle), - TelemetryRunnable::Freebsd(p) => p.exec(host, handle), - TelemetryRunnable::Macos(p) => p.exec(host, handle), - TelemetryRunnable::Nixos(p) => p.exec(host, handle), - TelemetryRunnable::Ubuntu(p) => p.exec(host, handle), - } - } -} - pub fn factory(host: &H) -> Box> { let mut providers: Vec>> = Vec::new(); diff --git a/core/src/telemetry/providers/nixos.rs b/core/src/telemetry/providers/nixos.rs index 39af561..1576e1c 100644 --- a/core/src/telemetry/providers/nixos.rs +++ b/core/src/telemetry/providers/nixos.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -12,25 +11,20 @@ use host::local::Local; use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, NixosRequest, Request, Response, + ResponseResult, TelemetryRequest, TelemetryResponse}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Nixos; struct LocalNixos; struct RemoteNixos; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum NixosRunnable { - Available, - Load, -} - impl Provider for Nixos { fn available(host: &H) -> Box> { match host.get_type() { @@ -76,33 +70,41 @@ impl LocalNixos { impl RemoteNixos { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Nixos( - NixosRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Nixos( + NixosRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Nixos", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Nixos( - NixosRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Nixos( + NixosRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Nixos", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for NixosRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for NixosRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - NixosRunnable::Available => Box::new(LocalNixos::available().map(|b| Box::new(b) as Box)), - NixosRunnable::Load => Box::new(LocalNixos::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + NixosRequest::Available => Box::new( + LocalNixos::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + NixosRequest::Load => Box::new( + LocalNixos::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } diff --git a/core/src/telemetry/providers/ubuntu.rs b/core/src/telemetry/providers/ubuntu.rs index 61b7b11..9a97b9c 100644 --- a/core/src/telemetry/providers/ubuntu.rs +++ b/core/src/telemetry/providers/ubuntu.rs @@ -4,7 +4,6 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use erased_serde::Serialize; use errors::*; use futures::{future, Future}; use host::{Host, HostType}; @@ -13,25 +12,20 @@ use host::remote::Plain; use pnet::datalink::interfaces; use provider::Provider; use regex::Regex; -use remote::{Executable, Runnable}; +use remote::{Executable, ExecutableResult, Request, Response, ResponseResult, + TelemetryRequest, TelemetryResponse, UbuntuRequest}; use std::{env, process, str}; -use super::{TelemetryProvider, TelemetryRunnable}; +use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; use tokio_core::reactor::Handle; +use tokio_proto::streaming::Message; pub struct Ubuntu; struct LocalUbuntu; struct RemoteUbuntu; -#[doc(hidden)] -#[derive(Serialize, Deserialize)] -pub enum UbuntuRunnable { - Available, - Load, -} - impl Provider for Ubuntu { fn available(host: &H) -> Box> { match host.get_type() { @@ -77,33 +71,41 @@ impl LocalUbuntu { impl RemoteUbuntu { fn available(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Ubuntu( - UbuntuRunnable::Available)); + let runnable = Request::Telemetry( + TelemetryRequest::Ubuntu( + UbuntuRequest::Available)); host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Ubuntu", func: "available" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "available" }) } fn load(host: &Plain) -> Box> { - let runnable = Runnable::Telemetry( - TelemetryRunnable::Ubuntu( - UbuntuRunnable::Load)); + let runnable = Request::Telemetry( + TelemetryRequest::Ubuntu( + UbuntuRequest::Load)); let host = host.clone(); Box::new(host.run(runnable) - .chain_err(|| ErrorKind::Runnable { endpoint: "Telemetry::Ubuntu", func: "load" }) + .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "load" }) .map(|t: serializable::Telemetry| Telemetry::from(t))) } } -impl Executable for UbuntuRunnable { - fn exec(self, _: &Local, _: &Handle) -> Box, Error = Error>> { +impl Executable for UbuntuRequest { + fn exec(self, _: &Local, _: &Handle) -> ExecutableResult { match self { - UbuntuRunnable::Available => Box::new(LocalUbuntu::available().map(|b| Box::new(b) as Box)), - UbuntuRunnable::Load => Box::new(LocalUbuntu::load().map(|t| { - let t: serializable::Telemetry = t.into(); - Box::new(t) as Box - })) + UbuntuRequest::Available => Box::new( + LocalUbuntu::available() + .map(|b| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Available(b)))))), + UbuntuRequest::Load => Box::new( + LocalUbuntu::load() + .map(|t| Message::WithoutBody( + ResponseResult::Ok( + Response::Telemetry( + TelemetryResponse::Load(t.into())))) + )) } } } From 6f0996e5bbf03c98beb501dfdaa68ced717591ef Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Mon, 23 Oct 2017 19:06:15 +0100 Subject: [PATCH 4/6] Streaming Command and fixes for streaming protocol. Closes intecture/api#40. --- agent/src/errors.rs | 32 ++++++ agent/src/main.rs | 34 +++--- core/examples/basic.rs | 43 ++++++- core/examples/remote_host.rs | 16 +++ core/src/command/mod.rs | 14 ++- core/src/command/providers/generic.rs | 146 +++++++++++++++++++----- core/src/command/providers/mod.rs | 9 +- core/src/errors.rs | 5 + core/src/host/mod.rs | 2 +- core/src/host/remote.rs | 145 +++++++++++++---------- core/src/remote.rs | 6 +- core/src/telemetry/providers/centos.rs | 15 ++- core/src/telemetry/providers/debian.rs | 17 ++- core/src/telemetry/providers/fedora.rs | 17 ++- core/src/telemetry/providers/freebsd.rs | 17 ++- core/src/telemetry/providers/macos.rs | 17 ++- core/src/telemetry/providers/nixos.rs | 17 ++- core/src/telemetry/providers/ubuntu.rs | 17 ++- 18 files changed, 410 insertions(+), 159 deletions(-) diff --git a/agent/src/errors.rs b/agent/src/errors.rs index 4470628..c297fbb 100644 --- a/agent/src/errors.rs +++ b/agent/src/errors.rs @@ -4,10 +4,42 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. +use futures::Future; use intecture_api; +use std::{convert, error, io}; error_chain! { links { Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind); } } + +impl convert::From for io::Error { + fn from(e: Error) -> io::Error { + // @todo Return whole error chain + io::Error::new(io::ErrorKind::Other, e.description()) + } +} + +// @todo This should disappear once Futures are officially supported +// by error_chain. +// See: https://github.com/rust-lang-nursery/error-chain/issues/90 +pub type SFuture = Box>; + +pub trait FutureChainErr { + fn chain_err(self, callback: F) -> SFuture + where F: FnOnce() -> E + 'static, + E: Into; +} + +impl FutureChainErr for F + where F: Future + 'static, + F::Error: error::Error + Send + 'static, +{ + fn chain_err(self, callback: C) -> SFuture + where C: FnOnce() -> E + 'static, + E: Into, + { + Box::new(self.then(|r| r.chain_err(callback))) + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 0cc5022..29d34e8 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -20,9 +20,9 @@ mod errors; use errors::*; use futures::{future, Future}; -use intecture_api::remote::{Executable, Runnable}; use intecture_api::host::local::Local; use intecture_api::host::remote::{JsonLineProto, LineMessage}; +use intecture_api::remote::{Executable, Request}; use std::fs::File; use std::io::{self, Read}; use std::net::SocketAddr; @@ -40,7 +40,7 @@ pub struct Api { impl Service for Api { type Request = LineMessage; type Response = LineMessage; - type Error = io::Error; + type Error = Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { @@ -49,15 +49,9 @@ impl Service for Api { Message::WithoutBody(req) => req, }; - let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") { + let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") { Ok(r) => r, - Err(e) => return Box::new( - future::err( - io::Error::new( - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - io::ErrorKind::Other, e.description() - ))), + Err(e) => return Box::new(future::err(e)) }; // XXX Danger zone! If we're running multiple threads, this `unwrap()` @@ -66,13 +60,17 @@ impl Service for Api { // only safe for the current thread. // See https://github.com/alexcrichton/tokio-process/issues/23 let handle = self.remote.handle().unwrap(); - Box::new(runnable.exec(&self.host, &handle) - // @todo Can't wrap 'e' as error_chain Error doesn't derive Sync. - // Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163 - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.description())) - .and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") { - Ok(v) => future::ok(Message::WithoutBody(v)), - Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())), + Box::new(request.exec(&self.host, &handle) + .chain_err(|| "Failed to execute Request") + .and_then(|mut msg| { + let body = msg.take_body(); + match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") { + Ok(v) => match body { + Some(b) => future::ok(Message::WithBody(v, b)), + None => future::ok(Message::WithoutBody(v)), + }, + Err(e) => future::err(e), + } })) } } @@ -80,7 +78,7 @@ impl Service for Api { impl NewService for Api { type Request = LineMessage; type Response = LineMessage; - type Error = io::Error; + type Error = Error; type Instance = Api; fn new_service(&self) -> io::Result { Ok(Api { diff --git a/core/examples/basic.rs b/core/examples/basic.rs index 85e408b..098526e 100644 --- a/core/examples/basic.rs +++ b/core/examples/basic.rs @@ -8,21 +8,58 @@ extern crate futures; extern crate intecture_api; extern crate tokio_core; -use futures::Future; +use futures::{Future, Stream}; use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to our local machine, so we use the `Local` host type. let host = Local::new().and_then(|host| { + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. + + // Let's start with something basic - a shell command. Command::new(&host, "whoami", None).and_then(|cmd| { - cmd.exec(&handle).map(|out| { - println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim()); + // Now that we have our `Command` instance, let's run it. + cmd.exec(&handle).and_then(|(stream, status)| { + // At this point, our command is running. As the API + // is asynchronous, we don't have to wait for it to + // finish before inspecting its output. This is called + // "streaming". + + // Our first argument, `stream`, is a stream of strings, + // each of which represents a line of output. We can use + // the `for_each` combinator to print these lines to + // stdout. + // + // If printing isn't your thing, you are also + // free to lick them or whatever you're into. I'm not + // here to judge. + stream.for_each(|line| { println!("{}", line); Ok(()) }) + + // The second argument is a `Future` that represents the + // command's exit status. Let's print that too*. + // + // * Same caveat as above RE: printing. This is a safe + // place. + .join(status.map(|s| println!("This command {} {}", + if s.success { "succeeded" } else { "failed" }, + if let Some(e) = s.code { format!("with code {}", e) } else { String::new() }))) }) }) }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } diff --git a/core/examples/remote_host.rs b/core/examples/remote_host.rs index d62e5ef..d447cce 100644 --- a/core/examples/remote_host.rs +++ b/core/examples/remote_host.rs @@ -13,12 +13,28 @@ use intecture_api::prelude::*; use tokio_core::reactor::Core; fn main() { + // These two lines are part of `tokio-core` and can be safely + // ignored. So long as they appear at the top of your code, + // all is fine with the world. let mut core = Core::new().unwrap(); let handle = core.handle(); + // Here's the meat of your project. In this example we're talking + // to a remote machine. You'll note that this is the `Plain` host + // type, where you might have been expecting `Remote` or some such. + // This is to signify that this host type sends data in the clear, + // rather than encrypting it. Thus the usual disclaimer about + // secure networks and trust applies. let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| { + // Ok, we're in! Now we can pass our `host` handle to other + // endpoints, which informs them of the server we mean to + // talk to. See basic.rs for more usage. println!("Connected to {}", host.telemetry().hostname); }); + // This line is part of `tokio-core` and is used to execute the + // chain of futures you've created above. You'll need to call + // `core.run()` for each host you interact with, otherwise your + // project will not run at all! core.run(host).unwrap(); } diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs index ebc4818..1c4a5d5 100644 --- a/core/src/command/mod.rs +++ b/core/src/command/mod.rs @@ -10,6 +10,7 @@ pub mod providers; use errors::*; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; use self::providers::CommandProvider; use tokio_core::reactor::Handle; @@ -27,11 +28,9 @@ pub struct Command { } #[derive(Debug, Serialize, Deserialize)] -pub struct CommandResult { +pub struct ExitStatus { pub success: bool, - pub exit_code: Option, - pub stdout: Vec, - pub stderr: Vec, + pub code: Option, } impl Command { @@ -68,7 +67,12 @@ impl Command { } } - pub fn exec(&self, handle: &Handle) -> Box> { + pub fn exec(&self, handle: &Handle) -> + Box>, + Box> + ), Error = Error>> + { self.inner.exec(&self.host, handle, &self.cmd, &self.shell) } } diff --git a/core/src/command/providers/generic.rs b/core/src/command/providers/generic.rs index 51c48d5..87a9eb1 100644 --- a/core/src/command/providers/generic.rs +++ b/core/src/command/providers/generic.rs @@ -4,20 +4,27 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. -use command::CommandResult; +use command::providers::ExitStatus; use errors::*; use futures::{future, Future}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::{mpsc, oneshot}; use host::{Host, HostType}; use host::local::Local; use host::remote::Plain; use provider::Provider; use remote::{CommandRequest, CommandResponse, Executable, ExecutableResult, GenericRequest, Request, Response, ResponseResult}; -use std::process; +use serde_json; +use std::io::{self, BufReader}; +use std::process::{Command, Stdio}; +use std::result; use super::CommandProvider; use tokio_core::reactor::Handle; +use tokio_io::io::lines; use tokio_process::CommandExt; -use tokio_proto::streaming::Message; +use tokio_proto::streaming::{Body, Message}; #[derive(Clone)] pub struct Generic; @@ -46,7 +53,12 @@ impl Provider for Generic { } impl CommandProvider for Generic { - fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { match host.get_type() { HostType::Local(_) => LocalGeneric::exec(handle, cmd, shell), HostType::Remote(r) => RemoteGeneric::exec(r, cmd, shell), @@ -59,7 +71,12 @@ impl LocalGeneric { Box::new(future::ok(cfg!(unix))) } - fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> Box> { + fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { let cmd = cmd.to_owned(); let shell = shell.to_owned(); let (shell, shell_args) = match shell.split_first() { @@ -67,19 +84,34 @@ impl LocalGeneric { None => return Box::new(future::err("Invalid shell provided".into())), }; - Box::new(process::Command::new(shell) + let child = Command::new(shell) .args(shell_args) .arg(&cmd) - .output_async(handle) - .chain_err(|| "Command execution failed") - .and_then(|output| { - future::ok(CommandResult { - success: output.status.success(), - exit_code: output.status.code(), - stdout: output.stdout, - stderr: output.stderr, - }) - })) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn_async(handle) + .chain_err(|| "Command execution failed"); + let mut child = match child { + Ok(c) => c, + Err(e) => return Box::new(future::err(e)), + }; + + let stdout = child.stdout().take().unwrap(); + let outbuf = BufReader::new(stdout); + let stderr = child.stderr().take().unwrap(); + let errbuf = BufReader::new(stderr); + let lines = lines(outbuf).select(lines(errbuf)); + + Box::new(future::ok( + (Box::new(lines.then(|r| r.chain_err(|| "Command execution failed"))) as Box>, + Box::new(child.then(|r| match r.chain_err(|| "Command execution failed") { + Ok(c) => future::ok(ExitStatus { + success: c.success(), + code: c.code(), + }), + Err(e) => future::err(e) + })) as Box>) + )) } } @@ -88,16 +120,57 @@ impl RemoteGeneric { let runnable = Request::Command( CommandRequest::Generic( GenericRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Command(CommandResponse::Available(b)) => b, + _ => unreachable!(), + })) } - fn exec(host: &Plain, cmd: &str, shell: &[String]) -> Box> { + fn exec(host: &Plain, cmd: &str, shell: &[String]) -> + Box>, + Box> + ), Error = Error>> + { let runnable = Request::Command( CommandRequest::Generic( GenericRequest::Exec(cmd.into(), shell.to_owned()))); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Command::Generic", func: "exec" }) + .map(|mut msg| { + let (tx, rx) = oneshot::channel::(); + let mut tx_share = Some(tx); + let mut found = false; + ( + Box::new(msg.take_body() + .expect("Command::exec reply missing body stream") + .filter_map(move |v| { + let s = String::from_utf8_lossy(&v).to_string(); + + // @todo This is a heuristical approach which is fallible + if !found && s.starts_with("ExitStatus:") { + let (_, json) = s.split_at(11); + match serde_json::from_str(json) { + Ok(status) => { + // @todo What should happen if this fails? + let _ = tx_share.take().unwrap().send(status); + found = true; + return None; + }, + _ => (), + } + } + + Some(s) + }) + .then(|r| r.chain_err(|| "Command execution failed")) + ) as Box>, + Box::new(rx.chain_err(|| "Buffer dropped before ExitStatus was sent")) + as Box> + ) + })) } } @@ -110,13 +183,34 @@ impl Executable for GenericRequest { ResponseResult::Ok( Response::Command( CommandResponse::Available(b)))))), - GenericRequest::Exec(cmd, shell) => Box::new( - LocalGeneric::exec(handle, &cmd, &shell) - .map(|t| Message::WithoutBody( - ResponseResult::Ok( - Response::Command( - CommandResponse::Exec(t.into())))) - )) + GenericRequest::Exec(cmd, shell) => { + let handle = handle.clone(); + Box::new(LocalGeneric::exec(&handle, &cmd, &shell) + .and_then(move |(lines, status)| { + let (tx1, body) = Body::pair(); + let tx2 = tx1.clone(); + let msg = Message::WithBody(ResponseResult::Ok(Response::Command(CommandResponse::Exec)), body); + let stream = lines.map(|s| Ok(s.into_bytes())) + .forward(tx1.sink_map_err(|e| Error::with_chain(e, "Could not forward command output to Body"))) + .join(status.and_then(|s| match serde_json::to_string(&s) + .chain_err(|| "Could not serialize `ExitStatus` struct") + { + Ok(s) => { + let mut frame = "ExitStatus:".to_owned(); + frame.push_str(&s); + Box::new(tx2.send(Ok(frame.into_bytes())) + .map_err(|e| Error::with_chain(e, "Could not forward command output to Body")) + ) as Box, io::Error>>, Error = Error>> + }, + Err(e) => Box::new(future::err(e)), + })) + // @todo We should repatriate these errors somehow + .map(|_| ()) + .map_err(|_| ()); + handle.spawn(stream); + future::ok(msg) + })) + }, } } } diff --git a/core/src/command/providers/mod.rs b/core/src/command/providers/mod.rs index a1f0ef6..58f54ce 100644 --- a/core/src/command/providers/mod.rs +++ b/core/src/command/providers/mod.rs @@ -6,16 +6,21 @@ mod generic; +use command::ExitStatus; use errors::*; use futures::{future, Future}; +use futures::stream::Stream; use host::Host; use provider::Provider; pub use self::generic::Generic; -use super::CommandResult; use tokio_core::reactor::Handle; pub trait CommandProvider: Provider { - fn exec(&self, &H, &Handle, &str, &[String]) -> Box>; + fn exec(&self, &H, &Handle, &str, &[String]) -> + Box>, + Box> + ), Error = Error>>; } pub fn factory(host: &H) -> Box>, Error = Error>> { diff --git a/core/src/errors.rs b/core/src/errors.rs index e85effc..7a0efb4 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -34,6 +34,11 @@ error_chain! { display("Could not run {}::{}() on host", endpoint, func), } + Remote(e: String) { + description("Error running command on remote host"), + display("Error running command on remote host: {}", e), + } + SystemCommand(c: &'static str) { description("Error running system command"), display("Error running system command '{}'", c), diff --git a/core/src/host/mod.rs b/core/src/host/mod.rs index 2d17b27..c8dc589 100644 --- a/core/src/host/mod.rs +++ b/core/src/host/mod.rs @@ -15,7 +15,7 @@ use futures::Future; use telemetry::Telemetry; pub trait Host: Clone { - //// Retrieve Telemetry + /// Retrieve Telemetry fn telemetry(&self) -> &Telemetry; #[doc(hidden)] fn get_type<'a>(&'a self) -> HostType<'a>; diff --git a/core/src/host/remote.rs b/core/src/host/remote.rs index 22d3ae5..85ce4fe 100644 --- a/core/src/host/remote.rs +++ b/core/src/host/remote.rs @@ -7,8 +7,7 @@ use bytes::BytesMut; use errors::*; use futures::{future, Future}; -use remote::Request; -use serde::Deserialize; +use remote::{Request, Response, ResponseResult}; use serde_json; use std::{io, result}; use std::sync::Arc; @@ -81,34 +80,8 @@ impl Plain { } #[doc(hidden)] - pub fn run(&self, provider: Request) -> Box> - where for<'de> D: Deserialize<'de> - { - Box::new(self.run_msg::(provider) - .map(|msg| msg.into_inner())) - } - - #[doc(hidden)] - pub fn run_msg(&self, provider: Request) -> Box, io::Error>>, Error = Error>> - where for<'de> D: Deserialize<'de> - { - let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") { - Ok(v) => v, - Err(e) => return Box::new(future::err(e)) - }; - Box::new(self.inner.inner.call(Message::WithoutBody(value)) - .chain_err(|| "Error while running provider on host") - .and_then(|mut msg| { - let body = msg.take_body(); - let msg = match serde_json::from_value::(msg.into_inner()).chain_err(|| "Could not understand response from host") { - Ok(d) => d, - Err(e) => return Box::new(future::err(e)), - }; - Box::new(future::ok(match body { - Some(b) => Message::WithBody(msg, b), - None => Message::WithoutBody(msg), - })) - })) + pub fn call_req(&self, request: Request) -> Box, io::Error>>, Error = Error>> { + self.call(Message::WithoutBody(request)) } } @@ -123,13 +96,49 @@ impl Host for Plain { } impl Service for Plain { - type Request = LineMessage; - type Response = LineMessage; - type Error = io::Error; + type Request = Message, io::Error>>; + type Response = Message, io::Error>>; + type Error = Error; type Future = Box>; - fn call(&self, req: Self::Request) -> Self::Future { - Box::new(self.inner.inner.call(req)) as Self::Future + fn call(&self, mut req: Self::Request) -> Self::Future { + let body = req.take_body(); + let request = req.into_inner(); + + let value = match serde_json::to_value(request).chain_err(|| "Could not encode provider to send to host") { + Ok(v) => v, + Err(e) => return Box::new(future::err(e)) + }; + + debug!("Sending JSON request: {}", value); + + let json_msg = match body { + Some(b) => Message::WithBody(value, b), + None => Message::WithoutBody(value), + }; + + Box::new(self.inner.inner.call(json_msg) + .chain_err(|| "Error while running provider on host") + .and_then(|mut msg| { + let body = msg.take_body(); + let header = msg.into_inner(); + + debug!("Received JSON response: {}", header); + + let result: ResponseResult = match serde_json::from_value(header).chain_err(|| "Could not understand response from host") { + Ok(d) => d, + Err(e) => return Box::new(future::err(e)), + }; + + let msg = match result { + ResponseResult::Ok(msg) => msg, + ResponseResult::Err(e) => return Box::new(future::err(ErrorKind::Remote(e).into())), + }; + Box::new(future::ok(match body { + Some(b) => Message::WithBody(msg, b), + None => Message::WithoutBody(msg), + })) + })) } } @@ -145,33 +154,45 @@ impl Decoder for JsonLineCodec { buf.split_to(1); - if line.is_empty() { - let decoding_head = self.decoding_head; - self.decoding_head = !decoding_head; + if self.decoding_head { + debug!("Decoding header: {:?}", line); - if decoding_head { - Ok(Some(Frame::Message { - message: serde_json::Value::Null, - body: true, - })) - } else { - Ok(Some(Frame::Body { - chunk: None - })) + // The last byte in this frame is a bool that indicates + // whether we have a body stream following or not. + // This byte must exist, or our codec is buggered and + // panicking is appropriate. + let (has_body, line) = line.split_last() + .expect("Missing body byte at end of message frame"); + + debug!("Body byte: {:?}", has_body); + + if *has_body == 1 { + self.decoding_head = false; } + + let frame = Frame::Message { + message: serde_json::from_slice(&line).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + })?, + body: *has_body == 1, + }; + + debug!("Decoded header: {:?}", frame); + + Ok(Some(frame)) } else { - if self.decoding_head { - Ok(Some(Frame::Message { - message: serde_json::from_slice(&line).map_err(|e| { - io::Error::new(io::ErrorKind::Other, e) - })?, - body: false, - })) + debug!("Decoding body chunk: {:?}", line); + + let frame = if line.is_empty() { + self.decoding_head = true; + Frame::Body { chunk: None } } else { - Ok(Some(Frame::Body { - chunk: Some(line.to_vec()), - })) - } + Frame::Body { chunk: Some(line.to_vec()) } + }; + + debug!("Decoded body chunk: {:?}", frame); + + Ok(Some(frame)) } } } @@ -183,15 +204,17 @@ impl Encoder for JsonLineCodec { fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { match msg { Frame::Message { message, body } => { - // Our protocol dictates that a message head that - // includes a streaming body is an empty string. - assert!(message.is_null() == body); + debug!("Encoding header: {:?}, {:?}", message, body); let json = serde_json::to_vec(&message) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; buf.extend(&json); + // Add 'has-body' flag + buf.extend(if body { &[1] } else { &[0] }); } Frame::Body { chunk } => { + debug!("Encoding chunk: {:?}", chunk); + if let Some(chunk) = chunk { buf.extend(&chunk); } diff --git a/core/src/remote.rs b/core/src/remote.rs index f712faf..f02def4 100644 --- a/core/src/remote.rs +++ b/core/src/remote.rs @@ -6,15 +6,15 @@ // Hopefully in the near future this will be auto-generated from `derive` attributes. -use command::CommandResult; use errors::*; use futures::Future; use host::local::Local; +use std::io; use telemetry::serializable::Telemetry; use tokio_core::reactor::Handle; use tokio_proto::streaming::{Body, Message}; -pub type ExecutableResult = Box, Error>>, Error = Error>>; +pub type ExecutableResult = Box, io::Error>>, Error = Error>>; pub trait Executable { fn exec(self, &Local, &Handle) -> ExecutableResult; @@ -65,7 +65,7 @@ pub enum GenericRequest { #[derive(Serialize, Deserialize)] pub enum CommandResponse { Available(bool), - Exec(CommandResult), + Exec, } impl Executable for CommandRequest { diff --git a/core/src/telemetry/providers/centos.rs b/core/src/telemetry/providers/centos.rs index 3c68192..23f0e4c 100644 --- a/core/src/telemetry/providers/centos.rs +++ b/core/src/telemetry/providers/centos.rs @@ -17,7 +17,7 @@ use std::env; use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,17 +73,24 @@ impl RemoteCentos { let runnable = Request::Telemetry( TelemetryRequest::Centos( CentosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Centos( CentosRequest::Load)); - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Centos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/debian.rs b/core/src/telemetry/providers/debian.rs index 3395e70..846bdf1 100644 --- a/core/src/telemetry/providers/debian.rs +++ b/core/src/telemetry/providers/debian.rs @@ -17,7 +17,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteDebian { let runnable = Request::Telemetry( TelemetryRequest::Debian( DebianRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Debian( DebianRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Debian", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/fedora.rs b/core/src/telemetry/providers/fedora.rs index d3561d6..806aa08 100644 --- a/core/src/telemetry/providers/fedora.rs +++ b/core/src/telemetry/providers/fedora.rs @@ -17,7 +17,7 @@ use std::env; use super::TelemetryProvider; use target::{default, linux, redhat}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteFedora { let runnable = Request::Telemetry( TelemetryRequest::Fedora( FedoraRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Fedora( FedoraRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Fedora", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/freebsd.rs b/core/src/telemetry/providers/freebsd.rs index 950b182..e3ef5dc 100644 --- a/core/src/telemetry/providers/freebsd.rs +++ b/core/src/telemetry/providers/freebsd.rs @@ -18,7 +18,7 @@ use std::{env, fs}; use std::io::Read; use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -74,19 +74,24 @@ impl RemoteFreebsd { let runnable = Request::Telemetry( TelemetryRequest::Freebsd( FreebsdRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Freebsd( FreebsdRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Freebsd", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/macos.rs b/core/src/telemetry/providers/macos.rs index 000497c..a646368 100644 --- a/core/src/telemetry/providers/macos.rs +++ b/core/src/telemetry/providers/macos.rs @@ -16,7 +16,7 @@ use remote::{Executable, ExecutableResult, MacosRequest, Request, Response, use std::{env, process, str}; use super::TelemetryProvider; use target::{default, unix}; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -72,19 +72,24 @@ impl RemoteMacos { let runnable = Request::Telemetry( TelemetryRequest::Macos( MacosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Macos( MacosRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Macos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/nixos.rs b/core/src/telemetry/providers/nixos.rs index 1576e1c..d220137 100644 --- a/core/src/telemetry/providers/nixos.rs +++ b/core/src/telemetry/providers/nixos.rs @@ -17,7 +17,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -73,19 +73,24 @@ impl RemoteNixos { let runnable = Request::Telemetry( TelemetryRequest::Nixos( NixosRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Nixos( NixosRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Nixos", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } diff --git a/core/src/telemetry/providers/ubuntu.rs b/core/src/telemetry/providers/ubuntu.rs index 9a97b9c..43cb432 100644 --- a/core/src/telemetry/providers/ubuntu.rs +++ b/core/src/telemetry/providers/ubuntu.rs @@ -18,7 +18,7 @@ use std::{env, process, str}; use super::TelemetryProvider; use target::{default, linux}; use target::linux::LinuxFlavour; -use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable}; +use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry}; use tokio_core::reactor::Handle; use tokio_proto::streaming::Message; @@ -74,19 +74,24 @@ impl RemoteUbuntu { let runnable = Request::Telemetry( TelemetryRequest::Ubuntu( UbuntuRequest::Available)); - host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "available" }) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Available(b)) => b, + _ => unreachable!(), + })) } fn load(host: &Plain) -> Box> { let runnable = Request::Telemetry( TelemetryRequest::Ubuntu( UbuntuRequest::Load)); - let host = host.clone(); - - Box::new(host.run(runnable) + Box::new(host.call_req(runnable) .chain_err(|| ErrorKind::Request { endpoint: "Telemetry::Ubuntu", func: "load" }) - .map(|t: serializable::Telemetry| Telemetry::from(t))) + .map(|msg| match msg.into_inner() { + Response::Telemetry(TelemetryResponse::Load(t)) => Telemetry::from(t), + _ => unreachable!(), + })) } } From 67ded1ac53d4eb06a8d64c479206e10df102a1c4 Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Mon, 23 Oct 2017 19:38:07 +0100 Subject: [PATCH 5/6] Update RELEASES.md to reflect shiny new streaming proto --- RELEASES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASES.md b/RELEASES.md index cb871fc..b1c225c 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -5,6 +5,6 @@ Version 0.4 is a complete rewrite of Intecture's core API. If we had been runnin Anyway, there are some big headlines in this release: - **Hello Tokio!** This is an exciting change to our socket API, as we bid a fond farewell to ZeroMQ. The change to Tokio will bring loads of benefits, such as substantially reducing our non-Rust dependencies, and introducing strongly typed messaging between servers and clients. This will make our communications more robust and less open to exploitation. Woo hoo! -- **Asynchronous endpoints with `Futures`!** This is another huge change to the API, allowing us to multitask our configurations. It will also pave the way for a streaming API, which has been sorely lacking in Intecture until now. +- **Asynchronous, streaming endpoints with `Futures`!** This is another huge change to the API, which allows configuration tasks to be run in parallel; moreover it allows users to stream task output in realtime. Gone are the days of waiting in the dark for tasks to complete! - **Greater emphasis on composability.** Each endpoint (formerly called _primitives_) is organised into a collection of _providers_ that will (you guessed it) provide target-specific implementations of the endpoint. Users will be able to select an endpoint provider manually or let the system choose the best provider for the target platform. - **Separation of duties.** In the previous versions of Intecture, the API had been organised into a single project, making it cluttered and unwieldy. For the next release, things like the FFI, language bindings and project boilerplate will be moved into separate child projects under the same Cargo workspace. From 3644c526304677652b8cc175253ce090cf015a41 Mon Sep 17 00:00:00 2001 From: Pete Hayes Date: Tue, 24 Oct 2017 12:55:02 +0100 Subject: [PATCH 6/6] Better API error handling for Agent --- agent/src/errors.rs | 4 ++-- agent/src/main.rs | 35 +++++++++++++++++++++++++---------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/agent/src/errors.rs b/agent/src/errors.rs index c297fbb..9c3b6ef 100644 --- a/agent/src/errors.rs +++ b/agent/src/errors.rs @@ -4,6 +4,7 @@ // https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied, // modified, or distributed except according to those terms. +use error_chain::ChainedError; use futures::Future; use intecture_api; use std::{convert, error, io}; @@ -16,8 +17,7 @@ error_chain! { impl convert::From for io::Error { fn from(e: Error) -> io::Error { - // @todo Return whole error chain - io::Error::new(io::ErrorKind::Other, e.description()) + io::Error::new(io::ErrorKind::Other, format!("{}", e.display_chain())) } } diff --git a/agent/src/main.rs b/agent/src/main.rs index 29d34e8..134fce2 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -18,11 +18,12 @@ extern crate toml; mod errors; +use error_chain::ChainedError; use errors::*; use futures::{future, Future}; use intecture_api::host::local::Local; use intecture_api::host::remote::{JsonLineProto, LineMessage}; -use intecture_api::remote::{Executable, Request}; +use intecture_api::remote::{Executable, Request, ResponseResult}; use std::fs::File; use std::io::{self, Read}; use std::net::SocketAddr; @@ -49,9 +50,9 @@ impl Service for Api { Message::WithoutBody(req) => req, }; - let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") { + let request: Request = match serde_json::from_value(req).chain_err(|| "Could not deserialize Request") { Ok(r) => r, - Err(e) => return Box::new(future::err(e)) + Err(e) => return Box::new(future::ok(error_to_msg(e))), }; // XXX Danger zone! If we're running multiple threads, this `unwrap()` @@ -62,14 +63,19 @@ impl Service for Api { let handle = self.remote.handle().unwrap(); Box::new(request.exec(&self.host, &handle) .chain_err(|| "Failed to execute Request") - .and_then(|mut msg| { - let body = msg.take_body(); - match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") { - Ok(v) => match body { - Some(b) => future::ok(Message::WithBody(v, b)), - None => future::ok(Message::WithoutBody(v)), + .then(|req| { + match req { + Ok(mut msg) => { + let body = msg.take_body(); + match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize Result") { + Ok(v) => match body { + Some(b) => future::ok(Message::WithBody(v, b)), + None => future::ok(Message::WithoutBody(v)), + }, + Err(e) => future::ok(error_to_msg(e)), + } }, - Err(e) => future::err(e), + Err(e) => future::ok(error_to_msg(e)), } })) } @@ -143,3 +149,12 @@ quick_main!(|| -> Result<()> { }); Ok(()) }); + +fn error_to_msg(e: Error) -> LineMessage { + let response = ResponseResult::Err(format!("{}", e.display_chain())); + // If we can't serialize this, we can't serialize anything, so + // panicking is appropriate. + let value = serde_json::to_value(response) + .expect("Cannot serialize ResponseResult::Err. This is bad..."); + Message::WithoutBody(value) +}