Skip to content

Commit

Permalink
Support pending transactions subscription (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
FabijanC authored Nov 22, 2024
1 parent 0da16ff commit 979a986
Show file tree
Hide file tree
Showing 12 changed files with 733 additions and 83 deletions.
3 changes: 1 addition & 2 deletions crates/starknet-devnet-core/src/starknet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use starknet_api::block::{BlockNumber, BlockStatus, BlockTimestamp, GasPrice, Ga
use starknet_api::core::SequencerContractAddress;
use starknet_api::felt;
use starknet_api::transaction::Fee;
use starknet_config::BlockGenerationOn;
use starknet_rs_core::types::{
BlockId, BlockTag, Call, ExecutionResult, Felt, Hash256, MsgFromL1, TransactionFinalityStatus,
};
Expand Down Expand Up @@ -409,7 +408,7 @@ impl Starknet {
self.transactions.insert(transaction_hash, transaction_to_add);

// create new block from pending one, only in block-generation-on-transaction mode
if self.config.block_generation_on == BlockGenerationOn::Transaction {
if !self.config.uses_pending_block() {
self.generate_new_block_and_state()?;
}

Expand Down
9 changes: 9 additions & 0 deletions crates/starknet-devnet-core/src/starknet/starknet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ pub struct StarknetConfig {
pub strk_erc20_contract_class: String,
}

impl StarknetConfig {
pub fn uses_pending_block(&self) -> bool {
match self.block_generation_on {
BlockGenerationOn::Transaction => false,
BlockGenerationOn::Demand | BlockGenerationOn::Interval(_) => true,
}
}
}

#[allow(clippy::unwrap_used)]
impl Default for StarknetConfig {
fn default() -> Self {
Expand Down
107 changes: 92 additions & 15 deletions crates/starknet-devnet-server/src/api/json_rpc/endpoints_ws.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use starknet_core::error::Error;
use starknet_core::starknet::starknet_config::BlockGenerationOn;
use starknet_rs_core::types::{BlockId, BlockTag};
use starknet_types::rpc::block::{BlockResult, PendingBlock};
use starknet_types::rpc::transactions::{TransactionWithHash, Transactions};
use starknet_types::starknet_api::block::{BlockNumber, BlockStatus};

use super::error::ApiError;
use super::models::{BlockInput, SubscriptionIdInput, TransactionBlockInput};
use super::models::{
BlockInput, PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput,
};
use super::{JsonRpcHandler, JsonRpcSubscriptionRequest};
use crate::rpc_core::request::Id;
use crate::subscribe::{NewTransactionStatus, SocketId, Subscription, SubscriptionNotification};
use crate::subscribe::{
AddressFilter, NewTransactionStatus, PendingTransactionNotification, SocketId, Subscription,
SubscriptionNotification, TransactionHashWrapper,
};

/// The definitions of JSON-RPC read endpoints defined in starknet_ws_api.json
impl JsonRpcHandler {
Expand All @@ -24,7 +30,9 @@ impl JsonRpcHandler {
JsonRpcSubscriptionRequest::TransactionStatus(data) => {
self.subscribe_tx_status(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::PendingTransactions => todo!(),
JsonRpcSubscriptionRequest::PendingTransactions(data) => {
self.subscribe_pending_txs(data, rpc_request_id, socket_id).await
}
JsonRpcSubscriptionRequest::Events => todo!(),
JsonRpcSubscriptionRequest::Unsubscribe(SubscriptionIdInput { subscription_id }) => {
let mut sockets = self.api.sockets.lock().await;
Expand Down Expand Up @@ -120,26 +128,95 @@ impl JsonRpcHandler {
.get_block(&BlockId::Number(block_n))
.map_err(ApiError::StarknetDevnetError)?;

let old_header = old_block.into();
let notification = SubscriptionNotification::NewHeads(Box::new(old_header));
let old_header = Box::new(old_block.into());
let notification = SubscriptionNotification::NewHeads(old_header);
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}

/// Based on block generation mode and specified block ID, decide on subscription's sensitivity:
/// Based on pending block usage and specified block ID, decide on subscription's sensitivity:
/// notify of changes in pending or latest block
fn get_subscription_tag(&self, block_id: BlockId) -> BlockTag {
match self.starknet_config.block_generation_on {
BlockGenerationOn::Transaction => BlockTag::Latest,
BlockGenerationOn::Demand | BlockGenerationOn::Interval(_) => match block_id {
if self.starknet_config.uses_pending_block() {
match block_id {
BlockId::Tag(tag) => tag,
BlockId::Hash(_) | BlockId::Number(_) => BlockTag::Pending,
},
}
} else {
BlockTag::Latest
}
}

async fn get_pending_txs(&self) -> Result<Vec<TransactionWithHash>, ApiError> {
let starknet = self.api.starknet.lock().await;
let block = starknet.get_block_with_transactions(&BlockId::Tag(BlockTag::Pending))?;
match block {
BlockResult::PendingBlock(PendingBlock {
transactions: Transactions::Full(txs),
..
}) => Ok(txs),
_ => {
// Never reached if get_block_with_transactions properly implemented.
Err(ApiError::StarknetDevnetError(Error::UnexpectedInternalError {
msg: "Invalid block".into(),
}))
}
}
}

/// Does not return TOO_MANY_ADDRESSES_IN_FILTER
pub async fn subscribe_pending_txs(
&self,
maybe_subscription_input: Option<PendingTransactionsSubscriptionInput>,
rpc_request_id: Id,
socket_id: SocketId,
) -> Result<(), ApiError> {
let with_details = maybe_subscription_input
.as_ref()
.and_then(|subscription_input| subscription_input.transaction_details)
.unwrap_or_default();

let address_filter = AddressFilter::new(
maybe_subscription_input
.and_then(|subscription_input| subscription_input.sender_address)
.unwrap_or_default(),
);

let mut sockets = self.api.sockets.lock().await;
let socket_context = sockets.get_mut(&socket_id).ok_or(ApiError::StarknetDevnetError(
Error::UnexpectedInternalError { msg: format!("Unregistered socket ID: {socket_id}") },
))?;

let subscription = if with_details {
Subscription::PendingTransactionsFull { address_filter }
} else {
Subscription::PendingTransactionsHash { address_filter }
};
let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await;

// Only check pending. Regardless of block generation mode, ignore txs in latest block.
let pending_txs = self.get_pending_txs().await?;
for tx in pending_txs {
let notification = if with_details {
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Full(
Box::new(tx),
))
} else {
SubscriptionNotification::PendingTransaction(PendingTransactionNotification::Hash(
TransactionHashWrapper {
hash: *tx.get_transaction_hash(),
sender_address: tx.get_sender_address(),
},
))
};
socket_context.notify(subscription_id, notification).await;
}

Ok(())
}

async fn subscribe_tx_status(
&self,
transaction_block_input: TransactionBlockInput,
Expand Down Expand Up @@ -167,10 +244,9 @@ impl JsonRpcHandler {
// TODO if tx present, but in a block before the one specified, no point in subscribing -
// its status shall never change (unless considering block abortion). It would make
// sense to just add a ReorgSubscription
let subscription = Subscription::TransactionStatus {
tag: self.get_subscription_tag(query_block_id),
transaction_hash,
};
let subscription_tag = self.get_subscription_tag(query_block_id);
let subscription =
Subscription::TransactionStatus { tag: subscription_tag, transaction_hash };
let subscription_id = socket_context.subscribe(rpc_request_id, subscription).await;

let starknet = self.api.starknet.lock().await;
Expand All @@ -179,6 +255,7 @@ impl JsonRpcHandler {
let notification = SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash,
status: tx.get_status(),
origin_tag: subscription_tag,
});
match tx.get_block_number() {
Some(BlockNumber(block_number))
Expand Down
89 changes: 66 additions & 23 deletions crates/starknet-devnet-server/src/api/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use futures::{SinkExt, StreamExt};
use models::{
BlockAndClassHashInput, BlockAndContractAddressInput, BlockAndIndexInput, BlockInput,
CallInput, EstimateFeeInput, EventsInput, GetStorageInput, L1TransactionHashInput,
SubscriptionIdInput, TransactionBlockInput, TransactionHashInput, TransactionHashOutput,
PendingTransactionsSubscriptionInput, SubscriptionIdInput, TransactionBlockInput,
TransactionHashInput, TransactionHashOutput,
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::json;
use starknet_core::starknet::starknet_config::{BlockGenerationOn, DumpOn, StarknetConfig};
use starknet_core::starknet::starknet_config::{DumpOn, StarknetConfig};
use starknet_core::{CasmContractClass, StarknetBlock};
use starknet_rs_core::types::{BlockId, BlockTag, ContractClass as CodegenContractClass, Felt};
use starknet_types::messaging::{MessageToL1, MessageToL2};
Expand Down Expand Up @@ -71,7 +72,10 @@ use crate::rpc_core::error::{ErrorCode, RpcError};
use crate::rpc_core::request::RpcMethodCall;
use crate::rpc_core::response::{ResponseResult, RpcResponse};
use crate::rpc_handler::RpcHandler;
use crate::subscribe::{NewTransactionStatus, SocketContext, SocketId, SubscriptionNotification};
use crate::subscribe::{
NewTransactionStatus, PendingTransactionNotification, SocketContext, SocketId,
SubscriptionNotification, TransactionHashWrapper,
};
use crate::ServerConfig;

/// Helper trait to easily convert results to rpc results
Expand Down Expand Up @@ -227,21 +231,42 @@ impl JsonRpcHandler {

if new_pending_txs.len() > old_pending_txs.len() {
#[allow(clippy::expect_used)]
let new_tx = new_pending_txs.last().expect("has at least one element");
let new_tx_hash = new_pending_txs.last().expect("has at least one element");

let starknet = self.api.starknet.lock().await;

let status = starknet
.get_transaction_execution_and_finality_status(*new_tx)
.get_transaction_execution_and_finality_status(*new_tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
let tx_status_notification =
SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash: *new_tx,
transaction_hash: *new_tx_hash,
status,
origin_tag: BlockTag::Pending,
});

let tx = starknet
.get_transaction_by_hash(*new_tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
let pending_tx_notification = SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(Box::new(tx.clone())),
);

let pending_tx_hash_notification = SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(TransactionHashWrapper {
hash: *tx.get_transaction_hash(),
sender_address: tx.get_sender_address(),
}),
);

let notifications =
[tx_status_notification, pending_tx_notification, pending_tx_hash_notification];

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&tx_status_notification, BlockTag::Pending).await;
for notification in &notifications {
socket_context.notify_subscribers(notification).await;
}
}
}

Expand All @@ -252,27 +277,46 @@ impl JsonRpcHandler {
&self,
new_latest_block: StarknetBlock,
) -> Result<(), error::ApiError> {
let block_header = (&new_latest_block).into();
let block_notification = SubscriptionNotification::NewHeads(Box::new(block_header));
let block_header = Box::new((&new_latest_block).into());
let mut notifications = vec![SubscriptionNotification::NewHeads(block_header)];

let starknet = self.api.starknet.lock().await;

let mut tx_status_notifications = vec![];
for tx_hash in new_latest_block.get_transactions() {
let status = starknet
.get_transaction_execution_and_finality_status(*tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;

tx_status_notifications.push(SubscriptionNotification::TransactionStatus(
NewTransactionStatus { transaction_hash: *tx_hash, status },
));
notifications.push(SubscriptionNotification::TransactionStatus(NewTransactionStatus {
transaction_hash: *tx_hash,
status,
origin_tag: BlockTag::Latest,
}));

// There are no pending txs in this mode, but basically we are pretending that the
// transaction existed for a short period of time in the pending block, thus triggering
// the notification. This is important for users depending on this subscription type to
// find out about all new transactions.
if !self.starknet_config.uses_pending_block() {
let tx = starknet
.get_transaction_by_hash(*tx_hash)
.map_err(error::ApiError::StarknetDevnetError)?;
notifications.push(SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Full(Box::new(tx.clone())),
));
notifications.push(SubscriptionNotification::PendingTransaction(
PendingTransactionNotification::Hash(TransactionHashWrapper {
hash: *tx_hash,
sender_address: tx.get_sender_address(),
}),
));
}
}

let sockets = self.api.sockets.lock().await;
for (_, socket_context) in sockets.iter() {
socket_context.notify_subscribers(&block_notification, BlockTag::Latest).await;
for tx_status_notification in tx_status_notifications.iter() {
socket_context.notify_subscribers(tx_status_notification, BlockTag::Latest).await;
for notification in &notifications {
socket_context.notify_subscribers(notification).await;
}
}

Expand Down Expand Up @@ -312,11 +356,10 @@ impl JsonRpcHandler {

// for later comparison and subscription notifications
let old_latest_block = self.get_block(BlockTag::Latest).await;
let old_pending_block = match self.starknet_config.block_generation_on {
BlockGenerationOn::Transaction => None,
BlockGenerationOn::Interval(_) | BlockGenerationOn::Demand => {
Some(self.get_block(BlockTag::Pending).await)
}
let old_pending_block = if self.starknet_config.uses_pending_block() {
Some(self.get_block(BlockTag::Pending).await)
} else {
None
};

// true if origin should be tried after request fails; relevant in forking mode
Expand Down Expand Up @@ -711,8 +754,8 @@ pub enum JsonRpcSubscriptionRequest {
NewHeads(Option<BlockInput>),
#[serde(rename = "starknet_subscribeTransactionStatus")]
TransactionStatus(TransactionBlockInput),
#[serde(rename = "starknet_subscribePendingTransactions")]
PendingTransactions,
#[serde(rename = "starknet_subscribePendingTransactions", with = "optional_params")]
PendingTransactions(Option<PendingTransactionsSubscriptionInput>),
#[serde(rename = "starknet_subscribeEvents")]
Events,
#[serde(rename = "starknet_unsubscribe")]
Expand Down
7 changes: 7 additions & 0 deletions crates/starknet-devnet-server/src/api/json_rpc/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ pub struct TransactionBlockInput {
pub block: Option<BlockId>,
}

#[derive(Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PendingTransactionsSubscriptionInput {
pub transaction_details: Option<bool>,
pub sender_address: Option<Vec<ContractAddress>>,
}

#[cfg(test)]
mod tests {
use starknet_rs_core::types::{BlockId as ImportedBlockId, BlockTag, Felt};
Expand Down
Loading

0 comments on commit 979a986

Please sign in to comment.