
Remove small file passthrough (#149)
Remove all of the old code that passes small text files through to
git unmodified, since that path is no longer used.
ylow authored Jan 24, 2025
1 parent fb65c8b commit bbcf196
Showing 9 changed files with 8 additions and 94 deletions.
3 changes: 1 addition & 2 deletions data/src/bin/example.rs
@@ -9,7 +9,7 @@ use anyhow::Result;
 use cas_client::CacheConfig;
 use clap::{Args, Parser, Subcommand};
 use data::configurations::*;
-use data::{PointerFile, PointerFileTranslator, SMALL_FILE_THRESHOLD};
+use data::{PointerFile, PointerFileTranslator};
 use utils::ThreadPool;

 #[derive(Parser)]
@@ -101,7 +101,6 @@ fn default_clean_config() -> Result<TranslatorConfig> {
         },
         dedup_config: Some(DedupConfig {
             repo_salt: None,
-            small_file_threshold: SMALL_FILE_THRESHOLD,
             global_dedup_policy: Default::default(),
         }),
         repo_info: Some(RepoInfo {
59 changes: 4 additions & 55 deletions data/src/clean.rs
@@ -34,7 +34,6 @@ use crate::metrics::FILTER_BYTES_CLEANED;
 use crate::parallel_xorb_uploader::XorbUpload;
 use crate::remote_shard_interface::RemoteShardInterface;
 use crate::repo_salt::RepoSalt;
-use crate::small_file_determination::{is_file_passthrough, is_possible_start_to_text_file};
 use crate::PointerFile;

 // Chunking is the bottleneck, changing batch size doesn't have a big impact.
@@ -108,7 +107,6 @@ impl ShaGenerator {

 pub struct Cleaner {
     // Configurations
-    small_file_threshold: usize,
     enable_global_dedup_queries: bool,
     cas_prefix: String,
     repo_salt: Option<RepoSalt>,
@@ -129,7 +127,6 @@ pub struct Cleaner {

     // Internal Data
     tracking_info: Mutex<DedupFileTrackingInfo>,
-    small_file_buffer: Mutex<Option<Vec<u8>>>,

     // Auxiliary info
     file_name: Option<PathBuf>,
@@ -145,7 +142,6 @@ pub struct Cleaner {
 impl Cleaner {
     #[allow(clippy::too_many_arguments)]
     pub(crate) async fn new(
-        small_file_threshold: usize,
         enable_global_dedup_queries: bool,
         cas_prefix: String,
         repo_salt: Option<RepoSalt>,
@@ -166,7 +162,6 @@ impl Cleaner {
         let chunker = chunk_target_default(data_c, chunk_p, threadpool.clone());

         let cleaner = Arc::new(Cleaner {
-            small_file_threshold,
             enable_global_dedup_queries,
             cas_prefix,
             repo_salt,
@@ -178,7 +173,6 @@ impl Cleaner {
             chunking_worker: Mutex::new(Some(chunker)),
             dedup_worker: Mutex::new(None),
             tracking_info: Mutex::new(Default::default()),
-            small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
             file_name: file_name.map(|f| f.to_owned()),
             sha_generator: ShaGenerator::new(),
             metrics: CleanMetrics {
@@ -200,9 +194,8 @@ impl Cleaner {
         self.metrics.file_size.fetch_add(data.len() as u64, Ordering::Relaxed);

         self.sha_generator.update(&data)?;
-        if !self.check_passthrough_status(&data).await? {
-            self.add_data_to_chunking(BufferItem::Value(data)).await?
-        }
+
+        self.add_data_to_chunking(BufferItem::Value(data)).await?;

         Ok(())
     }
@@ -212,15 +205,8 @@ impl Cleaner {

         let file_size = self.metrics.file_size.load(Ordering::Relaxed);

-        // File is small, all data kept in the small file buffer.
-        let mut small_file_buffer = self.small_file_buffer.lock().await;
-        let (new_bytes, return_file) = if let Some(buffer) = small_file_buffer.take() {
-            let small_file = String::from_utf8(buffer)?;
-            (small_file.len() as u64, small_file)
-        } else {
-            let new_bytes = self.metrics.new_bytes_after_dedup.load(Ordering::Relaxed);
-            (new_bytes, self.to_pointer_file().await?)
-        };
+        let new_bytes = self.metrics.new_bytes_after_dedup.load(Ordering::Relaxed);
+        let return_file = self.to_pointer_file().await?;

         let current_time = SystemTime::now();
         let start: DateTime<Utc> = self.metrics.start_time.into();
@@ -308,32 +294,6 @@ impl Cleaner {
         Ok(())
     }

-    /// Check passthrough condition of data.
-    /// Return true if the incoming data is already processed inside,
-    /// otherwise return false and let the caller to handle the data.
-    async fn check_passthrough_status(&self, data: &[u8]) -> Result<bool> {
-        let mut small_file_buffer = self.small_file_buffer.lock().await;
-
-        if let Some(mut buffer) = small_file_buffer.take() {
-            buffer.extend_from_slice(data);
-
-            if !is_possible_start_to_text_file(&buffer) || buffer.len() >= self.small_file_threshold {
-                self.add_data_to_chunking(BufferItem::Value(buffer)).await?;
-
-                // not passthrough, but just sent all buffered data + incoming data to chunker
-                return Ok(true);
-            }
-
-            *small_file_buffer = Some(buffer);
-
-            // may be passthrough, keep accumulating
-            return Ok(true);
-        }
-
-        // not passthrough, already sent all buffered data to chunker
-        Ok(false)
-    }
-
     async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
         debug!("Dedup {} chunks", chunks.len());
         let mut tracking_info = self.tracking_info.lock().await;
@@ -574,17 +534,6 @@ impl Cleaner {
     async fn finish(&self) -> Result<()> {
         self.task_is_running().await?;

-        // check if there is remaining data in buffer
-        let mut small_file_buffer = self.small_file_buffer.lock().await;
-        if let Some(buffer) = small_file_buffer.take() {
-            if !is_file_passthrough(&buffer, self.small_file_threshold) {
-                self.add_data_to_chunking(BufferItem::Value(buffer)).await?;
-            } else {
-                // put back for return value
-                *small_file_buffer = Some(buffer);
-            }
-        }
-
         // signal finish
         self.add_data_to_chunking(BufferItem::Completed).await?;
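
Taken together, the clean.rs hunks reduce Cleaner::add_data to a straight pipe into the chunker. A minimal before/after sketch, with the method signature assumed since only the body appears in the diff above:

// Before (simplified): incoming data was first offered to the small-file
// buffer, and reached the chunker only once the file stopped looking like
// a small text file.
pub async fn add_data(&self, data: Vec<u8>) -> Result<()> {
    self.sha_generator.update(&data)?;
    if !self.check_passthrough_status(&data).await? {
        self.add_data_to_chunking(BufferItem::Value(data)).await?
    }
    Ok(())
}

// After: every byte goes straight to chunking; no buffering, no passthrough.
pub async fn add_data(&self, data: Vec<u8>) -> Result<()> {
    self.sha_generator.update(&data)?;
    self.add_data_to_chunking(BufferItem::Value(data)).await?;
    Ok(())
}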
1 change: 0 additions & 1 deletion data/src/configurations.rs
@@ -25,7 +25,6 @@ pub struct StorageConfig {
 #[derive(Debug)]
 pub struct DedupConfig {
     pub repo_salt: Option<RepoSalt>,
-    pub small_file_threshold: usize,
     pub global_dedup_policy: GlobalDedupPolicy,
 }

9 changes: 0 additions & 9 deletions data/src/constants.rs
@@ -18,15 +18,6 @@ pub const GIT_MAX_PACKET_SIZE: usize = 65516;
 /// See gitxetcore::data::pointer_file for the explanation for this limit.
 pub const POINTER_FILE_LIMIT: usize = 150;

-/// If a file has size smaller than this threshold, AND if it "looks-like"
-/// text, we interpret this as a text file and passthrough the file, letting
-/// git handle it. See `small_file_determination.rs` for details.
-///
-/// We set this to be 1 less than a constant multiple of the GIT_MAX_PACKET_SIZE
-/// so we can read exactly up to that multiple of packets to determine if it
-/// is a small file.
-pub const SMALL_FILE_THRESHOLD: usize = 4 * GIT_MAX_PACKET_SIZE - 1;
-
 // Salt is 256-bit in length.
 pub const REPO_SALT_LEN: usize = 32;

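
The packet arithmetic in the deleted comment checks out. A small illustrative sketch using the constants above (the assertion is for demonstration only and was never part of the codebase):

const GIT_MAX_PACKET_SIZE: usize = 65516;
const SMALL_FILE_THRESHOLD: usize = 4 * GIT_MAX_PACKET_SIZE - 1; // 262_063 bytes

fn main() {
    // Reading threshold + 1 bytes consumes exactly four full git packets,
    // which is how the old code bounded its text-detection look-ahead.
    assert_eq!(SMALL_FILE_THRESHOLD + 1, 4 * GIT_MAX_PACKET_SIZE);
    println!("old threshold: {SMALL_FILE_THRESHOLD} bytes");
}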
4 changes: 0 additions & 4 deletions data/src/data_client.rs
@@ -28,9 +28,6 @@ lazy_static! {
 }
 const MAX_CONCURRENT_DOWNLOADS: usize = 8; // Download is not CPU-bound

-// We now process every file delegated from the Python library.
-const SMALL_FILE_THRESHOLD: usize = 1;
-
 const DEFAULT_CAS_ENDPOINT: &str = "http://localhost:8080";
 const READ_BLOCK_SIZE: usize = 1024 * 1024;

@@ -76,7 +73,6 @@ pub fn default_config(
         },
         dedup_config: Some(DedupConfig {
             repo_salt: None,
-            small_file_threshold: SMALL_FILE_THRESHOLD,
             global_dedup_policy: Default::default(),
         }),
         repo_info: Some(RepoInfo {
1 change: 0 additions & 1 deletion data/src/data_processing.rs
@@ -172,7 +172,6 @@ impl PointerFileTranslator {
         };

         Cleaner::new(
-            dedup.small_file_threshold,
             matches!(dedup.global_dedup_policy, GlobalDedupPolicy::Always),
             self.config.cas_storage_config.prefix.clone(),
             dedup.repo_salt,
2 changes: 0 additions & 2 deletions data/src/lib.rs
@@ -14,10 +14,8 @@ mod pointer_file;
 mod remote_shard_interface;
 mod repo_salt;
 mod shard_interface;
-mod small_file_determination;
 mod test_utils;

 pub use cas_client::CacheConfig;
-pub use constants::SMALL_FILE_THRESHOLD;
 pub use data_processing::PointerFileTranslator;
 pub use pointer_file::PointerFile;
19 changes: 0 additions & 19 deletions data/src/small_file_determination.rs

This file was deleted.
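
The deleted module's contents are not shown in this view. For illustration only, here is a hypothetical reconstruction of its two helpers, inferred from their call sites in clean.rs above; the real implementations may have differed:

// Hypothetical: the buffer may still be the start of a text file if it is
// NUL-free and decodes as UTF-8 so far.
pub fn is_possible_start_to_text_file(data: &[u8]) -> bool {
    !data.contains(&0) && std::str::from_utf8(data).is_ok()
}

// Hypothetical: pass the file through to git untouched if it stayed under
// the size threshold and still looks like text.
pub fn is_file_passthrough(data: &[u8], threshold: usize) -> bool {
    data.len() < threshold && is_possible_start_to_text_file(data)
}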

4 changes: 3 additions & 1 deletion data/tests/integration_tests/test_basic_clean_smudge.sh
@@ -29,4 +29,6 @@ assert_files_equal large.dat large.dat.2
 create_text_file small.txt key1 100 1

 x clean -d small.pft small.txt
-assert_files_equal small.pft small.txt # not converted to pointer file
+assert_is_pointer_file small.pft
+x smudge -f small.pft small.txt.2
+assert_files_equal small.txt small.txt.2
