Commit 9eeb7f7

add redis pipes for processing metadata download

Nagaprasadvr committed Feb 5, 2025
1 parent 55d11b6 commit 9eeb7f7

Showing 3 changed files with 129 additions and 43 deletions.
13 changes: 13 additions & 0 deletions grpc-ingest/config-ingester.example.yml
@@ -59,3 +59,16 @@ download_metadata:
   request_timeout: 5_000
   stream:
     name: METADATA_JSON
+
+download_metadata_publish:
+  # maximum number of concurrent tasks for processing.
+  max_concurrency: 10
+  # stream name
+  stream_name: METADATA_JSON
+  # stream max length
+  stream_maxlen: 100_000_000
+  # number of pipelines that can be processed concurrently
+  pipeline_count: 10
+  # pipeline max idle time in milliseconds
+  pipeline_max_idle_ms: 150
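
Taken together with the ingester changes below, these settings mean each of the 10 Redis pipelines buffers XADD commands and is flushed on a roughly 150 ms timer, so bursts of metadata-download messages reach the METADATA_JSON stream in batches rather than one round trip per message.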

19 changes: 19 additions & 0 deletions grpc-ingest/src/config.rs
@@ -254,6 +254,17 @@ pub struct ConfigDownloadMetadataPublish {
         deserialize_with = "deserialize_usize_str"
     )]
     pub stream_maxlen: usize,
+    #[serde(
+        default = "ConfigDownloadMetadataPublish::default_pipeline_count",
+        deserialize_with = "deserialize_usize_str"
+    )]
+    pub pipeline_count: usize,
+
+    #[serde(
+        default = "ConfigDownloadMetadataPublish::default_pipeline_max_idle",
+        deserialize_with = "deserialize_duration_str"
+    )]
+    pub pipeline_max_idle: Duration,
 }
 
 impl ConfigDownloadMetadataPublish {
@@ -268,6 +279,14 @@ impl ConfigDownloadMetadataPublish {
     pub const fn default_stream_maxlen() -> usize {
         10_000_000
     }
+
+    pub const fn default_pipeline_count() -> usize {
+        10
+    }
+
+    pub const fn default_pipeline_max_idle() -> Duration {
+        Duration::from_millis(150)
+    }
 }
 
 #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
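The deserialize_usize_str and deserialize_duration_str helpers are defined elsewhere in config.rs and are not part of this diff. Judging by values such as 100_000_000 and 5_000 in the example config (which YAML 1.2 parses as strings, not integers), the usize variant presumably strips underscore separators before parsing. A minimal sketch of such a deserializer, as an illustration rather than the repository's actual implementation:

use serde::{Deserialize, Deserializer};

// Hypothetical sketch: accept either a bare integer or a string such as
// "100_000_000", stripping underscore separators before parsing.
fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
    D: Deserializer<'de>,
{
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum NumOrStr {
        Num(usize),
        Str(String),
    }

    match NumOrStr::deserialize(deserializer)? {
        NumOrStr::Num(n) => Ok(n),
        NumOrStr::Str(s) => s.replace('_', "").parse().map_err(serde::de::Error::custom),
    }
}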
140 changes: 97 additions & 43 deletions grpc-ingest/src/ingester.rs
@@ -1,9 +1,12 @@
 use {
     crate::{
-        config::{ConfigDownloadMetadataPublish, ConfigIngester, REDIS_STREAM_DATA_KEY},
+        config::{ConfigDownloadMetadataPublish, ConfigIngester},
         postgres::{create_pool as pg_create_pool, report_pgpool},
         prom::{download_metadata_publish_time, redis_xadd_status_inc},
-        redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle},
+        redis::{
+            AccountHandle, DownloadMetadataJsonHandle, IngestStream, TrackedPipeline,
+            TransactionHandle,
+        },
         util::create_shutdown,
     },
     das_core::{
@@ -12,14 +15,17 @@ use {
     },
     futures::stream::StreamExt,
     program_transformers::ProgramTransformer,
-    redis::aio::MultiplexedConnection,
+    redis::{aio::MultiplexedConnection, streams::StreamMaxlen},
     std::sync::Arc,
     tokio::{
-        sync::mpsc::{unbounded_channel, UnboundedSender},
+        sync::{
+            mpsc::{unbounded_channel, UnboundedSender},
+            oneshot, Mutex,
+        },
         task::{JoinHandle, JoinSet},
         time::{sleep, Duration},
     },
-    tracing::warn,
+    tracing::{error, warn},
 };
 
 pub struct DownloadMetadataPublish {
@@ -67,57 +73,105 @@ impl DownloadMetadataPublishBuilder {
 
     pub fn start(self) -> DownloadMetadataPublish {
         let config = self.config.expect("Config must be set");
-        let connection = self.connection.expect("Connection must be set");
 
+        let pipes: Vec<_> = (0..config.pipeline_count)
+            .map(|_| Arc::new(Mutex::new(TrackedPipeline::default())))
+            .collect();
+
         let (sender, mut rx) = unbounded_channel::<DownloadMetadataInfo>();
         let stream = config.stream_name;
         let stream_maxlen = config.stream_maxlen;
         let worker_count = config.max_concurrency;
+        let pipeline_max_idle = config.pipeline_max_idle;
+        let connection = self.connection.expect("Connection must be set");
 
-        let handle = tokio::spawn(async move {
-            let mut tasks = JoinSet::new();
-
-            while let Some(download_metadata_info) = rx.recv().await {
-                if tasks.len() >= worker_count {
-                    tasks.join_next().await;
-                }
-
-                let mut connection = connection.clone();
-                let stream = stream.clone();
-                let start_time = tokio::time::Instant::now();
-
-                tasks.spawn(async move {
-                    match serde_json::to_vec(&download_metadata_info) {
-                        Ok(info_bytes) => {
-                            let xadd = redis::cmd("XADD")
-                                .arg(&stream)
-                                .arg("MAXLEN")
-                                .arg("~")
-                                .arg(stream_maxlen)
-                                .arg("*")
-                                .arg(REDIS_STREAM_DATA_KEY)
-                                .arg(info_bytes)
-                                .query_async::<_, redis::Value>(&mut connection)
-                                .await;
-
-                            let status = xadd.map(|_| ()).map_err(|_| ());
-
-                            redis_xadd_status_inc(&stream, "metadata_json", status, 1);
-                            let elapsed_time = start_time.elapsed().as_secs_f64();
-
-                            download_metadata_publish_time(elapsed_time);
-                        }
-                        Err(_) => {
-                            tracing::error!("download_metadata_info failed to bytes")
-                        }
-                    }
-                });
-            }
-
-            while tasks.join_next().await.is_some() {}
+        let control = tokio::spawn({
+            async move {
+                let mut flush_handles = Vec::new();
+                let mut shutdown_senders = Vec::new();
+
+                for pipe in &pipes {
+                    let pipe = Arc::clone(pipe);
+
+                    let mut connection = connection.clone();
+                    let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
+                    let stream = stream.clone();
+                    let flush_handle = tokio::spawn(async move {
+                        loop {
+                            tokio::select! {
+                                _ = sleep(pipeline_max_idle) => {
+                                    let mut pipe = pipe.lock().await;
+                                    let flush = pipe.flush(&mut connection).await;
+
+                                    let status = flush.as_ref().map(|_| ()).map_err(|_| ());
+                                    let count = flush.as_ref().unwrap_or_else(|count| count);
+
+                                    redis_xadd_status_inc(&stream, "metadata_json", status, *count);
+                                }
+                                _ = &mut shutdown_rx => {
+                                    let mut pipe = pipe.lock().await;
+                                    let flush = pipe.flush(&mut connection).await;
+
+                                    let status = flush.as_ref().map(|_| ()).map_err(|_| ());
+                                    let count = flush.as_ref().unwrap_or_else(|count| count);
+                                    redis_xadd_status_inc(&stream, "metadata_json", status, *count);
+                                    break;
+                                }
+                            }
+                        }
+                    });
+
+                    flush_handles.push(flush_handle);
+                    shutdown_senders.push(shutdown_tx);
+                }
+
+                let mut tasks = JoinSet::new();
+                let mut current_pipe_index = 0;
+
+                while let Some(download_metadata_info) = rx.recv().await {
+                    if tasks.len() >= worker_count {
+                        tasks.join_next().await;
+                    }
+
+                    let stream = stream.clone();
+                    let start_time = tokio::time::Instant::now();
+                    let pipe = Arc::clone(&pipes[current_pipe_index]);
+                    tasks.spawn(async move {
+                        let mut pipe = pipe.lock().await;
+                        match serde_json::to_vec(&download_metadata_info) {
+                            Ok(info_bytes) => {
+                                pipe.xadd_maxlen(
+                                    &stream.to_string(),
+                                    StreamMaxlen::Approx(stream_maxlen),
+                                    "*",
+                                    info_bytes,
+                                );
+
+                                let elapsed_time = start_time.elapsed().as_secs_f64();
+
+                                download_metadata_publish_time(elapsed_time);
+                            }
+                            Err(_) => {
+                                error!("download_metadata_info failed to bytes")
+                            }
+                        }
+                    });
+
+                    current_pipe_index = (current_pipe_index + 1) % pipes.len();
+                }
+
+                while tasks.join_next().await.is_some() {}
+
+                for shutdown_tx in shutdown_senders {
+                    let _ = shutdown_tx.send(());
+                }
+
+                futures::future::join_all(flush_handles).await;
+            }
        });
 
-        DownloadMetadataPublish::new(handle, sender)
+        DownloadMetadataPublish::new(control, sender)
     }
 }

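TrackedPipeline lives in this crate's redis module and its definition is not part of this commit. From the call sites above (xadd_maxlen buffers an entry; flush sends the whole batch and returns the entry count as Ok on success or Err on failure), a minimal sketch of the interface could look like the following. This is an assumption for illustration, not the repository's actual implementation, and the "data" field key stands in for the crate's REDIS_STREAM_DATA_KEY:

use redis::{aio::MultiplexedConnection, streams::StreamMaxlen, Pipeline, RedisResult};

// Sketch of a pipeline wrapper matching the call sites above: buffer XADD
// commands and track how many entries are pending so that metrics can
// report the batch size on flush.
pub struct TrackedPipeline {
    pipeline: Pipeline,
    count: usize,
}

impl Default for TrackedPipeline {
    fn default() -> Self {
        Self { pipeline: redis::pipe(), count: 0 }
    }
}

impl TrackedPipeline {
    // Queue an XADD with an approximate MAXLEN trim; nothing is sent yet.
    pub fn xadd_maxlen<V: redis::ToRedisArgs>(
        &mut self,
        key: &str,
        maxlen: StreamMaxlen,
        id: &str,
        data: V,
    ) {
        self.pipeline.xadd_maxlen(key, maxlen, id, &[("data", data)]);
        self.count += 1;
    }

    // Send every queued command in one round trip, then reset the buffer.
    // Both Ok and Err carry the batch size so the caller can record metrics.
    pub async fn flush(&mut self, connection: &mut MultiplexedConnection) -> Result<usize, usize> {
        let count = self.count;
        let result: RedisResult<()> = self.pipeline.query_async(connection).await;
        self.pipeline.clear();
        self.count = 0;
        result.map(|_| count).map_err(|_| count)
    }
}

Spreading messages round-robin over pipeline_count such pipes, each behind its own Mutex, keeps lock contention low, while the per-pipe flush task bounds publish latency to roughly pipeline_max_idle.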
