Skip to content

Commit

Permalink
Cleaning up cln model, adding better error handling, adding amount to…
Browse files Browse the repository at this point in the history
… fund_node command
  • Loading branch information
bjohnson5 committed Nov 25, 2024
1 parent 9283a4c commit 6e84264
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 61 deletions.
8 changes: 6 additions & 2 deletions blast_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
output.push(String::from("close_channel source_node channel_id"));
output.push(String::from("connect_peer source_node dest_node"));
output.push(String::from("disconnect_peer source_node dest_node"));
output.push(String::from("fund_node source_node"));
output.push(String::from("fund_node source_node amount_btc"));
}
"save" => {
match blast.save(words.next().unwrap_or("simulation1")).await {
Expand Down Expand Up @@ -599,7 +599,11 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
},
"fund_node" => {
let source = String::from(words.next().unwrap_or(""));
match blast.fund_node(source, true).await {
let amount = match words.next().unwrap_or("1.0").parse::<f64>() {
Ok(value) => { value },
Err(_) => { 1.0 }
};
match blast.fund_node(source, amount, true).await {
Ok(_) => {},
Err(e) => {
let msg = format!("Unable to fund node: {}", e);
Expand Down
18 changes: 9 additions & 9 deletions blast_core/src/blast_model_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ impl BlastModelManager {
// Get the current working directory
let mut current_dir = match env::current_dir() {
Ok(d) => d,
Err(_) => {
return Err(String::from("Failed to get the current directory"));
Err(e) => {
return Err(format!("Failed to get the current directory: {:?}", e));
}
};

Expand Down Expand Up @@ -185,12 +185,12 @@ impl BlastModelManager {
// Create a stop request
let request = tonic::Request::new(BlastStopModelRequest {
});

// Execute the stop RPC
let response = match client.stop_model(request).await {
Ok(r) => r,
Err(_) => {
return Err(String::from("RPC stop_model failed"));
Err(e) => {
return Err(format!("RPC stop_model failed: {:?}", e));
}
};

Expand Down Expand Up @@ -277,8 +277,8 @@ impl BlastModelManager {
// Execute the start RPC
let response = match client.start_nodes(request).await {
Ok(r) => r,
Err(_) => {
return Err(String::from("RPC start nodes failed"));
Err(e) => {
return Err(format!("RPC start nodes failed: {:?}", e));
}
};

Expand All @@ -288,8 +288,8 @@ impl BlastModelManager {
let request = tonic::Request::new(BlastSimlnRequest {});
let response = match client.get_sim_ln(request).await {
Ok(r) => r,
Err(_) => {
return Err(String::from("RPC get_sim_ln failed"));
Err(e) => {
return Err(format!("RPC get_sim_ln failed: {:?}", e));
}
};
match std::str::from_utf8(&response.get_ref().simln_data) {
Expand Down
10 changes: 8 additions & 2 deletions blast_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,12 +539,18 @@ impl Blast {
}

/// Send funds to a node on-chain and optionally mines blocks to confirm that payment
pub async fn fund_node(&mut self, node_id: String, confirm: bool) -> Result<String, String> {
pub async fn fund_node(&mut self, node_id: String, amt_btc: f64, confirm: bool) -> Result<String, String> {
match self.blast_model_manager.get_btc_address(node_id).await {
Ok(a) => {
let address = bitcoincore_rpc::bitcoin::Address::from_str(&a).map_err(|e|e.to_string())?
.require_network(bitcoincore_rpc::bitcoin::Network::Regtest).map_err(|e|e.to_string())?;
let txid = self.bitcoin_rpc.as_mut().unwrap().send_to_address(&address, bitcoincore_rpc::bitcoin::Amount::ONE_BTC, None, None, None, None, None, None)
let amt = match bitcoincore_rpc::bitcoin::Amount::from_btc(amt_btc) {
Ok(a) => a,
Err(e) => {
return Err(format!("Error getting funding amount: {}", e));
}
};
let txid = self.bitcoin_rpc.as_mut().unwrap().send_to_address(&address, amt, None, None, None, None, None, None)
.map_err(|e| e.to_string())?;

if confirm {
Expand Down
4 changes: 2 additions & 2 deletions blast_example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@ async fn new_simulation() {

println!("----------------------------------------------- FUND / CONNECT NODES -----------------------------------------------");

match blast.fund_node(String::from("blast_lnd-0000"), true).await {
match blast.fund_node(String::from("blast_lnd-0000"), 1.0, true).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to fund node: {}", e));
}
}

match blast.fund_node(String::from("blast_lnd-0001"), true).await {
match blast.fund_node(String::from("blast_lnd-0001"), 1.0, true).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to fund node: {}", e));
Expand Down
102 changes: 74 additions & 28 deletions blast_models/blast_cln/blast_cln/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub const SIM_DIR: &str = ".blast/blast_sims";
// The temporary directory to save runtime cln data
pub const DATA_DIR: &str = ".blast/blast_data/blast_cln";

// The address to connect to this model
pub const RPC_ADDR: &str = "127.0.0.1:5052";

// The data that is stored in the sim-ln sim.json file
#[derive(Serialize, Deserialize, Debug)]
struct SimLnNode {
Expand Down Expand Up @@ -107,7 +110,7 @@ struct BlastClnServer {

// Helper functions for the RPC server
impl BlastClnServer {
// Get an ldk-node "Node" object from an id
/// Get a cln node connection from an id
async fn get_node(&self, id: String) -> Result<NodeClient<Channel>, Status> {
let bcln = self.blast_cln.lock().await;
let node = match bcln.nodes.get(&id) {
Expand All @@ -120,12 +123,31 @@ impl BlastClnServer {
Ok(node.clone())
}

// Get an available port that can be used for listening
fn get_available_port(&self, start: u16, end: u16) -> Option<u16> {
(start..end).find(|port| self.port_is_available(*port))
/// Get the HOME environment variable
fn get_home(&self) -> Result<String, Status> {
match env::var("HOME") {
Ok(h) => {
Ok(h)
},
Err(_) => {
Err(Status::new(Code::NotFound, "HOME environment variable is not set."))
}
}
}

/// Get an available port that can be used for listening
fn get_available_port(&self, start: u16, end: u16) -> Result<u16, Status> {
match (start..end).find(|port| self.port_is_available(*port)) {
Some(p) => {
Ok(p)
},
None => {
Err(Status::new(Code::NotFound, "Could not find an available port."))
}
}
}

// Check if a port is available
/// Check if a port is available
fn port_is_available(&self, port: u16) -> bool {
match TcpListener::bind(("127.0.0.1", port)) {
Ok(_) => true,
Expand All @@ -141,22 +163,22 @@ impl BlastRpc for BlastClnServer {
async fn start_nodes(&self, request: Request<BlastStartRequest>) -> Result<Response<BlastStartResponse>,Status> {
let num_nodes = request.get_ref().num_nodes;
let mut node_list = SimJsonFile{nodes: Vec::new()};
let home = env::var("HOME").expect("HOME environment variable not set");
let home = self.get_home()?;
let data_dir = PathBuf::from(home).join(DATA_DIR).display().to_string();

// Start the requested number of cln nodes
for i in 0..num_nodes {
// Create a node id and alias
// Create a node id, get available ports and set the cert paths
let node_id = format!("{}{:04}", "blast_cln-", i);
let port = self.get_available_port(8000, 9000).unwrap();
let rpcport = self.get_available_port(port+1, 9000).unwrap().to_string();
let port = self.get_available_port(8000, 9000)?;
let rpcport = self.get_available_port(port+1, 9000)?.to_string();
let cln_dir = format!("{}/{}", data_dir, node_id);
let addr = format!("{}:{}", "https://localhost", rpcport.to_string());
let ca_path = format!("{}{}", cln_dir, "/regtest/ca.pem");
let client_path = format!("{}{}", cln_dir, "/regtest/client.pem");
let client_key_path = format!("{}{}", cln_dir, "/regtest/client-key.pem");

// Start the nodes
// Start a node
let mut command = Command::new("bash");
let mut script_file = env!("CARGO_MANIFEST_DIR").to_owned();
script_file.push_str("/start_cln.sh");
Expand All @@ -174,11 +196,20 @@ impl BlastRpc for BlastClnServer {
thread::sleep(Duration::from_secs(2));

// Load the certificates
let ca_cert = fs::read(ca_path.clone()).unwrap();
let ca_cert = match fs::read(ca_path.clone()) {
Ok(c) => { c }
Err(_) => return Err(Status::new(Code::Unknown, "Could not read the ca path.")),
};
let ca_certificate = Certificate::from_pem(ca_cert);
let client_cert = fs::read(client_path.clone()).unwrap();
let client_key_cert = fs::read(client_key_path.clone()).unwrap();
let id=tonic::transport::Identity::from_pem(client_cert, client_key_cert);
let client_cert = match fs::read(client_path.clone()) {
Ok(c) => { c }
Err(_) => return Err(Status::new(Code::Unknown, "Could not read the client path.")),
};
let client_key_cert = match fs::read(client_key_path.clone()) {
Ok(c) => { c }
Err(_) => return Err(Status::new(Code::Unknown, "Could not read the client key.")),
};
let id = tonic::transport::Identity::from_pem(client_cert, client_key_cert);

// Configure TLS settings with the CA certificate
let tls_config = ClientTlsConfig::new()
Expand All @@ -187,13 +218,19 @@ impl BlastRpc for BlastClnServer {
.ca_certificate(ca_certificate);

// Create the URI from the generated address
let uri: Uri = addr.parse().expect("Invalid URI format");
let uri: Uri = match addr.parse() {
Ok(u) => { u }
Err(_) => return Err(Status::new(Code::Unknown, "Invalid uri.")),
};

// Connect to the gRPC server using SSL/TLS
let channel = Channel::builder(uri)
let channel = match Channel::builder(uri)
.tls_config(tls_config).unwrap()
.connect()
.await.unwrap();
.await {
Ok(c) => { c }
Err(_) => return Err(Status::new(Code::Unknown, "Could not connect to server.")),
};

// Create a new client from the connected channel
let client = NodeClient::new(channel);
Expand All @@ -203,7 +240,6 @@ impl BlastRpc for BlastClnServer {
bcln.nodes.insert(node_id.clone(), client);
let n = SimLnNode{id: node_id.clone(), address: addr.clone(), ca_cert: ca_path, client_cert: client_path, client_key: client_key_path};
node_list.nodes.push(n);

bcln.addresses.insert(node_id.clone(), format!("localhost:{}", &port.to_string()));
}

Expand Down Expand Up @@ -340,7 +376,10 @@ impl BlastRpc for BlastClnServer {
let req = &request.get_ref();
let node_id = &req.node;
let peer = &req.peer_pub_key;
let peer_pub = hex::decode(peer.to_string()).unwrap();
let peer_pub = match hex::decode(peer.to_string()) {
Ok(p) => { p }
Err(_) => return Err(Status::new(Code::Unknown, "Could not decode the peer pub key.")),
};
let id = req.channel_id;
let amount = req.amount;
let push = Amount { msat: req.push_amout as u64 };
Expand Down Expand Up @@ -472,7 +511,11 @@ impl BlastRpc for BlastClnServer {
let node_id = &request.get_ref().node;
let mut node = self.get_node(node_id.to_string()).await?;

match node.disconnect(DisconnectRequest{id: hex::decode(&req.peer_pub_key).unwrap(), force: None}).await {
let id = match hex::decode(&req.peer_pub_key) {
Ok(i) => { i }
Err(_) => return Err(Status::new(Code::Unknown, "Could not decode the peer pub key.")),
};
match node.disconnect(DisconnectRequest{id: id, force: None}).await {
Ok(_) => {
let connect_response = BlastDisconnectResponse { success: true };
let response = Response::new(connect_response);
Expand All @@ -498,7 +541,11 @@ impl BlastRpc for BlastClnServer {
}
};

let addr_response = BlastBtcAddressResponse { address: cln_resp.p2tr.unwrap() };
let addr = match cln_resp.p2tr {
Some(a) => { a },
None => return Err(Status::new(Code::Unknown, "Could not get btc address.")),
};
let addr_response = BlastBtcAddressResponse { address: addr };
let response = Response::new(addr_response);
Ok(response)
}
Expand All @@ -522,7 +569,7 @@ impl BlastRpc for BlastClnServer {

/// Shutdown the nodes
async fn stop_model(&self, _request: Request<BlastStopModelRequest>) -> Result<Response<BlastStopModelResponse>, Status> {
let home = env::var("HOME").expect("HOME environment variable not set");
let home = self.get_home()?;
let data_dir = PathBuf::from(home).join(DATA_DIR).display().to_string();

let mut bcln = self.blast_cln.lock().await;
Expand Down Expand Up @@ -555,7 +602,7 @@ impl BlastRpc for BlastClnServer {
async fn load(&self, request: Request<BlastLoadRequest>) -> Result<Response<BlastLoadResponse>, Status> {
let req = &request.get_ref();
let sim_name = &req.sim;
let home_dir = env::var("HOME").expect("HOME environment variable not set");
let home_dir = self.get_home()?;
let sim_dir = String::from(SIM_DIR);
let sim_model_dir = format!("{}/{}/{}/{}/", home_dir, sim_dir, sim_name, MODEL_NAME);

Expand All @@ -568,13 +615,12 @@ impl BlastRpc for BlastClnServer {
let decompressor = GzDecoder::new(tar_gz);
let mut archive = Archive::new(decompressor);
// Extract the archive into the specified directory
let home = env::var("HOME").expect("HOME environment variable not set");
let home = self.get_home()?;
let data_dir = PathBuf::from(home).join(DATA_DIR).display().to_string();
let data_path = Path::new(&data_dir);
fs::create_dir_all(data_path).unwrap();
archive.unpack(data_path).unwrap();


// Count the number of nodes to start and remove the old symlink
let mut count = 0;
for entry in fs::read_dir(data_path).unwrap() {
Expand Down Expand Up @@ -642,7 +688,7 @@ impl BlastRpc for BlastClnServer {
async fn save(&self, request: Request<BlastSaveRequest>) -> Result<Response<BlastSaveResponse>, Status> {
let req = &request.get_ref();
let sim_name = &req.sim;
let home_dir = env::var("HOME").expect("HOME environment variable not set");
let home_dir = self.get_home()?;
let sim_dir = String::from(SIM_DIR);
let sim_model_dir = format!("{}/{}/{}/{}/", home_dir, sim_dir, sim_name, MODEL_NAME);

Expand All @@ -651,7 +697,7 @@ impl BlastRpc for BlastClnServer {
let json_path = Path::new(&sim_model_dir).join(format!("{}_channels.json", sim_name));

// Create the .tar.gz archive
let home = env::var("HOME").expect("HOME environment variable not set");
let home = self.get_home()?;
let data_dir = PathBuf::from(home).join(DATA_DIR).display().to_string();
if let Some(parent) = archive_path.parent() {
fs::create_dir_all(parent).unwrap();
Expand Down Expand Up @@ -684,7 +730,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
File::create(folder_path).unwrap(),
);

let addr = "127.0.0.1:5052".parse()?;
let addr = RPC_ADDR.parse()?;
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let mut bcln = BlastCln::new();
bcln.shutdown_sender = Some(shutdown_sender);
Expand Down
Loading

0 comments on commit 6e84264

Please sign in to comment.