Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming API #46

Merged
merged 6 commits into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ Version 0.4 is a complete rewrite of Intecture's core API. If we had been runnin
Anyway, there are some big headlines in this release:

- **Hello Tokio!** This is an exciting change to our socket API, as we bid a fond farewell to ZeroMQ. The change to Tokio will bring loads of benefits, such as substantially reducing our non-Rust dependencies, and introducing strongly typed messaging between servers and clients. This will make our communications more robust and less open to exploitation. Woo hoo!
- **Asynchronous endpoints with `Futures`!** This is another huge change to the API, allowing us to multitask our configurations. It will also pave the way for a streaming API, which has been sorely lacking in Intecture until now.
- **Asynchronous, streaming endpoints with `Futures`!** This is another huge change to the API, which allows configuration tasks to be run in parallel; moreover it allows users to stream task output in realtime. Gone are the days of waiting in the dark for tasks to complete!
- **Greater emphasis on composability.** Each endpoint (formerly called _primitives_) is organised into a collection of _providers_ that will (you guessed it) provide target-specific implementations of the endpoint. Users will be able to select an endpoint provider manually or let the system choose the best provider for the target platform.
- **Separation of duties.** In the previous versions of Intecture, the API had been organised into a single project, making it cluttered and unwieldy. For the next release, things like the FFI, language bindings and project boilerplate will be moved into separate child projects under the same Cargo workspace.
32 changes: 32 additions & 0 deletions agent/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,42 @@
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
// modified, or distributed except according to those terms.

use futures::Future;
use intecture_api;
use std::{convert, error, io};

error_chain! {
links {
Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind);
}
}

impl convert::From<Error> for io::Error {
fn from(e: Error) -> io::Error {
// @todo Return whole error chain
io::Error::new(io::ErrorKind::Other, e.description())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to only keep the description here and throw away the error considering that the second argument to io::Error::new is generic? Won't we loose some information that might be useful for debugging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny, I'm just working on these errors. This was part laziness, knowing that I was coming back to it once it was working, and partly an issue with error-chain. Basically error-chain::Error doesn't derive Sync, which is required by io::Error, so we can't chain the whole error. See this outstanding PR: rust-lang-deprecated/error-chain#163

I just became aware of error_chain::ChainedError, which will give us the whole chain as a formatted string. Not as good as chaining the actual Error, but at least we don't lose the error chain. I'll tack this onto the current PR. Should take me an hour or so to finish up.

}
}

// @todo This should disappear once Futures are officially supported
// by error_chain.
// See: https://github.com/rust-lang-nursery/error-chain/issues/90
pub type SFuture<T> = Box<Future<Item = T, Error = Error>>;

pub trait FutureChainErr<T> {
fn chain_err<F, E>(self, callback: F) -> SFuture<T>
where F: FnOnce() -> E + 'static,
E: Into<ErrorKind>;
}

impl<F> FutureChainErr<F::Item> for F
where F: Future + 'static,
F::Error: error::Error + Send + 'static,
{
fn chain_err<C, E>(self, callback: C) -> SFuture<F::Item>
where C: FnOnce() -> E + 'static,
E: Into<ErrorKind>,
{
Box::new(self.then(|r| r.chain_err(callback)))
}
}
52 changes: 28 additions & 24 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ mod errors;

use errors::*;
use futures::{future, Future};
use intecture_api::remote::{Executable, Runnable};
use intecture_api::host::local::Local;
use intecture_api::host::remote::JsonProto;
use intecture_api::host::remote::{JsonLineProto, LineMessage};
use intecture_api::remote::{Executable, Request};
use std::fs::File;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio_core::reactor::Remote;
use tokio_proto::streaming::Message;
use tokio_proto::TcpServer;
use tokio_service::{NewService, Service};

Expand All @@ -37,21 +38,20 @@ pub struct Api {
}

impl Service for Api {
type Request = serde_json::Value;
type Response = serde_json::Value;
type Error = io::Error;
type Request = LineMessage;
type Response = LineMessage;
type Error = Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
let req = match req {
Message::WithBody(req, _) => req,
Message::WithoutBody(req) => req,
};

let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") {
Ok(r) => r,
Err(e) => return Box::new(
future::err(
io::Error::new(
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
io::ErrorKind::Other, e.description()
))),
Err(e) => return Box::new(future::err(e))
};

// XXX Danger zone! If we're running multiple threads, this `unwrap()`
Expand All @@ -60,21 +60,25 @@ impl Service for Api {
// only safe for the current thread.
// See https://github.com/alexcrichton/tokio-process/issues/23
let handle = self.remote.handle().unwrap();
Box::new(runnable.exec(&self.host, &handle)
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
Ok(v) => future::ok(v),
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
Box::new(request.exec(&self.host, &handle)
.chain_err(|| "Failed to execute Request")
.and_then(|mut msg| {
let body = msg.take_body();
match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") {
Ok(v) => match body {
Some(b) => future::ok(Message::WithBody(v, b)),
None => future::ok(Message::WithoutBody(v)),
},
Err(e) => future::err(e),
}
}))
}
}

impl NewService for Api {
type Request = serde_json::Value;
type Response = serde_json::Value;
type Error = io::Error;
type Request = LineMessage;
type Response = LineMessage;
type Error = Error;
type Instance = Api;
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(Api {
Expand Down Expand Up @@ -129,7 +133,7 @@ quick_main!(|| -> Result<()> {
// Currently we force the issue (`unwrap()`), which is only safe
// for the current thread.
// See https://github.com/alexcrichton/tokio-process/issues/23
let server = TcpServer::new(JsonProto, config.address);
let server = TcpServer::new(JsonLineProto, config.address);
server.with_handle(move |handle| {
let api = Api {
host: host.clone(),
Expand Down
46 changes: 41 additions & 5 deletions core/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,58 @@ extern crate futures;
extern crate intecture_api;
extern crate tokio_core;

use futures::Future;
use futures::{Future, Stream};
use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to our local machine, so we use the `Local` host type.
let host = Local::new().and_then(|host| {
Command::new(&host, "whoami", None).and_then(|mut cmd| {
cmd.exec(&handle).map(|out| {
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to.

// Let's start with something basic - a shell command.
Command::new(&host, "whoami", None).and_then(|cmd| {
// Now that we have our `Command` instance, let's run it.
cmd.exec(&handle).and_then(|(stream, status)| {
// At this point, our command is running. As the API
// is asynchronous, we don't have to wait for it to
// finish before inspecting its output. This is called
// "streaming".

// Our first argument, `stream`, is a stream of strings,
// each of which represents a line of output. We can use
// the `for_each` combinator to print these lines to
// stdout.
//
// If printing isn't your thing, you are also
// free to lick them or whatever you're into. I'm not
// here to judge.
stream.for_each(|line| { println!("{}", line); Ok(()) })

// The second argument is a `Future` that represents the
// command's exit status. Let's print that too*.
//
// * Same caveat as above RE: printing. This is a safe
// place.
.join(status.map(|s| println!("This command {} {}",
if s.success { "succeeded" } else { "failed" },
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
})
})
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}

16 changes: 16 additions & 0 deletions core/examples/remote_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,28 @@ use intecture_api::prelude::*;
use tokio_core::reactor::Core;

fn main() {
// These two lines are part of `tokio-core` and can be safely
// ignored. So long as they appear at the top of your code,
// all is fine with the world.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Here's the meat of your project. In this example we're talking
// to a remote machine. You'll note that this is the `Plain` host
// type, where you might have been expecting `Remote` or some such.
// This is to signify that this host type sends data in the clear,
// rather than encrypting it. Thus the usual disclaimer about
// secure networks and trust applies.
let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| {
// Ok, we're in! Now we can pass our `host` handle to other
// endpoints, which informs them of the server we mean to
// talk to. See basic.rs for more usage.
println!("Connected to {}", host.telemetry().hostname);
});

// This line is part of `tokio-core` and is used to execute the
// chain of futures you've created above. You'll need to call
// `core.run()` for each host you interact with, otherwise your
// project will not run at all!
core.run(host).unwrap();
}
14 changes: 9 additions & 5 deletions core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod providers;

use errors::*;
use futures::{future, Future};
use futures::stream::Stream;
use host::Host;
use self::providers::CommandProvider;
use tokio_core::reactor::Handle;
Expand All @@ -27,11 +28,9 @@ pub struct Command<H: Host> {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CommandResult {
pub struct ExitStatus {
pub success: bool,
pub exit_code: Option<i32>,
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub code: Option<i32>,
}

impl<H: Host + 'static> Command<H> {
Expand Down Expand Up @@ -68,7 +67,12 @@ impl<H: Host + 'static> Command<H> {
}
}

pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
pub fn exec(&self, handle: &Handle) ->
Box<Future<Item = (
Box<Stream<Item = String, Error = Error>>,
Box<Future<Item = ExitStatus, Error = Error>>
), Error = Error>>
{
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
}
}
Loading