diff --git a/process-compose/bridge/process-compose.bridge-relayer-indexer.yml b/process-compose/bridge/process-compose.bridge-relayer-indexer.yml new file mode 100644 index 000000000..2ab7594e3 --- /dev/null +++ b/process-compose/bridge/process-compose.bridge-relayer-indexer.yml @@ -0,0 +1,26 @@ +version: "3" + +processes: + + postgres: + command: | + ./scripts/postgres/start-dev + + readiness_probe: + initial_delay_seconds: 5 + exec: + command: echo "true" + bridge: + command: | + RUST_BACKTRACE=1 bridge-service + readiness_probe: + initial_delay_seconds: 5 + exec: + command: echo "true" + depends_on: + postgres: + condition: process_healthy + bridge_setup: + condition: process_healthy + environment: + - "RELAYER_START_INDEXER=true" diff --git a/protocol-units/bridge/indexer-db/src/client.rs b/protocol-units/bridge/indexer-db/src/client.rs index 59e0ab8de..038013661 100644 --- a/protocol-units/bridge/indexer-db/src/client.rs +++ b/protocol-units/bridge/indexer-db/src/client.rs @@ -92,12 +92,13 @@ impl Client { Ok(BridgeEventPackage { initiated_events, completed_events }) } - /// Inserts a new transfer action into the database. - pub fn insert_transfer_action( + /// Inserts a new relayer actions into the database. + pub fn insert_relayer_actions( &mut self, bridge_transfer_id: BridgeTransferId, action_type: TransferActionType, ) -> Result<(), diesel::result::Error> { + tracing::info!("Indexer insert_relayer_actions bridge_transfer_id:{bridge_transfer_id} action_type: {action_type}"); match action_type { TransferActionType::CompleteBridgeTransfer { bridge_transfer_id, diff --git a/protocol-units/bridge/indexer-db/src/lib.rs b/protocol-units/bridge/indexer-db/src/lib.rs index 1f9655cc2..3b25582ad 100644 --- a/protocol-units/bridge/indexer-db/src/lib.rs +++ b/protocol-units/bridge/indexer-db/src/lib.rs @@ -1,8 +1,7 @@ use crate::client::Client; use bridge_config::Config; use bridge_util::chains::bridge_contracts::BridgeContractMonitoring; -use bridge_util::types::BridgeTransferId; -use bridge_util::TransferActionType; +use bridge_util::TransferAction; use tokio::select; use tokio::sync::mpsc; use tokio_stream::StreamExt; @@ -19,12 +18,13 @@ pub async fn run_indexer_client< config: Config, mut stream_source: impl BridgeContractMonitoring
, mut stream_target: impl BridgeContractMonitoring
, - relayer_actions: Option>, + mut relayer_actions_rx: Option>, ) -> Result<(), anyhow::Error> where Vec: From, Vec: From, { + tracing::info!("Starting bridge indexer."); let mut indexer_db_client = match Client::from_bridge_config(&config) { Ok(mut client) => { client.run_migrations()?; @@ -57,6 +57,25 @@ where tracing::error!("Indexer: Target event integration return an error:{err}") } } + // Wait for relayer source and target processed action. + Some(action) = conditional_relayer_recv(&mut relayer_actions_rx) =>{ + if let Err(err) = indexer_db_client + .insert_relayer_actions(action.transfer_id, action.kind) + .map_err(|err| err.to_string()) + { + tracing::error!("Indexer: Target event integration return an error:{err}") + } + } } } } + +//Adapt optional relayer future to be pooled by select. +async fn conditional_relayer_recv( + rx: &mut Option>, +) -> Option { + match rx { + Some(ref mut rx) => rx.recv().await, + None => None, + } +} diff --git a/protocol-units/bridge/service/bin/start_indexer.rs b/protocol-units/bridge/service/bin/start_indexer.rs index 962664fd2..a1d2ab921 100644 --- a/protocol-units/bridge/service/bin/start_indexer.rs +++ b/protocol-units/bridge/service/bin/start_indexer.rs @@ -1,6 +1,5 @@ use anyhow::Result; use bridge_config::Config; -use bridge_indexer_db::run_indexer_client; use bridge_service::chains::ethereum::event_monitoring::EthMonitoring; use bridge_service::chains::movement::event_monitoring::MovementMonitoring; use bridge_service::rest::BridgeRest; @@ -62,7 +61,12 @@ async fn main() -> Result<()> { tracing::info!("Bridge Eth and Movement Inited. Starting bridge loop."); // Start indexer - let indexer_jh = tokio::spawn(run_indexer_client(bridge_config, eth_stream, mvt_stream, None)); + let indexer_jh = tokio::spawn(bridge_indexer_db::run_indexer_client( + bridge_config, + eth_stream, + mvt_stream, + None, + )); tokio::select! { res = eth_healh_check_jh => { diff --git a/protocol-units/bridge/service/src/main.rs b/protocol-units/bridge/service/src/main.rs index 2a22eb399..122559144 100644 --- a/protocol-units/bridge/service/src/main.rs +++ b/protocol-units/bridge/service/src/main.rs @@ -4,6 +4,7 @@ use bridge_grpc::{ bridge_server::BridgeServer, health_check_response::ServingStatus, health_server::HealthServer, }; use bridge_util::chains::check_monitoring_health; +use std::error::Error; //use bridge_indexer_db::client::Client; use bridge_service::{ chains::{ @@ -97,13 +98,42 @@ async fn main() -> Result<()> { let mvt_healh_check_jh = tokio::spawn(check_monitoring_health("Mvt", mvt_client_health_tx, mvt_rest_health_rx)); + // If needed start indexer to relay actions + let action_sender = if std::env::var("RELAYER_START_INDEXER") + .map(|val| val.trim().to_lowercase() == "true") + .unwrap_or(false) + { + let (action_tx, action_rx) = tokio::sync::mpsc::channel(100); + tokio::spawn({ + let eth_stream = eth_stream.child().await; + let mvt_stream = mvt_stream.child().await; + async move { + bridge_indexer_db::run_indexer_client( + bridge_config, + eth_stream, + mvt_stream, + Some(action_rx), + ) + .await + } + }); + Some(action_tx) + } else { + None + }; + // Start relay in L1-> L2 direction let loop_jh1 = tokio::spawn({ let eth_stream = eth_stream.child().await; let mvt_stream = mvt_stream.child().await; + let action_sender = action_sender.clone(); async move { bridge_service::relayer::run_relayer_one_direction( - "Eth->Mvt", eth_stream, mvt_client, mvt_stream, + "Eth->Mvt", + eth_stream, + mvt_client, + mvt_stream, + action_sender.clone(), ) .await } @@ -112,7 +142,11 @@ async fn main() -> Result<()> { // Start relay in L2-> L1 direction let loop_jh2 = tokio::spawn(async move { bridge_service::relayer::run_relayer_one_direction( - "Mvt->Eth", mvt_stream, eth_client, eth_stream, + "Mvt->Eth", + mvt_stream, + eth_client, + eth_stream, + action_sender, ) .await }); diff --git a/protocol-units/bridge/service/src/relayer.rs b/protocol-units/bridge/service/src/relayer.rs index 2a77467f0..113ed6107 100644 --- a/protocol-units/bridge/service/src/relayer.rs +++ b/protocol-units/bridge/service/src/relayer.rs @@ -10,6 +10,7 @@ use bridge_util::{ }; use futures::stream::FuturesUnordered; use std::sync::Arc; +use tokio::sync::mpsc; use tokio::{select, sync::Mutex}; use tokio_stream::StreamExt; @@ -21,6 +22,7 @@ pub async fn run_relayer_one_direction< mut stream_source: impl BridgeContractMonitoring
, client_target: impl BridgeRelayerContract + 'static, mut stream_target: impl BridgeContractMonitoring
, + action_sender: Option>, ) -> Result<(), anyhow::Error> where Vec: From, @@ -43,7 +45,15 @@ where Ok(BridgeContractEvent::Initiated(detail)) => { let event : TransferEvent = BridgeContractEvent::Initiated(detail).into(); tracing::info!("Relayer:{direction}, receive Initiated event :{} ", event.contract_event); - process_event(event, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures); + process_event( + event, + &mut state_runtime, + client_target.clone(), + client_lock.clone(), + &mut client_exec_result_futures, + action_sender.as_ref().cloned(), + ) + .await; } Ok(_) => (), //do nothing for other event. Err(err) => tracing::error!("Relayer:{direction} event stream return an error:{err}"), @@ -54,7 +64,15 @@ where Ok(BridgeContractEvent::Completed(detail)) => { let event : TransferEvent = BridgeContractEvent::Completed(detail).into(); tracing::info!("Relayer:{direction}, receive Completed event :{} ", event.contract_event); - process_event(event, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures); + process_event( + event, + &mut state_runtime, + client_target.clone(), + client_lock.clone(), + &mut client_exec_result_futures, + action_sender.as_ref().cloned(), + ) + .await; } Ok(_) => (), //do nothing for other event. Err(err) => tracing::error!("Relayer:{direction} event stream return an error:{err}"), @@ -68,7 +86,13 @@ where Ok(Err(err)) => { // Manage Tx execution error if let Some(action) = state_runtime.process_action_exec_error(err) { - execute_action(action, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures); + execute_action( + action, + &mut state_runtime, + client_target.clone(), + client_lock.clone(), + &mut client_exec_result_futures, + ); } } Err(err)=>{ @@ -93,7 +117,7 @@ where } } -fn process_event< +async fn process_event< A: std::clone::Clone + std::fmt::Debug, TARGET: Send + TryFrom> + std::clone::Clone + 'static + std::fmt::Debug, >( @@ -104,17 +128,31 @@ fn process_event< client_exec_result_futures_one: &mut FuturesUnordered< tokio::task::JoinHandle>, >, + action_sender: Option>, ) where Vec: From, { match state_runtime.process_event(event) { - Ok(action) => execute_action( - action, - state_runtime, - client_target, - tx_lock, - client_exec_result_futures_one, - ), + Ok(action) => { + if let Some(sender) = action_sender { + // Send in its own task so that the main processing task is never blocked if the send blocks. + tokio::spawn({ + let action = action.clone(); + async move { + if let Err(err) = sender.send(action).await { + tracing::info!("Erreur when sending action to indexer sink: {err}",); + } + } + }); + } + execute_action( + action, + state_runtime, + client_target, + tx_lock, + client_exec_result_futures_one, + ) + } Err(err) => tracing::warn!("Received an invalid event: {err}"), } }