Skip to content

Commit

Permalink
Merge pull request #2668 from maqi/include_pointer_into_data_with_chu…
Browse files Browse the repository at this point in the history
…rn_test

test(CI): include Pointer into data_with_churn test
  • Loading branch information
maqi authored Jan 26, 2025
2 parents 84ab480 + 006f581 commit ecf105e
Showing 1 changed file with 141 additions and 3 deletions.
144 changes: 141 additions & 3 deletions ant-node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ use crate::common::{
NodeRestart,
};
use ant_logging::LogBuilder;
use ant_protocol::{storage::ChunkAddress, NetworkAddress};
use ant_protocol::{
storage::{ChunkAddress, PointerTarget},
NetworkAddress,
};
use autonomi::{Client, Wallet};
use bls::SecretKey;
use common::client::transfer_to_new_wallet;
use eyre::{bail, ErrReport, Result};
use rand::Rng;
use self_encryption::MAX_CHUNK_SIZE;
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
fmt,
fs::create_dir_all,
sync::{Arc, LazyLock},
Expand All @@ -30,12 +34,14 @@ use tempfile::tempdir;
use test_utils::gen_random_data;
use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{debug, error, info, trace, warn};
use xor_name::XorName;

const TOKENS_TO_TRANSFER: usize = 10000000;

const EXTRA_CHURN_COUNT: u32 = 5;
const CHURN_CYCLES: u32 = 2;
const CHUNK_CREATION_RATIO_TO_CHURN: u32 = 15;
const POINTER_CREATION_RATIO_TO_CHURN: u32 = 15;

static DATA_SIZE: LazyLock<usize> = LazyLock::new(|| *MAX_CHUNK_SIZE / 3);

Expand Down Expand Up @@ -96,7 +102,7 @@ async fn data_availability_during_churn() -> Result<()> {
// Create a cross thread usize for tracking churned nodes
let churn_count = Arc::new(RwLock::new(0_usize));

// Storing and querying only Chunks during churn.
// Allow to disable non-chunk data_types creation/checks.
// Default to be not carry out chunks only during churn.
let chunks_only = std::env::var("CHUNKS_ONLY").is_ok();

Expand Down Expand Up @@ -131,6 +137,21 @@ async fn data_availability_during_churn() -> Result<()> {
churn_period,
);

// Spawn a task to create Pointers at random locations,
// at a higher frequency than the churning events
let create_pointer_handle = if !chunks_only {
let pointer_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
let create_pointer_handle = create_pointers_task(
client.clone(),
pointer_wallet,
Arc::clone(&content),
churn_period,
);
Some(create_pointer_handle)
} else {
None
};

// Spawn a task to churn nodes
churn_nodes_task(Arc::clone(&churn_count), test_duration, churn_period);

Expand Down Expand Up @@ -167,6 +188,11 @@ async fn data_availability_during_churn() -> Result<()> {
if store_chunks_handle.is_finished() {
bail!("Store chunks task has finished before the test duration. Probably due to an error.");
}
if let Some(handle) = &create_pointer_handle {
if handle.is_finished() {
bail!("Create pointers task has finished before the test duration. Probably due to an error.");
}
}

let failed = failures.read().await;
if start_time.elapsed().as_secs() % 10 == 0 {
Expand Down Expand Up @@ -242,6 +268,114 @@ async fn data_availability_during_churn() -> Result<()> {
Ok(())
}

// Spawns a task which periodically creates Pointers at random locations.
fn create_pointers_task(
client: Client,
wallet: Wallet,
content: ContentList,
churn_period: Duration,
) -> JoinHandle<Result<()>> {
let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
// Map of the ownership, allowing the later on update can be undertaken.
let mut owners: HashMap<NetworkAddress, (SecretKey, PointerTarget)> = HashMap::new();

// Create Pointers at a higher frequency than the churning events
let delay = churn_period / POINTER_CREATION_RATIO_TO_CHURN;

loop {
sleep(delay).await;

#[allow(unused_assignments)]
let mut pointer_addr = None;

// 50% chance to carry out update instead of creation.
let is_update: bool = if owners.is_empty() {
false
} else {
rand::random()
};

let mut retries = 1;

if is_update {
let index = rand::thread_rng().gen_range(0..owners.len());
let iterator: Vec<_> = owners.iter().collect();
let (addr, (owner, old_target)) = iterator[index];

let new_target =
PointerTarget::ChunkAddress(ChunkAddress::new(XorName(rand::random())));
loop {
match client.pointer_update(owner, new_target.clone()).await {
Ok(_) => {
println!("Updated Pointer at {addr:?} with {old_target:?} to new target {new_target:?} after a delay of: {delay:?}");
pointer_addr = Some((addr.clone(), None, new_target));
break;
}
Err(err) => {
println!(
"Failed to update pointer at {addr:?} with {old_target:?}: {err:?}. Retrying ..."
);
error!(
"Failed to update pointer at {addr:?} with {old_target:?}: {err:?}. Retrying ..."
);
if retries >= 3 {
println!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
error!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
bail!("Failed to update pointer at {addr:?} with {old_target:?} after 3 retries: {err}");
}
retries += 1;
}
}
}
} else {
let owner = SecretKey::random();
let pointer_target =
PointerTarget::ChunkAddress(ChunkAddress::new(XorName(rand::random())));
loop {
match client
.pointer_create(&owner, pointer_target.clone(), &wallet)
.await
{
Ok((cost, addr)) => {
println!("Created new Pointer ({pointer_target:?}) at {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
let net_addr = NetworkAddress::PointerAddress(addr);
pointer_addr = Some((net_addr.clone(), Some(owner), pointer_target));
content.write().await.push_back(net_addr);
break;
}
Err(err) => {
println!(
"Failed to create pointer {pointer_target:?}: {err:?}. Retrying ..."
);
error!(
"Failed to create pointer {pointer_target:?}: {err:?}. Retrying ..."
);
if retries >= 3 {
println!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
error!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
bail!("Failed to create pointer {pointer_target:?} after 3 retries: {err}");
}
retries += 1;
}
}
}
}
match pointer_addr {
Some((addr, Some(owner), target)) => {
let _ = owners.insert(addr, (owner, target));
}
Some((addr, None, new_target)) => {
if let Some((_owner, target)) = owners.get_mut(&addr) {
*target = new_target;
}
}
_ => {}
}
}
});
handle
}

// Spawns a task which periodically stores Chunks at random locations.
fn store_chunks_task(
client: Client,
Expand Down Expand Up @@ -464,6 +598,10 @@ async fn query_content(client: &Client, net_addr: &NetworkAddress) -> Result<()>
client.data_get_public(*addr.xorname()).await?;
Ok(())
}
NetworkAddress::PointerAddress(addr) => {
let _ = client.pointer_get(*addr).await?;
Ok(())
}
_other => Ok(()), // we don't create/store any other type of content in this test yet
}
}

0 comments on commit ecf105e

Please sign in to comment.