Force client disconnects when node is unhealthy #13

Open · wants to merge 22 commits into base: master-2.2
Changes from 1 commit
Address comments
pmantica11 committed Dec 13, 2024
commit ceb621ad4e000d43d2db91eda7f1cec8d64856b2
13 changes: 13 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
@@ -122,6 +122,19 @@ pub struct ConfigGrpc {
         with = "humantime_serde"
     )]
     pub filter_names_cleanup_interval: Duration,
+    /// Disconnect if node is lagging behind
+    #[serde(default)]
+    pub force_disconnect_if_node_is_unhealthy: bool,
+    /// RPC port to use for health monitoring
+    pub rpc_port: Option<u16>,
 }
 
+#[derive(Debug, Clone, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ConfigHealthMonitor {
+    /// Recommended to set to the same threshold as the RPC
+    pub max_slot_behind_threshold: u64,
+    pub rpc_port: u16,
+}
+
 impl ConfigGrpc {
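
For reference, a minimal sketch of how the two new options might be set in the plugin's JSON config. Only the added keys are shown; the surrounding layout and any other required fields are assumptions based on the struct above:

```json
{
  "grpc": {
    "force_disconnect_if_node_is_unhealthy": true,
    "rpc_port": 8899
  }
}
```

With `force_disconnect_if_node_is_unhealthy` left at its `#[serde(default)]` of `false`, `rpc_port` can be omitted, since it is an `Option<u16>`.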
16 changes: 12 additions & 4 deletions yellowstone-grpc-geyser/src/grpc.rs
@@ -3,7 +3,7 @@ use {
         config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
         filters::Filter,
         metrics::{self, DebugClientMessage},
-        monitor::{HEALTH_CHECK_SLOT_DISTANCE, NUM_SLOTS_BEHIND},
+        monitor::SHOULD_DISCONNECT,
         version::GrpcVersionInfo,
     },
     anyhow::Context,
@@ -839,10 +839,12 @@ impl GrpcService {
                         }
                     }
                     message = messages_rx.recv() => {
-                        let num_slots_behind = NUM_SLOTS_BEHIND.load(Ordering::SeqCst);
-                        if num_slots_behind > HEALTH_CHECK_SLOT_DISTANCE {
+                        if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
                             error!("gRPC node is lagging behind. Disconnecting client #{id}");
-                            stream_tx.send(Err(Status::internal("Node is significantly behind the chain tip. Disconnecting to maintain service quality. Please reconnect - you will be automatically routed to a healthy node if using a load balancer."))).await.unwrap();
+                            stream_tx
+                                .send(Err(Status::internal("Disconnecting since node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.")))
+                                .await
+                                .unwrap();
                             break 'outer;
                         }
 
@@ -972,6 +974,12 @@ impl Geyser for GrpcService {
         &self,
         mut request: Request<Streaming<SubscribeRequest>>,
     ) -> TonicResult<Response<Self::SubscribeStream>> {
+        if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
+            error!("gRPC node is lagging behind. Preventing client from connecting.");
+            return Err(Status::internal(
+                "Node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.",
+            ));
+        }
         let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
 
         let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");
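
Both the mid-stream disconnect and the connection-time rejection surface to subscribers as a `Status::internal` error, so clients are expected to retry. A minimal client-side sketch of that retry loop, with a hypothetical `connect_and_subscribe` standing in for the client's real subscription setup:

```rust
use std::time::Duration;
use tonic::{Code, Status};

// Hypothetical stand-in for the client's actual subscribe call; it returns
// Err(Status) when the server rejects the connection or drops the stream.
async fn connect_and_subscribe() -> Result<(), Status> {
    Err(Status::internal("Node is lagging behind."))
}

#[tokio::main]
async fn main() {
    loop {
        match connect_and_subscribe().await {
            // Stream ended normally.
            Ok(()) => break,
            Err(status) if status.code() == Code::Internal => {
                // Unhealthy node: back off and reconnect. Behind a load
                // balancer the retry may land on a healthy node.
                eprintln!("disconnected: {}; retrying", status.message());
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            Err(status) => {
                eprintln!("unrecoverable error: {status}");
                break;
            }
        }
    }
}
```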
22 changes: 10 additions & 12 deletions yellowstone-grpc-geyser/src/monitor.rs
@@ -1,6 +1,6 @@
 use std::{
     sync::{
-        atomic::{AtomicU64, Ordering},
+        atomic::{AtomicBool, Ordering},
         Arc,
     },
     time::Duration,
@@ -11,23 +11,21 @@ use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_rpc_client_api::{client_error, request};
 use tokio::time::interval;
 
-pub const HEALTH_CHECK_SLOT_DISTANCE: u64 = 100;
-pub static NUM_SLOTS_BEHIND: Lazy<Arc<AtomicU64>> = Lazy::new(|| Arc::new(AtomicU64::new(0)));
+pub static SHOULD_DISCONNECT: Lazy<Arc<AtomicBool>> =
+    Lazy::new(|| Arc::new(AtomicBool::new(false)));
 
-pub async fn fetch_node_blocks_behind_with_infinite_retry(client: &RpcClient) -> u64 {
+pub async fn is_node_healthy(client: &RpcClient) -> bool {
     loop {
         match client.get_health().await {
-            Ok(()) => {
-                return 0;
-            }
+            Ok(()) => return true,
             Err(err) => {
                 if let client_error::ErrorKind::RpcError(request::RpcError::RpcResponseError {
                     code: _,
                     message: _,
-                    data: request::RpcResponseErrorData::NodeUnhealthy { num_slots_behind },
+                    data: request::RpcResponseErrorData::NodeUnhealthy { .. },
                 }) = &err.kind
                 {
-                    return num_slots_behind.unwrap_or(2000);
+                    return false;
                 } else {
                     log::error!("Failed to get health: {}", err);
                     tokio::time::sleep(Duration::from_secs(5)).await;
@@ -38,11 +36,11 @@ pub async fn fetch_node_blocks_behind_with_infinite_retry(client: &RpcClient) ->
     }
 }
 
-pub async fn keep_track_of_node_health(rpc_client: RpcClient) {
+pub async fn run_forced_disconnection_monitor(rpc_client: RpcClient) {
     let mut interval = interval(Duration::from_millis(100));
     loop {
         interval.tick().await;
-        let blocks_behind = fetch_node_blocks_behind_with_infinite_retry(&rpc_client).await;
-        NUM_SLOTS_BEHIND.store(blocks_behind, Ordering::SeqCst);
+        let should_disconnect = !is_node_healthy(&rpc_client).await;
+        SHOULD_DISCONNECT.store(should_disconnect, Ordering::SeqCst);
     }
 }
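
For context on what `is_node_healthy` reacts to: `RpcClient::get_health` wraps the validator's `getHealth` RPC, which returns `Ok(())` on a healthy node and an RPC error (carrying `NodeUnhealthy` data when the node is behind) otherwise. A minimal sketch of probing it directly; the endpoint URL is an assumption and should match the configured `rpc_port`:

```rust
use solana_client::nonblocking::rpc_client::RpcClient;

#[tokio::main]
async fn main() {
    // Assumed local validator RPC endpoint.
    let client = RpcClient::new("http://127.0.0.1:8899".to_string());
    match client.get_health().await {
        Ok(()) => println!("node is healthy"),
        Err(err) => println!("node is unhealthy or unreachable: {err}"),
    }
}
```

Note that RPC failures other than an explicit `NodeUnhealthy` response (for example, the port being unreachable) keep `is_node_healthy` retrying every 5 seconds rather than returning, so `SHOULD_DISCONNECT` only flips on a definite unhealthy report.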
14 changes: 10 additions & 4 deletions yellowstone-grpc-geyser/src/plugin.rs
@@ -3,7 +3,7 @@ use {
         config::Config,
         grpc::GrpcService,
         metrics::{self, PrometheusService},
-        monitor::keep_track_of_node_health,
+        monitor::run_forced_disconnection_monitor,
     },
     agave_geyser_plugin_interface::geyser_plugin_interface::{
         GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
@@ -77,9 +77,15 @@ impl GeyserPlugin for Plugin {
             .build()
             .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
 
-        // Monitor node health
-        let rpc_client = RpcClient::new("http://127.0.0.1:8899".to_string());
-        runtime.spawn(keep_track_of_node_health(rpc_client));
+        if config.grpc.force_disconnect_if_node_is_unhealthy {
+            let rpc_client = RpcClient::new(format!(
+                "http://127.0.0.1:{}",
+                config.grpc.rpc_port.expect(
+                    "RPC port is required when force_disconnect_if_node_is_unhealthy is true"
+                )
+            ));
+            runtime.spawn(run_forced_disconnection_monitor(rpc_client));
+        }
 
         let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) =
             runtime.block_on(async move {
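
One design note on the `expect` above: a missing `rpc_port` with the flag enabled panics inside the validator process at plugin load. A hypothetical alternative (not part of this PR) that surfaces the same condition as a `GeyserPluginError` instead:

```rust
use agave_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError;

// Local stub mirroring the two fields added to ConfigGrpc in config.rs above.
struct ConfigGrpc {
    force_disconnect_if_node_is_unhealthy: bool,
    rpc_port: Option<u16>,
}

// Returns the port to monitor (None when monitoring is disabled), reporting a
// config error instead of panicking when the port is required but missing.
fn health_monitor_rpc_port(grpc: &ConfigGrpc) -> Result<Option<u16>, GeyserPluginError> {
    match (grpc.force_disconnect_if_node_is_unhealthy, grpc.rpc_port) {
        (true, Some(port)) => Ok(Some(port)),
        (true, None) => Err(GeyserPluginError::ConfigFileReadError {
            msg: "rpc_port is required when force_disconnect_if_node_is_unhealthy is true"
                .to_string(),
        }),
        (false, _) => Ok(None),
    }
}
```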