Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge queue: embarking main (97460cf), #9167 and #9216 together #9231

Closed
wants to merge 8 commits into from
174 changes: 142 additions & 32 deletions zebra-rpc/src/server/http_request_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use std::pin::Pin;

use futures::{future, FutureExt};
use http_body_util::BodyExt;
use hyper::{body::Bytes, header};
use hyper::header;
use jsonrpsee::{
core::BoxError,
server::{HttpBody, HttpRequest, HttpResponse},
};
use jsonrpsee_types::ErrorObject;
use serde::{Deserialize, Serialize};
use tower::Service;

use super::cookie::Cookie;
Expand Down Expand Up @@ -118,24 +119,55 @@ impl<S> HttpRequestMiddleware<S> {
}
}

/// Remove any "jsonrpc: 1.0" fields in `data`, and return the resulting string.
pub fn remove_json_1_fields(data: String) -> String {
// Replace "jsonrpc = 1.0":
// - at the start or middle of a list, and
// - at the end of a list;
// with no spaces (lightwalletd format), and spaces after separators (example format).
//
// TODO: if we see errors from lightwalletd, make this replacement more accurate:
// - use a partial JSON fragment parser
// - combine the whole request into a single buffer, and use a JSON parser
// - use a regular expression
//
// We could also just handle the exact lightwalletd format,
// by replacing `{"jsonrpc":"1.0",` with `{"jsonrpc":"2.0`.
data.replace("\"jsonrpc\":\"1.0\",", "\"jsonrpc\":\"2.0\",")
.replace("\"jsonrpc\": \"1.0\",", "\"jsonrpc\": \"2.0\",")
.replace(",\"jsonrpc\":\"1.0\"", ",\"jsonrpc\":\"2.0\"")
.replace(", \"jsonrpc\": \"1.0\"", ", \"jsonrpc\": \"2.0\"")
/// Maps whatever JSON-RPC version the client is using to JSON-RPC 2.0.
async fn request_to_json_rpc_2(
request: HttpRequest<HttpBody>,
) -> (JsonRpcVersion, HttpRequest<HttpBody>) {
let (parts, body) = request.into_parts();
let bytes = body
.collect()
.await
.expect("Failed to collect body data")
.to_bytes();
let (version, bytes) =
if let Ok(request) = serde_json::from_slice::<'_, JsonRpcRequest>(bytes.as_ref()) {
let version = request.version();
if matches!(version, JsonRpcVersion::Unknown) {
(version, bytes)
} else {
(
version,
serde_json::to_vec(&request.into_2()).expect("valid").into(),
)
}
} else {
(JsonRpcVersion::Unknown, bytes)
};
(
version,
HttpRequest::from_parts(parts, HttpBody::from(bytes.as_ref().to_vec())),
)
}
/// Maps JSON-2.0 to whatever JSON-RPC version the client is using.
async fn response_from_json_rpc_2(
version: JsonRpcVersion,
response: HttpResponse<HttpBody>,
) -> HttpResponse<HttpBody> {
let (parts, body) = response.into_parts();
let bytes = body
.collect()
.await
.expect("Failed to collect body data")
.to_bytes();
let bytes =
if let Ok(response) = serde_json::from_slice::<'_, JsonRpcResponse>(bytes.as_ref()) {
serde_json::to_vec(&response.into_version(version))
.expect("valid")
.into()
} else {
bytes
};
HttpResponse::from_parts(parts, HttpBody::from(bytes.as_ref().to_vec()))
}
}

Expand Down Expand Up @@ -203,25 +235,103 @@ where
Self::insert_or_replace_content_type_header(request.headers_mut());

let mut service = self.service.clone();
let (parts, body) = request.into_parts();

async move {
let bytes = body
.collect()
.await
.expect("Failed to collect body data")
.to_bytes();
let (version, request) = Self::request_to_json_rpc_2(request).await;
let response = service.call(request).await.map_err(Into::into)?;
Ok(Self::response_from_json_rpc_2(version, response).await)
}
.boxed()
}
}

#[derive(Clone, Copy, Debug)]
enum JsonRpcVersion {
/// bitcoind used a mishmash of 1.0, 1.1, and 2.0 for its JSON-RPC.
Bitcoind,
/// lightwalletd uses the above mishmash, but also breaks spec to include a
/// `"jsonrpc": "1.0"` key.
Lightwalletd,
/// The client is indicating strict 2.0 handling.
TwoPointZero,
/// On parse errors we don't modify anything, and let the `jsonrpsee` crate handle it.
Unknown,
}

let data = String::from_utf8_lossy(bytes.as_ref()).to_string();
/// A version-agnostic JSON-RPC request.
#[derive(Debug, Deserialize, Serialize)]
struct JsonRpcRequest {
#[serde(skip_serializing_if = "Option::is_none")]
jsonrpc: Option<String>,
method: String,
#[serde(skip_serializing_if = "Option::is_none")]
params: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<serde_json::Value>,
}

// Fix JSON-RPC 1.0 requests.
let data = Self::remove_json_1_fields(data);
let body = HttpBody::from(Bytes::from(data).as_ref().to_vec());
impl JsonRpcRequest {
fn version(&self) -> JsonRpcVersion {
match (self.jsonrpc.as_deref(), &self.params, &self.id) {
(
Some("2.0"),
_,
None
| Some(
serde_json::Value::Null
| serde_json::Value::String(_)
| serde_json::Value::Number(_),
),
) => JsonRpcVersion::TwoPointZero,
(Some("1.0"), Some(_), Some(_)) => JsonRpcVersion::Lightwalletd,
(None, Some(_), Some(_)) => JsonRpcVersion::Bitcoind,
_ => JsonRpcVersion::Unknown,
}
}

let request = HttpRequest::from_parts(parts, body);
fn into_2(mut self) -> Self {
self.jsonrpc = Some("2.0".into());
self
}
}
/// A version-agnostic JSON-RPC response.
#[derive(Debug, Deserialize, Serialize)]
struct JsonRpcResponse {
#[serde(skip_serializing_if = "Option::is_none")]
jsonrpc: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<serde_json::Value>,
id: serde_json::Value,
}

service.call(request).await.map_err(Into::into)
impl JsonRpcResponse {
fn into_version(mut self, version: JsonRpcVersion) -> Self {
match version {
JsonRpcVersion::Bitcoind => {
self.jsonrpc = None;
self.result = self.result.or(Some(serde_json::Value::Null));
self.error = self.error.or(Some(serde_json::Value::Null));
}
JsonRpcVersion::Lightwalletd => {
self.jsonrpc = Some("1.0".into());
self.result = self.result.or(Some(serde_json::Value::Null));
self.error = self.error.or(Some(serde_json::Value::Null));
}
JsonRpcVersion::TwoPointZero => {
// `jsonrpsee` should be returning valid JSON-RPC 2.0 responses. However,
// a valid result of `null` can be parsed into `None` by this parser, so
// we map the result explicitly to `Null` when there is no error.
assert_eq!(self.jsonrpc.as_deref(), Some("2.0"));
if self.error.is_none() {
self.result = self.result.or(Some(serde_json::Value::Null));
} else {
assert!(self.result.is_none());
}
}
JsonRpcVersion::Unknown => (),
}
.boxed()
self
}
}
44 changes: 43 additions & 1 deletion zebra-state/src/service/non_finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use zebra_chain::{
block::{self, Block},
block::{self, Block, Hash},
parameters::Network,
sprout, transparent,
};
Expand Down Expand Up @@ -45,6 +45,10 @@ pub struct NonFinalizedState {
/// callers should migrate to `chain_iter().next()`.
chain_set: BTreeSet<Arc<Chain>>,

/// Blocks that have been invalidated in, and removed from, the non finalized
/// state.
invalidated_blocks: HashMap<Hash, Arc<Vec<ContextuallyVerifiedBlock>>>,

// Configuration
//
/// The configured Zcash network.
Expand Down Expand Up @@ -92,6 +96,7 @@ impl Clone for NonFinalizedState {
Self {
chain_set: self.chain_set.clone(),
network: self.network.clone(),
invalidated_blocks: self.invalidated_blocks.clone(),

#[cfg(feature = "getblocktemplate-rpcs")]
should_count_metrics: self.should_count_metrics,
Expand All @@ -112,6 +117,7 @@ impl NonFinalizedState {
NonFinalizedState {
chain_set: Default::default(),
network: network.clone(),
invalidated_blocks: Default::default(),
#[cfg(feature = "getblocktemplate-rpcs")]
should_count_metrics: true,
#[cfg(feature = "progress-bar")]
Expand Down Expand Up @@ -264,6 +270,37 @@ impl NonFinalizedState {
Ok(())
}

/// Invalidate block with hash `block_hash` and all descendants from the non-finalized state. Insert
/// the new chain into the chain_set and discard the previous.
pub fn invalidate_block(&mut self, block_hash: Hash) {
let Some(chain) = self.find_chain(|chain| chain.contains_block_hash(block_hash)) else {
return;
};

let invalidated_blocks = if chain.non_finalized_root_hash() == block_hash {
self.chain_set.remove(&chain);
chain.blocks.values().cloned().collect()
} else {
let (new_chain, invalidated_blocks) = chain
.invalidate_block(block_hash)
.expect("already checked that chain contains hash");

// Add the new chain fork or updated chain to the set of recent chains, and
// remove the chain containing the hash of the block from chain set
self.insert_with(Arc::new(new_chain.clone()), |chain_set| {
chain_set.retain(|c| !c.contains_block_hash(block_hash))
});

invalidated_blocks
};

self.invalidated_blocks
.insert(block_hash, Arc::new(invalidated_blocks));

self.update_metrics_for_chains();
self.update_metrics_bars();
}

/// Commit block to the non-finalized state as a new chain where its parent
/// is the finalized tip.
#[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
Expand Down Expand Up @@ -586,6 +623,11 @@ impl NonFinalizedState {
self.chain_set.len()
}

/// Return the invalidated blocks.
pub fn invalidated_blocks(&self) -> HashMap<block::Hash, Arc<Vec<ContextuallyVerifiedBlock>>> {
self.invalidated_blocks.clone()
}

/// Return the chain whose tip block hash is `parent_hash`.
///
/// The chain can be an existing chain in the non-finalized state, or a freshly
Expand Down
24 changes: 22 additions & 2 deletions zebra-state/src/service/non_finalized_state/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,26 @@ impl Chain {
(block, treestate)
}

// Returns the block at the provided height and all of its descendant blocks.
pub fn child_blocks(&self, block_height: &block::Height) -> Vec<ContextuallyVerifiedBlock> {
self.blocks
.range(block_height..)
.map(|(_h, b)| b.clone())
.collect()
}

// Returns a new chain without the invalidated block or its descendants.
pub fn invalidate_block(
&self,
block_hash: block::Hash,
) -> Option<(Self, Vec<ContextuallyVerifiedBlock>)> {
let block_height = self.height_by_hash(block_hash)?;
let mut new_chain = self.fork(block_hash)?;
new_chain.pop_tip();
new_chain.last_fork_height = None;
Some((new_chain, self.child_blocks(&block_height)))
}

/// Returns the height of the chain root.
pub fn non_finalized_root_height(&self) -> block::Height {
self.blocks
Expand Down Expand Up @@ -1600,7 +1620,7 @@ impl DerefMut for Chain {

/// The revert position being performed on a chain.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum RevertPosition {
pub(crate) enum RevertPosition {
/// The chain root is being reverted via [`Chain::pop_root`], when a block
/// is finalized.
Root,
Expand All @@ -1619,7 +1639,7 @@ enum RevertPosition {
/// and [`Chain::pop_tip`] functions, and fear that it would be easy to
/// introduce bugs when updating them, unless the code was reorganized to keep
/// related operations adjacent to each other.
trait UpdateWith<T> {
pub(crate) trait UpdateWith<T> {
/// When `T` is added to the chain tip,
/// update [`Chain`] cumulative data members to add data that are derived from `T`.
fn update_chain_tip_with(&mut self, _: &T) -> Result<(), ValidateContextError>;
Expand Down
Loading
Loading