Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bridge: Implements last relayer mode. #951

Open
wants to merge 18 commits into
base: feature/trusted-relayer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
version: "3"

processes:

postgres:
command: |
./scripts/postgres/start-dev

readiness_probe:
initial_delay_seconds: 5
exec:
command: echo "true"
bridge:
command: |
RUST_BACKTRACE=1 bridge-service
readiness_probe:
initial_delay_seconds: 5
exec:
command: echo "true"
depends_on:
postgres:
condition: process_healthy
bridge_setup:
condition: process_healthy
environment:
- "RELAYER_START_INDEXER=true"
5 changes: 3 additions & 2 deletions protocol-units/bridge/indexer-db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ impl Client {
Ok(BridgeEventPackage { initiated_events, completed_events })
}

/// Inserts a new transfer action into the database.
pub fn insert_transfer_action(
/// Inserts a new relayer actions into the database.
pub fn insert_relayer_actions(
&mut self,
bridge_transfer_id: BridgeTransferId,
action_type: TransferActionType,
) -> Result<(), diesel::result::Error> {
tracing::info!("Indexer insert_relayer_actions bridge_transfer_id:{bridge_transfer_id} action_type: {action_type}");
match action_type {
TransferActionType::CompleteBridgeTransfer {
bridge_transfer_id,
Expand Down
25 changes: 22 additions & 3 deletions protocol-units/bridge/indexer-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::client::Client;
use bridge_config::Config;
use bridge_util::chains::bridge_contracts::BridgeContractMonitoring;
use bridge_util::types::BridgeTransferId;
use bridge_util::TransferActionType;
use bridge_util::TransferAction;
use tokio::select;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
Expand All @@ -19,12 +18,13 @@ pub async fn run_indexer_client<
config: Config,
mut stream_source: impl BridgeContractMonitoring<Address = SOURCE>,
mut stream_target: impl BridgeContractMonitoring<Address = TARGET>,
relayer_actions: Option<mpsc::Sender<(BridgeTransferId, TransferActionType)>>,
mut relayer_actions_rx: Option<mpsc::Receiver<TransferAction>>,
) -> Result<(), anyhow::Error>
where
Vec<u8>: From<SOURCE>,
Vec<u8>: From<TARGET>,
{
tracing::info!("Starting bridge indexer.");
let mut indexer_db_client = match Client::from_bridge_config(&config) {
Ok(mut client) => {
client.run_migrations()?;
Expand Down Expand Up @@ -57,6 +57,25 @@ where
tracing::error!("Indexer: Target event integration return an error:{err}")
}
}
// Wait for relayer source and target processed action.
Some(action) = conditional_relayer_recv(&mut relayer_actions_rx) =>{
if let Err(err) = indexer_db_client
.insert_relayer_actions(action.transfer_id, action.kind)
.map_err(|err| err.to_string())
{
tracing::error!("Indexer: Target event integration return an error:{err}")
}
}
}
}
}

//Adapt optional relayer future to be pooled by select.
async fn conditional_relayer_recv(
rx: &mut Option<mpsc::Receiver<TransferAction>>,
) -> Option<TransferAction> {
match rx {
Some(ref mut rx) => rx.recv().await,
None => None,
}
}
8 changes: 6 additions & 2 deletions protocol-units/bridge/service/bin/start_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use bridge_config::Config;
use bridge_indexer_db::run_indexer_client;
use bridge_service::chains::ethereum::event_monitoring::EthMonitoring;
use bridge_service::chains::movement::event_monitoring::MovementMonitoring;
use bridge_service::rest::BridgeRest;
Expand Down Expand Up @@ -62,7 +61,12 @@ async fn main() -> Result<()> {
tracing::info!("Bridge Eth and Movement Inited. Starting bridge loop.");

// Start indexer
let indexer_jh = tokio::spawn(run_indexer_client(bridge_config, eth_stream, mvt_stream, None));
let indexer_jh = tokio::spawn(bridge_indexer_db::run_indexer_client(
bridge_config,
eth_stream,
mvt_stream,
None,
));

tokio::select! {
res = eth_healh_check_jh => {
Expand Down
38 changes: 36 additions & 2 deletions protocol-units/bridge/service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bridge_grpc::{
bridge_server::BridgeServer, health_check_response::ServingStatus, health_server::HealthServer,
};
use bridge_util::chains::check_monitoring_health;
use std::error::Error;
//use bridge_indexer_db::client::Client;
use bridge_service::{
chains::{
Expand Down Expand Up @@ -97,13 +98,42 @@ async fn main() -> Result<()> {
let mvt_healh_check_jh =
tokio::spawn(check_monitoring_health("Mvt", mvt_client_health_tx, mvt_rest_health_rx));

// If needed start indexer to relay actions
let action_sender = if std::env::var("RELAYER_START_INDEXER")
.map(|val| val.trim().to_lowercase() == "true")
.unwrap_or(false)
{
let (action_tx, action_rx) = tokio::sync::mpsc::channel(100);
tokio::spawn({
let eth_stream = eth_stream.child().await;
let mvt_stream = mvt_stream.child().await;
async move {
bridge_indexer_db::run_indexer_client(
bridge_config,
eth_stream,
mvt_stream,
Some(action_rx),
)
.await
}
});
Some(action_tx)
} else {
None
};

// Start relay in L1-> L2 direction
let loop_jh1 = tokio::spawn({
let eth_stream = eth_stream.child().await;
let mvt_stream = mvt_stream.child().await;
let action_sender = action_sender.clone();
async move {
bridge_service::relayer::run_relayer_one_direction(
"Eth->Mvt", eth_stream, mvt_client, mvt_stream,
"Eth->Mvt",
eth_stream,
mvt_client,
mvt_stream,
action_sender.clone(),
)
.await
}
Expand All @@ -112,7 +142,11 @@ async fn main() -> Result<()> {
// Start relay in L2-> L1 direction
let loop_jh2 = tokio::spawn(async move {
bridge_service::relayer::run_relayer_one_direction(
"Mvt->Eth", mvt_stream, eth_client, eth_stream,
"Mvt->Eth",
mvt_stream,
eth_client,
eth_stream,
action_sender,
)
.await
});
Expand Down
60 changes: 49 additions & 11 deletions protocol-units/bridge/service/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bridge_util::{
};
use futures::stream::FuturesUnordered;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::{select, sync::Mutex};
use tokio_stream::StreamExt;

Expand All @@ -21,6 +22,7 @@ pub async fn run_relayer_one_direction<
mut stream_source: impl BridgeContractMonitoring<Address = SOURCE>,
client_target: impl BridgeRelayerContract<TARGET> + 'static,
mut stream_target: impl BridgeContractMonitoring<Address = TARGET>,
action_sender: Option<mpsc::Sender<TransferAction>>,
) -> Result<(), anyhow::Error>
where
Vec<u8>: From<SOURCE>,
Expand All @@ -43,7 +45,15 @@ where
Ok(BridgeContractEvent::Initiated(detail)) => {
let event : TransferEvent<SOURCE> = BridgeContractEvent::Initiated(detail).into();
tracing::info!("Relayer:{direction}, receive Initiated event :{} ", event.contract_event);
process_event(event, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures);
process_event(
event,
&mut state_runtime,
client_target.clone(),
client_lock.clone(),
&mut client_exec_result_futures,
action_sender.as_ref().cloned(),
)
.await;
}
Ok(_) => (), //do nothing for other event.
Err(err) => tracing::error!("Relayer:{direction} event stream return an error:{err}"),
Expand All @@ -54,7 +64,15 @@ where
Ok(BridgeContractEvent::Completed(detail)) => {
let event : TransferEvent<TARGET> = BridgeContractEvent::Completed(detail).into();
tracing::info!("Relayer:{direction}, receive Completed event :{} ", event.contract_event);
process_event(event, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures);
process_event(
event,
&mut state_runtime,
client_target.clone(),
client_lock.clone(),
&mut client_exec_result_futures,
action_sender.as_ref().cloned(),
)
.await;
}
Ok(_) => (), //do nothing for other event.
Err(err) => tracing::error!("Relayer:{direction} event stream return an error:{err}"),
Expand All @@ -68,7 +86,13 @@ where
Ok(Err(err)) => {
// Manage Tx execution error
if let Some(action) = state_runtime.process_action_exec_error(err) {
execute_action(action, &mut state_runtime, client_target.clone(), client_lock.clone(), &mut client_exec_result_futures);
execute_action(
action,
&mut state_runtime,
client_target.clone(),
client_lock.clone(),
&mut client_exec_result_futures,
);
}
}
Err(err)=>{
Expand All @@ -93,7 +117,7 @@ where
}
}

fn process_event<
async fn process_event<
A: std::clone::Clone + std::fmt::Debug,
TARGET: Send + TryFrom<Vec<u8>> + std::clone::Clone + 'static + std::fmt::Debug,
>(
Expand All @@ -104,17 +128,31 @@ fn process_event<
client_exec_result_futures_one: &mut FuturesUnordered<
tokio::task::JoinHandle<Result<(), ActionExecError>>,
>,
action_sender: Option<mpsc::Sender<TransferAction>>,
) where
Vec<u8>: From<A>,
{
match state_runtime.process_event(event) {
Ok(action) => execute_action(
action,
state_runtime,
client_target,
tx_lock,
client_exec_result_futures_one,
),
Ok(action) => {
if let Some(sender) = action_sender {
// Send in its own task so that the main processing task is never blocked if the send blocks.
tokio::spawn({
let action = action.clone();
async move {
if let Err(err) = sender.send(action).await {
tracing::info!("Erreur when sending action to indexer sink: {err}",);
}
}
});
}
execute_action(
action,
state_runtime,
client_target,
tx_lock,
client_exec_result_futures_one,
)
}
Err(err) => tracing::warn!("Received an invalid event: {err}"),
}
}
Expand Down
Loading