Skip to content

Commit

Permalink
feat: complete video processing support
Browse files Browse the repository at this point in the history
  • Loading branch information
Gmin2 committed Dec 26, 2024
1 parent f9af393 commit c688574
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 1 deletion.
9 changes: 9 additions & 0 deletions src/ai_models/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::models::chat::{ChatCompletionRequest, ChatCompletionResponse};
use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::video::{VideoRequest, VideoResponse};
use crate::providers::provider::Provider;
use axum::http::StatusCode;
use std::sync::Arc;
Expand Down Expand Up @@ -52,4 +53,12 @@ impl ModelInstance {
payload.model = self.model_type.clone();
self.provider.process_image(payload, &self.config).await
}

/// Forwards a video request to this instance's provider, pinning the
/// request to the model type this instance was registered with.
pub async fn process_video(
    &self,
    mut request: VideoRequest,
) -> Result<VideoResponse, StatusCode> {
    // Whatever model the caller named is overridden: the provider must
    // always see the model this instance is configured for.
    request.model = self.model_type.clone();
    self.provider.process_video(request, &self.config).await
}
}
1 change: 1 addition & 0 deletions src/config/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum PipelineType {
Embeddings,
Audio,
Image,
Video,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand Down
1 change: 1 addition & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub mod tool_calls;
pub mod tool_choice;
pub mod tool_definition;
pub mod usage;
pub mod video;
25 changes: 25 additions & 0 deletions src/models/video.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use super::usage::Usage;
use serde::{Deserialize, Serialize};

/// Request body accepted by the `/video` pipeline endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct VideoRequest {
    /// Model identifier; overwritten with the registered model type before
    /// the request is dispatched to a provider.
    pub model: String,
    /// URI of the video file to analyze — presumably a provider-reachable
    /// URI such as a GCS `gs://` path; confirm per provider.
    pub file_uri: String,
    /// MIME type of the referenced video (e.g. `video/mp4`).
    pub mime_type: String,
    /// Natural-language instruction describing what to do with the video.
    pub instruction: String,
    /// Optional sampling temperature forwarded to the provider.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,
    /// Optional cap on generated tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_tokens: Option<u32>,
    /// Whether the provider should also process the audio track.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process_audio: Option<bool>,
}

/// Response returned by the `/video` pipeline endpoint.
// NOTE(review): `Debug` added for parity with other model types; assumes
// `Usage` derives `Debug` (it is serde-derived elsewhere) — confirm.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct VideoResponse {
    /// Gateway-generated identifier for this response.
    pub id: String,
    /// Model name reported for the generation.
    pub model: String,
    /// Unix timestamp (seconds) at which the response was created.
    pub created: i64,
    /// Generated text content describing/answering about the video.
    pub content: String,
    /// Token accounting for the request/response pair.
    pub usage: Usage,
}
38 changes: 38 additions & 0 deletions src/pipelines/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::models::embeddings::{EmbeddingsInput, EmbeddingsRequest, EmbeddingsRe
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::streaming::ChatCompletionChunk;
use crate::models::usage::Usage;
use crate::models::video::{VideoRequest, VideoResponse};
use opentelemetry::global::{BoxedSpan, ObjectSafeSpan};
use opentelemetry::trace::{SpanKind, Status, Tracer};
use opentelemetry::{global, KeyValue};
Expand Down Expand Up @@ -380,3 +381,40 @@ impl RecordSpan for ImageResponse {
self.usage.record_span(span);
}
}

impl RecordSpan for VideoRequest {
    /// Records the request's model, media location, prompt and optional
    /// sampling parameters as attributes on the OTel span.
    fn record_span(&self, span: &mut BoxedSpan) {
        span.set_attribute(KeyValue::new("llm.request.type", "video"));
        span.set_attribute(KeyValue::new(GEN_AI_REQUEST_MODEL, self.model.clone()));

        span.set_attribute(KeyValue::new("video.file_uri", self.file_uri.clone()));
        span.set_attribute(KeyValue::new("video.mime_type", self.mime_type.clone()));

        span.set_attribute(KeyValue::new("gen_ai.prompt", self.instruction.clone()));

        // Optional knobs are only recorded when the caller supplied them.
        if let Some(temperature) = self.temperature {
            // f32 -> f64 widening is lossless.
            span.set_attribute(KeyValue::new(
                GEN_AI_REQUEST_TEMPERATURE,
                f64::from(temperature),
            ));
        }
        if let Some(limit) = self.max_tokens {
            // u32 -> i64 widening is lossless.
            span.set_attribute(KeyValue::new("gen_ai.request.max_tokens", i64::from(limit)));
        }
        if let Some(audio) = self.process_audio {
            span.set_attribute(KeyValue::new("video.process_audio", audio));
        }
    }
}

impl RecordSpan for VideoResponse {
    /// Mirrors the response's model, id, generated content and token usage
    /// onto the OTel span.
    fn record_span(&self, span: &mut BoxedSpan) {
        let attributes = [
            KeyValue::new(GEN_AI_RESPONSE_MODEL, self.model.clone()),
            KeyValue::new(GEN_AI_RESPONSE_ID, self.id.clone()),
            KeyValue::new("gen_ai.completion.content", self.content.clone()),
        ];
        for attribute in attributes {
            span.set_attribute(attribute);
        }
        self.usage.record_span(span);
    }
}
26 changes: 26 additions & 0 deletions src/pipelines/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::models::completion::CompletionRequest;
use crate::models::embeddings::EmbeddingsRequest;
use crate::models::image::ImageRequest;
use crate::models::streaming::ChatCompletionChunk;
use crate::models::video::VideoRequest;
use crate::pipelines::otel::OtelTracer;
use crate::{
ai_models::registry::ModelRegistry,
Expand Down Expand Up @@ -50,6 +51,10 @@ pub fn create_pipeline(pipeline: &Pipeline, model_registry: &ModelRegistry) -> R
"/image",
post(move |state, payload| process_image(state, payload, models)),
),
PipelineType::Video => router.route(
"/video",
post(move |state, payload| process_video(state, payload, models)),
),
},
_ => router,
};
Expand Down Expand Up @@ -205,3 +210,24 @@ pub async fn process_image(
tracer.log_error("No matching model found".to_string());
Err(StatusCode::NOT_FOUND)
}

/// Axum handler for the `/video` route: traces the request, finds the first
/// pipeline model whose type matches the payload, and delegates to it.
///
/// # Errors
/// * `500 INTERNAL_SERVER_ERROR` — a configured model key is missing from
///   the registry (server misconfiguration; previously this `unwrap`ed and
///   panicked inside the handler).
/// * `404 NOT_FOUND` — no configured model matches `payload.model`.
/// * Any status propagated from the provider call.
pub async fn process_video(
    State(model_registry): State<Arc<ModelRegistry>>,
    Json(payload): Json<VideoRequest>,
    model_keys: Vec<String>,
) -> Result<impl IntoResponse, StatusCode> {
    let mut tracer = OtelTracer::start("video", &payload);

    for model_key in model_keys {
        // A key present in the pipeline config but absent from the registry
        // is a server-side configuration bug — fail loudly but without
        // panicking the handler.
        let model = match model_registry.get(&model_key) {
            Some(model) => model,
            None => {
                tracer.log_error(format!("model key '{}' not found in registry", model_key));
                return Err(StatusCode::INTERNAL_SERVER_ERROR);
            }
        };

        if payload.model == model.model_type {
            let response = model.process_video(payload.clone()).await?;
            tracer.log_success(&response);
            return Ok(Json(response));
        }
    }

    tracer.log_error("No matching model found".to_string());
    Err(StatusCode::NOT_FOUND)
}
9 changes: 9 additions & 0 deletions src/providers/anthropic/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::models::chat::{ChatCompletionRequest, ChatCompletionResponse};
use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::video::{VideoRequest, VideoResponse};
use crate::providers::provider::Provider;

pub struct AnthropicProvider {
Expand Down Expand Up @@ -105,4 +106,12 @@ impl Provider for AnthropicProvider {
) -> Result<ImageResponse, StatusCode> {
unimplemented!()
}

/// Video processing is not supported by the Anthropic provider.
///
/// NOTE(review): `unimplemented!()` panics at runtime if a pipeline ever
/// routes a video request here; consider returning
/// `Err(StatusCode::NOT_IMPLEMENTED)` instead — matches the file's other
/// stubbed capabilities, so confirm the intended pattern before changing.
async fn process_video(
    &self,
    _payload: VideoRequest,
    _model_config: &ModelConfig,
) -> Result<VideoResponse, StatusCode> {
    unimplemented!()
}
}
9 changes: 9 additions & 0 deletions src/providers/azure/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::streaming::ChatCompletionChunk;
use crate::models::video::{VideoRequest, VideoResponse};
use crate::providers::provider::Provider;
use reqwest::Client;
pub struct AzureProvider {
Expand Down Expand Up @@ -195,4 +196,12 @@ impl Provider for AzureProvider {
) -> Result<ImageResponse, StatusCode> {
unimplemented!()
}

/// Video processing is not supported by the Azure provider.
///
/// NOTE(review): `unimplemented!()` panics at runtime if a pipeline ever
/// routes a video request here; a `NOT_IMPLEMENTED` status would degrade
/// more gracefully — confirm the intended pattern for unsupported
/// capabilities before changing.
async fn process_video(
    &self,
    _payload: VideoRequest,
    _model_config: &ModelConfig,
) -> Result<VideoResponse, StatusCode> {
    unimplemented!()
}
}
9 changes: 9 additions & 0 deletions src/providers/openai/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::streaming::ChatCompletionChunk;
use crate::models::video::{VideoRequest, VideoResponse};
use crate::providers::provider::Provider;
use axum::async_trait;
use axum::http::StatusCode;
Expand Down Expand Up @@ -155,4 +156,12 @@ impl Provider for OpenAIProvider {
) -> Result<ImageResponse, StatusCode> {
unimplemented!()
}

/// Video processing is not supported by the OpenAI provider.
///
/// NOTE(review): `unimplemented!()` panics at runtime if a pipeline ever
/// routes a video request here; a `NOT_IMPLEMENTED` status would degrade
/// more gracefully — confirm the intended pattern for unsupported
/// capabilities before changing.
async fn process_video(
    &self,
    _payload: VideoRequest,
    _model_config: &ModelConfig,
) -> Result<VideoResponse, StatusCode> {
    unimplemented!()
}
}
7 changes: 7 additions & 0 deletions src/providers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::models::chat::{ChatCompletionRequest, ChatCompletionResponse};
use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::video::{VideoRequest, VideoResponse};

#[async_trait]
pub trait Provider: Send + Sync {
Expand Down Expand Up @@ -45,4 +46,10 @@ pub trait Provider: Send + Sync {
payload: ImageRequest,
model_config: &ModelConfig,
) -> Result<ImageResponse, StatusCode>;

/// Processes a video analysis request against this provider.
///
/// Implementations receive the request payload (model already pinned by the
/// model instance) and the provider-specific model configuration.
///
/// # Errors
/// Returns an HTTP `StatusCode` when the provider does not support video or
/// the upstream call fails.
async fn process_video(
    &self,
    payload: VideoRequest,
    model_config: &ModelConfig,
) -> Result<VideoResponse, StatusCode>;
}
73 changes: 73 additions & 0 deletions src/providers/vertexai/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ pub(crate) struct VertexAIImageRequest {
pub generation_config: Option<GenerationConfig>,
}

/// Vertex AI `generateContent` payload for a video-analysis request.
///
/// Both previous `#[serde(rename = ...)]` attributes renamed fields to their
/// own names — pure no-ops — and are removed; serialization output is
/// unchanged.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub(crate) struct VertexAIVideoRequest {
    /// Conversation turns; for video this is a single user turn holding a
    /// file part and a text part.
    pub contents: Vec<Content>,
    /// Optional sampling / output controls.
    pub generation_config: Option<GenerationConfig>,
}

impl From<crate::models::chat::ChatCompletionRequest> for VertexAIChatCompletionRequest {
fn from(request: crate::models::chat::ChatCompletionRequest) -> Self {
let contents = request
Expand Down Expand Up @@ -603,3 +611,68 @@ impl From<VertexAIChatCompletionResponse> for crate::models::image::ImageRespons
}
}
}

impl From<crate::models::video::VideoRequest> for VertexAIVideoRequest {
    /// Builds the Vertex AI payload: one user turn whose first part is the
    /// referenced video file and whose second part is the text instruction,
    /// plus the optional generation configuration.
    fn from(request: crate::models::video::VideoRequest) -> Self {
        // The video file itself, referenced by URI rather than inlined.
        let video_part = Part {
            content: PartContent::File {
                file_data: FileData {
                    file_uri: request.file_uri,
                    mime_type: request.mime_type,
                },
            },
        };

        // The instruction telling the model what to do with the video.
        let instruction_part = Part {
            content: PartContent::Text {
                text: request.instruction,
            },
        };

        let generation_config = GenerationConfig {
            temperature: request.temperature,
            max_output_tokens: request.max_tokens,
            top_p: None,
            top_k: None,
            candidate_count: None,
            audio_timestamp: request.process_audio,
        };

        VertexAIVideoRequest {
            contents: vec![Content {
                role: "user".to_string(),
                parts: vec![video_part, instruction_part],
            }],
            generation_config: Some(generation_config),
        }
    }
}

/// Adapts a Vertex AI chat-style `generateContent` response into the
/// gateway's `VideoResponse`.
impl From<VertexAIChatCompletionResponse> for crate::models::video::VideoResponse {
    fn from(response: VertexAIChatCompletionResponse) -> Self {
        crate::models::video::VideoResponse {
            // No response id is carried over from Vertex AI here; a fresh
            // UUID is minted for correlation.
            id: uuid::Uuid::new_v4().to_string(),
            // NOTE(review): model name is hard-coded — the model actually
            // used is lost in this `From` conversion, so responses misreport
            // the model for any other Gemini variant. TODO: thread the real
            // model name through (needs a signature beyond `From`).
            model: "gemini-1.5-pro".to_string(),
            created: chrono::Utc::now().timestamp(),
            // Text of the first part of the first candidate, or the empty
            // string when there is no text candidate.
            content: response
                .candidates
                .first()
                .and_then(
                    |candidate| match &candidate.content.parts.first().map(|p| &p.content) {
                        Some(PartContent::Text { text }) => Some(text.clone()),
                        _ => None,
                    },
                )
                .unwrap_or_default(),
            // Token accounting from usage_metadata; zeroed when absent.
            // NOTE(review): the `as u32` casts silently truncate if the
            // upstream counts are ever wider than u32 — presumably safe for
            // token counts, but confirm the source field types.
            usage: response
                .usage_metadata
                .map(|metadata| Usage {
                    prompt_tokens: metadata.prompt_token_count as u32,
                    completion_tokens: metadata.candidates_token_count as u32,
                    total_tokens: metadata.total_token_count as u32,
                    completion_tokens_details: None,
                    prompt_tokens_details: None,
                })
                .unwrap_or_default(),
        }
    }
}
22 changes: 21 additions & 1 deletion src/providers/vertexai/provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::models::{
VertexAIAudioRequest, VertexAIAudioResponse, VertexAIChatCompletionRequest,
VertexAIChatCompletionResponse, VertexAIEmbeddingsRequest, VertexAIEmbeddingsResponse,
VertexAIImageRequest, VertexAIStreamChunk,
VertexAIImageRequest, VertexAIStreamChunk, VertexAIVideoRequest,
};
use crate::config::constants::stream_buffer_size_bytes;
use crate::config::models::{ModelConfig, Provider as ProviderConfig};
Expand All @@ -10,6 +10,7 @@ use crate::models::chat::{ChatCompletionRequest, ChatCompletionResponse};
use crate::models::completion::{CompletionRequest, CompletionResponse};
use crate::models::embeddings::{EmbeddingsRequest, EmbeddingsResponse};
use crate::models::image::{ImageRequest, ImageResponse};
use crate::models::video::{VideoRequest, VideoResponse};
use crate::providers::provider::Provider;
use axum::async_trait;
use axum::http::StatusCode;
Expand Down Expand Up @@ -287,4 +288,23 @@ impl Provider for VertexAIProvider {
.await?;
Ok(response.into())
}

/// Sends a video-analysis request to Vertex AI's `generateContent` endpoint
/// and adapts the chat-shaped response into a `VideoResponse`.
///
/// # Errors
/// Propagates any `StatusCode` from token acquisition, URL construction or
/// the upstream API call.
async fn process_video(
    &self,
    payload: VideoRequest,
    _model_config: &ModelConfig,
) -> Result<VideoResponse, StatusCode> {
    let token = self.get_token().await?;

    // Build the URL first so `payload` can then be moved into the Vertex AI
    // request — the original cloned the entire payload just to read
    // `payload.model` afterwards.
    let url = self
        .construct_vertex_url(&payload.model, "generateContent")
        .await?;
    let request: VertexAIVideoRequest = payload.into();

    let response: VertexAIChatCompletionResponse = self
        .make_vertex_api_call(url, token, json!(request))
        .await?;

    Ok(response.into())
}
}
2 changes: 2 additions & 0 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub fn create_router(state: Arc<AppState>) -> Router {
return *pipeline_idxs.get("embeddings").unwrap_or(&0);
} else if path.contains("/image") {
return *pipeline_idxs.get("image").unwrap_or(&0);
} else if path.contains("/video") {
return *pipeline_idxs.get("video").unwrap_or(&0);
}

// Fall back to header-based routing if needed
Expand Down

0 comments on commit c688574

Please sign in to comment.