skip reading new messages if the ack buffer is full. track running tasks. (#164)
kespinola authored Oct 13, 2024
1 parent 0005b3e commit d092ad8
Showing 5 changed files with 57 additions and 19 deletions.
4 changes: 3 additions & 1 deletion grpc-ingest/config-ingester.yml
@@ -3,19 +3,21 @@ redis: redis://localhost:6379
postgres:
url: postgres://solana:solana@localhost/solana
min_connections: 10
max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible
max_connections: 50
snapshots:
name: SNAPSHOTS
max_concurrency: 10
batch_size: 100
xack_batch_max_idle_ms: 1_000
xack_buffer_size: 10_000
xack_batch_max_size: 500
accounts:
name: ACCOUNTS
max_concurrency: 10
batch_size: 100
xack_batch_max_idle_ms: 1_000
xack_buffer_size: 10_000
xack_batch_max_size: 500
transactions:
name: TRANSACTIONS
download_metadata:
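
The new xack_buffer_size setting is the knob behind the commit's headline behavior: the redis.rs change further down only issues a new read while the ack buffer still has room for a full batch. A minimal sketch of how such a field could feed a bounded channel, assuming the buffer size maps onto a tokio mpsc capacity; StreamConfig and build_ack_channel are illustrative names, not the project's actual types:

use serde::Deserialize;
use tokio::sync::mpsc;

#[derive(Debug, Deserialize)]
struct StreamConfig {
    name: String,
    max_concurrency: usize,
    batch_size: usize,
    xack_batch_max_idle_ms: u64,
    xack_buffer_size: usize, // new field: presumed capacity of the pending-ack buffer
    xack_batch_max_size: usize,
}

// A bounded channel sized by the config: once `xack_buffer_size` ids are queued
// for acknowledgement, producers must wait for the ack task to catch up.
fn build_ack_channel(config: &StreamConfig) -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
    mpsc::channel(config.xack_buffer_size)
}
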
24 changes: 17 additions & 7 deletions grpc-ingest/src/grpc.rs
@@ -1,6 +1,8 @@
use {
crate::{
config::ConfigGrpc, prom::redis_xadd_status_inc, redis::TrackedPipeline,
config::ConfigGrpc,
prom::{grpc_tasks_total_dec, grpc_tasks_total_inc, redis_xadd_status_inc},
redis::TrackedPipeline,
util::create_shutdown,
},
anyhow::Context,
@@ -56,6 +58,8 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock

let subscribe_tx = Arc::clone(&self.subscribe_tx);

grpc_tasks_total_inc();

async move {
match job {
GrpcJob::FlushRedisPipe => {
@@ -68,7 +72,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
let counts = flush.as_ref().unwrap_or_else(|counts| counts);

for (stream, count) in counts.iter() {
debug!(message = "Redis pipe flushed", ?stream, ?status, ?count);
debug!(target: "grpc2redis", action = "flush_redis_pipe", stream = ?stream, status = ?status, count = ?count);
redis_xadd_status_inc(stream, status, *count);
}
}
@@ -91,6 +95,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update", stream = ?accounts_stream, maxlen = ?accounts_stream_maxlen);
}
UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
@@ -99,6 +104,7 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update", stream = ?transactions_stream, maxlen = ?transactions_stream_maxlen);
}
UpdateOneof::Ping(_) => {
let ping = subscribe_tx
@@ -112,25 +118,29 @@ impl<'a> AsyncHandler<GrpcJob, topograph::executor::Handle<'a, GrpcJob, Nonblock

match ping {
Ok(_) => {
debug!(message = "Ping sent successfully", id = PING_ID)
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
}
Err(err) => {
warn!(message = "Failed to send ping", ?err, id = PING_ID)
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
}
}
}
UpdateOneof::Pong(pong) => {
if pong.id == PING_ID {
debug!(message = "Pong received", id = PING_ID);
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(message = "Unknown pong id received", id = pong.id);
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
var => warn!(message = "Unknown update variant", ?var),
var => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?var)
}
}
}
}
}

grpc_tasks_total_dec();
}
}
}
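
The grpc.rs handler now brackets each job with grpc_tasks_total_inc() before the async block and grpc_tasks_total_dec() at its end, so the new GRPC_TASKS gauge reflects in-flight work. Below is a sketch of the same gauge-bracketing idea, wrapped in a small drop guard so the decrement also fires on early returns; the guard type is an illustration, not something this commit introduces:

use prometheus::IntGauge;

// Increments the gauge on creation and decrements it when dropped,
// so the count stays correct even if the task returns early or panics.
struct TaskGuard(IntGauge);

impl TaskGuard {
    fn new(gauge: IntGauge) -> Self {
        gauge.inc();
        TaskGuard(gauge)
    }
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.0.dec();
    }
}

async fn handle_job(gauge: IntGauge) {
    let _guard = TaskGuard::new(gauge);
    // ... xadd the update to Redis, answer pings, etc. ...
} // _guard drops here, mirroring grpc_tasks_total_dec() at the end of the async block
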
7 changes: 4 additions & 3 deletions grpc-ingest/src/ingester.rs
@@ -122,10 +122,11 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {

report.abort();

accounts.stop().await?;
transactions.stop().await?;
futures::future::join_all(vec![accounts.stop(), transactions.stop(), snapshots.stop()])
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
download_metadatas.stop().await?;
snapshots.stop().await?;

pool.close().await;

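The shutdown path now stops the accounts, transactions, and snapshots streams concurrently: futures::future::join_all waits for all three stop futures, and collecting the results into Result<Vec<_>, _> surfaces the first error only after every stop has completed. A self-contained sketch of that pattern, with stop_stream standing in for IngestStream::stop():

use futures::future;

// Stand-in for IngestStream::stop(); the real method shuts down a stream's workers.
async fn stop_stream(name: &str) -> anyhow::Result<()> {
    println!("stopped {name}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // All three shutdowns run concurrently; join_all waits for every one,
    // then collecting into Result propagates the first error, if any.
    future::join_all(vec![
        stop_stream("accounts"),
        stop_stream("transactions"),
        stop_stream("snapshots"),
    ])
    .await
    .into_iter()
    .collect::<Result<Vec<_>, _>>()?;
    Ok(())
}
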
28 changes: 26 additions & 2 deletions grpc-ingest/src/prom.rs
@@ -62,6 +62,16 @@ lazy_static::lazy_static! {
Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
&["stream"]
).unwrap();

static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"),
&["stream"]
).unwrap();

static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "),
&[]
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -84,6 +94,8 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(DOWNLOAD_METADATA_INSERTED_COUNT);
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);

VERSION_INFO_METRIC
.with_label_values(&[
@@ -182,8 +194,20 @@ pub fn ingest_tasks_total_dec(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).dec()
}

pub fn ingest_tasks_reset(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).set(0)
pub fn ack_tasks_total_inc(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).inc()
}

pub fn ack_tasks_total_dec(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).dec()
}

pub fn grpc_tasks_total_inc() {
GRPC_TASKS.with_label_values(&[]).inc()
}

pub fn grpc_tasks_total_dec() {
GRPC_TASKS.with_label_values(&[]).dec()
}

#[derive(Debug, Clone, Copy)]
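
prom.rs gains two gauges: ACK_TASKS labeled per stream and GRPC_TASKS with an empty label set, both registered next to the existing metrics and exposed through small inc/dec helpers. A stand-alone sketch of the same prometheus-crate pattern, using a local Registry rather than the project's register! macro:

use prometheus::{IntGaugeVec, Opts, Registry};

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();

    // Per-stream gauge, like ACK_TASKS: one time series per "stream" label value.
    let ack_tasks = IntGaugeVec::new(
        Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"),
        &["stream"],
    )?;
    // Label-less gauge, like GRPC_TASKS: a single series addressed with `&[]`.
    let grpc_tasks = IntGaugeVec::new(
        Opts::new("grpc_tasks", "Number of tasks writing grpc messages to redis"),
        &[],
    )?;

    registry.register(Box::new(ack_tasks.clone()))?;
    registry.register(Box::new(grpc_tasks.clone()))?;

    ack_tasks.with_label_values(&["ACCOUNTS"]).inc();
    grpc_tasks.with_label_values(&[]).inc();
    Ok(())
}
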
13 changes: 7 additions & 6 deletions grpc-ingest/src/redis.rs
@@ -2,9 +2,9 @@ use {
crate::{
config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY},
prom::{
ingest_tasks_reset, ingest_tasks_total_dec, ingest_tasks_total_inc,
program_transformer_task_status_inc, redis_xack_inc, redis_xlen_set, redis_xread_inc,
ProgramTransformerTaskStatusKind,
ack_tasks_total_dec, ack_tasks_total_inc, ingest_tasks_total_dec,
ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc,
redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind,
},
},
das_core::{DownloadMetadata, DownloadMetadataInfo},
@@ -400,6 +400,7 @@ impl<'a>

let count = ids.len();

ack_tasks_total_inc(&config.name);
async move {
match redis::pipe()
.atomic()
@@ -425,6 +426,8 @@ impl<'a>
);
}
}

ack_tasks_total_dec(&config.name);
}
}
}
@@ -548,8 +551,6 @@ impl<H: MessageHandler> IngestStream<H> {
}
});

ingest_tasks_reset(&config.name);

let executor = Executor::builder(Nonblock(Tokio))
.max_concurrency(Some(config.max_concurrency))
.build_async(IngestStreamHandler::new(
@@ -602,7 +603,7 @@ impl<H: MessageHandler> IngestStream<H> {

break;
},
result = self.read(&mut connection) => {
result = self.read(&mut connection), if ack_tx.capacity() >= config.batch_size => {
match result {
Ok(reply) => {
for StreamKey { key: _, ids } in reply.keys {

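The redis.rs read loop is where the commit title lands: the tokio::select! branch that calls self.read() now carries an `if ack_tx.capacity() >= config.batch_size` precondition, so the branch is disabled, and no new messages are read, whenever the ack channel cannot absorb another full batch. A simplified, self-contained sketch of that gating pattern; read_batch and the channel sizes below are placeholders, not the project's actual code:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

// Stand-in for the stream's XREAD call; fabricates `batch_size` message ids.
async fn read_batch(batch_size: usize) -> Vec<u64> {
    (0..batch_size as u64).collect()
}

#[tokio::main]
async fn main() {
    let batch_size = 10;
    let (ack_tx, mut ack_rx) = mpsc::channel::<u64>(100); // cf. xack_buffer_size
    let mut shutdown = Box::pin(sleep(Duration::from_secs(1)));

    loop {
        tokio::select! {
            _ = &mut shutdown => break,
            // The `if` precondition disables this branch while the ack buffer cannot
            // hold a full batch, so reading pauses instead of overfilling the channel.
            ids = read_batch(batch_size), if ack_tx.capacity() >= batch_size => {
                for id in ids {
                    let _ = ack_tx.send(id).await;
                }
            }
            Some(_id) = ack_rx.recv() => {
                // Acknowledging drains the buffer, which re-enables the read branch.
            }
        }
    }
}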