Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Implement compactor server interface #3375

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions idl/chromadb/proto/compactor.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe as future work it would be useful to add an endpoint to see the status for the currently compacting and last N compactions

Copy link
Collaborator

@HammadB HammadB Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I.e

/compaction/status

returns

id: <QUEUED | RUNNING | FAILED | SUCCEEDED> + Time


package chroma;

message CollectionIds {
repeated string ids = 1;
}

message CompactionRequest {
optional CollectionIds ids = 1;
}

message CompactionResponse {
// Empty
}

service Compactor {
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
}
1 change: 1 addition & 0 deletions rust/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Compile the protobuf files in the chromadb proto directory.
let mut proto_paths = vec![
"../../idl/chromadb/proto/chroma.proto",
"../../idl/chromadb/proto/compactor.proto",
"../../idl/chromadb/proto/coordinator.proto",
"../../idl/chromadb/proto/logservice.proto",
"../../idl/chromadb/proto/query_executor.proto",
Expand Down
55 changes: 37 additions & 18 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::scheduler::Scheduler;
use super::scheduler_policy::LasCompactionTimeSchedulerPolicy;
use super::OneOffCompactionMessage;
use crate::compactor::types::CompactionJob;
use crate::compactor::types::ScheduleMessage;
use crate::compactor::types::ScheduledCompactionMessage;
use crate::config::CompactionServiceConfig;
use crate::execution::dispatcher::Dispatcher;
use crate::execution::orchestration::orchestrator::Orchestrator;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl CompactionManager {
let dispatcher = match self.dispatcher {
Some(ref dispatcher) => dispatcher.clone(),
None => {
println!("No dispatcher found");
tracing::error!("No dispatcher found");
return Err(Box::new(CompactionError::FailedToCompact));
}
};
Expand Down Expand Up @@ -141,7 +142,7 @@ impl CompactionManager {
}
}
None => {
println!("No system found");
tracing::error!("No system found");
return Err(Box::new(CompactionError::FailedToCompact));
}
};
Expand All @@ -160,19 +161,18 @@ impl CompactionManager {
instrumented_span.follows_from(Span::current());
jobs.push(self.compact(job).instrument(instrumented_span));
}
println!("Compacting {} jobs", jobs.len());
tracing::info!("Compacting {} jobs", jobs.len());
let mut num_completed_jobs = 0;
let mut num_failed_jobs = 0;
while let Some(job) = jobs.next().await {
match job {
Ok(result) => {
println!("Compaction completed: {:?}", result);
tracing::info!("Compaction completed: {:?}", result);
compacted.push(result.compaction_job.collection_id);
num_completed_jobs += 1;
}
Err(e) => {
println!("Compaction failed: {:?}", e);
tracing::info!("Compaction failed {}", e);
num_failed_jobs += 1;
}
}
Expand Down Expand Up @@ -281,11 +281,13 @@ impl Component for CompactionManager {
}

async fn start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
println!("Starting CompactionManager");
ctx.scheduler
.schedule(ScheduleMessage {}, self.compaction_interval, ctx, || {
Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction"))
});
tracing::info!("Starting CompactionManager");
ctx.scheduler.schedule(
ScheduledCompactionMessage {},
self.compaction_interval,
ctx,
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")),
);
}
}

Expand All @@ -297,25 +299,42 @@ impl Debug for CompactionManager {

// ============== Handlers ==============
#[async_trait]
impl Handler<ScheduleMessage> for CompactionManager {
impl Handler<ScheduledCompactionMessage> for CompactionManager {
type Result = ();

async fn handle(
&mut self,
_message: ScheduleMessage,
_message: ScheduledCompactionMessage,
ctx: &ComponentContext<CompactionManager>,
) {
println!("CompactionManager: Performing compaction");
tracing::info!("CompactionManager: Performing compaction");
let mut ids = Vec::new();
self.compact_batch(&mut ids).await;

self.hnsw_index_provider.purge_by_id(&ids).await;

// Compaction is done, schedule the next compaction
ctx.scheduler
.schedule(ScheduleMessage {}, self.compaction_interval, ctx, || {
Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction"))
});
ctx.scheduler.schedule(
ScheduledCompactionMessage {},
self.compaction_interval,
ctx,
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")),
);
}
}

#[async_trait]
impl Handler<OneOffCompactionMessage> for CompactionManager {
type Result = ();
async fn handle(
&mut self,
_message: OneOffCompactionMessage,
_ctx: &ComponentContext<CompactionManager>,
) {
tracing::info!("CompactionManager: Performing compaction");
let mut ids = Vec::new();
self.compact_batch(&mut ids).await;
self.hnsw_index_provider.purge_by_id(&ids).await;
}
}

Expand Down
59 changes: 59 additions & 0 deletions rust/worker/src/compactor/compaction_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use async_trait::async_trait;
use chroma_types::chroma_proto::{
compactor_server::{Compactor, CompactorServer},
CompactionRequest, CompactionResponse,
};
use tokio::signal::unix::{signal, SignalKind};
use tonic::{transport::Server, Request, Response, Status};
use tracing::trace_span;

use crate::{compactor::OneOffCompactionMessage, system::ComponentHandle};

use super::CompactionManager;

pub struct CompactionServer {
pub manager: ComponentHandle<CompactionManager>,
pub port: u16,
}

impl CompactionServer {
pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("[::]:{}", self.port).parse().unwrap();
tracing::info!("Compaction server listing at {addr}");
let server = Server::builder().add_service(CompactorServer::new(self));
server
.serve_with_shutdown(addr, async {
match signal(SignalKind::terminate()) {
Ok(mut sigterm) => {
sigterm.recv().await;
tracing::info!("Received SIGTERM, shutting down")
}
Err(err) => {
tracing::error!("Failed to create SIGTERM handler: {err}")
}
}
})
.await?;
Ok(())
}
}

#[async_trait]
impl Compactor for CompactionServer {
async fn compact(
&self,
request: Request<CompactionRequest>,
) -> Result<Response<CompactionResponse>, Status> {
let compaction_span = trace_span!("CompactionRequest", request = ?request);
self.manager
.receiver()
.send(
OneOffCompactionMessage::try_from(request.into_inner())
.map_err(|e| Status::invalid_argument(e.to_string()))?,
Some(compaction_span),
)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(CompactionResponse {}))
}
}
2 changes: 2 additions & 0 deletions rust/worker/src/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ mod types;

pub(crate) use compaction_manager::*;
pub(crate) use types::*;

pub mod compaction_server;
8 changes: 7 additions & 1 deletion rust/worker/src/compactor/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ pub(crate) struct CompactionJob {
}

#[derive(Clone, Debug)]
pub(crate) struct ScheduleMessage {}
pub struct ScheduledCompactionMessage {}

#[derive(Clone, Debug)]
pub struct OneOffCompactionMessage {
#[allow(dead_code)]
pub collection_ids: Option<Vec<CollectionUuid>>,
}
11 changes: 11 additions & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod tracing;
mod utils;

use chroma_config::Configurable;
use compactor::compaction_server::CompactionServer;
use memberlist::MemberlistProvider;

use tokio::select;
Expand Down Expand Up @@ -135,6 +136,15 @@ pub async fn compaction_service_entrypoint() {

let mut memberlist_handle = system.start_component(memberlist);

let compaction_server = CompactionServer {
manager: compaction_manager_handle.clone(),
port: config.my_port,
};

let server_join_handle = tokio::spawn(async move {
let _ = CompactionServer::run(compaction_server).await;
});

let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sigterm) => sigterm,
Err(e) => {
Expand All @@ -155,6 +165,7 @@ pub async fn compaction_service_entrypoint() {
let _ = compaction_manager_handle.join().await;
system.stop().await;
system.join().await;
let _ = server_join_handle.await;
},
};
println!("Server stopped");
Expand Down
42 changes: 33 additions & 9 deletions rust/worker/src/utils/convert.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::str::FromStr;

use chroma_types::{
chroma_proto::{self, GetResult, KnnBatchResult, KnnResult},
ConversionError, ScalarEncoding, Where,
CollectionUuid, ConversionError, ScalarEncoding, Where,
};

use crate::execution::operators::{
filter::FilterOperator,
knn::KnnOperator,
knn_projection::{KnnProjectionOperator, KnnProjectionOutput, KnnProjectionRecord},
limit::LimitOperator,
projection::{ProjectionOperator, ProjectionOutput, ProjectionRecord},
use crate::{
compactor::OneOffCompactionMessage,
execution::operators::{
filter::FilterOperator,
knn::KnnOperator,
knn_projection::{KnnProjectionOperator, KnnProjectionOutput, KnnProjectionRecord},
limit::LimitOperator,
projection::{ProjectionOperator, ProjectionOutput, ProjectionRecord},
},
};

impl TryFrom<chroma_proto::FilterOperator> for FilterOperator {
Expand Down Expand Up @@ -110,7 +115,7 @@ impl TryFrom<KnnProjectionRecord> for chroma_proto::KnnProjectionRecord {
type Error = ConversionError;

fn try_from(value: KnnProjectionRecord) -> Result<Self, ConversionError> {
Ok(chroma_proto::KnnProjectionRecord {
Ok(Self {
record: Some(value.record.try_into()?),
distance: value.distance,
})
Expand All @@ -121,7 +126,7 @@ impl TryFrom<KnnProjectionOutput> for KnnResult {
type Error = ConversionError;

fn try_from(value: KnnProjectionOutput) -> Result<Self, ConversionError> {
Ok(KnnResult {
Ok(Self {
records: value
.records
.into_iter()
Expand Down Expand Up @@ -154,3 +159,22 @@ pub fn to_proto_knn_batch_result(
.collect::<Result<_, _>>()?,
})
}

impl TryFrom<chroma_proto::CompactionRequest> for OneOffCompactionMessage {
type Error = ConversionError;

fn try_from(value: chroma_proto::CompactionRequest) -> Result<Self, ConversionError> {
Ok(Self {
collection_ids: value
.ids
.map(|ids| {
ids.ids
.into_iter()
.map(|id| CollectionUuid::from_str(&id))
.collect::<Result<_, _>>()
})
.transpose()
.map_err(|_| ConversionError::DecodeError)?,
})
}
}
Loading