Skip to content

Commit

Permalink
[aptos-workspace-server] various fixes and improvements (#15571)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgao1996 authored Jan 9, 2025
1 parent a2479a6 commit 3169527
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/aptos-workspace-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ aptos-types = { workspace = true }
# third party deps
anyhow = { workspace = true }
bollard = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true, features = [
"postgres_backend",
] }
Expand Down
35 changes: 32 additions & 3 deletions aptos-move/aptos-workspace-server/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,48 @@
use futures::{future::Shared, FutureExt};
use std::{
fmt::{self, Debug, Display},
future::Future,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

/// An wrapper to ensure propagation of chain of errors.
pub(crate) struct ArcError(Arc<anyhow::Error>);

impl Clone for ArcError {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl std::error::Error for ArcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

impl Display for ArcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Debug for ArcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}

/// The local IP address services are bound to.
pub(crate) const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

/// Converts a future into a shared future by wrapping the error in an `Arc`.
pub(crate) fn make_shared<F, T, E>(fut: F) -> Shared<impl Future<Output = Result<T, Arc<E>>>>
pub(crate) fn make_shared<F, T>(fut: F) -> Shared<impl Future<Output = Result<T, ArcError>>>
where
T: Clone,
F: Future<Output = Result<T, E>>,
F: Future<Output = Result<T, anyhow::Error>>,
{
fut.map(|r| r.map_err(|err| Arc::new(err))).shared()
fut.map(|r| r.map_err(|err| ArcError(Arc::new(err))))
.shared()
}
60 changes: 46 additions & 14 deletions aptos-move/aptos-workspace-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@
mod common;
mod services;

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use aptos_localnet::docker::get_docker;
use clap::Parser;
use common::make_shared;
use futures::TryFutureExt;
use services::{
docker_common::create_docker_network, indexer_api::start_indexer_api,
docker_common::create_docker_network_permanent, indexer_api::start_indexer_api,
processors::start_all_processors,
};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

pub async fn run_all_services() -> Result<()> {
async fn run_all_services(timeout: u64) -> Result<()> {
let test_dir = tempfile::tempdir()?;
let test_dir = test_dir.path();
println!("Created test directory: {}", test_dir.display());
Expand All @@ -50,9 +53,15 @@ pub async fn run_all_services() -> Result<()> {
// waiting for it to trigger.
let shutdown = shutdown.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();

println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
tokio::select! {
res = tokio::signal::ctrl_c() => {
res.unwrap();
println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
}
_ = tokio::time::sleep(Duration::from_secs(timeout)) => {
println!("\nTimeout reached. Shutting down services. This may take a while.\n");
}
}

shutdown.cancel();
});
Expand All @@ -72,15 +81,22 @@ pub async fn run_all_services() -> Result<()> {
fut_indexer_grpc.clone(),
);

// Docker
let fut_docker = make_shared(get_docker());

// Docker Network
let docker_network_name = format!("aptos-workspace-{}", instance_id);
let (fut_docker_network, fut_docker_network_clean_up) =
create_docker_network(shutdown.clone(), docker_network_name);
let docker_network_name = "aptos-workspace".to_string();
let fut_docker_network = make_shared(create_docker_network_permanent(
shutdown.clone(),
fut_docker.clone(),
docker_network_name,
));

// Indexer part 1: postgres db
let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) =
services::postgres::start_postgres(
shutdown.clone(),
fut_docker.clone(),
fut_docker_network.clone(),
instance_id,
);
Expand All @@ -98,6 +114,7 @@ pub async fn run_all_services() -> Result<()> {
let (fut_indexer_api, fut_indexer_api_finish, fut_indexer_api_clean_up) = start_indexer_api(
instance_id,
shutdown.clone(),
fut_docker.clone(),
fut_docker_network.clone(),
fut_postgres.clone(),
fut_all_processors_ready.clone(),
Expand All @@ -106,19 +123,18 @@ pub async fn run_all_services() -> Result<()> {
// Phase 2: Wait for all services to be up.
let all_services_up = async move {
tokio::try_join!(
fut_node_api.map_err(anyhow::Error::msg),
fut_indexer_grpc.map_err(anyhow::Error::msg),
fut_node_api.map_err(|err| anyhow!(err)),
fut_indexer_grpc.map_err(|err| anyhow!(err)),
fut_faucet,
fut_postgres.map_err(anyhow::Error::msg),
fut_all_processors_ready.map_err(anyhow::Error::msg),
fut_postgres.map_err(|err| anyhow!(err)),
fut_all_processors_ready.map_err(|err| anyhow!(err)),
fut_indexer_api,
)
};
let clean_up_all = async move {
eprintln!("Running shutdown steps");
fut_indexer_api_clean_up.await;
fut_postgres_clean_up.await;
fut_docker_network_clean_up.await;
};
tokio::select! {
_ = shutdown.cancelled() => {
Expand Down Expand Up @@ -181,3 +197,19 @@ pub async fn run_all_services() -> Result<()> {

Ok(())
}

#[derive(Parser)]
pub enum WorkspaceCommand {
Run {
#[arg(long, default_value_t = 1800)]
timeout: u64,
},
}

impl WorkspaceCommand {
pub async fn run(self) -> Result<()> {
match self {
WorkspaceCommand::Run { timeout } => run_all_services(timeout).await,
}
}
}
7 changes: 4 additions & 3 deletions aptos-move/aptos-workspace-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use clap::Parser;

#[tokio::main]
async fn main() -> Result<()> {
aptos_workspace_server::run_all_services().await?;

Ok(())
aptos_workspace_server::WorkspaceCommand::parse()
.run()
.await
}
Loading

0 comments on commit 3169527

Please sign in to comment.