Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bubblegum verify #150

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: grpc ingest monitor
kespinola committed Oct 29, 2024
commit 5127a2b52efd0599f1aeed8437acc435f74f4fea
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 20 additions & 21 deletions bubblegum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ mod tree;
use das_core::{MetadataJsonDownloadWorkerArgs, Rpc};
pub use error::ErrorKind;
mod verify;
pub use verify::ProofReport;

use anyhow::Result;
use backfill::worker::{ProgramTransformerWorkerArgs, SignatureWorkerArgs, TreeWorkerArgs};
@@ -18,7 +19,7 @@ use sea_orm::{EntityTrait, QueryFilter};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::str::FromStr;
use tracing::{error, info};
use tracing::error;
use tree::TreeResponse;

#[derive(Clone)]
@@ -50,16 +51,6 @@ pub struct BackfillArgs {
pub tree_worker: TreeWorkerArgs,
}

#[derive(Debug, Parser, Clone)]
pub struct VerifyArgs {
/// The list of trees to verify. If not specified, all trees will be crawled.
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "20")]
pub max_concurrency: usize,
}

pub async fn start_backfill(context: BubblegumContext, args: BackfillArgs) -> Result<()> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
@@ -161,25 +152,33 @@ pub async fn start_bubblegum_replay(
Ok(())
}

pub async fn verify_bubblegum(context: BubblegumContext, args: VerifyArgs) -> Result<()> {
#[derive(Debug, Parser, Clone)]
pub struct VerifyArgs {
/// The list of trees to verify. If not specified, all trees will be crawled.
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "20")]
pub max_concurrency: usize,
}

pub async fn verify_bubblegum(
context: BubblegumContext,
args: VerifyArgs,
) -> Result<Vec<verify::ProofReport>> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
} else {
TreeResponse::all(&context.solana_rpc).await?
};

let mut reports = Vec::new();

for tree in trees {
let report = verify::check(context.clone(), tree, args.max_concurrency).await?;

info!(
"Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}",
report.tree_pubkey,
report.total_leaves,
report.incorrect_proofs,
report.not_found_proofs,
report.correct_proofs
);
reports.push(report);
}

Ok(())
Ok(reports)
}
83 changes: 43 additions & 40 deletions bubblegum/src/verify.rs
Original file line number Diff line number Diff line change
@@ -2,21 +2,15 @@ use super::BubblegumContext;
use crate::error::ErrorKind;
use crate::tree::TreeResponse;
use anyhow::{anyhow, Result};
use borsh::BorshDeserialize;
use digital_asset_types::dapi::get_proof_for_asset;
use digital_asset_types::rpc::AssetProof;
use futures::stream::{FuturesUnordered, StreamExt};
use mpl_bubblegum::accounts::TreeConfig;
use sea_orm::SqlxPostgresConnector;
use sha3::{Digest, Keccak256};
use solana_sdk::{pubkey::Pubkey, syscalls::MAX_CPI_INSTRUCTION_ACCOUNTS};
use spl_account_compression::{
canopy::fill_in_proof_from_canopy,
concurrent_tree_wrapper::ProveLeafArgs,
state::{
merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1,
},
};
use solana_sdk::pubkey::Pubkey;
use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
@@ -65,7 +59,7 @@ fn hash(left: &[u8], right: &[u8]) -> [u8; 32] {
hash
}

fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool {
fn verify_merkle_proof(proof: &ProveLeafArgs) -> bool {
let mut node = proof.leaf;
for (i, sibling) in proof.proof_vec.iter().enumerate() {
if (proof.index >> i) & 1 == 0 {
@@ -74,22 +68,43 @@ fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool {
node = hash(sibling, &node);
}
}
node == root
node == proof.current_root
}

#[derive(Debug)]
fn leaf_proof_result(proof: AssetProof) -> Result<ProofResult, anyhow::Error> {
match ProveLeafArgs::try_from_asset_proof(proof) {
Ok(proof) if verify_merkle_proof(&proof) => Ok(ProofResult::Correct),
Ok(_) => Ok(ProofResult::Incorrect),
Err(_) => Ok(ProofResult::Corrupt),
}
}

#[derive(Debug, Default)]
pub struct ProofReport {
pub tree_pubkey: Pubkey,
pub total_leaves: usize,
pub incorrect_proofs: usize,
pub not_found_proofs: usize,
pub correct_proofs: usize,
pub corrupt_proofs: usize,
}

enum ProofResult {
Correct,
Incorrect,
NotFound,
Corrupt,
}

impl fmt::Display for ProofResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProofResult::Correct => write!(f, "Correct proof found"),
ProofResult::Incorrect => write!(f, "Incorrect proof found"),
ProofResult::NotFound => write!(f, "Proof not found"),
ProofResult::Corrupt => write!(f, "Corrupt proof found"),
}
}
}

pub async fn check(
@@ -111,9 +126,7 @@ pub async fn check(
let report = Arc::new(Mutex::new(ProofReport {
tree_pubkey: tree.pubkey,
total_leaves: tree_config.num_minted as usize,
incorrect_proofs: 0,
not_found_proofs: 0,
correct_proofs: 0,
..ProofReport::default()
}));

let mut tasks = FuturesUnordered::new();
@@ -124,41 +137,31 @@ pub async fn check(
}

let db = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
let tree_pubkey = tree.pubkey.clone();
let tree_pubkey = tree.pubkey;
let report = Arc::clone(&report);

tasks.push(tokio::spawn(async move {
let (asset, _) = Pubkey::find_program_address(
&[b"asset", &tree_pubkey.to_bytes(), &i.to_le_bytes()],
&mpl_bubblegum::ID,
);
let result: Result<ProofResult, anyhow::Error> =
match get_proof_for_asset(&db, asset.to_bytes().to_vec()).await {
Ok(proof) => match ProveLeafArgs::try_from_asset_proof(proof) {
Ok(prove_leaf_args) => {
if verify_merkle_proof(prove_leaf_args.current_root, &prove_leaf_args) {
Ok(ProofResult::Correct)
} else {
Ok(ProofResult::Incorrect)
}
}
Err(_) => Ok(ProofResult::Incorrect),
},
Err(_) => Ok(ProofResult::NotFound),
};

if let Ok(proof_result) = result {
let proof_lookup: Result<ProofResult, anyhow::Error> =
get_proof_for_asset(&db, asset.to_bytes().to_vec())
.await
.map_or_else(|_| Ok(ProofResult::NotFound), leaf_proof_result);

if let Ok(proof_result) = proof_lookup {
let mut report = report.lock().await;
match proof_result {
ProofResult::Correct => report.correct_proofs += 1,
ProofResult::Incorrect => {
report.incorrect_proofs += 1;
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Incorrect proof found");
}
ProofResult::NotFound => {
report.not_found_proofs += 1;
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Proof not found");
}
ProofResult::Incorrect => report.incorrect_proofs += 1,
ProofResult::NotFound => report.not_found_proofs += 1,
ProofResult::Corrupt => report.corrupt_proofs += 1,
}
if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt =
proof_result
{
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", proof_result);
}
}
}));
1 change: 1 addition & 0 deletions grpc-ingest/Cargo.toml
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ publish = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
atty = { workspace = true }
das-bubblegum = { workspace = true }
sqlx = { workspace = true, features = [
"macros",
"runtime-tokio-rustls",
8 changes: 8 additions & 0 deletions grpc-ingest/config-monitor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
prometheus: 127.0.0.1:8876
rpc: http://127.0.0.1:8899
postgres:
url: postgres://solana:solana@localhost/solana
min_connections: 10
max_connections: 50
bubblegum:
only_trees: null
45 changes: 37 additions & 8 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
@@ -214,7 +214,7 @@ where
#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngester {
pub redis: String,
pub postgres: ConfigIngesterPostgres,
pub postgres: ConfigPostgres,
pub download_metadata: ConfigIngesterDownloadMetadata,
pub snapshots: ConfigIngestStream,
pub accounts: ConfigIngestStream,
@@ -231,31 +231,31 @@ pub enum ConfigIngesterRedisStreamType {
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngesterPostgres {
pub struct ConfigPostgres {
pub url: String,
#[serde(
default = "ConfigIngesterPostgres::default_min_connections",
default = "ConfigPostgres::default_min_connections",
deserialize_with = "deserialize_usize_str"
)]
pub min_connections: usize,
#[serde(
default = "ConfigIngesterPostgres::default_max_connections",
default = "ConfigPostgres::default_max_connections",
deserialize_with = "deserialize_usize_str"
)]
pub max_connections: usize,
#[serde(
default = "ConfigIngesterPostgres::default_idle_timeout",
default = "ConfigPostgres::default_idle_timeout",
deserialize_with = "deserialize_duration_str"
)]
pub idle_timeout: Duration,
#[serde(
default = "ConfigIngesterPostgres::default_max_lifetime",
default = "ConfigPostgres::default_max_lifetime",
deserialize_with = "deserialize_duration_str"
)]
pub max_lifetime: Duration,
}

impl ConfigIngesterPostgres {
impl ConfigPostgres {
pub const fn default_min_connections() -> usize {
10
}
@@ -337,4 +337,33 @@ impl ConfigIngesterDownloadMetadata {
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigMonitor {}
pub struct ConfigMonitor {
pub postgres: ConfigPostgres,
pub rpc: String,
pub bubblegum: ConfigBubblegumVerify,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigBubblegumVerify {
#[serde(
default = "ConfigBubblegumVerify::default_report_interval",
deserialize_with = "deserialize_duration_str"
)]
pub report_interval: Duration,
#[serde(default)]
pub only_trees: Option<Vec<String>>,
#[serde(
default = "ConfigBubblegumVerify::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl ConfigBubblegumVerify {
pub const fn default_report_interval() -> Duration {
Duration::from_millis(5 * 60 * 1000)
}
pub const fn default_max_concurrency() -> usize {
20
}
}
44 changes: 43 additions & 1 deletion grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,47 @@
use crate::config::ConfigMonitor;
use crate::postgres::create_pool;
use crate::util::create_shutdown;
use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::error;

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
let database_pool = create_pool(config.postgres).await?;
let rpc = Rpc::from_config(&SolanaRpcArgs {
solana_rpc_url: config.rpc,
});

let bubblegum_verify = tokio::spawn(async move {
loop {
let bubblegum_context = BubblegumContext::new(database_pool.clone(), rpc.clone());
let verify_args = VerifyArgs {
only_trees: config.bubblegum.only_trees.clone(),
max_concurrency: config.bubblegum.max_concurrency,
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(reports) => {
for report in reports {
update_tree_proof_report(&report);
}
}
Err(e) => {
error!(
message = "Error verifying bubblegum",
error = ?e
);
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
}
});

if let Some(_signal) = shutdown.next().await {}

bubblegum_verify.abort();

Ok(())
}
4 changes: 2 additions & 2 deletions grpc-ingest/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::ConfigIngesterPostgres,
config::ConfigPostgres,
prom::{pgpool_connections_set, PgpoolConnectionsKind},
},
sqlx::{
@@ -9,7 +9,7 @@ use {
},
};

pub async fn create_pool(config: ConfigIngesterPostgres) -> anyhow::Result<PgPool> {
pub async fn create_pool(config: ConfigPostgres) -> anyhow::Result<PgPool> {
let options: PgConnectOptions = config.url.parse()?;
PgPoolOptions::new()
.min_connections(config.min_connections.try_into()?)
53 changes: 53 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{redis::RedisStreamMessageError, version::VERSION as VERSION_INFO},
das_bubblegum::ProofReport,
das_core::MetadataJsonTaskError,
hyper::{
server::conn::AddrStream,
@@ -72,6 +73,31 @@ lazy_static::lazy_static! {
Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "),
&[]
).unwrap();

static ref BUBBLEGUM_TREE_TOTAL_LEAVES: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_total_leaves", "Total number of leaves in the bubblegum tree"),
&["tree"]
).unwrap();

static ref BUBBLEGUM_TREE_INCORRECT_PROOFS: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_incorrect_proofs", "Number of incorrect proofs in the bubblegum tree"),
&["tree"]
).unwrap();

static ref BUBBLEGUM_TREE_NOT_FOUND_PROOFS: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_not_found_proofs", "Number of not found proofs in the bubblegum tree"),
&["tree"]
).unwrap();

static ref BUBBLEGUM_TREE_CORRECT_PROOFS: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_correct_proofs", "Number of correct proofs in the bubblegum tree"),
&["tree"]
).unwrap();

static ref BUBBLEGUM_TREE_CORRUPT_PROOFS: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_corrupt_proofs", "Number of corrupt proofs in the bubblegum tree"),
&["tree"]
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -96,6 +122,11 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);
register!(BUBBLEGUM_TREE_TOTAL_LEAVES);
register!(BUBBLEGUM_TREE_INCORRECT_PROOFS);
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);

VERSION_INFO_METRIC
.with_label_values(&[
@@ -319,3 +350,25 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin
.with_label_values(&[kind.to_str()])
.inc()
}

pub fn update_tree_proof_report(report: &ProofReport) {
BUBBLEGUM_TREE_TOTAL_LEAVES
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.total_leaves as i64);

BUBBLEGUM_TREE_INCORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.incorrect_proofs as i64);

BUBBLEGUM_TREE_NOT_FOUND_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.not_found_proofs as i64);

BUBBLEGUM_TREE_CORRECT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.correct_proofs as i64);

BUBBLEGUM_TREE_CORRUPT_PROOFS
.with_label_values(&[&report.tree_pubkey.to_string()])
.set(report.corrupt_proofs as i64);
}
3 changes: 2 additions & 1 deletion ops/Cargo.toml
Original file line number Diff line number Diff line change
@@ -36,5 +36,6 @@ spl-account-compression = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
mpl-token-metadata = { workspace = true }
serde_json = { workspace = true }
serde_json = { workspace = true }
14 changes: 13 additions & 1 deletion ops/src/bubblegum/verify.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use anyhow::Result;
use clap::Parser;
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{connect_db, PoolArgs, Rpc, SolanaRpcArgs};
use tracing::info;

#[derive(Debug, Parser, Clone)]
pub struct Args {
@@ -24,7 +25,18 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Rpc::from_config(&config.solana);
let context = BubblegumContext::new(database_pool, solana_rpc);

verify_bubblegum(context, config.verify_bubblegum).await?;
let reports = verify_bubblegum(context, config.verify_bubblegum).await?;

for report in reports {
info!(
"Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}",
report.tree_pubkey,
report.total_leaves,
report.incorrect_proofs,
report.not_found_proofs,
report.correct_proofs
);
}

Ok(())
}