Skip to content

Commit

Permalink
trim streams
Browse files Browse the repository at this point in the history
  • Loading branch information
austbot committed Dec 13, 2022
1 parent 7c8306c commit c058063
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 20 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "1.1.2"
version = "1.1.3"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down Expand Up @@ -31,9 +31,9 @@ cadence-macros = "0.29.0"
chrono = "0.4.19"
tracing = "0.1.35"
hex = "0.4.3"
plerkle_messenger = { path = "../plerkle_messenger", version = "1.1.2", features = ["redis"] }
plerkle_messenger = { path = "../plerkle_messenger", version = "1.1.3", features = ["redis"] }
flatbuffers = "22.10.26"
plerkle_serialization = { path = "../plerkle_serialization", version = "1.1.2" }
plerkle_serialization = { path = "../plerkle_serialization", version = "1.1.3" }
tokio = { version = "1.23.0", features = ["full"] }
figment = { version = "0.10.6", features = ["env", "test"] }

Expand Down
19 changes: 10 additions & 9 deletions plerkle/src/geyser_plugin_nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,12 @@ impl GeyserPlugin for Plerkle<'static> {
error!("Error adding BLOCK stream");
}

messenger.set_buffer_size(ACCOUNT_STREAM, 5000).await;
messenger.set_buffer_size(SLOT_STREAM, 5000).await;
messenger.set_buffer_size(TRANSACTION_STREAM, 500000).await;
messenger.set_buffer_size(BLOCK_STREAM, 5000).await;
messenger.set_buffer_size(ACCOUNT_STREAM, 10_000_000).await;
messenger.set_buffer_size(SLOT_STREAM, 100_000).await;
messenger
.set_buffer_size(TRANSACTION_STREAM, 10_000_000)
.await;
messenger.set_buffer_size(BLOCK_STREAM, 100_000).await;

// Receive messages in a loop as long as at least one Sender is in scope.
while let Some(data) = receiver.recv().await {
Expand Down Expand Up @@ -379,7 +381,7 @@ impl GeyserPlugin for Plerkle<'static> {
if transaction_info.is_vote || transaction_info.transaction_status_meta.status.is_err() {
return Ok(());
}

// Check if transaction was selected in config.
if let Some(transaction_selector) = &self.transaction_selector {
if !transaction_selector.is_transaction_selected(
Expand All @@ -406,11 +408,10 @@ impl GeyserPlugin for Plerkle<'static> {
builder,
};
let _ = sender.send(data).await;
safe_metric(|| {
statsd_count!("transaction_seen_event", 1, "slot-idx" => &slt_idx);
});
});
safe_metric(|| {
statsd_count!("transaction_seen_event", 1, "slot-idx" => &slt_idx);
});

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion plerkle/src/transaction_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl TransactionSelector {
.iter()
.map(|key| bs58::decode(key).into_vec().unwrap())
.collect();

Self {
mentioned_addresses,
select_all_transactions: false,
Expand Down
2 changes: 1 addition & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "1.1.2"
version = "1.1.3"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
8 changes: 6 additions & 2 deletions plerkle_messenger/src/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ impl RedisMessenger {
error!("Message has reached maximum retries {} for id", id);
continue;
}
retained_ids.push(RecvData::new_retry(id, bytes.to_vec(), info.times_delivered));
retained_ids.push(RecvData::new_retry(
id,
bytes.to_vec(),
info.times_delivered,
));
}
}
Err(e) => error!("Redis xpending_count error {} for id {}", e, id),
Expand Down Expand Up @@ -259,7 +263,7 @@ impl Messenger for RedisMessenger {
.connection
.as_mut()
.unwrap()
.xadd(stream_key, "*", &[(DATA_KEY, &bytes)])
.xadd_maxlen(stream_key, maxlen, "*", &[(DATA_KEY, &bytes)])
.await;

if let Err(e) = result {
Expand Down
2 changes: 1 addition & 1 deletion plerkle_serialization/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_serialization"
description = "Metaplex Flatbuffers Plerkle Serialization for Geyser plugin producer/consumer patterns."
version = "1.1.2"
version = "1.1.3"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down

0 comments on commit c058063

Please sign in to comment.