
Fragmentation Prevention (#147)
For https://linear.app/xet/issue/XET-246/fragmentation-prevention We use
average chunks per range as a fragmentation estimator, targeting an
average of 16 chunks per range, which roughly equates to 1MB per range.
This is computed over a rolling window of the last 32 ranges. If the
average drops below the target, dedupe is disabled until the average
rises above the target again.
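
The rolling-window estimator described above can be sketched in isolation as follows (a hedged sketch: the struct and method names here are illustrative, while the patch folds equivalent state into `DedupFileTrackingInfo` with the window size taken from `NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR`):

```rust
use std::collections::VecDeque;

/// Streaming estimate of average chunks per range over the last N ranges.
struct FragmentationEstimator {
    window: usize,
    last_nranges: VecDeque<usize>,
    total_chunks: usize,
}

impl FragmentationEstimator {
    fn new(window: usize) -> Self {
        Self { window, last_nranges: VecDeque::new(), total_chunks: 0 }
    }

    /// Record a newly started range with `nchunks` chunks, evicting the
    /// oldest range once the window is full.
    fn add_range(&mut self, nchunks: usize) {
        self.last_nranges.push_back(nchunks);
        self.total_chunks += nchunks;
        if self.last_nranges.len() > self.window {
            self.total_chunks -= self.last_nranges.pop_front().unwrap();
        }
    }

    /// Average chunks per range; None until the window has filled,
    /// so early ranges never trigger the anti-fragmentation logic.
    fn chunks_per_range(&self) -> Option<f32> {
        if self.last_nranges.len() < self.window {
            None
        } else {
            Some(self.total_chunks as f32 / self.last_nranges.len() as f32)
        }
    }
}
```

Keeping a running total alongside the deque makes each update O(1) rather than re-summing the window on every range.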

Running on the first 1GB of a *highly* fragmented file (comprising a few
hundred KB of an existing file, followed by a hundred KB of zeros,
repeated) we see the following:
- Baseline: 1000000001 bytes -> 726845953 bytes, 2975 ranges, 336134
average bytes per range
- 512KB target (anti-fragmentation goal of 8 chunks per range):
1000000001 bytes -> 873515521 bytes, 1465 ranges, 682594 average bytes
per range
- 1MB target (anti-fragmentation goal of 16 chunks per range):
1000000001 bytes -> 932235777 bytes, 829 ranges, 1206273 average bytes
per range

This also includes a hysteresis implementation. With hysteresis enabled:
- 512KB target (anti-fragmentation goal of 8 chunks per range):
1000000001 bytes -> 873515521 bytes, 1657 ranges, 603500 average bytes
per range.


The hysteresis turned out to be pretty important for deduping a
content-defined-chunked variant of Parquet.
Without hysteresis (the only concern is how v2 dedupes against v1):
``` 
parquet file v1: 5728317968 bytes -> 5728137283 bytes
parquet file v2: 5726717793 bytes -> 4544391399 bytes (11.14 chunks per range)
```
With hysteresis
``` 
parquet file v1: 5728317968 bytes -> 5728137283 bytes
parquet file v2: 5726717793 bytes -> 3568275084 bytes (8.11 chunks per range)
```
So with the hysteresis implementation we are closer to the target chunks
per range and are still able to dedupe pretty well. For comparison,
*without* any fragmentation prevention:
```
parquet file v1: 5728317968 bytes -> 5728137283 bytes
parquet file v2: 5726717793 bytes -> 3402767500 bytes (6.89 chunks per segment)
```
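
The skip decision itself can be sketched as a small state machine (a hedged sketch: the names here are illustrative, mirroring `defrag_at_low_threshold` and `MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR` from the diff, and the estimator input is assumed to come from the rolling window):

```rust
/// Decides whether to skip a dedupe opportunity based on the rolling
/// chunks-per-range (CPR) estimate, with a two-level hysteresis threshold.
struct DefragDecider {
    target_cpr: f32,
    hysteresis_factor: f32, // < 1.0; the patch uses 0.5
    at_low_threshold: bool,
}

impl DefragDecider {
    /// Returns true if this dedupe window should be skipped.
    fn should_skip_dedupe(&mut self, chunks_per_range: f32, n_deduped: usize) -> bool {
        let threshold = if self.at_low_threshold {
            self.target_cpr * self.hysteresis_factor
        } else {
            self.target_cpr
        };
        if chunks_per_range < threshold {
            // Fragmented: skip dedupe windows too small to raise the
            // average, and require recovery up to the high threshold.
            if (n_deduped as f32) < chunks_per_range {
                self.at_low_threshold = false;
                return true;
            }
        } else {
            // Healthy again: drop to the low threshold so small
            // fragments are tolerated for a while.
            self.at_low_threshold = true;
        }
        false
    }
}
```

The two thresholds prevent the decision from oscillating range-by-range around a single cutoff, which is what hurt the Parquet case above.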
ylow authored Jan 30, 2025
1 parent b015f31 commit 5cf29c1
Showing 2 changed files with 98 additions and 4 deletions.
90 changes: 86 additions & 4 deletions data/src/clean.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::mem::take;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
@@ -26,7 +26,10 @@ use utils::progress::ProgressUpdater;
use xet_threadpool::ThreadPool;

use crate::chunking::{chunk_target_default, ChunkYieldType};
use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES;
use crate::constants::{
DEFAULT_MIN_N_CHUNKS_PER_RANGE, MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR, MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES,
NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR,
};
use crate::data_processing::CASDataAggregator;
use crate::errors::DataProcessingError::*;
use crate::errors::Result;
@@ -44,6 +47,13 @@ lazy_static! {
.unwrap_or(1);
}

lazy_static! {
pub static ref MIN_N_CHUNKS_PER_RANGE: f32 = std::env::var("XET_MIN_N_CHUNKS_PER_RANGE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MIN_N_CHUNKS_PER_RANGE);
}

pub enum BufferItem<T: Send + Sync + 'static> {
Value(T),
Completed,
@@ -56,6 +66,39 @@ struct DedupFileTrackingInfo {
current_cas_file_info_indices: Vec<usize>,
current_cas_block_hashes: HashMap<MerkleHash, usize>,
cas_data: CASDataAggregator,
/// This tracks the number of chunks in each of the last N ranges
rolling_last_nranges: VecDeque<usize>,
/// This tracks the total number of chunks in the last N ranges
rolling_nranges_chunks: usize,
/// Used to provide some hysteresis on the defrag decision
/// chooses between MIN_N_CHUNKS_PER_RANGE
/// or MIN_N_CHUNKS_PER_RANGE * HYSTERESIS_FACTOR (hysteresis factor < 1.0)
defrag_at_low_threshold: bool,
}

impl DedupFileTrackingInfo {
fn increment_last_range_in_fragmentation_estimate(&mut self, nchunks: usize) {
if let Some(back) = self.rolling_last_nranges.back_mut() {
*back += nchunks;
self.rolling_nranges_chunks += nchunks;
}
}
fn add_range_to_fragmentation_estimate(&mut self, nchunks: usize) {
self.rolling_last_nranges.push_back(nchunks);
self.rolling_nranges_chunks += nchunks;
if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap();
}
}
/// Returns the average number of chunks per range
/// None if there is not enough data for an estimate
fn rolling_chunks_per_range(&self) -> Option<f32> {
if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
None
} else {
Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32)
}
}
}

#[derive(Debug)]
@@ -422,8 +465,41 @@ impl Cleaner {

while cur_idx < chunks.len() {
let mut n_bytes = 0;
let mut dedupe_query = deduped_blocks[cur_idx].take();

// check the fragmentation state and if it is pretty fragmented
// we skip dedupe
let mut forced_nodedupe = false;
if let Some((n_deduped, _)) = dedupe_query {
if let Some(chunks_per_range) = tracking_info.rolling_chunks_per_range() {
let target_cpr = if tracking_info.defrag_at_low_threshold {
(*MIN_N_CHUNKS_PER_RANGE) * MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR
} else {
*MIN_N_CHUNKS_PER_RANGE
};
if chunks_per_range < target_cpr {
// chunks per range is pretty poor, we should not dedupe.
// However, here we do get to look ahead a little bit
// and check the size of the next dedupe window.
// if it is too small, it is not going to improve
// the chunks per range and so we skip it.
if (n_deduped as f32) < chunks_per_range {
dedupe_query = None;
forced_nodedupe = true;
// once I start skipping dedupe, we try to raise
// the cpr to the high threshold
tracking_info.defrag_at_low_threshold = false;
}
} else {
// once I start deduping again, we lower CPR
// to the low threshold so we allow for more small
// fragments.
tracking_info.defrag_at_low_threshold = true;
}
}
}

if let Some((n_deduped, fse)) = deduped_blocks[cur_idx].take() {
if let Some((n_deduped, fse)) = dedupe_query {
// We found one or more chunk hashes present in a cas block somewhere.

// Update all the metrics.
@@ -442,9 +518,12 @@
let last_entry = tracking_info.file_info.last_mut().unwrap();
last_entry.unpacked_segment_bytes += n_bytes as u32;
last_entry.chunk_index_end = fse.chunk_index_end;
// update the fragmentation estimation window
tracking_info.increment_last_range_in_fragmentation_estimate(n_deduped);
} else {
// This block is new
tracking_info.file_info.push(fse);
tracking_info.add_range_to_fragmentation_estimate(n_deduped);
}

cur_idx += n_deduped;
@@ -456,7 +535,8 @@
// This is new data.
let add_new_data;

if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash) {
if tracking_info.current_cas_block_hashes.contains_key(&chunk.hash) && !forced_nodedupe {
let idx = tracking_info.current_cas_block_hashes.get(&chunk.hash).unwrap();
let idx = *idx;
// This chunk will get the CAS hash updated when the local CAS block
// is full and registered.
@@ -481,6 +561,7 @@
last_entry.unpacked_segment_bytes += n_bytes as u32;
last_entry.chunk_index_end += 1;
add_new_data = true;
tracking_info.increment_last_range_in_fragmentation_estimate(1);
} else {
// This block is unrelated to the previous one.
// This chunk will get the CAS hash updated when the local CAS block
@@ -495,6 +576,7 @@
chunk_len,
chunk_len + 1,
));
tracking_info.add_range_to_fragmentation_estimate(1);
add_new_data = true;
}

12 changes: 12 additions & 0 deletions data/src/constants.rs
@@ -33,3 +33,15 @@ pub const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Maximum number of entries in the file construction cache
/// which stores File Hash -> reconstruction instructions
pub const FILE_RECONSTRUCTION_CACHE_SIZE: usize = 65536;

/// Number of ranges to use when estimating fragmentation
pub const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128;

/// Minimum number of chunks per range. Used to control fragmentation
/// This targets an average of 1MB per range.
/// The hysteresis factor multiplied by the target Chunks Per Range (CPR) controls
/// the low end of the hysteresis range. Basically, dedupe will stop
/// when CPR drops below hysteresis * target_cpr, and will start again when
/// CPR increases above target CPR.
pub const MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR: f32 = 0.5;
pub const DEFAULT_MIN_N_CHUNKS_PER_RANGE: f32 = 8.0;
