Skip to content

Commit

Permalink
[aptos-workspace-server] ignore errors when writing to stdout & stderr (
Browse files Browse the repository at this point in the history
  • Loading branch information
vgao1996 authored Jan 24, 2025
1 parent 6705ea5 commit b8f4b98
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 59 deletions.
20 changes: 20 additions & 0 deletions aptos-move/aptos-workspace-server/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,26 @@ use std::{
sync::Arc,
};

/// Custom macro to allow writing to stdout while ignoring any errors.
/// This is to allow handling of closed stdout (e.g. in case of a broken pipe).
#[macro_export]
macro_rules! no_panic_println {
($($arg:tt)*) => {{
use std::io::Write;
let _ = writeln!(std::io::stdout(), $($arg)*);
}};
}

/// Custom macro to allow writing to stderr while ignoring any errors.
/// This is to allow handling of closed stderr (e.g. in case of a broken pipe).
#[macro_export]
macro_rules! no_panic_eprintln {
($($arg:tt)*) => {{
use std::io::Write;
let _ = writeln!(std::io::stderr(), $($arg)*);
}};
}

/// An wrapper to ensure propagation of chain of errors.
pub(crate) struct ArcError(Arc<anyhow::Error>);

Expand Down
34 changes: 17 additions & 17 deletions aptos-move/aptos-workspace-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use uuid::Uuid;
async fn run_all_services(timeout: u64) -> Result<()> {
let test_dir = tempfile::tempdir()?;
let test_dir = test_dir.path();
println!("Created test directory: {}", test_dir.display());
no_panic_println!("Created test directory: {}", test_dir.display());

let instance_id = Uuid::new_v4();

Expand All @@ -56,10 +56,10 @@ async fn run_all_services(timeout: u64) -> Result<()> {
tokio::select! {
res = tokio::signal::ctrl_c() => {
res.unwrap();
println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
no_panic_println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
}
_ = tokio::time::sleep(Duration::from_secs(timeout)) => {
println!("\nTimeout reached. Shutting down services. This may take a while.\n");
no_panic_println!("\nTimeout reached. Shutting down services. This may take a while.\n");
}
}

Expand Down Expand Up @@ -132,7 +132,7 @@ async fn run_all_services(timeout: u64) -> Result<()> {
)
};
let clean_up_all = async move {
eprintln!("Running shutdown steps");
no_panic_eprintln!("Running shutdown steps");
fut_indexer_api_clean_up.await;
fut_postgres_clean_up.await;
};
Expand All @@ -144,9 +144,9 @@ async fn run_all_services(timeout: u64) -> Result<()> {
}
res = all_services_up => {
match res.context("one or more services failed to start") {
Ok(_) => println!("ALL SERVICES UP"),
Ok(_) => no_panic_println!("ALL SERVICES UP"),
Err(err) => {
eprintln!("\nOne or more services failed to start, will run shutdown steps\n");
no_panic_eprintln!("\nOne or more services failed to start, will run shutdown steps\n");
clean_up_all.await;

return Err(err)
Expand All @@ -160,40 +160,40 @@ async fn run_all_services(timeout: u64) -> Result<()> {
tokio::select! {
_ = shutdown.cancelled() => (),
res = fut_node_finish => {
eprintln!("Node exited unexpectedly");
no_panic_eprintln!("Node exited unexpectedly");
if let Err(err) = res {
eprintln!("Error: {}", err);
no_panic_eprintln!("Error: {}", err);
}
}
res = fut_faucet_finish => {
eprintln!("Faucet exited unexpectedly");
no_panic_eprintln!("Faucet exited unexpectedly");
if let Err(err) = res {
eprintln!("Error: {}", err);
no_panic_eprintln!("Error: {}", err);
}
}
res = fut_postgres_finish => {
eprintln!("Postgres exited unexpectedly");
no_panic_eprintln!("Postgres exited unexpectedly");
if let Err(err) = res {
eprintln!("Error: {}", err);
no_panic_eprintln!("Error: {}", err);
}
}
res = fut_any_processor_finish => {
eprintln!("One of the processors exited unexpectedly");
no_panic_eprintln!("One of the processors exited unexpectedly");
if let Err(err) = res {
eprintln!("Error: {}", err);
no_panic_eprintln!("Error: {}", err);
}
}
res = fut_indexer_api_finish => {
eprintln!("Indexer API exited unexpectedly");
no_panic_eprintln!("Indexer API exited unexpectedly");
if let Err(err) = res {
eprintln!("Error: {}", err);
no_panic_eprintln!("Error: {}", err);
}
}
}

clean_up_all.await;

println!("Finished running all services");
no_panic_println!("Finished running all services");

Ok(())
}
Expand Down
41 changes: 22 additions & 19 deletions aptos-move/aptos-workspace-server/src/services/docker_common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::{make_shared, ArcError};
use crate::{
common::{make_shared, ArcError},
no_panic_eprintln, no_panic_println,
};
use anyhow::{anyhow, bail, Context, Result};
use bollard::{
container::{CreateContainerOptions, InspectContainerOptions, StartContainerOptions},
Expand Down Expand Up @@ -43,15 +46,15 @@ pub async fn create_docker_network_permanent(

match res {
Ok(_response) => {
println!("Created docker network {}", name);
no_panic_println!("Created docker network {}", name);

Ok(name)
},
Err(err) => match err {
bollard::errors::Error::DockerResponseServerError {
status_code: 409, ..
} => {
println!("Docker network {} already exists, not creating it", name);
no_panic_println!("Docker network {} already exists, not creating it", name);
Ok(name)
},
err => Err(err.into()),
Expand Down Expand Up @@ -120,7 +123,7 @@ pub fn create_docker_network(
.await
.context("failed to create docker network")?;

println!("Created docker network {}", name);
no_panic_println!("Created docker network {}", name);

Ok(name)
});
Expand Down Expand Up @@ -154,10 +157,10 @@ pub fn create_docker_network(

match cleanup.await {
Ok(_) => {
println!("Removed docker network {}", name);
no_panic_println!("Removed docker network {}", name);
},
Err(err) => {
eprintln!("Failed to remove docker network {}: {}", name, err)
no_panic_eprintln!("Failed to remove docker network {}: {}", name, err)
},
}
}
Expand Down Expand Up @@ -220,7 +223,7 @@ pub fn create_docker_volume(
.await
.context("failed to create docker volume")?;

println!("Created docker volume {}", name);
no_panic_println!("Created docker volume {}", name);

Ok(name)
});
Expand Down Expand Up @@ -254,10 +257,10 @@ pub fn create_docker_volume(

match cleanup.await {
Ok(_) => {
println!("Removed docker volume {}", name);
no_panic_println!("Removed docker volume {}", name);
},
Err(err) => {
eprintln!("Failed to remove docker volume {}: {}", name, err)
no_panic_eprintln!("Failed to remove docker volume {}: {}", name, err)
},
}
}
Expand Down Expand Up @@ -326,10 +329,10 @@ pub fn create_start_and_inspect_container(
let image_name = config.image.as_ref().unwrap();
match docker.inspect_image(image_name).await {
Ok(_) => {
println!("Docker image {} already exists", image_name);
no_panic_println!("Docker image {} already exists", image_name);
},
Err(_err) => {
println!(
no_panic_println!(
"Docker image {} does not exist. Pulling image..",
image_name
);
Expand All @@ -347,7 +350,7 @@ pub fn create_start_and_inspect_container(
.await
.context("failed to create docker container")?;

println!("Pulled docker image {}", image_name);
no_panic_println!("Pulled docker image {}", image_name);
},
}

Expand All @@ -358,7 +361,7 @@ pub fn create_start_and_inspect_container(
.create_container(Some(options), config)
.await
.context("failed to create docker container")?;
println!("Created docker container {}", name);
no_panic_println!("Created docker container {}", name);

if shutdown.is_cancelled() {
bail!("failed to start docker container: cancelled")
Expand All @@ -368,7 +371,7 @@ pub fn create_start_and_inspect_container(
.start_container(&name, None::<StartContainerOptions<&str>>)
.await
.context("failed to start docker container")?;
println!("Started docker container {}", name);
no_panic_println!("Started docker container {}", name);

if shutdown.is_cancelled() {
bail!("failed to inspect docker container: cancelled")
Expand Down Expand Up @@ -407,28 +410,28 @@ pub fn create_start_and_inspect_container(
let docker = match fut_docker.await {
Ok(docker) => docker,
Err(err) => {
eprintln!("Failed to clean up docker container {}: {}", name, err);
no_panic_eprintln!("Failed to clean up docker container {}: {}", name, err);
return;
},
};

if *state == State::Started {
match docker.stop_container(name.as_str(), None).await {
Ok(_) => {
println!("Stopped docker container {}", name)
no_panic_println!("Stopped docker container {}", name)
},
Err(err) => {
eprintln!("Failed to stop docker container {}: {}", name, err)
no_panic_eprintln!("Failed to stop docker container {}: {}", name, err)
},
}
}

match docker.remove_container(name.as_str(), None).await {
Ok(_) => {
println!("Removed docker container {}", name)
no_panic_println!("Removed docker container {}", name)
},
Err(err) => {
eprintln!("Failed to remove docker container {}: {}", name, err)
no_panic_eprintln!("Failed to remove docker container {}: {}", name, err)
},
}
}
Expand Down
12 changes: 8 additions & 4 deletions aptos-move/aptos-workspace-server/src/services/faucet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::{ArcError, IP_LOCAL_HOST};
use crate::{
common::{ArcError, IP_LOCAL_HOST},
no_panic_println,
};
use anyhow::{anyhow, Context, Result};
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_localnet::health_checker::HealthChecker;
Expand Down Expand Up @@ -38,7 +41,7 @@ pub fn start_faucet(
.await
.context("failed to start faucet: indexer grpc did not start successfully")?;

println!("Starting faucet..");
no_panic_println!("Starting faucet..");

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(),
Expand Down Expand Up @@ -67,9 +70,10 @@ pub fn start_faucet(
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;

println!(
no_panic_println!(
"Faucet is ready. Endpoint: http://{}:{}",
IP_LOCAL_HOST, faucet_port
IP_LOCAL_HOST,
faucet_port
);

Ok(faucet_port)
Expand Down
14 changes: 9 additions & 5 deletions aptos-move/aptos-workspace-server/src/services/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use super::{
docker_common::create_start_and_inspect_container,
postgres::get_postgres_connection_string_within_docker_network,
};
use crate::common::{make_shared, ArcError, IP_LOCAL_HOST};
use crate::{
common::{make_shared, ArcError, IP_LOCAL_HOST},
no_panic_println,
};
use anyhow::{anyhow, Context, Result};
use aptos_localnet::{
health_checker::HealthChecker,
Expand Down Expand Up @@ -145,7 +148,7 @@ pub fn start_indexer_api(
"failed to start indexer api server: one or more dependencies failed to start",
)?;

println!("Starting indexer API..");
no_panic_println!("Starting indexer API..");

let (options, config) =
create_container_options_and_config(instance_id, docker_network_name);
Expand Down Expand Up @@ -180,7 +183,7 @@ pub fn start_indexer_api(
.await
.context("failed to wait for indexer API to be ready")?;

println!("Indexer API is up, applying hasura metadata..");
no_panic_println!("Indexer API is up, applying hasura metadata..");

// Apply the hasura metadata, with the second health checker waiting for it to succeed.
post_metadata(url.clone(), HASURA_METADATA)
Expand All @@ -193,9 +196,10 @@ pub fn start_indexer_api(
.await
.context("failed to wait for indexer API to be ready")?;

println!(
no_panic_println!(
"Indexer API is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, indexer_api_port
IP_LOCAL_HOST,
indexer_api_port
);

anyhow::Ok(indexer_api_port)
Expand Down
14 changes: 8 additions & 6 deletions aptos-move/aptos-workspace-server/src/services/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::IP_LOCAL_HOST;
use crate::{common::IP_LOCAL_HOST, no_panic_println};
use anyhow::{bail, Result};
use aptos_config::config::{NodeConfig, TableInfoServiceMode};
use aptos_localnet::health_checker::HealthChecker;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub fn start_node(
let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel();

let run_node = {
println!("Starting node..");
no_panic_println!("Starting node..");

let test_dir = test_dir.to_owned();
let node_config = node_config.clone();
Expand Down Expand Up @@ -123,9 +123,10 @@ pub fn start_node(
);
api_health_checker.wait(None).await?;

println!(
no_panic_println!(
"Node API is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, api_port
IP_LOCAL_HOST,
api_port
);

Ok(api_port)
Expand All @@ -138,9 +139,10 @@ pub fn start_node(
HealthChecker::DataServiceGrpc(get_data_service_url(indexer_grpc_port));

indexer_grpc_health_checker.wait(None).await?;
println!(
no_panic_println!(
"Transaction stream is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, indexer_grpc_port
IP_LOCAL_HOST,
indexer_grpc_port
);

Ok(indexer_grpc_port)
Expand Down
Loading

0 comments on commit b8f4b98

Please sign in to comment.