Skip to content

Commit

Permalink
Process SubmitBlock requests in parallel (#357)
Browse files Browse the repository at this point in the history
* Add some properties to gRPC server methods

* Apply method properties to requests handling

* Let the service have a full Config and rename bsp to network_bps

* Store routing policies inside the routing map

* When a SubmitBlock fails because the route is full, drop the block and report the accurate reason to the client

* While processing a submitted block, report the new block sequentially to the mempool

* On SubmitBlock failure, send a response with both the reason and an error message

* Add a drop fn to methods with DropIfFull routing policy

* Embed the drop fn into the DropIfFull variant
  • Loading branch information
tiram88 authored Dec 18, 2023
1 parent 50d5233 commit 6658164
Show file tree
Hide file tree
Showing 16 changed files with 405 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
mining_manager,
flow_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config,
config.clone(),
core.clone(),
processing_counters,
wrpc_borsh_counters.clone(),
Expand All @@ -451,7 +451,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
grpc_tower_counters.clone(),
));
let grpc_service =
Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));
Arc::new(GrpcService::new(grpc_server_addr, config, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));

// Create an async runtime and register the top-level async services
let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads));
Expand Down
8 changes: 2 additions & 6 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,8 @@ impl FlowContext {
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;

let ctx = self.clone();
let consensus = consensus.clone();
tokio::spawn(async move {
ctx.on_new_block(&consensus, block, virtual_state_task).await;
ctx.log_block_acceptance(hash, BlockSource::Submit);
});
self.on_new_block(consensus, block, virtual_state_task).await;
self.log_block_acceptance(hash, BlockSource::Submit);

Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion rpc/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{net::AddrParseError, num::TryFromIntError};
use thiserror::Error;
use workflow_core::channel::ChannelError;

use crate::{api::ctl::RpcState, RpcHash, RpcTransactionId};
use crate::{api::ctl::RpcState, RpcHash, RpcTransactionId, SubmitBlockRejectReason};

#[derive(Clone, Debug, Error)]
pub enum RpcError {
Expand Down Expand Up @@ -77,6 +77,9 @@ pub enum RpcError {
#[error("IP {0} is not registered as banned.")]
IpIsNotBanned(IpAddress),

#[error("Block was not submitted: {0}")]
SubmitBlockError(SubmitBlockRejectReason),

#[error(transparent)]
AddressError(#[from] kaspa_addresses::AddressError),

Expand Down
8 changes: 5 additions & 3 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ impl SubmitBlockRequest {
}
}

#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub enum SubmitBlockRejectReason {
BlockInvalid = 1,
IsInIBD = 2,
RouteIsFull = 3,
}
impl SubmitBlockRejectReason {
fn as_str(&self) -> &'static str {
// see app\appmessage\rpc_submit_block.go, line 35
match self {
SubmitBlockRejectReason::BlockInvalid => "Block is invalid",
SubmitBlockRejectReason::IsInIBD => "Node is in IBD",
SubmitBlockRejectReason::BlockInvalid => "block is invalid",
SubmitBlockRejectReason::IsInIBD => "node is not synced",
SubmitBlockRejectReason::RouteIsFull => "route is full",
}
}
}
Expand Down
138 changes: 134 additions & 4 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
//! Conversions of protowire messages from and to rpc core counterparts.
//!
//! Response payloads in protowire do always contain an error field and generally a set of
//! fields providing the requested data.
//!
//! Responses in rpc core are expressed as RpcResult<XxxResponse>, where Xxx is the called
//! RPC method.
//!
//! The general conversion convention from protowire to rpc core is to consider the error
//! field first and, if present, to return a matching Err(RpcError). If absent, try to
//! convert the set of data fields into a matching XxxResponse rpc core response and, on
//! success, return Ok(XxxResponse), otherwise return a conversion error.
//!
//! Conversely, the general conversion convention from rpc core to protowire, depending on
//! a provided RpcResult is to either convert the Ok(XxxResponse) into the matching set
//! of data fields and provide no error or provide no data fields but an error field in case
//! of Err(RpcError).
//!
//! The SubmitBlockResponse is a notable exception to this general rule.
use crate::protowire::{self, submit_block_response_message::RejectReason};
use kaspa_consensus_core::network::NetworkId;
use kaspa_core::debug;
use kaspa_notify::subscription::Command;
use kaspa_rpc_core::{
RpcContextualPeerAddress, RpcError, RpcExtraData, RpcHash, RpcIpAddress, RpcNetworkType, RpcPeerAddress, RpcResult,
SubmitBlockRejectReason, SubmitBlockReport,
};
use std::str::FromStr;

Expand Down Expand Up @@ -109,14 +130,25 @@ from!(item: &kaspa_rpc_core::SubmitBlockReport, RejectReason, {
kaspa_rpc_core::SubmitBlockReport::Success => RejectReason::None,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::BlockInvalid) => RejectReason::BlockInvalid,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::IsInIBD) => RejectReason::IsInIbd,
// The conversion of RouteIsFull falls back to None since there exist no such variant in the original protowire version
// and we do not want to break backwards compatibility
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::RouteIsFull) => RejectReason::None,
}
});

from!(item: &kaspa_rpc_core::SubmitBlockRequest, protowire::SubmitBlockRequestMessage, {
Self { block: Some((&item.block).into()), allow_non_daa_blocks: item.allow_non_daa_blocks }
});
// This conversion breaks the general conversion convention (see file header) since the message may
// contain both a non default reject_reason and a matching error message. In the RouteIsFull case
// reject_reason is None (because this reason has no variant in protowire) but a specific error
// message is provided.
from!(item: RpcResult<&kaspa_rpc_core::SubmitBlockResponse>, protowire::SubmitBlockResponseMessage, {
Self { reject_reason: RejectReason::from(&item.report) as i32, error: None }
let error: Option<protowire::RpcError> = match item.report {
kaspa_rpc_core::SubmitBlockReport::Success => None,
kaspa_rpc_core::SubmitBlockReport::Reject(reason) => Some(RpcError::SubmitBlockError(reason).into())
};
Self { reject_reason: RejectReason::from(&item.report) as i32, error }
});

from!(item: &kaspa_rpc_core::GetBlockTemplateRequest, protowire::GetBlockTemplateRequestMessage, {
Expand Down Expand Up @@ -464,9 +496,31 @@ try_from!(item: &protowire::SubmitBlockRequestMessage, kaspa_rpc_core::SubmitBlo
allow_non_daa_blocks: item.allow_non_daa_blocks,
}
});
try_from!(item: &protowire::SubmitBlockResponseMessage, RpcResult<kaspa_rpc_core::SubmitBlockResponse>, {
Self { report: RejectReason::try_from(item.reject_reason).map_err(|_| RpcError::PrimitiveToEnumConversionError)?.into() }
});
impl TryFrom<&protowire::SubmitBlockResponseMessage> for kaspa_rpc_core::SubmitBlockResponse {
type Error = RpcError;
// This conversion breaks the general conversion convention (see file header) since the message may
// contain both a non-None reject_reason and a matching error message. Things get even challenging
// in the RouteIsFull case where reject_reason is None (because this reason has no variant in protowire)
// but a specific error message is provided.
fn try_from(item: &protowire::SubmitBlockResponseMessage) -> RpcResult<Self> {
let report: SubmitBlockReport =
RejectReason::try_from(item.reject_reason).map_err(|_| RpcError::PrimitiveToEnumConversionError)?.into();
if let Some(ref err) = item.error {
match report {
SubmitBlockReport::Success => {
if err.message == RpcError::SubmitBlockError(SubmitBlockRejectReason::RouteIsFull).to_string() {
Ok(Self { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::RouteIsFull) })
} else {
Err(err.into())
}
}
SubmitBlockReport::Reject(_) => Ok(Self { report }),
}
} else {
Ok(Self { report })
}
}
}

try_from!(item: &protowire::GetBlockTemplateRequestMessage, kaspa_rpc_core::GetBlockTemplateRequest, {
Self { pay_address: item.pay_address.clone().try_into()?, extra_data: RpcExtraData::from_iter(item.extra_data.bytes()) }
Expand Down Expand Up @@ -825,3 +879,79 @@ try_from!(&protowire::NotifySinkBlueScoreChangedResponseMessage, RpcResult<kaspa
// ----------------------------------------------------------------------------

// TODO: tests

#[cfg(test)]
mod tests {
use kaspa_rpc_core::{RpcError, RpcResult, SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse};

use crate::protowire::{self, submit_block_response_message::RejectReason, SubmitBlockResponseMessage};

#[test]
fn test_submit_block_response() {
struct Test {
rpc_core: RpcResult<kaspa_rpc_core::SubmitBlockResponse>,
protowire: protowire::SubmitBlockResponseMessage,
}
impl Test {
fn new(
rpc_core: RpcResult<kaspa_rpc_core::SubmitBlockResponse>,
protowire: protowire::SubmitBlockResponseMessage,
) -> Self {
Self { rpc_core, protowire }
}
}
let tests = vec![
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Success }),
SubmitBlockResponseMessage { reject_reason: RejectReason::None as i32, error: None },
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::BlockInvalid) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::BlockInvalid as i32,
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::BlockInvalid).to_string(),
}),
},
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::IsInIBD) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::IsInIbd as i32,
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::IsInIBD).to_string(),
}),
},
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::RouteIsFull) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::None as i32, // This rpc core reject reason has no matching protowire variant
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::RouteIsFull).to_string(),
}),
},
),
];

for test in tests {
let cnv_protowire: SubmitBlockResponseMessage = test.rpc_core.as_ref().map_err(|x| x.clone()).into();
assert_eq!(cnv_protowire.reject_reason, test.protowire.reject_reason);
assert_eq!(cnv_protowire.error.is_some(), test.protowire.error.is_some());
assert_eq!(cnv_protowire.error, test.protowire.error);

let cnv_rpc_core: RpcResult<SubmitBlockResponse> = (&test.protowire).try_into();
assert_eq!(cnv_rpc_core.is_ok(), test.rpc_core.is_ok());
match cnv_rpc_core {
Ok(ref cnv_response) => {
let Ok(ref response) = test.rpc_core else { panic!() };
assert_eq!(cnv_response.report, response.report);
}
Err(ref cnv_err) => {
let Err(ref err) = test.rpc_core else { panic!() };
assert_eq!(cnv_err.to_string(), err.to_string());
}
}
}
}
}
3 changes: 2 additions & 1 deletion rpc/grpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ include.workspace = true
license.workspace = true

[dependencies]
kaspa-consensus-core.workspace = true
kaspa-core.workspace = true
kaspa-grpc-core.workspace = true
kaspa-notify.workspace = true
kaspa-rpc-core.workspace = true
kaspa-rpc-macros.workspace = true
kaspa-rpc-service.workspace = true
kaspa-utils.workspace = true
kaspa-utils-tower.workspace = true
kaspa-utils.workspace = true

async-channel.workspace = true
async-stream.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ impl Adaptor {

pub fn server(
serve_address: NetAddress,
network_bps: u64,
manager: Manager,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
counters: Arc<TowerConnectionCounters>,
) -> Arc<Self> {
let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size());
let connection_handler = ConnectionHandler::new(manager_sender, core_service.clone(), core_notifier, counters);
let connection_handler = ConnectionHandler::new(network_bps, manager_sender, core_service.clone(), core_notifier, counters);
let server_termination = connection_handler.serve(serve_address);
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address));
adaptor.manager.clone().start_event_loop(manager_receiver);
Expand Down
Loading

0 comments on commit 6658164

Please sign in to comment.