Skip to content

Commit

Permalink
feat: implement pipeline steering logic (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
nirga authored Nov 12, 2024
1 parent dad45aa commit 2d69d31
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
use crate::{pipelines::pipeline::create_pipeline, state::AppState};
use axum::{extract::Request, routing::get, Router};
use std::collections::HashMap;
use std::sync::Arc;
use tower::steer::Steer;

pub fn create_router(state: Arc<AppState>) -> Router {
let routers = state
.config
.pipelines
.iter()
.map(|pipeline| create_pipeline(pipeline, &state.model_registry))
.collect::<Vec<_>>();
let pipeline_router = Steer::new(routers, |_req: &Request, _services: &[_]| 0);
let mut pipeline_idxs = HashMap::new();
let mut routers = Vec::new();

// Sort pipelines to ensure default is first
let mut sorted_pipelines: Vec<_> = state.config.pipelines.clone();
sorted_pipelines.sort_by_key(|p| p.name != "default"); // "default" will come first since false < true

for pipeline in sorted_pipelines {
let name = pipeline.name.clone();
pipeline_idxs.insert(name, routers.len());
routers.push(create_pipeline(&pipeline, &state.model_registry));
}

let pipeline_router = Steer::new(routers, move |req: &Request, _services: &[_]| {
*req.headers()
.get("x-traceloop-pipeline")
.and_then(|h| h.to_str().ok())
.and_then(|name| pipeline_idxs.get(name))
.unwrap_or(&0)
});

Router::new()
.nest_service("/api/v1", pipeline_router)

0 comments on commit 2d69d31

Please sign in to comment.