Skip to content

Commit

Permalink
vHive engine
Browse files Browse the repository at this point in the history
  • Loading branch information
faromero committed Sep 10, 2021
1 parent 04063e5 commit fe8c11f
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/execution/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libggexecution_a_SOURCES = response.hh response.cc \
engine_lambda.hh engine_lambda.cc \
engine_gg.hh engine_gg.cc \
engine_gcloud.hh engine_gcloud.cc \
engine_vhive.hh engine_vhive.cc \
meow/message.hh meow/message.cc \
meow/util.hh meow/util.cc \
engine_meow.hh engine_meow.cc \
Expand Down
122 changes: 122 additions & 0 deletions src/execution/engine_vhive.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/* -*-mode:c++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */

#include "engine_vhive.hh"

#include <stdexcept>

#include "response.hh"
#include "net/http_response.hh"
#include "thunk/ggutils.hh"
#include "util/optional.hh"
#include "util/system_runner.hh"
#include "util/units.hh"
#include "util/base64.hh"

using namespace std;
using namespace gg;
using namespace gg::thunk;

VHiveExecutionEngine::VHiveExecutionEngine( const size_t max_jobs, const string & url )
: ExecutionEngine( max_jobs ), parsed_url_( url ), address_( parsed_url_.host, parsed_url_.protocol ),
final_url_( parsed_url_.host )
{
// If a port is specified, we treat the url and address differently
uint16_t port = parsed_url_.port.get();

if ( port ) {
string port_string = to_string( port );
final_url_ = parsed_url_.host + ":" + port_string;
address_ = Address( parsed_url_.host, port_string );
}
}

HTTPRequest VHiveExecutionEngine::generate_request( const Thunk & thunk )
{
string payload = Thunk::execution_payload( thunk );
HTTPRequest request;
request.set_first_line( "POST / HTTP/1.1" );
request.add_header( HTTPHeader{ "Host", final_url_ } );
request.add_header( HTTPHeader{ "Content-Length", to_string( payload.size() ) } );
request.add_header( HTTPHeader{ "Content-Type", "application/json" } );
request.add_header( HTTPHeader{ "Accept", "*/*" } );
request.done_with_headers();

request.read_in_body( payload );
assert( request.state() == COMPLETE );

return request;
}

void VHiveExecutionEngine::force_thunk( const Thunk & thunk,
ExecutionLoop & exec_loop )
{
HTTPRequest request = generate_request( thunk );
exec_loop.make_http_request<TCPConnection>( thunk.hash(),
address_, request,
[this] ( const uint64_t, const string & thunk_hash,
const HTTPResponse & http_response ) -> bool
{
running_jobs_--;

if ( http_response.status_code() != "200" ) {
failure_callback_( thunk_hash, JobStatus::InvocationFailure );
return false;
}

ExecutionResponse response = ExecutionResponse::parse_message( http_response.body() );

/* print the output, if there's any */
if ( response.stdout.length() ) {
cerr << response.stdout << endl;
}

switch ( response.status ) {
case JobStatus::Success:
{
if ( response.thunk_hash != thunk_hash ) {
cerr << http_response.str() << endl;
throw runtime_error( "expected output for " +
thunk_hash + ", got output for " +
response.thunk_hash );
}

for ( const auto & output : response.outputs ) {
gg::cache::insert( gg::hash::for_output( response.thunk_hash, output.tag ), output.hash );

if ( output.data.length() ) {
roost::atomic_create( base64::decode( output.data ),
gg::paths::blob( output.hash ) );
}
}

gg::cache::insert( response.thunk_hash, response.outputs.at( 0 ).hash );

vector<ThunkOutput> thunk_outputs;
for ( auto & output : response.outputs ) {
thunk_outputs.emplace_back( move( output.hash ), move( output.tag ) );
}

success_callback_( response.thunk_hash, move( thunk_outputs ), 0 );

break;
}

default: /* in case of any other failure */
failure_callback_( thunk_hash, response.status );
}

return false;
},
[this] ( const uint64_t, const string & thunk_hash )
{
failure_callback_( thunk_hash, JobStatus::SocketFailure );
}
);

running_jobs_++;
}

size_t VHiveExecutionEngine::job_count() const
{
return running_jobs_;
}
34 changes: 34 additions & 0 deletions src/execution/engine_vhive.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* -*-mode:c++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */

#ifndef ENGINE_VHIVE_HH
#define ENGINE_VHIVE_HH

#include "engine.hh"
#include "net/http_request.hh"
#include "thunk/thunk.hh"
#include "util/uri.hh"

class VHiveExecutionEngine : public ExecutionEngine
{
private:
ParsedURI parsed_url_;
Address address_;
std::string final_url_;

size_t running_jobs_ { 0 };

HTTPRequest generate_request( const gg::thunk::Thunk & thunk );

public:
VHiveExecutionEngine( const size_t max_jobs, const std::string & address );

void force_thunk( const gg::thunk::Thunk & thunk,
ExecutionLoop & exec_loop ) override;
size_t job_count() const override;

bool is_remote() const { return true; }
std::string label() const override { return "remote"; }
bool can_execute( const gg::thunk::Thunk & ) const { return true; }
};

#endif /* ENGINE_VHIVE_HH */
7 changes: 6 additions & 1 deletion src/frontend/gg-force.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "execution/engine_gg.hh"
#include "execution/engine_meow.hh"
#include "execution/engine_gcloud.hh"
#include "execution/engine_vhive.hh"
#include "tui/status_bar.hh"
#include "util/digest.hh"
#include "util/exception.hh"
Expand Down Expand Up @@ -58,6 +59,7 @@ void usage( const char * argv0 )
<< " - remote Executes the jobs on a remote machine" << endl
<< " - meow Executes the jobs on AWS Lambda with long-running workers" << endl
<< " - gcloud Executes the jobs on Google Cloud Functions" << endl
<< " - vhive Executes the jobs on VHive Server" << endl
<< endl
<< "Environment variables:" << endl
<< " - " << FORCE_NO_STATUS << endl
Expand Down Expand Up @@ -100,7 +102,6 @@ unique_ptr<ExecutionEngine> make_execution_engine( const EngineInfo & engine )
const string & engine_name = get<0>( engine );
const string & engine_params = get<1>( engine );
const size_t max_jobs = get<2>( engine );

if ( engine_name == "local" ) {
const bool mixed = (engine_params == "mixed");
return make_unique<LocalExecutionEngine>( mixed, max_jobs );
Expand Down Expand Up @@ -144,6 +145,10 @@ unique_ptr<ExecutionEngine> make_execution_engine( const EngineInfo & engine )
return make_unique<GCFExecutionEngine>( max_jobs,
safe_getenv("GG_GCLOUD_FUNCTION") );
}
else if ( engine_name == "vhive" ) {
string url = engine_params;
return make_unique<VHiveExecutionEngine>( max_jobs, url );
}
else {
throw runtime_error( "unknown execution engine" );
}
Expand Down

0 comments on commit fe8c11f

Please sign in to comment.