From 706d32b8fe09e9ae4aeb00a3dc04d662e0541f34 Mon Sep 17 00:00:00 2001 From: Zachary Corvidae Date: Sat, 22 Feb 2025 10:50:55 -0800 Subject: [PATCH] Add separate stream for jobs (#58) * Restructure workers * Add separate stream for events v jobs * Fix descriptions --- src/api/app_state.rs | 19 ++-- src/api/twitch/eventsub/callback.rs | 4 +- src/bin/down.rs | 11 ++- src/bin/job_runner.rs | 11 +-- src/bin/stream_listener.rs | 14 +-- src/bin/up.rs | 29 +++--- src/error/mod.rs | 2 +- src/error/queue.rs | 6 ++ src/workers/events.rs | 28 ------ src/workers/{runner => events}/chat.rs | 18 ---- src/workers/events/mod.rs | 35 ++++++++ src/workers/events/stream.rs | 98 +++++++++++++++++++++ src/workers/mod.rs | 10 ++- src/workers/{runner => }/nats.rs | 0 src/workers/{runner => queue}/hls_stream.rs | 0 src/workers/{runner => queue}/legacy.rs | 0 src/workers/{runner => queue}/mod.rs | 9 +- src/workers/{runner => queue}/queue.rs | 19 ++-- 18 files changed, 208 insertions(+), 105 deletions(-) delete mode 100644 src/workers/events.rs rename src/workers/{runner => events}/chat.rs (84%) create mode 100644 src/workers/events/mod.rs create mode 100644 src/workers/events/stream.rs rename src/workers/{runner => }/nats.rs (100%) rename src/workers/{runner => queue}/hls_stream.rs (100%) rename src/workers/{runner => queue}/legacy.rs (100%) rename src/workers/{runner => queue}/mod.rs (83%) rename src/workers/{runner => queue}/queue.rs (83%) diff --git a/src/api/app_state.rs b/src/api/app_state.rs index f02947d..161310d 100644 --- a/src/api/app_state.rs +++ b/src/api/app_state.rs @@ -2,14 +2,16 @@ use super::config::Config; use crate::{ db::connect_to_database, storage::s3::create_s3_client, - workers::{self, events::PRIMARY_STREAM}, + workers::events::Stream, + workers::{create_nats_client, Queue}, }; use sqlx::PgPool; /// Shared state available to the API pub struct AppState { pub db: PgPool, - pub job_queue: workers::Queue, + pub job_queue: Queue, + pub event_stream: Stream, pub config: Config, pub s3_client: aws_sdk_s3::Client, } @@ -24,17 +26,24 @@ impl AppState { // Create the S3 Client let s3_client = create_s3_client().await; + // Create a NATS client + let nats_client = create_nats_client().await?; + // Connect to the job queue - let nats_client = workers::create_nats_client().await?; - let jq_name = PRIMARY_STREAM.to_string(); - let job_queue = workers::Queue::connect(jq_name, nats_client) + let job_queue = Queue::connect(nats_client.clone()) .await .expect("Failed to create worker queue"); + // Connect to the event stream + let event_stream = Stream::connect(nats_client.clone()) + .await + .expect("Failed to connect to event stream"); + Ok(Self { config, db, job_queue, + event_stream, s3_client, }) } diff --git a/src/api/twitch/eventsub/callback.rs b/src/api/twitch/eventsub/callback.rs index c4430e8..d64f73f 100644 --- a/src/api/twitch/eventsub/callback.rs +++ b/src/api/twitch/eventsub/callback.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use crate::{ api::{app_state::AppState, routes::auth::oauth::twitch::TwitchCredentials}, - workers::{events::Event, runner::chat::ChatMessagePayload}, + workers::events::{chat::ChatMessagePayload, Event}, }; type HmacSha256 = Hmac; @@ -117,7 +117,7 @@ pub async fn handle_webhook( // Get the subject let subject = Event::from(message_payload).get_subject(); state - .job_queue + .event_stream .publish( subject.to_string(), raw_payload.to_string(), // Pass the original payload so we can skip serialization diff --git a/src/bin/down.rs b/src/bin/down.rs index 607292e..a847637 100644 --- a/src/bin/down.rs +++ b/src/bin/down.rs @@ -1,5 +1,8 @@ use anyhow::Result; -use farmhand::{db, workers}; +use farmhand::{ + db, + workers::{create_nats_client, Queue, Stream}, +}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] @@ -27,9 +30,9 @@ async fn main() -> Result<()> { // Delete all streams tracing::debug!("Deleting all streams"); - let nats_client = workers::create_nats_client().await?; - let jq_name = "FARMHAND_JOBS".to_string(); - workers::Queue::delete(jq_name, nats_client).await?; + let nats_client = create_nats_client().await?; + Queue::delete(nats_client.clone()).await?; + Stream::delete(nats_client.clone()).await?; tracing::info!("Successfully deleted all streams"); Ok(()) diff --git a/src/bin/job_runner.rs b/src/bin/job_runner.rs index aac1dd3..3f7882b 100644 --- a/src/bin/job_runner.rs +++ b/src/bin/job_runner.rs @@ -1,10 +1,6 @@ use anyhow::Result; use async_nats::jetstream::AckKind; -use farmhand::workers::{ - self, - events::{EVENT_PREFIX, PRIMARY_STREAM}, - runner::process_message, -}; +use farmhand::workers::{self, events::MESSAGE_PREFIX, queue::process_message}; use futures::StreamExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -19,14 +15,13 @@ async fn main() -> Result<()> { // Connect to the stream tracing::debug!("Connecting to NATS server"); let nats_client = workers::create_nats_client().await?; - let jq_name = PRIMARY_STREAM.to_string(); tracing::debug!("Connecting to queue"); - let queue = workers::Queue::connect(jq_name, nats_client) + let queue = workers::Queue::connect(nats_client) .await .expect("Failed to create worker queue"); // Get all jobs from the stream - let subject = format!("{}.jobs.>", EVENT_PREFIX); // All farmhand jobs + let subject = format!("{}.jobs.>", MESSAGE_PREFIX); // All farmhand jobs // TODO: Make this ID dynamic so we can run more than one runner at a time // Make sure not too make it too dynamic, as they are intended to be re-used diff --git a/src/bin/stream_listener.rs b/src/bin/stream_listener.rs index 1db906e..0c4cb5b 100644 --- a/src/bin/stream_listener.rs +++ b/src/bin/stream_listener.rs @@ -2,10 +2,7 @@ //! It intentionally does not do anything else use anyhow::Result; -use farmhand::workers::{ - self, - events::{EVENT_PREFIX, PRIMARY_STREAM}, -}; +use farmhand::workers::{self, events::MESSAGE_PREFIX}; use futures::StreamExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -22,19 +19,16 @@ async fn main() -> Result<()> { tracing::debug!("Connecting to NATS server"); let nats_client = workers::create_nats_client().await?; // Setup the Jetstream queue - let jq_name = std::env::args() - .nth(1) - .unwrap_or_else(|| PRIMARY_STREAM.to_string()); - let queue = workers::Queue::connect(jq_name, nats_client) + let listener = workers::Stream::connect(nats_client) .await .expect("Failed to create worker queue"); // Get all events from the stream - let subject = format!("{}.>", EVENT_PREFIX); // All farmhand events + let subject = format!("{}.>", MESSAGE_PREFIX); // All farmhand events let runner_name = "farmhand_listener_1".to_string(); tracing::info!("Listening for events {} on {}", subject, runner_name); // Create the consumer to listen for events - let consumer = queue.create_consumer(Some(runner_name), subject).await?; + let consumer = listener.create_consumer(Some(runner_name), subject).await?; loop { let mut jobs = consumer.fetch().max_messages(20).messages().await?; while let Some(job) = jobs.next().await { diff --git a/src/bin/up.rs b/src/bin/up.rs index d245581..90cc402 100644 --- a/src/bin/up.rs +++ b/src/bin/up.rs @@ -3,7 +3,7 @@ use farmhand::{ db, workers::{ self, - events::{EVENT_PREFIX, PRIMARY_STREAM}, + events::{EVENT_PREFIX, EVENT_STREAM, JOB_PREFIX, JOB_STREAM, MESSAGE_PREFIX}, }, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -53,18 +53,27 @@ async fn init_project_nats() { .await .expect("Failed to connect to NATS"); - // Create the job queue stream - let jq_desc = Some("All Farmhand events".to_string()); - let all_events = format!("{}.>", EVENT_PREFIX); - let jq_subjects = vec![all_events]; - workers::Queue::new( - PRIMARY_STREAM.to_string(), - jq_desc, - jq_subjects, - nats_client, + // Create the event stream + let all_events_subject = format!("{}.{}.>", MESSAGE_PREFIX, EVENT_PREFIX); + workers::Stream::new( + EVENT_STREAM.to_string(), + Some("All Farmhand events".to_string()), + vec![all_events_subject], + nats_client.clone(), ) .await .expect("Failed to create worker queue"); + // Create the job stream + let all_jobs_subject = format!("{}.{}.>", MESSAGE_PREFIX, JOB_PREFIX); + workers::Queue::new( + JOB_STREAM.to_string(), + Some("All Farmhand jobs".to_string()), + vec![all_jobs_subject], + nats_client.clone(), + ) + .await + .expect("Failed to create job queue"); + tracing::info!("Successfully initialized NATS worker queue"); } diff --git a/src/error/mod.rs b/src/error/mod.rs index 2a7b988..38b99f1 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -1,3 +1,3 @@ pub mod queue; -pub use queue::QueueError; +pub use queue::{QueueError, StreamError}; diff --git a/src/error/queue.rs b/src/error/queue.rs index fc1505c..ce43e37 100644 --- a/src/error/queue.rs +++ b/src/error/queue.rs @@ -5,3 +5,9 @@ pub enum QueueError { #[error("Invalid Connection: {0}")] InvalidConnection(String), } + +#[derive(Error, Debug)] +pub enum StreamError { + #[error("Invalid Connection: {0}")] + InvalidConnection(String), +} diff --git a/src/workers/events.rs b/src/workers/events.rs deleted file mode 100644 index 437a849..0000000 --- a/src/workers/events.rs +++ /dev/null @@ -1,28 +0,0 @@ -use super::runner::chat::ChatMessagePayload; - -/// Represents events we send and receive from NATS -/// Primarily used to get the appropriate subject name for an event -pub enum Event { - ChatMessage(ChatMessagePayload), -} - -pub const EVENT_PREFIX: &str = "farmhand"; -pub const PRIMARY_STREAM: &str = "FARMHAND_EVENTS"; - -impl Event { - pub fn get_subject(&self) -> String { - match self { - // twitch.events.{broadcaster_name}.chat_message - Event::ChatMessage(payload) => format!( - "{}.twitch.events.{}.chat_message", - EVENT_PREFIX, payload.broadcaster_user_name - ), - } - } -} - -impl From for Event { - fn from(payload: ChatMessagePayload) -> Self { - Event::ChatMessage(payload) - } -} diff --git a/src/workers/runner/chat.rs b/src/workers/events/chat.rs similarity index 84% rename from src/workers/runner/chat.rs rename to src/workers/events/chat.rs index 567b5f8..943c284 100644 --- a/src/workers/runner/chat.rs +++ b/src/workers/events/chat.rs @@ -1,8 +1,5 @@ -use anyhow::Result; use serde::{Deserialize, Serialize}; -use super::Runner; - #[derive(Deserialize, Serialize)] pub struct ChatMessagePayload { pub message: Message, @@ -85,18 +82,3 @@ impl ChatMessagePayload { } } } - -/// A runner for processing chat messages -pub struct ChatMessageRunner; - -impl Runner for ChatMessageRunner { - type Payload = ChatMessagePayload; - - async fn process_job(&self, payload: Self::Payload) -> Result<()> { - tracing::debug!( - "Processing job with runner ChatMessageRunner for broadcaster: {broadcaster}", - broadcaster = payload.broadcaster_user_name - ); - Ok(()) - } -} diff --git a/src/workers/events/mod.rs b/src/workers/events/mod.rs new file mode 100644 index 0000000..3ebd482 --- /dev/null +++ b/src/workers/events/mod.rs @@ -0,0 +1,35 @@ +use chat::ChatMessagePayload; + +pub mod chat; +pub mod stream; +pub use stream::Stream; + +/// Represents events we send and receive from NATS +/// Primarily used to get the appropriate subject name for an event +pub enum Event { + ChatMessage(ChatMessagePayload), +} + +pub const MESSAGE_PREFIX: &str = "farmhand"; +pub const EVENT_PREFIX: &str = "events"; +pub const JOB_PREFIX: &str = "jobs"; +pub const EVENT_STREAM: &str = "FARMHAND_EVENTS"; +pub const JOB_STREAM: &str = "FARMHAND_JOBS"; + +impl Event { + pub fn get_subject(&self) -> String { + match self { + // farmhand.events.twitch.{broadcaster_name}.chat_message + Event::ChatMessage(payload) => format!( + "{}.{}.twitch.events.{}.chat_message", + MESSAGE_PREFIX, EVENT_PREFIX, payload.broadcaster_user_name + ), + } + } +} + +impl From for Event { + fn from(payload: ChatMessagePayload) -> Self { + Event::ChatMessage(payload) + } +} diff --git a/src/workers/events/stream.rs b/src/workers/events/stream.rs new file mode 100644 index 0000000..bb92702 --- /dev/null +++ b/src/workers/events/stream.rs @@ -0,0 +1,98 @@ +use async_nats::{ + jetstream::{ + self, + consumer::{pull::Config, Consumer}, + Context, + }, + Client, +}; + +use crate::{error::StreamError, workers::EVENT_STREAM}; + +#[allow(dead_code)] +/// TODO: Remove dead code annotation after implementing +pub struct Stream { + name: String, + jetstream: Context, +} + +impl Stream { + /// Connects to an existing queue + pub async fn connect(nats_client: Client) -> Result { + let jetstream = Self::create_jetstream(nats_client); + jetstream + .get_stream(EVENT_STREAM) + .await + .map_err(|e| StreamError::InvalidConnection(e.to_string()))?; + Ok(Stream { + name: EVENT_STREAM.to_string(), + jetstream, + }) + } + /// Creates a new queue + pub async fn new( + name: String, + description: Option, + subjects: Vec, + nats_client: Client, + ) -> Result { + let jetstream = Self::create_jetstream(nats_client); + jetstream + .create_stream(jetstream::stream::Config { + name: name.clone(), + subjects, + description, + max_bytes: 1024 * 1024 * 1024, // 1GB + ..Default::default() + }) + .await + .map_err(|e| StreamError::InvalidConnection(e.to_string()))?; + Ok(Stream { name, jetstream }) + } + /// Deletes the stream + pub async fn delete(nats_client: Client) -> Result<(), StreamError> { + let jetstream = Self::create_jetstream(nats_client); + + // Check if stream exists first + if jetstream.get_stream(EVENT_STREAM).await.is_ok() { + jetstream + .delete_stream(EVENT_STREAM) + .await + .map_err(|e| StreamError::InvalidConnection(e.to_string()))?; + } else { + tracing::warn!("Stream {} does not exist", EVENT_STREAM); + } + Ok(()) + } + /// Creates a new jetstream context + fn create_jetstream(nats_client: Client) -> Context { + jetstream::new(nats_client) + } + /// Creates a new consumer + pub async fn create_consumer( + &self, + name: Option, + filter: String, + ) -> Result, StreamError> { + let config = jetstream::consumer::pull::Config { + durable_name: name, + filter_subject: filter, + max_deliver: 3, + ..Default::default() + }; + self.jetstream + .create_consumer_on_stream(config, self.name.to_string()) + .await + .map_err(|e| StreamError::InvalidConnection(e.to_string())) + } + /// Publishes a message to the queue + pub async fn publish(&self, subject: String, message: String) -> Result<(), StreamError> { + tracing::debug!("Publishing message to subject {}", subject); + self.jetstream + .publish(subject, message.into()) + .await + .map_err(|e| StreamError::InvalidConnection(e.to_string()))?; + + Ok(()) + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 4c05f01..10761d1 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,5 +1,9 @@ pub mod events; -pub mod runner; +pub mod nats; +pub mod queue; -pub use runner::Queue; -pub use runner::{create_nats_client, get_nats_url}; +pub use nats::{create_nats_client, get_nats_url}; + +pub use events::Stream; +pub use events::{EVENT_STREAM, JOB_STREAM, MESSAGE_PREFIX}; +pub use queue::Queue; diff --git a/src/workers/runner/nats.rs b/src/workers/nats.rs similarity index 100% rename from src/workers/runner/nats.rs rename to src/workers/nats.rs diff --git a/src/workers/runner/hls_stream.rs b/src/workers/queue/hls_stream.rs similarity index 100% rename from src/workers/runner/hls_stream.rs rename to src/workers/queue/hls_stream.rs diff --git a/src/workers/runner/legacy.rs b/src/workers/queue/legacy.rs similarity index 100% rename from src/workers/runner/legacy.rs rename to src/workers/queue/legacy.rs diff --git a/src/workers/runner/mod.rs b/src/workers/queue/mod.rs similarity index 83% rename from src/workers/runner/mod.rs rename to src/workers/queue/mod.rs index 3fe37d4..298f186 100644 --- a/src/workers/runner/mod.rs +++ b/src/workers/queue/mod.rs @@ -1,13 +1,9 @@ -pub mod chat; pub mod hls_stream; -pub mod nats; pub mod queue; use anyhow::Result; use async_nats::Message; -use chat::ChatMessageRunner; use hls_stream::HlsStreamRunner; -pub use nats::{create_nats_client, get_nats_url}; pub use queue::Queue; use serde::de::DeserializeOwned; @@ -39,7 +35,6 @@ pub(crate) trait Runner: Send + Sync + 'static { /// Represents the different types of runners that can be used in the application pub enum RunnerType { - SaveChat(ChatMessageRunner), TransformVideo(HlsStreamRunner), } @@ -48,15 +43,13 @@ impl RunnerType { pub fn from_subject(subject: &str) -> Result { tracing::debug!("Creating runner for subject: {}", subject); match subject { - "farmhand_jobs.video.to_stream" => Ok(RunnerType::TransformVideo(HlsStreamRunner)), - "farmhand_jobs.chat.save" => Ok(RunnerType::SaveChat(ChatMessageRunner)), + "farmhand.jobs.video_to_stream" => Ok(RunnerType::TransformVideo(HlsStreamRunner)), _ => Err(anyhow::anyhow!("{} has no runner associated", subject)), } } /// Method to run the appropriate runner pub async fn run(&self, message: &Message) -> Result<()> { match self { - RunnerType::SaveChat(runner) => runner.run(message).await, RunnerType::TransformVideo(runner) => runner.run(message).await, } } diff --git a/src/workers/runner/queue.rs b/src/workers/queue/queue.rs similarity index 83% rename from src/workers/runner/queue.rs rename to src/workers/queue/queue.rs index 93ee34e..7de9f25 100644 --- a/src/workers/runner/queue.rs +++ b/src/workers/queue/queue.rs @@ -8,7 +8,7 @@ use async_nats::{ Client, }; -use crate::error::QueueError; +use crate::{error::QueueError, workers::JOB_STREAM}; #[allow(dead_code)] /// TODO: Remove dead code annotation after implementing @@ -19,13 +19,16 @@ pub struct Queue { impl Queue { /// Connects to an existing queue - pub async fn connect(name: String, nats_client: Client) -> Result { + pub async fn connect(nats_client: Client) -> Result { let jetstream = Self::create_jetstream(nats_client); jetstream - .get_stream(&name) + .get_stream(JOB_STREAM) .await .map_err(|e| QueueError::InvalidConnection(e.to_string()))?; - Ok(Queue { name, jetstream }) + Ok(Queue { + name: JOB_STREAM.to_string(), + jetstream, + }) } /// Creates a new queue pub async fn new( @@ -48,17 +51,17 @@ impl Queue { Ok(Queue { name, jetstream }) } /// Deletes the queue - pub async fn delete(name: String, nats_client: Client) -> Result<(), QueueError> { + pub async fn delete(nats_client: Client) -> Result<(), QueueError> { let jetstream = Self::create_jetstream(nats_client); // Check if stream exists first - if jetstream.get_stream(&name).await.is_ok() { + if jetstream.get_stream(JOB_STREAM).await.is_ok() { jetstream - .delete_stream(&name) + .delete_stream(JOB_STREAM) .await .map_err(|e| QueueError::InvalidConnection(e.to_string()))?; } else { - tracing::warn!("Stream {} does not exist", name); + tracing::warn!("Stream {} does not exist", JOB_STREAM); } Ok(()) }