Skip to content

Commit

Permalink
Shredstream proxy maintenance (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
esemeniuc authored Nov 21, 2024
1 parent e2e50db commit d5a747b
Show file tree
Hide file tree
Showing 8 changed files with 2,041 additions and 1,300 deletions.
3,289 changes: 2,017 additions & 1,272 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 12 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
resolver = "2"

[workspace.package]
version = "0.1.9"
version = "0.2.0"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://jito-labs.gitbook.io/mev/searcher-services/shredstream for details."
authors = ["Jito Team <[email protected]>"]
homepage = "https://jito.wtf/"
Expand All @@ -22,26 +22,24 @@ clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.8"
dashmap = "5"
env_logger = "0.11"
histogram = "0.11.0"
hostname = "0.4.0"
itertools = "0.13.0"
jito-protos = { path = "jito_protos" }
log = "0.4.17"
prost = "0.12"
prost-types = "0.12"
log = "0.4"
prost = "0.13"
prost-types = "0.13"
protobuf-src = "2"
rand = "0.8"
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1"
signal-hook = "0.3"
solana-client = "=1.18"
solana-measure = "=1.18"
solana-metrics = "=1.18"
solana-net-utils = "=1.18"
solana-perf = "=1.18"
solana-sdk = "=1.18"
solana-streamer = "=1.18"
solana-client = "2.0.16"
solana-metrics = "2.0.16"
solana-net-utils = "2.0.16"
solana-perf = "2.0.16"
solana-sdk = "2.0.16"
solana-streamer = "2.0.16"
thiserror = "1"
tokio = "1"
tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic-build = "0.10.2"
tonic = { version = "0.12", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic-build = "0.12"
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.4.0
FROM --platform=linux/amd64 rust:1.73-slim-bullseye as builder
FROM --platform=linux/amd64 rust:1.79-slim-bullseye as builder

RUN apt-get -qq update && apt-get install -qq -y ca-certificates libssl-dev protobuf-compiler pkg-config libudev-dev zlib1g-dev llvm clang cmake make libprotobuf-dev g++
RUN rustup component add rustfmt && update-ca-certificates
Expand Down
2 changes: 1 addition & 1 deletion jito_protos/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
}

configure()
.compile(
.compile_protos(
&[
"protos/auth.proto",
"protos/shared.proto",
Expand Down
3 changes: 0 additions & 3 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@ clap = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
env_logger = { workspace = true }
histogram = { workspace = true }
hostname = { workspace = true }
itertools = { workspace = true }
jito-protos = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
protobuf-src = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
signal-hook = { workspace = true }
solana-client = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
Expand Down
11 changes: 6 additions & 5 deletions proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ pub fn start_forwarder_threads(
let (packet_sender, packet_receiver) = crossbeam_channel::unbounded();
let stats = Arc::new(StreamerReceiveStats::new("shredstream_proxy-listen_thread"));
let listen_thread = streamer::receiver(
format!("ssListen{thread_id}"),
Arc::new(incoming_shred_socket),
exit.clone(),
packet_sender,
recycler.clone(),
stats.clone(),
Duration::default(), // do not coalesce since batching consumes more cpu cycles and adds latency.
true,
false,
None,
true,
false,
);

let report_metrics_thread = {
Expand Down Expand Up @@ -147,11 +148,11 @@ pub fn start_forwarder_threads(
/// Returns Err when unable to receive packets.
fn recv_from_channel_and_send_multiple_dest(
maybe_packet_batch: Result<PacketBatch, RecvError>,
deduper: &Arc<RwLock<Deduper<2, [u8]>>>,
deduper: &RwLock<Deduper<2, [u8]>>,
send_socket: &UdpSocket,
local_dest_sockets: &Arc<Vec<SocketAddr>>,
local_dest_sockets: &[SocketAddr],
debug_trace_shred: bool,
metrics: &Arc<ShredMetrics>,
metrics: &ShredMetrics,
) -> Result<(), ShredstreamProxyError> {
let packet_batch = maybe_packet_batch.map_err(ShredstreamProxyError::RecvError)?;
let trace_shred_received_time = SystemTime::now();
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ fn main() -> Result<(), ShredstreamProxyError> {
ProxySubcommands::Shredstream(x) => x.common_args,
ProxySubcommands::ForwardOnly(x) => x,
};
set_host_id(hostname::get().unwrap().into_string().unwrap());
set_host_id(hostname::get()?.into_string().unwrap());
if (args.endpoint_discovery_url.is_none() && args.discovered_endpoints_port.is_some())
|| (args.endpoint_discovery_url.is_some() && args.discovered_endpoints_port.is_none())
{
Expand Down Expand Up @@ -226,7 +226,7 @@ fn main() -> Result<(), ShredstreamProxyError> {
}));
}

let runtime = Runtime::new().unwrap();
let runtime = Runtime::new()?;
let (grpc_restart_signal_s, grpc_restart_signal_r) = crossbeam_channel::bounded(1);
let mut thread_handles = vec![];
if let ProxySubcommands::Shredstream(args) = shredstream_args {
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/token_authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ClientInterceptor {
let now = SystemTime::now();

let refresh_token_ttl =
SystemTime::try_from(refresh_token.expires_at_utc.as_ref().unwrap().clone())
SystemTime::try_from(*refresh_token.expires_at_utc.as_ref().unwrap())
.unwrap()
.duration_since(now)
.unwrap_or_default();
Expand Down Expand Up @@ -162,7 +162,7 @@ impl ClientInterceptor {
continue;
}

let access_token_ttl = SystemTime::try_from(access_token_expiration.clone())
let access_token_ttl = SystemTime::try_from(access_token_expiration)
.unwrap()
.duration_since(now)
.unwrap_or_default();
Expand Down

0 comments on commit d5a747b

Please sign in to comment.