Skip to content

Commit

Permalink
Prefetch all graph edges for owned channels
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 6, 2025
1 parent ce3247d commit f2346fb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 39 deletions.
81 changes: 48 additions & 33 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl From<(u64, ChannelAnnouncement)> for ChannelInfo {
}
}

#[derive(Debug, Clone)]
pub struct DirectedGraphEdge {
pub channel_outpoint: OutPoint,
// UDT script
Expand Down Expand Up @@ -225,6 +226,9 @@ impl From<&ChannelUpdate> for ChannelUpdateInfo {
pub struct NetworkGraph<S> {
// The pubkey of the node that is running this instance of the network graph.
source: Pubkey,
// Some graph edges that are known locally, but not in the network graph.
// Those edges are normally the channels that are owned by the node.
addtional_known_edges: HashMap<Pubkey, HashMap<OutPoint, DirectedGraphEdge>>,
// All the channels in the network.
channels: HashMap<OutPoint, ChannelInfo>,
// All the nodes in the network.
Expand Down Expand Up @@ -270,6 +274,7 @@ where
pub fn new(store: S, source: Pubkey) -> Self {
let mut network_graph = Self {
source,
addtional_known_edges: Default::default(),
channels: HashMap::new(),
nodes: HashMap::new(),
latest_cursor: Cursor::default(),
Expand Down Expand Up @@ -568,43 +573,17 @@ where
}

pub fn get_node_inbounds(&self, node_id: Pubkey) -> impl Iterator<Item = DirectedGraphEdge> {
let mut channels = vec![];
let mut direct_channels = HashSet::new();
if node_id == self.source {
for (_peer_id, channel_id, _state) in self.store.get_active_channel_states(None) {
match self.store.get_channel_actor_state(&channel_id) {
Some(channel_actor_state) => {
assert_eq!(channel_actor_state.local_pubkey, node_id);
let channel_outpoint =
channel_actor_state.must_get_funding_transaction_outpoint();
direct_channels.insert(channel_outpoint.clone());

if let Some(ref remote_tlc_info) = channel_actor_state.remote_tlc_info {
channels.push(DirectedGraphEdge {
channel_outpoint,
udt_type_script: channel_actor_state
.funding_udt_type_script
.clone(),
from: node_id,
to: channel_actor_state.remote_pubkey,
capacity: channel_actor_state.get_liquid_capacity(),
balance: Some(channel_actor_state.to_local_amount),
tlc_expiry_delta: remote_tlc_info.tlc_expiry_delta,
tlc_minimum_value: remote_tlc_info.tlc_min_value,
fee_rate: remote_tlc_info.tlc_fee_proportional_millionths,
})
}
}
// It is possible that after we obtained the list of channels, the channel is deleted.
None => {}
}
}
}
let tmp_known_edges = HashMap::new();
let known_edges = self
.addtional_known_edges
.get(&node_id)
.unwrap_or(&tmp_known_edges);
let mut channels = known_edges.values().cloned().collect::<Vec<_>>();

for channel in self.channels.values() {
// We always assume that the information obtained from store is the latest.
// So we would not bother to check channel info from the graph.
if direct_channels.contains(&channel.channel_outpoint) {
if known_edges.contains_key(&channel.channel_outpoint) {
continue;
}

Expand Down Expand Up @@ -719,6 +698,42 @@ where
self.source = source;
}

pub fn load_owned_channel_info(&mut self) {
for (_peer_id, channel_id, _state) in self.store.get_active_channel_states(None) {
match self.store.get_channel_actor_state(&channel_id) {
Some(channel_actor_state) => {
assert_eq!(channel_actor_state.local_pubkey, self.source);
let channel_outpoint =
channel_actor_state.must_get_funding_transaction_outpoint();

if let Some(ref remote_tlc_info) = channel_actor_state.remote_tlc_info {
self.addtional_known_edges
.entry(channel_actor_state.remote_pubkey)
.or_default()
.insert(
channel_outpoint.clone(),
DirectedGraphEdge {
channel_outpoint,
udt_type_script: channel_actor_state
.funding_udt_type_script
.clone(),
from: self.source,
to: channel_actor_state.remote_pubkey,
capacity: channel_actor_state.get_liquid_capacity(),
balance: Some(channel_actor_state.to_local_amount),
tlc_expiry_delta: remote_tlc_info.tlc_expiry_delta,
tlc_minimum_value: remote_tlc_info.tlc_min_value,
fee_rate: remote_tlc_info.tlc_fee_proportional_millionths,
},
);
}
}
// It is possible that after we obtained the list of channels, the channel is deleted.
None => {}
}
}
}

/// Returns a list of `PaymentHopData` for all nodes in the route,
/// including the origin and the target node.
pub fn build_route(
Expand Down
12 changes: 6 additions & 6 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1531,12 +1531,12 @@ where
payment_session: &mut PaymentSession,
payment_data: &SendPaymentData,
) -> Result<Vec<PaymentHopData>, Error> {
match self
.network_graph
.read()
.await
.build_route(payment_data.clone())
{
// Load owned channel info before building route, so that we use private channels and also the
// exact balance of the channels.
let mut rwgraph = self.network_graph.write().await;
rwgraph.load_owned_channel_info();
let graph = tokio::sync::RwLockWriteGuard::downgrade(rwgraph);
match graph.build_route(payment_data.clone()) {
Err(e) => {
let error = format!("Failed to build route, {}", e);
self.set_payment_fail_with_error(payment_session, &error);
Expand Down

0 comments on commit f2346fb

Please sign in to comment.