Skip to content

Commit

Permalink
Merge branch 'grpc-ingest' into triton-build
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Jan 24, 2025
2 parents 841ffa7 + e678814 commit 72ff117
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ pub async fn create_download_metadata_notifier(
pub struct MetadataJsonDownloadWorkerArgs {
/// The number of worker threads
#[arg(long, env, default_value = "25")]
metadata_json_download_worker_count: usize,
pub metadata_json_download_worker_count: usize,
/// The request timeout in milliseconds
#[arg(long, env, default_value = "1000")]
metadata_json_download_worker_request_timeout: u64,
pub metadata_json_download_worker_request_timeout: u64,
}

impl MetadataJsonDownloadWorkerArgs {
Expand Down
2 changes: 2 additions & 0 deletions grpc-ingest/config-ingester.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@ download_metadata:
retry_max_delay: 10
# retry min delay in milliseconds
retry_min_delay: 1
# request timeout in milliseconds
request_timeout: 5_000
stream:
name: METADATA_JSON
186 changes: 106 additions & 80 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {

let subscriptions = config.subscriptions.clone();

let (global_shutdown_tx, mut global_shutdown_rx) = oneshot::channel();
let global_shutdown_tx = Arc::new(Mutex::new(Some(global_shutdown_tx)));

let mut subscription_tasks = Vec::new();
for (label, subscription_config) in subscriptions {
let subscription = Subscription {
Expand All @@ -48,19 +51,27 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
.config(Arc::clone(&config))
.connection(connection.clone())
.subscription(subscription)
.start()
.start(Arc::clone(&global_shutdown_tx))
.await?;

subscription_tasks.push(task);
}

if let Some(signal) = shutdown.next().await {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete",
signal = ?signal
);
tokio::select! {
_ = &mut global_shutdown_rx => {
warn!(
target: "grpc2redis",
action = "global_shutdown_signal_received",
message = "Global shutdown signal received, stopping all tasks"
);
}
_ = shutdown.next() => {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete"
);
}
}

futures::future::join_all(
Expand Down Expand Up @@ -108,7 +119,10 @@ impl SubscriptionTask {
self
}

pub async fn start(mut self) -> anyhow::Result<SubscriptionTaskStop> {
pub async fn start(
mut self,
global_shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
) -> anyhow::Result<SubscriptionTaskStop> {
let config = Arc::clone(&self.config);
let connection = self
.connection
Expand Down Expand Up @@ -164,6 +178,7 @@ impl SubscriptionTask {
let (mut subscribe_tx, stream) = dragon_mouth_client
.subscribe_with_request(Some(request))
.await?;
let global_shutdown_tx = Arc::clone(&global_shutdown_tx);

let control = tokio::spawn({
async move {
Expand Down Expand Up @@ -215,89 +230,100 @@ impl SubscriptionTask {

loop {
tokio::select! {
Some(Ok(msg)) = stream.next() => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
event = stream.next() => {
match event {
Some(Ok(msg)) => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", label = ?label, message = "Unknown update variant");
}
}
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant",label = ?label, message = "Unknown update variant")
}
grpc_tasks_total_dec(&label, &stream_config.name);
}
}

}));

grpc_tasks_total_dec(&label, &stream_config.name);
current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
}));

current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID);
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID);
}
}
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant");
}
}
}
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?msg.update_oneof)
break;
}
}

let mut global_shutdown_tx = global_shutdown_tx.lock().await;
if let Some(global_shutdown_tx) = global_shutdown_tx.take() {
let _ = global_shutdown_tx.send(());
}
}
_ = &mut shutdown_rx => {
debug!(target: "grpc2redis", action = "shutdown_signal_received", message = "Shutdown signal received, stopping subscription task", ?label);
Expand Down
1 change: 1 addition & 0 deletions ops/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ tokio = { workspace = true }
tracing = { workspace = true }
mpl-token-metadata = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
4 changes: 4 additions & 0 deletions ops/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod account;
mod bubblegum;
mod metadata;

use account::{subcommand as account_subcommand, AccountCommand};
use anyhow::Result;
Expand All @@ -19,6 +20,8 @@ enum Command {
Bubblegum(BubblegumCommand),
#[clap(name = "account")]
Account(AccountCommand),
#[clap(name = "metadata_json")]
MetadataJson(metadata::MetadataJsonCommand),
}

#[tokio::main]
Expand All @@ -30,6 +33,7 @@ async fn main() -> Result<()> {
match args.command {
Command::Bubblegum(subcommand) => bubblegum_subcommand(subcommand).await?,
Command::Account(subcommand) => account_subcommand(subcommand).await?,
Command::MetadataJson(subcommand) => metadata::subcommand(subcommand).await?,
}

Ok(())
Expand Down
Loading

0 comments on commit 72ff117

Please sign in to comment.