Skip to content

Commit

Permalink
Streaming Command and fixes for streaming protocol. Closes #40.
Browse files Browse the repository at this point in the history
  • Loading branch information
petehayes102 committed Oct 23, 2017
1 parent c63cd0c commit 6f0996e
Show file tree
Hide file tree
Showing 18 changed files with 410 additions and 159 deletions.
32 changes: 32 additions & 0 deletions agent/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,42 @@
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
// modified, or distributed except according to those terms.

use futures::Future;
use intecture_api;
use std::{convert, error, io};

error_chain! {
links {
Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind);
}
}

impl convert::From<Error> for io::Error {
fn from(e: Error) -> io::Error {
// @todo Return whole error chain
io::Error::new(io::ErrorKind::Other, e.description())
}
}

// @todo This should disappear once Futures are officially supported
// by error_chain.
// See: https://github.com/rust-lang-nursery/error-chain/issues/90
pub type SFuture<T> = Box<Future<Item = T, Error = Error>>;

pub trait FutureChainErr<T> {
fn chain_err<F, E>(self, callback: F) -> SFuture<T>
where F: FnOnce() -> E + 'static,
E: Into<ErrorKind>;
}

impl<F> FutureChainErr<F::Item> for F
where F: Future + 'static,
F::Error: error::Error + Send + 'static,
{
fn chain_err<C, E>(self, callback: C) -> SFuture<F::Item>
where C: FnOnce() -> E + 'static,
E: Into<ErrorKind>,
{
Box::new(self.then(|r| r.chain_err(callback)))
}
}
34 changes: 16 additions & 18 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ mod errors;

use errors::*;
use futures::{future, Future};
use intecture_api::remote::{Executable, Runnable};
use intecture_api::host::local::Local;
use intecture_api::host::remote::{JsonLineProto, LineMessage};
use intecture_api::remote::{Executable, Request};
use std::fs::File;
use std::io::{self, Read};
use std::net::SocketAddr;
Expand All @@ -40,7 +40,7 @@ pub struct Api {
impl Service for Api {
type Request = LineMessage;
type Response = LineMessage;
type Error = io::Error;
type Error = Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
Expand All @@ -49,15 +49,9 @@ impl Service for Api {
Message::WithoutBody(req) => req,
};

let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") {
Ok(r) => r,
Err(e) => return Box::new(
future::err(
io::Error::new(
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
io::ErrorKind::Other, e.description()
))),
Err(e) => return Box::new(future::err(e))
};

// XXX Danger zone! If we're running multiple threads, this `unwrap()`
Expand All @@ -66,21 +60,25 @@ impl Service for Api {
// only safe for the current thread.
// See https://github.com/alexcrichton/tokio-process/issues/23
let handle = self.remote.handle().unwrap();
Box::new(runnable.exec(&self.host, &handle)
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
Ok(v) => future::ok(Message::WithoutBody(v)),
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
Box::new(request.exec(&self.host, &handle)
.chain_err(|| "Failed to execute Request")
.and_then(|mut msg| {
let body = msg.take_body();
match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") {
Ok(v) => match body {
Some(b) => future::ok(Message::WithBody(v, b)),
None => future::ok(Message::WithoutBody(v)),
},
Err(e) => future::err(e),
}
}))
}
}

impl NewService for Api {
type Request = LineMessage;
type Response = LineMessage;
type Error = io::Error;
type Error = Error;
type Instance = Api;
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(Api {
Expand Down
43 changes: 40 additions & 3 deletions core/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,58 @@ extern crate futures;
extern crate intecture_api;
extern crate tokio_core;

use futures::Future;
use futures::{Future, Stream};
use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to our local machine, so we use the `Local` host type.
let host = Local::new().and_then(|host| {
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to.

// Let's start with something basic - a shell command.
Command::new(&host, "whoami", None).and_then(|cmd| {
cmd.exec(&handle).map(|out| {
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
// Now that we have our `Command` instance, let's run it.
cmd.exec(&handle).and_then(|(stream, status)| {
// At this point, our command is running. As the API
// is asynchronous, we don't have to wait for it to
// finish before inspecting its output. This is called
// "streaming".

// Our first argument, `stream`, is a stream of strings,
// each of which represents a line of output. We can use
// the `for_each` combinator to print these lines to
// stdout.
//
// If printing isn't your thing, you are also
// free to lick them or whatever you're into. I'm not
// here to judge.
stream.for_each(|line| { println!("{}", line); Ok(()) })

// The second argument is a `Future` that represents the
// command's exit status. Let's print that too*.
//
// * Same caveat as above RE: printing. This is a safe
// place.
.join(status.map(|s| println!("This command {} {}",
if s.success { "succeeded" } else { "failed" },
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
})
})
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}
16 changes: 16 additions & 0 deletions core/examples/remote_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,28 @@ use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to a remote machine. You'll note that this is the `Plain` host
// type, where you might have been expecting `Remote` or some such.
// This is to signify that this host type sends data in the clear,
// rather than encrypting it. Thus the usual disclaimer about
// secure networks and trust applies.
let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| {
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to. See basic.rs for more usage.
println!("Connected to {}", host.telemetry().hostname);
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}
14 changes: 9 additions & 5 deletions core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod providers;

use errors::*;
use futures::{future, Future};
use futures::stream::Stream;
use host::Host;
use self::providers::CommandProvider;
use tokio_core::reactor::Handle;
Expand All @@ -27,11 +28,9 @@ pub struct Command<H: Host> {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CommandResult {
pub struct ExitStatus {
pub success: bool,
pub exit_code: Option<i32>,
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub code: Option<i32>,
}

impl<H: Host + 'static> Command<H> {
Expand Down Expand Up @@ -68,7 +67,12 @@ impl<H: Host + 'static> Command<H> {
}
}

pub fn exec(&self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
pub fn exec(&self, handle: &Handle) ->
Box<Future<Item = (
Box<Stream<Item = String, Error = Error>>,
Box<Future<Item = ExitStatus, Error = Error>>
), Error = Error>>
{
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
}
}
Loading

0 comments on commit 6f0996e

Please sign in to comment.