diff --git a/data/src/clean.rs b/data/src/clean.rs
index 4ea8c58..d1d870d 100644
--- a/data/src/clean.rs
+++ b/data/src/clean.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::mem::take;
 use std::ops::DerefMut;
 use std::path::{Path, PathBuf};
@@ -26,7 +26,10 @@ use utils::progress::ProgressUpdater;
 use xet_threadpool::ThreadPool;
 
 use crate::chunking::{chunk_target_default, ChunkYieldType};
-use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES;
+use crate::constants::{
+    DEFAULT_MIN_N_CHUNKS_PER_RANGE, MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR, MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES,
+    NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR,
+};
 use crate::data_processing::CASDataAggregator;
 use crate::errors::DataProcessingError::*;
 use crate::errors::Result;
@@ -44,6 +47,15 @@ lazy_static! {
         .unwrap_or(1);
 }
 
+lazy_static! {
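+    /// Target minimum average number of chunks per dedup range; may be
+    /// overridden with the XET_MIN_N_CHUNKS_PER_RANGE environment variable.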
+    pub static ref MIN_N_CHUNKS_PER_RANGE: f32 = std::env::var("XET_MIN_N_CHUNKS_PER_RANGE")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(DEFAULT_MIN_N_CHUNKS_PER_RANGE);
+}
+
 pub enum BufferItem<T: Send + Sync + 'static> {
     Value(T),
     Completed,
@@ -56,6 +68,44 @@ struct DedupFileTrackingInfo {
     current_cas_file_info_indices: Vec<usize>,
     current_cas_block_hashes: HashMap<MerkleHash, usize>,
     cas_data: CASDataAggregator,
+    /// This tracks the number of chunks in each of the last N ranges
+    rolling_last_nranges: VecDeque<usize>,
+    /// This tracks the total number of chunks in the last N ranges
+    rolling_nranges_chunks: usize,
+    /// Provides some hysteresis on the defrag decision, choosing between
+    /// MIN_N_CHUNKS_PER_RANGE (when false) and
+    /// MIN_N_CHUNKS_PER_RANGE * HYSTERESIS_FACTOR (when true; the factor is < 1.0)
+    defrag_at_low_threshold: bool,
+}
+
+impl DedupFileTrackingInfo {
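+    /// Adds nchunks to the most recent range in the rolling window, if one exists.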
+    fn increment_last_range_in_fragmentation_estimate(&mut self, nchunks: usize) {
+        if let Some(back) = self.rolling_last_nranges.back_mut() {
+            *back += nchunks;
+            self.rolling_nranges_chunks += nchunks;
+        }
+    }
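+
+    /// Appends a new range of nchunks to the rolling window, evicting the
+    /// oldest range once it exceeds NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR entries.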
+    fn add_range_to_fragmentation_estimate(&mut self, nchunks: usize) {
+        self.rolling_last_nranges.push_back(nchunks);
+        self.rolling_nranges_chunks += nchunks;
+        if self.rolling_last_nranges.len() > NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
+            self.rolling_nranges_chunks -= self.rolling_last_nranges.pop_front().unwrap();
+        }
+    }
+
+    /// Returns the average number of chunks per range across the rolling
+    /// window, or None if there is not enough data for an estimate.
+    fn rolling_chunks_per_range(&self) -> Option<f32> {
+        if self.rolling_last_nranges.len() < NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR {
+            None
+        } else {
+            Some(self.rolling_nranges_chunks as f32 / self.rolling_last_nranges.len() as f32)
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -422,8 +472,45 @@ impl Cleaner {
 
         while cur_idx < chunks.len() {
             let mut n_bytes = 0;
+            let mut dedupe_query = deduped_blocks[cur_idx].take();
+
+            // Check the fragmentation state; if the file is currently quite
+            // fragmented, skip dedupe for this block.
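+            // When set, this block is written as new data even if some of its
+            // chunks also appear in the CAS block currently being built.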
+            let mut forced_nodedupe = false;
+            if let Some((n_deduped, _)) = dedupe_query {
+                if let Some(chunks_per_range) = tracking_info.rolling_chunks_per_range() {
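+                    // Choose the threshold: the low one while dedup is active,
+                    // the high one once we have started skipping dedupe.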
+                    let target_cpr = if tracking_info.defrag_at_low_threshold {
+                        (*MIN_N_CHUNKS_PER_RANGE) * MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR
+                    } else {
+                        *MIN_N_CHUNKS_PER_RANGE
+                    };
+                    if chunks_per_range < target_cpr {
+                        // The chunks per range is below target, so we should
+                        // stop deduping. However, we can look ahead a little
+                        // and check the size of the next dedupe window: if it
+                        // is smaller than the current average, it will not
+                        // improve the chunks per range, so we skip it.
+                        if (n_deduped as f32) < chunks_per_range {
+                            dedupe_query = None;
+                            forced_nodedupe = true;
+                            // Once we start skipping dedupe, raise the target
+                            // CPR back to the high threshold.
+                            tracking_info.defrag_at_low_threshold = false;
+                        }
+                    } else {
+                        // Once we are deduping again, lower the target CPR to
+                        // the low threshold so that more small fragments are
+                        // tolerated before dedupe stops again.
+                        tracking_info.defrag_at_low_threshold = true;
+                    }
+                }
+            }
 
-            if let Some((n_deduped, fse)) = deduped_blocks[cur_idx].take() {
+            if let Some((n_deduped, fse)) = dedupe_query {
                 // We found one or more chunk hashes present in a cas block somewhere.
 
                 // Update all the metrics.
@@ -442,9 +529,13 @@
                     let last_entry = tracking_info.file_info.last_mut().unwrap();
                     last_entry.unpacked_segment_bytes += n_bytes as u32;
                     last_entry.chunk_index_end = fse.chunk_index_end;
+                    // update the fragmentation estimation window
+                    tracking_info.increment_last_range_in_fragmentation_estimate(n_deduped);
                 } else {
                     // This block is new
                     tracking_info.file_info.push(fse);
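+                    // start a new range in the fragmentation estimation window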
+                    tracking_info.add_range_to_fragmentation_estimate(n_deduped);
                 }
 
                 cur_idx += n_deduped;
@@ -456,7 +547,7 @@
                 // This is new data.
                 let add_new_data;
 
-                if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash) {
+                if let Some(idx) = tracking_info.current_cas_block_hashes.get(&chunk.hash).filter(|_| !forced_nodedupe) {
                     let idx = *idx;
                     // This chunk will get the CAS hash updated when the local CAS block
                     // is full and registered.
@@ -481,6 +572,8 @@
                     last_entry.unpacked_segment_bytes += n_bytes as u32;
                     last_entry.chunk_index_end += 1;
                     add_new_data = true;
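+                    // this new chunk extends the current fragmentation-estimate range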
+                    tracking_info.increment_last_range_in_fragmentation_estimate(1);
                 } else {
                     // This block is unrelated to the previous one.
                     // This chunk will get the CAS hash updated when the local CAS block
@@ -495,6 +588,8 @@
                         chunk_len,
                         chunk_len + 1,
                     ));
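+                    // this new chunk begins a fresh fragmentation-estimate range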
+                    tracking_info.add_range_to_fragmentation_estimate(1);
                     add_new_data = true;
                 }
 
diff --git a/data/src/constants.rs b/data/src/constants.rs
index f7cb02d..fdf78c5 100644
--- a/data/src/constants.rs
+++ b/data/src/constants.rs
@@ -33,3 +33,16 @@ pub const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION");
 /// Maximum number of entries in the file construction cache
 /// which stores File Hash -> reconstruction instructions
 pub const FILE_RECONSTRUCTION_CACHE_SIZE: usize = 65536;
+
+/// Number of ranges to use when estimating fragmentation
+pub const NRANGES_IN_STREAMING_FRAGMENTATION_ESTIMATOR: usize = 128;
+
+/// Default minimum average number of chunks per range; used to control
+/// fragmentation. This targets an average of 1MB per range.
+pub const DEFAULT_MIN_N_CHUNKS_PER_RANGE: f32 = 8.0;
+
+/// Multiplied by the target chunks-per-range (CPR) to give the low end of the
+/// hysteresis band: dedupe stops when the rolling CPR drops below
+/// hysteresis * target CPR, and starts again once the CPR rises back above
+/// the target CPR.
+pub const MIN_N_CHUNKS_PER_RANGE_HYSTERESIS_FACTOR: f32 = 0.5;