Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
addressed some comments and remove exec when finalized
Browse files Browse the repository at this point in the history
  • Loading branch information
adlrocha committed Jul 26, 2022
1 parent 39e2f60 commit 1f0c0b0
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 83 deletions.
14 changes: 10 additions & 4 deletions actors/hierarchical_sca/src/atomic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ where
fn output(&self, params: LockParams) -> LockableState<T>;
}

/// Return type for all actor functions.
///
/// It returns an option for developers to optionally choose if
/// to return an output in the function.
type ActorResult = anyhow::Result<Option<RawBytes>>;

/// Trait for an actor able to support an atomic execution.
///
/// The functions of this trait represent the set of methods that
Expand All @@ -61,15 +67,15 @@ where
S: Serialize + DeserializeOwned + LockableActorState<T>,
{
/// Locks the state to perform the execution determined by the locking params.
fn lock(params: LockParams) -> anyhow::Result<Option<RawBytes>>;
fn lock(params: LockParams) -> ActorResult;
/// Merges some state to the current state of the actor to prepare for the execution
/// of the protocol.
fn merge(params: MergeParams<T>) -> anyhow::Result<Option<RawBytes>>;
fn merge(params: MergeParams<T>) -> ActorResult;
/// Merges the output state of an execution to the actor and unlocks the state
/// involved in the execution.
fn unlock(params: UnlockParams) -> anyhow::Result<Option<RawBytes>>;
fn unlock(params: UnlockParams) -> ActorResult;
/// Aborts the execution and unlocks the locked state.
fn abort(params: LockParams) -> anyhow::Result<Option<RawBytes>>;
fn abort(params: LockParams) -> ActorResult;
/// Returns the lockable state of the actor.
fn state(params: LockParams) -> S;
}
Expand Down
16 changes: 15 additions & 1 deletion actors/hierarchical_sca/src/cross.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ pub struct StorableMsg {
}
impl Cbor for StorableMsg {}

impl Default for StorableMsg {
fn default() -> Self {
Self {
from: Address::new_id(0),
to: Address::new_id(0),
method: 0,
params: RawBytes::default(),
value: TokenAmount::from(0),
nonce: 0,
}
}
}

#[derive(PartialEq, Eq)]
pub enum HCMsgType {
Unknown = 0,
Expand Down Expand Up @@ -73,7 +86,8 @@ impl StorableMsg {
sig_addr,
)?;
let to = Address::new_hierarchical(sub_id, sig_addr)?;
Ok(Self { from, to, method: METHOD_SEND, params: RawBytes::default(), value, nonce: 0 })
// the nonce and the rest of message fields are set when the message is committed.
Ok(Self { from, to, method: METHOD_SEND, value, ..Default::default() })
}

pub fn hc_type(&self) -> anyhow::Result<HCMsgType> {
Expand Down
77 changes: 57 additions & 20 deletions actors/hierarchical_sca/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,61 @@ pub enum ExecStatus {
/// Data persisted in the SCA for the orchestration of atomic executions.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct AtomicExec {
pub params: AtomicExecParams,
pub submitted: HashMap<String, Cid>,
pub status: ExecStatus,
/// Parameters of the atomic execution. These parameters also determine
/// the unique ID for the execution.
params: AtomicExecParams,
/// Map that tracks the output submitted by every party involved in the
/// execution.
submitted: HashMap<StringifiedAddr, Cid>,
/// Status of the execution.
status: ExecStatus,
}
impl Cbor for AtomicExec {}

/// The serialization of Address doesn't support
/// undefined addresses. To overcome this problem
/// in order to be able to use addresses as keys of a hashmap
/// we use their string format (thus this type).
type StringifiedAddr = String;

impl AtomicExec {
pub fn new(params: AtomicExecParams) -> Self {
AtomicExec {
params,
submitted: HashMap::<String, Cid>::new(),
submitted: HashMap::<StringifiedAddr, Cid>::new(),
status: ExecStatus::Initialized,
}
}
pub fn status(&self) -> ExecStatus {
self.status
}

pub fn submitted(&self) -> &HashMap<StringifiedAddr, Cid> {
&self.submitted
}

pub fn submitted_mut(&mut self) -> &mut HashMap<StringifiedAddr, Cid> {
&mut self.submitted
}

pub fn params(&self) -> &AtomicExecParams {
&self.params
}

pub fn set_status(&mut self, st: ExecStatus) {
self.status = st;
}
}

/// Parameters used to submit the result of an atomic execution.
#[derive(Clone, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct SubmitExecParams {
/// Cid of the atomic execution for which a submission want to be sent.
pub cid: Cid,
/// Flag to signal if the execution should be aborted.
pub abort: bool,
pub output: atomic::SerializedState, // TODO: LockedState
/// Serialized state for the output (LockableState).
pub output: atomic::SerializedState,
}
impl Cbor for SubmitExecParams {}

Expand All @@ -57,7 +90,7 @@ impl Cbor for SubmitExecParams {}
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct AtomicExecParams {
pub msgs: Vec<StorableMsg>,
pub inputs: HashMap<String, LockedStateInfo>,
pub inputs: HashMap<StringifiedAddr, LockedStateInfo>,
}
impl Cbor for AtomicExecParams {}

Expand Down Expand Up @@ -89,15 +122,15 @@ pub struct LockedStateInfo {
impl Cbor for LockedStateInfo {}

#[derive(PartialEq, Eq, Clone, Serialize_tuple, Deserialize_tuple)]
pub struct MetaExec {
pub struct AtomicExecParamsMeta {
pub msgs_cid: TCid<TAmt<StorableMsg>>,
pub input_cid: TCid<THamt<Address, LockedStateInfo>>,
pub inputs_cid: TCid<THamt<Address, LockedStateInfo>>,
}
impl Cbor for MetaExec {}
impl Cbor for AtomicExecParamsMeta {}

impl MetaExec {
pub fn new<BS: Blockstore>(store: &BS) -> anyhow::Result<MetaExec> {
Ok(Self { msgs_cid: TCid::new_amt(store)?, input_cid: TCid::new_hamt(store)? })
impl AtomicExecParamsMeta {
pub fn new<BS: Blockstore>(store: &BS) -> anyhow::Result<AtomicExecParamsMeta> {
Ok(Self { msgs_cid: TCid::new_amt(store)?, inputs_cid: TCid::new_hamt(store)? })
}
}

Expand All @@ -111,7 +144,7 @@ impl AtomicExecParams {
BS: Blockstore,
RT: Runtime<BS>,
{
let mut out = HashMap::<String, LockedStateInfo>::new();
let mut out = HashMap::<StringifiedAddr, LockedStateInfo>::new();
for (key, val) in self.inputs.iter() {
let addr = Address::from_str(&key)?;
let sn = addr.subnet()?;
Expand All @@ -132,14 +165,14 @@ impl AtomicExecParams {
/// for the execution determines the CID used to uniquely identify the execution.
pub fn cid(&self) -> anyhow::Result<Cid> {
let store = MemoryBlockstore::new();
let mut meta = MetaExec::new(&store)?;
let mut meta = AtomicExecParamsMeta::new(&store)?;

meta.msgs_cid.update(&store, |msgs_array| {
msgs_array.batch_set(self.msgs.clone()).map_err(|e| e.into())
})?;

for (k, v) in self.inputs.iter() {
meta.input_cid.update(&store, |input_map| {
meta.inputs_cid.update(&store, |input_map| {
let addr = Address::from_str(k)?;
input_map.set(addr.to_bytes().into(), v.clone()).map_err(|e| {
e.downcast_wrap(format!("failed to set input map to compute exec cid"))
Expand All @@ -148,7 +181,7 @@ impl AtomicExecParams {
})?;
}

let meta_cid: TCid<TLink<MetaExec>> = TCid::new_link(&store, &meta)?;
let meta_cid: TCid<TLink<AtomicExecParamsMeta>> = TCid::new_link(&store, &meta)?;

Ok(meta_cid.cid())
}
Expand All @@ -157,9 +190,13 @@ impl AtomicExecParams {
/// Computes the common parent for the inputs of the atomic execution.
pub fn is_common_parent(
curr: &SubnetID,
inputs: &HashMap<String, LockedStateInfo>,
inputs: &HashMap<StringifiedAddr, LockedStateInfo>,
) -> anyhow::Result<bool> {
let ks: Vec<String> = inputs.clone().into_keys().collect();
if inputs.len() == 0 {
return Err(anyhow!("wrong length! no inputs in hashmap"));
}

let ks: Vec<&String> = inputs.keys().collect();
let addr = Address::from_str(ks[0].as_str())?;
let mut cp = addr.subnet()?;

Expand All @@ -178,9 +215,9 @@ pub fn is_common_parent(
/// Check if the address is involved in the execution
pub fn is_addr_in_exec(
caller: &Address,
inputs: &HashMap<String, LockedStateInfo>,
inputs: &HashMap<StringifiedAddr, LockedStateInfo>,
) -> anyhow::Result<bool> {
let ks: Vec<String> = inputs.clone().into_keys().collect();
let ks: Vec<&String> = inputs.keys().collect();

for k in ks.iter() {
let addr = Address::from_str(k.as_str())?;
Expand Down
43 changes: 17 additions & 26 deletions actors/hierarchical_sca/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,20 +756,20 @@ impl Actor {
None => {
return Err(actor_error!(
illegal_argument,
format!("execution with cid {} doesn't exist", &cid)
format!("execution with cid {} no longer exist", &cid)
));
}
Some(mut exec) => {
// check if the output is aborted or already succeeded
if exec.status != ExecStatus::Initialized {
if exec.status() != ExecStatus::Initialized {
return Err(actor_error!(
illegal_state,
format!("execution with cid {} doesn't exist", &cid)
format!("execution with cid {} no longer exist", &cid)
));
}

// check if the user is involved in the execution
if !is_addr_in_exec(&caller, &exec.params.inputs).map_err(|e| {
if !is_addr_in_exec(&caller, &exec.params().inputs).map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_ARGUMENT,
"error checking if address is involved in the execution",
Expand All @@ -784,7 +784,7 @@ impl Actor {
// check if the address already submitted an output
// FIXME: At this point we don't support the atomic execution between
// the same address in different subnets. This can be easily supported if needed.
match exec.submitted.get(&caller.to_string()) {
match exec.submitted().get(&caller.to_string()) {
Some(_) => {
return Err(actor_error!(
illegal_argument,
Expand All @@ -797,11 +797,12 @@ impl Actor {
// check if this is an abort
if params.abort {
// mutate status
exec.status = ExecStatus::Aborted;
out_status = exec.status;
exec.set_status(ExecStatus::Aborted);
out_status = exec.status();
// propagate result to subnet
st.propagate_exec_result(
rt.store(),
&cid.into(),
&exec,
params.output,
rt.curr_epoch(),
Expand All @@ -813,32 +814,26 @@ impl Actor {
"error propagating execution result to subnets",
)
})?;
// persist the execution
st.set_atomic_exec(rt.store(), &cid.into(), exec).map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"error putting aborted atomic execution in registry",
)
})?;
return Ok(());
}

// if not aborting
let output_cid = params.output.cid();
// check if all the submitted are equal to current cid
let out_cids: Vec<Cid> = exec.submitted.values().cloned().collect();
let out_cids: Vec<Cid> = exec.submitted().values().cloned().collect();
if !out_cids.iter().all(|&c| c == output_cid) {
return Err(actor_error!(
illegal_argument,
format!("cid provided not equal to the ones submitted: {}", &cid)
));
}
exec.submitted.insert(caller.to_string(), output_cid);
exec.submitted_mut().insert(caller.to_string(), output_cid);
// if all submissions collected
if exec.submitted.len() == exec.params.inputs.len() {
exec.status = ExecStatus::Success;
if exec.submitted().len() == exec.params().inputs.len() {
exec.set_status(ExecStatus::Success);
st.propagate_exec_result(
rt.store(),
&cid.into(),
&exec,
params.output,
rt.curr_epoch(),
Expand All @@ -850,24 +845,20 @@ impl Actor {
"error propagating execution result to subnets",
)
})?;
out_status = exec.status();
return Ok(());
}
out_status = exec.status;
out_status = exec.status();
// persist the execution
st.set_atomic_exec(rt.store(), &cid.into(), exec).map_err(|e| {
e.downcast_default(
ExitCode::USR_ILLEGAL_STATE,
"error putting aborted atomic execution in registry",
)
})?;

// TODO: Clean the execution once is done? Then we need
// to traverse several epochs if we want to have a commands
// that prompts the user with the state of all the execution
// (after succeeding or aborting). If we don't do this we need
// a way to de-duplicate executions, because a user may abort an
// execution and look to make it again (use the epoch?).
}
};

Ok(())
})?;

Expand Down
Loading

0 comments on commit 1f0c0b0

Please sign in to comment.