Skip to content

Commit

Permalink
Remove IdentityFinalizer and move it into IdentityProcessor.
Browse files Browse the repository at this point in the history
  • Loading branch information
piohei committed Jun 20, 2024
1 parent 665a357 commit 66d1306
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 134 deletions.
19 changes: 11 additions & 8 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,22 @@ impl App {
));

let identity_processor: Arc<dyn IdentityProcessor> = if config.offchain_mode.enabled {
Arc::new(OffChainIdentityProcessor::new(database.clone()))
Arc::new(OffChainIdentityProcessor::new(database.clone()).await?)
} else {
let ethereum = Ethereum::new(&config).await?;

let identity_manager = Arc::new(IdentityManager::new(&config, ethereum.clone()).await?);

Arc::new(OnChainIdentityProcessor::new(
ethereum.clone(),
config.clone(),
database.clone(),
identity_manager.clone(),
prover_repository.clone(),
)?)
Arc::new(
OnChainIdentityProcessor::new(
ethereum.clone(),
config.clone(),
database.clone(),
identity_manager.clone(),
prover_repository.clone(),
)
.await?,
)
};

let identity_validator = IdentityValidator::new(&config);
Expand Down
208 changes: 88 additions & 120 deletions src/identity/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ pub type TransactionId = String;
#[async_trait]
pub trait IdentityProcessor: Send + Sync + 'static {
async fn commit_identities(&self, batch: &BatchEntry) -> anyhow::Result<TransactionId>;
async fn produce_transaction_finalizer(&self) -> anyhow::Result<Box<dyn IdentityFinalizer>>;

async fn finalize_identities(
&self,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()>;

async fn await_clean_slate(&self) -> anyhow::Result<()>;

Expand All @@ -41,21 +46,16 @@ pub trait IdentityProcessor: Send + Sync + 'static {
async fn tree_init_correction(&self, initial_root_hash: &Hash) -> anyhow::Result<()>;
}

#[async_trait]
pub trait IdentityFinalizer: Send + Sync + 'static {
async fn finalize_identities(
&mut self,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()>;
}

pub struct OnChainIdentityProcessor {
ethereum: Ethereum,
config: Config,
database: Arc<Database>,
identity_manager: Arc<IdentityManager>,
prover_repository: Arc<ProverRepository>,

mainnet_scanner: tokio::sync::Mutex<BlockScanner<Arc<ReadProvider>>>,
mainnet_address: Address,
secondary_scanners: tokio::sync::Mutex<HashMap<Address, BlockScanner<Arc<ReadProvider>>>>,
}

#[async_trait]
Expand Down Expand Up @@ -90,31 +90,26 @@ impl IdentityProcessor for OnChainIdentityProcessor {
}
}

async fn produce_transaction_finalizer(&self) -> anyhow::Result<Box<dyn IdentityFinalizer>> {
let mainnet_abi = self.identity_manager.abi();
let secondary_abis = self.identity_manager.secondary_abis();
async fn finalize_identities(
&self,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
let mainnet_logs = self.fetch_mainnet_logs().await?;

let mainnet_scanner = BlockScanner::new_latest(
mainnet_abi.client().clone(),
self.config.app.scanning_window_size,
self.finalize_mainnet_roots(
processed_tree,
&mainnet_logs,
self.config.app.max_epoch_duration,
)
.await?
.with_offset(self.config.app.scanning_chain_head_offset);
.await?;

let secondary_scanners =
Self::init_secondary_scanners(secondary_abis, self.config.app.scanning_window_size)
.await?;
let mut roots = Self::extract_roots_from_mainnet_logs(mainnet_logs);
roots.extend(self.fetch_secondary_logs().await?);

let mainnet_address = mainnet_abi.address();
self.finalize_secondary_roots(mined_tree, roots).await?;

Ok(Box::new(OnChainIdentityFinalizer {
config: self.config.clone(),
database: self.database.clone(),
identity_manager: self.identity_manager.clone(),
mainnet_scanner,
mainnet_address,
secondary_scanners,
}))
Ok(())
}

async fn await_clean_slate(&self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -162,19 +157,39 @@ impl IdentityProcessor for OnChainIdentityProcessor {
}

impl OnChainIdentityProcessor {
pub fn new(
pub async fn new(
ethereum: Ethereum,
config: Config,
database: Arc<Database>,
identity_manager: Arc<IdentityManager>,
prover_repository: Arc<ProverRepository>,
) -> anyhow::Result<Self> {
let mainnet_abi = identity_manager.abi();
let secondary_abis = identity_manager.secondary_abis();

let mainnet_scanner = tokio::sync::Mutex::new(
BlockScanner::new_latest(
mainnet_abi.client().clone(),
config.app.scanning_window_size,
)
.await?
.with_offset(config.app.scanning_chain_head_offset),
);

let secondary_scanners = tokio::sync::Mutex::new(
Self::init_secondary_scanners(secondary_abis, config.app.scanning_window_size).await?,
);

let mainnet_address = mainnet_abi.address();
Ok(Self {
ethereum,
config,
database,
identity_manager,
prover_repository,
mainnet_scanner,
mainnet_address,
secondary_scanners,
})
}

Expand Down Expand Up @@ -326,45 +341,8 @@ impl OnChainIdentityProcessor {

Ok(())
}
}

pub struct OnChainIdentityFinalizer {
mainnet_scanner: BlockScanner<Arc<ReadProvider>>,
mainnet_address: Address,
secondary_scanners: HashMap<Address, BlockScanner<Arc<ReadProvider>>>,

config: Config,
database: Arc<Database>,
identity_manager: Arc<IdentityManager>,
}

#[async_trait]
impl IdentityFinalizer for OnChainIdentityFinalizer {
async fn finalize_identities(
&mut self,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
let mainnet_logs = self.fetch_mainnet_logs().await?;

self.finalize_mainnet_roots(
processed_tree,
&mainnet_logs,
self.config.app.max_epoch_duration,
)
.await?;

let mut roots = Self::extract_roots_from_mainnet_logs(mainnet_logs);
roots.extend(self.fetch_secondary_logs().await?);

self.finalize_secondary_roots(mined_tree, roots).await?;

Ok(())
}
}

impl OnChainIdentityFinalizer {
async fn fetch_mainnet_logs(&mut self) -> anyhow::Result<Vec<Log>>
async fn fetch_mainnet_logs(&self) -> anyhow::Result<Vec<Log>>
where
<ReadProvider as Middleware>::Error: 'static,
{
Expand All @@ -377,15 +355,16 @@ impl OnChainIdentityFinalizer {

let mainnet_address = Some(ValueOrArray::Value(self.mainnet_address));

let mainnet_logs = self
.mainnet_scanner
let mut mainnet_scanner = self.mainnet_scanner.lock().await;

let mainnet_logs = mainnet_scanner
.next(mainnet_address, mainnet_topics.clone())
.await?;

Ok(mainnet_logs)
}

async fn fetch_secondary_logs(&mut self) -> anyhow::Result<Vec<U256>>
async fn fetch_secondary_logs(&self) -> anyhow::Result<Vec<U256>>
where
<ReadProvider as Middleware>::Error: 'static,
{
Expand All @@ -398,12 +377,16 @@ impl OnChainIdentityFinalizer {

let mut secondary_logs = vec![];

for (address, scanner) in self.secondary_scanners.iter_mut() {
let logs = scanner
.next(Some(ValueOrArray::Value(*address)), bridged_topics.clone())
.await?;
{
let mut secondary_scanners = self.secondary_scanners.lock().await;

secondary_logs.extend(logs);
for (address, scanner) in secondary_scanners.iter_mut() {
let logs = scanner
.next(Some(ValueOrArray::Value(*address)), bridged_topics.clone())
.await?;

secondary_logs.extend(logs);
}
}

let roots = Self::extract_roots_from_secondary_logs(&secondary_logs);
Expand Down Expand Up @@ -568,57 +551,19 @@ impl OnChainIdentityFinalizer {
}

pub struct OffChainIdentityProcessor {
finalizer: Arc<OffChainIdentityFinalizer>,
committed_batches: Arc<Mutex<Vec<BatchEntry>>>,
database: Arc<Database>,
}

#[async_trait]
impl IdentityProcessor for OffChainIdentityProcessor {
async fn commit_identities(&self, batch: &BatchEntry) -> anyhow::Result<TransactionId> {
self.finalizer.add_batch(batch.clone());
self.add_batch(batch.clone());
Ok(batch.id.to_string())
}

async fn produce_transaction_finalizer(&self) -> anyhow::Result<Box<dyn IdentityFinalizer>> {
Ok(Box::new(self.finalizer.clone()))
}

async fn await_clean_slate(&self) -> anyhow::Result<()> {
// For off chain mode we don't need to wait as transactions are instantly done
Ok(())
}

async fn mine_transaction(&self, _transaction_id: TransactionId) -> anyhow::Result<bool> {
// For off chain mode we don't mine transactions, so we treat all of them as
// mined
Ok(true)
}

async fn tree_init_correction(&self, _initial_root_hash: &Hash) -> anyhow::Result<()> {
// For off chain mode we don't correct tree at all
Ok(())
}
}

impl OffChainIdentityProcessor {
pub fn new(database: Arc<Database>) -> Self {
OffChainIdentityProcessor {
finalizer: Arc::new(OffChainIdentityFinalizer {
committed_batches: Arc::new(Mutex::new(Default::default())),
database,
}),
}
}
}

pub struct OffChainIdentityFinalizer {
committed_batches: Arc<Mutex<Vec<BatchEntry>>>,
database: Arc<Database>,
}

#[async_trait]
impl IdentityFinalizer for Arc<OffChainIdentityFinalizer> {
async fn finalize_identities(
&mut self,
&self,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -648,9 +593,32 @@ impl IdentityFinalizer for Arc<OffChainIdentityFinalizer> {

Ok(())
}

async fn await_clean_slate(&self) -> anyhow::Result<()> {
// For off chain mode we don't need to wait as transactions are instantly done
Ok(())
}

async fn mine_transaction(&self, _transaction_id: TransactionId) -> anyhow::Result<bool> {
// For off chain mode we don't mine transactions, so we treat all of them as
// mined
Ok(true)
}

async fn tree_init_correction(&self, _initial_root_hash: &Hash) -> anyhow::Result<()> {
// For off chain mode we don't correct tree at all
Ok(())
}
}

impl OffChainIdentityFinalizer {
impl OffChainIdentityProcessor {
pub async fn new(database: Arc<Database>) -> anyhow::Result<Self> {
Ok(OffChainIdentityProcessor {
committed_batches: Arc::new(Mutex::new(Default::default())),
database,
})
}

fn add_batch(&self, batch_entry: BatchEntry) {
let mut committed_batches = self.committed_batches.lock().unwrap();

Expand Down
7 changes: 1 addition & 6 deletions src/task_monitor/tasks/finalize_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,8 @@ use std::sync::Arc;
use crate::app::App;

pub async fn finalize_roots(app: Arc<App>) -> anyhow::Result<()> {
let mut transaction_finalizer = app
.identity_processor
.produce_transaction_finalizer()
.await?;

loop {
transaction_finalizer
app.identity_processor
.finalize_identities(
app.tree_state()?.processed_tree(),
app.tree_state()?.mined_tree(),
Expand Down

0 comments on commit 66d1306

Please sign in to comment.