Skip to content

Commit

Permalink
Merge pull request #46 from petehayes102/master
Browse files Browse the repository at this point in the history
Streaming API
  • Loading branch information
petehayes102 authored Oct 24, 2017
2 parents 16dfec7 + 3644c52 commit 2de6e2f
Show file tree
Hide file tree
Showing 21 changed files with 860 additions and 418 deletions.
2 changes: 1 addition & 1 deletion RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ Version 0.4 is a complete rewrite of Intecture's core API. If we had been runnin
Anyway, there are some big headlines in this release:

- **Hello Tokio!** This is an exciting change to our socket API, as we bid a fond farewell to ZeroMQ. The change to Tokio will bring loads of benefits, such as substantially reducing our non-Rust dependencies, and introducing strongly typed messaging between servers and clients. This will make our communications more robust and less open to exploitation. Woo hoo!
- **Asynchronous endpoints with `Futures`!** This is another huge change to the API, allowing us to multitask our configurations. It will also pave the way for a streaming API, which has been sorely lacking in Intecture until now.
- **Asynchronous, streaming endpoints with `Futures`!** This is another huge change to the API, which allows configuration tasks to be run in parallel; moreover it allows users to stream task output in realtime. Gone are the days of waiting in the dark for tasks to complete!
- **Greater emphasis on composability.** Each endpoint (formerly called _primitives_) is organised into a collection of _providers_ that will (you guessed it) provide target-specific implementations of the endpoint. Users will be able to select an endpoint provider manually or let the system choose the best provider for the target platform.
- **Separation of duties.** In the previous versions of Intecture, the API had been organised into a single project, making it cluttered and unwieldy. For the next release, things like the FFI, language bindings and project boilerplate will be moved into separate child projects under the same Cargo workspace.
32 changes: 32 additions & 0 deletions agent/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,42 @@
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
// modified, or distributed except according to those terms.

use error_chain::ChainedError;
use futures::Future;
use intecture_api;
use std::{convert, error, io};

error_chain! {
links {
Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind);
}
}

impl convert::From<Error> for io::Error {
fn from(e: Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, format!("{}", e.display_chain()))
}
}

// @todo This should disappear once Futures are officially supported
// by error_chain.
// See: https://github.com/rust-lang-nursery/error-chain/issues/90
pub type SFuture<T> = Box<Future<Item = T, Error = Error>>;

pub trait FutureChainErr<T> {
fn chain_err<F, E>(self, callback: F) -> SFuture<T>
where F: FnOnce() -> E + 'static,
E: Into<ErrorKind>;
}

impl<F> FutureChainErr<F::Item> for F
where F: Future + 'static,
F::Error: error::Error + Send + 'static,
{
fn chain_err<C, E>(self, callback: C) -> SFuture<F::Item>
where C: FnOnce() -> E + 'static,
E: Into<ErrorKind>,
{
Box::new(self.then(|r| r.chain_err(callback)))
}
}
67 changes: 43 additions & 24 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ extern crate toml;

mod errors;

use error_chain::ChainedError;
use errors::*;
use futures::{future, Future};
use intecture_api::remote::{Executable, Runnable};
use intecture_api::host::local::Local;
use intecture_api::host::remote::JsonProto;
use intecture_api::host::remote::{JsonLineProto, LineMessage};
use intecture_api::remote::{Executable, Request, ResponseResult};
use std::fs::File;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio_core::reactor::Remote;
use tokio_proto::streaming::Message;
use tokio_proto::TcpServer;
use tokio_service::{NewService, Service};

Expand All @@ -37,21 +39,20 @@ pub struct Api {
}

impl Service for Api {
type Request = serde_json::Value;
type Response = serde_json::Value;
type Error = io::Error;
type Request = LineMessage;
type Response = LineMessage;
type Error = Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
let req = match req {
Message::WithBody(req, _) => req,
Message::WithoutBody(req) => req,
};

let request: Request = match serde_json::from_value(req).chain_err(|| "Could not deserialize Request") {
Ok(r) => r,
Err(e) => return Box::new(
future::err(
io::Error::new(
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
io::ErrorKind::Other, e.description()
))),
Err(e) => return Box::new(future::ok(error_to_msg(e))),
};

// XXX Danger zone! If we're running multiple threads, this `unwrap()`
Expand All @@ -60,21 +61,30 @@ impl Service for Api {
// only safe for the current thread.
// See https://github.com/alexcrichton/tokio-process/issues/23
let handle = self.remote.handle().unwrap();
Box::new(runnable.exec(&self.host, &handle)
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
Ok(v) => future::ok(v),
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
Box::new(request.exec(&self.host, &handle)
.chain_err(|| "Failed to execute Request")
.then(|req| {
match req {
Ok(mut msg) => {
let body = msg.take_body();
match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize Result") {
Ok(v) => match body {
Some(b) => future::ok(Message::WithBody(v, b)),
None => future::ok(Message::WithoutBody(v)),
},
Err(e) => future::ok(error_to_msg(e)),
}
},
Err(e) => future::ok(error_to_msg(e)),
}
}))
}
}

impl NewService for Api {
type Request = serde_json::Value;
type Response = serde_json::Value;
type Error = io::Error;
type Request = LineMessage;
type Response = LineMessage;
type Error = Error;
type Instance = Api;
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(Api {
Expand Down Expand Up @@ -129,7 +139,7 @@ quick_main!(|| -> Result<()> {
// Currently we force the issue (`unwrap()`), which is only safe
// for the current thread.
// See https://github.com/alexcrichton/tokio-process/issues/23
let server = TcpServer::new(JsonProto, config.address);
let server = TcpServer::new(JsonLineProto, config.address);
server.with_handle(move |handle| {
let api = Api {
host: host.clone(),
Expand All @@ -139,3 +149,12 @@ quick_main!(|| -> Result<()> {
});
Ok(())
});

fn error_to_msg(e: Error) -> LineMessage {
let response = ResponseResult::Err(format!("{}", e.display_chain()));
// If we can't serialize this, we can't serialize anything, so
// panicking is appropriate.
let value = serde_json::to_value(response)
.expect("Cannot serialize ResponseResult::Err. This is bad...");
Message::WithoutBody(value)
}
46 changes: 41 additions & 5 deletions core/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,58 @@ extern crate futures;
extern crate intecture_api;
extern crate tokio_core;

use futures::Future;
use futures::{Future, Stream};
use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to our local machine, so we use the `Local` host type.
let host = Local::new().and_then(|host| {
Command::new(&host, "whoami", None).and_then(|mut cmd| {
cmd.exec(&handle).map(|out| {
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to.

// Let's start with something basic - a shell command.
Command::new(&host, "whoami", None).and_then(|cmd| {
// Now that we have our `Command` instance, let's run it.
cmd.exec(&handle).and_then(|(stream, status)| {
// At this point, our command is running. As the API
// is asynchronous, we don't have to wait for it to
// finish before inspecting its output. This is called
// "streaming".

// Our first argument, `stream`, is a stream of strings,
// each of which represents a line of output. We can use
// the `for_each` combinator to print these lines to
// stdout.
//
// If printing isn't your thing, you are also
// free to lick them or whatever you're into. I'm not
// here to judge.
stream.for_each(|line| { println!("{}", line); Ok(()) })

// The second argument is a `Future` that represents the
// command's exit status. Let's print that too*.
//
// * Same caveat as above RE: printing. This is a safe
// place.
.join(status.map(|s| println!("This command {} {}",
if s.success { "succeeded" } else { "failed" },
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
})
})
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}

16 changes: 16 additions & 0 deletions core/examples/remote_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,28 @@ use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to a remote machine. You'll note that this is the `Plain` host
// type, where you might have been expecting `Remote` or some such.
// This is to signify that this host type sends data in the clear,
// rather than encrypting it. Thus the usual disclaimer about
// secure networks and trust applies.
let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| {
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to. See basic.rs for more usage.
println!("Connected to {}", host.telemetry().hostname);
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}
14 changes: 9 additions & 5 deletions core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod providers;

use errors::*;
use futures::{future, Future};
use futures::stream::Stream;
use host::Host;
use self::providers::CommandProvider;
use tokio_core::reactor::Handle;
Expand All @@ -27,11 +28,9 @@ pub struct Command<H: Host> {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CommandResult {
pub struct ExitStatus {
pub success: bool,
pub exit_code: Option<i32>,
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub code: Option<i32>,
}

impl<H: Host + 'static> Command<H> {
Expand Down Expand Up @@ -68,7 +67,12 @@ impl<H: Host + 'static> Command<H> {
}
}

pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
pub fn exec(&self, handle: &Handle) ->
Box<Future<Item = (
Box<Stream<Item = String, Error = Error>>,
Box<Future<Item = ExitStatus, Error = Error>>
), Error = Error>>
{
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
}
}
Loading

0 comments on commit 2de6e2f

Please sign in to comment.