Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove electrum tip query on start #708

Merged
merged 7 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions lib/core/src/chain/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Mutex, time::Duration};
use std::{sync::OnceLock, time::Duration};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -75,35 +75,35 @@ pub trait BitcoinChainService: Send + Sync {
}

pub(crate) struct HybridBitcoinChainService {
client: Client,
tip: Mutex<HeaderNotification>,
client: OnceLock<Client>,
config: Config,
}
impl HybridBitcoinChainService {
pub fn new(config: Config) -> Result<Self, Error> {
Self::with_options(config, ElectrumOptions { timeout: Some(3) })
}

/// Creates an Electrum client specifying non default options like timeout
pub fn with_options(config: Config, options: ElectrumOptions) -> Result<Self, Error> {
let electrum_url = ElectrumUrl::new(&config.bitcoin_electrum_url, true, true)?;
let client = electrum_url.build_client(&options)?;
let header = client.block_headers_subscribe_raw()?;
let tip: HeaderNotification = header.try_into()?;

Ok(Self {
client,
tip: Mutex::new(tip),
config,
client: OnceLock::new(),
})
}

fn get_client(&self) -> Result<&Client> {
if let Some(c) = self.client.get() {
return Ok(c);
}
let electrum_url = ElectrumUrl::new(&self.config.bitcoin_electrum_url, true, true)?;
let client = electrum_url.build_client(&ElectrumOptions { timeout: Some(3) })?;

let client = self.client.get_or_init(|| client);
Ok(client)
}
}

#[async_trait]
impl BitcoinChainService for HybridBitcoinChainService {
fn tip(&self) -> Result<HeaderNotification> {
let client = self.get_client()?;
let mut maybe_popped_header = None;
while let Some(header) = self.client.block_headers_pop_raw()? {
while let Some(header) = client.block_headers_pop_raw()? {
maybe_popped_header = Some(header)
}

Expand All @@ -114,30 +114,28 @@ impl BitcoinChainService for HybridBitcoinChainService {
// It might be that the client has reconnected and subscriptions don't persist
// across connections. Calling `client.ping()` won't help here because the
// successful retry will prevent us knowing about the reconnect.
if let Ok(header) = self.client.block_headers_subscribe_raw() {
if let Ok(header) = client.block_headers_subscribe_raw() {
Some(header.try_into()?)
} else {
None
}
}
};

let mut tip = self.tip.lock().unwrap();
if let Some(new_tip) = new_tip {
*tip = new_tip;
}

Ok(tip.clone())
let tip = new_tip.ok_or_else(|| anyhow!("Failed to get tip"))?;
Ok(tip)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the new_tip name no longer makes sense. Maybe rename to maybe_tip? We can even get rid of it and return the tip directly within the match.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
let txid = self.client.transaction_broadcast_raw(&serialize(&tx))?;
let txid = self
.get_client()?
.transaction_broadcast_raw(&serialize(&tx))?;
Ok(Txid::from_raw_hash(txid.to_raw_hash()))
}

fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>> {
let mut result = vec![];
for tx in self.client.batch_transaction_get_raw(txids)? {
for tx in self.get_client()?.batch_transaction_get_raw(txids)? {
let tx: Transaction = deserialize(&tx)?;
result.push(tx);
}
Expand All @@ -146,7 +144,7 @@ impl BitcoinChainService for HybridBitcoinChainService {

fn get_script_history(&self, script: &Script) -> Result<Vec<History>> {
Ok(self
.client
.get_client()?
.script_get_history(script)?
.into_iter()
.map(Into::into)
Expand All @@ -155,7 +153,7 @@ impl BitcoinChainService for HybridBitcoinChainService {

fn get_scripts_history(&self, scripts: &[&Script]) -> Result<Vec<Vec<History>>> {
Ok(self
.client
.get_client()?
.batch_script_get_history(scripts)?
.into_iter()
.map(|v| v.into_iter().map(Into::into).collect())
Expand Down Expand Up @@ -234,11 +232,11 @@ impl BitcoinChainService for HybridBitcoinChainService {
}

fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes> {
Ok(self.client.script_get_balance(script)?)
Ok(self.get_client()?.script_get_balance(script)?)
}

fn scripts_get_balance(&self, scripts: &[&Script]) -> Result<Vec<GetBalanceRes>> {
Ok(self.client.batch_script_get_balance(scripts)?)
Ok(self.get_client()?.batch_script_get_balance(scripts)?)
}

async fn script_get_balance_with_retry(
Expand Down
103 changes: 82 additions & 21 deletions lib/core/src/chain/liquid.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::Duration;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::ToHex;
use electrum_client::{Client, ElectrumApi};
use elements::encode::serialize as elements_serialize;
use log::info;
use lwk_wollet::clients::blocking::BlockchainBackend;
use lwk_wollet::elements::hex::FromHex;
use lwk_wollet::ElectrumOptions;
use lwk_wollet::{bitcoin, elements, ElectrumOptions};
use lwk_wollet::{
elements::{Address, OutPoint, Script, Transaction, Txid},
hashes::{sha256, Hash},
ElectrumClient, ElectrumUrl, History,
ElectrumUrl, History,
};

use crate::prelude::Utxo;
Expand Down Expand Up @@ -60,52 +61,112 @@ pub trait LiquidChainService: Send + Sync {
}

pub(crate) struct HybridLiquidChainService {
electrum_client: ElectrumClient,
tip_client: Mutex<ElectrumClient>,
client: OnceLock<Client>,
config: Config,
}

impl HybridLiquidChainService {
pub(crate) fn new(config: Config) -> Result<Self> {
let electrum_url = ElectrumUrl::new(&config.liquid_electrum_url, true, true)?;
let electrum_client =
ElectrumClient::with_options(&electrum_url, ElectrumOptions { timeout: Some(3) })?;
let tip_client =
ElectrumClient::with_options(&electrum_url, ElectrumOptions { timeout: Some(3) })?;
Ok(Self {
electrum_client,
tip_client: Mutex::new(tip_client),
config,
client: OnceLock::new(),
})
}

fn get_client(&self) -> Result<&Client> {
if let Some(c) = self.client.get() {
return Ok(c);
}
let electrum_url = ElectrumUrl::new(&self.config.liquid_electrum_url, true, true)?;
let client = electrum_url.build_client(&ElectrumOptions { timeout: Some(3) })?;

let client = self.client.get_or_init(|| client);
Ok(client)
}
}

#[async_trait]
impl LiquidChainService for HybridLiquidChainService {
async fn tip(&self) -> Result<u32> {
Ok(self.tip_client.lock().unwrap().tip()?.height)
let client = self.get_client()?;
let mut maybe_popped_header = None;
while let Some(header) = client.block_headers_pop_raw()? {
maybe_popped_header = Some(header)
}

let new_tip: Option<u32> = match maybe_popped_header {
Some(popped_header) => Some(popped_header.height.try_into()?),
None => {
// https://github.com/bitcoindevkit/rusprintln!("Fetching block headers");t-electrum-client/issues/124
// It might be that the client has reconnected and subscriptions don't persist
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// https://github.com/bitcoindevkit/rusprintln!("Fetching block headers");t-electrum-client/issues/124
// https://github.com/bitcoindevkit/rust-electrum-client/issues/124

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... how did it got there.
Done

// across connections. Calling `client.ping()` won't help here because the
// successful retry will prevent us knowing about the reconnect.
if let Ok(header) = client.block_headers_subscribe_raw() {
Some(header.height.try_into()?)
} else {
None
}
}
};

let tip: u32 = new_tip.ok_or_else(|| anyhow!("Failed to get tip"))?;
Ok(tip)
}

async fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
Ok(self.electrum_client.broadcast(tx)?)
let txid = self
.get_client()?
.transaction_broadcast_raw(&elements_serialize(tx))?;
Ok(Txid::from_raw_hash(txid.to_raw_hash()))
}

async fn get_transaction_hex(&self, txid: &Txid) -> Result<Option<Transaction>> {
Ok(self.get_transactions(&[*txid]).await?.first().cloned())
}

async fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>> {
Ok(self.electrum_client.get_transactions(txids)?)
let txids: Vec<bitcoin::Txid> = txids
.iter()
.map(|t| bitcoin::Txid::from_raw_hash(t.to_raw_hash()))
.collect();

let mut result = vec![];
for tx in self.get_client()?.batch_transaction_get_raw(&txids)? {
let tx: Transaction = elements::encode::deserialize(&tx)?;
result.push(tx);
}
Ok(result)
}

async fn get_script_history(&self, script: &Script) -> Result<Vec<History>> {
let mut history_vec = self.electrum_client.get_scripts_history(&[script])?;
let scripts = &[script];
let scripts: Vec<&bitcoin::Script> = scripts
.iter()
.map(|t| bitcoin::Script::from_bytes(t.as_bytes()))
.collect();

let mut history_vec: Vec<Vec<History>> = self
.get_client()?
.batch_script_get_history(&scripts)?
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect();
let h = history_vec.pop();
Ok(h.unwrap_or(vec![]))
Ok(h.unwrap_or_default())
}

async fn get_scripts_history(&self, scripts: &[&Script]) -> Result<Vec<Vec<History>>> {
self.electrum_client
.get_scripts_history(scripts)
.map_err(Into::into)
let scripts: Vec<&bitcoin::Script> = scripts
.iter()
.map(|t| bitcoin::Script::from_bytes(t.as_bytes()))
.collect();

Ok(self
.get_client()?
.batch_script_get_history(&scripts)?
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect())
}

async fn get_script_history_with_retry(
Expand Down
Loading