Skip to content

Commit

Permalink
Moved threadpool to separate module to make adding features easier.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytak committed Jan 24, 2025
1 parent f13a722 commit 36a6bba
Show file tree
Hide file tree
Showing 24 changed files with 84 additions and 37 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ members = [
"utils",
"cas_object",
"cas_types",
"chunk_cache",
"chunk_cache", "xet_threadpool",
]

exclude = ["hf_xet", "chunk_cache_bench"]
Expand Down
1 change: 1 addition & 0 deletions cas_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ utils = { path = "../utils" }
merkledb = { path = "../merkledb" }
mdb_shard = { path = "../mdb_shard" }
merklehash = { path = "../merklehash" }
xet_threadpool = { path = "../xet_threadpool" }
thiserror = "2.0"
tokio = { version = "1.41", features = ["full"] }
async-trait = "0.1.9"
Expand Down
2 changes: 1 addition & 1 deletion cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tracing::{debug, error, trace};
use utils::auth::AuthConfig;
use utils::progress::ProgressUpdater;
use utils::singleflight::Group;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::error::Result;
use crate::interface::*;
Expand Down
1 change: 1 addition & 0 deletions data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ utils = { path = "../utils" }
parutils = { path = "../parutils" }
file_utils = { path = "../file_utils" }
error_printer = { path = "../error_printer" }
xet_threadpool = { path = "../xet_threadpool" }
thiserror = "2.0"
tokio = { version = "1.36", features = ["full"] }
anyhow = "1"
Expand Down
2 changes: 1 addition & 1 deletion data/src/bin/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use cas_client::CacheConfig;
use clap::{Args, Parser, Subcommand};
use data::configurations::*;
use data::{PointerFile, PointerFileTranslator, SMALL_FILE_THRESHOLD};
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

#[derive(Parser)]
struct XCommand {
Expand Down
2 changes: 1 addition & 1 deletion data/src/cas_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use cas_client::{CacheConfig, RemoteClient};
use mdb_shard::ShardFileManager;
use tracing::info;
use utils::auth::AuthConfig;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::configurations::*;
use crate::errors::Result;
Expand Down
2 changes: 1 addition & 1 deletion data/src/chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use merklehash::compute_data_hash;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use super::clean::BufferItem;
use crate::errors::{DataProcessingError, Result};
Expand Down
2 changes: 1 addition & 1 deletion data/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::sync::Mutex;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{debug, info, warn};
use utils::progress::ProgressUpdater;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::chunking::{chunk_target_default, ChunkYieldType};
use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES;
Expand Down
2 changes: 1 addition & 1 deletion data/src/data_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use parutils::{tokio_par_for_each, ParallelError};
use tempfile::{tempdir_in, TempDir};
use utils::auth::{AuthConfig, TokenRefresher};
use utils::progress::ProgressUpdater;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::configurations::*;
use crate::errors::DataProcessingError;
Expand Down
2 changes: 1 addition & 1 deletion data/src/data_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use mdb_shard::ShardFileManager;
use merklehash::MerkleHash;
use tokio::sync::{Mutex, Semaphore};
use utils::progress::ProgressUpdater;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::cas_interface::create_cas_client;
use crate::clean::Cleaner;
Expand Down
2 changes: 1 addition & 1 deletion data/src/parallel_xorb_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use merklehash::MerkleHash;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use utils::progress::ProgressUpdater;
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::data_processing::CASDataAggregator;
use crate::errors::DataProcessingError::*;
Expand Down
2 changes: 1 addition & 1 deletion data/src/remote_shard_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use merklehash::MerkleHash;
use parutils::tokio_par_for_each;
use tokio::task::JoinHandle;
use tracing::{debug, info};
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use super::configurations::{FileQueryPolicy, StorageConfig};
use super::errors::{DataProcessingError, Result};
Expand Down
14 changes: 14 additions & 0 deletions hf_xet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hf_xet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pyo3 = { version = "0.23.3", features = [
error_printer = { path = "../error_printer" }
data = { path = "../data" }
utils = { path = "../utils" }
xet_threadpool = { path = "../xet_threadpool" }
tokio = { version = "1.36", features = ["full"] }
parutils = { path = "../parutils" }
tracing = "0.1.*"
Expand Down
2 changes: 1 addition & 1 deletion hf_xet/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing_subscriber::filter::FilterFn;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
use utils::ThreadPool;
use xet_threadpool::ThreadPool;

use crate::log_buffer::{get_telemetry_task, LogBufferLayer, TelemetryTaskInfo, TELEMETRY_PRE_ALLOC_BYTES};

Expand Down
3 changes: 2 additions & 1 deletion hf_xet/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use lazy_static::lazy_static;
use pyo3::exceptions::{PyKeyboardInterrupt, PyRuntimeError};
use pyo3::prelude::*;
use utils::threadpool::{MultithreadedRuntimeError, ThreadPool};
use xet_threadpool::errors::MultithreadedRuntimeError;
use xet_threadpool::ThreadPool;

use crate::log;

Expand Down
1 change: 1 addition & 0 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ path = "src/lib.rs"

[dependencies]
merklehash = { path = "../merklehash" }
xet_threadpool = { path = "../xet_threadpool" }
thiserror = "2.0"
futures = "0.3.28"

Expand Down
2 changes: 0 additions & 2 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ pub mod auth;
pub mod errors;
pub mod serialization_utils;
pub mod singleflight;
pub mod threadpool;
pub use threadpool::ThreadPool;

mod async_read;
mod output_bytes;
Expand Down
7 changes: 4 additions & 3 deletions utils/src/singleflight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//!
//! use futures::future::join_all;
//! use utils::singleflight::Group;
//! use xet_threadpool;
//!
//! const RES: usize = 7;
//!
Expand All @@ -20,7 +21,7 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let threadpool = Arc::new(utils::ThreadPool::new().unwrap());
//! let threadpool = Arc::new(xet_threadpool::ThreadPool::new().unwrap());
//! let g = Arc::new(Group::<_, ()>::new(threadpool.clone()));
//! let mut handlers = Vec::new();
//! for _ in 0..10 {
Expand Down Expand Up @@ -50,9 +51,9 @@ use parking_lot::RwLock;
use pin_project::{pin_project, pinned_drop};
use tokio::sync::{Mutex, Notify};
use tracing::debug;
use xet_threadpool::ThreadPool;

pub use crate::errors::SingleflightError;
use crate::ThreadPool;

type SingleflightResult<T, E> = Result<T, SingleflightError<E>>;
type CallMap<T, E> = HashMap<String, Arc<Call<T, E>>>;
Expand Down Expand Up @@ -366,11 +367,11 @@ mod tests {
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use xet_threadpool::ThreadPool;

use super::Group;
use crate::errors::SingleflightError;
use crate::singleflight::{Call, OwnerTask};
use crate::ThreadPool;

/// A period of time for waiters to wait for a notification from the owner
/// task. This is expected to be sufficient time for the test futures to
Expand Down
10 changes: 10 additions & 0 deletions xet_threadpool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "xet_threadpool"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.41", features = ["full"] }
thiserror = "2.0"
tracing = "0.1.31"
lazy_static = "1"
18 changes: 18 additions & 0 deletions xet_threadpool/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use thiserror::Error;

/// Error type for failures when creating the multithreaded runtime or running tasks on it.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions must include a
/// wildcard arm; new variants may be added without a breaking change.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MultithreadedRuntimeError {
/// The runtime could not be initialized; carries the underlying I/O error.
#[error("Error Initializing Multithreaded Runtime: {0:?}")]
RuntimeInitializationError(std::io::Error),

/// A task panicked; carries the Tokio `JoinError` reported for that task.
#[error("Task Panic: {0:?}.")]
TaskPanic(tokio::task::JoinError),

/// A task was cancelled, possibly because the runtime is shutting down.
/// The string holds a description of the cancellation.
#[error("Task cancelled; possible runtime shutdown in progress ({0}).")]
TaskCanceled(String),

/// Any other task runtime failure, described by the contained message.
#[error("Unknown task runtime error: {0}")]
Other(String),
}
4 changes: 4 additions & 0 deletions xet_threadpool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Crate root for `xet_threadpool`.
//!
//! Exposes the `errors` and `threadpool` modules and re-exports
//! [`ThreadPool`] so callers can write `xet_threadpool::ThreadPool`.
pub mod errors;
pub mod threadpool;

pub use threadpool::ThreadPool;
24 changes: 4 additions & 20 deletions utils/src/threadpool.rs → xet_threadpool/src/threadpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::future::Future;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use thiserror::Error;
/// This module provides a simple wrapper around Tokio's runtime to create a thread pool
/// with some default settings. It is intended to be used as a singleton thread pool for
/// the entire application.
Expand All @@ -14,7 +13,7 @@ use thiserror::Error;
/// # Example
///
/// ```rust
/// use utils::ThreadPool;
/// use xet_threadpool::ThreadPool;
///
/// let pool = ThreadPool::new().expect("Error initializing runtime.");
///
Expand Down Expand Up @@ -51,31 +50,16 @@ use thiserror::Error;
///
/// - `new_threadpool`: Creates a new Tokio runtime with the specified settings.
use tokio;
use tokio::task::{JoinError, JoinHandle};
use tokio::task::JoinHandle;
use tracing::{debug, error};

use crate::errors::MultithreadedRuntimeError;

// Default settings for the thread pool's runtime.
// NOTE(review): the code that applies these values is outside this excerpt — confirm
// each constant is passed to the runtime builder as its name suggests.
const THREADPOOL_NUM_WORKER_THREADS: usize = 4; // 4 active threads
const THREADPOOL_THREAD_ID_PREFIX: &str = "hf-xet"; // thread names will be hf-xet-0, hf-xet-1, etc.
const THREADPOOL_STACK_SIZE: usize = 8_000_000; // 8MB stack size
const THREADPOOL_MAX_BLOCKING_THREADS: usize = 100; // max 100 threads can block IO

/// Define an error type for spawning external threads.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MultithreadedRuntimeError {
#[error("Error Initializing Multithreaded Runtime: {0:?}")]
RuntimeInitializationError(std::io::Error),

#[error("Task Panic: {0:?}.")]
TaskPanic(JoinError),

#[error("Task cancelled; possible runtime shutdown in progress ({0}).")]
TaskCanceled(String),

#[error("Unknown task runtime error: {0}")]
Other(String),
}

#[derive(Debug)]
pub struct ThreadPool {
// This has to allow for exclusive access to enable shutdown when
Expand Down

0 comments on commit 36a6bba

Please sign in to comment.