From a5f11239c424ce00d49e1759b5a1168216725d28 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:28:30 -0800
Subject: [PATCH] Add compression migrations, fix raw_video_path, fix job

---
 .../20241101164200_add_videos.up.sql          |   2 +-
 .../20241124231804_compresseion_cols.down.sql |   7 +
 .../20241124231804_compresseion_cols.up.sql   |   7 +
 packages/queue/src/queue.rs                   |  19 +-
 packages/queue/src/runner.rs                  | 171 ++++++++++++------
 services/forge-queue/src/main.rs              |   2 +-
 services/silo-api/src/main.rs                 |   4 -
 services/silo-api/src/routes/upload.rs        |   1 +
 8 files changed, 145 insertions(+), 68 deletions(-)
 create mode 100644 packages/db/migrations/20241124231804_compresseion_cols.down.sql
 create mode 100644 packages/db/migrations/20241124231804_compresseion_cols.up.sql

diff --git a/packages/db/migrations/20241101164200_add_videos.up.sql b/packages/db/migrations/20241101164200_add_videos.up.sql
index 585464b..6e6f171 100644
--- a/packages/db/migrations/20241101164200_add_videos.up.sql
+++ b/packages/db/migrations/20241101164200_add_videos.up.sql
@@ -4,7 +4,7 @@ CREATE TABLE videos (
     id TEXT PRIMARY KEY,
     user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
     title VARCHAR(100) NOT NULL,
-    raw_video_path VARCHAR(255) NOT NULL,
+    raw_video_path VARCHAR(255),
     processed_video_path VARCHAR(255),
     processing_status processing_status NOT NULL DEFAULT 'pending',
     created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
diff --git a/packages/db/migrations/20241124231804_compresseion_cols.down.sql b/packages/db/migrations/20241124231804_compresseion_cols.down.sql
new file mode 100644
index 0000000..282db23
--- /dev/null
+++ b/packages/db/migrations/20241124231804_compresseion_cols.down.sql
@@ -0,0 +1,7 @@
+-- Remove compression-related columns
+ALTER TABLE videos
+    DROP COLUMN compression_status,
+    DROP COLUMN compressed_video_path;
+
+-- Drop the compression status enum type
+DROP TYPE compression_status;
diff --git a/packages/db/migrations/20241124231804_compresseion_cols.up.sql b/packages/db/migrations/20241124231804_compresseion_cols.up.sql
new file mode 100644
index 0000000..261b0e9
--- /dev/null
+++ b/packages/db/migrations/20241124231804_compresseion_cols.up.sql
@@ -0,0 +1,7 @@
+-- Create compression status enum type
+CREATE TYPE compression_status AS ENUM ('pending', 'compressing', 'completed', 'failed');
+
+-- Add compression-related columns
+ALTER TABLE videos
+    ADD COLUMN compression_status compression_status NOT NULL DEFAULT 'pending',
+    ADD COLUMN compressed_video_path VARCHAR(255);
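The two migrations above add a compression_status enum and a nullable
compressed_video_path column. As a rough illustration of what they enable
(a sketch, not code from this patch), a worker could list videos still
awaiting compression as follows, assuming a DATABASE_URL env var and the
same sqlx/tokio stack the rest of the patch uses; the PendingVideo struct
name is made up for the example:

    use sqlx::postgres::PgPoolOptions;

    #[derive(Debug, sqlx::FromRow)]
    struct PendingVideo {
        id: String,
        title: String,
    }

    #[tokio::main]
    async fn main() -> Result<(), sqlx::Error> {
        let pool = PgPoolOptions::new()
            .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL not set"))
            .await?;

        // The enum column compares cleanly against a text literal in Postgres.
        let pending: Vec<PendingVideo> =
            sqlx::query_as("SELECT id, title FROM videos WHERE compression_status = 'pending'")
                .fetch_all(&pool)
                .await?;

        println!("{} videos awaiting compression", pending.len());
        Ok(())
    }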
diff --git a/packages/queue/src/queue.rs b/packages/queue/src/queue.rs
index 2fa39df..f672040 100644
--- a/packages/queue/src/queue.rs
+++ b/packages/queue/src/queue.rs
@@ -12,6 +12,7 @@ pub trait Queue: Send + Sync + Debug {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error>;
     /// pull fetches at most `number_of_jobs` from the queue.
@@ -61,18 +62,23 @@ impl Queue for PostgresQueue {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         date: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error> {
         let scheduled_for = date.unwrap_or(chrono::Utc::now());
         let failed_attempts: i32 = 0;
         let message = Json(job);
-        let status = PostgresJobStatus::Queued;
         let now = chrono::Utc::now();
         let job_id: Uuid = Ulid::new().into();
         let query = "INSERT INTO queue (id, created_at, updated_at, scheduled_for, failed_attempts, status, message) VALUES ($1, $2, $3, $4, $5, $6, $7)";
-
+        tracing::debug!(
+            "Adding job to queue with id: {}, status: {:?}, scheduled_for: {}",
+            job_id,
+            status,
+            scheduled_for
+        );
         sqlx::query(query)
             .bind(job_id)
             .bind(now)
@@ -94,6 +100,10 @@ impl Queue for PostgresQueue {
     }

     async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
+        tracing::debug!(
+            "Failing job with id: {}, attempting to update status and increment failed attempts",
+            job_id
+        );
         let now = chrono::Utc::now();

         // First get the current failed_attempts count
@@ -136,7 +146,7 @@
             WHERE id IN (
                 SELECT id
                 FROM queue
-                WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
+                WHERE status = ANY($3) AND scheduled_for <= $4 AND failed_attempts < $5
                 ORDER BY scheduled_for
                 FOR UPDATE SKIP LOCKED
                 LIMIT $6
@@ -146,12 +156,13 @@
         let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
             .bind(PostgresJobStatus::Running)
             .bind(now)
-            .bind(PostgresJobStatus::Queued)
+            .bind(vec![PostgresJobStatus::Queued])
             .bind(now)
             .bind(self.max_attempts as i32)
             .bind(number_of_jobs)
             .fetch_all(&self.db)
             .await?;
+
         Ok(jobs.into_iter().map(Into::into).collect())
     }
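A note on the pull change above: binding vec![PostgresJobStatus::Queued] to
status = ANY($3) keeps today's behavior identical while letting future
callers pull several statuses in one query. A standalone illustration of the
ANY-bind pattern in sqlx (plain integers stand in for the job-status enum;
this is a sketch, not part of the patch):

    use sqlx::postgres::PgPoolOptions;

    #[tokio::main]
    async fn main() -> Result<(), sqlx::Error> {
        let pool = PgPoolOptions::new()
            .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL not set"))
            .await?;

        // Postgres treats `col = ANY($1)` with an array bind like an IN list,
        // so one query text serves both single- and multi-value filters.
        let wanted: Vec<i32> = vec![1, 2];
        let rows: Vec<(i32,)> =
            sqlx::query_as("SELECT x FROM generate_series(1, 5) AS t(x) WHERE x = ANY($1)")
                .bind(&wanted)
                .fetch_all(&pool)
                .await?;

        assert_eq!(rows.len(), 2);
        Ok(())
    }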
tracing::info!("Start video compression for video_id {video_id}"); - // Update video compression status + // Update video compression status to compressing sqlx::query( - "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1" - ) - .bind(&video_id) - .execute(db) - .await?; - - // Get video details - let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1") - .bind(&video_id) - .fetch_one(db) - .await?; + "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1" + ) + .bind(&video_id) + .execute(db) + .await?; - // Create output directory if it doesn't exist - let videos_dir = PathBuf::from(get_videos_dir()); - fs::create_dir_all(&videos_dir) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?; + // Wrap the compression logic in a result to handle failures + let compression_result = async { + // Get video details + let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1") + .bind(&video_id) + .fetch_one(db) + .await?; - let zip_path = videos_dir.join(format!("{}_raw.zip", video_id)); - let mut zip = ZipWriter::new( - fs::File::create(&zip_path) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?, - ); + // Create video-specific directory if it doesn't exist + let videos_dir = PathBuf::from(get_videos_dir()); + let video_dir = videos_dir.join(&video_id.to_string()); + fs::create_dir_all(&video_dir) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?; - let raw_video_path = PathBuf::from(&video.raw_video_path); - let file_name = raw_video_path - .file_name() - .ok_or_else(|| Error::VideoProcessingError("Invalid raw video path".to_string()))? - .to_string_lossy() - .into_owned(); + let zip_path = video_dir.join("raw.zip"); + let mut zip = ZipWriter::new( + fs::File::create(&zip_path) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?, + ); - zip.start_file(&file_name, FileOptions::default()) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?; + let raw_video_path = PathBuf::from(&video.raw_video_path); + let file_name = raw_video_path + .file_name() + .ok_or_else(|| { + Error::VideoProcessingError("Invalid raw video path".to_string()) + })? 
+                    .to_string_lossy()
+                    .into_owned();

-            // Read the file in chunks
-            let mut file = File::open(&raw_video_path)
-                .await
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-            let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+                zip.start_file(&file_name, FileOptions::default())
+                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?;

-            loop {
-                let n = file
-                    .read(&mut buffer)
+                // Read the file in chunks
+                let mut file = File::open(&raw_video_path)
                     .await
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-                if n == 0 {
-                    break;
+                let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+
+                loop {
+                    let n = file
+                        .read(&mut buffer)
+                        .await
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+                    if n == 0 {
+                        break;
+                    }
+                    zip.write_all(&buffer[..n])
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
                 }
-                zip.write_all(&buffer[..n])
+
+                zip.finish()
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+                // Close file handle before trying to remove
+                drop(file);
+
+                // Remove the original raw video file
+                tokio::fs::remove_file(&raw_video_path).await.map_err(|e| {
+                    Error::VideoProcessingError(format!("Failed to remove raw video: {}", e))
+                })?;
+
+                Ok::<PathBuf, Error>(zip_path)
             }
+            .await;

-            zip.finish()
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            match compression_result {
+                Ok(zip_path) => {
+                    // Update the video record with success status and compressed file path
+                    sqlx::query(
+                        "UPDATE videos SET
+                            compression_status = 'completed',
+                            compressed_video_path = $1,
+                            raw_video_path = NULL,
+                            updated_at = NOW()
+                        WHERE id = $2",
+                    )
+                    .bind(zip_path.to_str().unwrap())
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;

-            // Update the video record with the compressed file path
-            sqlx::query(
-                "UPDATE videos SET
-                    compression_status = 'completed',
-                    compressed_video_path = $1,
-                    updated_at = NOW()
-                WHERE id = $2",
-            )
-            .bind(zip_path.to_str().unwrap())
-            .bind(&video_id)
-            .execute(db)
-            .await?;
+                    tracing::info!("Successfully compressed video {}", &video_id);
+                }
+                Err(err) => {
+                    // Update the video record with failed status
+                    sqlx::query(
+                        "UPDATE videos SET
+                            compression_status = 'failed',
+                            updated_at = NOW()
+                        WHERE id = $1",
+                    )
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;

-            tracing::info!("Successfully compressed video {}", &video_id);
+                    tracing::error!("Failed to compress video {}: {}", &video_id, err);
+                    return Err(err);
+                }
+            }
         }
         _ => tracing::warn!("Unhandled job message passed"),
     }
diff --git a/services/forge-queue/src/main.rs b/services/forge-queue/src/main.rs
index 1dc7402..f3b7241 100644
--- a/services/forge-queue/src/main.rs
+++ b/services/forge-queue/src/main.rs
@@ -9,7 +9,7 @@ async fn main() -> anyhow::Result<()> {
     tracing_subscriber::registry()
         .with(
             tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "forge=debug,queue=debug,db=debug".into()),
+                .unwrap_or_else(|_| "forge=debug,queue=debug".into()),
         )
         .with(tracing_subscriber::fmt::layer())
         .init();
diff --git a/services/silo-api/src/main.rs b/services/silo-api/src/main.rs
index b007d57..a5da51b 100644
--- a/services/silo-api/src/main.rs
+++ b/services/silo-api/src/main.rs
@@ -53,10 +53,6 @@ async fn main() {
         .expect("Could not connect to database");
     // Initialize the queue
     let queue = Arc::new(PostgresQueue::new(db.clone()));
-    // Run migrations
-    let _mig = db::run_migrations(&db)
-        .await
-        .expect("Could not run database migrations");
     // Store shared data as state between routes
     let state = Arc::new(AppState { db, config, queue });
     routes::upload::init_cleanup().await;
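With the in-process db::run_migrations call removed from silo-api above, the
migrations presumably run somewhere else (the queue service, or sqlx-cli out
of band); nothing in this patch shows which. For reference, a minimal sketch
of the embedded-migrations startup that a helper like db::run_migrations
typically wraps, assuming sqlx's migrate feature and a DATABASE_URL env var:

    use sqlx::postgres::PgPoolOptions;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let pool = PgPoolOptions::new()
            .connect(&std::env::var("DATABASE_URL")?)
            .await?;

        // Embeds the ./migrations directory at compile time and applies
        // anything not yet recorded in the _sqlx_migrations table.
        sqlx::migrate!("./migrations").run(&pool).await?;
        Ok(())
    }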
diff --git a/services/silo-api/src/routes/upload.rs b/services/silo-api/src/routes/upload.rs
index c22213c..d556414 100644
--- a/services/silo-api/src/routes/upload.rs
+++ b/services/silo-api/src/routes/upload.rs
@@ -366,6 +366,7 @@ pub async fn upload_video(
         .queue
         .push(
             process_video_message,
+            queue::job::PostgresJobStatus::Queued,
             None, // Schedule for immediate processing
         )
         .await
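With the new signature, every call site now states the initial job status
explicitly, as the upload route above does. A minimal caller sketch, assuming
the queue crate exposes PostgresQueue, Message, and PostgresJobStatus under
the paths its own modules and the upload route suggest:

    use std::sync::Arc;

    use queue::job::PostgresJobStatus;
    use queue::queue::{Message, PostgresQueue, Queue};

    async fn enqueue_processing(
        q: Arc<PostgresQueue>,
        video_id: String,
    ) -> Result<(), queue::error::Error> {
        q.push(
            Message::ProcessRawVideoIntoStream { video_id },
            // The status argument replaces the old hard-coded Queued default.
            PostgresJobStatus::Queued,
            None, // schedule immediately, as the upload route does
        )
        .await
    }

Passing the status through also means a test can enqueue a job directly in a
non-default state to exercise the pull and fail_job paths.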