Skip to content

Commit

Permalink
chore(voyager): improvements and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
benluelo committed Jan 28, 2025
1 parent 37178d6 commit 219688e
Show file tree
Hide file tree
Showing 17 changed files with 1,042 additions and 980 deletions.
4 changes: 2 additions & 2 deletions lib/chain-utils/src/keyring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crossbeam_queue::ArrayQueue;
use futures::Future;
use rand::prelude::SliceRandom;
use serde::{Deserialize, Serialize};
use tracing::{info_span, warn, Instrument};
use tracing::{debug, info_span, Instrument};

pub trait ChainKeyring {
type Address: Hash + Eq + Clone + Display + Send + Sync;
Expand Down Expand Up @@ -88,7 +88,7 @@ impl<A: Hash + Eq + Clone + Display, S: 'static> ConcurrentKeyring<A, S> {
f: F,
) -> Option<Fut::Output> {
let Some(address) = self.addresses_buffer.pop() else {
warn!(keyring = %self.name, "high traffic in keyring");
debug!(keyring = %self.name, "high traffic in keyring");
return None;
};

Expand Down
12 changes: 7 additions & 5 deletions lib/pg-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl<T: QueueMessage> voyager_vm::Queue<T> for PgQueue<T> {

match row {
Some(row) => {
let span = info_span!("processing item", id = row.id);
let span = info_span!("processing item", item_id = row.id);

trace!(%row.item);

Expand Down Expand Up @@ -384,20 +384,22 @@ impl<T: QueueMessage> voyager_vm::Queue<T> for PgQueue<T> {

sqlx::query(
"
INSERT INTO queue (item)
SELECT * FROM UNNEST($1::JSONB[])
INSERT INTO queue (item, parents)
SELECT *, $1 as parents FROM UNNEST($2::JSONB[])
",
)
.bind(vec![row.id])
.bind(ready.into_iter().map(Json).collect::<Vec<_>>())
.execute(tx.as_mut())
.await?;

sqlx::query(
"
INSERT INTO optimize (item, tag)
SELECT * FROM UNNEST($1::JSONB[], $2::TEXT[])
INSERT INTO optimize (item, tag, parents)
SELECT *, $1 as parents FROM UNNEST($2::JSONB[], $3::TEXT[])
",
)
.bind(vec![row.id])
.bind(optimize.iter().map(|(op, _)| Json(op)).collect::<Vec<_>>())
.bind(optimize.iter().map(|(_, tag)| *tag).collect::<Vec<_>>())
.execute(tx.as_mut())
Expand Down
2 changes: 1 addition & 1 deletion lib/reconnecting-jsonrpc-ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl ClientT for Client {
.as_deref()
.ok_or_else(|| {
jsonrpsee::core::client::Error::Custom(format!(
"not yet connected (request: {method})",
"not yet connected (notification: {method})",
))
})?
.notification(method, params)
Expand Down
56 changes: 28 additions & 28 deletions lib/voyager-message/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,39 +562,39 @@ impl Context {
)
.await?;

main_rpc_server.start(Arc::new(modules));

info!("checking for plugin health...");

{
let mut futures = plugins
.iter()
.map(|(name, client)| async move {
match client
.client
.wait_until_connected(Duration::from_secs(10))
.instrument(debug_span!("health check", %name))
.await
{
Ok(_) => {
info!("plugin {name} connected")
}
Err(_) => {
warn!("plugin {name} failed to connect after 10 seconds")
}
let futures = plugins
.iter()
.map(|(name, client)| async move {
match client
.client
.wait_until_connected(Duration::from_secs(10))
.instrument(debug_span!("health check", %name))
.await
{
Ok(()) => {
info!("plugin {name} connected")
}
})
.collect::<FuturesUnordered<_>>();

match cancellation_token
.run_until_cancelled(async { while let Some(()) = futures.next().await {} })
.await
{
Some(()) => {}
None => return Err(anyhow!("startup error")),
}
Err(_) => {
warn!("plugin {name} failed to connect after 10 seconds")
}
}
})
.collect::<FuturesUnordered<_>>();

match cancellation_token
.run_until_cancelled(futures.collect::<Vec<_>>())
.await
{
Some(_) => {}
None => return Err(anyhow!("startup error")),
}

main_rpc_server.start(Arc::new(modules));

info!("started");

Ok(Self {
rpc_server: main_rpc_server,
plugins,
Expand Down
9 changes: 7 additions & 2 deletions lib/voyager-message/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
ChainId, ClientInfo, ClientStateMeta, ClientType, ConsensusStateMeta, IbcInterface, IbcSpec,
},
data::Data,
rpc::ProofType,
RawClientId, VoyagerMessage,
};

Expand Down Expand Up @@ -350,14 +351,18 @@ pub trait ProofModule<V: IbcSpec> {
/// Query a proof of IBC state on this chain, at the specified [`Height`],
/// returning the state as a JSON [`Value`].
#[method(name = "queryIbcProof", with_extensions)]
async fn query_ibc_proof(&self, at: Height, path: V::StorePath) -> RpcResult<Value>;
async fn query_ibc_proof(
&self,
at: Height,
path: V::StorePath,
) -> RpcResult<(Value, ProofType)>;
}

/// Type-erased version of [`ProofModuleClient`].
#[rpc(client, namespace = "proof")]
pub trait RawProofModule {
#[method(name = "queryIbcProof")]
async fn query_ibc_proof_raw(&self, at: Height, path: Value) -> RpcResult<Value>;
async fn query_ibc_proof_raw(&self, at: Height, path: Value) -> RpcResult<(Value, ProofType)>;
}

/// Client modules provide functionality to interact with a single light client
Expand Down
14 changes: 8 additions & 6 deletions lib/voyager-message/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use jsonrpsee::{
types::{ErrorObject, ErrorObjectOwned},
};
use macros::model;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::{json, Value};
use unionlabs::{ibc::core::client::height::Height, primitives::Bytes, ErrorReporter};
use voyager_core::{IbcSpecId, Timestamp};
Expand Down Expand Up @@ -167,16 +167,18 @@ impl IbcState<Value> {

#[model]
pub struct IbcProof {
// pub proof_type: ProofType,
pub proof_type: ProofType,
/// The height that the proof was read at.
pub height: Height,
pub proof: Value,
}

// enum ProofType {
// Membership,
// NonMembership,
// }
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProofType {
Membership,
NonMembership,
}

#[model]
pub struct SelfClientState {
Expand Down
20 changes: 14 additions & 6 deletions lib/voyager-message/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,19 @@ impl Server {
.map_err(fatal_error)?
.with_id(self.item_id);

let proof = proof_module
let (proof, proof_type) = proof_module
.query_ibc_proof_raw(height, path)
.await
.map_err(json_rpc_error_to_error_object)?;

// TODO: Use valuable here
debug!(%proof, "fetched ibc proof");
debug!(%proof, ?proof_type, "fetched ibc proof");

Ok(IbcProof { height, proof })
Ok(IbcProof {
height,
proof,
proof_type,
})
})
.await
}
Expand Down Expand Up @@ -415,15 +419,19 @@ impl Server {
.map_err(fatal_error)?
.with_id(self.item_id);

let proof = proof_module
let (proof, proof_type) = proof_module
.query_ibc_proof_raw(height, into_value(path.clone()))
.await
.map_err(json_rpc_error_to_error_object)?;

// TODO: Use valuable here
trace!(%proof, "fetched ibc proof");
debug!(%proof, ?proof_type, "fetched ibc proof");

Ok(IbcProof { height, proof })
Ok(IbcProof {
height,
proof,
proof_type,
})
})
.await
}
Expand Down
12 changes: 6 additions & 6 deletions lib/voyager-vm/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,17 @@ impl<T: QueueMessage> Queue<T> for InMemoryQueue<T> {
};

match op {
Some((id, item)) => {
let span = info_span!("processing item", %id);
Some((item_id, item)) => {
let span = info_span!("processing item", %item_id);

self.done
.lock()
.expect("mutex is poisoned")
.insert(id, item.clone());
.insert(item_id, item.clone());

let (r, res) = f(
item.op.clone(),
ItemId::new(i64::from(id)).expect("infallible"),
ItemId::new(i64::from(item_id)).expect("infallible"),
)
.instrument(span)
.await;
Expand All @@ -130,7 +130,7 @@ impl<T: QueueMessage> Queue<T> for InMemoryQueue<T> {
optimizer_queue.entry(tag.to_owned()).or_default().insert(
self.idx.fetch_add(1, Ordering::SeqCst),
Item {
parents: vec![id],
parents: vec![item_id],
op,
},
);
Expand All @@ -139,7 +139,7 @@ impl<T: QueueMessage> Queue<T> for InMemoryQueue<T> {
ready.insert(
self.idx.fetch_add(1, Ordering::SeqCst),
Item {
parents: vec![id],
parents: vec![item_id],
op,
},
);
Expand Down
79 changes: 45 additions & 34 deletions voyager/modules/proof/cosmos-sdk-union/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tracing::{error, instrument};
use unionlabs::{
bech32::Bech32,
bounded::BoundedI64,
cosmos::ics23::commitment_proof::CommitmentProof,
ibc::core::{client::height::Height, commitment::merkle_proof::MerkleProof},
primitives::H256,
ErrorReporter,
Expand All @@ -26,6 +27,7 @@ use voyager_message::{
core::ChainId,
into_value,
module::{ProofModuleInfo, ProofModuleServer},
rpc::ProofType,
ProofModule, FATAL_JSONRPC_ERROR_CODE,
};
use voyager_vm::BoxDynError;
Expand Down Expand Up @@ -116,7 +118,7 @@ impl ProofModuleServer<IbcUnion> for Module {
_: &Extensions,
at: Height,
path: StorePath,
) -> RpcResult<Value> {
) -> RpcResult<(Value, ProofType)> {
let data = [0x03]
.into_iter()
.chain(*self.ibc_host_contract_address.data())
Expand Down Expand Up @@ -144,44 +146,53 @@ impl ProofModuleServer<IbcUnion> for Module {
true,
)
.await
.map_err(rpc_error("error querying connection proof", None))?;

Ok(into_value(
MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof {
proofs: query_result
.response
.proof_ops
.ok_or_else(|| {
.map_err(rpc_error("error querying ibc proof", None))?;

let proofs = query_result
.response
.proof_ops
.ok_or_else(|| {
ErrorObject::owned(
FATAL_JSONRPC_ERROR_CODE,
"proofOps must be present on abci query when called with prove = true",
None::<()>,
)
})?
.ops
.into_iter()
.map(|op| {
<protos::cosmos::ics23::v1::CommitmentProof as prost::Message>::decode(&*op.data)
.map_err(|e| {
ErrorObject::owned(
FATAL_JSONRPC_ERROR_CODE,
"proofOps must be present on abci query when called with prove = true",
None::<()>,
format!("invalid height value: {}", ErrorReporter(e)),
Some(json!({ "height": at })),
)
})?
.ops
.into_iter()
.map(|op| {
<protos::cosmos::ics23::v1::CommitmentProof as prost::Message>::decode(
&*op.data,
)
.map_err(|e| {
ErrorObject::owned(
FATAL_JSONRPC_ERROR_CODE,
format!("invalid height value: {}", ErrorReporter(e)),
Some(json!({ "height": at })),
)
})
})
.collect::<Result<Vec<_>, _>>()?,
})
.map_err(|e| {
ErrorObject::owned(
FATAL_JSONRPC_ERROR_CODE,
format!("invalid height value: {}", ErrorReporter(e)),
Some(json!({ "height": at })),
)
})?,
))
.collect::<Result<Vec<_>, _>>()?;

let proof =
MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof { proofs })
.map_err(|e| {
ErrorObject::owned(
FATAL_JSONRPC_ERROR_CODE,
format!("invalid merkle proof value: {}", ErrorReporter(e)),
Some(json!({ "height": at })),
)
})?;

let proof_type = if proof
.proofs
.iter()
.any(|p| matches!(&p, CommitmentProof::Nonexist(_)))
{
ProofType::NonMembership
} else {
ProofType::Membership
};

Ok((into_value(proof), proof_type))
}
}

Expand Down
Loading

0 comments on commit 219688e

Please sign in to comment.