-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ENH] Implement compactor server interface #3375
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
syntax = "proto3"; | ||
|
||
package chroma; | ||
|
||
message CollectionIds { | ||
repeated string ids = 1; | ||
} | ||
|
||
message CompactionRequest { | ||
CollectionIds ids = 1; | ||
} | ||
|
||
message CompactionResponse { | ||
// Empty | ||
} | ||
|
||
service Compactor { | ||
rpc Compact(CompactionRequest) returns (CompactionResponse) {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
use super::scheduler::Scheduler; | ||
use super::scheduler_policy::LasCompactionTimeSchedulerPolicy; | ||
use super::OneOffCompactionMessage; | ||
use crate::compactor::types::CompactionJob; | ||
use crate::compactor::types::ScheduleMessage; | ||
use crate::compactor::types::ScheduledCompactionMessage; | ||
use crate::config::CompactionServiceConfig; | ||
use crate::execution::orchestration::CompactOrchestrator; | ||
use crate::execution::orchestration::CompactionResponse; | ||
|
@@ -107,7 +108,7 @@ impl CompactionManager { | |
let dispatcher = match self.dispatcher { | ||
Some(ref dispatcher) => dispatcher.clone(), | ||
None => { | ||
println!("No dispatcher found"); | ||
tracing::error!("No dispatcher found"); | ||
return Err(Box::new(CompactionError::FailedToCompact)); | ||
} | ||
}; | ||
|
@@ -139,7 +140,7 @@ impl CompactionManager { | |
} | ||
} | ||
None => { | ||
println!("No system found"); | ||
tracing::error!("No system found"); | ||
return Err(Box::new(CompactionError::FailedToCompact)); | ||
} | ||
}; | ||
|
@@ -158,19 +159,18 @@ impl CompactionManager { | |
instrumented_span.follows_from(Span::current()); | ||
jobs.push(self.compact(job).instrument(instrumented_span)); | ||
} | ||
println!("Compacting {} jobs", jobs.len()); | ||
tracing::info!("Compacting {} jobs", jobs.len()); | ||
let mut num_completed_jobs = 0; | ||
let mut num_failed_jobs = 0; | ||
while let Some(job) = jobs.next().await { | ||
match job { | ||
Ok(result) => { | ||
println!("Compaction completed: {:?}", result); | ||
tracing::info!("Compaction completed: {:?}", result); | ||
compacted.push(result.compaction_job.collection_id); | ||
num_completed_jobs += 1; | ||
} | ||
Err(e) => { | ||
println!("Compaction failed: {:?}", e); | ||
tracing::info!("Compaction failed {}", e); | ||
num_failed_jobs += 1; | ||
} | ||
} | ||
|
@@ -285,11 +285,13 @@ impl Component for CompactionManager { | |
} | ||
|
||
async fn start(&mut self, ctx: &ComponentContext<Self>) -> () { | ||
println!("Starting CompactionManager"); | ||
ctx.scheduler | ||
.schedule(ScheduleMessage {}, self.compaction_interval, ctx, || { | ||
Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")) | ||
}); | ||
tracing::info!("Starting CompactionManager"); | ||
ctx.scheduler.schedule( | ||
ScheduledCompactionMessage {}, | ||
self.compaction_interval, | ||
ctx, | ||
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")), | ||
); | ||
} | ||
} | ||
|
||
|
@@ -301,25 +303,40 @@ impl Debug for CompactionManager { | |
|
||
// ============== Handlers ============== | ||
#[async_trait] | ||
impl Handler<ScheduleMessage> for CompactionManager { | ||
impl Handler<ScheduledCompactionMessage> for CompactionManager { | ||
type Result = (); | ||
|
||
async fn handle( | ||
&mut self, | ||
_message: ScheduleMessage, | ||
_message: ScheduledCompactionMessage, | ||
ctx: &ComponentContext<CompactionManager>, | ||
) { | ||
println!("CompactionManager: Performing compaction"); | ||
tracing::info!("CompactionManager: Performing compaction"); | ||
let mut ids = Vec::new(); | ||
self.compact_batch(&mut ids).await; | ||
|
||
self.hnsw_index_provider.purge_by_id(&ids).await; | ||
|
||
// Compaction is done, schedule the next compaction | ||
ctx.scheduler | ||
.schedule(ScheduleMessage {}, self.compaction_interval, ctx, || { | ||
Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")) | ||
}); | ||
ctx.scheduler.schedule( | ||
ScheduledCompactionMessage {}, | ||
self.compaction_interval, | ||
ctx, | ||
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled compaction")), | ||
); | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Handler<OneOffCompactionMessage> for CompactionManager { | ||
type Result = (); | ||
async fn handle( | ||
&mut self, | ||
_message: OneOffCompactionMessage, | ||
_ctx: &ComponentContext<CompactionManager>, | ||
) { | ||
tracing::info!("CompactionManager: Performing compaction"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this should not synchronously perform compaction but instead queue the id for compaction in the next scheduled run There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Will queue the one-off collection and compact during scheduled compaction |
||
todo!("To be implemented in next PR in the stack"); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
use async_trait::async_trait; | ||
use chroma_system::ComponentHandle; | ||
use chroma_types::chroma_proto::{ | ||
compactor_server::{Compactor, CompactorServer}, | ||
CompactionRequest, CompactionResponse, | ||
}; | ||
use tokio::signal::unix::{signal, SignalKind}; | ||
use tonic::{transport::Server, Request, Response, Status}; | ||
use tracing::trace_span; | ||
|
||
use crate::compactor::OneOffCompactionMessage; | ||
|
||
use super::CompactionManager; | ||
|
||
pub struct CompactionServer { | ||
pub manager: ComponentHandle<CompactionManager>, | ||
pub port: u16, | ||
} | ||
|
||
impl CompactionServer { | ||
pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> { | ||
let addr = format!("[::]:{}", self.port).parse().unwrap(); | ||
tracing::info!("Compaction server listing at {addr}"); | ||
let server = Server::builder().add_service(CompactorServer::new(self)); | ||
server | ||
.serve_with_shutdown(addr, async { | ||
match signal(SignalKind::terminate()) { | ||
Ok(mut sigterm) => { | ||
sigterm.recv().await; | ||
tracing::info!("Received SIGTERM, shutting down") | ||
} | ||
Err(err) => { | ||
tracing::error!("Failed to create SIGTERM handler: {err}") | ||
} | ||
} | ||
}) | ||
.await?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Compactor for CompactionServer { | ||
async fn compact( | ||
&self, | ||
request: Request<CompactionRequest>, | ||
) -> Result<Response<CompactionResponse>, Status> { | ||
let compaction_span = trace_span!("CompactionRequest", request = ?request); | ||
self.manager | ||
.receiver() | ||
.send( | ||
OneOffCompactionMessage::try_from(request.into_inner()) | ||
.map_err(|e| Status::invalid_argument(e.to_string()))?, | ||
Some(compaction_span), | ||
) | ||
.await | ||
.map_err(|e| Status::internal(e.to_string()))?; | ||
Ok(Response::new(CompactionResponse {})) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,3 +6,5 @@ mod types; | |
|
||
pub(crate) use compaction_manager::*; | ||
pub(crate) use types::*; | ||
|
||
pub mod compaction_server; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe as future work it would be useful to add an endpoint to see the status for the currently compacting and last N compactions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I.e
/compaction/status
returns