diff --git a/Cargo.lock b/Cargo.lock index 57ca85b1335..6569f3008d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "chroma-sysdb" +version = "0.1.0" +dependencies = [ + "async-trait", + "chroma-config", + "chroma-error", + "chroma-types", + "futures", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "parking_lot", + "serde", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tonic", + "tracing", + "tracing-bunyan-formatter", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "chroma-system" version = "0.1.0" @@ -2378,6 +2402,11 @@ name = "garbage_collector" version = "0.1.0" dependencies = [ "async-trait", + "chroma-config", + "chroma-error", + "chroma-sysdb", + "chroma-system", + "chroma-types", "chrono", "clap", "figment", @@ -6832,6 +6861,7 @@ dependencies = [ "chroma-error", "chroma-index", "chroma-storage", + "chroma-sysdb", "chroma-system", "chroma-types", "criterion", diff --git a/Cargo.toml b/Cargo.toml index 38b45a7899e..9c7c3d79733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/types", "rust/worker"] +members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/sysdb", "rust/types", "rust/worker"] [workspace.dependencies] arrow = "52.2.0" @@ -44,6 +44,7 @@ chroma-types = { path = "rust/types" } chroma-index = { path = "rust/index" } chroma-distance = { path = "rust/distance" } chroma-system = { path = "rust/system" } +chroma-sysdb = { path = "rust/sysdb" } worker = { path = "rust/worker" } # Dev dependencies diff --git a/rust/garbage_collector/Cargo.toml b/rust/garbage_collector/Cargo.toml index 77fcd3ab2c9..98a1e3cc14e 100644 --- a/rust/garbage_collector/Cargo.toml +++ b/rust/garbage_collector/Cargo.toml @@ -33,3 +33,9 @@ opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry-http = { workspace = true } opentelemetry_sdk = { workspace = true } + +chroma-config = { workspace = true } +chroma-error = { workspace = true } +chroma-system = { workspace = true } +chroma-types = { workspace = true } +chroma-sysdb = { workspace = true } diff --git a/rust/garbage_collector/garbage_collector_config.yaml b/rust/garbage_collector/garbage_collector_config.yaml index e32a6a4f732..afd6b00cf48 100644 --- a/rust/garbage_collector/garbage_collector_config.yaml +++ b/rust/garbage_collector/garbage_collector_config.yaml @@ -3,9 +3,13 @@ otel_endpoint: "http://otel-collector:4317" cutoff_time_hours: 12 # GC all versions created at time < now() - cutoff_time_hours max_collections_to_gc: 1000 # Maximum number of collections to GC in one run gc_interval_mins: 120 # Run GC every x mins -disallow_collection_names: [] -sysdb_connection: +disallow_collections: [] # collection ids to disable GC on +sysdb_config: host: "sysdb.chroma" port: 50051 connect_timeout_ms: 60000 request_timeout_ms: 60000 +dispatcher_config: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 diff --git a/rust/garbage_collector/src/config.rs b/rust/garbage_collector/src/config.rs index 1e4b3c822b8..00788b0c670 100644 --- a/rust/garbage_collector/src/config.rs +++ b/rust/garbage_collector/src/config.rs @@ -1,3 +1,4 @@ +use chroma_system::DispatcherConfig; use figment::providers::{Env, Format, Yaml}; const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml"; @@ -8,21 +9,12 @@ const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml"; pub(super) struct GarbageCollectorConfig { pub(super) service_name: String, pub(super) otel_endpoint: String, - cutoff_time_hours: u32, - max_collections_to_gc: u32, - gc_interval_mins: u32, - disallow_collection_names: Vec, - sysdb_connection: SysdbConnectionConfig, -} - -#[allow(dead_code)] -#[derive(Debug, serde::Deserialize)] -// TODO(Sanket): Remove this dead code annotation. -pub(super) struct SysdbConnectionConfig { - host: String, - port: u32, - connect_timeout_ms: u32, - request_timeout_ms: u32, + pub(super) cutoff_time_hours: u32, + pub(super) max_collections_to_gc: u32, + pub(super) gc_interval_mins: u32, + pub(super) disallow_collections: Vec, + pub(super) sysdb_config: chroma_sysdb::GrpcSysDbConfig, + pub(super) dispatcher_config: DispatcherConfig, } impl GarbageCollectorConfig { @@ -60,10 +52,13 @@ mod tests { assert_eq!(config.max_collections_to_gc, 1000); assert_eq!(config.gc_interval_mins, 120); let empty_vec: Vec = vec![]; - assert_eq!(config.disallow_collection_names, empty_vec); - assert_eq!(config.sysdb_connection.host, "sysdb.chroma"); - assert_eq!(config.sysdb_connection.port, 50051); - assert_eq!(config.sysdb_connection.connect_timeout_ms, 60000); - assert_eq!(config.sysdb_connection.request_timeout_ms, 60000); + assert_eq!(config.disallow_collections, empty_vec); + assert_eq!(config.sysdb_config.host, "sysdb.chroma"); + assert_eq!(config.sysdb_config.port, 50051); + assert_eq!(config.sysdb_config.connect_timeout_ms, 60000); + assert_eq!(config.sysdb_config.request_timeout_ms, 60000); + assert_eq!(config.dispatcher_config.num_worker_threads, 4); + assert_eq!(config.dispatcher_config.dispatcher_queue_size, 100); + assert_eq!(config.dispatcher_config.worker_queue_size, 100); } } diff --git a/rust/garbage_collector/src/garbage_collector_component.rs b/rust/garbage_collector/src/garbage_collector_component.rs new file mode 100644 index 00000000000..a525615162b --- /dev/null +++ b/rust/garbage_collector/src/garbage_collector_component.rs @@ -0,0 +1,120 @@ +use std::{collections::HashSet, str::FromStr, time::Duration}; + +use async_trait::async_trait; +use chroma_config::Configurable; +use chroma_error::ChromaError; +use chroma_sysdb::{SysDb, SysDbConfig}; +use chroma_system::{Component, ComponentContext, ComponentHandle, Dispatcher, Handler}; +use chroma_types::CollectionUuid; +use tracing::span; +use uuid::Uuid; + +use crate::config::GarbageCollectorConfig; + +#[derive(Debug)] +#[allow(dead_code)] +pub(crate) struct GarbageCollector { + gc_interval_mins: u64, + cutoff_time_hours: u32, + max_collections_to_gc: u32, + disabled_collections: HashSet, + sysdb_client: Box, + dispatcher: Option>, + system: Option, +} + +impl GarbageCollector { + pub fn new( + gc_interval_mins: u64, + cutoff_time_hours: u32, + max_collections_to_gc: u32, + disabled_collections: HashSet, + sysdb_client: Box, + ) -> Self { + Self { + gc_interval_mins, + cutoff_time_hours, + max_collections_to_gc, + disabled_collections, + sysdb_client, + dispatcher: None, + system: None, + } + } + + pub(crate) fn set_dispatcher(&mut self, dispatcher: ComponentHandle) { + self.dispatcher = Some(dispatcher); + } + + pub(crate) fn set_system(&mut self, system: chroma_system::System) { + self.system = Some(system); + } +} + +#[async_trait] +impl Component for GarbageCollector { + fn get_name() -> &'static str { + "GarbageCollector" + } + + fn queue_size(&self) -> usize { + 1000 + } + + async fn start(&mut self, ctx: &ComponentContext) { + ctx.scheduler.schedule( + GarbageCollectMessage {}, + Duration::from_secs(self.gc_interval_mins * 60), + ctx, + || Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")), + ); + } +} + +#[derive(Debug)] +struct GarbageCollectMessage {} + +#[async_trait] +impl Handler for GarbageCollector { + type Result = (); + + async fn handle( + &mut self, + _message: GarbageCollectMessage, + _ctx: &ComponentContext, + ) -> Self::Result { + // TODO(Sanket): Implement the garbage collection logic. + todo!() + } +} + +#[async_trait] +impl Configurable for GarbageCollector { + async fn try_from_config( + config: &GarbageCollectorConfig, + ) -> Result> { + let sysdb_config = SysDbConfig::Grpc(config.sysdb_config.clone()); + let sysdb_client = chroma_sysdb::from_config(&sysdb_config).await?; + + let mut disabled_collections = HashSet::new(); + for collection_id_str in config.disallow_collections.iter() { + let collection_uuid = match Uuid::from_str(collection_id_str) { + Ok(uuid) => uuid, + Err(e) => { + // TODO(Sanket): Return a proper error here. + panic!("Invalid collection id: {}", e); + } + }; + let collection_id = CollectionUuid(collection_uuid); + disabled_collections.insert(collection_id); + } + + Ok(GarbageCollector::new( + config.gc_interval_mins as u64, + config.cutoff_time_hours, + config.max_collections_to_gc, + disabled_collections, + sysdb_client, + )) + } +} diff --git a/rust/garbage_collector/src/lib.rs b/rust/garbage_collector/src/lib.rs index e321381c3db..1e0298e5b3c 100644 --- a/rust/garbage_collector/src/lib.rs +++ b/rust/garbage_collector/src/lib.rs @@ -1,16 +1,41 @@ +use chroma_config::Configurable; +use chroma_system::{Dispatcher, System}; use config::GarbageCollectorConfig; +use garbage_collector_component::GarbageCollector; use opentelemetry_config::init_otel_tracing; mod config; +mod garbage_collector_component; mod opentelemetry_config; +const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH"; + pub async fn garbage_collector_service_entrypoint() { // Parse configuration. Configuration includes sysdb connection details, and - // otel details. - let config = GarbageCollectorConfig::load(); + // gc run details amongst others. + let config = match std::env::var(CONFIG_PATH_ENV_VAR) { + Ok(config_path) => GarbageCollectorConfig::load_from_path(&config_path), + Err(_) => GarbageCollectorConfig::load(), + }; // Enable OTEL tracing. init_otel_tracing(&config.service_name, &config.otel_endpoint); + // Setup the dispatcher and the pool of workers. + let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config) + .await + .expect("Failed to create dispatcher from config"); + + let system = System::new(); + let dispatcher_handle = system.start_component(dispatcher); + // Start a background task to periodically check for garbage. - todo!() + // Garbage collector is a component that gets notified every + // gc_interval_mins to check for garbage. + let mut garbage_collector_component = GarbageCollector::try_from_config(&config) + .await + .expect("Failed to create garbage collector component"); + garbage_collector_component.set_dispatcher(dispatcher_handle); + garbage_collector_component.set_system(system.clone()); + + let _ = system.start_component(garbage_collector_component); } diff --git a/rust/sysdb/Cargo.toml b/rust/sysdb/Cargo.toml new file mode 100644 index 00000000000..15ce7001091 --- /dev/null +++ b/rust/sysdb/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "chroma-sysdb" +version = "0.1.0" +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +serde = { workspace = true } +futures = { workspace = true } +thiserror = { workspace = true } +async-trait = { workspace = true } +tracing-bunyan-formatter = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry_sdk = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +tonic = { workspace = true } +parking_lot = { workspace = true } + +chroma-config = { workspace = true } +chroma-error = { workspace = true } +chroma-types = { workspace = true } diff --git a/rust/sysdb/src/config.rs b/rust/sysdb/src/config.rs new file mode 100644 index 00000000000..e74d75e2991 --- /dev/null +++ b/rust/sysdb/src/config.rs @@ -0,0 +1,26 @@ +use chroma_config::Configurable; +use chroma_error::ChromaError; +use serde::Deserialize; + +use crate::{GrpcSysDb, SysDb}; + +#[derive(Deserialize, Debug, Clone)] +pub struct GrpcSysDbConfig { + pub host: String, + pub port: u16, + pub connect_timeout_ms: u64, + pub request_timeout_ms: u64, +} + +#[derive(Deserialize, Debug)] +pub enum SysDbConfig { + Grpc(GrpcSysDbConfig), +} + +pub async fn from_config(config: &SysDbConfig) -> Result, Box> { + match &config { + SysDbConfig::Grpc(_) => Ok(Box::new(SysDb::Grpc( + GrpcSysDb::try_from_config(config).await?, + ))), + } +} diff --git a/rust/sysdb/src/lib.rs b/rust/sysdb/src/lib.rs new file mode 100644 index 00000000000..dc1f4bc8657 --- /dev/null +++ b/rust/sysdb/src/lib.rs @@ -0,0 +1,8 @@ +pub mod config; +#[allow(clippy::module_inception)] +pub mod sysdb; +pub mod test_sysdb; +mod util; +pub use config::*; +pub use sysdb::*; +pub use test_sysdb::*; diff --git a/rust/worker/src/sysdb/sysdb.rs b/rust/sysdb/src/sysdb.rs similarity index 97% rename from rust/worker/src/sysdb/sysdb.rs rename to rust/sysdb/src/sysdb.rs index f28598c2865..e72574cbd80 100644 --- a/rust/worker/src/sysdb/sysdb.rs +++ b/rust/sysdb/src/sysdb.rs @@ -1,6 +1,6 @@ use super::config::SysDbConfig; use super::test_sysdb::TestSysDb; -use crate::tracing::util::client_interceptor; +use crate::util::client_interceptor; use async_trait::async_trait; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; @@ -20,14 +20,14 @@ use tonic::Request; use tonic::Status; #[derive(Debug, Clone)] -pub(crate) enum SysDb { +pub enum SysDb { Grpc(GrpcSysDb), #[allow(dead_code)] Test(TestSysDb), } impl SysDb { - pub(crate) async fn get_collections( + pub async fn get_collections( &mut self, collection_id: Option, name: Option, @@ -46,7 +46,7 @@ impl SysDb { } } - pub(crate) async fn get_segments( + pub async fn get_segments( &mut self, id: Option, r#type: Option, @@ -59,7 +59,7 @@ impl SysDb { } } - pub(crate) async fn get_last_compaction_time( + pub async fn get_last_compaction_time( &mut self, tanant_ids: Vec, ) -> Result, GetLastCompactionTimeError> { @@ -69,7 +69,7 @@ impl SysDb { } } - pub(crate) async fn flush_compaction( + pub async fn flush_compaction( &mut self, tenant_id: String, collection_id: CollectionUuid, @@ -105,7 +105,7 @@ impl SysDb { #[derive(Clone, Debug)] // Since this uses tonic transport channel, cloning is cheap. Each client only supports // one inflight request at a time, so we need to clone the client for each requester. -pub(crate) struct GrpcSysDb { +pub struct GrpcSysDb { #[allow(clippy::type_complexity)] client: SysDbClient< interceptor::InterceptedService< @@ -356,7 +356,7 @@ impl ChromaError for GetSegmentsError { } #[derive(Error, Debug)] -pub(crate) enum GetLastCompactionTimeError { +pub enum GetLastCompactionTimeError { #[error("Failed to fetch")] FailedToGetLastCompactionTime(#[from] tonic::Status), @@ -374,7 +374,7 @@ impl ChromaError for GetLastCompactionTimeError { } #[derive(Error, Debug)] -pub(crate) enum FlushCompactionError { +pub enum FlushCompactionError { #[error("Failed to flush compaction")] FailedToFlushCompaction(#[from] tonic::Status), #[error("Failed to convert segment flush info")] diff --git a/rust/worker/src/sysdb/test_sysdb.rs b/rust/sysdb/src/test_sysdb.rs similarity index 94% rename from rust/worker/src/sysdb/test_sysdb.rs rename to rust/sysdb/src/test_sysdb.rs index 81551ec37a3..0af96b4a4ec 100644 --- a/rust/worker/src/sysdb/test_sysdb.rs +++ b/rust/sysdb/src/test_sysdb.rs @@ -13,7 +13,7 @@ use super::sysdb::GetLastCompactionTimeError; use super::sysdb::GetSegmentsError; #[derive(Clone, Debug)] -pub(crate) struct TestSysDb { +pub struct TestSysDb { inner: Arc>, } @@ -25,8 +25,8 @@ struct Inner { } impl TestSysDb { - #[cfg(test)] - pub(crate) fn new() -> Self { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { TestSysDb { inner: Arc::new(Mutex::new(Inner { collections: HashMap::new(), @@ -36,26 +36,19 @@ impl TestSysDb { } } - #[cfg(test)] - pub(crate) fn add_collection(&mut self, collection: Collection) { + pub fn add_collection(&mut self, collection: Collection) { let mut inner = self.inner.lock(); inner .collections .insert(collection.collection_id, collection); } - #[cfg(test)] - pub(crate) fn add_segment(&mut self, segment: Segment) { + pub fn add_segment(&mut self, segment: Segment) { let mut inner = self.inner.lock(); inner.segments.insert(segment.id, segment); } - #[cfg(test)] - pub(crate) fn add_tenant_last_compaction_time( - &mut self, - tenant: String, - last_compaction_time: i64, - ) { + pub fn add_tenant_last_compaction_time(&mut self, tenant: String, last_compaction_time: i64) { let mut inner = self.inner.lock(); inner .tenant_last_compaction_time diff --git a/rust/sysdb/src/util.rs b/rust/sysdb/src/util.rs new file mode 100644 index 00000000000..6a46fb15df2 --- /dev/null +++ b/rust/sysdb/src/util.rs @@ -0,0 +1,44 @@ +use std::str::FromStr; + +use opentelemetry::trace::TraceContextExt; +use tonic::{metadata::MetadataValue, Request, Status}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +const TRACE_ID_HEADER_KEY: &str = "chroma-traceid"; +const SPAN_ID_HEADER_KEY: &str = "chroma-spanid"; + +pub(crate) fn client_interceptor(request: Request<()>) -> Result, Status> { + // If span is disabled then nothing to append in the header. + if Span::current().is_disabled() { + return Ok(request); + } + let mut mut_request = request; + let metadata = mut_request.metadata_mut(); + let trace_id = MetadataValue::from_str( + Span::current() + .context() + .span() + .span_context() + .trace_id() + .to_string() + .as_str(), + ); + let span_id = MetadataValue::from_str( + Span::current() + .context() + .span() + .span_context() + .span_id() + .to_string() + .as_str(), + ); + // Errors are not fatal. + if let Ok(id) = trace_id { + metadata.append(TRACE_ID_HEADER_KEY, id); + } + if let Ok(id) = span_id { + metadata.append(SPAN_ID_HEADER_KEY, id); + } + Ok(mut_request) +} diff --git a/rust/system/src/execution/config.rs b/rust/system/src/execution/config.rs index ada5bf1b6ad..be996c01fb1 100644 --- a/rust/system/src/execution/config.rs +++ b/rust/system/src/execution/config.rs @@ -1,8 +1,8 @@ use serde::Deserialize; -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] pub struct DispatcherConfig { - pub(crate) num_worker_threads: usize, - pub(crate) dispatcher_queue_size: usize, - pub(crate) worker_queue_size: usize, + pub num_worker_threads: usize, + pub dispatcher_queue_size: usize, + pub worker_queue_size: usize, } diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 7419b59915a..3afa0fa53ae 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -57,6 +57,7 @@ chroma-cache = { workspace = true } chroma-index = { workspace = true } chroma-distance = { workspace = true } chroma-system = { workspace = true } +chroma-sysdb = { workspace = true } fastrace = "0.7" fastrace-opentelemetry = "0.8" diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 50537adccf6..840de1bbeb1 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -7,14 +7,13 @@ use crate::execution::orchestration::CompactOrchestrator; use crate::execution::orchestration::CompactionResponse; use crate::log::log::Log; use crate::memberlist::Memberlist; -use crate::sysdb; -use crate::sysdb::sysdb::SysDb; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::hnsw_provider::HnswIndexProvider; use chroma_storage::Storage; +use chroma_sysdb::SysDb; use chroma_system::Dispatcher; use chroma_system::Orchestrator; use chroma_system::{Component, ComponentContext, ComponentHandle, Handler, System}; @@ -201,7 +200,7 @@ impl Configurable for CompactionManager { } }; let sysdb_config = &config.sysdb; - let sysdb = match sysdb::from_config(sysdb_config).await { + let sysdb = match chroma_sysdb::from_config(sysdb_config).await { Ok(sysdb) => sysdb, Err(err) => { return Err(err); @@ -340,10 +339,10 @@ mod tests { use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy; use crate::log::log::InMemoryLog; use crate::log::log::InternalLogRecord; - use crate::sysdb::test_sysdb::TestSysDb; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test}; use chroma_storage::local::LocalStorage; + use chroma_sysdb::TestSysDb; use chroma_system::Dispatcher; use chroma_types::SegmentUuid; use chroma_types::{Collection, LogRecord, Operation, OperationRecord, Segment}; diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 988508694c8..db99f677ac1 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::str::FromStr; +use chroma_sysdb::SysDb; use chroma_types::CollectionUuid; use figment::providers::Env; use figment::Figment; @@ -14,7 +15,6 @@ use crate::log::log::CollectionInfo; use crate::log::log::CollectionRecord; use crate::log::log::Log; use crate::memberlist::Memberlist; -use crate::sysdb::sysdb::SysDb; pub(crate) struct Scheduler { my_ip: String, @@ -248,7 +248,7 @@ mod tests { use crate::compactor::scheduler_policy::LasCompactionTimeSchedulerPolicy; use crate::log::log::InMemoryLog; use crate::log::log::InternalLogRecord; - use crate::sysdb::test_sysdb::TestSysDb; + use chroma_sysdb::TestSysDb; use chroma_types::{Collection, CollectionUuid, LogRecord, Operation, OperationRecord}; #[tokio::test] diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 16fbc4b4af3..b8a9f756c4e 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -1,3 +1,4 @@ +use chroma_sysdb::SysDbConfig; use figment::providers::{Env, Format, Yaml}; use serde::Deserialize; @@ -105,7 +106,7 @@ pub struct QueryServiceConfig { pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, #[allow(dead_code)] pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, - pub(crate) sysdb: crate::sysdb::config::SysDbConfig, + pub(crate) sysdb: SysDbConfig, pub(crate) storage: chroma_storage::config::StorageConfig, pub(crate) log: crate::log::config::LogConfig, pub dispatcher: chroma_system::DispatcherConfig, @@ -132,7 +133,7 @@ pub struct CompactionServiceConfig { pub(crate) my_port: u16, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, - pub(crate) sysdb: crate::sysdb::config::SysDbConfig, + pub(crate) sysdb: SysDbConfig, pub(crate) storage: chroma_storage::config::StorageConfig, pub(crate) log: crate::log::config::LogConfig, pub(crate) dispatcher: chroma_system::DispatcherConfig, diff --git a/rust/worker/src/execution/operators/register.rs b/rust/worker/src/execution/operators/register.rs index e11bf5d7bb3..330c2975b7b 100644 --- a/rust/worker/src/execution/operators/register.rs +++ b/rust/worker/src/execution/operators/register.rs @@ -1,8 +1,8 @@ +use chroma_sysdb::FlushCompactionError; +use chroma_sysdb::SysDb; use chroma_system::Operator; use crate::log::log::Log; use crate::log::log::UpdateCollectionLogOffsetError; -use crate::sysdb::sysdb::FlushCompactionError; -use crate::sysdb::sysdb::SysDb; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; use chroma_types::{CollectionUuid, FlushCompactionResponse, SegmentFlushInfo}; @@ -140,7 +140,7 @@ impl Operator for RegisterOperator { mod tests { use super::*; use crate::log::log::InMemoryLog; - use crate::sysdb::test_sysdb::TestSysDb; + use chroma_sysdb::TestSysDb; use chroma_types::{Collection, Segment, SegmentScope, SegmentType, SegmentUuid}; use std::collections::HashMap; use std::str::FromStr; diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index 97c9f67a50c..b2659463e6b 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -34,14 +34,14 @@ use crate::segment::record_segment::RecordSegmentWriter; use crate::segment::ChromaSegmentFlusher; use crate::segment::ChromaSegmentWriter; use crate::segment::MaterializeLogsResult; -use crate::sysdb::sysdb::GetCollectionsError; -use crate::sysdb::sysdb::GetSegmentsError; -use crate::sysdb::sysdb::SysDb; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::ChromaError; use chroma_error::ErrorCodes; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_sysdb::GetCollectionsError; +use chroma_sysdb::GetSegmentsError; +use chroma_sysdb::SysDb; use chroma_system::wrap; use chroma_system::ChannelError; use chroma_system::ComponentContext; diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 90892b0cc7c..effe97a812c 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -2,7 +2,6 @@ mod assignment; mod compactor; mod memberlist; mod server; -mod sysdb; mod tracing; mod utils; diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index 943476350db..828b575cf73 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -5,6 +5,7 @@ use chroma_blockstore::provider::BlockfileProvider; use chroma_config::Configurable; use chroma_error::ChromaError; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_sysdb::SysDb; use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System}; use chroma_types::{ chroma_proto::{ @@ -28,7 +29,6 @@ use crate::{ }, }, log::log::Log, - sysdb::sysdb::SysDb, tracing::util::wrap_span_with_parent_context, utils::convert::{from_proto_knn, to_proto_knn_batch_result}, }; @@ -51,7 +51,7 @@ pub struct WorkerServer { impl Configurable for WorkerServer { async fn try_from_config(config: &QueryServiceConfig) -> Result> { let sysdb_config = &config.sysdb; - let sysdb = match crate::sysdb::from_config(sysdb_config).await { + let sysdb = match chroma_sysdb::from_config(sysdb_config).await { Ok(sysdb) => sysdb, Err(err) => { tracing::error!("Failed to create sysdb component: {:?}", err); @@ -399,11 +399,11 @@ mod tests { use super::*; use crate::log::log::InMemoryLog; use crate::segment::test::TestSegment; - use crate::sysdb::test_sysdb::TestSysDb; use chroma_index::test_hnsw_index_provider; #[cfg(debug_assertions)] use chroma_proto::debug_client::DebugClient; use chroma_proto::query_executor_client::QueryExecutorClient; + use chroma_sysdb::TestSysDb; use chroma_system::dispatcher; use chroma_system::system; use uuid::Uuid; diff --git a/rust/worker/src/sysdb/config.rs b/rust/worker/src/sysdb/config.rs deleted file mode 100644 index 68a3684252c..00000000000 --- a/rust/worker/src/sysdb/config.rs +++ /dev/null @@ -1,14 +0,0 @@ -use serde::Deserialize; - -#[derive(Deserialize)] -pub(crate) struct GrpcSysDbConfig { - pub(crate) host: String, - pub(crate) port: u16, - pub(crate) connect_timeout_ms: u64, - pub(crate) request_timeout_ms: u64, -} - -#[derive(Deserialize)] -pub(crate) enum SysDbConfig { - Grpc(GrpcSysDbConfig), -} diff --git a/rust/worker/src/sysdb/mod.rs b/rust/worker/src/sysdb/mod.rs deleted file mode 100644 index 49b6c77c723..00000000000 --- a/rust/worker/src/sysdb/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -pub(crate) mod config; -#[allow(clippy::module_inception)] -pub(crate) mod sysdb; -pub(crate) mod test_sysdb; - -use self::config::SysDbConfig; -use chroma_config::Configurable; -use chroma_error::ChromaError; - -pub(crate) async fn from_config( - config: &SysDbConfig, -) -> Result, Box> { - match &config { - crate::sysdb::config::SysDbConfig::Grpc(_) => Ok(Box::new(sysdb::SysDb::Grpc( - sysdb::GrpcSysDb::try_from_config(config).await?, - ))), - } -}