Commit a5f1123

Add compression migrations, fix raw_video_path, fix job
sneakycrow committed Nov 24, 2024
1 parent e7f9c64 commit a5f1123
Showing 8 changed files with 145 additions and 68 deletions.
2 changes: 1 addition & 1 deletion packages/db/migrations/20241101164200_add_videos.up.sql
@@ -4,7 +4,7 @@ CREATE TABLE videos (
     id TEXT PRIMARY KEY,
     user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
     title VARCHAR(100) NOT NULL,
-    raw_video_path VARCHAR(255) NOT NULL,
+    raw_video_path VARCHAR(255),
     processed_video_path VARCHAR(255),
     processing_status processing_status NOT NULL DEFAULT 'pending',
     created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
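
The NOT NULL constraint is dropped because the compression job added in this commit deletes the raw file and sets this column to NULL once compressed_video_path is written (see runner.rs below). A hedged sanity check of that end state, assuming sqlx and a connected `db` pool as used elsewhere in this repo:

    // After compression, the raw path should be NULL and the zip path populated.
    let (raw, compressed): (Option<String>, Option<String>) = sqlx::query_as(
        "SELECT raw_video_path, compressed_video_path FROM videos WHERE id = $1",
    )
    .bind(&video_id)
    .fetch_one(&db)
    .await?;
    assert!(raw.is_none() && compressed.is_some());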
@@ -0,0 +1,7 @@
+-- Remove compression-related columns
+ALTER TABLE videos
+DROP COLUMN compression_status,
+DROP COLUMN compressed_video_path;
+
+-- Drop the compression status enum type
+DROP TYPE compression_status;
@@ -0,0 +1,7 @@
+-- Create compression status enum type
+CREATE TYPE compression_status AS ENUM ('pending', 'compressing', 'completed', 'failed');
+
+-- Add compression-related columns
+ALTER TABLE videos
+ADD COLUMN compression_status compression_status NOT NULL DEFAULT 'pending',
+ADD COLUMN compressed_video_path VARCHAR(255);
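
As a hedged illustration (not part of this commit) of the new columns in use — selecting videos that still need compression via sqlx, assuming the `Video` row type from packages/db:

    // Fetch videos whose compression has not started yet.
    let pending: Vec<Video> = sqlx::query_as::<_, Video>(
        "SELECT * FROM videos WHERE compression_status = 'pending'",
    )
    .fetch_all(&db)
    .await?;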
19 changes: 15 additions & 4 deletions packages/queue/src/queue.rs
@@ -12,6 +12,7 @@ pub trait Queue: Send + Sync + Debug {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error>;
     /// pull fetches at most `number_of_jobs` from the queue.
@@ -61,18 +62,23 @@ impl Queue for PostgresQueue {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         date: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error> {
         let scheduled_for = date.unwrap_or(chrono::Utc::now());
         let failed_attempts: i32 = 0;
         let message = Json(job);
-        let status = PostgresJobStatus::Queued;
         let now = chrono::Utc::now();
         let job_id: Uuid = Ulid::new().into();
         let query = "INSERT INTO queue
             (id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
             VALUES ($1, $2, $3, $4, $5, $6, $7)";

+        tracing::debug!(
+            "Adding job to queue with id: {}, status: {:?}, scheduled_for: {}",
+            job_id,
+            status,
+            scheduled_for
+        );
         sqlx::query(query)
             .bind(job_id)
             .bind(now)
@@ -94,6 +100,10 @@ impl Queue for PostgresQueue {
     }

     async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
+        tracing::debug!(
+            "Failing job with id: {}, attempting to update status and increment failed attempts",
+            job_id
+        );
         let now = chrono::Utc::now();

         // First get the current failed_attempts count
@@ -136,7 +146,7 @@
             WHERE id IN (
                 SELECT id
                 FROM queue
-                WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
+                WHERE status = ANY($3) AND scheduled_for <= $4 AND failed_attempts < $5
                 ORDER BY scheduled_for
                 FOR UPDATE SKIP LOCKED
                 LIMIT $6
@@ -146,12 +156,13 @@
         let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
             .bind(PostgresJobStatus::Running)
             .bind(now)
-            .bind(PostgresJobStatus::Queued)
+            .bind(vec![PostgresJobStatus::Queued])
             .bind(now)
             .bind(self.max_attempts as i32)
             .bind(number_of_jobs)
             .fetch_all(&self.db)
             .await?;
+
         Ok(jobs.into_iter().map(Into::into).collect())
     }
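
A hedged usage sketch of the new push signature — callers now pass the initial job status explicitly and may schedule the job for later (both call sites in this commit use `PostgresJobStatus::Queued`):

    // Enqueue a compression job; None schedules it for immediate processing.
    queue
        .push(
            Message::CompressRawVideo { video_id: video_id.clone() },
            PostgresJobStatus::Queued,
            Some(chrono::Utc::now() + chrono::Duration::days(7)), // or None for "now"
        )
        .await?;

Binding `vec![PostgresJobStatus::Queued]` against `status = ANY($3)` keeps pull correct for the single status used today while leaving room to poll several statuses later.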
171 changes: 113 additions & 58 deletions packages/queue/src/runner.rs
@@ -1,4 +1,5 @@
 use crate::error::Error;
+use crate::job::PostgresJobStatus;
 use crate::queue::{Job, Message, Queue};
 use db::Video;
 use futures::{stream, StreamExt};
@@ -34,7 +35,7 @@ pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Poo
         tracing::debug!("Starting job {}", job.id);
         let job_id = job.id;

-        let res = match handle_job(job, db_conn).await {
+        let res = match handle_job(queue.clone(), job, db_conn).await {
             Ok(_) => queue.delete_job(job_id).await,
             Err(err) => {
                 tracing::error!("run_worker: handling job({}): {}", job_id, &err);
@@ -53,7 +54,8 @@
 }

 /// Individually processes a single job, based on its Job message type
-async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
+async fn handle_job(queue: Arc<dyn Queue>, job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
+    tracing::debug!("Got job of type {:?}", &job.message);
     match job.message {
         Message::ProcessRawVideoIntoStream { video_id } => {
             tracing::info!("Start video processing for video_id {video_id}");
@@ -111,81 +113,134 @@ async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
             .execute(db)
             .await?;

-            tracing::info!("Successfully processed video {}", &video_id);
+            // After completed, queue up a Compress Raw Video job
+            let scheduled_time = chrono::Utc::now() + chrono::Duration::days(7);
+            queue
+                .push(
+                    Message::CompressRawVideo {
+                        video_id: video_id.clone(),
+                    },
+                    PostgresJobStatus::Queued,
+                    // Some(scheduled_time), // TODO: Uncomment after testing
+                    None,
+                )
+                .await?;
+
+            tracing::info!(
+                "Successfully processed video {} and queued compression job",
+                &video_id
+            );
         }
         Message::CompressRawVideo { video_id } => {
             tracing::info!("Start video compression for video_id {video_id}");

-            // Update video compression status
+            // Update video compression status to compressing
             sqlx::query(
-                "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1"
-            )
-            .bind(&video_id)
-            .execute(db)
-            .await?;
-
-            // Get video details
-            let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1")
-                .bind(&video_id)
-                .fetch_one(db)
-                .await?;
+                "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1"
+            )
+            .bind(&video_id)
+            .execute(db)
+            .await?;

-            // Create output directory if it doesn't exist
-            let videos_dir = PathBuf::from(get_videos_dir());
-            fs::create_dir_all(&videos_dir)
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            // Wrap the compression logic in a result to handle failures
+            let compression_result = async {
+                // Get video details
+                let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1")
+                    .bind(&video_id)
+                    .fetch_one(db)
+                    .await?;

-            let zip_path = videos_dir.join(format!("{}_raw.zip", video_id));
-            let mut zip = ZipWriter::new(
-                fs::File::create(&zip_path)
-                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?,
-            );
+                // Create video-specific directory if it doesn't exist
+                let videos_dir = PathBuf::from(get_videos_dir());
+                let video_dir = videos_dir.join(&video_id.to_string());
+                fs::create_dir_all(&video_dir)
+                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?;

-            let raw_video_path = PathBuf::from(&video.raw_video_path);
-            let file_name = raw_video_path
-                .file_name()
-                .ok_or_else(|| Error::VideoProcessingError("Invalid raw video path".to_string()))?
-                .to_string_lossy()
-                .into_owned();
+                let zip_path = video_dir.join("raw.zip");
+                let mut zip = ZipWriter::new(
+                    fs::File::create(&zip_path)
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?,
+                );

-            zip.start_file(&file_name, FileOptions::default())
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+                let raw_video_path = PathBuf::from(&video.raw_video_path);
+                let file_name = raw_video_path
+                    .file_name()
+                    .ok_or_else(|| {
+                        Error::VideoProcessingError("Invalid raw video path".to_string())
+                    })?
+                    .to_string_lossy()
+                    .into_owned();

-            // Read the file in chunks
-            let mut file = File::open(&raw_video_path)
-                .await
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-            let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+                zip.start_file(&file_name, FileOptions::default())
+                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?;

-            loop {
-                let n = file
-                    .read(&mut buffer)
+                // Read the file in chunks
+                let mut file = File::open(&raw_video_path)
                     .await
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-                if n == 0 {
-                    break;
+                let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+
+                loop {
+                    let n = file
+                        .read(&mut buffer)
+                        .await
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+                    if n == 0 {
+                        break;
+                    }
+                    zip.write_all(&buffer[..n])
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
                 }
-                zip.write_all(&buffer[..n])
+
+                zip.finish()
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+                // Close file handle before trying to remove
+                drop(file);
+
+                // Remove the original raw video file
+                tokio::fs::remove_file(&raw_video_path).await.map_err(|e| {
+                    Error::VideoProcessingError(format!("Failed to remove raw video: {}", e))
+                })?;
+
+                Ok::<PathBuf, Error>(zip_path)
             }
+            .await;

-            zip.finish()
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            match compression_result {
+                Ok(zip_path) => {
+                    // Update the video record with success status and compressed file path
+                    sqlx::query(
+                        "UPDATE videos SET
+                        compression_status = 'completed',
+                        compressed_video_path = $1,
+                        raw_video_path = NULL,
+                        updated_at = NOW()
+                        WHERE id = $2",
+                    )
+                    .bind(zip_path.to_str().unwrap())
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;

-            // Update the video record with the compressed file path
-            sqlx::query(
-                "UPDATE videos SET
-                compression_status = 'completed',
-                compressed_video_path = $1,
-                updated_at = NOW()
-                WHERE id = $2",
-            )
-            .bind(zip_path.to_str().unwrap())
-            .bind(&video_id)
-            .execute(db)
-            .await?;
+                    tracing::info!("Successfully compressed video {}", &video_id);
+                }
+                Err(err) => {
+                    // Update the video record with failed status
+                    sqlx::query(
+                        "UPDATE videos SET
+                        compression_status = 'failed',
+                        updated_at = NOW()
+                        WHERE id = $1",
+                    )
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;

-            tracing::info!("Successfully compressed video {}", &video_id);
+                    tracing::error!("Failed to compress video {}: {}", &video_id, err);
+                    return Err(err);
+                }
+            }
         }
         _ => tracing::warn!("Unhandled job message passed"),
     }
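
One detail worth noting in the new handler: every fallible compression step runs inside a single `async` block whose `Result` is matched once, so the 'failed' status is written in exactly one place, and the turbofish in `Ok::<PathBuf, Error>(zip_path)` pins the block's error type. A minimal sketch of the shape (names illustrative, not from the commit):

    use std::path::PathBuf;

    // Collapse several fallible steps into one Result, then record the outcome once.
    let compression_result: Result<PathBuf, Error> = async {
        // ... fetch the row, zip the file, delete the raw original ...
        Ok::<PathBuf, Error>(PathBuf::from("raw.zip")) // stand-in for the real zip path
    }
    .await;

    match compression_result {
        Ok(zip_path) => { /* mark 'completed' and store zip_path */ }
        Err(err) => {
            /* mark 'failed' in the database */
            return Err(err); // then propagate so the queue records the failure
        }
    }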
2 changes: 1 addition & 1 deletion services/forge-queue/src/main.rs
@@ -9,7 +9,7 @@ async fn main() -> anyhow::Result<()> {
     tracing_subscriber::registry()
         .with(
             tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "forge=debug,queue=debug,db=debug".into()),
+                .unwrap_or_else(|_| "forge=debug,queue=debug".into()),
         )
         .with(tracing_subscriber::fmt::layer())
         .init();
4 changes: 0 additions & 4 deletions services/silo-api/src/main.rs
@@ -53,10 +53,6 @@ async fn main() {
         .expect("Could not connect to database");
     // Initialize the queue
     let queue = Arc::new(PostgresQueue::new(db.clone()));
-    // Run migrations
-    let _mig = db::run_migrations(&db)
-        .await
-        .expect("Could not run database migrations");
     // Store shared data as state between routes
     let state = Arc::new(AppState { db, config, queue });
     routes::upload::init_cleanup().await;
1 change: 1 addition & 0 deletions services/silo-api/src/routes/upload.rs
@@ -366,6 +366,7 @@ pub async fn upload_video(
         .queue
         .push(
             process_video_message,
+            queue::job::PostgresJobStatus::Queued,
             None, // Schedule for immediate processing
         )
         .await
