Skip to content

Commit

Permalink
[ENH] Move sysdb client into its own crate + background poller for GC (
Browse files Browse the repository at this point in the history
…#3492)

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- GC needs to reuse the sysdb client, so it is handy to have the client in
its own separate crate.
   - Sets up the GC component using our system crate
   - Starts the dispatcher and the worker threads

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Jan 15, 2025
1 parent babded1 commit d7b9a49
Show file tree
Hide file tree
Showing 24 changed files with 350 additions and 102 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/types", "rust/worker"]
members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/sysdb", "rust/types", "rust/worker"]

[workspace.dependencies]
arrow = "52.2.0"
Expand Down Expand Up @@ -44,6 +44,7 @@ chroma-types = { path = "rust/types" }
chroma-index = { path = "rust/index" }
chroma-distance = { path = "rust/distance" }
chroma-system = { path = "rust/system" }
chroma-sysdb = { path = "rust/sysdb" }
worker = { path = "rust/worker" }

# Dev dependencies
Expand Down
6 changes: 6 additions & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry-http = { workspace = true }
opentelemetry_sdk = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-system = { workspace = true }
chroma-types = { workspace = true }
chroma-sysdb = { workspace = true }
8 changes: 6 additions & 2 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ otel_endpoint: "http://otel-collector:4317"
cutoff_time_hours: 12 # GC all versions created at time < now() - cutoff_time_hours
max_collections_to_gc: 1000 # Maximum number of collections to GC in one run
gc_interval_mins: 120 # Run GC every x mins
disallow_collection_names: []
sysdb_connection:
disallow_collections: [] # collection ids to disable GC on
sysdb_config:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 60000
request_timeout_ms: 60000
dispatcher_config:
num_worker_threads: 4
dispatcher_queue_size: 100
worker_queue_size: 100
35 changes: 15 additions & 20 deletions rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chroma_system::DispatcherConfig;
use figment::providers::{Env, Format, Yaml};

const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml";
Expand All @@ -8,21 +9,12 @@ const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml";
pub(super) struct GarbageCollectorConfig {
pub(super) service_name: String,
pub(super) otel_endpoint: String,
cutoff_time_hours: u32,
max_collections_to_gc: u32,
gc_interval_mins: u32,
disallow_collection_names: Vec<String>,
sysdb_connection: SysdbConnectionConfig,
}

#[allow(dead_code)]
#[derive(Debug, serde::Deserialize)]
// TODO(Sanket): Remove this dead code annotation.
pub(super) struct SysdbConnectionConfig {
host: String,
port: u32,
connect_timeout_ms: u32,
request_timeout_ms: u32,
pub(super) cutoff_time_hours: u32,
pub(super) max_collections_to_gc: u32,
pub(super) gc_interval_mins: u32,
pub(super) disallow_collections: Vec<String>,
pub(super) sysdb_config: chroma_sysdb::GrpcSysDbConfig,
pub(super) dispatcher_config: DispatcherConfig,
}

impl GarbageCollectorConfig {
Expand Down Expand Up @@ -60,10 +52,13 @@ mod tests {
assert_eq!(config.max_collections_to_gc, 1000);
assert_eq!(config.gc_interval_mins, 120);
let empty_vec: Vec<String> = vec![];
assert_eq!(config.disallow_collection_names, empty_vec);
assert_eq!(config.sysdb_connection.host, "sysdb.chroma");
assert_eq!(config.sysdb_connection.port, 50051);
assert_eq!(config.sysdb_connection.connect_timeout_ms, 60000);
assert_eq!(config.sysdb_connection.request_timeout_ms, 60000);
assert_eq!(config.disallow_collections, empty_vec);
assert_eq!(config.sysdb_config.host, "sysdb.chroma");
assert_eq!(config.sysdb_config.port, 50051);
assert_eq!(config.sysdb_config.connect_timeout_ms, 60000);
assert_eq!(config.sysdb_config.request_timeout_ms, 60000);
assert_eq!(config.dispatcher_config.num_worker_threads, 4);
assert_eq!(config.dispatcher_config.dispatcher_queue_size, 100);
assert_eq!(config.dispatcher_config.worker_queue_size, 100);
}
}
120 changes: 120 additions & 0 deletions rust/garbage_collector/src/garbage_collector_component.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::{collections::HashSet, str::FromStr, time::Duration};

use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::ChromaError;
use chroma_sysdb::{SysDb, SysDbConfig};
use chroma_system::{Component, ComponentContext, ComponentHandle, Dispatcher, Handler};
use chroma_types::CollectionUuid;
use tracing::span;
use uuid::Uuid;

use crate::config::GarbageCollectorConfig;

/// Component that holds the settings and handles used for garbage collection.
///
/// Built from a `GarbageCollectorConfig` via `Configurable::try_from_config`;
/// the dispatcher and system handles are attached afterwards with
/// `set_dispatcher` / `set_system`.
#[derive(Debug)]
// In the visible code only `gc_interval_mins` is read (in `start`); the
// message handler is still a `todo!()`, hence the dead-code allowance.
#[allow(dead_code)]
pub(crate) struct GarbageCollector {
// Minutes; `start` multiplies this by 60 to get the scheduling delay in seconds.
gc_interval_mins: u64,
// Copied from `cutoff_time_hours` in the config.
cutoff_time_hours: u32,
// Copied from `max_collections_to_gc` in the config.
max_collections_to_gc: u32,
// Collection ids parsed from `disallow_collections` in the config.
disabled_collections: HashSet<CollectionUuid>,
// Client created by `chroma_sysdb::from_config`.
sysdb_client: Box<SysDb>,
// `None` until `set_dispatcher` is called.
dispatcher: Option<ComponentHandle<Dispatcher>>,
// `None` until `set_system` is called.
system: Option<chroma_system::System>,
}

impl GarbageCollector {
pub fn new(
gc_interval_mins: u64,
cutoff_time_hours: u32,
max_collections_to_gc: u32,
disabled_collections: HashSet<CollectionUuid>,
sysdb_client: Box<SysDb>,
) -> Self {
Self {
gc_interval_mins,
cutoff_time_hours,
max_collections_to_gc,
disabled_collections,
sysdb_client,
dispatcher: None,
system: None,
}
}

pub(crate) fn set_dispatcher(&mut self, dispatcher: ComponentHandle<Dispatcher>) {
self.dispatcher = Some(dispatcher);
}

pub(crate) fn set_system(&mut self, system: chroma_system::System) {
self.system = Some(system);
}
}

#[async_trait]
impl Component for GarbageCollector {
    /// Static name reported for this component.
    fn get_name() -> &'static str {
        "GarbageCollector"
    }

    /// Queue size reported for this component.
    fn queue_size(&self) -> usize {
        1000
    }

    /// Schedules a `GarbageCollectMessage` to be delivered to this component
    /// after `gc_interval_mins` minutes.
    async fn start(&mut self, ctx: &ComponentContext<Self>) {
        // Saturate instead of overflowing on an absurdly large configured interval.
        let delay = Duration::from_secs(self.gc_interval_mins.saturating_mul(60));
        ctx.scheduler.schedule(GarbageCollectMessage {}, delay, ctx, || {
            // The span was previously named "Scheduled compaction", which
            // mislabelled garbage collection runs in traces.
            Some(span!(
                parent: None,
                tracing::Level::INFO,
                "Scheduled garbage collection"
            ))
        });
    }
}

/// Payload-free message scheduled by `GarbageCollector::start` and handled by
/// the `Handler<GarbageCollectMessage>` impl on `GarbageCollector`.
#[derive(Debug)]
struct GarbageCollectMessage {}

/// Handler for the scheduled `GarbageCollectMessage`.
#[async_trait]
impl Handler<GarbageCollectMessage> for GarbageCollector {
type Result = ();

/// Not implemented yet: receiving a `GarbageCollectMessage` currently hits
/// `todo!()` and panics.
async fn handle(
&mut self,
_message: GarbageCollectMessage,
_ctx: &ComponentContext<Self>,
) -> Self::Result {
// TODO(Sanket): Implement the garbage collection logic.
todo!()
}
}

#[async_trait]
impl Configurable<GarbageCollectorConfig> for GarbageCollector {
async fn try_from_config(
config: &GarbageCollectorConfig,
) -> Result<Self, Box<dyn ChromaError>> {
let sysdb_config = SysDbConfig::Grpc(config.sysdb_config.clone());
let sysdb_client = chroma_sysdb::from_config(&sysdb_config).await?;

let mut disabled_collections = HashSet::new();
for collection_id_str in config.disallow_collections.iter() {
let collection_uuid = match Uuid::from_str(collection_id_str) {
Ok(uuid) => uuid,
Err(e) => {
// TODO(Sanket): Return a proper error here.
panic!("Invalid collection id: {}", e);
}
};
let collection_id = CollectionUuid(collection_uuid);
disabled_collections.insert(collection_id);
}

Ok(GarbageCollector::new(
config.gc_interval_mins as u64,
config.cutoff_time_hours,
config.max_collections_to_gc,
disabled_collections,
sysdb_client,
))
}
}
31 changes: 28 additions & 3 deletions rust/garbage_collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
use chroma_config::Configurable;
use chroma_system::{Dispatcher, System};
use config::GarbageCollectorConfig;
use garbage_collector_component::GarbageCollector;
use opentelemetry_config::init_otel_tracing;

mod config;
mod garbage_collector_component;
mod opentelemetry_config;

const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";

/// Entry point of the garbage collector service.
///
/// Loads the configuration (from the path in the `CONFIG_PATH` environment
/// variable when it is set, otherwise via `GarbageCollectorConfig::load()`),
/// initializes OTEL tracing, starts a `Dispatcher` component on a new
/// `System`, then builds the `GarbageCollector` component, hands it the
/// dispatcher handle and a clone of the system, and starts it.
///
/// # Panics
///
/// Panics if the dispatcher or the garbage collector cannot be created from
/// the configuration.
pub async fn garbage_collector_service_entrypoint() {
// Parse configuration. Configuration includes sysdb connection details, and
// otel details.
// NOTE(review): this unconditional load is immediately shadowed by the
// `match` below — it looks like the pre-change side of the diff; confirm.
let config = GarbageCollectorConfig::load();
// gc run details amongst others.
let config = match std::env::var(CONFIG_PATH_ENV_VAR) {
Ok(config_path) => GarbageCollectorConfig::load_from_path(&config_path),
Err(_) => GarbageCollectorConfig::load(),
};
// Enable OTEL tracing.
init_otel_tracing(&config.service_name, &config.otel_endpoint);

// Setup the dispatcher and the pool of workers.
let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config)
.await
.expect("Failed to create dispatcher from config");

let system = System::new();
let dispatcher_handle = system.start_component(dispatcher);

// Start a background task to periodically check for garbage.
// NOTE(review): this `todo!()` precedes the component setup below —
// presumably the removed side of the diff; confirm.
todo!()
// Garbage collector is a component that gets notified every
// gc_interval_mins to check for garbage.
let mut garbage_collector_component = GarbageCollector::try_from_config(&config)
.await
.expect("Failed to create garbage collector component");
garbage_collector_component.set_dispatcher(dispatcher_handle);
garbage_collector_component.set_system(system.clone());

// The handle returned by `start_component` is discarded and the function
// returns right after.
// NOTE(review): confirm the caller keeps the process alive.
let _ = system.start_component(garbage_collector_component);
}
28 changes: 28 additions & 0 deletions rust/sysdb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "chroma-sysdb"
version = "0.1.0"
edition = "2021"

[lib]
path = "src/lib.rs"

[dependencies]
serde = { workspace = true }
futures = { workspace = true }
thiserror = { workspace = true }
async-trait = { workspace = true }
tracing-bunyan-formatter = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry_sdk = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
parking_lot = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-types = { workspace = true }
26 changes: 26 additions & 0 deletions rust/sysdb/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use chroma_config::Configurable;
use chroma_error::ChromaError;
use serde::Deserialize;

use crate::{GrpcSysDb, SysDb};

/// Deserializable settings for the gRPC sysdb client.
#[derive(Deserialize, Debug, Clone)]
pub struct GrpcSysDbConfig {
/// Host of the sysdb service (e.g. "sysdb.chroma" in the GC config file).
pub host: String,
/// Port of the sysdb service.
pub port: u16,
/// Connect timeout — presumably in milliseconds, per the `_ms` suffix.
pub connect_timeout_ms: u64,
/// Request timeout — presumably in milliseconds, per the `_ms` suffix.
pub request_timeout_ms: u64,
}

/// Selects which sysdb client implementation `from_config` builds.
#[derive(Deserialize, Debug)]
pub enum SysDbConfig {
/// Build a gRPC-backed client from the given settings.
Grpc(GrpcSysDbConfig),
}

/// Creates a boxed [`SysDb`] client for the implementation selected by `config`.
///
/// # Errors
///
/// Propagates any error returned by `GrpcSysDb::try_from_config`.
pub async fn from_config(config: &SysDbConfig) -> Result<Box<SysDb>, Box<dyn ChromaError>> {
    match config {
        SysDbConfig::Grpc(_) => {
            let grpc_client = GrpcSysDb::try_from_config(config).await?;
            Ok(Box::new(SysDb::Grpc(grpc_client)))
        }
    }
}
8 changes: 8 additions & 0 deletions rust/sysdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub mod config;
#[allow(clippy::module_inception)]
pub mod sysdb;
pub mod test_sysdb;
mod util;
pub use config::*;
pub use sysdb::*;
pub use test_sysdb::*;
Loading

0 comments on commit d7b9a49

Please sign in to comment.