Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove electrum tip query on start #708

Merged
merged 7 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions lib/core/src/chain/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Mutex, time::Duration};
use std::{sync::OnceLock, time::Duration};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -75,35 +75,35 @@ pub trait BitcoinChainService: Send + Sync {
}

pub(crate) struct HybridBitcoinChainService {
client: Client,
tip: Mutex<HeaderNotification>,
client: OnceLock<Client>,
config: Config,
}
impl HybridBitcoinChainService {
pub fn new(config: Config) -> Result<Self, Error> {
Self::with_options(config, ElectrumOptions { timeout: Some(3) })
}

/// Creates an Electrum client specifying non default options like timeout
pub fn with_options(config: Config, options: ElectrumOptions) -> Result<Self, Error> {
let electrum_url = ElectrumUrl::new(&config.bitcoin_electrum_url, true, true)?;
let client = electrum_url.build_client(&options)?;
let header = client.block_headers_subscribe_raw()?;
let tip: HeaderNotification = header.try_into()?;

Ok(Self {
client,
tip: Mutex::new(tip),
config,
client: OnceLock::new(),
})
}

fn get_client(&self) -> Result<&Client> {
if let Some(c) = self.client.get() {
return Ok(c);
}
let electrum_url = ElectrumUrl::new(&self.config.bitcoin_electrum_url, true, true)?;
let client = electrum_url.build_client(&ElectrumOptions { timeout: Some(3) })?;

let client = self.client.get_or_init(|| client);
Ok(client)
}
}

#[async_trait]
impl BitcoinChainService for HybridBitcoinChainService {
fn tip(&self) -> Result<HeaderNotification> {
let client = self.get_client()?;
let mut maybe_popped_header = None;
while let Some(header) = self.client.block_headers_pop_raw()? {
while let Some(header) = client.block_headers_pop_raw()? {
maybe_popped_header = Some(header)
}

Expand All @@ -114,30 +114,27 @@ impl BitcoinChainService for HybridBitcoinChainService {
// It might be that the client has reconnected and subscriptions don't persist
// across connections. Calling `client.ping()` won't help here because the
// successful retry will prevent us knowing about the reconnect.
if let Ok(header) = self.client.block_headers_subscribe_raw() {
if let Ok(header) = client.block_headers_subscribe_raw() {
Some(header.try_into()?)
} else {
None
}
}
};

let mut tip = self.tip.lock().unwrap();
if let Some(new_tip) = new_tip {
*tip = new_tip;
}

Ok(tip.clone())
new_tip.ok_or_else(|| anyhow!("Failed to get tip"))
}

fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
let txid = self.client.transaction_broadcast_raw(&serialize(&tx))?;
let txid = self
.get_client()?
.transaction_broadcast_raw(&serialize(&tx))?;
Ok(Txid::from_raw_hash(txid.to_raw_hash()))
}

fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>> {
let mut result = vec![];
for tx in self.client.batch_transaction_get_raw(txids)? {
for tx in self.get_client()?.batch_transaction_get_raw(txids)? {
let tx: Transaction = deserialize(&tx)?;
result.push(tx);
}
Expand All @@ -146,7 +143,7 @@ impl BitcoinChainService for HybridBitcoinChainService {

fn get_script_history(&self, script: &Script) -> Result<Vec<History>> {
Ok(self
.client
.get_client()?
.script_get_history(script)?
.into_iter()
.map(Into::into)
Expand All @@ -155,7 +152,7 @@ impl BitcoinChainService for HybridBitcoinChainService {

fn get_scripts_history(&self, scripts: &[&Script]) -> Result<Vec<Vec<History>>> {
Ok(self
.client
.get_client()?
.batch_script_get_history(scripts)?
.into_iter()
.map(|v| v.into_iter().map(Into::into).collect())
Expand Down Expand Up @@ -234,11 +231,11 @@ impl BitcoinChainService for HybridBitcoinChainService {
}

fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes> {
Ok(self.client.script_get_balance(script)?)
Ok(self.get_client()?.script_get_balance(script)?)
}

fn scripts_get_balance(&self, scripts: &[&Script]) -> Result<Vec<GetBalanceRes>> {
Ok(self.client.batch_script_get_balance(scripts)?)
Ok(self.get_client()?.batch_script_get_balance(scripts)?)
}

async fn script_get_balance_with_retry(
Expand Down
102 changes: 81 additions & 21 deletions lib/core/src/chain/liquid.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::Duration;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::ToHex;
use electrum_client::{Client, ElectrumApi};
use elements::encode::serialize as elements_serialize;
use log::info;
use lwk_wollet::clients::blocking::BlockchainBackend;
use lwk_wollet::elements::hex::FromHex;
use lwk_wollet::ElectrumOptions;
use lwk_wollet::{bitcoin, elements, ElectrumOptions};
use lwk_wollet::{
elements::{Address, OutPoint, Script, Transaction, Txid},
hashes::{sha256, Hash},
ElectrumClient, ElectrumUrl, History,
ElectrumUrl, History,
};

use crate::prelude::Utxo;
Expand Down Expand Up @@ -60,52 +61,111 @@ pub trait LiquidChainService: Send + Sync {
}

pub(crate) struct HybridLiquidChainService {
electrum_client: ElectrumClient,
tip_client: Mutex<ElectrumClient>,
client: OnceLock<Client>,
config: Config,
}

impl HybridLiquidChainService {
pub(crate) fn new(config: Config) -> Result<Self> {
let electrum_url = ElectrumUrl::new(&config.liquid_electrum_url, true, true)?;
let electrum_client =
ElectrumClient::with_options(&electrum_url, ElectrumOptions { timeout: Some(3) })?;
let tip_client =
ElectrumClient::with_options(&electrum_url, ElectrumOptions { timeout: Some(3) })?;
Ok(Self {
electrum_client,
tip_client: Mutex::new(tip_client),
config,
client: OnceLock::new(),
})
}

fn get_client(&self) -> Result<&Client> {
if let Some(c) = self.client.get() {
return Ok(c);
}
let electrum_url = ElectrumUrl::new(&self.config.liquid_electrum_url, true, true)?;
let client = electrum_url.build_client(&ElectrumOptions { timeout: Some(3) })?;

let client = self.client.get_or_init(|| client);
Ok(client)
}
}

#[async_trait]
impl LiquidChainService for HybridLiquidChainService {
async fn tip(&self) -> Result<u32> {
Ok(self.tip_client.lock().unwrap().tip()?.height)
let client = self.get_client()?;
let mut maybe_popped_header = None;
while let Some(header) = client.block_headers_pop_raw()? {
maybe_popped_header = Some(header)
}

let new_tip: Option<u32> = match maybe_popped_header {
Some(popped_header) => Some(popped_header.height.try_into()?),
None => {
// https://github.com/bitcoindevkit/rust-electrum-client/issues/124
// It might be that the client has reconnected and subscriptions don't persist
// across connections. Calling `client.ping()` won't help here because the
// successful retry will prevent us knowing about the reconnect.
if let Ok(header) = client.block_headers_subscribe_raw() {
Some(header.height.try_into()?)
} else {
None
}
}
};

new_tip.ok_or_else(|| anyhow!("Failed to get tip"))
}

async fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
Ok(self.electrum_client.broadcast(tx)?)
let txid = self
.get_client()?
.transaction_broadcast_raw(&elements_serialize(tx))?;
Ok(Txid::from_raw_hash(txid.to_raw_hash()))
}

async fn get_transaction_hex(&self, txid: &Txid) -> Result<Option<Transaction>> {
Ok(self.get_transactions(&[*txid]).await?.first().cloned())
}

async fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>> {
Ok(self.electrum_client.get_transactions(txids)?)
let txids: Vec<bitcoin::Txid> = txids
.iter()
.map(|t| bitcoin::Txid::from_raw_hash(t.to_raw_hash()))
.collect();

let mut result = vec![];
for tx in self.get_client()?.batch_transaction_get_raw(&txids)? {
let tx: Transaction = elements::encode::deserialize(&tx)?;
result.push(tx);
}
Ok(result)
}

async fn get_script_history(&self, script: &Script) -> Result<Vec<History>> {
let mut history_vec = self.electrum_client.get_scripts_history(&[script])?;
let scripts = &[script];
let scripts: Vec<&bitcoin::Script> = scripts
.iter()
.map(|t| bitcoin::Script::from_bytes(t.as_bytes()))
.collect();

let mut history_vec: Vec<Vec<History>> = self
.get_client()?
.batch_script_get_history(&scripts)?
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect();
let h = history_vec.pop();
Ok(h.unwrap_or(vec![]))
Ok(h.unwrap_or_default())
}

async fn get_scripts_history(&self, scripts: &[&Script]) -> Result<Vec<Vec<History>>> {
self.electrum_client
.get_scripts_history(scripts)
.map_err(Into::into)
let scripts: Vec<&bitcoin::Script> = scripts
.iter()
.map(|t| bitcoin::Script::from_bytes(t.as_bytes()))
.collect();

Ok(self
.get_client()?
.batch_script_get_history(&scripts)?
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect())
}

async fn get_script_history_with_retry(
Expand Down
Loading