Skip to content

Commit

Permalink
Disallow futures::executor::block_on (MystenLabs#10912)
Browse files Browse the repository at this point in the history
## Description 
* Adds clippy warning wherever `futures::executor::block_on` is used
* Area owners should address. Looks like mostly in indexer, narwhal,
sui-json, sui-json-rpc, sui-core

For more context see: MystenLabs#10910

## Test Plan 

N/A

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
oxade authored Apr 18, 2023
1 parent bd2b509 commit f53b960
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 27 deletions.
2 changes: 2 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ disallowed-methods = [
{ path = "tokio::sync::mpsc::unbounded_channel", reason = "use a bounded channel instead" },
{ path = "futures::channel::mpsc::unbounded", reason = "use a bounded channel instead" },
{ path = "futures_channel::mpsc::unbounded", reason = "use a bounded channel instead" },
# known to cause blocking issues
{ path = "futures::executor::block_on", reason = "use tokio::runtime::runtime::Runtime::block_on instead"},
# bincode::deserialize_from is easy to shoot your foot with
{ path = "bincode::deserialize_from", reason = "use bincode::deserialize instead" },
]
2 changes: 1 addition & 1 deletion crates/sui-indexer/benches/indexer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn indexer_benchmark(c: &mut Criterion) {
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
let db_url = format!("postgres://postgres:{pw}@{pg_host}:{pg_port}");

let rt = Runtime::new().unwrap();
let rt: Runtime = Runtime::new().unwrap();
let (mut checkpoints, store) = rt.block_on(async {
let (blocking_cp, async_cp) = new_pg_connection_pool(&db_url).await.unwrap();
reset_database(&mut blocking_cp.get().unwrap(), true).unwrap();
Expand Down
11 changes: 10 additions & 1 deletion crates/sui-indexer/src/apis/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ impl<S> IndexerApiServer for IndexerApi<S>
where
S: IndexerStore + Sync + Send + 'static,
{
// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_owned_objects(
&self,
address: SuiAddress,
Expand All @@ -343,7 +345,8 @@ where
}
block_on(self.get_owned_objects_internal(address, query, cursor, limit))
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn query_transaction_blocks(
&self,
query: SuiTransactionBlockResponseQuery,
Expand Down Expand Up @@ -377,6 +380,8 @@ where
))?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn query_events(
&self,
query: EventFilter,
Expand Down Expand Up @@ -407,6 +412,8 @@ where
))?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_dynamic_fields(
&self,
parent_object_id: ObjectID,
Expand All @@ -426,6 +433,8 @@ where
df_resp
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_dynamic_field_object(
&self,
parent_object_id: ObjectID,
Expand Down
19 changes: 18 additions & 1 deletion crates/sui-indexer/src/apis/read_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ impl<S> ReadApiServer for ReadApi<S>
where
S: IndexerStore + Sync + Send + 'static,
{
// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_object(
&self,
object_id: ObjectID,
Expand All @@ -147,6 +149,8 @@ where
Ok(block_on(self.get_object_internal(object_id, options))?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn multi_get_objects(
&self,
object_ids: Vec<ObjectID>,
Expand Down Expand Up @@ -203,6 +207,8 @@ where
.await?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn multi_get_transaction_blocks(
&self,
digests: Vec<TransactionDigest>,
Expand All @@ -227,6 +233,8 @@ where
)?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn try_get_past_object(
&self,
object_id: ObjectID,
Expand All @@ -246,6 +254,8 @@ where
past_obj_resp
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn try_multi_get_past_objects(
&self,
past_objects: Vec<SuiGetPastObjectRequest>,
Expand Down Expand Up @@ -301,6 +311,8 @@ where
Ok(self.state.get_checkpoint(id).await?)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_checkpoints(
&self,
cursor: Option<BigInt<u64>>,
Expand All @@ -320,6 +332,8 @@ where
cps_resp
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_checkpoints_deprecated_limit(
&self,
cursor: Option<BigInt<u64>>,
Expand All @@ -329,6 +343,8 @@ where
self.get_checkpoints(cursor, limit.map(|l| *l as usize), descending_order)
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
let events_guard = self
.state
Expand All @@ -339,7 +355,8 @@ where
events_guard.stop_and_record();
events_resp
}

// TODO: remove this after `futures::executor::block_on` is removed. @Ge @Chris
#[allow(clippy::disallowed_methods)]
fn get_loaded_child_objects(
&self,
digest: TransactionDigest,
Expand Down
104 changes: 80 additions & 24 deletions crates/sui-tool/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,41 @@ pub enum LocalExecError {

#[error("GeneralError: {:#?}", err)]
GeneralError { err: String },

#[error("ObjectNotExist: {:#?}", id)]
ObjectNotExist { id: ObjectID },

#[error(
"ObjectDeleted: {:#?} at version {:#?} digest {:#?}",
id,
version,
digest
)]
ObjectDeleted {
id: ObjectID,
version: SequenceNumber,
digest: ObjectDigest,
},
}

impl From<SuiObjectResponseError> for LocalExecError {
fn from(err: SuiObjectResponseError) -> Self {
match err {
SuiObjectResponseError::NotExists { object_id } => {
LocalExecError::ObjectNotExist { id: object_id }
}
SuiObjectResponseError::Deleted {
object_id,
digest,
version,
} => LocalExecError::ObjectDeleted {
id: object_id,
version,
digest,
},
_ => LocalExecError::SuiObjectResponseError { err },
}
}
}

impl From<LocalExecError> for SuiError {
Expand Down Expand Up @@ -389,6 +424,8 @@ impl LocalExec {
Ok(objects)
}

// TODO: remove this after `futures::executor::block_on` is removed.
#[allow(clippy::disallowed_methods)]
pub fn download_object(
&self,
object_id: &ObjectID,
Expand Down Expand Up @@ -436,24 +473,42 @@ impl LocalExec {
Ok(o)
}

pub fn download_latest_object(&self, object_id: &ObjectID) -> Result<Object, LocalExecError> {
// TODO: replace use of `block_on`
block_on(self.download_latest_object_impl(object_id))
// TODO: remove this after `futures::executor::block_on` is removed.
#[allow(clippy::disallowed_methods)]
pub fn download_latest_object(
&self,
object_id: &ObjectID,
) -> Result<Option<Object>, LocalExecError> {
block_on({
info!("Downloading latest object {object_id}");
self.download_latest_object_impl(object_id)
})
}

pub async fn download_latest_object_impl(
&self,
object_id: &ObjectID,
) -> Result<Object, LocalExecError> {
) -> Result<Option<Object>, LocalExecError> {
let options = SuiObjectDataOptions::bcs_lossless();
let object = self
.client
.read_api()
.get_object_with_options(*object_id, options)
.await
.map_err(|q| LocalExecError::SuiRpcError { err: q.to_string() })?;

obj_from_sui_obj_response(&object)
self
.client
.read_api()
.get_object_with_options(*object_id, options)
.await.map(|q| match obj_from_sui_obj_response(&q){
Ok(v) => Ok(Some(v)),
Err(LocalExecError::ObjectNotExist { id }) => {
error!("Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed.");
Ok(None)
}
Err(LocalExecError::ObjectDeleted { id, version, digest }) => {
error!("Object {id} {version} {digest} was deleted on RPC server.");
Ok(None)
},
Err(err) => Err(LocalExecError::SuiRpcError {
err: err.to_string(),
})
})?
}

pub async fn execute_all_in_checkpoint(
Expand Down Expand Up @@ -624,13 +679,16 @@ impl LocalExec {
Ok(())
}

pub fn get_or_download_object(&self, obj_id: &ObjectID) -> Result<Object, LocalExecError> {
pub fn get_or_download_object(
&self,
obj_id: &ObjectID,
) -> Result<Option<Object>, LocalExecError> {
if let Some(obj) = self.package_cache.lock().expect("Cannot lock").get(obj_id) {
return Ok(obj.clone());
return Ok(Some(obj.clone()));
};

let o = match self.store.get(obj_id) {
Some(obj) => obj.clone(),
Some(obj) => Some(obj.clone()),
None => {
assert!(
!self.system_package_ids().contains(obj_id),
Expand All @@ -639,6 +697,7 @@ impl LocalExec {
self.download_latest_object(obj_id)?
}
};
let Some(o) = o else { return Ok(None) };

if o.is_package() {
self.package_cache
Expand All @@ -651,7 +710,7 @@ impl LocalExec {
.lock()
.expect("Cannot lock")
.insert((o_ref.0, o_ref.1), o.clone());
Ok(o)
Ok(Some(o))
}

/// Must be called after `populate_protocol_version_tables`
Expand Down Expand Up @@ -1061,8 +1120,7 @@ impl BackingPackageStore for LocalExec {
fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<Object>> {
// If package not present fetch it from the network
self.get_or_download_object(package_id)
.map(Some)
.map_err(|e| e.into())
.map_err(|e| SuiError::GenericStorageError(e.to_string()))
}
}

Expand Down Expand Up @@ -1101,7 +1159,9 @@ impl ResourceResolver for LocalExec {
address: &AccountAddress,
typ: &StructTag,
) -> Result<Option<Vec<u8>>, Self::Error> {
let object: Object = self.get_or_download_object(&ObjectID::from(*address))?;
let Some(object) = self.get_or_download_object(&ObjectID::from(*address))? else {
return Ok(None);
};

match &object.data {
Data::Move(m) => {
Expand Down Expand Up @@ -1189,12 +1249,8 @@ impl GetModule for LocalExec {
}

fn obj_from_sui_obj_response(o: &SuiObjectResponse) -> Result<Object, LocalExecError> {
let o: Result<SuiObjectData, anyhow::Error> = Ok(o
.object()
.map_err(|q| LocalExecError::SuiObjectResponseError { err: q })?
.clone());

obj_from_sui_obj_data(&o.map_err(|q| LocalExecError::GeneralError { err: q.to_string() })?)
let o = o.object().map_err(LocalExecError::from)?.clone();
obj_from_sui_obj_data(&o)
}

fn obj_from_sui_obj_data(o: &SuiObjectData) -> Result<Object, LocalExecError> {
Expand Down
2 changes: 2 additions & 0 deletions narwhal/.clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ disallowed-methods = [
{ path = "log::debug", reason = "use tracing::debug instead" },
{ path = "log::error", reason = "use tracing::error instead" },
{ path = "log::warn", reason = "use tracing::warn instead" },
# known to cause blocking issues
{ path = "futures::executor::block_on", reason = "use tokio::runtime::runtime::Runtime::block_on instead"},
# unbounded channels are for expert use only
{ path = "tokio::sync::mpsc::unbounded_channel", reason = "use a bounded channel instead" },
{ path = "futures::channel::mpsc::unbounded", reason = "use a bounded channel instead" },
Expand Down

0 comments on commit f53b960

Please sign in to comment.